基于CDH版本5.13.3验证Spark Streaming
实现背景由于部门及已上项目使用的是CDH版本大数据平台,为了更好改善在线生产系统日志实时抓取分析。需要加装Kafka作为采集数据源,使用SparkStreaming处理数据。安装环境硬件规划CPU物理内存磁盘存储节点1核X2颗 Intel Core Processor 2295MH...
- 实现背景
由于部门及已上项目使用的是CDH版本大数据平台,为了更好改善在线生产系统日志实时抓取分析。需要加装Kafka作为采集数据源,使用SparkStreaming处理数据。
- 安装环境
- 硬件规划
CPU | 物理内存 | 磁盘存储 | 节点 |
1核X2颗 Intel Core Processor 2295MHZ (Haswell, no TSX) | 8GB | 80G | 192.17.10.136 |
1核X2颗 Intel Core Processor 2295MHZ (Haswell, no TSX) | 8GB | 80G | 192.17.10.138 |
1核X2颗 Intel Core Processor 2295MHZ (Haswell, no TSX) | 8GB | 80G | 192.17.10.139 |
-
- 软件规划
类别 | 软件清单 | 备注 |
操作系统 | CentOS Linux release 7.2.1511 |
|
大数据平台 | CDH5.13.1 Hadoop2.6.0 Spark1.6.0 Java1.7.0.9 MySql 5.7.21 Hive:1.1.0 Scala2.10.5 HBase HDFS Kafka3.1.0 … |
|
-
- 目录规划
- CDH相关安装目录:/opt/cloudera/parcels/CDH
- Kafka安装目录:cct-bigdata-3节点 /opt/cloudera/parcels/KAFKA-3.1.0-1.3.1.0.p0.35
- 安装步骤
- 环境配置
本次是基于已有CDH环境下,在线增加安装Kafka服务。确保服务器能正常连接外网,用于下载介质服务包。
-
- Kafka安装
- 登录CM控制台后,选择“主机”—>选择Parcel—>下载Kafka—>进行分配和激活
- 进入群集中BigData CDH parcel—>鼠标点击“操作”按钮—>添加服务—>选择Kafka服务项—>按照向导进行安装(考虑服务节点硬件配置性能,安装cct-bigdata-3节点)—>启动Kafka服务
- 启动验证
- kafka-topics.sh --create --zookeeper cct-bigdata-3:2181 --replication-factor 1 --partitions 1 --topic test
注:topic是发布消息发布的category,以单节点的配置创建了一个叫test的topic.可以用list列出所有创建的topics,来查看刚才创建的主题是否存在。
- kafka-topics.sh --list --zookeeper cct-bigdata-3:2181
注:查看topics消息内容
- 用producer生产点数据:kafka-console-producer.sh --broker-list cct-bigdata-3:9092 --topic test
hello test
hello aaaa
hadoop first kafka
- 再次开启新的终端或者直接按CTRL+C退出。然后使用consumer来接收数据。
kafka-console-consumer.sh --zookeeper cct-bigdata-3:2181 --topic test --from-beginning
正常会显示前面三条消息内容,这表示kafka安装及服务成功。
- Kafka安装相关问题
【报错信息】安装kafka服务启动,报ERROR kafka.tools.MirrorMaker$: whitelist must be specified when using new consumer in mirror maker.
【原因分析】白名单未配置问题导致的
【解决方法】需要添加配置如下图画红框的参数
cct-bigdata-3:9092
【报错信息】安装kafka服务启动,报java.lang.OutOfMemoryError: Java heap space
Dumping heap to /tmp/kafka2_kafka2-KAFKA_BROKER-1e04967977cc5aba9079e526679ab18c_pid2185.hprof ...
Heap dump file created [13353447 bytes in 0.090 secs]
【原因分析】java内存过低导致broker启动失败,Java Heap size of Broker这个选项默认配置是 50M
【解决方法】进入CM配置kafka找到Java Heap size of Broker,建议修改最低配置内存不小于256M,此处可根据自己物理内存大小合理分配。
【报错信息】启动kafka-console-producer --broker-list 127.0.0.1:9092 --topic test报18/07/06 09:00:57 WARN clients.NetworkClient: [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available
【原因分析】在配置plaintext的时候,地址配置的是cct-bigdata-3(cct-bigdata-3是我在/etc/hosts文件里面配置的127.0.0.1),而我连接的时候,使用的是:127.0.0.1
【解决方法】改为和plaintext的地址配置一样,才可以。kafka-console-producer --broker-list cct-bigdata-3:9092 --topic test。
- Spark Streaming集成验证
- 环境检查
- 检查org.apache.spark.streaming.kafka._相关包是否已具备,启动spark-shell
如出现此图反馈,则说明相关包都存在,否则需要到官网下载相对应版本的Jar包。
- 检查Sbt,如无则需要安装。安装步骤:
- 下载官网:http://www.scala-sbt.org/download.html
- 解压下载包:tar zxvf sbt-1.0.3.tgz 解压安装目录/usr/local/sbt
- 建立启动文件:
cd /usr/local/sbt/
vi sbt
内容如下:
#!/bin/bash
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
java $SBT_OPTS -jar /usr/local/sbt/bin/sbt-launch.jar "$@"
- 修改脚本文件权限:chmod u+x sbt
- 配置PATH环境变量:
可以选择配置~/.bashrc 或 /etc/profile,此处以配置 /etc/profile为例
vi /etc/profile
在文件尾部添加如下代码后,保存退出
export PATH=/usr/local/sbt/:$PATH
使配置文件立刻生效
source /etc/profile
- 测试sbt是否安装成功:sbt sbt-version
- 检查maven
- 编写Scala
Spark streaming测试案例源代码存放目录:/usr/local/mycode/spark/kafka/src/main/scala
- vi KaWordProducer.scala,此代码目的是生成一系列字符串的程序,会产生随机的整数序列,每个整数被当做一个单词,提供给KafkaWordCount程序去进行词频统计。
源代码见
- vi KaWordCount.scala,此代码目的是用于字符词频统计,它会把KaWordProducer发送过来的单词进行词频统计。
源代码见
- vi StreamingExample.scala,此代码目的是用于设置log4j。
源代码见
- 添加依赖jar库,编写依赖托管编译文件simple1.sbt,内容如下:
name:= "Simple1 Project"
version:="1.0"
scalaVersion:="2.10.5"
libraryDependencies ++= Seq(
// Spark dependency
"org.apache.spark" %% "spark-core" % "1.6.0",
"org.apache.spark" %% "spark-streaming" % "1.6.0",
"org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
)
因为此案例依赖spark-core\spark-streaming\spark-streaming-kafka相关jar包,需要在打包编译的时加入。
注意:Scala通过sbt打包编译对代码存放目录结构是有要求,具体示例如下:
[root@cct-bigdata-1 kafka]# find .
.
./src
./src/main
./src/main/scala
./src/main/scala/KaWordProducer.scala
./src/main/scala/StreamingExample.scala
./src/main/scala/KaWordCount.scala
./simple.sbt
-
- 验证Scala
测试上述已经编写词频统计程序:
- Sbt编译打包
/usr/local/sbt/sbt package
- Spark-submit提交已编译随机生成字符包
spark-submit --class org.apache.spark.examples.streaming.KaWordProducer /usr/local/mycode/spark/kafka/target/scala-2.10/simple-project_2.10-1.0.jar cct-bigdata-3:9092 wordsender 3 5
注意,上面命令中,” cct-bigdata-3:9092 wordsender 3 5″是提供给KaWordProducer程序的4个输入参数,第1个参数cct-bigdata-3:9092是Kafka的broker的地址,第2个参数wordsender是topic的名称,我们在KaWordCount.scala代码中已经把topic名称写死掉,所以,KaWordCount程序只能接收名称为”wordsender”的topic。第3个参数“3”表示每秒发送3条消息,第4个参数“5”表示,每条消息包含5个单词(实际上就是5个整数)。
这个终端窗口就放在这里,不要关闭,由其一直不断发送单词。在新的窗口开启词频统计功能
- Spark-submit提交已编译记录词频包
spark-submit --class org.apache.spark.examples.streaming.KaWordCount /usr/local/mycode/spark/kafka/target/scala-2.10/simple-project_2.10-1.0.jar
如出现上图内容,则表示kafka与SparkStreaming集成OK。
-
- 异常处理
【异常信息】运行安装配置sbt异常记录
[root@cct-bigdata-1 sbt]# sbt sbt-version
Getting org.scala-sbt sbt 0.13.17 (this may take some time)...
:: problems summary ::
:::: WARNINGS
module not found: org.scala-sbt#sbt;0.13.17
==== local: tried
/root/.ivy2/local/org.scala-sbt/sbt/0.13.17/ivys/ivy.xml
-- artifact org.scala-sbt#sbt;0.13.17!sbt.jar:
/root/.ivy2/local/org.scala-sbt/sbt/0.13.17/jars/sbt.jar
==== local-preloaded-ivy: tried
file:root/.sbt/preloaded/org.scala-sbt/sbt/0.13.17/ivys/ivy.xml
==== local-preloaded: tried
file:root/.sbt/preloaded/org/scala-sbt/sbt/0.13.17/sbt-0.13.17.pom
-- artifact org.scala-sbt#sbt;0.13.17!sbt.jar:
file:root/.sbt/preloaded/org/scala-sbt/sbt/0.13.17/sbt-0.13.17.jar
==== Maven Central: tried
https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.17/sbt-0.13.17.pom
-- artifact org.scala-sbt#sbt;0.13.17!sbt.jar:
https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.17/sbt-0.13.17.jar
==== typesafe-ivy-releases: tried
https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt/0.13.17/ivys/ivy.xml
==== sbt-ivy-snapshots: tried
https://repo.scala-sbt.org/scalasbt/ivy-snapshots/org.scala-sbt/sbt/0.13.17/ivys/ivy.xml
::::::::::::::::::::::::::::::::::::::::::::::
:: UNRESOLVED DEPENDENCIES ::
::::::::::::::::::::::::::::::::::::::::::::::
:: org.scala-sbt#sbt;0.13.17: not found
::::::::::::::::::::::::::::::::::::::::::::::
:::: ERRORS
Server access Error: Received fatal alert: protocol_version url=https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.17/sbt-0.13.17.pom
Server access Error: Received fatal alert: protocol_version url=https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.17/sbt-0.13.17.jar
Server access Error: Remote host closed connection during handshake url=https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt/0.13.17/ivys/ivy.xml
Server access Error: Remote host closed connection during handshake url=https://repo.scala-sbt.org/scalasbt/ivy-snapshots/org.scala-sbt/sbt/0.13.17/ivys/ivy.xml
:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
unresolved dependency: org.scala-sbt#sbt;0.13.17: not found
Error during sbt execution: Error retrieving required libraries
(see /root/.sbt/boot/update.log for complete log)
Error: Could not retrieve sbt 0.13.17
【原因分析】可能因服务器采用的是虚拟机网络问题或需要访问国际网络服务,导致原有的https协议无法使用。
【解决方法】
通过修改sbt-launch.jar中sbt下sbt.boot.properties文件中:
注意先备份sbt-launch.jar
1)、[repositories]原内容maven-central改成maven-central: http://repo1.maven.org/maven2
2)、[repositories]原内容typesafe-ivy-releases和sbt-ivy-snapshots:中的https改成http协议
3)、重新打包覆盖掉sbt-launch.jar原文件
4)、重新运行sbt sbt-version,第一次运行耗时大约需10多分钟左右,下载所需依赖jar包。如成功的话,应显示:
[warn] No sbt.version set in project/build.properties, base directory: /usr/local/sbt
[info] Set current project to sbt (in build file:/usr/local/sbt/)
[info] 0.13.17
【异常信息】运行打包编译sbt package时,出现无法下载对应依赖包
【原因分析】可能因服务器采用的是虚拟机网络问题或需要访问国际网络服务,导致原有的https协议无法使用。
【解决方法】通过修改sbt-launch.jar中sbt下sbt.boot.properties文件中:
注意先备份sbt-launch.jar
1)、[repositories]原内容maven-central改成maven-central: http://repo1.maven.org/maven2
2)、[repositories]原内容typesafe-ivy-releases和sbt-ivy-snapshots:中的https改成http协议
3)、重新打包覆盖掉sbt-launch.jar原文件
4)、重新运行sbt package,第一次运行耗时大约需10多分钟左右,下载所需依赖jar包。
另外如果发现还有未下载依赖包失败,可以单独wget下载下来存放本地maven库(注意根据报错提示路径存放),然后再重新打包sbt package即可。
更多推荐
所有评论(0)