原文:https://blog.csdn.net/u011254180/article/details/80172452

https://blog.csdn.net/xw15061126063/article/details/86760299

 本次项目是基于企业大数据经典案例项目(大数据日志分析),全方位、全流程讲解 大数据项目的业务分析、技术选型、架构设计、集群规划、安装部署、整合继承与开发和web可视化交互设计。项目代码托管于github,大家可以自行下载

一、业务需求分析

  1. 捕获用户浏览日志信息
  2. 实时分析前20名流量最高的新闻话题
  3. 实时统计当前线上已曝光的新闻话题
  4. 统计哪个时段用户浏览量最高

二、系统架构图设计

四、集群资源规划设计

三、系统数据流程设计

五、步骤详解

1. Zookeeper分布式集群部署 
2. Hadoop2.X HA架构与部署 
3. HBase分布式集群部署与设计 
4. Kafka分布式集群部署 
5. Flume部署及数据采集准备

#node6与node7中flume数据采集到node5中,而且node6和node7的flume配置文件大致相同,node7中将a2改为a3,如下

a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = exec
a2.sources.r1.command = tail -F /opt/data/weblog-flume.log  # 每一行数据发送kafka
a2.sources.r1.channels = c1

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 1000
a2.channels.c1.keep-alive = 5

a2.sinks.k1.type = avro
a2.sinks.k1.channel = c1
a2.sinks.k1.hostname = node5
a2.sinks.k1.port = 5555


6. Flume+HBase+Kafka集成与开发

3. 下载日志数据并分析

到搜狗实验室下载用户查询日志

数据格式为:  访问时间\t     用户ID\t     [查询词]\t      URL在返回结果中的排名\t     用户点击的顺序号\t     用户点击的URL

用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID

00:00:00  2982199073774412  [360安全卫士]  8 3  download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html
00:00:00  0759422001082479  [哄抢救灾物资]  1 1  news.21cn.com/social/daqian/2008/05/29/4777194_1.shtml
00:00:00  5228056822071097  [75810部队]  14 5  www.greatoo.com/greatoo_cn/list.asp?link_id=276&title=%BE%DE%C2%D6%D0%C2%CE%C5
00:00:00  6140463203615646  [绳艺]  62 36  www.jd-cd.com/jd_opus/xx/200607/706.html
00:00:00  8561366108033201  [汶川地震原因]  3 2  www.big38.net/
00:00:00  2390814038614871  [莫衷一是的意思]  1 2  www.chinabaike.com/article/81/82/110/2007/2007020724490.html
00:00:00  1797943298449139  [星梦缘全集在线观看]  8 5  www.6wei.net/dianshiju/????\xa1\xe9|????do=index
00:00:00  0071772592458284  [闪字吧]  1 2  www.shanziba.com/
00:00:00  4141621901895211  [霍震霆与朱玲玲照片]  2 6  bbs.gouzai.cn/thread-698736.html
00:00:00  9975666857142764  [电脑创业]  2 2  ks.cn.yahoo.com/question/1307120203719.html
00:00:00  2160337461907744  [111aa图片]  1 6  www.fotolog.com.cn/tags/aa111
00:00:00  7423866288265172  [豆腐的制成]  3 13  ks.cn.yahoo.com/question/1406051201894.html
00:00:00  0616877776407358  [tudou.com+禁播电影]  2 9  topic.bindou.com/1487/
00:00:00  3933365481995287  [最佳受孕时间]  6 3  ks.cn.yahoo.com/question/1407051001276.html
00:00:00  8242389147671512  [捷克民歌土风舞++教案]  2 3  shwamlys.blog.sohu.com/76558184.html
00:00:00  8248403977107859  [pdf]  1 1  download.it168.com/18/1805/13947/13947_3.shtml
00:00:00  6239728533154822  [喷嘭乐团lonely+day]  7 9  www.songtaste.com/song/266563/
00:00:00  6551182914925117  [尼康相机报价]  6 4  product.it168.com/list/b/03050171_1.shtml
00:00:00  2345161545038265  [哄抢救灾物资]  2 1  pic.news.mop.com/gs/2008/0528/12985.shtml

5. 对日志数据进行格式处理
    1)将文件中的tab更换成逗号
cat weblog.log|tr "\t" "," > weblog.log
    2)将文件中的空格更换成逗号
cat weblog2.log|tr " " "," > weblog.log 

å¨è¿éæå¥å¾çæè¿°

4. node5聚合节点与HBase和Kafka的集成配置

    node5通过flume接收node6与node7中flume传来的数据,并将其分别发送至hbase与kafka中,配置内容如下

a1.sources = r1
a1.channels = kafkaC hbaseC
a1.sinks = kafkaSink hbaseSink

a1.sources.r1.type = avro       
a1.sources.r1.channels = hbaseC kafkaC
a1.sources.r1.bind = node5
a1.sources.r1.port = 5555 
a1.sources.r1.threads = 5 

#****************************flume + hbase****************************** 
a1.channels.hbaseC.type = memory
a1.channels.hbaseC.capacity = 10000
a1.channels.hbaseC.transactionCapacity = 10000
a1.channels.hbaseC.keep-alive = 20

a1.sinks.hbaseSink.type = asynchbase
a1.sinks.hbaseSink.table = weblogs
a1.sinks.hbaseSink.columnFamily = info
a1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
a1.sinks.hbaseSink.channel = hbaseC
a1.sinks.hbaseSink.serializer.payloadColumn = datetime,userid,searchname,retorder,cliorder,cliurl

#****************************flume + kafka******************************
a1.channels.kafkaC.type = memory
a1.channels.kafkaC.capacity = 10000
a1.channels.kafkaC.transactionCapacity = 10000
a1.channels.kafkaC.keep-alive = 20

a1.sinks.kafkaSink.channel = kafkaC
a1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaSink.brokerList = node5:9092,node6:9092,node7:9092
a1.sinks.kafkaSink.topic = weblogs
a1.sinks.kafkaSink.zookeeperConnect = node5:2181,node6:2181,node7:2181
a1.sinks.kafkaSink.requiredAcks = 1
a1.sinks.kafkaSink.batchSize = 1
a1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder

 6. 自定义SinkHBase程序设计与开发

    1)模仿SimpleAsyncHbaseEventSerializer自定义KfkAsyncHbaseEventSerializer实现类,修改一下代码即可。

@Override
public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
        byte[] rowKey;
        try {
            /*---------------------------代码修改开始---------------------------------*/
            // 解析列字段
            String[] columns = new String(this.payloadColumn).split(",");
            // 解析flume采集过来的每行的值
            String[] values = new String(this.payload).split(",");
            for(int i=0;i < columns.length;i++){
                byte[] colColumn = columns[i].getBytes();
                byte[] colValue = values[i].getBytes(Charsets.UTF_8);
 
                // 数据校验:字段和值是否对应
                if(colColumn.length != colValue.length) break;
                 // 时间
                String datetime = values[0].toString();
                // 用户id
                String userid = values[1].toString();
                // 根据业务自定义Rowkey
                rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid,datetime);
                // 插入数据
                PutRequest putRequest =  new PutRequest(table, rowKey, cf,  colColumn, colValue);
                actions.add(putRequest);
            /*---------------------------代码修改结束---------------------------------*/
            }
         } catch (Exception e) {
                throw new FlumeException("Could not get row key!", e);
        }
    }
    return actions;
}

 2)在SimpleRowKeyGenerator类中,根据具体业务自定义Rowkey生成方法

public static byte[] getKfkRowKey(String userid, String datetime) throws UnsupportedEncodingException {
    return (userid + "-" + datetime + "-" + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
}

将打包名字替换为flume自带的包名flume-ng-hbase-sink-1.7.0.jar ,然后上传至flume/lib目录下,覆盖原有的jar包即可。

 5. 编写启动flume服务程序的shell脚本

在各节点的flume安装目录下编写flume启动脚本flume-kfk-start.sh。下面是node5中的配置写法,node6与node7中将a1分别改为a2和a3。

bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console

9. 启动数据采集所有服务

    1)启动Zookeeper服务

    2)启动hdfs服务

    3)启动HBase服务

    创建hbase业务表  create 'weblogs','info'
    4)启动Kafka服务,并创建业务数据topic

bin/kafka-server-start.sh config/server.properties &

bin/kafka-topics.sh --create 
--zookeeper node5:2181,node6:2181,node7:2181 
--topic weblogs 
--partitions 1 
--replication-factor 3

 编写Kafka Consumer执行脚本

bin/kafka-console-consumer.sh --zookeeper node5:2181,node6:2181,node7:2181 --from-beginning --topic weblogs

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐