A brief note on how to manually initialize a Kafka consumer and a Kafka producer.

1. Consumer configuration

Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, xxx);      // deserializer class for record keys
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, xxx);    // deserializer class for record values
properties.put(ConsumerConfig.GROUP_ID_CONFIG, xxx);                    // consumer group id
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, xxx);                   // client id used in logs and metrics
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, xxx);        // max allowed gap between poll() calls
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, xxx);       // heartbeat interval to the group coordinator
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, xxx);          // session timeout before the member is removed from the group
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, xxx);          // whether offsets are committed automatically
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, xxx);           // earliest/latest when no committed offset exists
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, xxx);           // broker address list
properties.put(SPECIFIC_AVRO_READER_CONFIG, xxx);                       // from Confluent's KafkaAvroDeserializerConfig, only for Avro deserialization
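
For reference, a minimal sketch of the same configuration with concrete values is shown below. The broker address, group id, client id, timeouts, and String deserializers are illustrative assumptions rather than values from the original note, and the helper class name ConsumerProps is hypothetical. SPECIFIC_AVRO_READER_CONFIG is omitted here because it only applies when the value deserializer is Confluent's KafkaAvroDeserializer.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerProps {
    // Illustrative values only; tune them for your cluster and workload.
    public static Properties build() {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed broker list
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");                // assumed consumer group
        p.put(ConsumerConfig.CLIENT_ID_CONFIG, "demo-consumer-1");          // assumed client id
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        p.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);          // max gap between poll() calls (ms)
        p.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);             // group coordinator session timeout (ms)
        p.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);           // usually about 1/3 of the session timeout
        p.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);             // commit offsets manually
        p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");         // start from the beginning when no offset exists
        return p;
    }
}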

2. Create the KafkaConsumer instance

consumer = new KafkaConsumer<>(properties); // pass in the Properties built above

3. Subscribe to topics

consumer.subscribe(xxxTopics);
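
Putting the three steps together, a minimal end-to-end sketch might look like the following. It reuses the hypothetical ConsumerProps.build() helper from the sketch above, and the topic name demo-topic is an assumption. Because auto commit was disabled in that configuration, offsets are committed manually after each processed batch.

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualConsumerDemo {
    public static void main(String[] args) {
        // ConsumerProps.build() is the hypothetical helper sketched above.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(ConsumerProps.build())) {
            consumer.subscribe(Collections.singletonList("demo-topic"));    // assumed topic name
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
                consumer.commitSync(); // needed because ENABLE_AUTO_COMMIT_CONFIG is false
            }
        }
    }
}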

1. Producer configuration

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, xxx);    // broker address list
properties.put(ProducerConfig.CLIENT_ID_CONFIG, xxx);            // client id used in logs and metrics
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, xxx); // serializer class for record keys
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, xxx); // serializer class for record values
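
As with the consumer, a minimal sketch with concrete values might look like this; the broker address, client id, and String serializers are assumptions, and the helper class name ProducerProps is hypothetical.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerProps {
    // Illustrative values only; adjust for your cluster.
    public static Properties build() {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed broker list
        p.put(ProducerConfig.CLIENT_ID_CONFIG, "demo-producer-1");          // assumed client id
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return p;
    }
}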

2. Create the KafkaProducer instance

producer = new KafkaProducer<>(properties); // pass in the Properties built above

3. Send records to Kafka

producer.send(new ProducerRecord<>(WhichTopic, Key, Value));
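
A complete sketch of the producer side is shown below, reusing the hypothetical ProducerProps.build() helper from above. The topic, key, and value are placeholder assumptions; the callback form of send() is used so delivery failures are surfaced, and flush() is called before the try-with-resources block closes the producer.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ManualProducerDemo {
    public static void main(String[] args) {
        // ProducerProps.build() is the hypothetical helper sketched above.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(ProducerProps.build())) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("demo-topic", "demo-key", "hello kafka"); // assumed topic/key/value
            producer.send(record, (RecordMetadata metadata, Exception exception) -> {
                if (exception != null) {
                    exception.printStackTrace(); // delivery failed
                } else {
                    System.out.printf("sent to %s-%d@%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
            producer.flush(); // make sure the record is delivered before the producer is closed
        }
    }
}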