Spark+Kafka构建实时分析Dashboard

一.框架

利用Spark+Kafka实时分析男女生每秒购物人数,利用Spark Streaming实时处理用户购物日志,然后利用websocket将数据实时推送给浏览器,最后浏览器将接收到的数据实时展现,案例的整体框架图如下:
在这里插入图片描述
详细分析下上述步骤:

  1. 应用程序将购物日志发送给Kafka,topic为”sex”,因为这里只是统计购物男女生人数,所以只需要发送购物日志中性别属性即可。这里采用模拟的方式发送购物日志,即读取购物日志数据,每间隔相同的时间发送给Kafka。
  2. 接着利用Spark Streaming从Kafka主题”sex”读取并处理消息。这里按滑动窗口的大小按顺序读取数据,例如可以按每5秒作为窗口大小读取一次数据,然后再处理数据。
  3. Spark将处理后的数据发送给Kafka,topic为”result”。
  4. 然后利用Flask搭建一个web应用程序,接收Kafka主题为”result”的消息。
  5. 利用Flask-SocketIO将数据实时推送给客户端。
  6. 客户端浏览器利用js框架socketio实时接收数据,然后利用js可视化库hightlights.js库动态展示。

二.实验环境准备

环境要求

  • Ubuntu: 16.04
    Spark: 2.1.0
    Scala: 2.11.8
    kafka: 0.8.2.2
    Python: 3.x(3.0以上版本)
    Flask: 0.12.1
    Flask-SocketIO: 2.8.6
    kafka-python: 1.3.3

三.Python操作kafka

1.数据介绍

采用的数据集压缩包为data_format.zip点击这里下载data_format.zip数据集
该数据集压缩包是淘宝2015年双11前6个月(包含双11)的交易数据(交易数据有偏移,但是不影响实验的结果),里面包含3个文件,分别是用户行为日志文件user_log.csv 、回头客训练集train.csv 、回头客测试集test.csv. 在这个案例中只是用user_log.csv这个文件,下面列出文件user_log.csv的数据格式定义:
用户行为日志user_log.csv,日志中的字段定义如下:

  1. user_id | 买家id
  2. item_id | 商品id
  3. cat_id | 商品类别id
  4. merchant_id | 卖家id
  5. brand_id | 品牌id
  6. month | 交易时间:月
  7. day | 交易事件:日
  8. action | 行为,取值范围{0,1,2,3},0表示点击,1表示加入购物车,2表示购买,3表示关注商品
  9. age_range | 买家年龄分段:1表示年龄<18,2表示年龄在[18,24],3表示年龄在[25,29],4表示年龄在[30,34],5表示年龄在[35,39],6表示年龄在[40,49],7和8表示年龄>=50,0和NULL则表示未知
  10. gender | 性别:0表示女性,1表示男性,2和NULL表示未知
  11. province| 收获地址省份

数据具体格式如下:

user_id,item_id,cat_id,merchant_id,brand_id,month,day,action,age_range,gender,province
328862,323294,833,2882,2661,08,29,0,0,1,内蒙古
328862,844400,1271,2882,2661,08,29,0,1,1,山西
328862,575153,1271,2882,2661,08,29,0,2,1,山西
328862,996875,1271,2882,2661,08,29,0,1,1,内蒙古
328862,1086186,1271,1253,1049,08,29,0,0,2,浙江
328862,623866,1271,2882,2661,08,29,0,0,2,黑龙江
328862,542871,1467,2882,2661,08,29,0,5,2,四川
328862,536347,1095,883,1647,08,29,0,7,1,吉林

实时统计每秒中男女生购物人数,因此针对每条购物日志,我们只需要获取gender即可,然后发送给Kafka,接下来Spark Streaming再接收gender进行处理。

2.数据预处理

使用Python对数据进行预处理,并将处理后的数据直接通过Kafka生产者发送给Kafka,这里需要先安装Python操作Kafka的代码库。

2.1创建生产者
# coding: utf-8
import csv
import time
from kafka import KafkaProducer

# 实例化一个KafkaProducer示例,用于向Kafka投递消息
producer = KafkaProducer(bootstrap_servers='192.168.1.30:9092')
# 打开数据文件
csvfile = open("../data/user_log.csv", "r", encoding='UTF-8')
# 生成一个可用于读取csv文件的reader
reader = csv.reader(csvfile)

for line in reader:
    gender = line[9]  # 性别在每行日志代码的第9个元素
    if gender == 'gender':
        continue  # 去除第一行表头
    time.sleep(0.1)  # 每隔0.1秒发送一行数据
    # 发送数据,topic为'sex'
    print(line[9].encode('utf8'))
    producer.send('sex', line[9].encode('utf8'))
2.2创建消费者
from kafka import KafkaConsumer

consumer = KafkaConsumer('result', bootstrap_servers='192.168.1.30:9092')
for msg in consumer:
    print((msg.value).decode('utf8'))

在这里插入图片描述

三.Spark Streaming实时处理数据

  • 将spark-streaming-kafka(版本为kafka010)的jar包拷贝到集群各个节点上的jars包中,
  • 实时统计每秒中男女生购物人数,而Spark Streaming接收的数据为1,1,0,2…,其中0代表女性,1代表男性,所以对于2或者null值,则不考虑。其实通过分析,可以发现这个就是典型的wordcount问题,而且是基于Spark流计算。女生的数量,即为0的个数,男生的数量,即为1的个数。利用Spark Streaming接口reduceByKeyAndWindow,设置窗口大小为1,滑动步长为1,这样统计出的0和1的个数即为每秒男生女生的人数。
  1. 首先按每秒的频率读取Kafka消息;
  2. 然后对每秒的数据执行wordcount算法,统计出0的个数,1的个数,2的个数;
  3. 最后将上述结果封装成json发送给Kafka。

1.设置日志格式化处理

import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}

2.spark streaming实时数据分析

接受kafka数据做实时流分析,创建生产者,分析结果发送给kakfa,Python web端从kafka接受分析结果可视化。

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import org.json4s._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.write

// import org.json4s._
// import org.json4s.JsonDSL._
// import org.json4s.jackson.JsonMethods._

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.Interval
import org.apache.spark.streaming.kafka010._

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaWordCount{
    
    def main(args:Array[String]){
//         格式化日志
        StreamingExamples.setStreamingLogLevels()
        val sparkConf = new SparkConf().setAppName("KafkaWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(1))
//         设置chackpoint
        ssc.checkpoint(".")
        
//         创建消费者,从kafka获取数据供spark streaming做实时处理
//         新版本的kafka的参数设置
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "192.168.1.30:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "1",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
//         可能同时订阅多个topic,如何给每个topic设置分区数
        val topics = Array("sex")
//         创建数据源
        val lineMap = KafkaUtils.createDirectStream[String, String](
            ssc,
            PreferConsistent,
            Subscribe[String, String](topics, kafkaParams)
        )
        val lines =lineMap.map(record => record.value)
        val words=lines.flatMap(_.split(" "))
//         val wordCounts = words.map(x => (x, 1L)).reduceByKeyAndWindow(_+_,_-_, Seconds(1), Seconds(1), 1).print
//         对数据进行窗口截取做实时流分析
        val wordCounts = words.map(x => (x, 1L)).reduceByKeyAndWindow(_+_,_-_, Seconds(1), Seconds(1), 1).foreachRDD(rdd => {
          if(rdd.count !=0 ){
               implicit val formats = DefaultFormats//数据格式化时需要,为了排除序列化问题,放在此处
               val props = new HashMap[String, Object]()
            //       提供brokers地址
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.30:9092")
            //       指定key value中的value可序列化方式,因为网络传输
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer")
            //       指定key value中的key可序列化方式,因为网络传输
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer")
            //       生产者对象,提供borker和kv的序列化方式
                val producer = new KafkaProducer[String, String](props)
            //       将rdd中数组转化为数组之后转化为json
                val str = write(rdd.collect)
              
//               另一种将rdd转化成json的方法
//                 val json=rdd.collect().toList.map{case (word, count) =>(word, count)}
//                 val str=compact(render(json))
                                
//                 println(rdd.collect.getClass().getName()) //Lscala.Tuple2;
//                 println(str)
                val message = new ProducerRecord[String, String]("result", null, str) 
                producer.send(message)
          }
        })
        ssc.start()
        ssc.awaitTermination()
    }
}

在这里插入图片描述

三.实时分析结果可视化(源代码下载

  • 利用Flask-SocketIO实时推送数据
  • socket.io.js实时获取数据
  • highlights.js展示数据
3.1Flask-SocketIO实时推送数据

Spark Streaming实时接收Kafka中topic为’sex’发送的日志数据,然后Spark Streaming进行实时处理,统计好每秒中男女生购物人数之后,将结果发送至Kafka,topic为’result’。在本章节,将介绍如何利用Flask-SocketIO将结果实时推送到浏览器。
文件目录结构如下:

  • 1.data目录存放的是用户日志数据;
  • 2.scripts目录存放的是Kafka生产者和消费者;
  • 3.static/js目录存放的是前端所需要的js框架;
  • 4.templates目录存放的是html页面;
  • 5.app.py为web服务器,接收Spark Streaming处理后的结果,并推送实时数据给浏览器;
  • 6.External Libraries是本项目所依赖的Python库,是PyCharm自动生成。
    在这里插入图片描述
    app.py的功能就是作为一个简易的服务器,处理连接请求,以及处理从kafka接收的数据,并实时推送到浏览器。app.py的代码如下:
import json
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None
# 实例化一个consumer,接收topic为result的消息
consumer = KafkaConsumer('result', bootstrap_servers='192.168.1.30:9092')

# 一个后台线程,持续接收Kafka消息,并发送给客户端浏览器
def background_thread():
    girl = 0
    boy = 0
    for msg in consumer:
        data_json = msg.value.decode('utf8')
        data_list = json.loads(data_json)
        for data in data_list:
            if '0' in data.keys():
                girl = data['0']
            elif '1' in data.keys():
                boy = data['1']
            else:
                continue
        result = str(girl) + ',' + str(boy)
        print(result)
        socketio.emit('test_message', {'data': result})


# 客户端发送connect事件时的处理函数
@socketio.on('test_connect')
def connect(message):
    print(message)
    global thread
    if thread is None:
        # 单独开启一个线程给客户端发送数据
        thread = socketio.start_background_task(target=background_thread)
    socketio.emit('connected', {'data': 'Connected'})

# 通过访问http://127.0.0.1:5000/访问index.html
@app.route("/")
def handle_mes():
    return render_template("index.html")

# main函数
if __name__ == '__main__':
    socketio.run(app, debug=True)

4.效果展示

经过以上步骤,一切准备就绪,我们就可以启动程序来看看最后的效果。启动步骤如下:(多个python文件需要启动可以使用命令行启动方式
1.确保kafka开启。
2.开启producer.py模拟数据流。
3.启动Spark Streaming实时处理数据。提示你可以在实时处理数据启动之后,把comsumer.py的topic改成result,运行comsumer.py就可以看到数据处理后的输出结果。
4.启动app.py。

使用浏览器访问上图中给出的网址 http://127.0.0.1:5000/ ,就可以看到最终效果图:
在这里插入图片描述

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐