向kafka里发送消息
//以下配置前提条件:kafka_2.10-0.9.0.0(即服务端2.10版本,客户端0.9.0版本)Properties props = new Properties();props.put("bootstrap.servers", "192.168.21.134:9092");props.put("acks", "1");props.put("retries", 0);props.
·
//以下配置前提条件:kafka_2.10-0.9.0.0(即服务端2.10版本,客户端0.9.0版本) Properties props = new Properties(); props.put("bootstrap.servers", "192.168.21.134:9092"); props.put("acks", "1"); props.put("retries", 0); props.put("batch.size", 16384); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { // 三个参数分别为topic, key,value,send()是异步的,添加到缓冲区立即返回,更高效。 Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("bjtest1", "a","aa"+i)); RecordMetadata fm = future.get(); System.out.println(fm); } producer.close();
更多推荐
已为社区贡献2条内容
所有评论(0)