目录

一、通话日志来源

二、 flume日志采集

三、kafka消费数据

四、HBASE保存数据

五、MapReduce分析数据

六、Redis缓存

七、数据展示


项目需求:统计分析用户每个月,每年的通话次数和通话总时长

项目架构图如下:

本项目是一个离线项目,主要是监控采集数据到hbase,然后将hbase上的数据进行分析处理,将处理后的结果存放到MySQL中,然后将分析的结果进行图表展示。流程如下:

一、通话日志来源

自己编写一个生产日志的程序,生成一些通话的日志,模拟通话记录

二、 flume日志采集

使用Flume收集日志,然后传输到Kafka队列中,其中Flume的类型就可以确定

source:exec类型,其中source可以是多个,因为,集群上每一台机器都可以产生日志,都需要监听

channel:毫无疑问的选择的是memory,这样速度快

sink:下游选择的是kafka,需要在flume配置文件中指定kafka的topic,地址等,具体的配置见后面

三、kafka消费数据

使用kafka创建topic,flume采集到的数据输入到对应的topic中

四、HBASE保存数据

将kafka消费者消费到的消息写入到HBASE中进行保存,在HBASE这块是整个项目的核心点之一。这里需要考虑很多事情。比如表的创建和删除,预分区的设定,rowkey的设计,分区键的设计等等。如何设计rowkey才能将数据分散而又相对集中的存放在hbase中,并且如何实现将同一个用户的同一个月的数据都存放在同一个分区中。

五、MapReduce分析数据

数据分析阶段可以采用很多框架来进行数据分析,比如spark和hive等,本项目采用MapReduce进行数据分析

在MapReduce阶段将hbase中存放的数据进行分析。首先要怎样才能从hbase中取得数据?在mapper类中继承TableMapper<Text,Text>类,而不直接继承Mapper类,这样map方法中的key就是从hbase中取得的rowkey了。在map阶段进行数据的切分,在reduce阶段进行数据的统计汇总。在reduce方法后,再进行自定义一个outputformat,将reduce方法处理后的数据保存到MySQL中。

六、Redis缓存

将用户表的id和电话号码Tel和日期表的id和日期缓存到Redis中,当outputformat向MySQL中写入数据的时候,从Redis中取得对应电话号码的id和对应日期的dateid,然后将这两个id写入到mysql数据库的ct_call表中,存放电话号码的id和日期的id,将三张表进行关联,这样使数据库存储效率更高。

七、数据展示

通过使用SSM框架和echarts组件,查询出mysql中分析好的数据,然后进行图表展示

 

flume配置:

#defile
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#source
a1.source.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/data/call.log
a1.sources.r1.shell = /bin/bash -c

#sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = cMaster:9092,cSlave0:9092:cSlave1:9092
a1.sinks.k1.kafka.topic = ct
al.sinks.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/data/call.log
a1.sources.r1.shell = /bin/bash -c

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = cMaster:9092,cSlave0:9092,cSlave1:9092
a1.sinks.k1.topic = ct
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐