Sending Data with a Kafka Producer
To send a message to Kafka, ProducerRecord provides three commonly used constructors:
-- ProducerRecord(topic, partition, key, value)
-- ProducerRecord(topic, key, value)
-- ProducerRecord(topic, value)
Notes:
<1> If a partition ID is specified, the record is sent to that partition.
<2> If no partition ID is specified but a key is, the record is sent to the partition determined by hash(key).
<3> If neither a partition ID nor a key is specified, records are distributed across partitions in round-robin fashion.
<4> If both a partition ID and a key are specified, the record goes only to the specified partition (the key has no effect on partition choice; the client code's logic decides this).
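Note <2> can be illustrated with a minimal sketch of key-based partition selection. This is an assumption-laden toy: it uses Java's `String.hashCode()` with the sign bit masked off, whereas Kafka's actual default partitioner hashes the *serialized* key with murmur2, so real partition assignments will differ.

```java
// Toy sketch of hash(key) -> partition (NOT Kafka's real algorithm,
// which applies murmur2 to the serialized key bytes).
public class PartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the index is always in [0, numPartitions)
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        for (String key : new String[]{"user-1", "user-2", "user-3"}) {
            // Same key always maps to the same partition
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

The important property, shared with the real partitioner, is determinism: records with the same key always land on the same partition, which is what gives Kafka per-key ordering.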
1. Sending data synchronously from a Kafka producer
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerTest {

    private static Properties kafkaProps = new Properties();

    /**
     * Initialize the producer configuration.
     */
    public static void initProperty() {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("retries", 3);
        kafkaProps.put("acks", "all");
        kafkaProps.put("client.id", "zsd");
    }

    /**
     * Send a message to Kafka synchronously.
     */
    public static void syncSend() throws ExecutionException, InterruptedException {
        initProperty();
        // Create the Kafka producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(kafkaProps);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic1", "key", "value");
        Future<RecordMetadata> future = kafkaProducer.send(record);
        // Blocking on future.get() is what makes this send synchronous
        RecordMetadata recordMetadata = future.get();
        System.out.println("offset:" + recordMetadata.offset()
                + "\npartition:" + recordMetadata.partition()
                + "\ntopic:" + recordMetadata.topic()
                + "\nserializedKeySize:" + recordMetadata.serializedKeySize()
                + "\nserializedValueSize:" + recordMetadata.serializedValueSize()
        );
        kafkaProducer.close();
    }
}
2. Sending data asynchronously from a Kafka producer
/**
 * Send a message to a Kafka topic asynchronously.
 */
public static void asyncSend() {
    initProperty();
    // Create the Kafka producer
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(kafkaProps);
    ProducerRecord<String, String> record = new ProducerRecord<>("topic2", "key", "value");
    kafkaProducer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            // Check the exception first: if the send failed,
            // recordMetadata may be null and must not be dereferenced
            if (e != null) {
                e.printStackTrace();
                return;
            }
            System.out.println("offset:" + recordMetadata.offset()
                    + "\npartition:" + recordMetadata.partition()
                    + "\ntopic:" + recordMetadata.topic()
                    + "\nserializedKeySize:" + recordMetadata.serializedKeySize()
                    + "\nserializedValueSize:" + recordMetadata.serializedValueSize()
            );
        }
    });
    // close() flushes any records still buffered before shutting down
    kafkaProducer.close();
}
3. Sending data from a Kafka producer in Spring Boot (using KafkaTemplate)
@RestController
@Slf4j
public class LoggerController {

    @Autowired // injected by the IoC container
    KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/applog")
    public String applog(@RequestBody String applog) {
        log.info(applog);
        // Parse the JSON payload
        JSONObject jsonObject = JSON.parseObject(applog);
        JSONObject startJsonObject = jsonObject.getJSONObject("start");
        if (startJsonObject != null) {
            // A "start" field means this is a startup log
            kafkaTemplate.send("gmall_start", applog);
        } else {
            // Otherwise treat it as an event log
            kafkaTemplate.send("gmall_event", applog);
        }
        return "applog success ~~~";
    }
}
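The routing decision in the controller above can be isolated into a small dependency-free sketch. This is an assumption: the real controller parses the payload with fastjson (`JSON.parseObject`), while this toy just checks for a top-level `"start"` field with a naive substring test, so it would misbehave on nested occurrences of that field.

```java
// Toy sketch of the startup-vs-event topic routing used by the controller;
// the real code parses the JSON with fastjson instead of a substring check.
public class TopicRouter {

    static String chooseTopic(String applog) {
        // Naive check: a startup log carries a "start" field in the payload
        return applog.contains("\"start\"") ? "gmall_start" : "gmall_event";
    }

    public static void main(String[] args) {
        System.out.println(chooseTopic("{\"start\":{\"entry\":\"icon\"}}"));
        System.out.println(chooseTopic("{\"page\":{\"page_id\":\"home\"}}"));
    }
}
```

Factoring the topic choice out like this also makes the routing rule unit-testable without standing up Kafka or the web layer.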