Kafka-Callback回调接口
1. 什么是Callback其使用场景是什么?在这个send方法中我们看到需要传入两个参数,而第二个参数是一个CallbackCallback是一个回调接口:用户可以实现的回调接口Callback,以允许在请求完成时执行实现接口方法的代码。2.代码示例import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.
·
1. 什么是Callback其使用场景是什么?
在这个send方法中我们看到需要传入两个参数,而第二个参数是一个Callback
Callback是一个回调接口:用户可以实现的回调接口Callback,以允许在请求完成时执行实现接口方法的代码。
2.代码示例
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class Producer extends Thread {
private static Logger LOG = Logger.getLogger(Producer.class);
private final KafkaProducer<Integer, String> producer;
private final String topic;
private final Boolean isAsync;
/**
* @param topic
* @param isAsync
*/
public Producer(String topic, Boolean isAsync) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.83.100:9092");
//props.put("bootstrap.servers", "192.168.83.100:9092,192.168.83.120:9092,192.168.83.130:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props);
this.topic = topic;
this.isAsync = isAsync;
}
public void run() {
int messageNo = 1;
while (true) {
String messageStr = "Message_" + messageNo;
LOG.warn("message:" + messageStr);
long startTime = System.currentTimeMillis();
if (isAsync) { // 异步发送 Send asynchronously
producer.send(new ProducerRecord<>(topic,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));//*********这里使用了Callback
} else { // Send synchronously
try {
producer.send(new ProducerRecord<>(topic,messageStr)).get();
LOG.warn("Sent message: (" + messageNo + ", " + messageStr + ")");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
++messageNo;
}
}
public static void main(String[] args) {
//产生到那个主题下面
Producer producer = new Producer("test_msg1",false);
producer.start();
}
}
//实现Callback接口的onCompletion方法
class DemoCallBack implements Callback {
private final long startTime;
private final int key;
private final String message;
//实现的是DemoCallBack的有参构造
public DemoCallBack(long startTime, int key, String message) {
//通过this把获取的参数内容传递到外层类中,这有这样当回调发生时onCompletion才可以获取类的参数
this.startTime = startTime;
this.key = key;
this.message = message;
}
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
System.out.println(
"message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
"), " +
"offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
} else {
exception.printStackTrace();
}
}
}
当producer每生产一批数据send出去后,就会调用一次实现了Callback的DemocallBack类的onCompletion方法
更多推荐
已为社区贡献3条内容
所有评论(0)