1. Install the dependency:

pip install kafka-python==2.0.2

(Note: the `kafka` package on PyPI, last released as 1.3.5, is an outdated distribution of the same client; installing it alongside kafka-python can shadow the newer version, so kafka-python==2.0.2 alone is enough.)
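As a quick sanity check after installing, the standard library can report which client version (if any) is present, without importing kafka itself. This is just a convenience sketch; the helper name `installed_version` is my own:

```python
from importlib.metadata import version, PackageNotFoundError

def installed_version(pkg):
    """Return the installed version string of a package, or None if absent."""
    try:
        return version(pkg)
    except PackageNotFoundError:
        return None

print(installed_version("kafka-python"))  # e.g. "2.0.2", or None if not installed
```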

2. Set up Kafka — see the CSDN post "kafka3.0版本部署及相关命令汇总" by 会发paper的学渣.

3. The code:

import json
import random

from kafka import KafkaProducer
from kafka.errors import KafkaError

BROKER_LIST = ["localhost:9092"]   # placeholder — replace with your broker addresses
TOPIC_NAME_MAIN = "user_requests"  # placeholder — replace with your topic name

producer = KafkaProducer(bootstrap_servers=BROKER_LIST,  # production environment
                         # security_protocol='SASL_PLAINTEXT',
                         # sasl_mechanism='PLAIN',
                         # sasl_plain_username='test',
                         # sasl_plain_password='test@123',
                         key_serializer=lambda k: json.dumps(k).encode('utf-8'),
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

def send_request_user(request_users):
    print("topic user_len:{}".format(len(request_users)))
    for name, page_size in request_users:
        try:
            # Assumes the topic has 5 partitions (ids 0-4); pick one at random
            partition_id = random.randint(0, 4)
            future = producer.send(topic=TOPIC_NAME_MAIN,
                                   partition=partition_id,
                                   key="custId",
                                   value={"name": name, "pageSize": page_size})
            # Block until the broker acknowledges the send (or 30 s elapse)
            record_metadata = future.get(timeout=30)
            print("topic partition:{}, name:{}".format(record_metadata.partition, name))
            # logger.info(record_metadata.topic)
            # logger.info("offset value: %s" % record_metadata.offset)
        except KafkaError as e:
            print(repr(e))


if __name__ == '__main__':
    while True:
        name = input("name: ")
        page_size = int(input("page size: "))
        request_users = [(name, page_size)]
        send_request_user(request_users)
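The key_serializer and value_serializer lambdas passed to KafkaProducer determine the exact bytes written to the topic: each key and value is JSON-encoded, then UTF-8 encoded. The snippet below runs the same serializers in isolation, no broker needed, so you can see what will land on the wire:

```python
import json

# Identical to the serializers passed to KafkaProducer above
key_serializer = lambda k: json.dumps(k).encode('utf-8')
value_serializer = lambda v: json.dumps(v).encode('utf-8')

print(key_serializer("custId"))                             # b'"custId"'
print(value_serializer({"name": "alice", "pageSize": 10}))  # b'{"name": "alice", "pageSize": 10}'
```

Note that a plain string key comes out wrapped in JSON quotes (b'"custId"', not b'custId'); a consumer must use a matching JSON deserializer to read the messages back.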

 
