1. Flink sink to Kafka fails with java.lang.NoSuchMethodError: org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Z)V

Caused by: java.lang.NoSuchMethodError: org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Z)V
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:495)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:449)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:277)
	at com.igg.flink.tool.onlineUser.kafka.consumer.JavaKafkaOnlineConsumer.main(JavaKafkaOnlineConsumer.java:112)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)

Problem description: On Flink 1.8, writing a DataStream to Kafka fails at startup with a NoSuchMethodError.

Problem analysis: The job runs fine locally but fails once submitted to YARN with flink run. Errors like ClassNotFoundException, NoSuchMethodError, and NoClassDefFoundError usually make you suspect a dependency conflict first. ClosureCleaner.class ships in Flink's core jar, so I inspected the dependency tree with IDEA's Maven Helper plugin and mvn dependency:tree; the Flink core jar showed no conflict, so this was not an ordinary jar conflict. Could the Kafka and flink-connector versions be mismatched? Per the Flink docs (https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html), with a Kafka 2.12 build and Flink 1.8 the correct artifact is flink-connector-kafka_2.11, and that checked out as well. After a lot more digging, I finally noticed that the downloaded Flink distribution was version 1.8.1, while every Flink dependency in the project, including the connector, was 1.8.0. Flink's own modules tolerate such patch-version differences reasonably well, but the connector version has to match the cluster version exactly!

Solution: change every Flink dependency in the project to 1.8.1 so it matches the cluster. Problem solved.
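
To keep all Flink artifacts on one version, it helps to pin it once in a Maven property. A minimal sketch (the flink.version property and the artifact list are illustrative, not taken from the original pom):

<properties>
    <flink.version>1.8.1</flink.version>
</properties>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

With this in place, moving to a new patch release is a one-line change instead of a hunt through the pom.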

2. Flink sink to InfluxDB fails with java.lang.NoClassDefFoundError: Could not initialize class org.influxdb.dto.Point

java.lang.NoClassDefFoundError: Could not initialize class org.influxdb.dto.Point
	at com.igg.flink.tool.sink.InfluxDBSink.invoke(InfluxDBSink.scala:35)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at com.igg.flink.tool.onlineUser.agg.MinuteLogicalWindowResultFunction.apply(MinuteLogicalWindowResultFunction.java:22)
	at com.igg.flink.tool.onlineUser.agg.MinuteLogicalWindowResultFunction.apply(MinuteLogicalWindowResultFunction.java:14)
	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)

Problem description: On Flink 1.8, writing a DataStream to InfluxDB fails because the org.influxdb.dto.Point class cannot be initialized.

Problem analysis: The project already depends on the influxdb package, so some other jar must still be missing. Note that "Could not initialize class" means the class was found but its static initialization failed, which usually points to a missing transitive dependency. The existing dependency:

<!-- influxdb -->
<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.5</version>
</dependency>

Solution: add the Guava jar, which influxdb-java needs at runtime:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>18.0</version>
</dependency>
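
For context, here is a minimal sketch of an InfluxDB sink of the kind the stack trace points at (the class name, measurement, and connection settings are illustrative, not the original com.igg.flink.tool.sink.InfluxDBSink; builder method names follow influxdb-java and may differ slightly between its versions). Point is first touched inside invoke(), which is why the missing Guava classes only surface at runtime:

import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;

// Illustrative sink, not the original InfluxDBSink from the stack trace.
public class SimpleInfluxDBSink extends RichSinkFunction<Long> {

    private transient InfluxDB influxDB;

    @Override
    public void open(Configuration parameters) {
        // Placeholder connection settings.
        influxDB = InfluxDBFactory.connect("http://localhost:8086", "admin", "admin");
    }

    @Override
    public void invoke(Long onlineCount, Context context) {
        // Point's static initializer runs on the first record; without Guava
        // on the classpath it fails with "Could not initialize class ... Point".
        Point point = Point.measurement("online_users")
                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                .addField("count", onlineCount)
                .build();
        // Database and retention policy names are placeholders.
        influxDB.write("flink_db", "autogen", point);
    }
}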

3. Caused by: java.lang.ClassCastException: org.apache.hadoop.yarn.proto.YarnServiceProtos$RegisterApplicationMasterRequestProto cannot be cast to com.google.protobuf.Message

2019-07-24 02:12:21.611 [flink-akka.actor.default-dispatcher-5] ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Fatal error occurred in the cluster entrypoint.
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not start the ResourceManager akka.tcp://flink@local-vm-229:47467/user/resourcemanager
        at org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:209)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:539)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:164)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not start resource manager client.
        at org.apache.flink.yarn.YarnResourceManager.initialize(YarnResourceManager.java:250)
        at org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:219)
        at org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:207)
        ... 16 common frames omitted
Caused by: java.lang.ClassCastException: org.apache.hadoop.yarn.proto.YarnServiceProtos$RegisterApplicationMasterRequestProto cannot be cast to com.google.protobuf.Message
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:224)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
        at com.sun.proxy.$Proxy16.registerApplicationMaster(Unknown Source)
        at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.registerApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:107)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
        at com.sun.proxy.$Proxy17.registerApplicationMaster(Unknown Source)
        at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.registerApplicationMaster(AMRMClientImpl.java:223)
        at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.registerApplicationMaster(AMRMClientImpl.java:214)
        at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl.registerApplicationMaster(AMRMClientAsyncImpl.java:139)
        at org.apache.flink.yarn.YarnResourceManager.createAndStartResourceManagerClient(YarnResourceManager.java:216)
        at org.apache.flink.yarn.YarnResourceManager.initialize(YarnResourceManager.java:245)
        ... 18 common frames omitted


Cause analysis:

This is typically caused by a conflict between the Hadoop jars packaged into your own project and the Hadoop jars on the Flink cluster: two copies of the protobuf classes end up on the classpath, and a generated proto class from one copy cannot be cast to com.google.protobuf.Message from the other.

Solution:

Keep the Hadoop-related jars out of your own build so they are not packaged into the fat jar; marking them as provided is enough (a quick verification command follows the snippet).

<!-- HDFS access; the cluster provides these jars at runtime -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop.version}</version>
    <scope>provided</scope>
    <exclusions>
        <exclusion>
            <groupId>xml-apis</groupId>
            <artifactId>xml-apis</artifactId>
        </exclusion>
    </exclusions>
</dependency>
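
To confirm the fix, you can check that no Hadoop classes end up in the fat jar (the jar name is taken from the submit command in the next item; an empty result is what you want):

jar tf igg-flink-tool-1.0.0-SNAPSHOT.jar | grep 'org/apache/hadoop' | head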

4. Could not build the program from JAR file.

[root@master bin]# ./flink run -m yarn-cluster -yn 1 -p 2 -yjm 1024 -ytm 1024 -ynm FlinkOnYarnSession-MemberLogInfoProducer -d -c com.igg.flink.tool.member.rabbitmq.producer.MqMemberProducer /home/test_gjm/igg-flink-tool/igg-flink-tool-1.0.0-SNAPSHOT.jar
Could not build the program from JAR file.
 
Use the help option (-h or --help) to get help on the command.


Cause analysis:

Since 1.8, the Flink distribution no longer bundles the Hadoop jars.

Solution:

Download flink-shaded-hadoop-2-uber-2.8.3-7.0.jar from the official Flink download page and put it into FLINK_HOME/lib.
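
A sketch of the fix from the shell (the Maven Central path below is my assumption; pick the uber jar matching your Hadoop version from the Flink download page):

cd $FLINK_HOME/lib
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-7.0/flink-shaded-hadoop-2-uber-2.8.3-7.0.jar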

5. Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster

2019-07-25 15:51:59,717 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2019-07-25 15:51:59,969 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2019-07-25 15:52:00,220 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
2019-07-25 15:52:00,472 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster

Cause analysis:

YARN does not have enough resources (memory, vcores, etc.) to satisfy the request.

The YARN UI showed only 3 available vcores and 4 nodes in an unhealthy state. Clicking the "4" lists the unhealthy nodes, and their health reports showed they had run out of disk space.

Solution:

Free up disk space on the affected nodes.
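
Two standard commands that help confirm the diagnosis:

# List node states and health reports; unhealthy nodes explain why they were
# marked (full local dirs are a common reason).
yarn node -list -all
# On an affected node, find the full mount point. By default YARN marks a node
# unhealthy once disk usage on a local dir exceeds 90%.
df -h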

6. Ask timed out on [Actor[akka.tcp://flink@datanode-16:37447/user/rpc/taskmanager_0#-1007332146]] after [10000 ms]

Cause analysis: the first read is that Akka's timeout mechanism was triggered. Flink uses Akka as the RPC framework between its components (JobManager/TaskManager/ResourceManager). The maximum size of a message sent between the JobManager and the TaskManagers defaults to 10485760 bytes (akka.framesize); a larger message fails outright, and a response that takes longer than the default 10 s ask timeout produces exactly the error above.

Solution: raise the frame size and the relevant timeouts in flink-conf.yaml, for example:

akka.framesize: 104857600b
heartbeat.timeout: 300000
akka.ask.timeout: 60s
web.timeout: 600000
