Troubleshooting pykafka.exceptions.KafkaException [exception handling]
Summary: while consuming nginx logs from Kafka with pykafka, the consumer crashed with json.decoder.JSONDecodeError followed by pykafka.exceptions.KafkaException: ('Unknown partition supplied to commit_offsets\n%s', KeyError(...)). Kafka, ZooKeeper, and the connection between them all checked out; the root cause was non-JSON messages in the topic, and catching the parse failure (try/except, pass on bad messages) resolved both errors.
The Python consumer script:
1 #!/usr/bin/python
2 #coding=utf8
3 """
4 # Author: lkong
5 # Created Time : 2022-07-19 11:56:53
6
7 # File Name: python_consumer.py
8 # Description:
9
10 """
11 import json
12 import requests
13 import time
14
15 taobao_url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip="
16 # Look up an IP address (province and carrier/ISP) via the taobao API
17 def resolv_ip(ip):
18     response = requests.get(taobao_url + ip)
19     if response.status_code == 200:
20         tmp_dict = json.loads(response.text)
21         prov = tmp_dict["data"]["region"]
22         isp = tmp_dict["data"]["isp"]
23         return prov, isp
24     return None, None
25
26 # Convert the timestamp read from the log into our target format
27 def trans_time(dt):
28     # parse the string into a time struct
29     timeArray = time.strptime(dt, "%d/%b/%Y:%H:%M:%S")
30     #timeStamp = int(time.mktime(timeArray))
31     # format the time struct back into a string
32     new_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
33     return new_time
34
35 # Read from Kafka and clean the data down to ip, time and bytes
36 from pykafka import KafkaClient
37 client = KafkaClient(hosts="192.168.2.152:9092,192.168.2.132:9092,192.168.2.137:9092")
38 topic = client.topics['nginxlog']
39 balanced_consumer = topic.get_balanced_consumer(
40     consumer_group='testgroup',
41     auto_commit_enable=True,
42     zookeeper_connect='nginx-kafka01:2181,nginx-kafka02:2181,nginx-kafka03:2181'
43 )
44 #consumer = topic.get_simple_consumer()
45 for message in balanced_consumer:
46     if message is not None:
47         line = json.loads(message.value.decode("utf-8"))
48         log = line["message"]
49         tmp_lst = log.split()
50         ip = tmp_lst[0]
51         dt = tmp_lst[3].replace("[", "")
52         bt = tmp_lst[9]
53         dt = trans_time(dt)
54         prov, isp = resolv_ip(ip)
55         if prov and isp:
56             print(prov, isp, dt)
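For context, each Kafka message is expected to be a JSON record (as shipped by a log collector such as filebeat) whose "message" field holds one nginx access-log line; splitting that line on whitespace yields the client IP at index 0, the timestamp at index 3, and the bytes sent at index 9. A minimal sketch of the parsing on a made-up record (the sample log line is an assumption for illustration, not taken from the real logs):

import json

# Hypothetical filebeat-style record; the embedded log line is invented for illustration.
raw = b'{"message": "192.168.2.1 - - [19/Jul/2022:12:12:44 +0800] \\"GET / HTTP/1.1\\" 200 612"}'

line = json.loads(raw.decode("utf-8"))
tmp_lst = line["message"].split()
print(tmp_lst[0])                   # 192.168.2.1 (client ip)
print(tmp_lst[3].replace("[", ""))  # 19/Jul/2022:12:12:44 (timestamp, fed to trans_time)
print(tmp_lst[9])                   # 612 (bytes sent)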
When running this script to consume and clean the log data, it failed with the following error:
[root@nginx-kafka01 python]# python3 python_consumer.py
Traceback (most recent call last):
File "python_consumer.py", line 47, in <module>
line = json.loads(message.value.decode("utf-8"))
File "/usr/lib64/python3.6/json/__init__.py", line 354, in loads
return _default_decoder.decode(s)
File "/usr/lib64/python3.6/json/decoder.py", line 339, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib64/python3.6/json/decoder.py", line 357, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
Exception ignored in: <bound method BalancedConsumer.__del__ of <pykafka.balancedconsumer.BalancedConsumer at 0x7f79a514eda0 (consumer_group=testgroup)>>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/pykafka/balancedconsumer.py", line 294, in __del__
File "/usr/local/lib/python3.6/site-packages/pykafka/balancedconsumer.py", line 359, in stop
File "/usr/local/lib/python3.6/site-packages/pykafka/simpleconsumer.py", line 403, in stop
File "/usr/local/lib/python3.6/site-packages/pykafka/simpleconsumer.py", line 552, in commit_offsets
pykafka.exceptions.KafkaException: ('Unknown partition supplied to commit_offsets\n%s', KeyError(<pykafka.partition.Partition at 0x7f79a514ed68 (id=0)>,))
This error prompted a round of troubleshooting.
Was Kafka running?
bin/kafka-server-start.sh -daemon config/server.properties  # start Kafka
Was ZooKeeper running?
bin/zkServer.sh start  # start ZooKeeper
Was the Kafka-ZooKeeper connection working? Checked by opening a ZooKeeper client session:
[root@nginx-kafka02 kafka_2.12-2.8.1]# cd /opt/apache-zookeeper-3.6.3-bin/bin
[root@nginx-kafka02 bin]# ./zkCli.sh
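As an extra sanity check (a minimal sketch, assuming the same broker addresses as the consumer script), pykafka itself can confirm that the brokers are reachable and that the topic's partitions are visible, including the id=0 partition named in the error:

from pykafka import KafkaClient

# Connect to the same brokers as the consumer script; fails fast if none are reachable.
client = KafkaClient(hosts="192.168.2.152:9092,192.168.2.132:9092,192.168.2.137:9092")
topic = client.topics['nginxlog']
# Mapping of partition id -> Partition; should list partition 0 for this topic.
print(topic.partitions)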
After all these checks, everything turned out fine, and the consumer was indeed receiving data. I then researched the pykafka.exceptions.KafkaException: ('Unknown partition supplied to commit_offsets\n%s', KeyError(<pykafka.partition.Partition at 0x7f79a514ed68 (id=0)>,)) error itself, but none of the material I found offered a solution, so I decided to deal with the earlier error in the traceback first:
Printing the message value before the JSON conversion showed that some of the data written to the topic earlier was not JSON at all:
45 for message in balanced_consumer:
46     if message is not None:
47         print(message.value)
48         line = json.loads(message.value.decode("utf-8"))
[root@nginx-kafka01 sc]# python3 /lianxi/python/python_consumer_bak.py
b'hh'
Traceback (most recent call last):
File "/lianxi/python/python_consumer_bak.py", line 48, in <module>
line = json.loads(message.value.decode("utf-8"))
File "/usr/lib64/python3.6/json/__init__.py", line 354, in loads
return _default_decoder.decode(s)
File "/usr/lib64/python3.6/json/decoder.py", line 339, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/lib64/python3.6/json/decoder.py", line 357, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
Exception ignored in: <bound method BalancedConsumer.__del__ of <pykafka.balancedconsumer.BalancedConsumer at 0x7f6122bbce80 (consumer_group=testgroup)>>
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/pykafka/balancedconsumer.py", line 294, in __del__
File "/usr/local/lib/python3.6/site-packages/pykafka/balancedconsumer.py", line 359, in stop
File "/usr/local/lib/python3.6/site-packages/pykafka/simpleconsumer.py", line 403, in stop
File "/usr/local/lib/python3.6/site-packages/pykafka/simpleconsumer.py", line 552, in commit_offsets
So I added exception handling: wrap the processing in a try/except and simply pass over any message whose JSON fails to parse.
1 #!/usr/bin/python
2 #coding=utf8
3 """
4 # Author: lkong
5 # Created Time : 2022-07-19 11:56:53
6
7 # File Name: python_consumer.py
8 # Description:
9
10 """
11 import json
12 import requests
13 import time
14
15 taobao_url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip="
16 # Look up an IP address (province and carrier/ISP) via the taobao API
17 def resolv_ip(ip):
18     response = requests.get(taobao_url + ip)
19     if response.status_code == 200:
20         tmp_dict = json.loads(response.text)
21         prov = tmp_dict["data"]["region"]
22         isp = tmp_dict["data"]["isp"]
23         return prov, isp
24     return None, None
25
26 # Convert the timestamp read from the log into our target format
27 def trans_time(dt):
28     # parse the string into a time struct
29     timeArray = time.strptime(dt, "%d/%b/%Y:%H:%M:%S")
30     #timeStamp = int(time.mktime(timeArray))
31     # format the time struct back into a string
32     new_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
33     return new_time
34
35 # Read from Kafka and clean the data down to ip, time and bytes
36 from pykafka import KafkaClient
37 client = KafkaClient(hosts="192.168.2.152:9092,192.168.2.132:9092,192.168.2.137:9092")
38 topic = client.topics['nginxlog']
39 balanced_consumer = topic.get_balanced_consumer(
40     consumer_group='testgroup',
41     auto_commit_enable=True,
42     zookeeper_connect='nginx-kafka01:2181,nginx-kafka02:2181,nginx-kafka03:2181'
43 )
44 #consumer = topic.get_simple_consumer()
45 for message in balanced_consumer:
46     if message is not None:
47         #print(message.value)
48         try:
49             line = json.loads(message.value.decode("utf-8"))
50             log = line["message"]
51             tmp_lst = log.split()
52             ip = tmp_lst[0]
53             dt = tmp_lst[3].replace("[", "")
54             bt = tmp_lst[9]
55             dt = trans_time(dt)
56             prov, isp = resolv_ip(ip)
57             if prov and isp:
58                 print(prov, isp, dt)
59         except ValueError as e:
60             pass
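Note that json.JSONDecodeError is a subclass of ValueError, so except ValueError also covers the parse failure seen above; catching json.JSONDecodeError explicitly would be slightly more precise, since a bare ValueError also swallows unrelated errors such as the strptime call failing on a malformed timestamp.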
As for ('Unknown partition supplied to commit_offsets\n%s', KeyError(<pykafka.partition.Partition at 0x7f79a514ed68 (id=0)>,)), the cause appears to be the earlier JSON parsing failure: the unhandled JSONDecodeError aborts the program, BalancedConsumer.__del__ then runs during interpreter teardown (as the traceback shows: __del__ -> stop -> commit_offsets) and tries to commit offsets for a partition that has already been released, raising the KeyError. After adding the exception handling, the problem was gone; the successful run:
[root@nginx-kafka01 sc]# python3 /lianxi/python/python_consumer_bak.py
XX 内网IP 2022-07-19 12:12:44
XX 内网IP 2022-07-19 12:12:45
XX 内网IP 2022-07-19 14:34:00
XX 内网IP 2022-07-19 15:24:24
XX 内网IP 2022-07-19 15:51:53
XX 内网IP 2022-07-19 15:51:54
XX 内网IP 2022-07-19 15:51:55
XX 内网IP 2022-07-19 15:51:55
XX 内网IP 2022-07-19 15:51:56
XX 内网IP 2022-07-19 15:51:56
XX 内网IP 2022-07-19 15:51:57
XX 内网IP 2022-07-19 15:52:02
XX 内网IP 2022-07-19 15:52:02
XX 内网IP 2022-07-19 15:52:02
XX 内网IP 2022-07-19 18:06:49
XX 内网IP 2022-07-19 18:06:50
XX 内网IP 2022-07-19 18:27:58
XX 内网IP 2022-07-19 18:27:59
XX 内网IP 2022-07-19 18:28:00
XX 内网IP 2022-07-19 18:30:07
XX 内网IP 2022-07-19 18:30:07
XX 内网IP 2022-07-19 19:26:15
XX 内网IP 2022-07-19 19:58:06
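Catching the parse error is what fixed it here; a complementary option (a sketch, not part of the original troubleshooting) is to shut the consumer down explicitly, so that even when an unexpected exception escapes the loop the offsets are committed during a controlled stop() rather than in __del__ at interpreter teardown:

try:
    for message in balanced_consumer:
        if message is not None:
            ...  # parse and process as above
finally:
    # Commit offsets and release partitions cleanly instead of relying on __del__.
    balanced_consumer.stop()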