mykafka_producer.py 

# -*- coding: utf-8 -*-

from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic

topic = 'topic-demo'
group_id = 'demo.id'
# Two-broker Kafka cluster running on a single machine
kafka_host = 'localhost:9092,localhost:9093'

p = Producer({'bootstrap.servers': kafka_host})

ac = AdminClient({'bootstrap.servers': kafka_host})


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


# some_data_source = ['a', 'b', 'c', 'd', 'e']
some_data_source = [str(i) for i in range(100)]
if __name__ == '__main__':
    # Build a NewTopic instance: 3 partitions, replication factor 1
    newtopic = NewTopic(topic=topic, num_partitions=3, replication_factor=1)
    # create_topics() is asynchronous and returns {topic: future};
    # wait on the future so the topic exists before we start producing
    fs = ac.create_topics([newtopic])
    for t, f in fs.items():
        try:
            f.result()
        except Exception as e:
            print('Topic {} not created: {}'.format(t, e))
    # Fetch metadata for all topics
    topics = p.list_topics().topics
    print('topics: {}'.format(topics))
    # partitions = topics[topic].partitions
    # print('partitions: {}'.format(partitions))
    for data in some_data_source:
        # Trigger any available delivery report callbacks from previous produce() calls
        p.poll(0)

        # Asynchronously produce a message, the delivery report callback
        # will be triggered from poll() above, or flush() below, when the message has
        # been successfully delivered or failed permanently.
        # Produce to an explicit partition
        # p.produce(topic, data.encode('utf-8'), callback=delivery_report, partition=0)
        # Produce without specifying a partition (the partitioner chooses)
        p.produce(topic, data.encode('utf-8'), callback=delivery_report)

    # Wait for any outstanding messages to be delivered and delivery report
    # callbacks to be triggered.
    p.flush()
    # p.poll(1)
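
A quick aside not in the original script: besides pinning a partition explicitly, you can pass a key and let the default hash partitioner route the record; messages with the same key always land in the same partition, which matters once the topic has 3 partitions as above. A minimal sketch (the key user-42 is made up for illustration):

from confluent_kafka import Producer

p = Producer({'bootstrap.servers': 'localhost:9092,localhost:9093'})
for i in range(5):
    # Same key -> same partition under the default partitioner
    p.produce('topic-demo', value=str(i).encode('utf-8'), key=b'user-42')
p.flush()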

mykafka_consumer_a.py

# -*- coding: utf-8 -*-
from confluent_kafka import Consumer
from confluent_kafka import TopicPartition

from mykafka_producer import topic, kafka_host, group_id

c = Consumer({
    'bootstrap.servers': kafka_host,
    'group.id': group_id,
    'auto.offset.reset': 'earliest'
})

c.subscribe([topic])

# Manual partition assignment instead of a group subscription
# (see the standalone sketch after this file):
# topics = c.list_topics().topics
# tp = TopicPartition(topic, 1)
# c.assign([tp])

def pull_message():
    print('Start polling for messages')
    try:
        while True:
            msg = c.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue

            print('Received message: {} from {}-{}'.format(
                msg.value().decode('utf-8'), msg.topic(), msg.partition()))
    finally:
        # close() was unreachable after the bare while True; run it on exit
        # so the consumer leaves the group cleanly
        c.close()


if __name__ == '__main__':
    pull_message()
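
To expand on the commented-out assign() lines above: a minimal, self-contained sketch of manual assignment, pinning one consumer to partition 1 of topic-demo and bypassing the group rebalance entirely (the group id demo.id.manual is made up for this example):

from confluent_kafka import Consumer, TopicPartition

c2 = Consumer({
    'bootstrap.servers': 'localhost:9092,localhost:9093',
    'group.id': 'demo.id.manual',  # hypothetical group id for this sketch
})
# assign() takes the partition list directly; no rebalance callbacks fire
c2.assign([TopicPartition('topic-demo', 1)])
msg = c2.poll(5.0)
if msg is not None and not msg.error():
    print(msg.value().decode('utf-8'))
c2.close()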

mykafka_consumer_b.py

The code is identical to mykafka_consumer_a.py.

Run mykafka_consumer_a.py and mykafka_consumer_b.py at the same time and you can watch two consumers in the same consumer group reading the same topic concurrently, with each consumer consuming from its own fixed subset of the partitions.
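
One way to see that split for yourself (not in the original post): Consumer.subscribe() in confluent_kafka accepts an on_assign callback that fires on every rebalance, so each process can log the partitions it was handed. A minimal sketch:

from confluent_kafka import Consumer

def print_assignment(consumer, partitions):
    # Called on each rebalance with this member's assigned partitions
    print('Assigned: {}'.format([(p.topic, p.partition) for p in partitions]))

c = Consumer({
    'bootstrap.servers': 'localhost:9092,localhost:9093',
    'group.id': 'demo.id',
    'auto.offset.reset': 'earliest',
})
c.subscribe(['topic-demo'], on_assign=print_assignment)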

Key settings in each broker's server.properties

broker1 listens on the default port 9092

broker.id=1
listeners=PLAINTEXT://localhost:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181

broker2 listens on port 9093

broker.id=2
listeners=PLAINTEXT://localhost:9093
log.dirs=/tmp/kafka2-logs
zookeeper.connect=localhost:2181
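
A quick way to confirm both brokers are reachable from Python (a hedged sketch, not from the original post): request cluster metadata through the AdminClient and print the brokers it reports.

from confluent_kafka.admin import AdminClient

ac = AdminClient({'bootstrap.servers': 'localhost:9092,localhost:9093'})
md = ac.list_topics(timeout=5)
for b in md.brokers.values():
    print('broker id={} at {}:{}'.format(b.id, b.host, b.port))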

Supervisor configuration file for managing Kafka and ZooKeeper

[group:kafka-group]
programs=zookeeper,kafka_server_1,kafka_server_2
[program:zookeeper]
command=bash bin/zookeeper-server-start.sh config/zookeeper.properties
process_name=zookeeper
directory=/Users/wangjinyu/server/kafka_node1
user=wangjinyu
stdout_logfile=/var/www/logs/kafka/zookeeper_out.log
stderr_logfile=/var/www/logs/kafka/zookeeper_err.log
stdout_logfile_maxbytes=0
stderr_logfile_maxbytes=0
[program:kafka_server_1]
command=bash bin/kafka-server-start.sh config/server.properties
process_name=kafka_server_1
directory=/Users/wangjinyu/server/kafka_node1
user=wangjinyu
stdout_logfile=/var/www/logs/kafka/out.log
stderr_logfile=/var/www/logs/kafka/err.log
stdout_logfile_maxbytes=0
stderr_logfile_maxbytes=0
[program:kafka_server_2]
command=bash bin/kafka-server-start.sh config/server.properties
process_name=kafka_server_2
directory=/Users/wangjinyu/server/kafka_node2
user=wangjinyu
stdout_logfile=/var/www/logs/kafka/out2.log
stderr_logfile=/var/www/logs/kafka/err2.log
stdout_logfile_maxbytes=0
stderr_logfile_maxbytes=0
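
One caveat with this file: supervisor does not order startup within a group by default, and both brokers need ZooKeeper running first. Adding a priority value to each [program:...] section (lower numbers start earlier, e.g. priority=1 for zookeeper and priority=10 for the brokers) is the usual way to enforce that.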
