/**
 * Demo consumer thread: manually assigns itself EVERY partition of a fixed
 * set of topics and prints each record it polls.
 *
 * NOTE(review): class names are conventionally UpperCamelCase ("KafkaConsumerDemo");
 * kept as-is so existing callers keep compiling.
 */
public class kafkaConsumer extends Thread {

    public kafkaConsumer() {
        super();
    }

    @Override
    public void run() {
        // Topics whose partitions this thread will take over via manual assignment.
        List<String> topicList =
                Arrays.asList("topic1", "topic2", "topic3", "topic4", "topic5", "topic6");

        KafkaConsumer<String, String> consumer = createConsumer("aa");

        // Collect every partition of every topic. assign() bypasses consumer-group
        // rebalancing, so each kafkaConsumer thread reads ALL partitions independently.
        List<TopicPartition> topicPartitions = new ArrayList<>();
        for (String topic : topicList) {
            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
            for (PartitionInfo info : partitionInfos) {
                topicPartitions.add(new TopicPartition(info.topic(), info.partition()));
            }
        }

        // FIX: original read "consumer1.assign(...)" — an undefined variable
        // (compile error). The consumer created above is the intended receiver.
        consumer.assign(topicPartitions);

        while (true) {
            // poll(long) is deprecated since Kafka 2.0 in favor of poll(Duration);
            // kept here for compatibility with the client version this file targets.
            ConsumerRecords<String, String> records = consumer.poll(1100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("---------" + Thread.currentThread().getName() + "---" + record.topic() + "---" + record.partition() + "-----" + record.value());
            }
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // FIX: the empty catch swallowed interruption, making the thread
                // unstoppable. Restore the interrupt flag and leave the loop.
                Thread.currentThread().interrupt();
                break;
            }
        }

        consumer.close(); // release network buffers/sockets once the loop exits
    }

    /**
     * Builds a String/String consumer against a local broker.
     *
     * @param groupName consumer group id; group management is unused here because
     *                  partitions are assigned manually, but Kafka still requires
     *                  a group.id for offset auto-commit
     * @return a configured, not-yet-assigned KafkaConsumer
     */
    private KafkaConsumer<String, String> createConsumer(String groupName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", groupName);
        props.put("enable.auto.commit", "true");
        props.put("max.poll.records", "100"); // reduced from the default of 500
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(props);
    }

    public static void main(String[] args) {
        // Two independent threads; because each one manually assigns itself ALL
        // partitions (no group balancing), every record is printed twice.
        new kafkaConsumer().start(); // uses topics pre-created on the cluster
        new kafkaConsumer().start(); // uses topics pre-created on the cluster
    }
}

// Logo
//
// Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。
//
// 更多推荐