01. Start ZooKeeper and Kafka, create a Kafka topic, and run a Kafka producer and consumer

See my earlier article for details: http://t.csdn.cn/JRFRs

02. Writing data to Kafka with Python

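from kafka import KafkaProducer
from kafka.errors import KafkaError

# Connect to the broker (adjust the address to your environment)
producer = KafkaProducer(bootstrap_servers='192.168.1.10:9092')
try:
    while True:
        msg = input("input the msg:")
        # send() is asynchronous and returns a future; without a serializer the value must be bytes
        future = producer.send("sparkapp", msg.encode())
        # def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
        try:
            # Block until the broker acknowledges the record (or 10 seconds pass)
            record = future.get(timeout=10)
            # print(record)
        except KafkaError as e:
            print(e)
except (KeyboardInterrupt, EOFError):
    pass  # Ctrl+C / Ctrl+D ends the input loop
finally:
    producer.flush()  # deliver any messages still buffered
    producer.close()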
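Instead of calling .encode() on every message, kafka-python's KafkaProducer also accepts a value_serializer (and key_serializer) callable that converts each value to bytes. The sketch below assumes UTF-8 text or JSON payloads; utf8_serializer, json_serializer and json_deserializer are hypothetical helper names, not part of the library. Because kafka-python passes None to the serializer as well, the text helper lets None through so a "delete" (tombstone) message still works.

```python
import json
from typing import Any, Optional


def utf8_serializer(value: Optional[str]) -> Optional[bytes]:
    """Hypothetical helper: encode str as UTF-8; keep None (tombstone) as None."""
    return None if value is None else value.encode("utf-8")


def json_serializer(value: Any) -> bytes:
    """Hypothetical helper: JSON-compatible object -> compact UTF-8 bytes."""
    return json.dumps(value, ensure_ascii=False, separators=(",", ":")).encode("utf-8")


def json_deserializer(raw: bytes) -> Any:
    """Hypothetical inverse of json_serializer, e.g. for a consumer's value_deserializer."""
    return json.loads(raw.decode("utf-8"))
```

With these helpers the producer could be created as KafkaProducer(bootstrap_servers='192.168.1.10:9092', value_serializer=utf8_serializer), after which producer.send("sparkapp", msg) takes the plain string.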

03. Notes

Parameters of the KafkaProducer send() method:

Arguments:
            topic (str): topic where the message will be published
            value (optional): message value. Must be type bytes, or be
                serializable to bytes via configured value_serializer. If value
                is None, key is required and message acts as a 'delete'.
                See kafka compaction documentation for more details:
                https://kafka.apache.org/documentation.html#compaction
                (compaction requires kafka >= 0.8.1)
            partition (int, optional): optionally specify a partition. If not
                set, the partition will be selected using the configured
                'partitioner'.
            key (optional): a key to associate with the message. Can be used to
                determine which partition to send the message to. If partition
                is None (and producer's partitioner config is left as default),
                then messages with the same key will be delivered to the same
                partition (but if key is None, partition is chosen randomly).
                Must be type bytes, or be serializable to bytes via configured
                key_serializer.
            headers (optional): a list of header key value pairs. List items
                are tuples of str key and bytes value.
            timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC)
                to use as the message timestamp. Defaults to current time.
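To make the rules above concrete, here is a minimal sketch that checks send() arguments before they reach the producer. It assumes no key_serializer or value_serializer is configured (so key and value must already be bytes); build_send_kwargs is a hypothetical helper, not a kafka-python function, and it only mirrors the docstring's rules rather than the library's own validation.

```python
import time
from typing import List, Optional, Tuple

Header = Tuple[str, bytes]


def build_send_kwargs(
    value: Optional[bytes] = None,
    key: Optional[bytes] = None,
    headers: Optional[List[Header]] = None,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
) -> dict:
    """Hypothetical helper: validate send() arguments per the docstring (no serializers)."""
    if value is None and key is None:
        # A None value acts as a 'delete' (tombstone) and requires a key
        raise ValueError("key is required when value is None")
    for name, data in (("value", value), ("key", key)):
        if data is not None and not isinstance(data, bytes):
            raise TypeError(f"{name} must be bytes when no serializer is configured")
    for item in headers or []:
        # Each header is a (str key, bytes value) tuple
        if not (isinstance(item, tuple) and len(item) == 2
                and isinstance(item[0], str) and isinstance(item[1], bytes)):
            raise TypeError("headers must be a list of (str, bytes) tuples")
    if partition is not None and (not isinstance(partition, int) or partition < 0):
        raise ValueError("partition must be a non-negative int")
    if timestamp_ms is None:
        # Epoch milliseconds since Jan 1 1970 UTC, the unit send() expects
        timestamp_ms = int(time.time() * 1000)
    return {"value": value, "key": key, "headers": headers,
            "partition": partition, "timestamp_ms": timestamp_ms}
```

A call would then look like producer.send("sparkapp", **build_send_kwargs(value=b"hello", key=b"user-1", headers=[("source", b"cli")])). Leaving partition as None keeps the partitioner's behavior described above: the same key goes to the same partition.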

04. In PyCharm, simulate input (producing) in one window while watching the word counts (consuming) in another

The Kafka producer is the code above.

The Kafka consumer code is in my earlier article: http://t.csdn.cn/JRFRs
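The consumer itself is not reproduced here. As a hedged illustration of what the word-count side computes, the pure-Python sketch below keeps running word counts over raw message values. It assumes the values are UTF-8 bytes (which is what the producer above sends) and that words are separated by whitespace; count_words is a hypothetical name and not the original consumer's code.

```python
from collections import Counter
from typing import Iterable, Optional


def count_words(raw_values: Iterable[bytes], totals: Optional[Counter] = None) -> Counter:
    """Hypothetical helper: update running word counts from raw Kafka message values."""
    totals = Counter() if totals is None else totals
    for raw in raw_values:
        # The producer sent msg.encode(), i.e. UTF-8; split on any whitespace
        totals.update(raw.decode("utf-8").split())
    return totals
```

In a kafka-python consumer loop such as for record in KafkaConsumer("sparkapp", bootstrap_servers='192.168.1.10:9092'), each record.value would be fed in with count_words([record.value], totals) to keep one Counter across messages.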

Results:

producer.py:

[Screenshot: producer.py console output]

consumer.py:

[Screenshot: consumer.py console output]
