Kafka Producer/Consumer 关系解释及测试demo
这就像在Kafka中,如果某个Partition的消息积压,负责这个Partition的消费者就需要更快地处理消息,以防止延迟。这时,厨师们(Producers)开始忙碌起来,每准备好一道菜,就会放到服务台(Topic)的指定位置(Partition)。在这个餐厅中,有时候会有特别多的订单,厨师需要快速高效地准备菜肴。通过这个例子,我们可以看到,Kafka的Producer和Consumer之间是
Producer/Consumer
Kafka的生产者(Producer)和消费者(Consumer)的关系,可以通过一个餐厅的例子来形象地说明。
1. 餐厅的故事
想象一个忙碌的餐厅,这里有:
- 厨师(Producers):负责准备美味的菜肴。
- 服务台(Kafka Topic):菜肴准备好后,厨师会将它们放到服务台上,服务台有多个部分,每部分代表一个不同类型的菜(即Kafka中的不同Partition)。
- 服务员(Consumers):负责从服务台上取走菜肴,并将它们送到顾客手中。
在这个餐厅中,有时候会有特别多的订单,厨师需要快速高效地准备菜肴。每当一道菜准备好,他们就会把它放到对应的部分在服务台上。服务台非常长,可以容纳很多菜肴,让不同的服务员能够同时服务多个顾客,提高效率。
2. Kafka的工作方式
- Producers(厨师):在Kafka中,生产者的角色是发布消息到Topic中。就像厨师准备好菜肴后,会将它们放到服务台的对应部分。
- Kafka Topic(服务台):Topic是消息的分类,可以细分为多个Partitions(服务台的多个部分),这样可以提高并行处理的能力。每个Partition都是一个独立的队列。
- Consumers(服务员):消费者从Topic中读取消息。如果有多个消费者在同一个Consumer Group中,它们可以像一队服务员那样协作,每个人负责从服务台的一部分取菜,这样可以更快地服务所有顾客。每个消费者负责读取特定Partition中的消息,确保每条消息都能被及时处理。
3. 生动的场景
假设一天晚上,餐厅接到了一个大型宴会的预订,需要同时准备多道菜。这时,厨师们(Producers)开始忙碌起来,每准备好一道菜,就会放到服务台(Topic)的指定位置(Partition)。服务员们(Consumers)各自负责一部分服务台,快速地将菜肴送到顾客手中。
在这个过程中,如果某一部分的菜准备得特别快,服务台上的这一部分就会堆积更多的菜肴。负责这一部分的服务员需要加快速度,以确保所有的菜肴都能及时送出。这就像在Kafka中,如果某个Partition的消息积压,负责这个Partition的消费者就需要更快地处理消息,以防止延迟。
通过这个例子,我们可以看到,Kafka的Producer和Consumer之间是如何通过Topic(服务台)和Partition(服务台的不同部分)协作的,以实现高效、可靠的消息处理。
4. 测试Demo
4.1 KafkaProducer
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
import traceback
import json
import time
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
def process():
# Kafka配置,需自行修改
bootstrap_servers = ['ip:port']
producer_topic = 'XXX_topic'
# Kafka生产者
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda m: json.dumps(m).encode('utf-8'),
api_version=(1,0,0)
)
data = {
"task_id": 1,
"image_path": "XXX",
"video_path": "XXX",
"guidence_text": "XXX",
}
# Kafka请求监听
try:
res = data
# 发送结果到Kafka
producer.send(producer_topic, res)
logging.info(f"send data to {producer_topic}")
time.sleep(3)
except Exception as e:
# 记录错误日志
logging.error(f"Error processing kafka request: {e}")
if __name__ == "__main__":
process()
4.2 KafkaConsumer
from kafka import KafkaConsumer
import json
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
def consume_messages():
# Kafka配置
bootstrap_servers = ['ip:port']
consumer_topic = 'XXX'
consumer_group = 'XXX'
# Kafka消费者
consumer = KafkaConsumer(
consumer_topic,
bootstrap_servers=bootstrap_servers,
group_id=consumer_group,
# auto_offset_reset='earliest', # 从最早的消息开始读取
auto_offset_reset= "latest",
value_deserializer=lambda m: json.loads(m.decode('utf-8')) # 解码JSON格式的消息
)
logging.info(f"Started consuming messages from {consumer_topic}")
# 消费消息
try:
for message in consumer:
msg = message.value
logging.info(f"Received message: {msg}")
print(f"msg:{msg}")
except KeyboardInterrupt:
logging.info("Stopping consumer...")
except Exception as e:
logging.error(f"Error while consuming messages: {e}")
finally:
consumer.close()
if __name__ == "__main__":
consume_messages()
更多推荐
所有评论(0)