//以下配置前提条件:kafka_2.10-0.9.0.0(即服务端2.10版本,客户端0.9.0版本)
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.21.134:9092");
props.put("acks", "1");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
    // 三个参数分别为topic, key,value,send()是异步的,添加到缓冲区立即返回,更高效。
    Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("bjtest1", "a","aa"+i));
    RecordMetadata fm =  future.get();
    System.out.println(fm);
}
producer.close();
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐