python读写kafka
【代码】python读写kafka。
·
消费者 读:
from kafka import KafkaConsumer
import json
# connect to Kafka server and pass the topic we want to consume
# Connect to the Kafka brokers and subscribe to the topic we want to consume.
# Each message value is decoded from UTF-8 and parsed as JSON by
# value_deserializer before it is handed to the loop below.
consumer = KafkaConsumer(
    'jobcodeflink_kafka_hive01',
    group_id='jobcodeflink_kafka_hive01_consumers',
    bootstrap_servers=['*:9092',
                       '10*0:9092',
                       '*9:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)
try:
    # Iterating the consumer yields records one at a time as they arrive.
    for msg in consumer:
        try:
            print("value:", msg.value)
            # Alternative that also prints the record metadata:
            # print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))
        except Exception as e:
            # Best effort: report the failure for this record and keep consuming.
            print("msg.value error:", e)
except KeyboardInterrupt as e:
    # Ctrl+C is the expected way to stop this script.
    print("KeyboardInterrupt:", e)
finally:
    # Always close the consumer so its connections are released, whether the
    # loop ended through Ctrl+C or through an unexpected error.
    consumer.close()
生产者 写:
import time,json
from kafka import KafkaProducer
# Producer whose message values are serialized to JSON and encoded as UTF-8.
producer = KafkaProducer(
    bootstrap_servers='10*1:9092',
    value_serializer=lambda m: json.dumps(m).encode('utf-8'),
)

i = 1
try:
    # i only ever grows, so this loop runs until the script is interrupted.
    while i > 0:
        data = {"id": i, "name": "zhangsan", "create_time": "2021-10-13 18:15:47.000"}
        print("i:", data)
        i += 1
        producer.send(topic="jobcodeflink_kafka_hive01", value=data)
        # Throttle to roughly one message per second.
        time.sleep(1)
except KeyboardInterrupt as e:
    # Ctrl+C is the expected way to stop this script.
    print("KeyboardInterrupt:", e)
finally:
    # The loop never ends on its own, so cleanup must live in `finally`:
    # push out any records still buffered, then release the producer.
    producer.flush()
    producer.close()
更多推荐
已为社区贡献7条内容
所有评论(0)