python发送kafka消息
1、安装相关依赖: pip install kafka==1.3.5; pip install kafka-python==2.0.2。 2、创建kafka, 参考《kafka3.0版本部署及相关明令汇总》(会发paper的学渣的博客-CSDN博客)。 3、代码部分: from kafka import KafkaProducer; from kafka.errors import kafka_errors; producer ……
·
1、安装相关依赖(注意: kafka 与 kafka-python 这两个包提供的都是同名的 kafka 模块, 同时安装会互相覆盖, 通常只需安装 kafka-python):
pip install kafka==1.3.5
pip install kafka-python==2.0.2
2、创建kafka,参考kafka3.0版本部署及相关明令汇总_会发paper的学渣的博客-CSDN博客
3、代码部分:
import json
import random

from kafka import KafkaProducer
from kafka.errors import KafkaError
from kafka.errors import kafka_errors
def _json_utf8(obj):
    """Serialize *obj* to a JSON string and encode it as UTF-8 bytes."""
    return json.dumps(obj).encode('utf-8')


# Module-level producer shared by every send in this script.
# BROKER_LIST was annotated as the production ("online") environment in the
# original write-up. The SASL options below were left disabled there; pass
# them as extra keyword arguments if the cluster requires authentication:
#     security_protocol='SASL_PLAINTEXT',
#     sasl_mechanism='PLAIN',
#     sasl_plain_username='test',
#     sasl_plain_password='test@123',
producer = KafkaProducer(
    bootstrap_servers=BROKER_LIST,
    # Keys and values are both shipped as UTF-8 encoded JSON.
    key_serializer=_json_utf8,
    value_serializer=_json_utf8,
)
def send_request_user(request_users, num_partitions=5):
    """Publish one message per ``(name, page_size)`` pair and wait for each ack.

    Every pair is sent to ``TOPIC_NAME_MAIN`` through the module-level
    ``producer`` with the fixed key ``"custId"`` and the value
    ``{"name": name, "pageSize": page_size}``. The target partition is picked
    uniformly at random from ``0 .. num_partitions - 1``.

    Args:
        request_users: Sized iterable of ``(name, page_size)`` pairs.
        num_partitions: Number of partitions to spread messages over.
            Defaults to 5, which matches the previously hard-coded
            ``random.randint(0, 4)``. NOTE(review): presumably this must not
            exceed the topic's real partition count -- confirm against the
            topic configuration.

    Kafka client errors (including the 30 second acknowledgement timeout) are
    printed and the loop moves on to the next pair; they are not re-raised.
    """
    print("topic user_len:{}".format(len(request_users)))
    for name, page_size in request_users:
        try:
            partition_id = random.randint(0, num_partitions - 1)
            future = producer.send(topic=TOPIC_NAME_MAIN,
                                   partition=partition_id,
                                   key="custId",
                                   value={"name": name, "pageSize": page_size})
            # Block until the send is acknowledged so failures surface here.
            record_metadata = future.get(timeout=30)
        except KafkaError as e:
            # KafkaError is the client's exception base class. The original
            # ``except kafka_errors`` named a dict of error codes, which makes
            # Python raise TypeError as soon as a real error reaches it.
            print(repr(e))
        else:
            # Report the pair's own name; the original referenced an
            # undefined ``user`` variable here.
            print("topic partition:{}, user_id :{}".format(
                record_metadata.partition, name))
def main():
    """Read name / page-size pairs from stdin and publish each one.

    Two lines are read per message: the name, then the page size as an
    integer. The loop ends cleanly on end-of-input or Ctrl-C at the prompt.
    A page size that is not an integer is reported and skipped instead of
    crashing the script. The module-level ``producer`` is closed on the way
    out.
    """
    try:
        while True:
            try:
                name = input()
                raw_page_size = input()
            except (EOFError, KeyboardInterrupt):
                # No more input, or the user asked to stop.
                break
            try:
                page_size = int(raw_page_size)
            except ValueError:
                print("invalid page size: {!r}".format(raw_page_size))
                continue
            send_request_user([(name, page_size)])
    finally:
        # Release the producer's network resources before exiting.
        producer.close()


if __name__ == '__main__':
    main()
更多推荐
已为社区贡献2条内容
所有评论(0)