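At work I am building a Spark Streaming job that consumes data from Kafka. Because our Hadoop cluster has Kerberos authentication enabled, I took quite a few detours, so here are my notes: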

The code runs perfectly well in local mode, but when run on YARN it kept failing with:

Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
	at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:84)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:70)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:240)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
	at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
	at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
	at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
	at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
	at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
	at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
	at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
	at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
	at SparkCunsumeKafka$.main(SparkCunsumeKafka.scala:345)
	at SparkCunsumeKafka.main(SparkCunsumeKafka.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Jaas configuration not found
	at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
	at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:84)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:70)
	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:240)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
	at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
	at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
	at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
	at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
	at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
	at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
	at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
	at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
	at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
	at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.kafka.common.KafkaException: Jaas configuration not found
	at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:299)
	at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
	at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)
	at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
	at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
	... 25 more
Caused by: java.io.IOException: Could not find a 'KafkaClient' entry in this configuration.
	at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:50)
	at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)

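The error above means the JAAS configuration could not be found: the Kafka client looks for a 'KafkaClient' entry, which comes from the jaas.conf file, and the JVM never loaded that file.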
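For reference, here is a minimal sketch of what such a `KafkaClient` entry usually looks like for keytab-based Kerberos login, written as a small Scala helper that renders the file content. The helper name `JaasEntry`, the keytab path and the principal are hypothetical placeholders, not values from my setup; the login module and its options are the standard JDK `Krb5LoginModule` ones.

```scala
// Hypothetical helper (not part of the original job): renders the text of a
// jaas.conf file containing the `KafkaClient` entry that the Kafka client looks up.
object JaasEntry {

  /** keytabPath and principal are placeholders supplied by the caller. */
  def render(keytabPath: String, principal: String): String =
    s"""KafkaClient {
       |  com.sun.security.auth.module.Krb5LoginModule required
       |  useKeyTab=true
       |  storeKey=true
       |  keyTab="$keytabPath"
       |  principal="$principal";
       |};
       |""".stripMargin
}
```

Calling `JaasEntry.render("/path/to/user.keytab", "user@EXAMPLE.COM")` returns the text you would save as jaas.conf; the file itself still has to be readable by every JVM that creates a Kafka consumer.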

The reason is that a Spark application is split into a driver side and an executor side.

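In local mode everything runs in a single JVM on the local machine, so setting the jaas.conf path there works (strictly speaking it is the JVM system property java.security.auth.login.config, not an environment variable).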

On YARN, however, the driver and the executors are separate JVMs. As far as I could tell, with my earlier code the driver could still load the jaas.conf file.

The executors, however, could not load this file.
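One way to see which side is actually missing the configuration is to run the same small check in each JVM. The sketch below is my own illustration, not code from the original job; `JaasCheck` and its messages are made-up names, and it only uses the JDK, so it can be called on the driver or from inside a Spark task.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Hypothetical diagnostic helper: reports whether the current JVM can see a
// JAAS file with a KafkaClient entry.
object JaasCheck {
  val LoginConfigProperty = "java.security.auth.login.config"

  /** Describes the state of a given JAAS path (None means the property is unset). */
  def describe(path: Option[String]): String = path match {
    case None =>
      s"$LoginConfigProperty is not set"
    case Some(p) if !(Files.isRegularFile(Paths.get(p)) && Files.isReadable(Paths.get(p))) =>
      s"$p is set but is not a readable file on this host"
    case Some(p) =>
      val content = new String(Files.readAllBytes(Paths.get(p)), StandardCharsets.UTF_8)
      if (content.contains("KafkaClient")) s"$p is readable and has a KafkaClient entry"
      else s"$p is readable but has no KafkaClient entry"
  }

  /** Checks the system property of the JVM this method runs in. */
  def describeCurrentJvm(): String =
    describe(Option(System.getProperty(LoginConfigProperty)))
}
```

On the driver, call `JaasCheck.describeCurrentJvm()` directly. For the executors, something like `sc.parallelize(1 to 4).map(_ => JaasCheck.describeCurrentJvm()).collect()` returns what each task's JVM sees, which makes a missing property or a missing file on the worker nodes visible.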

I was stuck on this problem for a long time. Most of the solutions found online look like this:

// Locally, point the JVM at jaas.conf through a system property
System.setProperty("java.security.auth.login.config", "/tmp/kafka_jaas.conf")
// When submitting to YARN with spark-submit, add the following option so that the executor JVMs get the jaas.conf path as well
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/kafka_jaas.conf"

I tried this approach, but for reasons I still don't understand, it did not work.

In the end, the following approach solved it:

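// UtilHelper, properties and kafkaJaasPath (the path to jaas.conf) are defined elsewhere in the project
val sparkConf = UtilHelper.buildSparkConf(properties.getProperty("spark.config.file.path"))
sparkConf.setAppName("spark-consumer")
  // Pass the JAAS file location as a JVM option to both the driver and the executors
  .set("spark.driver.extraJavaOptions", s"-Djava.security.auth.login.config=$kafkaJaasPath")
  .set("spark.executor.extraJavaOptions", s"-Djava.security.auth.login.config=$kafkaJaasPath")
val sc = new SparkContext(sparkConf)
// Distribute the JAAS file to every node that runs this job
sc.addFile(kafkaJaasPath)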

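In other words, the jaas.conf location is set in code, as a JVM option for both the driver and the executors, and the file itself is distributed with sc.addFile.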
