1 #!/usr/bin/python 2 # -*- coding:utf-8 -*- 3 4 from confluent_kafka import Producer 5 import json 6 import time 7 import sys 8 9 def delivery_report(err, msg): 10 """ Called once for each message produced to indicate delivery result. 11 Triggered by poll() or flush(). """ 12 if err is not None: 13 print('Message delivery failed: {}'.format(err)) 14 else: 15 print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) 16 17 def c_kafka(topic, payloads, sleep_time=0, **conf): 18 gu = [] 19 p = Producer(**conf) 20 for i in payloads: 21 gu.insert(0, "ping%s" % int(round(int(time.time() * 1000)))) 22 p.poll(0) 23 p.produce(topic, json.dumps(i).encode('utf-8'), callback=delivery_report) 24 if sleep_time > 0: 25 p.flush() 26 time.sleep(int(sleep_time)) 27 if sleep_time == 0: 28 p.flush 29 return gu 30 31 32 if __name__ == "__main__": 33 args = sys.argv 34 ip = args[1] 35 rate_value = args[2] 36 ranges = args[3] 37 sleep_time = args[4] 38 payloads = [] 39 conf = { 40 'bootstrap.servers': 'authtest.jdq.jd.local:9888', 41 'client.id': '23a49894' 42 } 43 topic = 'WDMEtlTest' 44 for i in range(int(ranges)): 45 payloads.append({ 46 "timestamp": int(time.time()) + ((i + 1) * 60), 47 "timestampISO": "2018-09-12T06:22:40.000Z", 48 "topic": "devEtl03", 49 "ip": args[1], 50 "if_name": "Ethernet6/4", 51 "service_status":"在线", 52 "data_level": "debug", 53 "globalunique": '%s%s' % ('dci', int(time.time())), 54 } 55 ) 56 print c_kafka(topic, payloads, sleep_time, **conf) 57
所有评论(0)