Flink 1.8 Pitfall Notes
1. Flink sink to Kafka fails: java.lang.NoSuchMethodError: org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Z)V
Caused by: java.lang.NoSuchMethodError: org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Z)V
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:495)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:449)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:277)
at com.igg.flink.tool.onlineUser.kafka.consumer.JavaKafkaOnlineConsumer.main(JavaKafkaOnlineConsumer.java:112)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
Problem: with Flink 1.8, writing a DataStream to Kafka fails with an error saying a method cannot be found.
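Analysis: the job ran fine locally but failed as soon as it was submitted to YARN. Errors such as ClassNotFoundException, NoSuchMethodError and NoClassDefFoundError usually point to a dependency conflict first. ClosureCleaner.class belongs to flink-api.jar, but inspecting the dependencies with the Maven Helper plugin for IntelliJ IDEA and with mvn dependency:tree showed no conflict on flink-api.jar, so a conflict seemed unlikely. Could the Kafka version and the flink-connector package be mismatched? According to the Flink documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html), with our Kafka 2.12 and Flink 1.8 the right dependency is the flink-connector-kafka_2.11 jar, which is what we were using, so that was fine too. After a good deal of digging I finally noticed that the Flink distribution I had downloaded was 1.8.1, while every Flink artifact in the project, including flink-connector, was still 1.8.0. This fits the error: the 1.8.0 connector calls ClosureCleaner.clean(Object, boolean) (the descriptor clean(Ljava/lang/Object;Z)V), a signature the 1.8.1 jars on the cluster do not provide. The lesson: Flink patch releases are largely compatible with each other, but the connector versions must match the Flink version on the cluster exactly!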
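When mvn dependency:tree looks clean but the cluster still throws NoSuchMethodError, it can help to ask the runtime classpath directly whether the exact signature from the error exists. The sketch below uses plain reflection; the class name MethodProbe is hypothetical (not part of Flink or Maven), and the ClosureCleaner signature is simply read off the clean(Ljava/lang/Object;Z)V descriptor in the stack trace.

```java
// Hypothetical diagnostic helper (not part of Flink or Maven): reports whether a class on the
// current classpath declares a method with exactly the given parameter types.
final class MethodProbe {
    static boolean hasMethod(String className, String methodName, Class<?>... paramTypes) {
        try {
            // initialize = false: load the class without running its static initializer
            Class<?> cls = Class.forName(className, false, MethodProbe.class.getClassLoader());
            // getDeclaredMethod only sees methods declared on cls itself, not inherited ones
            cls.getDeclaredMethod(methodName, paramTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            // missing class, missing signature, or a class that cannot be linked
            return false;
        }
    }

    public static void main(String[] args) {
        // clean(Ljava/lang/Object;Z)V from the stack trace means clean(Object, boolean) returning void
        boolean present = hasMethod("org.apache.flink.api.java.ClosureCleaner",
                "clean", Object.class, boolean.class);
        System.out.println("ClosureCleaner.clean(Object, boolean) present: " + present);
    }
}
```

Run it with a classpath that mirrors the cluster (for example the jars in the Flink lib directory plus the job jar). A false result suggests the jar providing ClosureCleaner is not the version the connector was compiled against.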
Solution: change every Flink version in the project to 1.8.1 so it matches the cluster; the problem is gone.
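To catch this kind of mismatch before submitting, you could scan the output of mvn dependency:list for org.apache.flink artifacts whose version differs from the downloaded distribution. This is a minimal sketch: the class name is hypothetical, the groupId:artifactId:type[:classifier]:version:scope line layout is an assumption about Maven's output, and flink-shaded-* artifacts are skipped because they carry their own version numbers.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists org.apache.flink artifacts whose version differs from the
// Flink distribution running on the cluster.
final class FlinkVersionCheck {
    static List<String> mismatches(List<String> dependencyLines, String clusterVersion) {
        List<String> result = new ArrayList<>();
        for (String raw : dependencyLines) {
            String line = raw.trim();
            if (line.startsWith("[INFO]")) {
                line = line.substring("[INFO]".length()).trim();
            }
            int space = line.indexOf(' ');
            if (space >= 0) {
                line = line.substring(0, space); // drop any trailing annotation
            }
            // Assumed coordinate layout: groupId:artifactId:type[:classifier]:version:scope
            String[] parts = line.split(":");
            if (parts.length < 5 || !"org.apache.flink".equals(parts[0])) {
                continue;
            }
            String artifactId = parts[1];
            if (artifactId.startsWith("flink-shaded")) {
                continue; // flink-shaded-* artifacts have their own version scheme
            }
            String version = parts[parts.length - 2];
            if (!version.equals(clusterVersion)) {
                result.add(artifactId + ":" + version);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.err.println("usage: FlinkVersionCheck <cluster flink version> <dependency list file>");
            return;
        }
        List<String> lines = Files.readAllLines(Paths.get(args[1]), StandardCharsets.UTF_8);
        List<String> bad = mismatches(lines, args[0]);
        if (bad.isEmpty()) {
            System.out.println("All Flink artifacts are at " + args[0]);
        }
        for (String b : bad) {
            System.out.println("Version mismatch: " + b + " (cluster runs " + args[0] + ")");
        }
    }
}
```

Usage: redirect mvn dependency:list into a file, then run the class with 1.8.1 and that file as arguments. In the situation above it would have flagged flink-connector-kafka_2.11 at 1.8.0.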
2. Flink sink to InfluxDB fails: java.lang.NoClassDefFoundError: Could not initialize class org.influxdb.dto.Point
java.lang.NoClassDefFoundError: Could not initialize class org.influxdb.dto.Point
at com.igg.flink.tool.sink.InfluxDBSink.invoke(InfluxDBSink.scala:35)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at com.igg.flink.tool.onlineUser.agg.MinuteLogicalWindowResultFunction.apply(MinuteLogicalWindowResultFunction.java:22)
at com.igg.flink.tool.onlineUser.agg.MinuteLogicalWindowResultFunction.apply(MinuteLogicalWindowResultFunction.java:14)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
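Problem: with Flink 1.8, writing a DataStream to InfluxDB fails with an error saying the class org.influxdb.dto.Point could not be initialized.
Analysis: the project already depends on influxdb-java, so some other library that Point needs at runtime is probably missing. Note that "Could not initialize class" means the JVM did find org.influxdb.dto.Point, but its static initializer had already failed once; the first failure, thrown when the class was first used, is the one that names the class that is actually missing.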
<!-- influxdb -->
<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.5</version>
</dependency>
Solution: add the Guava dependency:
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>18.0</version>
</dependency>
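Why the error mentions Point instead of Guava: the JVM runs a class's static initializer only once. If it fails, the first use throws (an ExceptionInInitializerError wrapping the cause, or the original Error such as a NoClassDefFoundError), and every later use only reports "Could not initialize class". The self-contained sketch below reproduces this; FakePoint is a hypothetical stand-in for org.influxdb.dto.Point, and the missing library is simulated with an exception.

```java
// Hypothetical stand-in for a class such as org.influxdb.dto.Point whose static
// initializer depends on a library (here simulated) that is missing at runtime.
final class FakePoint {
    static final String ESCAPER = loadDependency();

    private static String loadDependency() {
        // Simulated failure; in the case above this was presumably a missing Guava class.
        throw new IllegalStateException("simulated: dependency missing from classpath");
    }

    static void measurement() {
        // any static access triggers class initialization
    }
}

final class StaticInitDemo {
    /** Uses FakePoint twice and records what each attempt throws. */
    static String[] touchTwice() {
        String[] errors = new String[2];
        for (int i = 0; i < errors.length; i++) {
            try {
                FakePoint.measurement();
            } catch (Throwable t) {
                errors[i] = t.getClass().getName() + ": " + t.getMessage();
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        for (String error : touchTwice()) {
            System.out.println(error);
        }
    }
}
```

The first attempt reports an ExceptionInInitializerError (its cause is the IllegalStateException); the second only says "Could not initialize class FakePoint". In a Flink job, that means looking for the earliest error in the TaskManager log rather than the repeated one.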
3. Caused by: java.lang.ClassCastException: org.apache.hadoop.yarn.proto.YarnServiceProtos$RegisterApplicationMasterRequestProto cannot be cast to com.google.protobuf.Message
2019-07-24 02:12:21.611 [flink-akka.actor.default-dispatcher-5] ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error occurred in the cluster entrypoint.
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not start the ResourceManager akka.tcp://flink@local-vm-229:47467/user/resourcemanager
at org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:209)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:539)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:164)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not start resource manager client.
at org.apache.flink.yarn.YarnResourceManager.initialize(YarnResourceManager.java:250)
at org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:219)
at org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:207)
... 16 common frames omitted
Caused by: java.lang.ClassCastException: org.apache.hadoop.yarn.proto.YarnServiceProtos$RegisterApplicationMasterRequestProto cannot be cast to com.google.protobuf.Message
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:224)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy16.registerApplicationMaster(Unknown Source)
at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.registerApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:107)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
at com.sun.proxy.$Proxy17.registerApplicationMaster(Unknown Source)
at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.registerApplicationMaster(AMRMClientImpl.java:223)
at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.registerApplicationMaster(AMRMClientImpl.java:214)
at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl.registerApplicationMaster(AMRMClientAsyncImpl.java:139)
at org.apache.flink.yarn.YarnResourceManager.createAndStartResourceManagerClient(YarnResourceManager.java:216)
at org.apache.flink.yarn.YarnResourceManager.initialize(YarnResourceManager.java:245)
... 18 common frames omitted
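Cause:
This kind of problem is usually caused by a conflict between the Hadoop jars bundled in your own project and the Hadoop jars on the Flink cluster: two incompatible copies of the Hadoop/protobuf classes typically end up on the classpath, so the generated protocol class cannot be cast to the com.google.protobuf.Message that the RPC engine expects.
Solution:
Remove the Hadoop-related jars from your own project so they are not packaged into the job jar, for example by declaring them with provided scope: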
<!-- access HDFS -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
    <exclusions>
        <exclusion>
            <groupId>xml-apis</groupId>
            <artifactId>xml-apis</artifactId>
        </exclusion>
    </exclusions>
</dependency>
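To confirm such a conflict, you can ask the class loader for every location that provides a given class; more than one hit for com.google.protobuf.Message or for the Hadoop YARN classes suggests duplicate copies. This is a minimal sketch with a hypothetical class name; it only approximates the JobManager's real classpath, so run it with the job jar plus the Flink and Hadoop jars the cluster uses.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical diagnostic helper: lists every classpath entry that contains a class.
final class DuplicateClassFinder {
    static List<URL> locations(String className) {
        // Nested classes keep their '$', e.g. YarnServiceProtos$RegisterApplicationMasterRequestProto
        String resource = className.replace('.', '/') + ".class";
        try {
            return Collections.list(
                    DuplicateClassFinder.class.getClassLoader().getResources(resource));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] names = args.length > 0 ? args
                : new String[] {"com.google.protobuf.Message"};
        for (String name : names) {
            List<URL> urls = locations(name);
            System.out.println(name + ": " + urls.size() + " location(s)");
            for (URL url : urls) {
                System.out.println("  " + url);
            }
        }
    }
}
```

After switching the Hadoop dependencies to provided scope, the job jar should no longer appear among the locations printed for these classes.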
4. Could not build the program from JAR file.
[root@master bin]# ./flink run -m yarn-cluster -yn 1 -p 2 -yjm 1024 -ytm 1024 -ynm FlinkOnYarnSession-MemberLogInfoProducer -d -c com.igg.flink.tool.member.rabbitmq.producer.MqMemberProducer /home/test_gjm/igg-flink-tool/igg-flink-tool-1.0.0-SNAPSHOT.jar
Could not build the program from JAR file.
Use the help option (-h or --help) to get help on the command.
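Cause:
The Flink 1.8 distribution does not include the Hadoop-related jars, so the YARN submission (-m yarn-cluster) has no Hadoop classes to work with.
Solution:
Download flink-shaded-hadoop-2-uber-2.8.3-7.0.jar from the official Flink website and put it in the flink_home/lib directory.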
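A quick pre-flight check for this fix is to verify that the lib directory actually contains a flink-shaded-hadoop jar before running flink run -m yarn-cluster. This is a sketch: the class name is hypothetical, and matching on the flink-shaded-hadoop prefix is an assumption based on the jar name above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: checks whether a Flink lib directory ships a shaded Hadoop jar.
final class HadoopLibCheck {
    /** True if any file name looks like a flink-shaded-hadoop jar. */
    static boolean containsShadedHadoop(Iterable<String> fileNames) {
        for (String name : fileNames) {
            if (name.startsWith("flink-shaded-hadoop") && name.endsWith(".jar")) {
                return true;
            }
        }
        return false;
    }

    /** Lists libDir (e.g. the lib directory of the Flink home) and applies containsShadedHadoop. */
    static boolean hasShadedHadoop(Path libDir) {
        List<String> names = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(libDir)) {
            for (Path entry : entries) {
                names.add(entry.getFileName().toString());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return containsShadedHadoop(names);
    }

    public static void main(String[] args) {
        String flinkHome = args.length > 0 ? args[0] : System.getenv("FLINK_HOME");
        if (flinkHome == null) {
            System.err.println("usage: HadoopLibCheck <flink home>  (or set FLINK_HOME)");
            return;
        }
        Path lib = Paths.get(flinkHome, "lib");
        System.out.println(lib + (hasShadedHadoop(lib)
                ? ": flink-shaded-hadoop jar found"
                : ": no flink-shaded-hadoop jar, YARN submission will likely fail"));
    }
}
```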
5. Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2019-07-25 15:51:59,717 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2019-07-25 15:51:59,969 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2019-07-25 15:52:00,220 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2019-07-25 15:52:00,472 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
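Cause:
YARN does not have enough resources (memory, vcores, etc.) to allocate.
The YARN web UI showed only 3 available vcores and 4 nodes in the unhealthy state. Clicking the "4" lists the unhealthy nodes.
Their details showed that the nodes were running out of disk space.
Solution:
Free up disk space on the affected nodes.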
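The unhealthy state most likely comes from the NodeManager's disk health checker, which marks a local or log directory as bad once its utilization exceeds yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage (90% by default). The sketch below approximates that check so you can test a node's directories before and after cleaning up; the class name is hypothetical and the comparison is a simplification of the NodeManager's own logic.

```java
import java.io.File;

// Hypothetical helper approximating the NodeManager's per-disk utilization check.
final class DiskHealthCheck {
    // Assumed to match the YARN default of
    // yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage
    static final double DEFAULT_MAX_UTILIZATION_PERCENT = 90.0;

    /** Used space as a percentage of total space. */
    static double utilizationPercent(long totalBytes, long usableBytes) {
        if (totalBytes <= 0) {
            throw new IllegalArgumentException("totalBytes must be positive");
        }
        return 100.0 * (totalBytes - usableBytes) / totalBytes;
    }

    /** Simplified check: a directory counts as bad once utilization exceeds the limit. */
    static boolean overLimit(long totalBytes, long usableBytes, double maxPercent) {
        return utilizationPercent(totalBytes, usableBytes) > maxPercent;
    }

    public static void main(String[] args) {
        // Pass the NodeManager's local and log directories as arguments.
        for (String dir : args) {
            File f = new File(dir);
            long total = f.getTotalSpace();
            if (total == 0) {
                System.out.println(dir + ": not found");
                continue;
            }
            long usable = f.getUsableSpace();
            System.out.printf("%s: %.1f%% used%s%n", dir, utilizationPercent(total, usable),
                    overLimit(total, usable, DEFAULT_MAX_UTILIZATION_PERCENT) ? "  <-- over limit" : "");
        }
    }
}
```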
6. Ask timed out on [Actor[akka.tcp://flink@datanode-16:37447/user/rpc/taskmanager_0#-1007332146]] after [10000 ms]
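Cause: this first looked like Akka's timeout mechanism being triggered. Flink uses Akka as the RPC framework between its components (JobManager/TaskManager/ResourceManager), and the 10000 ms in the message matches the default akka.ask.timeout of 10 s. A related limit is the maximum size of a message exchanged between the JobManager and the TaskManagers, which defaults to 10485760b (10 MB, set by akka.framesize); any message larger than that fails.
Solution: raise the frame size and the timeouts in the Flink configuration, for example: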
"akka.framesize": "104857600b",
"heartbeat.timeout": "300000",
"akka.ask.timeout": "60s",
"web.timeout": "600000"
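To double-check which values a cluster actually runs with, you could overlay the entries of a flink-conf.yaml onto the Flink 1.8 defaults for the four keys above (akka.framesize 10485760b, akka.ask.timeout 10 s, heartbeat.timeout 50000 ms, web.timeout 10000 ms). This is a sketch with a hypothetical class name; it assumes simple one-line "key: value" entries and is not a full YAML parser.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: shows the effective values of the four RPC-related keys,
// starting from Flink 1.8 defaults and overlaying simple "key: value" lines.
final class RpcTimeoutConf {
    static Map<String, String> defaults() {
        Map<String, String> d = new LinkedHashMap<>();
        d.put("akka.framesize", "10485760b"); // 10 MB
        d.put("akka.ask.timeout", "10 s");
        d.put("heartbeat.timeout", "50000");  // ms
        d.put("web.timeout", "10000");        // ms
        return d;
    }

    static Map<String, String> effective(List<String> confLines) {
        Map<String, String> result = defaults();
        for (String raw : confLines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int colon = line.indexOf(':');
            if (colon < 0) {
                continue;
            }
            String key = line.substring(0, colon).trim();
            if (result.containsKey(key)) { // only the four keys above are tracked
                result.put(key, line.substring(colon + 1).trim());
            }
        }
        return result;
    }

    /** Parses a plain byte count with an optional trailing 'b', e.g. "104857600b" (assumed format). */
    static long frameBytes(String value) {
        String v = value.trim();
        if (v.endsWith("b")) {
            v = v.substring(0, v.length() - 1);
        }
        return Long.parseLong(v);
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 1) {
            System.err.println("usage: RpcTimeoutConf <path to flink-conf.yaml>");
            return;
        }
        Map<String, String> conf =
                effective(Files.readAllLines(Paths.get(args[0]), StandardCharsets.UTF_8));
        for (Map.Entry<String, String> e : conf.entrySet()) {
            System.out.println(e.getKey() + " = " + e.getValue());
        }
        System.out.println("akka.framesize in bytes: " + frameBytes(conf.get("akka.framesize")));
    }
}
```

With the settings above, the frame size becomes ten times the default (104857600 versus 10485760 bytes).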