kafka异步发送和同步发送
kafka的部分参数配置private Producer<String,String> producer = null;public ProduceKafka(String Server,String topic){props.put("bootstrap.servers", Server);//kafka集群地址props.put(...
·
kafka的部分参数配置
private Producer<String,String> producer = null;
public ProduceKafka(String Server,String topic){
props.put("bootstrap.servers", Server);//kafka集群地址
props.put("acks", "1");//有0,1,all三种形式
props.put("retries", 5);
props.put("linger.ms",3);
props.put("batch.size",166666);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 发送消息的key,类型为String,使用String类型的序列化器
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//发送消息的value,类型为String,使用String类型的序列化器
this.topic = topic;
if(producer == null){
producer = new KafkaProducer<>(props);
}
}
- 创建ProducerRecord对象,通过
topic、key、value
设置消息
Producer有同步发送和异步发送2种策略,
1、异步发送如下:
producer.send(new ProducerRecord<String, String>(this.topic, value), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception != null){
exception.printStackTrace();
}
}
});
当Kafka返回错误的时候,onCompletion方法会收到一个非null的异常。上面的例子直接打印异常消息,但是如果是生产环境,需要做一些处理错误的操作。
2、同步发送
在send()方法中使用Future对象获取发送消息返回的信息
RecordMetadata recordMetadata = producer.send(new ProducerRecord<String, String>(this.topic, value)).get();
更多推荐
所有评论(0)