kafka的部分参数配置

private Producer<String,String> producer = null;

public ProduceKafka(String Server,String topic){
        props.put("bootstrap.servers", Server);//kafka集群地址
        props.put("acks", "1");//有0,1,all三种形式
        props.put("retries", 5);
        props.put("linger.ms",3);
        props.put("batch.size",166666);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 发送消息的key,类型为String,使用String类型的序列化器
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//发送消息的value,类型为String,使用String类型的序列化器

        this.topic = topic;
        if(producer == null){
            producer = new KafkaProducer<>(props);
        }
    }
  • 创建ProducerRecord对象,通过topic、key、value设置消息

Producer有同步发送和异步发送2种策略,

1、异步发送如下:

 producer.send(new ProducerRecord<String, String>(this.topic, value), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if(exception != null){
                     exception.printStackTrace();
                    }
                }
            });

当Kafka返回错误的时候,onCompletion方法会收到一个非null的异常。上面的例子直接打印异常消息,但是如果是生产环境,需要做一些处理错误的操作。

2、同步发送

在send()方法中使用Future对象获取发送消息返回的信息

 RecordMetadata recordMetadata = producer.send(new ProducerRecord<String, String>(this.topic, value)).get();
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐