Writing to Kafka from Python (pykafka) fails with: ReferenceError: weakly-referenced object no longer exists.
Exception in thread 14: pykafka.OwnedBroker.queue_reader for broker 101:
Traceback (most recent call last):
File "C:\Python27\lib\threading.py", line 801, in __bootstrap_inner
self.run()
File "C:\Python27\lib\threading.py", line 754, in run
self.__target(*self.__args, **self.__kwargs)
File "C:\Python27\lib\site-packages\pykafka\producer.py", line 624, in queue_reader
self.producer._worker_exception = sys.exc_info()
ReferenceError: weakly-referenced object no longer exists
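The fix is to call producer.stop() once the messages have been produced. Judging by the traceback, the producer's worker thread (queue_reader) reaches the producer only through a weak reference, so if the producer is garbage-collected while that thread is still running, the thread fails with this ReferenceError. Calling stop() lets the worker finish before the producer goes away.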
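The mechanism behind the error can be reproduced without Kafka. The following is a minimal sketch using only the standard library, in Python 3 syntax; `Owner`, `Worker`, and `demo` are hypothetical stand-ins for the producer and its per-broker worker, not pykafka classes, and the sketch assumes that pykafka's worker holds a weak proxy in a similar way.

```python
import gc
import weakref


class Owner(object):
    """Hypothetical stand-in for the producer."""

    def __init__(self):
        self.worker_exception = None


class Worker(object):
    """Hypothetical stand-in for the worker; it keeps only a weak proxy."""

    def __init__(self, owner):
        self.owner = weakref.proxy(owner)

    def report(self, error):
        # Raises ReferenceError once the owner has been garbage-collected.
        self.owner.worker_exception = error


def demo():
    owner = Owner()
    worker = Worker(owner)
    worker.report("first error")  # fine: the owner is still alive
    del owner                     # drop the last strong reference
    gc.collect()                  # make collection explicit on any interpreter
    try:
        worker.report("second error")
    except ReferenceError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print(demo())
```

The second call to `report` fails because nothing keeps the owner alive any more, which mirrors a producer going out of scope while its worker thread is still active.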
The complete code is:
# Kafka connection
from pykafka import KafkaClient

KAFKA_IP_PORT = '192.168.1.13:9082'
# KAFKA_IP_PORT = '192.168.0.23:9082'
KAFKA_TOPIC_NAME = 'test-topic'


def kafka_save(data):
    print "Received data:", data
    client = KafkaClient(hosts=KAFKA_IP_PORT)
    producer = client.topics[KAFKA_TOPIC_NAME.encode(encoding="UTF-8")].get_producer()
    data_str = str(data)
    producer.produce(data_str.encode(encoding='UTF-8'))
    # Stop the producer so its worker thread finishes before the producer is garbage-collected
    producer.stop()
    print "Pushed to Kafka successfully"
Here, data is the payload to be pushed to Kafka.
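As a variant, the stop() call can be made automatic by using the producer as a context manager, so it also runs when produce() raises. This is a sketch in Python 3 syntax, assuming the installed pykafka version supports `with` on producers (its README shows this pattern for `get_sync_producer()`); `save_to_kafka` is a hypothetical name, and the topic object is passed in rather than created inside the function.

```python
def save_to_kafka(topic, data):
    """Send one message; the producer is stopped when the with-block exits.

    `topic` is expected to be a pykafka Topic, e.g. client.topics[b"test-topic"].
    """
    payload = str(data).encode("utf-8")
    # Leaving the with-block invokes the producer's __exit__, which is
    # assumed here to call producer.stop().
    with topic.get_producer() as producer:
        producer.produce(payload)
    return payload


# Usage (requires pykafka and a reachable broker):
# from pykafka import KafkaClient
# client = KafkaClient(hosts="192.168.1.13:9082")
# save_to_kafka(client.topics[b"test-topic"], {"id": 1})
```

Creating the client once and reusing the topic across calls also avoids reconnecting to the cluster for every message.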