新闻大数据实时分析可视化系统,Flume+HBase+Kafka
原文:https://blog.csdn.net/u011254180/article/details/80172452https://blog.csdn.net/xw15061126063/article/details/86760299本次项目是基于企业大数据经典案例项目(大数据日志分析),全方位、全流程讲解 大数据项目的业务分析、技术选型、架构设计、集群规划、安装部署、整合继承与开...
原文:https://blog.csdn.net/u011254180/article/details/80172452
https://blog.csdn.net/xw15061126063/article/details/86760299
本次项目是基于企业大数据经典案例项目(大数据日志分析),全方位、全流程讲解 大数据项目的业务分析、技术选型、架构设计、集群规划、安装部署、整合继承与开发和web可视化交互设计。项目代码托管于github,大家可以自行下载。
一、业务需求分析
- 捕获用户浏览日志信息
- 实时分析前20名流量最高的新闻话题
- 实时统计当前线上已曝光的新闻话题
- 统计哪个时段用户浏览量最高
二、系统架构图设计
四、集群资源规划设计
三、系统数据流程设计
五、步骤详解
1. Zookeeper分布式集群部署
2. Hadoop2.X HA架构与部署
3. HBase分布式集群部署与设计
4. Kafka分布式集群部署
5. Flume部署及数据采集准备
#node6与node7中flume数据采集到node5中,而且node6和node7的flume配置文件大致相同,node7中将a2改为a3,如下
a2.sources = r1
a2.sinks = k1
a2.channels = c1
a2.sources.r1.type = exec
a2.sources.r1.command = tail -F /opt/data/weblog-flume.log # 每一行数据发送kafka
a2.sources.r1.channels = c1
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 1000
a2.channels.c1.keep-alive = 5
a2.sinks.k1.type = avro
a2.sinks.k1.channel = c1
a2.sinks.k1.hostname = node5
a2.sinks.k1.port = 5555
6. Flume+HBase+Kafka集成与开发
3. 下载日志数据并分析
到搜狗实验室下载用户查询日志
数据格式为: 访问时间\t 用户ID\t [查询词]\t URL在返回结果中的排名\t 用户点击的顺序号\t 用户点击的URL
用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID
00:00:00 2982199073774412 [360安全卫士] 8 3 download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html
00:00:00 0759422001082479 [哄抢救灾物资] 1 1 news.21cn.com/social/daqian/2008/05/29/4777194_1.shtml
00:00:00 5228056822071097 [75810部队] 14 5 www.greatoo.com/greatoo_cn/list.asp?link_id=276&title=%BE%DE%C2%D6%D0%C2%CE%C5
00:00:00 6140463203615646 [绳艺] 62 36 www.jd-cd.com/jd_opus/xx/200607/706.html
00:00:00 8561366108033201 [汶川地震原因] 3 2 www.big38.net/
00:00:00 2390814038614871 [莫衷一是的意思] 1 2 www.chinabaike.com/article/81/82/110/2007/2007020724490.html
00:00:00 1797943298449139 [星梦缘全集在线观看] 8 5 www.6wei.net/dianshiju/????\xa1\xe9|????do=index
00:00:00 0071772592458284 [闪字吧] 1 2 www.shanziba.com/
00:00:00 4141621901895211 [霍震霆与朱玲玲照片] 2 6 bbs.gouzai.cn/thread-698736.html
00:00:00 9975666857142764 [电脑创业] 2 2 ks.cn.yahoo.com/question/1307120203719.html
00:00:00 2160337461907744 [111aa图片] 1 6 www.fotolog.com.cn/tags/aa111
00:00:00 7423866288265172 [豆腐的制成] 3 13 ks.cn.yahoo.com/question/1406051201894.html
00:00:00 0616877776407358 [tudou.com+禁播电影] 2 9 topic.bindou.com/1487/
00:00:00 3933365481995287 [最佳受孕时间] 6 3 ks.cn.yahoo.com/question/1407051001276.html
00:00:00 8242389147671512 [捷克民歌土风舞++教案] 2 3 shwamlys.blog.sohu.com/76558184.html
00:00:00 8248403977107859 [pdf] 1 1 download.it168.com/18/1805/13947/13947_3.shtml
00:00:00 6239728533154822 [喷嘭乐团lonely+day] 7 9 www.songtaste.com/song/266563/
00:00:00 6551182914925117 [尼康相机报价] 6 4 product.it168.com/list/b/03050171_1.shtml
00:00:00 2345161545038265 [哄抢救灾物资] 2 1 pic.news.mop.com/gs/2008/0528/12985.shtml
5. 对日志数据进行格式处理
1)将文件中的tab更换成逗号
cat weblog.log|tr "\t" "," > weblog.log
2)将文件中的空格更换成逗号
cat weblog2.log|tr " " "," > weblog.log
4. node5聚合节点与HBase和Kafka的集成配置
node5通过flume接收node6与node7中flume传来的数据,并将其分别发送至hbase与kafka中,配置内容如下
a1.sources = r1
a1.channels = kafkaC hbaseC
a1.sinks = kafkaSink hbaseSink
a1.sources.r1.type = avro
a1.sources.r1.channels = hbaseC kafkaC
a1.sources.r1.bind = node5
a1.sources.r1.port = 5555
a1.sources.r1.threads = 5
#****************************flume + hbase******************************
a1.channels.hbaseC.type = memory
a1.channels.hbaseC.capacity = 10000
a1.channels.hbaseC.transactionCapacity = 10000
a1.channels.hbaseC.keep-alive = 20
a1.sinks.hbaseSink.type = asynchbase
a1.sinks.hbaseSink.table = weblogs
a1.sinks.hbaseSink.columnFamily = info
a1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
a1.sinks.hbaseSink.channel = hbaseC
a1.sinks.hbaseSink.serializer.payloadColumn = datetime,userid,searchname,retorder,cliorder,cliurl
#****************************flume + kafka******************************
a1.channels.kafkaC.type = memory
a1.channels.kafkaC.capacity = 10000
a1.channels.kafkaC.transactionCapacity = 10000
a1.channels.kafkaC.keep-alive = 20
a1.sinks.kafkaSink.channel = kafkaC
a1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaSink.brokerList = node5:9092,node6:9092,node7:9092
a1.sinks.kafkaSink.topic = weblogs
a1.sinks.kafkaSink.zookeeperConnect = node5:2181,node6:2181,node7:2181
a1.sinks.kafkaSink.requiredAcks = 1
a1.sinks.kafkaSink.batchSize = 1
a1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder
6. 自定义SinkHBase程序设计与开发
1)模仿SimpleAsyncHbaseEventSerializer自定义KfkAsyncHbaseEventSerializer实现类,修改一下代码即可。
@Override
public List<PutRequest> getActions() {
List<PutRequest> actions = new ArrayList<PutRequest>();
if (payloadColumn != null) {
byte[] rowKey;
try {
/*---------------------------代码修改开始---------------------------------*/
// 解析列字段
String[] columns = new String(this.payloadColumn).split(",");
// 解析flume采集过来的每行的值
String[] values = new String(this.payload).split(",");
for(int i=0;i < columns.length;i++){
byte[] colColumn = columns[i].getBytes();
byte[] colValue = values[i].getBytes(Charsets.UTF_8);
// 数据校验:字段和值是否对应
if(colColumn.length != colValue.length) break;
// 时间
String datetime = values[0].toString();
// 用户id
String userid = values[1].toString();
// 根据业务自定义Rowkey
rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid,datetime);
// 插入数据
PutRequest putRequest = new PutRequest(table, rowKey, cf, colColumn, colValue);
actions.add(putRequest);
/*---------------------------代码修改结束---------------------------------*/
}
} catch (Exception e) {
throw new FlumeException("Could not get row key!", e);
}
}
return actions;
}
2)在SimpleRowKeyGenerator类中,根据具体业务自定义Rowkey生成方法
public static byte[] getKfkRowKey(String userid, String datetime) throws UnsupportedEncodingException {
return (userid + "-" + datetime + "-" + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
}
将打包名字替换为flume自带的包名flume-ng-hbase-sink-1.7.0.jar ,然后上传至flume/lib目录下,覆盖原有的jar包即可。
5. 编写启动flume服务程序的shell脚本
在各节点的flume安装目录下编写flume启动脚本flume-kfk-start.sh。下面是node5中的配置写法,node6与node7中将a1分别改为a2和a3。
bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console
9. 启动数据采集所有服务
1)启动Zookeeper服务
2)启动hdfs服务
3)启动HBase服务
创建hbase业务表 create 'weblogs','info'
4)启动Kafka服务,并创建业务数据topic
bin/kafka-server-start.sh config/server.properties &
bin/kafka-topics.sh --create
--zookeeper node5:2181,node6:2181,node7:2181
--topic weblogs
--partitions 1
--replication-factor 3
编写Kafka Consumer执行脚本
bin/kafka-console-consumer.sh --zookeeper node5:2181,node6:2181,node7:2181 --from-beginning --topic weblogs
更多推荐
所有评论(0)