请教kafka 启用多台consumer 问题

2025-05-08 00:26:03
推荐回答(1个)
回答1:

public static void consumer(){
Properties props = new Properties();
props.put("zk.connect", "hadoop-2:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "fans_group");

// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

Map map = new HashMap();
map.put("fans", 1);

// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
Map>> topicMessageStreams = consumerConnector.createMessageStreams(map);
List> streams = topicMessageStreams.get("fans");

// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(1);
long startTime = System.currentTimeMillis();
// consume the messages in the threads
for(final KafkaStream stream: streams) {
executor.submit(new Runnable() {
public void run() {
ConsumerIterator it = stream.iterator();
while (it.hasNext()){
log.debug(byteBufferToString(it.next().message().payload()));
}
}

});
log.debug("use time="+(System.currentTimeMillis()-startTime));
}
}