python连接kafka
python环境Python版本:python3.6kafka第三方库安装方式:pip install kafka操作系统:window 101.生产者连接发送消息代码# -*- coding: utf-8 -*-import jsonfrom kafka import KafkaProducerproducer = KafkaProducer...
python环境
Python版本:python3.6
kafka第三方库安装方式:pip install kafka
操作系统:window 10
1.生产者连接发送消息代码
# -*- coding: utf-8 -*-
import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='10.10.14.227:9092')
msg_dict = {
"interval": 10,
"producer": {
"name": "producer 1",
"host": "10.10.10.1",
"user": "root",
"password": "root"
},
"cpu": "33.5%",
"mem": "77%",
"msg": "Hello kafka"
}
msg = json.dumps(msg_dict)
producer.send('mykafka', bytes(msg, encoding='utf-8'), partition=0)
producer.close()
print("success")
运行结果:
success
2.消费者代码
from kafka import KafkaConsumer
consumer = KafkaConsumer('mykafka', group_id='20190603', bootstrap_servers=['10.10.14.227:9092'])
for msg in consumer:
result = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print(result)
运行结果:
mykafka:0:10: key=None value=b'{"interval": 10, "producer": {"name": "producer 1", "host": "10.10.10.1", "user": "root", "password": "root"}, "cpu": "33.5%", "mem": "77%", "msg": "Hello kafka"}'
3.消费者定时拉取数据消费
from kafka import KafkaConsumer
import time
consumer = KafkaConsumer(group_id='20190603', bootstrap_servers=['10.10.14.227:9092'])
consumer.subscribe(topics=('mykafka',))
index = 0
while True:
msgs = consumer.poll(timeout_ms=5) # 从kafka获取消息
if len(msgs) > 0:
for msg in msgs:
msgss = msgs[msg]
for msg in msgss:
result = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print(result)
time.sleep(3)
运行结果:
mykafka:0:29: key=None value=b'{"interval": 10, "producer": {"name": "producer 1", "host": "10.10.10.1", "user": "root", "password": "root"}, "cpu": "33.5%", "mem": "77%", "msg": "Hello kafka"}'
mykafka:0:30: key=None value=b'{"interval": 10, "producer": {"name": "producer 1", "host": "10.10.10.1", "user": "root", "password": "root"}, "cpu": "33.5%", "mem": "77%", "msg": "Hello kafka"}'
mykafka:0:31: key=None value=b'{"interval": 10, "producer": {"name": "producer 1", "host": "10.10.10.1", "user": "root", "password": "root"}, "cpu": "33.5%", "mem": "77%", "msg": "Hello kafka"}'
更多推荐


所有评论(0)