Maven dependencies

        <!--kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <!-- log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

log4j.properties

log4j.rootLogger=INFO,R,stdout    

log4j.appender.stdout=org.apache.log4j.ConsoleAppender    
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss} [%t] [%c]-[%p] %m%n

log4j.appender.R=org.apache.log4j.DailyRollingFileAppender
log4j.appender.R.File=./logs/info.log
#log4j.appender.R.File=./logs/compSearcher.log
log4j.appender.R.DatePattern='.'yyyy-MM-dd
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss} [%c]-[%p] %m%n
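
conf/kafka.properties

The KafkaUtils class below loads its connection settings from ./conf/kafka.properties at class-load time. A minimal sketch of that file, using the key names the code reads (the values are placeholders matching the defaults in the code):

broker.list=xx.xx.10.182:9092
zookeeper.connect=xx.xx.10.182:2181
group.id=test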

KafkaUtils wrapper class

package com.wg.utils;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.producer.KeyedMessage;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.FileReader;
import java.io.IOException;
import java.util.*;


public class KafkaUtils {

    private static Producer<String,String> producer =null;
    private static String broker_ip="xx.xx.10.182:9092";
    private static String zk_ip="xx.xx.10.182:2181";
    public static String group_id="test";

    static {
        // Load the external config; fall back to the defaults above when a key is missing.
        Properties prop = loadOutProp("./conf/kafka.properties");
        broker_ip = prop.getProperty("broker.list", broker_ip);
        zk_ip = prop.getProperty("zookeeper.connect", zk_ip);
        group_id = prop.getProperty("group.id", group_id);
    }

    /**
     * Calling send() by itself does not guarantee that the message was written to Kafka.
     * To send synchronously and know whether each message succeeded, call get() on the
     * Future returned by send(). (get() blocks the current thread until the result comes
     * back, whether success or failure.)
     * @param topic
     * @param key
     * @param value
     * @return true if the broker acknowledged the record, false otherwise
     */
    public static boolean send(String topic,String key,String value){
        ProducerRecord<String,String> r = new ProducerRecord<String,String>(topic,key,value);
        try {
            if(producer==null){
                producer = new KafkaProducer<String,String>(getProducerProp());
            }
            producer.send(r).get();
            System.out.println("send to topic "+topic);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    private static Properties getProducerProp() {
        // Build a java.util.Properties object for the producer.
        Properties props = new Properties();
        // bootstrap.servers: required, no default. Used to establish the connection to the Kafka brokers.
        props.put("bootstrap.servers", broker_ip);
        // key.serializer: required, no default. Anything sent to the broker must be a byte array,
        // so every part of the message is serialized first; this class serializes the key.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value.serializer: required, no default. Same idea as key.serializer, but for the message value.
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // acks controls the durability of produced messages. Possible values: 0, 1, -1 (all).
        props.put("acks", "-1");
        //props.put(ProducerConfig.ACKS_CONFIG, "1");
        // The producer retries failed sends internally. The default of 0 means no retries.
        props.put("retries", 3);
        //props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // batch.size is a key knob for producer throughput and latency. Default is 16384 (16 KB).
        props.put("batch.size", 16384);
        //props.put(ProducerConfig.BATCH_SIZE_CONFIG, 323840);
        // linger.ms controls send delay. The default of 0 means messages are sent immediately,
        // without waiting for the batch to fill up.
        props.put("linger.ms", 10);
        //props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        // buffer.memory is the size, in bytes, of the producer-side message buffer. Default is 33554432 (32 MB).
        props.put("buffer.memory", 33554432);
        //props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put("max.block.ms", 3000);
        //props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);
        // compression.type controls producer-side compression. Default is none; GZIP, Snappy and LZ4 are available.
        //props.put("compression.type", "none");
        //props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
        // max.request.size limits the size of producer requests, i.e. the largest message the producer can send.
        //props.put("max.request.size", 10485760);
        //props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
        // request.timeout.ms: after the producer sends a request, the broker must reply within this window. Default 30s.
        //props.put("request.timeout.ms", 60000);
        //props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);

        // The KafkaProducer is then constructed from the Properties object built above.
        // If the producer is created the following way instead, the key and value serializer
        // classes do not need to be set explicitly in the Properties:
        // Serializer<String> keySerializer = new StringSerializer();
        // Serializer<String> valueSerializer = new StringSerializer();
        // Producer<String, String> producer = new KafkaProducer<String, String>(props,
        //         keySerializer, valueSerializer);
        return props;
    }

    /**
     * Configuration for the case where losing log records written back to Kafka is not
     * acceptable, so messages must be sent synchronously.
     * @return
     */
    private Producer<Integer, String> initKafkaProducer(){
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker_ip);// format: host1:port1,host2:port2,...
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);// a batch size of zero will disable batching entirely
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0);// send messages without delay
        props.put(ProducerConfig.ACKS_CONFIG, "1");// the partition leader returns success once it has written locally; in extreme cases data can still be lost
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<Integer, String> kafkaProducer = new KafkaProducer<Integer, String>(props);
        return kafkaProducer;
    }

    private static Properties loadInProp(String fileName) {
        Properties properties = new Properties();
        try {
            // Load a properties file from the classpath.
            properties.load(KafkaUtils.class.getClassLoader().getResourceAsStream(fileName));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return properties;
    }

    private static Properties loadOutProp(String fileName) {
        Properties properties = new Properties();
        try {
            properties.load(new FileReader(fileName));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return properties;
    }



    private static Properties getConsumerProp(){
        Properties props = new Properties();
        // bootstrap.servers: required
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,broker_ip);
        // group id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group_id);
        // whether offsets are committed back to Kafka automatically in the background
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // how often (in milliseconds) offsets are auto-committed when enable.auto.commit is true
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // heartbeat-based failure detection window: if no heartbeat arrives within this time, the
        // consumer is removed from the group and a rebalance is triggered. The value must lie
        // between group.min.session.timeout.ms and group.max.session.timeout.ms.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        // key / value deserializer classes
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // start consuming from the earliest available offset
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,  "earliest");
        // or start from the latest data only
//        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,  "latest");


        // max.poll.records: how many records a single poll() may return; set this with care.
        // If the broker receives no heartbeat from a consumer within session.timeout.ms, it kicks
        // the consumer out of the group and rebalances its partitions to the other consumers.
        // Set too high, consumers keep getting evicted and the same records are consumed repeatedly.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "20");
        return props;
    }

    private static kafka.consumer.ConsumerConfig initJavaConsumerConfig(String group_id) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zk_ip);
        props.put("group.id", group_id );
        props.put("auto.offset.reset", "smallest");
        props.put("zookeeper.session.timeout.ms",  "400000");
        props.put("zookeeper.sync.time.ms",  "200");
        props.put("auto.commit.interval.ms",  "1000");
        props.put("fetch.message.max.bytes",  "100000000");
        props.put("max.poll.records","5000");
        props.put("rebalance.backoff.ms","8000");
        props.put("rebalance.max.retries","7");
        kafka.consumer.ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);
        return consumerConfig;
    }


    public static KafkaConsumer<String, String> getConsumer(){
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(getConsumerProp());
        return consumer;
    }



    /**
     * Suitable for multi-threaded consumption (a usage sketch follows this method)
     * @param topic
     * @param group_Id
     * @param thread_num should match the topic's partition count
     * @return
     */
    public static List<KafkaStream<byte[], byte[]>> getConsumerStreams(String topic,String group_Id,int thread_num) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        ConsumerConnector consumerConnector = getConsumerConnector(group_Id);
        topicCountMap.put(topic,thread_num);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
                .createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> kafkaStreams = consumerMap.get(topic);
        return kafkaStreams;
    }
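
    /**
     * Usage sketch (an illustration, not part of the original wrapper): consume the streams
     * returned by getConsumerStreams with one thread per stream. If thread_num is larger than
     * the topic's partition count the extra threads stay idle; if smaller, several partitions
     * share a thread.
     */
    public static void consumeWithThreadPool(String topic, String groupId, int threadNum) {
        java.util.concurrent.ExecutorService pool =
                java.util.concurrent.Executors.newFixedThreadPool(threadNum);
        for (final KafkaStream<byte[], byte[]> stream : getConsumerStreams(topic, groupId, threadNum)) {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    // Each stream blocks on its iterator and prints whatever it receives.
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        System.out.println(Thread.currentThread().getName()
                                + " => " + new String(it.next().message()));
                    }
                }
            });
        }
    }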

    /**
     * Single-threaded consumption
     * @param topic
     * @param group_id
     * @return
     */
    public static  KafkaStream<byte[], byte[]>  getConsumerStream(String topic,String group_id) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        ConsumerConnector consumerConnector = getConsumerConnector(group_id);
        topicCountMap.put(topic,1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
                .createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        return stream;
    }

    public static void main(String[] args) {
        KafkaStream<byte[], byte[]> stream = getConsumerStream("test", "test0");
        testConsumerStream(stream);
    }


    private static ConsumerConnector getConsumerConnector(String group_id) {
        kafka.consumer.ConsumerConfig consumerProp = initJavaConsumerConfig(group_id);
        ConsumerConnector javaConsumerConnector = Consumer
                .createJavaConsumerConnector(consumerProp);
        return javaConsumerConnector;
    }

    /**
     * Example of polling multiple records per call
     * @param topic
     */
    public static void testConsumer(String topic){
        KafkaConsumer<String, String> consumer = getConsumer();
        consumer.subscribe(Arrays.asList(topic));
        // consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            long t1 = System.currentTimeMillis();
            ConsumerRecords<String, String> records = consumer.poll(2000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value()
                        + "), offset " + record.offset()
                        + ", partition " + record.partition());
            }
            long t2 = System.currentTimeMillis();
            System.out.println("one poll cost === " + (t2 - t1) + " ms");
        }
    }

    public static void testConsumerStream(KafkaStream<byte[], byte[]> stream){
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String str = new String(it.next().message());
            System.out.println("str======>>>>"+str);
        }
    }

    /** @deprecated */
    private static kafka.javaapi.producer.Producer<String, String> getOldProducer(){
        Properties props = new Properties();
        props.put("metadata.broker.list", broker_ip);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type",  "sync");
        props.put("queue.buffering.max.ms", "10000");
        props.put("request.timeout.ms", "10000");
        // Enables the acknowledgement mechanism; otherwise sends are fire-and-forget
        // and data may be lost. Possible values are 0, 1 and -1.
        props.put("request.required.acks", "1");
        kafka.producer.ProducerConfig producerConfig = new kafka.producer.ProducerConfig(props);
        kafka.javaapi.producer.Producer<String, String> producer = new kafka.javaapi.producer.Producer<>(producerConfig);
        return producer;
    }


    private static kafka.javaapi.producer.Producer<String, String> oldProducer = null;

    /**
     * Write data to Kafka using the old producer API.
     * The problem with this method is that it does not report an error when the send fails.
     * @deprecated
     * @param topic
     * @param input
     */
    public static void oldSend(String topic, String input) {
        if(oldProducer==null){
            oldProducer = getOldProducer();
        }
        KeyedMessage<String, String> message = new KeyedMessage<String, String>(
                topic, input);
        // To spread messages randomly across different partitions, a random key could be used:
        /* String key = UUID.randomUUID().toString().substring(0, 5);
           KeyedMessage<String, String> message = new KeyedMessage<>(topic, key, input); */
        oldProducer.send(message);
    }

    /** Returns a random key (a UUID prefix of the given length), e.g. for spreading records across partitions. */
    public static String getKey(int keyLength){
        return UUID.randomUUID().toString().substring(0, keyLength);
    }
}
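
A minimal usage sketch of the wrapper above (assuming a topic named demo-topic already exists on the configured broker and that ./conf/kafka.properties is in place):

package com.wg.utils;

public class KafkaUtilsDemo {
    public static void main(String[] args) {
        // Synchronous send: returns false if the broker did not acknowledge the record.
        boolean ok = KafkaUtils.send("demo-topic", KafkaUtils.getKey(5), "hello kafka");
        System.out.println("send result: " + ok);

        // Blocking poll loop using the new-consumer helper (stop with Ctrl+C).
        KafkaUtils.testConsumer("demo-topic");
    }
}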

