发送端

import csv
import time
from kafka import KafkaProducer
from kafka import KafkaConsumer
import json
# 实例化一个KafkaProducer示例,用于向Kafka投递消息
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),bootstrap_servers='192.168.130.28:9092')
for x in range(0,1000):
    time.sleep(0.1)  # 每隔0.1秒发送一行数据
    # 发送数据,topic为'test_data'
    data = {
        'name':'lizhiming',
        'age':1,
        'sad':345,
        'sdfss':56
    }
    print(data)
    producer.send('test_data', data)

我们再做一个接收端

from kafka import KafkaConsumer

consumer = KafkaConsumer('test_data', bootstrap_servers=['192.168.130.29:9092'])
for msg in consumer:
    print(msg.value)
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print(recv)

很简单的例子

错误SyntaxError: invalid syntax的解决方法总结

python -m pip install kafka-python

这里有个问题就是消费者,如果不配置的话,消费者每次开启后都会从最新的读取,导致历史数据没办法读取出来,我们需要配置一下KafkaConsumer。
auto_offset_reset = earliest,只配置这个,会从kafka初始的数据消费,重复消费之前的数据。
我们需要再配置group_id=‘my_group_new’。这样就可以了,
bootstrap_servers,可以配置集群

consumer = KafkaConsumer('filestorage', group_id='my_group_new',auto_offset_reset='earliest',bootstrap_servers=['xx:9092','xx:9092','xx:9092'],)

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐