1. 实现背景

由于部门及已上项目使用的是CDH版本大数据平台,为了更好改善在线生产系统日志实时抓取分析。需要加装Kafka作为采集数据源,使用SparkStreaming处理数据。

  1. 安装环境
    1. 硬件规划

CPU

物理内存

磁盘存储

节点

1核X2颗 Intel Core Processor 2295MHZ (Haswell, no TSX)

8GB

80G

192.17.10.136

1核X2颗 Intel Core Processor 2295MHZ (Haswell, no TSX)

8GB

80G

192.17.10.138

1核X2颗 Intel Core Processor 2295MHZ (Haswell, no TSX)

8GB

80G

192.17.10.139

 

    1. 软件规划

类别

软件清单

备注

操作系统

CentOS Linux release 7.2.1511

 

大数据平台

CDH5.13.1

Hadoop2.6.0

Spark1.6.0

Java1.7.0.9

MySql 5.7.21

Hive:1.1.0

Scala2.10.5

HBase

HDFS

Kafka3.1.0

 

 

    1. 目录规划
  1. CDH相关安装目录:/opt/cloudera/parcels/CDH
  2. Kafka安装目录:cct-bigdata-3节点 /opt/cloudera/parcels/KAFKA-3.1.0-1.3.1.0.p0.35
  1. 安装步骤
    1. 环境配置

本次是基于已有CDH环境下,在线增加安装Kafka服务。确保服务器能正常连接外网,用于下载介质服务包。

    1. Kafka安装
  1. 登录CM控制台后,选择“主机”—>选择Parcel—>下载Kafka—>进行分配和激活
  2. 进入群集中BigData CDH parcel—>鼠标点击“操作”按钮—>添加服务—>选择Kafka服务项—>按照向导进行安装(考虑服务节点硬件配置性能,安装cct-bigdata-3节点)—>启动Kafka服务
    1. 启动验证
  1. kafka-topics.sh --create --zookeeper cct-bigdata-3:2181 --replication-factor 1 --partitions 1 --topic test

注:topic是发布消息发布的category,以单节点的配置创建了一个叫testtopic.可以用list列出所有创建的topics,来查看刚才创建的主题是否存在。

  1. kafka-topics.sh --list --zookeeper cct-bigdata-3:2181

注:查看topics消息内容

  1. 用producer生产点数据:kafka-console-producer.sh --broker-list cct-bigdata-3:9092 --topic test

hello test

hello aaaa

hadoop first kafka

 

  1. 再次开启新的终端或者直接按CTRL+C退出。然后使用consumer来接收数据。

kafka-console-consumer.sh --zookeeper cct-bigdata-3:2181 --topic test --from-beginning

正常会显示前面三条消息内容,这表示kafka安装及服务成功。

  1. Kafka安装相关问题
【报错信息】安装kafka服务启动,报ERROR kafka.tools.MirrorMaker$: whitelist must be specified when using new consumer in mirror maker.
【原因分析】白名单未配置问题导致的
【解决方法】需要添加配置如下图画红框的参数
 
cct-bigdata-3:9092
【报错信息】安装kafka服务启动,报java.lang.OutOfMemoryError: Java heap space
Dumping heap to /tmp/kafka2_kafka2-KAFKA_BROKER-1e04967977cc5aba9079e526679ab18c_pid2185.hprof ...
Heap dump file created [13353447 bytes in 0.090 secs]
【原因分析】java内存过低导致broker启动失败,Java Heap size of Broker这个选项默认配置是 50M
【解决方法】进入CM配置kafka找到Java Heap size of Broker,建议修改最低配置内存不小于256M,此处可根据自己物理内存大小合理分配。
【报错信息】启动kafka-console-producer --broker-list 127.0.0.1:9092 --topic test报18/07/06 09:00:57 WARN clients.NetworkClient: [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available
【原因分析】在配置plaintext的时候,地址配置的是cct-bigdata-3cct-bigdata-3是我在/etc/hosts文件里面配置的127.0.0.1,而我连接的时候,使用的是:127.0.0.1

【解决方法】改为和plaintext的地址配置一样,才可以。kafka-console-producer --broker-list cct-bigdata-3:9092 --topic test

 

  1. Spark Streaming集成验证
    1. 环境检查
  1. 检查org.apache.spark.streaming.kafka._相关包是否已具备,启动spark-shell

如出现此图反馈,则说明相关包都存在,否则需要到官网下载相对应版本的Jar包。

  1. 检查Sbt,如无则需要安装。安装步骤:

cd /usr/local/sbt/

vi sbt

内容如下:

#!/bin/bash

SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"

java $SBT_OPTS -jar /usr/local/sbt/bin/sbt-launch.jar "$@"

  • 修改脚本文件权限:chmod u+x sbt
  • 配置PATH环境变量:

可以选择配置~/.bashrc 或 /etc/profile,此处以配置 /etc/profile为例

vi /etc/profile

在文件尾部添加如下代码后,保存退出

export PATH=/usr/local/sbt/:$PATH

使配置文件立刻生效

source /etc/profile

  • 测试sbt是否安装成功:sbt sbt-version
  1. 检查maven
    1. 编写Scala

Spark streaming测试案例源代码存放目录:/usr/local/mycode/spark/kafka/src/main/scala

  1. vi KaWordProducer.scala,此代码目的是生成一系列字符串的程序,会产生随机的整数序列,每个整数被当做一个单词,提供给KafkaWordCount程序去进行词频统计。

源代码见

  1. vi KaWordCount.scala,此代码目的是用于字符词频统计,它会把KaWordProducer发送过来的单词进行词频统计。

源代码见

  1. vi StreamingExample.scala,此代码目的是用于设置log4j

源代码见

 

  1. 添加依赖jar库,编写依赖托管编译文件simple1.sbt,内容如下:

name:= "Simple1 Project"

version:="1.0"

scalaVersion:="2.10.5"

libraryDependencies ++= Seq(

  // Spark dependency

  "org.apache.spark" %% "spark-core" % "1.6.0",

  "org.apache.spark" %% "spark-streaming" % "1.6.0",

  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

)

因为此案例依赖spark-core\spark-streaming\spark-streaming-kafka相关jar包,需要在打包编译的时加入。

 

注意:Scala通过sbt打包编译对代码存放目录结构是有要求,具体示例如下:

[root@cct-bigdata-1 kafka]# find .

.

./src

./src/main

./src/main/scala

./src/main/scala/KaWordProducer.scala

./src/main/scala/StreamingExample.scala

./src/main/scala/KaWordCount.scala

./simple.sbt

    1. 验证Scala

测试上述已经编写词频统计程序:

  1. Sbt编译打包

/usr/local/sbt/sbt package

  1. Spark-submit提交已编译随机生成字符包

spark-submit --class org.apache.spark.examples.streaming.KaWordProducer /usr/local/mycode/spark/kafka/target/scala-2.10/simple-project_2.10-1.0.jar cct-bigdata-3:9092 wordsender 3 5

注意,上面命令中, cct-bigdata-3:9092 wordsender 3 5″是提供给KaWordProducer程序的4个输入参数,第1个参数cct-bigdata-3:9092Kafkabroker的地址,第2个参数wordsendertopic的名称,我们在KaWordCount.scala代码中已经把topic名称写死掉,所以,KaWordCount程序只能接收名称为”wordsender”topic。第3个参数“3”表示每秒发送3条消息,第4个参数“5”表示,每条消息包含5个单词(实际上就是5个整数)。

 

这个终端窗口就放在这里,不要关闭,由其一直不断发送单词。在新的窗口开启词频统计功能

 

  1. Spark-submit提交已编译记录词频包

spark-submit --class org.apache.spark.examples.streaming.KaWordCount /usr/local/mycode/spark/kafka/target/scala-2.10/simple-project_2.10-1.0.jar

 

如出现上图内容,则表示kafka与SparkStreaming集成OK。

    1. 异常处理

【异常信息】运行安装配置sbt异常记录

[root@cct-bigdata-1 sbt]# sbt sbt-version

Getting org.scala-sbt sbt 0.13.17  (this may take some time)...

 

:: problems summary ::

:::: WARNINGS

                module not found: org.scala-sbt#sbt;0.13.17

 

        ==== local: tried

 

          /root/.ivy2/local/org.scala-sbt/sbt/0.13.17/ivys/ivy.xml

 

          -- artifact org.scala-sbt#sbt;0.13.17!sbt.jar:

 

          /root/.ivy2/local/org.scala-sbt/sbt/0.13.17/jars/sbt.jar

 

        ==== local-preloaded-ivy: tried

 

          file:root/.sbt/preloaded/org.scala-sbt/sbt/0.13.17/ivys/ivy.xml

 

        ==== local-preloaded: tried

 

          file:root/.sbt/preloaded/org/scala-sbt/sbt/0.13.17/sbt-0.13.17.pom

 

          -- artifact org.scala-sbt#sbt;0.13.17!sbt.jar:

 

          file:root/.sbt/preloaded/org/scala-sbt/sbt/0.13.17/sbt-0.13.17.jar

 

        ==== Maven Central: tried

 

          https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.17/sbt-0.13.17.pom

 

          -- artifact org.scala-sbt#sbt;0.13.17!sbt.jar:

 

          https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.17/sbt-0.13.17.jar

 

        ==== typesafe-ivy-releases: tried

 

          https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt/0.13.17/ivys/ivy.xml

 

        ==== sbt-ivy-snapshots: tried

 

          https://repo.scala-sbt.org/scalasbt/ivy-snapshots/org.scala-sbt/sbt/0.13.17/ivys/ivy.xml

 

                ::::::::::::::::::::::::::::::::::::::::::::::

 

                ::          UNRESOLVED DEPENDENCIES         ::

 

                ::::::::::::::::::::::::::::::::::::::::::::::

 

                :: org.scala-sbt#sbt;0.13.17: not found

 

                ::::::::::::::::::::::::::::::::::::::::::::::

 

 

:::: ERRORS

        Server access Error: Received fatal alert: protocol_version url=https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.17/sbt-0.13.17.pom

 

        Server access Error: Received fatal alert: protocol_version url=https://repo1.maven.org/maven2/org/scala-sbt/sbt/0.13.17/sbt-0.13.17.jar

 

        Server access Error: Remote host closed connection during handshake url=https://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt/0.13.17/ivys/ivy.xml

 

        Server access Error: Remote host closed connection during handshake url=https://repo.scala-sbt.org/scalasbt/ivy-snapshots/org.scala-sbt/sbt/0.13.17/ivys/ivy.xml

 

 

:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS

unresolved dependency: org.scala-sbt#sbt;0.13.17: not found

Error during sbt execution: Error retrieving required libraries

  (see /root/.sbt/boot/update.log for complete log)

Error: Could not retrieve sbt 0.13.17

【原因分析】可能因服务器采用的是虚拟机网络问题或需要访问国际网络服务,导致原有的https协议无法使用。

【解决方法】

通过修改sbt-launch.jar中sbt下sbt.boot.properties文件中:

注意先备份sbt-launch.jar

1)、[repositories]原内容maven-central改成maven-central: http://repo1.maven.org/maven2

2)、[repositories]原内容typesafe-ivy-releases和sbt-ivy-snapshots:中的https改成http协议

3)、重新打包覆盖掉sbt-launch.jar原文件

4)、重新运行sbt sbt-version,第一次运行耗时大约需10多分钟左右,下载所需依赖jar包。如成功的话,应显示:

[warn] No sbt.version set in project/build.properties, base directory: /usr/local/sbt

[info] Set current project to sbt (in build file:/usr/local/sbt/)

[info] 0.13.17

 

【异常信息】运行打包编译sbt package时,出现无法下载对应依赖包

【原因分析】可能因服务器采用的是虚拟机网络问题或需要访问国际网络服务,导致原有的https协议无法使用。

【解决方法】通过修改sbt-launch.jar中sbt下sbt.boot.properties文件中:

注意先备份sbt-launch.jar

1)、[repositories]原内容maven-central改成maven-central: http://repo1.maven.org/maven2

2)、[repositories]原内容typesafe-ivy-releases和sbt-ivy-snapshots:中的https改成http协议

3)、重新打包覆盖掉sbt-launch.jar原文件

4)、重新运行sbt package,第一次运行耗时大约需10多分钟左右,下载所需依赖jar包。

 

另外如果发现还有未下载依赖包失败,可以单独wget下载下来存放本地maven库(注意根据报错提示路径存放),然后再重新打包sbt package即可。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐