Creating a topic with pykafka

Remember to fill in the IP and port of your own Kafka broker.

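import re

import pykafka


class Topic:
    """Topic description passed to create_topics() in the loop below."""

    def __init__(self, topic_name, num_partitions=1, replication_factor=1,
                 replica_assignment=None, config_entries=None):
        """
        :param topic_name: topic name as str; stored encoded as bytes
        :param num_partitions: number of partitions
        :param replication_factor: number of replicas per partition
        :param replica_assignment: [(partition, replicas)]
        :param config_entries: [(config_name, config_value)]
        """
        self.topic_name = topic_name.encode()
        self.num_partitions = num_partitions
        self.replication_factor = replication_factor
        # Use None as the default so instances never share one mutable list.
        self.replica_assignment = [] if replica_assignment is None else replica_assignment
        self.config_entries = [] if config_entries is None else config_entries


client = pykafka.KafkaClient(hosts="ip:port")
print(client.topics)
# Try each broker in turn until one of them accepts the request.
for broker in client.brokers.values():
    try:
        broker.create_topics(topic_reqs=(Topic("mytopic1"), ))
    except Exception as e:
        # The error code is matched as a substring of the exception message,
        # so a topic name containing these digits would also match.
        if re.search("41", str(e)):
            print("This broker is not the controller (leader); trying the next broker")
        elif re.search("7", str(e)):
            print("Topic created")
            break
        elif re.search("36", str(e)):
            print("Topic already exists")
            break
        else:
            raise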
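
The substring checks in the loop above can misfire, for example when the topic name itself contains a digit such as 7. A minimal sketch of a stricter check follows, assuming the exception message contains a list of (topic, code) pairs such as `[(b'mytopic1', 7)]`. The helper names `parse_error_codes` and `classify` and the returned action labels are hypothetical and not part of pykafka; the numeric values are the standard Kafka protocol error codes.

```python
import re

# Kafka protocol error codes mentioned in this post.
REQUEST_TIMED_OUT = 7
TOPIC_ALREADY_EXISTS = 36
NOT_CONTROLLER = 41

# Matches pairs such as (b'mytopic1', 7) inside the exception message.
_PAIR = re.compile(r"\(b?'([^']*)',\s*(-?\d+)\)")


def parse_error_codes(message):
    """Return {topic_name: error_code} parsed from an exception message."""
    return {name: int(code) for name, code in _PAIR.findall(message)}


def classify(message, topic_name):
    """Map the error code reported for topic_name to an action label.

    The labels are hypothetical and only serve this illustration.
    """
    code = parse_error_codes(message).get(topic_name)
    if code == NOT_CONTROLLER:
        return "try_next_broker"
    if code in (REQUEST_TIMED_OUT, TOPIC_ALREADY_EXISTS):
        return "stop"
    # Unknown code, or the message did not have the assumed format.
    return "raise"
```

Inside the `except` block, `classify(str(e), "mytopic1")` could then replace the three `re.search` calls. If the message does not follow the assumed format, the helper returns `"raise"`, so the original exception should be re-raised.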

  1. The program raises an error:
pykafka.exceptions.RequestTimedOut: Response Type: "CreateTopicsResponse"	Response: [(b'mytopic1', 7)]
  2. However, listing the topics shows that the topic was created successfully:
client = pykafka.KafkaClient(hosts="ip:port")
print(client.topics)
{b'mytopic1': None, b'mytopic': None, b'test': None}
  3. If the broker used is not the leader (the cluster controller), the request fails with error code 41:
    • NOT_CONTROLLER
    • This is not the correct controller for this cluster.
    • For more details, see https://blog.csdn.net/weixin_42290927/article/details/104479691
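
Because the request can time out (code 7) even though the topic ends up being created, it may help to confirm the result by polling the topic list instead of trusting the exception alone. The sketch below is only an illustration: `wait_for_topic` and its `list_topics` argument are hypothetical names, and `list_topics` is assumed to be a caller-supplied function that returns a fresh collection of topic names on every call.

```python
import time


def wait_for_topic(list_topics, topic_name, timeout=10.0, interval=0.5):
    """Poll list_topics() until topic_name appears or timeout seconds pass.

    list_topics is a caller-supplied callable (hypothetical) that returns
    the current collection of topic names, fetched fresh on every call.
    Returns True if the topic was seen, False if the deadline passed first.
    """
    deadline = time.monotonic() + timeout
    while True:
        if topic_name in list_topics():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

With pykafka, one possible `list_topics` is `lambda: pykafka.KafkaClient(hosts="ip:port").topics`, which builds a new client on each call as in the check above. The keys shown in the output are bytes, so the name would be passed as `b"mytopic1"`.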