python向kafka发送数据,并接收
发送端import csvimport timefrom kafka import KafkaProducerfrom kafka import KafkaConsumerimport json# 实例化一个KafkaProducer示例,用于向Kafka投递消息producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).en
·
发送端
import csv
import time
from kafka import KafkaProducer
from kafka import KafkaConsumer
import json
# 实例化一个KafkaProducer示例,用于向Kafka投递消息
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),bootstrap_servers='192.168.130.28:9092')
for x in range(0,1000):
time.sleep(0.1) # 每隔0.1秒发送一行数据
# 发送数据,topic为'test_data'
data = {
'name':'lizhiming',
'age':1,
'sad':345,
'sdfss':56
}
print(data)
producer.send('test_data', data)
我们再做一个接收端
from kafka import KafkaConsumer
consumer = KafkaConsumer('test_data', bootstrap_servers=['192.168.130.29:9092'])
for msg in consumer:
print(msg.value)
recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print(recv)
很简单的例子
错误SyntaxError: invalid syntax的解决方法总结
python -m pip install kafka-python
这里有个问题就是消费者,如果不配置的话,消费者每次开启后都会从最新的读取,导致历史数据没办法读取出来,我们需要配置一下KafkaConsumer。
auto_offset_reset = earliest,只配置这个,会从kafka初始的数据消费,重复消费之前的数据。
我们需要再配置group_id=‘my_group_new’。这样就可以了,
bootstrap_servers,可以配置集群
consumer = KafkaConsumer('filestorage', group_id='my_group_new',auto_offset_reset='earliest',bootstrap_servers=['xx:9092','xx:9092','xx:9092'],)
更多推荐
已为社区贡献1条内容
所有评论(0)