Kafka-Consumer指定消费者offset及消费条数
Kafka-Consumer指定消费者offset及消费条数摘要依赖引入代码实现更多好文摘要本文会给出一个KafkaConsumer指定消费者offset及消费条数的java demo,基于Kafka 1.1.1。依赖引入<dependency><groupId>org.apache.kafka</groupId><artifactId&g...
·
摘要
本文会给出一个KafkaConsumer指定消费者offset及消费条数的java demo,基于Kafka 1.1.1。
依赖引入
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.1</version>
</dependency>
代码实现
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Properties;
/**
* @Author: chengc
* @Date: 2019-12-11 10:00
*/
public class OffsetChecker {
private static Logger logger = LoggerFactory
.getLogger(OffsetChecker.class);
private static final String KAFKA_BOOTSTRAP_SERVERS = "bootstrap.servers";
private static final String KAFKA_CONSUMER_GROUP_ID = "group.id";
private static final String KAFKA_CONSUMER_AUTO_COMMIT = "enable.auto.commit";
private static final String KAFKA_KEY_DESERIALIZER ="key.deserializer";
private static final String KAFKA_VALUE_DESERIALIZER ="value.deserializer";
private static final String KAFKA_CONSUMER_MAX_POLL_RECORDS ="max.poll.records";
/**
* 初始化kafka consumer
*/
public static KafkaConsumer initKafkaConsumer(String broker, String group, int maxPollRecords) {
Properties props = new Properties();
props.put(KAFKA_BOOTSTRAP_SERVERS, broker);
props.put(KAFKA_CONSUMER_GROUP_ID, group);
props.put(KAFKA_CONSUMER_AUTO_COMMIT, "false");
props.put(KAFKA_CONSUMER_MAX_POLL_RECORDS, maxPollRecords);
props.put(KAFKA_KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(KAFKA_VALUE_DESERIALIZER,"org.apache.kafka.common.serialization.StringDeserializer");
logger.info("properties={}", props.toString());
return new KafkaConsumer(props);
}
public static void main(String[] args) {
if(args.length < 6){
logger.error("args length < 6, exit!");
System.exit(-1);
}
String broker = args[0];
String group = args[1];
int partition = Integer.parseInt(args[2]);
String topic = args[3];
long offset = Long.parseLong(args[4]);
int maxPollRecords = Integer.parseInt(args[5]);
logger.info("broker={}, group={}, partition={}, topic={}, offset={}", broker, group, partition, topic, offset);
KafkaConsumer consumer = initKafkaConsumer(broker, group, maxPollRecords);
TopicPartition targetPartition = new TopicPartition(topic, partition);
// 指定消费分区,如果去掉该行,则不能消费到数据
consumer.assign(Collections.singletonList(targetPartition));
// poll一次数据,分配partition
consumer.poll(100);
// 指定消费起始offset
consumer.seek(targetPartition, offset);
// 真正拉取目标数据
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// 数据处理,这里我简单用日志输出
logger.info(String.valueOf(record));
}
try {
consumer.close();
logger.info("kafka consumer has been closed, previous topic name ={}", topic);
} catch (Exception e) {
logger.error("an exception happened while close kafka consumer:", e);
}
}
}
更多好文
更多推荐
已为社区贡献7条内容
所有评论(0)