To send a message to Kafka you build a ProducerRecord, which offers three constructor signatures:

-- ProducerRecord(topic, partition, key, value)
-- ProducerRecord(topic, key, value)
-- ProducerRecord(topic, value)

Notes:
<1> If a partition ID is specified, the record is sent to that partition;
<2> If no partition ID is specified but a key is, the record is sent to the partition determined by hash(key);
<3> If neither a partition ID nor a key is specified, records are distributed across the partitions in round-robin fashion;
<4> If both a partition ID and a key are specified, the record goes only to the specified partition (the key has no effect; this is determined by the partitioner logic).
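The key-based rule in <2> can be sketched in plain Java. Note this is an illustration of the "hash(key) % numPartitions" idea only: Kafka's DefaultPartitioner actually applies murmur2 to the serialized key bytes, not String.hashCode().

```java
public class PartitionSketch {

    /**
     * Illustrative key-to-partition mapping (assumption: Kafka's real
     * DefaultPartitioner uses murmur2 over the serialized key instead).
     */
    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the result is always non-negative
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p = partitionFor("key", 6);
        // The same key always maps to the same partition,
        // which is what gives per-key ordering within a partition
        System.out.println(p == partitionFor("key", 6)); // true
        System.out.println(p >= 0 && p < 6);             // true
    }
}
```

This determinism is why choosing a key guarantees that all records with that key land in the same partition and are therefore consumed in order.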

1. Sending data synchronously with a Kafka producer

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerTest {

    private static Properties kafkaProps = new Properties();

    /**
     * Initialize the producer configuration.
     */
    public static void initProperty() {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("retries", 3);
        kafkaProps.put("acks", "all");
        kafkaProps.put("client.id", "zsd");
    }

    /**
     * Send a message to Kafka synchronously:
     * send() returns a Future, and future.get() blocks until the broker acknowledges.
     */
    public static void syncSend() throws ExecutionException, InterruptedException {
        initProperty();
        // Create the Kafka producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(kafkaProps);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic1", "key", "value");
        Future<RecordMetadata> future = kafkaProducer.send(record);
        RecordMetadata recordMetadata = future.get();

        System.out.println("offset:" + recordMetadata.offset()
                + "\npartition:" + recordMetadata.partition()
                + "\ntopic:" + recordMetadata.topic()
                + "\nserializedKeySize:" + recordMetadata.serializedKeySize()
                + "\nserializedValueSize:" + recordMetadata.serializedValueSize()
        );
        kafkaProducer.close();
    }
}

2. Sending data asynchronously with a Kafka producer

    /**
     * Send a message to a Kafka topic asynchronously:
     * send() returns immediately, and the Callback fires once the broker responds.
     */
    public static void asyncSend() {
        initProperty();
        // Create the Kafka producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(kafkaProps);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic2", "key", "value");
        kafkaProducer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                // On failure recordMetadata is null, so check the exception first
                if (e != null) {
                    e.printStackTrace();
                    return;
                }
                System.out.println("offset:" + recordMetadata.offset()
                        + "\npartition:" + recordMetadata.partition()
                        + "\ntopic:" + recordMetadata.topic()
                        + "\nserializedKeySize:" + recordMetadata.serializedKeySize()
                        + "\nserializedValueSize:" + recordMetadata.serializedValueSize()
                );
            }
        });

        kafkaProducer.close();
    }

3. Sending data from a Spring Boot producer (using KafkaTemplate)

@RestController
@Slf4j
public class LoggerController {
    @Autowired // injected by the IoC container
    KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/applog")
    public String applog(@RequestBody String applog) {
        log.info(applog);
        // Parse the request body as JSON
        JSONObject jsonObject = JSON.parseObject(applog);
        JSONObject startJsonObject = jsonObject.getJSONObject("start");
        if (startJsonObject != null) {
            // A non-null "start" field means this is a startup log
            kafkaTemplate.send("gmall_start", applog);
        } else {
            // Otherwise it is an event log
            kafkaTemplate.send("gmall_event", applog);
        }
        return "applog success ~~~";
    }
}
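For the controller above to work, a KafkaTemplate bean must exist. With the spring-kafka starter on the classpath, Spring Boot can auto-configure one from a few properties; a minimal sketch of application.properties follows (the broker address and serializer choices are assumptions, matching the earlier examples):

```properties
# Assumed broker address, same as the plain-client examples above
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=all
spring.kafka.producer.retries=3
```

With this in place, the @Autowired KafkaTemplate is backed by a producer configured equivalently to the hand-built Properties object in section 1.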