Flume error when writing to Kafka: The server experienced an unexpected error when processing the request
1. Today, while using Flume to clean log data, I ran into the following error.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:244)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
20/07/14 19:40:59 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:268)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:244)
... 3 more
Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
20/07/14 19:41:07 ERROR kafka.KafkaSink: Failed to publish events
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:244)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
2. Analyzing the error above: the KafkaSink cannot deliver events, so the first step is to check whether the Flume configuration is wrong.
# Name the components on this agent
ibike_accessLog.sources = r1
ibike_accessLog.sinks = k1 kafkasink
ibike_accessLog.channels = c1 c2
# Describe/configure the source
ibike_accessLog.sources.r1.type = spooldir
ibike_accessLog.sources.r1.spoolDir = /opt/ibike/log/accessLog
ibike_accessLog.sources.r1.fileHeader = true
ibike_accessLog.sources.r1.interceptors = i1 i2
ibike_accessLog.sources.r1.interceptors.i1.type = com.yc.flume.AccessLogInterceptor$Builder
ibike_accessLog.sources.r1.interceptors.i2.type = host
ibike_accessLog.sources.r1.charset=UTF-8
ibike_accessLog.sources.r1.deserializer=LINE
ibike_accessLog.sources.r1.deserializer.maxLineLength=2048
ibike_accessLog.sources.r1.deserializer.outputCharset=UTF-8
# Describe the sink
ibike_accessLog.sinks.k1.type = hdfs
ibike_accessLog.sinks.k1.hdfs.path = hdfs://node1:8020/ibike/log/visitLog
ibike_accessLog.sinks.k1.hdfs.writeFormat = Text
ibike_accessLog.sinks.k1.hdfs.fileType = DataStream
ibike_accessLog.sinks.k1.hdfs.rollInterval = 0
ibike_accessLog.sinks.k1.hdfs.rollSize = 0
ibike_accessLog.sinks.k1.hdfs.rollCount = 0
ibike_accessLog.sinks.k1.hdfs.callTimeout =100000
ibike_accessLog.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
ibike_accessLog.sinks.k1.hdfs.useLocalTimeStamp = true
ibike_accessLog.sinks.k1.hdfs.closeTries = 5
ibike_accessLog.sinks.kafkasink.type=org.apache.flume.sink.kafka.KafkaSink
ibike_accessLog.sinks.kafkasink.kafka.bootstrap.servers=node1:9092,node2:9092,node3:9092
ibike_accessLog.sinks.kafkasink.kafka.topic=accesslog
ibike_accessLog.sinks.kafkasink.kafka.flumeBatchSize= 20
ibike_accessLog.sinks.kafkasink.kafka.producer.acks= 1
ibike_accessLog.sinks.kafkasink.kafka.producer.linger.ms= 1
ibike_accessLog.sinks.kafkasink.kafka.producer.compression.type = snappy
# Use a channel which buffers events in memory
ibike_accessLog.channels.c1.type = memory
ibike_accessLog.channels.c1.capacity = 1000
ibike_accessLog.channels.c1.transactionCapacity = 100
ibike_accessLog.channels.c2.type = memory
ibike_accessLog.channels.c2.capacity = 1000
ibike_accessLog.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
ibike_accessLog.sources.r1.channels = c1 c2
ibike_accessLog.sinks.k1.channel = c1
ibike_accessLog.sinks.kafkasink.channel = c2
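Checking the source, sink and channel wiring by eye is easy to get wrong, so it can be scripted. The following is a minimal sketch, not a Flume tool: `parse_properties` and `check_wiring` are hypothetical helper names, and the script only verifies that every source and sink is bound to a channel declared on the agent. It does not validate any other setting.

```python
def parse_properties(text):
    """Parse `key = value` lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent):
    """Return a list of wiring problems found for one Flume agent."""
    problems = []
    channels = set(props.get(f"{agent}.channels", "").split())
    sources = props.get(f"{agent}.sources", "").split()
    sinks = props.get(f"{agent}.sinks", "").split()

    # A source may fan out to several channels.
    for src in sources:
        bound = props.get(f"{agent}.sources.{src}.channels", "").split()
        if not bound:
            problems.append(f"source {src} has no channels")
        for ch in bound:
            if ch not in channels:
                problems.append(f"source {src} uses undeclared channel {ch}")

    # A sink reads from exactly one channel.
    for sink in sinks:
        ch = props.get(f"{agent}.sinks.{sink}.channel")
        if ch is None:
            problems.append(f"sink {sink} has no channel")
        elif ch not in channels:
            problems.append(f"sink {sink} uses undeclared channel {ch}")
    return problems
```

Calling `check_wiring(parse_properties(config_text), "ibike_accessLog")` on the configuration above should return an empty list, which matches the conclusion that the wiring itself is fine.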
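3. Since my sinks write to both Kafka and HDFS, once the configuration checked out I looked at a Kafka consumer and at HDFS to see whether data was arriving.
4. The check showed that the Flume process did not die and that part of the data was written to Kafka and HDFS normally, yet the error was still being reported.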
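Cause: the error indicates that the request contained a message larger than the maximum message size the server will accept. This is not a Flume error but a Kafka issue. Kafka limits the size of a single message, and the default limit is about 1 MB. The existing logs contain image data and are far larger than 1 MB, so the fix is to raise the maximum single-message size that Kafka accepts. There are two ways to do this: change the setting for one topic, or change the Kafka broker configuration file.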
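To confirm that oversized records really are present, the log files can be scanned for lines above the limit. This is a rough sketch under stated assumptions: `oversized_lines` is a hypothetical helper, one log line is assumed to become one Kafka message, and the 1,000,000-byte threshold is only an approximation of the default, since Kafka also counts record overhead and applies the limit to batches after compression.

```python
DEFAULT_LIMIT = 1_000_000  # assumed threshold, roughly the 1 MB default


def oversized_lines(path, limit=DEFAULT_LIMIT):
    """Yield (line_number, size_in_bytes) for every line larger than limit."""
    # Read in binary mode so sizes are measured in bytes, not characters.
    with open(path, "rb") as fh:
        for number, line in enumerate(fh, start=1):
            size = len(line.rstrip(b"\r\n"))
            if size > limit:
                yield number, size
```

Running it over each file in the spool directory and printing the yielded pairs shows which lines exceed the limit and by how much, which also helps in choosing a new limit.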
(1) Set the limit on a single topic (raised to 200 MB here, by creating the topic with a per-topic override): kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --replication-factor 1 --partitions 3 --topic accesslog --config max.message.bytes=209715200
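(2) Change the Kafka configuration file: add two settings to Kafka's server.properties:
# Maximum size in bytes of a message the broker will accept
message.max.bytes=209715200
# Maximum number of bytes of messages fetched when a partition is replicated. It must not be smaller than message.max.bytes,
# because it sizes the buffer that holds the fetched messages; if it is smaller, that buffer may not be able to hold a single message
replica.fetch.max.bytes=209715200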
(3) After making the change, restart Kafka and the problem is resolved.
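Before restarting, it is worth double-checking the two values, since the byte counts are easy to mistype. The sketch below is only an illustration: `parse_int_settings` and `check_broker_limits` are hypothetical helpers, not part of Kafka. It converts message.max.bytes to MiB and checks that replica.fetch.max.bytes is not smaller.

```python
def parse_int_settings(text):
    """Collect `key=value` lines whose value is a plain non-negative integer."""
    settings = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep and value.strip().isdigit():
            settings[key.strip()] = int(value.strip())
    return settings


def check_broker_limits(settings):
    """Summarise the two size limits discussed in step (2)."""
    message_max = settings["message.max.bytes"]
    replica_fetch = settings["replica.fetch.max.bytes"]
    return {
        "message_max_mib": message_max / (1024 * 1024),
        # Replication needs a fetch size at least as large as one message.
        "replica_fetch_ok": replica_fetch >= message_max,
    }


fragment = """
message.max.bytes=209715200
replica.fetch.max.bytes=209715200
"""
print(check_broker_limits(parse_int_settings(fragment)))
# prints {'message_max_mib': 200.0, 'replica_fetch_ok': True}
```

The 200.0 confirms that 209715200 bytes is exactly 200 × 1024 × 1024, the 200 MB used above.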