python kafka按时间戳消费
公司业务需求,获取存入kafka大于某个时间段的数据
# -*- coding: utf-8 -*-
# @Time: 2020/11/20 09:01
# @Author:
from confluent_kafka import Consumer, TopicPartition
KAFKASERVERS = 'xx.xx.xx.6:9092,xx.xx.xx.4:9092,xx.xx.xx.7:9092'
·
公司业务需求:获取在某个时间戳之后写入 Kafka 的数据。
# -*- coding: utf-8 -*-
# @Time : 2020/11/20 09:01
# @Author :
from confluent_kafka import Consumer, TopicPartition
# Consume every message of one topic that was written to Kafka at or after a
# given timestamp, by resolving the timestamp to a per-partition start offset.

# Comma-separated host:port list of the Kafka brokers (placeholder addresses).
KAFKASERVERS = 'xx.xx.xx.6:9092,xx.xx.xx.4:9092,xx.xx.xx.7:9092'
GROUPNAME = 'my_group_name'

c = Consumer({
    'bootstrap.servers': KAFKASERVERS,
    'group.id': GROUPNAME,
    'auto.offset.reset': 'earliest',
})

# Topic name.
topic = 'my_topic_name'

# Start timestamp. Mind the number of digits: this value has 13 digits
# (NOTE(review): presumably epoch milliseconds, which is what
# offsets_for_times() expects -- confirm against the client documentation).
timestamp = 1605756666790

try:
    # Find out which partitions the topic currently has.
    cluster_data = c.list_topics(topic=topic)
    topic_data = cluster_data.topics[topic]
    available_partitions = topic_data.partitions

    # Build one TopicPartition per partition, carrying the timestamp in the
    # offset slot. Iterate the real partition ids from the metadata instead of
    # assuming they are exactly 0..N-1.
    tps = [
        TopicPartition(topic, partition_id, timestamp)
        for partition_id in available_partitions
    ]

    # Translate the timestamp into a concrete start offset for each partition.
    offsets = c.offsets_for_times(tps)

    # assign() is used rather than subscribe() so that the explicit start
    # offsets computed above are the ones the consumer reads from.
    c.assign(offsets)

    while True:
        # Maximum time (in seconds) to block waiting for a message.
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        # Timestamp at which this record was stored in Kafka; timestamp()
        # returns a (type, value) pair and index 1 is the value.
        kafka_timestamp = msg.timestamp()[1]

        # Consume the payload. A message may carry no value at all, in which
        # case there is nothing to decode.
        payload = msg.value()
        text = payload.decode('utf-8') if payload is not None else None
        print('Received message: {}'.format(text))
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop this endless consumer loop.
    pass
finally:
    # Always release the consumer, including when setup or the loop fails.
    c.close()
更多推荐
所有评论(0)