Kafka Consumer: Specifying the Starting Offset and the Number of Records to Consume

Summary

This article walks through a Java demo that makes a KafkaConsumer start consuming from a specified offset and cap the number of records it pulls, based on Kafka 1.1.1.

Dependency

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>1.1.1</version>
</dependency>
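
If your project builds with Gradle rather than Maven (an assumption about your setup; the article itself only shows the Maven snippet), the equivalent dependency declaration would be:

implementation 'org.apache.kafka:kafka-clients:1.1.1'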

Implementation

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Properties;

/**
 * @Author: chengc
 * @Date: 2019-12-11 10:00
 */
public class OffsetChecker {
    private static final Logger logger = LoggerFactory
            .getLogger(OffsetChecker.class);

    private static final String KAFKA_BOOTSTRAP_SERVERS = "bootstrap.servers";
    private static final String KAFKA_CONSUMER_GROUP_ID = "group.id";
    private static final String KAFKA_CONSUMER_AUTO_COMMIT = "enable.auto.commit";
    private static final String KAFKA_KEY_DESERIALIZER = "key.deserializer";
    private static final String KAFKA_VALUE_DESERIALIZER = "value.deserializer";
    private static final String KAFKA_CONSUMER_MAX_POLL_RECORDS = "max.poll.records";

    /**
     * Initialize the Kafka consumer.
     */
    public static KafkaConsumer<String, String> initKafkaConsumer(String broker, String group, int maxPollRecords) {
        Properties props = new Properties();

        props.put(KAFKA_BOOTSTRAP_SERVERS, broker);
        props.put(KAFKA_CONSUMER_GROUP_ID, group);
        props.put(KAFKA_CONSUMER_AUTO_COMMIT, "false");
        props.put(KAFKA_CONSUMER_MAX_POLL_RECORDS, maxPollRecords);
        props.put(KAFKA_KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(KAFKA_VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
        logger.info("properties={}", props);

        return new KafkaConsumer<>(props);
    }

    public static void main(String[] args) {
        if (args.length < 6) {
            logger.error("expected 6 args: <broker> <group> <partition> <topic> <offset> <maxPollRecords>, exit!");
            System.exit(-1);
        }
        String broker = args[0];
        String group = args[1];
        int partition = Integer.parseInt(args[2]);
        String topic = args[3];
        long offset = Long.parseLong(args[4]);
        int maxPollRecords = Integer.parseInt(args[5]);

        logger.info("broker={}, group={}, partition={}, topic={}, offset={}", broker, group, partition, topic, offset);

        KafkaConsumer<String, String> consumer = initKafkaConsumer(broker, group, maxPollRecords);
        TopicPartition targetPartition = new TopicPartition(topic, partition);
        // Manually assign the target partition; without this line the consumer owns no partition and cannot consume any data
        consumer.assign(Collections.singletonList(targetPartition));
        // With manual assignment there is no group rebalance to wait for, so we can seek immediately;
        // a preliminary poll() is only needed when partitions are obtained via subscribe()
        consumer.seek(targetPartition, offset);
        // Fetch the target records: a single poll returns at most max.poll.records records,
        // and may return fewer (or none) if the 100 ms timeout expires first
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            // Process each record; here we simply log it
            logger.info(String.valueOf(record));
        }

        try {
            consumer.close();
            logger.info("kafka consumer has been closed, topic={}", topic);
        } catch (Exception e) {
            logger.error("exception occurred while closing the kafka consumer:", e);
        }
    }
}
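
Because enable.auto.commit is false and the demo calls poll() only once after the seek, it consumes at most maxPollRecords records and commits nothing, so the consumer group's stored offsets are left untouched. Once compiled with kafka-clients on the classpath, it can be run like this (the classpath placeholder, broker address, group, topic, and numbers below are hypothetical examples, not values from the original article):

java -cp <your-classpath> OffsetChecker localhost:9092 test-group 0 test-topic 1000 10

This reads up to 10 records from partition 0 of test-topic, starting at offset 1000.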
