python操作kafka
生产者:import time,jsonfrom kafka import KafkaProducerimport randomproducer = KafkaProducer(bootstrap_servers=':9092,10.9:9092,10.180:9092',value_serializer=lambda m: json.dumps(m).encode('utf-8'))dlp_lo
·
生产者:
self.producer = KafkaProducer(bootstrap_servers=self.sink_kafka_config.get("bootstrap_servers")
,value_serializer=lambda m: json.dumps(m, ensure_ascii=False).encode('utf-8'))#'ascii'
import time,json
from kafka import KafkaProducer
import random
producer = KafkaProducer(bootstrap_servers=':9092,10.9:9092,10.180:9092',
value_serializer=lambda m: json.dumps(m).encode('utf-8'))
dlp_log =[
{
"AccidentId ": "11111-3ce5"
},
{
"AccidentId ": "22222-3ce5"
}
]
i=1
while i >0:
data = dlp_log[random.randint(0,1)]
print("i:",data)
producer.send(topic="filterAnalysisResultIncident",value=data)
time.sleep(2)
producer.flush()
producer.close()
消费者:
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer('filcident', group_id='f_consumers1',
bootstrap_servers=['10.1:9092',
'10.1:9092',
'10.19:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
enable_auto_commit=False,
auto_offset_reset="latest")
try:
for msg in consumer:
try:
print("value:",type(msg.value),msg.value)
#print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))
except Exception as e:
print("msg.value error:",e)
except KeyboardInterrupt as e:
print("KeyboardInterrupt:",e)
更多推荐
已为社区贡献7条内容
所有评论(0)