Connecting to Kafka from Python
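Python environment
Python version: Python 3.6
Installing the third-party Kafka library: pip install kafka (the same client is published on PyPI under the name kafka-python, so pip install kafka-python is the more common command)
Operating system: Windows 10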
1. Producer: connecting and sending a message
# -*- coding: utf-8 -*-
import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='10.10.14.227:9092')
msg_dict = {
    "interval": 10,
    "producer": {
        "name": "producer 1",
        "host": "10.10.10.1",
        "user": "root",
        "password": "root"
    },
    "cpu": "33.5%",
    "mem": "77%",
    "msg": "Hello kafka"
}
# Kafka message values are bytes, so serialize the dict to JSON and encode it
msg = json.dumps(msg_dict)
producer.send('mykafka', msg.encode('utf-8'), partition=0)
# send() is asynchronous; close() waits for buffered messages to go out
producer.close()
print("success")
Output:
success
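Note that "success" is printed whether or not the broker accepted the message, because send() only queues it. The sketch below is one way to confirm delivery, assuming the same broker address and topic as above. The helper names encode_value and send_and_confirm are made up for illustration; value_serializer and the future returned by send() are part of kafka-python.

```python
import json


def encode_value(payload):
    """Serialize a dict to UTF-8 encoded JSON bytes."""
    return json.dumps(payload).encode("utf-8")


def send_and_confirm(payload, topic="mykafka", servers="10.10.14.227:9092"):
    """Send one message and block until the broker acknowledges it.

    Hypothetical helper; the topic and server defaults are taken from the
    example above and must be adapted to your own cluster.
    """
    # Imported here so encode_value can be used without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=servers,
        value_serializer=encode_value,  # applied to every value passed to send()
    )
    try:
        future = producer.send(topic, payload)
        # get() raises an exception if the send failed or timed out
        metadata = future.get(timeout=10)
        return metadata.topic, metadata.partition, metadata.offset
    finally:
        producer.close()
```

Calling send_and_confirm({"msg": "Hello kafka"}) against a reachable broker should return the topic, partition, and offset the message was written to.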
2. Consumer code
from kafka import KafkaConsumer

consumer = KafkaConsumer('mykafka', group_id='20190603', bootstrap_servers=['10.10.14.227:9092'])
# Iterating over the consumer blocks and yields messages as they arrive
for msg in consumer:
    result = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print(result)
Output:
mykafka:0:10: key=None value=b'{"interval": 10, "producer": {"name": "producer 1", "host": "10.10.10.1", "user": "root", "password": "root"}, "cpu": "33.5%", "mem": "77%", "msg": "Hello kafka"}'
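The value printed above is still raw bytes (note the b'...' prefix). A minimal sketch of decoding it back into a dict, assuming every message on the topic is UTF-8 JSON as in the producer example; decode_value and consume_decoded are illustrative names, while value_deserializer is a kafka-python option.

```python
import json


def decode_value(raw):
    """Turn UTF-8 encoded JSON bytes back into a Python object."""
    return json.loads(raw.decode("utf-8"))


def consume_decoded(topic="mykafka", group_id="20190603",
                    servers=("10.10.14.227:9092",)):
    """Print selected fields of each message (hypothetical helper).

    Defaults are copied from the example above; adjust them for your cluster.
    """
    # Imported here so decode_value can be used without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        group_id=group_id,
        bootstrap_servers=list(servers),
        value_deserializer=decode_value,  # msg.value becomes a dict
    )
    for msg in consumer:
        print(msg.offset, msg.value["cpu"], msg.value["msg"])
```

A message that is not valid JSON would make decode_value raise, so a mixed-content topic would need error handling around the deserializer.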
3. Consumer that polls for messages at a fixed interval
from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(group_id='20190603', bootstrap_servers=['10.10.14.227:9092'])
consumer.subscribe(topics=('mykafka',))
while True:
    msgs = consumer.poll(timeout_ms=5)  # fetch messages from Kafka
    # poll() returns a dict mapping each TopicPartition to a list of records
    for records in msgs.values():
        for msg in records:
            result = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
            print(result)
    time.sleep(3)
Output:
mykafka:0:29: key=None value=b'{"interval": 10, "producer": {"name": "producer 1", "host": "10.10.10.1", "user": "root", "password": "root"}, "cpu": "33.5%", "mem": "77%", "msg": "Hello kafka"}'
mykafka:0:30: key=None value=b'{"interval": 10, "producer": {"name": "producer 1", "host": "10.10.10.1", "user": "root", "password": "root"}, "cpu": "33.5%", "mem": "77%", "msg": "Hello kafka"}'
mykafka:0:31: key=None value=b'{"interval": 10, "producer": {"name": "producer 1", "host": "10.10.10.1", "user": "root", "password": "root"}, "cpu": "33.5%", "mem": "77%", "msg": "Hello kafka"}'
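The value returned by poll() is a dict keyed by partition, which is why the loop above has two levels. The sketch below illustrates that shape without needing a broker. The TopicPartition and Record namedtuples here are simplified stand-ins defined only for this example, not the library's own classes, and flatten_batches is a hypothetical helper.

```python
from collections import namedtuple

# Simplified stand-ins for the objects kafka-python returns from poll()
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
Record = namedtuple("Record", ["topic", "partition", "offset", "key", "value"])


def flatten_batches(batches):
    """Yield every record from a {partition: [records]} dict, partition by partition."""
    for records in batches.values():
        for record in records:
            yield record


# A fake poll() result with two records on partition 0 of 'mykafka'
tp = TopicPartition("mykafka", 0)
fake_batches = {
    tp: [
        Record("mykafka", 0, 29, None, b'{"msg": "Hello kafka"}'),
        Record("mykafka", 0, 30, None, b'{"msg": "Hello kafka"}'),
    ]
}

for rec in flatten_batches(fake_batches):
    print("%s:%d:%d: key=%s value=%s" % (rec.topic, rec.partition, rec.offset, rec.key, rec.value))
```

An empty dict, which is what poll() returns when nothing arrived within the timeout, simply yields no records, so no explicit length check is needed.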