Spark大数据-Spark+Kafka构建实时分析Dashboard
Spark+Kafka构建实时分析Dashboard1.框架利用Spark+Kafka实时分析男女生每秒购物人数,利用Spark Streaming实时处理用户购物日志,然后利用websocket将数据实时推送给浏览器,最后浏览器将接收到的数据实时展现,案例的整体框架图如下:详细分析下上述步骤:应用程序将购物日志发送给Kafka,topic为”sex”,因为这里只是统计购物男女生人数,...
Spark+Kafka构建实时分析Dashboard
一.框架
利用Spark+Kafka实时分析男女生每秒购物人数,利用Spark Streaming实时处理用户购物日志,然后利用websocket将数据实时推送给浏览器,最后浏览器将接收到的数据实时展现,案例的整体框架图如下:
详细分析下上述步骤:
- 应用程序将购物日志发送给Kafka,topic为”sex”,因为这里只是统计购物男女生人数,所以只需要发送购物日志中性别属性即可。这里采用模拟的方式发送购物日志,即读取购物日志数据,每间隔相同的时间发送给Kafka。
- 接着利用Spark Streaming从Kafka主题”sex”读取并处理消息。这里按滑动窗口的大小按顺序读取数据,例如可以按每5秒作为窗口大小读取一次数据,然后再处理数据。
- Spark将处理后的数据发送给Kafka,topic为”result”。
- 然后利用Flask搭建一个web应用程序,接收Kafka主题为”result”的消息。
- 利用Flask-SocketIO将数据实时推送给客户端。
- 客户端浏览器利用js框架socketio实时接收数据,然后利用js可视化库hightlights.js库动态展示。
二.实验环境准备
环境要求
- Ubuntu: 16.04
Spark: 2.1.0
Scala: 2.11.8
kafka: 0.8.2.2
Python: 3.x(3.0以上版本)
Flask: 0.12.1
Flask-SocketIO: 2.8.6
kafka-python: 1.3.3
三.Python操作kafka
1.数据介绍
采用的数据集压缩包为data_format.zip点击这里下载data_format.zip数据集
该数据集压缩包是淘宝2015年双11前6个月(包含双11)的交易数据(交易数据有偏移,但是不影响实验的结果),里面包含3个文件,分别是用户行为日志文件user_log.csv 、回头客训练集train.csv 、回头客测试集test.csv. 在这个案例中只是用user_log.csv这个文件,下面列出文件user_log.csv的数据格式定义:
用户行为日志user_log.csv,日志中的字段定义如下:
- user_id | 买家id
- item_id | 商品id
- cat_id | 商品类别id
- merchant_id | 卖家id
- brand_id | 品牌id
- month | 交易时间:月
- day | 交易事件:日
- action | 行为,取值范围{0,1,2,3},0表示点击,1表示加入购物车,2表示购买,3表示关注商品
- age_range | 买家年龄分段:1表示年龄<18,2表示年龄在[18,24],3表示年龄在[25,29],4表示年龄在[30,34],5表示年龄在[35,39],6表示年龄在[40,49],7和8表示年龄>=50,0和NULL则表示未知
- gender | 性别:0表示女性,1表示男性,2和NULL表示未知
- province| 收获地址省份
数据具体格式如下:
user_id,item_id,cat_id,merchant_id,brand_id,month,day,action,age_range,gender,province
328862,323294,833,2882,2661,08,29,0,0,1,内蒙古
328862,844400,1271,2882,2661,08,29,0,1,1,山西
328862,575153,1271,2882,2661,08,29,0,2,1,山西
328862,996875,1271,2882,2661,08,29,0,1,1,内蒙古
328862,1086186,1271,1253,1049,08,29,0,0,2,浙江
328862,623866,1271,2882,2661,08,29,0,0,2,黑龙江
328862,542871,1467,2882,2661,08,29,0,5,2,四川
328862,536347,1095,883,1647,08,29,0,7,1,吉林
实时统计每秒中男女生购物人数,因此针对每条购物日志,我们只需要获取gender即可,然后发送给Kafka,接下来Spark Streaming再接收gender进行处理。
2.数据预处理
使用Python对数据进行预处理,并将处理后的数据直接通过Kafka生产者发送给Kafka,这里需要先安装Python操作Kafka的代码库。
2.1创建生产者
# coding: utf-8
import csv
import time
from kafka import KafkaProducer
# 实例化一个KafkaProducer示例,用于向Kafka投递消息
producer = KafkaProducer(bootstrap_servers='192.168.1.30:9092')
# 打开数据文件
csvfile = open("../data/user_log.csv", "r", encoding='UTF-8')
# 生成一个可用于读取csv文件的reader
reader = csv.reader(csvfile)
for line in reader:
gender = line[9] # 性别在每行日志代码的第9个元素
if gender == 'gender':
continue # 去除第一行表头
time.sleep(0.1) # 每隔0.1秒发送一行数据
# 发送数据,topic为'sex'
print(line[9].encode('utf8'))
producer.send('sex', line[9].encode('utf8'))
2.2创建消费者
from kafka import KafkaConsumer
consumer = KafkaConsumer('result', bootstrap_servers='192.168.1.30:9092')
for msg in consumer:
print((msg.value).decode('utf8'))
三.Spark Streaming实时处理数据
- 将spark-streaming-kafka(版本为kafka010)的jar包拷贝到集群各个节点上的jars包中,
- 实时统计每秒中男女生购物人数,而Spark Streaming接收的数据为1,1,0,2…,其中0代表女性,1代表男性,所以对于2或者null值,则不考虑。其实通过分析,可以发现这个就是典型的wordcount问题,而且是基于Spark流计算。女生的数量,即为0的个数,男生的数量,即为1的个数。利用Spark Streaming接口reduceByKeyAndWindow,设置窗口大小为1,滑动步长为1,这样统计出的0和1的个数即为每秒男生女生的人数。
- 首先按每秒的频率读取Kafka消息;
- 然后对每秒的数据执行wordcount算法,统计出0的个数,1的个数,2的个数;
- 最后将上述结果封装成json发送给Kafka。
1.设置日志格式化处理
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
/** Set reasonable logging levels for streaming if the user has not configured log4j. */
def setStreamingLogLevels() {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
// We first log something to initialize Spark's default logging, then we override the
// logging level.
logInfo("Setting log level to [WARN] for streaming example." +
" To override add a custom log4j.properties to the classpath.")
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}
2.spark streaming实时数据分析
接受kafka数据做实时流分析,创建生产者,分析结果发送给kakfa,Python web端从kafka接受分析结果可视化。
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.json4s._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.write
// import org.json4s._
// import org.json4s.JsonDSL._
// import org.json4s.jackson.JsonMethods._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.Interval
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
object KafkaWordCount{
def main(args:Array[String]){
// 格式化日志
StreamingExamples.setStreamingLogLevels()
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// 设置chackpoint
ssc.checkpoint(".")
// 创建消费者,从kafka获取数据供spark streaming做实时处理
// 新版本的kafka的参数设置
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "192.168.1.30:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "1",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// 可能同时订阅多个topic,如何给每个topic设置分区数
val topics = Array("sex")
// 创建数据源
val lineMap = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val lines =lineMap.map(record => record.value)
val words=lines.flatMap(_.split(" "))
// val wordCounts = words.map(x => (x, 1L)).reduceByKeyAndWindow(_+_,_-_, Seconds(1), Seconds(1), 1).print
// 对数据进行窗口截取做实时流分析
val wordCounts = words.map(x => (x, 1L)).reduceByKeyAndWindow(_+_,_-_, Seconds(1), Seconds(1), 1).foreachRDD(rdd => {
if(rdd.count !=0 ){
implicit val formats = DefaultFormats//数据格式化时需要,为了排除序列化问题,放在此处
val props = new HashMap[String, Object]()
// 提供brokers地址
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.30:9092")
// 指定key value中的value可序列化方式,因为网络传输
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
// 指定key value中的key可序列化方式,因为网络传输
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
// 生产者对象,提供borker和kv的序列化方式
val producer = new KafkaProducer[String, String](props)
// 将rdd中数组转化为数组之后转化为json
val str = write(rdd.collect)
// 另一种将rdd转化成json的方法
// val json=rdd.collect().toList.map{case (word, count) =>(word, count)}
// val str=compact(render(json))
// println(rdd.collect.getClass().getName()) //Lscala.Tuple2;
// println(str)
val message = new ProducerRecord[String, String]("result", null, str)
producer.send(message)
}
})
ssc.start()
ssc.awaitTermination()
}
}
三.实时分析结果可视化(源代码下载)
- 利用Flask-SocketIO实时推送数据
- socket.io.js实时获取数据
- highlights.js展示数据
3.1Flask-SocketIO实时推送数据
Spark Streaming实时接收Kafka中topic为’sex’发送的日志数据,然后Spark Streaming进行实时处理,统计好每秒中男女生购物人数之后,将结果发送至Kafka,topic为’result’。在本章节,将介绍如何利用Flask-SocketIO将结果实时推送到浏览器。
文件目录结构如下:
- 1.data目录存放的是用户日志数据;
- 2.scripts目录存放的是Kafka生产者和消费者;
- 3.static/js目录存放的是前端所需要的js框架;
- 4.templates目录存放的是html页面;
- 5.app.py为web服务器,接收Spark Streaming处理后的结果,并推送实时数据给浏览器;
- 6.External Libraries是本项目所依赖的Python库,是PyCharm自动生成。
app.py的功能就是作为一个简易的服务器,处理连接请求,以及处理从kafka接收的数据,并实时推送到浏览器。app.py的代码如下:
import json
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None
# 实例化一个consumer,接收topic为result的消息
consumer = KafkaConsumer('result', bootstrap_servers='192.168.1.30:9092')
# 一个后台线程,持续接收Kafka消息,并发送给客户端浏览器
def background_thread():
girl = 0
boy = 0
for msg in consumer:
data_json = msg.value.decode('utf8')
data_list = json.loads(data_json)
for data in data_list:
if '0' in data.keys():
girl = data['0']
elif '1' in data.keys():
boy = data['1']
else:
continue
result = str(girl) + ',' + str(boy)
print(result)
socketio.emit('test_message', {'data': result})
# 客户端发送connect事件时的处理函数
@socketio.on('test_connect')
def connect(message):
print(message)
global thread
if thread is None:
# 单独开启一个线程给客户端发送数据
thread = socketio.start_background_task(target=background_thread)
socketio.emit('connected', {'data': 'Connected'})
# 通过访问http://127.0.0.1:5000/访问index.html
@app.route("/")
def handle_mes():
return render_template("index.html")
# main函数
if __name__ == '__main__':
socketio.run(app, debug=True)
4.效果展示
经过以上步骤,一切准备就绪,我们就可以启动程序来看看最后的效果。启动步骤如下:(多个python文件需要启动可以使用命令行启动方式)
1.确保kafka开启。
2.开启producer.py模拟数据流。
3.启动Spark Streaming实时处理数据。提示你可以在实时处理数据启动之后,把comsumer.py的topic改成result,运行comsumer.py就可以看到数据处理后的输出结果。
4.启动app.py。
使用浏览器访问上图中给出的网址 http://127.0.0.1:5000/ ,就可以看到最终效果图:
更多推荐
所有评论(0)