SparkStreaming Kafka Kerberos 认证问题
目前在公司开发SparkStreaming 消费kafka数据,因为hadoop集群启用了kerberos认证,走了不少弯路,现在记录一下:代码使用local模式跑是完全ok的,但是run on yarn 上,一直报错:Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kaf
·
目前在公司开发SparkStreaming 消费kafka数据,因为hadoop集群启用了kerberos认证,走了不少弯路,现在记录一下:
代码使用local模式跑是完全ok的,但是run on yarn 上,一直报错:
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:84)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:70)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:240)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at SparkCunsumeKafka$.main(SparkCunsumeKafka.scala:345)
at SparkCunsumeKafka.main(SparkCunsumeKafka.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Jaas configuration not found
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:84)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:70)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:240)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.kafka.common.KafkaException: Jaas configuration not found
at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:299)
at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)
... 25 more
Caused by: java.io.IOException: Could not find a 'KafkaClient' entry in this configuration.
at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:50)
at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:297)
上面报错是找不到:jaas.conf 这个文件;
因为Spark 内,分为driver端,executer端;
使用local模式运行,在本地是可以将:jaas.conf 加载到环境变量里面的;
但是on yarn 上,涉及到driver端,executer端;如果使用之前的代码,在driver端是可以加载jaas.conf文件的;
但是在executer端是加载不到这个文件的;
因为这个问题,卡了很久,网上大部分的解决方案如下:
//在本地将jaas.conf加载到环境变量
System.setProperty("java.security.auth.login.config", "/tmp/kafka_jaas.conf")
//使用spark-submit提交到yarn上时,添加如下命令,将jaas.conf添加到executer环境变量
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/tmp/kafka_jaas.conf"
我试过这个问题,不知道为什么,还是不行
最终找到如下方法解决:
val sparkConf = UtilHelper.buildSparkConf(properties.getProperty("spark.config.file.path"))
sparkConf.setAppName("spark-consumer")
.set("spark.driver.extraJavaOptions", s"-Djava.security.auth.login.config=$kafkaJaasPath")
.set("spark.executor.extraJavaOptions", s"-Djava.security.auth.login.config=$kafkaJaasPath")
val sc = new SparkContext(sparkConf)
sc.addFile(kafkaJaasPath)
将jaas.conf文件使用代码添加到环境变量
更多推荐
已为社区贡献2条内容
所有评论(0)