Producing to Kafka with Python and consuming with Spark Streaming
01. Start ZooKeeper, start Kafka, create a Kafka topic, and run a Kafka producer and consumer
See the previous article for details: http://t.csdn.cn/JRFRs
02. Write data into Kafka with Python
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers='192.168.1.10:9092')

while True:
    msg = input("input the msg:")
    # send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)
    future = producer.send("sparkapp", msg.encode())
    try:
        record = future.get(timeout=10)  # block until the broker acknowledges the write
        # print(record)
    except KafkaError as e:
        print(e)
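The producer above sends raw bytes. A common variant is to let the producer serialize structured records for you via `value_serializer`. The sketch below assumes the same broker address and topic as above; `produce_once` is a hypothetical helper name, and it is only a sketch since it needs a reachable broker to actually run.

```python
import json

def serialize(record):
    """Turn a dict into UTF-8 JSON bytes, since Kafka message values must be bytes."""
    return json.dumps(record, ensure_ascii=False).encode("utf-8")

def produce_once(bootstrap="192.168.1.10:9092", topic="sparkapp"):
    """Send one JSON record; requires a reachable broker, so it is not run here."""
    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers=bootstrap,
                             value_serializer=serialize)
    producer.send(topic, {"msg": "hello"})
    producer.flush()   # block until all buffered records are sent
    producer.close()
```

With a serializer configured, `send` accepts plain Python objects and the bytes conversion happens in one place instead of at every call site.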
03. Notes:
Parameters of the `send` method of KafkaProducer:
Arguments:
    topic (str): topic where the message will be published
    value (optional): message value. Must be type bytes, or be
        serializable to bytes via configured value_serializer. If value
        is None, key is required and message acts as a 'delete'.
        See kafka compaction documentation for more details:
        https://kafka.apache.org/documentation.html#compaction
        (compaction requires kafka >= 0.8.1)
    partition (int, optional): optionally specify a partition. If not
        set, the partition will be selected using the configured
        'partitioner'.
    key (optional): a key to associate with the message. Can be used to
        determine which partition to send the message to. If partition
        is None (and producer's partitioner config is left as default),
        then messages with the same key will be delivered to the same
        partition (but if key is None, partition is chosen randomly).
        Must be type bytes, or be serializable to bytes via configured
        key_serializer.
    headers (optional): a list of header key value pairs. List items
        are tuples of str key and bytes value.
    timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC)
        to use as the message timestamp. Defaults to current time.
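The key-to-partition behavior described above ("messages with the same key will be delivered to the same partition") can be illustrated with a small stand-in function. Note this is not Kafka's real default partitioner (which uses a murmur2 hash), so the actual partition numbers a broker picks will differ; only the same-key-same-partition property is the point.

```python
import random
import zlib

def pick_partition(key, num_partitions):
    """Illustrative stand-in for Kafka's default partitioner: deterministic
    hash of the key mod partition count, random when key is None.
    (Real kafka-python uses murmur2, not crc32, so numbers differ.)"""
    if key is None:
        return random.randrange(num_partitions)
    return zlib.crc32(key) % num_partitions
```

This is why keyed messages preserve per-key ordering: all records with the same key land in one partition, and a partition is consumed in order.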
04. In PyCharm, type input data on one side (produce) while watching the word-count output on the other (consume)
The producer is the code shown above.
The consumer code is in the previous article: http://t.csdn.cn/JRFRs
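For reference, a Spark Streaming word-count consumer for this topic typically looks like the sketch below. This is an assumption about the shape of the job (it is not the previous article's code), and it targets Spark 2.x with the spark-streaming-kafka-0-8 package, since `pyspark.streaming.kafka` was removed in Spark 3. The pure word-count step is factored out; the streaming wiring needs a running cluster and broker, so it is not executed here.

```python
from collections import Counter

def word_count(lines):
    """The per-batch word-count step the streaming job applies."""
    return Counter(w for line in lines for w in line.split())

def run_stream(broker="192.168.1.10:9092", topic="sparkapp"):
    """Requires pyspark 2.x + spark-streaming-kafka-0-8; sketch only, not run here."""
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="sparkapp-wordcount")
    ssc = StreamingContext(sc, 5)  # 5-second micro-batches
    stream = KafkaUtils.createDirectStream(
        ssc, [topic], {"metadata.broker.list": broker})
    counts = (stream.map(lambda kv: kv[1])          # drop the key, keep the value
                    .flatMap(lambda line: line.split())
                    .map(lambda w: (w, 1))
                    .reduceByKey(lambda a, b: a + b))
    counts.pprint()                                 # print each batch's counts
    ssc.start()
    ssc.awaitTermination()
```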
Run results (shown as screenshots in the original post):
producer.py:
consumer.py: