The Python consumer script:

  1 #!/usr/bin/python
  2 #coding=utf8
  3 """
  4 # Author: lkong
  5 # Created Time : 2022-07-19 11:56:53
  6  
  7 # File Name: python_consumer.py
  8 # Description:
  9  
 10 """
 11 import json
 12 import requests
 13 import time
 14  
 15 taobao_url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip="
 16 # Look up the IP's province and carrier (isp) via the Taobao IP API
 17 def resolv_ip(ip):
 18     response = requests.get(taobao_url+ip)
 19     if response.status_code == 200:
 20        tmp_dict = json.loads(response.text)
 21        prov = tmp_dict["data"]["region"]
 22        isp = tmp_dict["data"]["isp"]
 23        return prov,isp
 24     return None,None
 25  
 26 # Convert the timestamp read from the log into our target format
 27 def trans_time(dt):
 28     # parse the string into a time struct
 29     timeArray = time.strptime(dt, "%d/%b/%Y:%H:%M:%S")
 30     #timeStamp = int(time.mktime(timeArray))
 31     # format the time struct back into a string
 32     new_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
 33     return new_time
 34  
 35 # Pull data from Kafka and extract the fields we need: ip, time, bytes
 36 from pykafka import KafkaClient
 37 client = KafkaClient(hosts="192.168.2.152:9092,192.168.2.132:9092,192.168.2.137:9092")
 38 topic = client.topics['nginxlog']
 39 balanced_consumer = topic.get_balanced_consumer(
 40   consumer_group='testgroup',
 41   auto_commit_enable=True,
 42   zookeeper_connect='nginx-kafka01:2181,nginx-kafka02:2181,nginx-kafka03:2181'
 43 )
 44 #consumer = topic.get_simple_consumer() 
 45 for message in balanced_consumer:
 46    if message is not None:
 47        line = json.loads(message.value.decode("utf-8"))
 48        log = line["message"]
 49        tmp_lst = log.split()
 50        ip = tmp_lst[0]
 51        dt = tmp_lst[3].replace("[","")
 52        bt = tmp_lst[9]
 53        dt = trans_time(dt)
 54        prov, isp = resolv_ip(ip)
 55        if prov and isp:
 56           print(prov, isp,dt)
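
For reference, here is a hypothetical record and how the split indices above map onto it. The sample line below is invented purely for illustration; the actual layout depends on the nginx log_format and on how filebeat wraps the line into JSON, but with the common combined format the byte count sits at index 9, which is what tmp_lst[9] relies on.

# Hypothetical filebeat-style record, made up for illustration only
import json

sample = ('{"message": "192.168.2.1 - - [19/Jul/2022:12:12:44 +0800] '
          '\\"GET / HTTP/1.1\\" 200 612 \\"-\\" \\"curl/7.29.0\\""}')
log = json.loads(sample)["message"]
fields = log.split()
print(fields[0])                   # 192.168.2.1 -> client ip
print(fields[3].replace("[", ""))  # 19/Jul/2022:12:12:44 -> request time
print(fields[9])                   # 612 -> bytes sent (the "bandwidth" field)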

When the Python program was run to consume and clean the log data, it failed with the following error:

[root@nginx-kafka01 python]# python3 python_consumer.py
Traceback (most recent call last):
  File "python_consumer.py", line 47, in <module>
    line = json.loads(message.value.decode("utf-8"))
  File "/usr/lib64/python3.6/json/__init__.py", line 354, in loads
    return _default_decoder.decode(s)
  File "/usr/lib64/python3.6/json/decoder.py", line 339, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib64/python3.6/json/decoder.py", line 357, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
Exception ignored in: <bound method BalancedConsumer.__del__ of <pykafka.balancedconsumer.BalancedConsumer at 0x7f79a514eda0 (consumer_group=testgroup)>>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/pykafka/balancedconsumer.py", line 294, in __del__
  File "/usr/local/lib/python3.6/site-packages/pykafka/balancedconsumer.py", line 359, in stop
  File "/usr/local/lib/python3.6/site-packages/pykafka/simpleconsumer.py", line 403, in stop
  File "/usr/local/lib/python3.6/site-packages/pykafka/simpleconsumer.py", line 552, in commit_offsets
pykafka.exceptions.KafkaException: ('Unknown partition supplied to commit_offsets\n%s', KeyError(<pykafka.partition.Partition at 0x7f79a514ed68 (id=0)>,))

This error led to a round of troubleshooting:

First, check whether Kafka is running:

bin/kafka-server-start.sh -daemon config/server.properties  # start Kafka

Then, check whether ZooKeeper is running:

bin/zkServer.sh start  # start ZooKeeper

And check the connection between Kafka and ZooKeeper by opening the ZooKeeper client:

[root@nginx-kafka02 kafka_2.12-2.8.1]# cd /opt/apache-zookeeper-3.6.3-bin/bin
[root@nginx-kafka02 bin]# ./zkCli.sh
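
Besides the command-line checks, a minimal pykafka sketch (reusing the broker list from the consumer script) can also confirm that the brokers are reachable and that the nginxlog topic and its partitions are visible:

from pykafka import KafkaClient

client = KafkaClient(hosts="192.168.2.152:9092,192.168.2.132:9092,192.168.2.137:9092")
topic = client.topics['nginxlog']
print(topic.partitions)   # prints the partition map if the brokers answer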

After these checks, none of them turned out to be the problem, and the consumer was in fact receiving data. I then dug into the pykafka.exceptions.KafkaException: ('Unknown partition supplied to commit_offsets\n%s', KeyError(<pykafka.partition.Partition at 0x7f79a514ed68 (id=0)>,)) message, but after searching through a lot of material I could not find a fix, so I turned to the first error in the traceback instead:

I first printed the raw message value before the JSON conversion, and found that some of the data I had written into the topic earlier was not in JSON format:

 45 for message in balanced_consumer:
 46    if message is not None:
 47        print(message.value)
 48        line = json.loads(message.value.decode("utf-8"))
[root@nginx-kafka01 sc]# python3 /lianxi/python/python_consumer_bak.py
b'hh'
Traceback (most recent call last):
  File "/lianxi/python/python_consumer_bak.py", line 48, in <module>
    line = json.loads(message.value.decode("utf-8"))
  File "/usr/lib64/python3.6/json/__init__.py", line 354, in loads
    return _default_decoder.decode(s)
  File "/usr/lib64/python3.6/json/decoder.py", line 339, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib64/python3.6/json/decoder.py", line 357, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
Exception ignored in: <bound method BalancedConsumer.__del__ of <pykafka.balancedconsumer.BalancedConsumer at 0x7f6122bbce80 (consumer_group=testgroup)>>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/pykafka/balancedconsumer.py", line 294, in __del__
  File "/usr/local/lib/python3.6/site-packages/pykafka/balancedconsumer.py", line 359, in stop
  File "/usr/local/lib/python3.6/site-packages/pykafka/simpleconsumer.py", line 403, in stop
  File "/usr/local/lib/python3.6/site-packages/pykafka/simpleconsumer.py", line 552, in commit_offsets

So I added exception handling: wrap the parsing in a try/except and simply pass whenever the JSON decoding fails.

 1 #!/usr/bin/python
  2 #coding=utf8
  3 """     
  4 # Author: lkong
  5 # Created Time : 2022-07-19 11:56:53
  6         
  7 # File Name: python_consumer.py
  8 # Description:
  9         
 10 """     
 11 import json
 12 import requests
 13 import time
 14         
 15 taobao_url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip="
 16 # Look up the IP's province and carrier (isp) via the Taobao IP API
 17 def resolv_ip(ip):
 18     response = requests.get(taobao_url+ip)
 19     if response.status_code == 200:
 20        tmp_dict = json.loads(response.text)
 21        prov = tmp_dict["data"]["region"]
 22        isp = tmp_dict["data"]["isp"]
 23        return prov,isp
 24     return None,None
 25         
 26 # Convert the timestamp read from the log into our target format
 27 def trans_time(dt):
 28     # parse the string into a time struct
 29     timeArray = time.strptime(dt, "%d/%b/%Y:%H:%M:%S")
 30     #timeStamp = int(time.mktime(timeArray))
 31     # format the time struct back into a string
 32     new_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
 33     return new_time
 34         
 35 # Pull data from Kafka and extract the fields we need: ip, time, bytes
 36 from pykafka import KafkaClient
 37 client = KafkaClient(hosts="192.168.2.152:9092,192.168.2.132:9092,192.168.2.137:9092")
 38 topic = client.topics['nginxlog']
 39 balanced_consumer = topic.get_balanced_consumer(
 40   consumer_group='testgroup',
 41   auto_commit_enable=True,
 42   zookeeper_connect='nginx-kafka01:2181,nginx-kafka02:2181,nginx-kafka03:2181'
 43 )       
 44 #consumer = topic.get_simple_consumer() 
 45 for message in balanced_consumer:
 46    if message is not None:
 47        #print(message.value)
 48        try:
 49            line = json.loads(message.value.decode("utf-8"))
 50            log = line["message"]
 51            tmp_lst = log.split()
 52            ip = tmp_lst[0]
 53            dt = tmp_lst[3].replace("[","")
 54            bt = tmp_lst[9]
 55            dt = trans_time(dt)
 56            prov, isp = resolv_ip(ip)
 57            if prov and isp:
 58                print(prov, isp,dt)
 59        except ValueError as e:
 60             pass
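
One note on the except clause: json.JSONDecodeError is a subclass of ValueError, so catching ValueError is enough to skip malformed records such as the stray b'hh' above without stopping the consumer. A quick illustration:

import json

try:
    json.loads("hh")             # the kind of non-JSON record that broke the consumer
except ValueError as err:        # JSONDecodeError inherits from ValueError
    print("skipping bad record:", err)
# -> skipping bad record: Expecting value: line 1 column 1 (char 0)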

As for the ('Unknown partition supplied to commit_offsets\n%s', KeyError(<pykafka.partition.Partition at 0x7f79a514ed68 (id=0)>,)) error, the cause appears to be the earlier JSON parsing failure: the uncaught exception tore the consumer down mid-run, so it could not commit its offsets cleanly. With the exception handling in place the problem disappeared, and the successful output looks like this:

[root@nginx-kafka01 sc]# python3  /lianxi/python/python_consumer_bak.py
XX 内网IP 2022-07-19 12:12:44
XX 内网IP 2022-07-19 12:12:45
XX 内网IP 2022-07-19 14:34:00
XX 内网IP 2022-07-19 15:24:24
XX 内网IP 2022-07-19 15:51:53
XX 内网IP 2022-07-19 15:51:54
XX 内网IP 2022-07-19 15:51:55
XX 内网IP 2022-07-19 15:51:55
XX 内网IP 2022-07-19 15:51:56
XX 内网IP 2022-07-19 15:51:56
XX 内网IP 2022-07-19 15:51:57
XX 内网IP 2022-07-19 15:52:02
XX 内网IP 2022-07-19 15:52:02
XX 内网IP 2022-07-19 15:52:02
XX 内网IP 2022-07-19 18:06:49
XX 内网IP 2022-07-19 18:06:50
XX 内网IP 2022-07-19 18:27:58
XX 内网IP 2022-07-19 18:27:59
XX 内网IP 2022-07-19 18:28:00
XX 内网IP 2022-07-19 18:30:07
XX 内网IP 2022-07-19 18:30:07
XX 内网IP 2022-07-19 19:26:15
XX 内网IP 2022-07-19 19:58:06