Creating a Kafka topic from Python via the API
Creating a topic with pykafka
Remember to fill in the IP and port of your own Kafka broker.
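import re

import pykafka


class Topic():
    def __init__(self, topic_name, num_partitions=1, replication_factor=1, replica_assignment=None, config_entries=None):
        """
        :param topic_name: name of the topic to create
        :param num_partitions: number of partitions
        :param replication_factor: number of replicas per partition
        :param replica_assignment: [(partition, replicas)]
        :param config_entries: [(config_name, config_value)]
        """
        self.topic_name = topic_name.encode()
        self.num_partitions = num_partitions
        self.replication_factor = replication_factor
        # Use None as the default to avoid sharing one mutable list between instances.
        self.replica_assignment = replica_assignment or []
        self.config_entries = config_entries or []


client = pykafka.KafkaClient(hosts="ip:port")
print(client.topics)
for broker in client.brokers.values():
    try:
        broker.create_topics(topic_reqs=(Topic("mytopic1"), ))
    except Exception as e:
        # The message ends with (topic, error_code) tuples, e.g. [(b'mytopic1', 7)].
        # Match the code itself rather than any "7" or "41" elsewhere in the text.
        match = re.search(r",\s*(\d+)\)", str(e))
        code = int(match.group(1)) if match else None
        if code == 41:
            print("This broker is not the controller; trying the next broker")
        elif code == 7:
            # The request timed out, but the topic is observed to exist afterwards.
            print("Topic created")
            break
        elif code == 36:
            print("Topic already exists")
            break
        else:
            raise
    else:
        # No error: the topic was created, so stop trying other brokers.
        break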
- The program raises an error:
pykafka.exceptions.RequestTimedOut: Response Type: "CreateTopicsResponse" Response: [(b'mytopic1', 7)]
- However, listing the topics afterwards shows that the topic was in fact created:
client = pykafka.KafkaClient(hosts="ip:port")
print(client.topics)
{b'mytopic1': None, b'mytopic': None, b'test': None}
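Because the create request can time out even though the topic ends up existing, one way to confirm the result is to poll the topic list for a short while. The following is a minimal sketch, not part of pykafka: `wait_for_topic` and its `list_topics` argument are hypothetical names, and `list_topics` is assumed to be any zero-argument callable that returns the current topic names as bytes.

```python
import time


def wait_for_topic(list_topics, topic_name, timeout=10.0, interval=0.5):
    """Poll until topic_name appears in list_topics() or the timeout elapses.

    list_topics is a hypothetical zero-argument callable returning the
    current collection of topic names (bytes, as in the listing above).
    Returns True if the topic was seen, False otherwise.
    """
    deadline = time.monotonic() + timeout
    while True:
        if topic_name in list_topics():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

With pykafka, `list_topics` might be something like `lambda: pykafka.KafkaClient(hosts="ip:port").topics`, which builds a fresh client on each poll, as the listing above does. This is an assumption based on that listing and has not been checked against a live cluster.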
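- If the broker that receives the request is not the cluster controller, the request fails with error code 41:
- NOT_CONTROLLER
- This is not the correct controller for this cluster.
- For more details, see the documentation: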
https://blog.csdn.net/weixin_42290927/article/details/104479691
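The three error codes that appear in this post (7, 36 and 41) can be pulled out of the exception message and mapped to their Kafka protocol names. The sketch below assumes the message keeps the `[(b'topic', code)]` shape shown in the error above. `KAFKA_ERROR_NAMES`, `extract_error_codes` and `describe` are illustrative names, not part of pykafka.

```python
import re

# Kafka protocol error codes mentioned in this post.
KAFKA_ERROR_NAMES = {
    7: "REQUEST_TIMED_OUT",
    36: "TOPIC_ALREADY_EXISTS",
    41: "NOT_CONTROLLER",
}

# Matches the trailing ", <code>)" of each "(b'topic', <code>)" tuple,
# so digits inside the topic name itself are ignored.
_CODE_PATTERN = re.compile(r",\s*(-?\d+)\)")


def extract_error_codes(message):
    """Return the integer error codes found in an exception message."""
    return [int(code) for code in _CODE_PATTERN.findall(message)]


def describe(message):
    """Map each code in the message to its name, or 'UNKNOWN' if unlisted."""
    return [KAFKA_ERROR_NAMES.get(code, "UNKNOWN")
            for code in extract_error_codes(message)]
```

Matching on the tuple boundary rather than on a bare "7" or "41" avoids false positives when the topic name or another part of the message happens to contain those digits.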