Spark Streaming 模拟实战项目实例
由于没有网络日志,我们这里用之前写的python脚本爬取新浪微博热搜模拟产生日志文件,通过kafka和flume整合 将日志定时抽取到 spark上进行处理,微博热搜是十分钟更新一次,我们这里也设置十分钟的定时任务,具体步骤如下第一步编写python脚本获取微博热搜 实时排名,主题和url,然后运行测试,代码如下#!python2# -*- coding:utf-8 -*-import url..
由于没有真实的网络日志,这里用之前写的 Python 脚本爬取新浪微博热搜来模拟产生日志文件,通过 Flume 与 Kafka 整合,将日志定时抽取到 Spark Streaming 上进行处理。微博热搜每十分钟更新一次,因此这里也设置十分钟一次的定时任务,具体步骤如下:
第一步
编写python脚本获取微博热搜 实时排名,主题和url,然后运行测试,代码如下
#!python2
# -*- coding:utf-8 -*-
"""Scrape the Sina Weibo hot-search page and write one record per entry to stdout.

Each record is a single line of four tab-separated columns:
rank, decoded title, search URL, current date (YYYY-MM-DD).
Written for Python 2 (relies on urllib.unquote and byte strings).
"""
import urllib,requests,re,sys,time
# Fetch the source of the hot-search page; the timeout keeps a scheduled run
# from hanging forever on a stalled connection.
weiboHotFile=requests.get('http://s.weibo.com/top/summary',timeout=30)
weiboHotHtml=weiboHotFile.text
# The pattern matches backslash-escaped quotes and slashes around each link;
# the capture group is the percent-encoded title inside the href.
hotKey=re.compile(r'td class=\\"td_05\\"><a href=\\"\\/weibo\\/(.*?)&Refer=top\\"')
hotKeyListBe=hotKey.findall(weiboHotHtml)
# Date column: the Spark consumer reads it as the fourth field of every line.
today=time.strftime('%Y-%m-%d')
rank=1
# Walk the captured titles in page order; rank is the 1-based position.
for title in hotKeyListBe:
    # Undo one level of percent-encoding ("%25E4" -> "%E4") instead of deleting
    # every "25", which also corrupted titles that legitimately contain "25".
    # NOTE(review): assumes the href is double percent-encoded - confirm against the live page.
    title=title.replace('%25','%')
    url='http://s.weibo.com/weibo/'+title
    # Build the line from UTF-8 byte strings only, so writing works even when
    # stdout is redirected to a file (Python 2 would otherwise fall back to ASCII).
    fields=[str(rank),urllib.unquote(title.encode('utf-8')),url.encode('utf-8'),today]
    # Exactly one newline per record; blank lines would be fed to the consumer as records.
    sys.stdout.write('\t'.join(fields)+'\n')
    rank+=1
运行python脚本
第二步
在mysql创建表,包括当天最高排名,主题,url,和日期
# Main table: the Spark Streaming job writes into it, keeping the best rank per topic.
# The UNIQUE constraint on title also provides the index used when looking a topic up
# to replace its rank.
# url is sized for percent-encoded titles: the 25-character prefix plus 9 characters
# per encoded Chinese character exceeds 100 after only nine characters of title.
Create table weiboHotSearch(
highest_rank int(4),
title varchar(100) unique,
url varchar(1024),
day_date date);
# Staging table: every night the current day's rows of the main table are copied here.
# Column order and sizes must match the main table because the copy uses "select *".
Create table weiboHotSearch_temp(
highest_rank int(4),
title varchar(100),
url varchar(1024),
day_date date);
第三步
编写代码,实现从kafka实时获取热搜榜,并存入数据库,然后打成jar包
package com.stanley.sparktest.weibo
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import java.util.Date
import com.stanley.sparktest.sqlUtil.ConnectionPool
/**
 * Spark Streaming job that reads tab-separated hot-search records
 * (rank, title, url, date) from the Kafka topic "weiboTopic" and stores them in
 * the MySQL table weiboHotSearch, keeping for each title the best
 * (numerically lowest) rank seen so far.
 */
object WeiBoHot {

  /**
   * Parses one log line into (rank, title, url, date).
   *
   * @param line raw Kafka message value, expected to hold at least four
   *             tab-separated fields
   * @return the four fields, or None when the line is blank, has fewer than
   *         four fields, or its rank is not an integer
   */
  private def parseLine(line: String): Option[(String, String, String, String)] =
    line.split("\t") match {
      case Array(rank, title, url, date, _*) if scala.util.Try(rank.trim.toInt).isSuccess =>
        Some((rank.trim, title, url, date.trim))
      case _ => None
    }

  /**
   * Entry point: builds the streaming pipeline, starts it and blocks until it terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // NOTE(review): setMaster is hard-coded, so a --master passed to spark-submit is overridden.
    val sparkConf = new SparkConf()
      .setAppName("Weibo HotSerach Application")
      .setMaster("local[2]")
    // Create SparkContext
    val sc = new SparkContext(sparkConf)
    // 5-second batch interval
    val ssc = new StreamingContext(sc, Seconds(5))
    // Checkpoint directory
    ssc.checkpoint(".")
    // Kafka Cluster
    val kafkaParam = Map("metadata.broker.list" -> "master:9092")
    // Kafka Topics
    val topics = Set("weiboTopic")

    // Step 1: Create DStream (only the message value is kept)
    val lineDStream = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics)
      .map(_._2)

    // Step 2: DStream Transformation - malformed lines are dropped here instead of
    // failing the whole batch with an index or number-format exception later on.
    val tupleDStream = lineDStream.flatMap(line => parseLine(line).toList)

    // Step 3: write each partition through one pooled connection
    tupleDStream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // Skip empty partitions so no connection is borrowed for nothing.
        if (partitionOfRecords.nonEmpty) {
          val connection = Option(ConnectionPool.getConnection()).getOrElse(
            throw new IllegalStateException("ConnectionPool returned no connection"))
          try {
            // Parameterized statements: titles come from scraped web content, so
            // they must never be spliced into the SQL text.
            val selectStmt = connection.prepareStatement(
              "select highest_rank from weiboHotSearch where title=?")
            val updateStmt = connection.prepareStatement(
              "update weiboHotSearch set highest_rank=? where title=?")
            val insertStmt = connection.prepareStatement(
              "insert into weiboHotSearch values(?,?,?,?)")
            try {
              partitionOfRecords.foreach { case (rank, title, url, date) =>
                println("input data is " + rank + "\t" + title + "\t" + url + "\t" + date)
                val newRank = rank.toInt // validated by parseLine

                selectStmt.setString(1, title)
                val rs = selectStmt.executeQuery()
                val previousRank =
                  try {
                    if (rs.next()) Some(rs.getInt("highest_rank")) else None
                  } finally {
                    rs.close()
                  }

                previousRank match {
                  case Some(previous) if newRank < previous =>
                    // Better rank than the stored one: record the new best.
                    updateStmt.setInt(1, newRank)
                    updateStmt.setString(2, title)
                    updateStmt.executeUpdate()
                  case Some(_) =>
                    // Stored rank is already at least as good; nothing to write.
                  case None =>
                    // First time this title is seen: insert the full row.
                    insertStmt.setInt(1, newRank)
                    insertStmt.setString(2, title)
                    insertStmt.setString(3, url)
                    insertStmt.setString(4, date)
                    insertStmt.executeUpdate()
                }
              }
            } finally {
              selectStmt.close()
              updateStmt.close()
              insertStmt.close()
            }
          } finally {
            // Always hand the connection back, even when a record fails.
            ConnectionPool.returnConnection(connection)
          }
        }
      }
    }

    // Print a sample of every batch to the driver log
    tupleDStream.print()
    ssc.start()
    ssc.awaitTermination()
    // Stop Context
    ssc.stop()
    sc.stop()
  }
}
package com.stanley.sparktest.sqlUtil;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedList;
public class ConnectionPool {
private static LinkedList<Connection> connectionQueue;
static {
try {
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
public synchronized static Connection getConnection() {
try {
if (connectionQueue == null) {
connectionQueue = new LinkedList<Connection>();
for (int i = 0; i < 5; i++) {
Connection conn = DriverManager.getConnection(
"jdbc:mysql://master:3306/test",
"root",
"123456");
connectionQueue.push(conn);
}
}
} catch (Exception e) {
e.printStackTrace();
}
return connectionQueue.poll();
}
public static void returnConnection(Connection conn){connectionQueue.push(conn);}
}
第四步
整合 Flume 与 Kafka,通过执行 tail -F 抽取微博热搜榜
# Flume agent "a2": tails the daily hot-search data file and publishes every
# line to Kafka.
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'a2'
a2.sources = r2
a2.channels = c2
a2.sinks = k2
# define sources
# exec source: runs the command below and turns each output line into an event
a2.sources.r2.type = exec
## The user running the flume agent must have read permission on the tailed
## data file.
## NOTE(review): the backquoted date is expanded by the shell configured below when
## the command is launched - presumably once at agent start, so after midnight the
## agent would keep following the previous day's file; confirm, or restart daily.
a2.sources.r2.command = tail -F /opt/project/weibo/data_`date +"%Y-%m-%d"`
# Run the command through bash so the backquoted date substitution is evaluated
a2.sources.r2.shell = /bin/bash -c
# define channels
# In-memory channel (see the Flume user guide for capacity / transactionCapacity)
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# define sinks
# Kafka sink: sends events to topic weiboTopic via broker master:9092
a2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a2.sinks.k2.brokerList = master:9092
a2.sinks.k2.topic = weiboTopic
# bind the sources and sinks to the channels
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
第五步
启动kafka 创建topic ,启动flume
# Start the Kafka broker with the given server configuration.
bin/kafka-server-start.sh config/server.properties
# Create topic weiboTopic (1 partition, replication factor 1) via ZooKeeper at master:2181.
bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic weiboTopic
第六步
创建hive分区表与mysql关联
用来存放mysql数据
-- External Hive table for the archived hot-search rows, partitioned by
-- year / month / day and stored as tab-delimited text.
CREATE EXTERNAL TABLE weiboHotSearch (
  highest_rank INT,
  title        STRING,
  url          STRING,
  day_date     STRING
)
PARTITIONED BY (year STRING, month STRING, day STRING)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t';
临时表,用来导入mysql数据
-- Unpartitioned Hive staging table with the same four columns, stored as
-- tab-delimited text; the MySQL data is imported here first.
CREATE TABLE weiboHotSearch_temp (
  highest_rank INT,
  title        STRING,
  url          STRING,
  day_date     STRING
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t';
第七步
编写shell脚本
#!/bin/sh
# Runs the Weibo hot-search scraper once and appends its output to today's data
# file, /opt/project/weibo/data_<YYYY-MM-DD>, which is the file the Flume exec
# source tails.
# /usr/local/bin stays first so the same python is picked up as before;
# /usr/bin and /bin are added because `date` is needed below.
PATH=/usr/local/bin:/usr/bin:/bin
export PATH
python_dir=/opt/project/weibo/weiboHot.py
data_file=/opt/project/weibo/data_$(date +%Y-%m-%d)
# Append rather than overwrite: the script runs repeatedly during the day.
python "${python_dir}" >> "${data_file}"
#!/bin/sh
# Operation script: copies today's rows from the MySQL main table into the MySQL
# staging table, empties the main table, imports the staging table into Hive
# with Sqoop, appends it to the partitioned Hive table and finally removes
# yesterday's scraper data file.
# Load the system-wide environment variables
. /etc/profile
# Abort on the first failing step, so that e.g. the main table is never
# truncated after a failed copy. Enabled only after sourcing /etc/profile so a
# non-zero command in there cannot end the script.
set -e
# HDFS data source directory
DATA_LOG=/user/hive/warehouse/work0403.db
# Today's date
TODAY=`date +%Y-%m-%d`
# Yesterday's date
YESTERDAY=$(date -d "yesterday" +%Y-%m-%d)
# Database connection settings
# NOTE(review): the password is passed on the command line below and is visible
# in the process list; consider a client option file instead.
HOSTNAME=master
PORT=3306
USERNAME=root
PASSWORD=123456
DATABASE=weibo
# SQL statements
sql_truncate_temp="truncate table ${DATABASE}.weiboHotSearch_temp"
sql_insert="insert into ${DATABASE}.weiboHotSearch_temp (select * from ${DATABASE}.weiboHotSearch where day_date='${TODAY}')"
sql_truncate_main="truncate table ${DATABASE}.weiboHotSearch"
# Empty the staging table left over from the previous day
mysql -h${HOSTNAME} -P${PORT} -u${USERNAME} -p${PASSWORD} -e "${sql_truncate_temp}" --default-character-set=UTF8
# Copy today's rows from the main table into the staging table
mysql -h${HOSTNAME} -P${PORT} -u${USERNAME} -p${PASSWORD} -e "${sql_insert}" --default-character-set=UTF8
# Empty the main table
mysql -h${HOSTNAME} -P${PORT} -u${USERNAME} -p${PASSWORD} -e "${sql_truncate_main}" --default-character-set=UTF8
# Empty the Hive staging table left over from the previous day
hive -e "
use ${DATABASE};
truncate table weiboHotSearch_temp;"
# Import the MySQL staging table into the Hive staging table with Sqoop
sqoop import \
--connect jdbc:mysql://${HOSTNAME}:${PORT}/${DATABASE} \
--username ${USERNAME} \
--password ${PASSWORD} \
--table weiboHotSearch_temp \
--num-mappers 1 \
--fields-terminated-by "\t" \
--delete-target-dir \
--hive-database ${DATABASE} \
--hive-import \
--hive-table weiboHotSearch_temp
# Append the Hive staging table to the partitioned Hive main table.
# day_date is YYYY-MM-DD, so characters 1-4 / 6-7 / 9-10 give year / month / day.
hive -e "
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
insert into table ${DATABASE}.weiboHotSearch partition(year,month,day)
select highest_rank,title,url,day_date,substr(day_date,1,4) year,substr(day_date,6,2) month, substr(day_date,9,2) day
from ${DATABASE}.weiboHotSearch_temp;"
# Remove yesterday's scraper data file
rm -rf /opt/project/weibo/data_${YESTERDAY}
第八步
测试运行
启动hdfs
# Start the HDFS daemons.
${HADOOP_HOME}/sbin/start-dfs.sh
# Start the YARN daemons.
${HADOOP_HOME}/sbin/start-yarn.sh
# Start the MapReduce job history server.
${HADOOP_HOME}/sbin/mr-jobhistory-daemon.sh start historyserver
启动zookeeper
# zkServer.sh ships in ZooKeeper's bin/ directory (the distribution has no sbin/).
${ZOOKEEPER_HOME}/bin/zkServer.sh start
启动hive metastore
${HIVE_HOME}/bin/hive --service metastore &
启动flume
${FLUME_HOME}/bin/flume-ng agent --conf conf/ --name a2 --conf-file /opt/project/weibo/flume-kafka_weibo.conf
启动kafka
${KAFKA_HOME}/bin/kafka-server-start.sh config/server.properties
运行 Spark jar 包
# Submit the packaged streaming job; --class names the WeiBoHot entry point
# inside /opt/project/weibo/weiboHot.jar.
${SPARK_HOME}/bin/spark-submit \
--class com.stanley.sparktest.weibo.WeiBoHot \
/opt/project/weibo/weiboHot.jar
执行python 脚本
bash /opt/project/weibo/python_shell.sh
python脚本执行完后执行操作脚本
bash /opt/project/weibo/operation.sh
爬取的内容抽取到kafka集群中
查看数据库,数据已存入数据库
执行操作脚本后
数据库中接收数据表被清空,数据转入准备导入和hive 连接的表
数据已经存入到hive 中
数据在存入到hdfs 分区当中
数据文件已经自动产生
第九步
设置定时任务
可以用oozie来调度,
Crontab调度如下
# Scrape the Weibo hot-search list every ten minutes (at minutes 2, 12, 22, 32, 42 and 52).
2,12,22,32,42,52 * * * * bash /opt/project/weibo/python_shell.sh
# Run the operation script every day at 23:55 to move the day's data into HDFS.
55 23 * * * bash /opt/project/weibo/operation.sh
使用hue 页面操作oozie
创建两个workflow,分别对应python脚本,和操作脚本
两个脚本分别创建coordinator,频率和上面的crontab一致
创建bundle,将两个coordinator绑在一起
最后提交bundle任务
更多推荐
所有评论(0)