A WordCount Example Combining Flume, Kafka, and Spark Streaming

1. Python script that generates the log

The script is named produce_log3.py and contains the following:

import random
import sched
import time

LOG_PATH = "/home/hadoop/log/access.log"

# Sample sentences; one is picked at random for each log line.
LINES = [
    "Apache Spark achieves high performance for both batch and streaming data using a state of the art DAG scheduler a query optimizer and a physical execution engine\n",
    "Spark offers over 80 high level operators that make it easy to build parallel apps And you can use it interactively from the Scala Python R and SQL shells\n",
    "Spark powers a stack of libraries including SQL and DataFrames MLlib for machine learning GraphX and Spark Streaming You can combine these libraries seamlessly in the same application\n",
    "You can run Spark using its standalone cluster mode on EC2 on Hadoop YARN on Mesos or on Kubernetes Access data in HDFS Apache Cassandra Apache HBase Apache Hive and hundreds of other data sources\n",
]


def create_log():
    """Append one randomly chosen sentence to the log file."""
    # The with-block closes the file after every write, so handles do not leak.
    with open(LOG_PATH, mode="a", encoding="utf-8") as file:
        file.write(random.choice(LINES))


if __name__ == '__main__':
    schedule = sched.scheduler(time.time, time.sleep)
    while True:
        # Queue one write 1 second from now, then block until it has run.
        schedule.enter(1, 0, create_log)
        schedule.run()

Run the log-generating script:

python produce_log3.py
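
For a quick dry run without the scheduler or the /home/hadoop/log directory, the append step can be tried on its own. This is only a minimal sketch: the helper name append_random_line, its parameters, the sample lines, and the temporary path are illustrative assumptions, not part of the original script.

```python
import os
import random
import tempfile


def append_random_line(path, lines, rng=random):
    """Append one randomly chosen entry of `lines` to `path` and return it."""
    line = rng.choice(lines)
    # Mode "a" creates the file if needed and always writes at the end,
    # so a reader that follows the file (such as tail -F) sees each new line.
    with open(path, mode="a", encoding="utf-8") as f:
        f.write(line)
    return line


if __name__ == "__main__":
    # Hypothetical sample lines and a throwaway path, for illustration only.
    samples = ["hello spark\n", "hello kafka\n"]
    demo_path = os.path.join(tempfile.mkdtemp(), "access.log")
    for _ in range(3):
        append_random_line(demo_path, samples)
    with open(demo_path, encoding="utf-8") as f:
        print(f.read(), end="")
```

Passing the path and the line list as arguments keeps the function easy to test; the real script can keep its fixed path.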
2. Flume-to-Kafka configuration

The contents of flume-kafka-sparkstreaming.conf are as follows:

Place the configuration file in the flume/conf directory.

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type=exec
a1.sources.s1.command=tail -F /home/hadoop/log/access.log

a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100

# Use the Kafka sink
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
# Kafka broker addresses and ports
a1.sinks.k1.brokerList=server01:9092,server02:9092,server03:9092
# Kafka topic to write to
a1.sinks.k1.topic=flume-kafka-sparkstreaming
# Serialization method
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
# Kafka ack setting (1 = wait for the partition leader only)
a1.sinks.k1.requiredAcks = 1


a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1  
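
A common slip in an agent file like the one above is a source or sink that points at a channel name that was never declared. The following plain-Python sketch checks that wiring before the agent is started. It is not a Flume tool, and the function names parse_properties and check_wiring are hypothetical.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent="a1"):
    """Return a list of problems found in the source/sink-to-channel wiring."""
    channels = set(props.get(f"{agent}.channels", "").split())
    problems = []
    for src in props.get(f"{agent}.sources", "").split():
        # A source may feed several channels (space-separated).
        used = props.get(f"{agent}.sources.{src}.channels", "").split()
        if not used:
            problems.append(f"source {src} has no channel")
        problems += [f"source {src}: unknown channel {c!r}" for c in used if c not in channels]
    for sink in props.get(f"{agent}.sinks", "").split():
        # A sink reads from exactly one channel.
        ch = props.get(f"{agent}.sinks.{sink}.channel", "")
        if ch not in channels:
            problems.append(f"sink {sink}: unknown channel {ch!r}")
    return problems


if __name__ == "__main__":
    conf = """
    a1.sources = s1
    a1.channels = c1
    a1.sinks = k1
    a1.sources.s1.channels=c1
    a1.sinks.k1.channel=c1
    """
    print(check_wiring(parse_properties(conf)) or "wiring looks consistent")
```

Note the asymmetry the check relies on: a source uses the plural key channels, while a sink uses the singular key channel.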
3. Creating the Kafka topic

Note: the ZooKeeper and Kafka clusters must be started first.

cd /hadoop/kafka

# Create the Kafka topic named flume-kafka-sparkstreaming
kafka-topics.sh --create --zookeeper server01:2181,server02:2181,server03:2181 --replication-factor 3 --partitions 3 --topic flume-kafka-sparkstreaming
4. Testing the producer side

4.1 Start the Python script that generates the log

python produce_log3.py

4.2 Start the Flume agent

cd /hadoop/flume

bin/flume-ng agent -c conf -f conf/flume-kafka-sparkstreaming.conf -name a1 -Dflume.root.logger=INFO,console

4.3 Start kafka-console-consumer.sh to test consumption

cd /hadoop/kafka

./bin/kafka-console-consumer.sh --zookeeper server01:2181,server02:2181,server03:2181 --from-beginning --topic flume-kafka-sparkstreaming

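The test output is as follows:

[Image: console consumer output]

If the log lines appear on the console, the producer side is working correctly.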

5. Reading data from Kafka with Spark Streaming

Write the Spark Streaming code:

5.1 Dependencies in pom.xml

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
</dependencies>

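5.2 SparkStreamingKafka code

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object SparkStreamingKafka {
  // Only show log messages at WARN level or above
  Logger.getLogger("org").setLevel(Level.WARN)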

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkStreamingKafka").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val streamingContext = new StreamingContext(sc, Seconds(5))

    streamingContext.checkpoint("./checkpoint")

    // Kafka consumer parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "server01:9092,server02:9092,server03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("flume-kafka-sparkstreaming")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Update function: add this batch's counts for a word to its running total
    val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
      iter.map { case (word, counts, total) => (word, counts.sum + total.getOrElse(0)) }
    }

    val result = stream
      .flatMap(record => record.value.split(" ")) // split each log line into words
      .map((_, 1))
      .updateStateByKey(
        updateFunc,
        new HashPartitioner(streamingContext.sparkContext.defaultParallelism),
        true // rememberPartitioner
      )

    result.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
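
To make the effect of updateFunc across batches easier to follow, here is a small pure-Python imitation of the stateful count. It does not use Spark at all; the function name update_state and the sample batches are assumptions made up for illustration.

```python
from collections import Counter


def update_state(state, batch_lines):
    """Fold one micro-batch of lines into the running word counts.

    Same idea as the Scala update function: new total = this batch's
    count for the word + the previous total (0 if the word is new).
    """
    batch_counts = Counter(word for line in batch_lines for word in line.split(" "))
    new_state = dict(state)  # keep totals for words absent from this batch
    for word, count in batch_counts.items():
        new_state[word] = count + new_state.get(word, 0)
    return new_state


if __name__ == "__main__":
    # Two hypothetical 5-second batches of log lines.
    batches = [["Spark Streaming", "Spark SQL"], ["Spark Kafka"]]
    state = {}
    for lines in batches:
        state = update_state(state, lines)
        print(state)
```

The totals keep growing from batch to batch because the previous state is carried forward, which is also why the Spark job needs a checkpoint directory.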

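5.3 Run the Spark Streaming program. The result is shown in the figure:

[Image: Spark Streaming word-count output]

The … in the figure most likely appears because there is more data than print() displays; by default it shows only the first 10 elements of each batch.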
