Project Background

Telecom operators generate huge volumes of communication data every moment: call records, SMS records, MMS records, third-party service billing and much more. Besides serving users' real-time queries and displays, such a large volume of data also needs to be analyzed offline on a regular schedule, for example daily, monthly, quarterly and yearly bills, call details and call records. We use this scenario as an entry point to study the methodology involved.

Our current requirement is: count the number of calls and the total call duration per person per day, per month and per year.

Project Architecture

Notes:

1. When a user makes a call, the application service generates log entries containing the caller, the callee, the call setup time and the call duration, and writes them to a log file.

2. Flume tails the log file, reads the call-related log lines and sends them to Kafka on the log topic.

3. The log consumer service reads the data from Kafka via the log topic and calls the HBase API to store the records in HBase (file data goes to HDFS).

4. A MapReduce job periodically reads the data from HBase, analyzes it along the various dimensions and writes the results into MySQL.

5. When a user requests a call-record report, the analysis service is queried for the data, which is then rendered as charts in the web UI.

System Environment

OS versions
Windows 10
CentOS 7.9.2009

Tool versions
IDEA 2020.1.3
Maven 3.5.4
JDK 1.8
MySQL 5.6.50

Framework versions
Flume 1.9.0
Kafka 2.12-2.8.0
Zookeeper 3.6.3
Hadoop 2.10.1
HBase 2.2.6
Hutool 5.6.4
SpringBoot 2.4.5
MybatisPlus 3.4.2

System Configuration

Add a hosts entry in CentOS:

vim /etc/hosts

Add the following line:

192.168.1.43 linuxserver

Here 192.168.1.43 is the IP address of the CentOS host.

Add HOSTNAME in the network file:

vim /etc/sysconfig/network

Add the following line:

HOSTNAME=linuxserver

A reboot is required for this change to take effect.

Add a hosts entry in Windows:

Edit C:\Windows\System32\drivers\etc\hosts and append:

192.168.1.43	linuxserver

Run ping on both Windows and CentOS to verify that the hostname resolves:

ping linuxserver

Framework Installation

Once the environment variables are configured, all of the frameworks can be invoked by command name without a path prefix; the full paths in this article are only shown to make each command's location explicit.

Be sure to open the relevant ports for external access, otherwise connections will be refused and startup will fail. For opening ports on CentOS, see the article 《CentOS7 中端口命令》.
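As a quick reference, a port can be opened on CentOS 7 with firewalld roughly as follows (9092, the Kafka port, is used here purely as an example):

firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload
firewall-cmd --list-ports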

JDK

JDK download link

Download the jdk-8u281-linux-x64.tar.gz archive and extract it into a jvm folder under the hadoop user's home directory:

cd ~
mkdir jvm
tar -zxf jdk-8u281-linux-x64.tar.gz -C jvm

Edit the environment variables:

vim ~/.bashrc

Add JAVA_HOME:

export JAVA_HOME=/home/hadoop/jvm/jdk1.8.0_281
export PATH=$PATH:$JAVA_HOME/bin

Apply the changes:

source ~/.bashrc

Check the Java version:

java -version

Hadoop

Download with wget:

wget https://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/common/stable2/hadoop-2.10.1.tar.gz

Extract it into the user's home directory:

tar -zxf hadoop-2.10.1.tar.gz -C ~

Rename:

mv hadoop-2.10.1 hadoop

Edit the environment variables:

vi ~/.bashrc

Add the following:

# Hadoop Environment Variables
export HADOOP_HOME=/home/hadoop/hadoop
export HADOOP_INSTALL=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin

Note: if a PATH export already exists, simply merge the previous configuration with this one.

Apply the changes:

source ~/.bashrc

Edit core-site.xml:

vim ~/hadoop/etc/hadoop/core-site.xml

Replace the contents with:

<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>file:/home/hadoop/hadoop/tmp</value>
        <description>Abase for other temporary directories.</description>
    </property>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://linuxserver:9000</value>
    </property>
</configuration>

Edit hdfs-site.xml:

vim ~/hadoop/etc/hadoop/hdfs-site.xml

Replace the contents with:

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:/home/hadoop/hadoop/tmp/dfs/name</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:/home/hadoop/hadoop/tmp/dfs/data</value>
    </property>
</configuration>

Format the NameNode the first time only; this step does not need to be repeated afterwards:

/home/hadoop/hadoop/bin/hdfs namenode -format

Start the HDFS file system:

/home/hadoop/hadoop/sbin/start-dfs.sh
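An optional sanity check after starting HDFS: jps should list the NameNode, DataNode and SecondaryNameNode processes.

jps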

Edit mapred-site.xml:

cd /home/hadoop/hadoop/etc/hadoop  # enter the configuration directory
mv ./mapred-site.xml.template ./mapred-site.xml  # rename the template
vim ./mapred-site.xml  # edit the file

Replace the contents with:

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

Edit yarn-site.xml:

cd /home/hadoop/hadoop/etc/hadoop  # enter the configuration directory
vim ./yarn-site.xml  # edit the file

Replace the contents with:

<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
        </property>
</configuration>

Start the YARN resource manager:

/home/hadoop/hadoop/sbin/start-yarn.sh

Start the JobHistory server:

/home/hadoop/hadoop/sbin/mr-jobhistory-daemon.sh start historyserver

To stop HDFS, run:

/home/hadoop/hadoop/sbin/stop-dfs.sh

To stop YARN, run:

/home/hadoop/hadoop/sbin/stop-yarn.sh

To stop the JobHistory server, run:

/home/hadoop/hadoop/sbin/mr-jobhistory-daemon.sh stop historyserver

Once everything is running, the YARN cluster page can be opened at http://<ip>:8088/cluster (the JobHistory server web UI listens on port 19888 by default).

For more installation details, see 《分布式处理框架Hadoop的安装与使用》.

Zookeeper

Download the package with wget:

wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz

Extract it into the user's home directory:

tar -zxf apache-zookeeper-3.6.3-bin.tar.gz -C ~

Rename:

mv apache-zookeeper-3.6.3-bin zookeeper

Enter the zookeeper directory and create a data directory:

cd zookeeper
mkdir tmp

Copy the template configuration file and edit it:

cp ./conf/zoo_sample.cfg ./conf/zoo.cfg
vim ./conf/zoo.cfg

Change the following setting:

dataDir=/home/hadoop/zookeeper/tmp

Edit the environment variables:

vi ~/.bashrc

Add the following:

# Zookeeper Environment Variables
export ZOOKEEPER_HOME=/home/hadoop/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

Note: if a PATH export already exists, simply merge the previous configuration with this one.

Apply the changes:

source ~/.bashrc

Start Zookeeper:

./bin/zkServer.sh start
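An optional check that Zookeeper is actually serving requests; in a single-node setup it should report Mode: standalone:

./bin/zkServer.sh status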

To stop Zookeeper, use the stop command:

./bin/zkServer.sh stop

Kafka

Download the package with wget:

wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz

Extract it into the home directory:

tar -zxf kafka_2.12-2.8.0.tgz -C ~

Rename (the extracted directory, not the archive):

mv kafka_2.12-2.8.0 kafka

Enter the kafka directory:

cd kafka

Edit the Kafka configuration:

vim config/server.properties
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://linuxserver:9092
zookeeper.connect=linuxserver:2181

Start Kafka (after Zookeeper has been started):

bin/kafka-server-start.sh -daemon ./config/server.properties

Create the log topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic log
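An optional check that the topic was created; log should appear in the output:

bin/kafka-topics.sh --list --zookeeper localhost:2181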

To stop Kafka, run:

bin/kafka-server-stop.sh

Flume

Download the Flume package with wget:

wget https://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

Extract it into the home directory:

tar -zxf apache-flume-1.9.0-bin.tar.gz -C ~

Rename:

mv apache-flume-1.9.0-bin flume

Create log-telecom-customer-service.properties in Flume's conf directory with the following content:

agent.sources = exectail
agent.channels = memoryChannel
agent.sinks = kafkasink

# For each one of the sources, the type is defined
agent.sources.exectail.type = exec
# Absolute path of the log file to collect; change this to your own log directory
agent.sources.exectail.command = tail -F /home/hadoop/applogs/telecomCustomerServiceLog/telecomCustomerService.log
agent.sources.exectail.interceptors=i1
agent.sources.exectail.interceptors.i1.type=regex_filter
# Regex used to filter the log lines
agent.sources.exectail.interceptors.i1.regex=.+TELECOM_CUSTOMER_SERVICE.+
# The channel can be defined as follows.
agent.sources.exectail.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkasink.kafka.topic = log
agent.sinks.kafkasink.kafka.bootstrap.servers = linuxserver:9092
agent.sinks.kafkasink.kafka.producer.acks = 1
agent.sinks.kafkasink.kafka.flumeBatchSize = 20

#Specify the channel the sink should use
agent.sinks.kafkasink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 10000

This configuration uses tail -F /home/hadoop/applogs/telecomCustomerServiceLog/telecomCustomerService.log to follow the log file, keeps only the lines matching the regex .+TELECOM_CUSTOMER_SERVICE.+, and sends them via the log topic to the Kafka broker at linuxserver:9092.

Start Flume:

./bin/flume-ng agent -c ./conf/ -f ./conf/log-telecom-customer-service.properties -n agent -Dflume.root.logger=INFO,console
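An optional end-to-end check: while the log producer is running, consuming the log topic from another terminal should show the raw log lines that Flume forwarded:

bin/kafka-console-consumer.sh --bootstrap-server linuxserver:9092 --topic log --from-beginning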

HBase

Download with wget:

wget https://mirrors.tuna.tsinghua.edu.cn/apache/hbase/2.2.6/hbase-2.2.6-bin.tar.gz

Extract it into the user's home directory:

tar -zxf hbase-2.2.6-bin.tar.gz -C ~

Rename (the archive extracts to hbase-2.2.6):

mv hbase-2.2.6 hbase

Edit the following settings in /home/hadoop/hbase/conf/hbase-env.sh:

export JAVA_HOME=/home/hadoop/jvm/jdk1.8.0_281
export HBASE_CLASSPATH=/home/hadoop/hbase/conf 
export HBASE_MANAGES_ZK=false

Edit /home/hadoop/hbase/conf/hbase-site.xml and replace its contents with:

<configuration>
	<property>
        <name>hbase.rootdir</name>
        <value>hdfs://linuxserver:9000/hbase</value>
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
	<property>
        <name>hbase.master.info.port</name>
    	<value>16010</value> 
    </property>
	<property>
        <name>hbase.master</name>
        <value>linuxserver:16000</value>
    </property>
	<property>
     	<name>zookeeper.znode.parent</name>
     	<value>/hbase/master</value>
	 </property>
	<property>
		<name>hbase.zookeeper.quorum</name>
		<value>linuxserver:2181</value>
	</property> 
</configuration>

Edit /home/hadoop/hbase/conf/regionservers and add:

linuxserver

Edit the environment variables:

vi ~/.bashrc

Add the following:

# HBase Environment Variables
export HBASE_HOME=/home/hadoop/hbase
export PATH=$PATH:$HBASE_HOME/bin

Note: if a PATH export already exists, simply merge the previous configuration with this one.

Apply the changes:

source ~/.bashrc

Start HBase (HDFS and Zookeeper must already be running):

/home/hadoop/hbase/bin/start-hbase.sh
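An optional check that HBase came up: jps should additionally show HMaster and HRegionServer, and running list inside the HBase shell should return without errors:

jps
/home/hadoop/hbase/bin/hbase shell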

To stop HBase, run:

/home/hadoop/hbase/bin/stop-hbase.sh

HBase relies on HDFS for file storage and on Zookeeper for node metadata, so both must be started before HBase.

For more installation details, see 《分布式数据库HBase实践指南》.

Project Implementation

Project Structure

TelecomCustomerService
├── tcs-analysis # data analysis service
├── tcs-api # web API service
├── tcs-common # common utilities
├── tcs-consumer # data storage service
├── tcs-consumer-coprocessor # HBase coprocessor module
├── tcs-producer # log producer service
└── tcs-web # web UI service

The Hutool 5.6.4 utility library is used throughout the project.

Table Design

HBase

Data table call_record:

Column family : column — description

active:call1          phone number 1 (the caller)
active:call2          phone number 2 (the callee)
active:date_time      call setup time, format yyyyMMddHHmmss
active:duration       call duration, formatted as four digits (e.g. 0271)
active:flag           flag bit: 1 for the caller-side record, 0 for the callee-side record
passive:call1         phone number 1 (the callee)
passive:call2         phone number 2 (the caller)
passive:date_time     call setup time, format yyyyMMddHHmmss
passive:duration      call duration, formatted as four digits
passive:flag          flag bit: 1 for the caller-side record, 0 for the callee-side record
passive:date_time_ts  call setup time as a millisecond timestamp

The active column family holds caller-side records; the passive column family holds callee-side records.

The row key follows the pattern call1_date_time_call2_flag_duration.
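For example, using the sample record shown later in the log-format section, a call from 15064972307 to 17519874292 at 20200222131726 lasting 1216 seconds yields the caller-side row key 15064972307_20200222131726_17519874292_1_1216; the coprocessor then derives the callee-side row key 17519874292_20200222131726_15064972307_0_1216 from it.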

Mysql

tb_contacts — contacts table

Column      Type          Constraints                    Description
id          int(11)       AUTO_INCREMENT, PRIMARY KEY    auto-increment id
telephone   varchar(255)  NOT NULL                       phone number
name        varchar(255)  NOT NULL                       contact name

tb_dimension_date — date dimension table

Column   Type         Constraints                    Description
id       int(11)      AUTO_INCREMENT, PRIMARY KEY    auto-increment id
year     varchar(4)   NOT NULL                       year of the call record
month    varchar(2)   NOT NULL                       month of the call record
day      varchar(2)   NOT NULL                       day of the call record

tb_call — call statistics table

Column              Type          Constraints              Description
id_date_contact     varchar(255)  NOT NULL, PRIMARY KEY    composite key (date dimension id + contact id)
id_date_dimension   int(11)       NOT NULL                 date dimension id
id_contact          int(11)       NOT NULL                 contact id
call_sum            int(11)       NOT NULL                 total number of calls
call_duration_sum   int(11)       NOT NULL                 total call duration
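For reference, a minimal DDL sketch for the three tables described above (column names and types follow the tables; engine, charset and index choices are illustrative and not prescribed by the original project):

CREATE TABLE tb_contacts (
    id INT(11) NOT NULL AUTO_INCREMENT COMMENT 'auto-increment id',
    telephone VARCHAR(255) NOT NULL COMMENT 'phone number',
    name VARCHAR(255) NOT NULL COMMENT 'contact name',
    PRIMARY KEY (id)
);

CREATE TABLE tb_dimension_date (
    id INT(11) NOT NULL AUTO_INCREMENT COMMENT 'auto-increment id',
    year VARCHAR(4) NOT NULL COMMENT 'year of the record',
    month VARCHAR(2) NOT NULL COMMENT 'month of the record',
    day VARCHAR(2) NOT NULL COMMENT 'day of the record',
    PRIMARY KEY (id)
);

CREATE TABLE tb_call (
    id_date_contact VARCHAR(255) NOT NULL COMMENT 'composite key: date dimension id + contact id',
    id_date_dimension INT(11) NOT NULL COMMENT 'date dimension id',
    id_contact INT(11) NOT NULL COMMENT 'contact id',
    call_sum INT(11) NOT NULL COMMENT 'total number of calls',
    call_duration_sum INT(11) NOT NULL COMMENT 'total call duration',
    PRIMARY KEY (id_date_contact)
);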

Feature Implementation

Common Module

Contains shared utilities used by the other modules.

Add the MySQL, HBase and Log4j related dependencies:

<properties>
    <log4j.version>1.2.17</log4j.version>
    <slf4j.version>1.7.22</slf4j.version>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <!-- Shared logging dependencies -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-mapreduce</artifactId>
        <version>${hbase.version}</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.java.version}</version>
    </dependency>
</dependencies>

NameConstant

Column family name constants.

package cn.javayuli.common.constants;

/**
 * Name constants for column families
 *
 * @author hanguilin
 */
public interface NameConstant {

    /**
     * Caller-side column family
     */
    String ACTIVE = "active";

    /**
     * Callee-side column family
     */
    String PASSIVE = "passive";
}

StateConstant

Caller/callee flag constants.

package cn.javayuli.common.constants;

/**
 * Caller/callee state constants
 *
 * @author hanguilin
 */
public interface StateConstant {

    /**
     * Caller
     */
    String ACTIVE = "1";

    /**
     * Callee
     */
    String PASSIVE = "0";
}

HBaseDao

HBase table access class.

package cn.javayuli.common.dao;

import cn.hutool.core.collection.CollUtil;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/**
 * HBase access helper
 *
 * @author hanguilin
 */
public class HBaseDao {

    public static Configuration conf;
    
    private static final Logger LOGGER = LoggerFactory.getLogger(HBaseDao.class);

    static {
        conf = HBaseConfiguration.create();
    }

    /**
     * Check whether a table exists
     *
     * @param tableName table name
     * @return whether the table exists
     */
    public boolean isExistTable(String tableName) {
        TableName table = TableName.valueOf(tableName);
        try (
                Connection conn = ConnectionFactory.createConnection(conf);
                Admin admin = conn.getAdmin()
        ) {
            return admin.tableExists(table);
        } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
            return true;
        }
    }

    /**
     * Create a table with the given column families and attach the coprocessor
     *
     * @param tableName    table name
     * @param columnFamily column families
     */
    public void createTable(String tableName, List<String> columnFamily) {
        TableName table = TableName.valueOf(tableName);
        try (
                Connection conn = ConnectionFactory.createConnection(conf);
                Admin admin = conn.getAdmin()
        ) {
            // Skip creation if the table already exists
            if (admin.tableExists(table)) {
                LOGGER.info("Table {} already exists", tableName);
                return;
            }
            TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(table);
            // Attach the coprocessor
            CoprocessorDescriptor coprocessor = CoprocessorDescriptorBuilder
                    // coprocessor class
                    .newBuilder("cn.javayuli.coprocessor.observer.CalleeWriteObserver")
                    // location of the coprocessor jar on HDFS
                    .setJarPath("hdfs://linuxserver:9000/user/hadoop/hbase/coprocessor/tcs-consumer-coprocessor-1.0.jar")
                    .setPriority(Coprocessor.PRIORITY_USER)
                    .build();
            tableDescriptorBuilder.setCoprocessor(coprocessor);
            if (CollUtil.isNotEmpty(columnFamily)) {
                columnFamily.forEach(column -> {
                    ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(column)).build();
                    tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
                });
            }
            admin.createTable(tableDescriptorBuilder.build());
            LOGGER.info("创建表{}成功", tableName);
        } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

    /**
     * Delete a table
     *
     * @param tableName table name
     */
    public void deleteTable(String tableName) {
        TableName table = TableName.valueOf(tableName);
        try (
                Connection conn = ConnectionFactory.createConnection(conf);
                Admin admin = conn.getAdmin()
        ) {
            if (admin.tableExists(table)) {
                // disable the table first
                admin.disableTable(table);
                // then delete it
                admin.deleteTable(table);
                LOGGER.info("Table {} deleted", tableName);
            }
        } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

    /**
     * List all table names
     */
    public List<String> listTable() {
        try (
                Connection conn = ConnectionFactory.createConnection(conf);
                Admin admin = conn.getAdmin()
        ) {
            List<TableDescriptor> tableDescriptors = admin.listTableDescriptors();
            return tableDescriptors.stream().map(o -> o.getTableName().getNameAsString()).collect(Collectors.toList());
        } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
            return Collections.emptyList();
        }
    }

    /**
     * Insert a row
     *
     * @param tableName table name
     * @param put       Put object
     */
    public void insertRow(String tableName, Put put) {
        try (
                Connection conn = ConnectionFactory.createConnection(conf)
        ) {
            Table table = conn.getTable(TableName.valueOf(tableName));
            table.put(put);
            table.close();
            LOGGER.info("向表{}插入数据成功", tableName);
        } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

    /**
     * Delete row data
     *
     * @param tableName table name
     * @param rowKey    row key
     * @param colFamily column family
     * @param col       column name
     */
    public void deleteRow(String tableName, String rowKey, String colFamily, String col) {
        try (
                Connection conn = ConnectionFactory.createConnection(conf)
        ) {
            Table table = conn.getTable(TableName.valueOf(tableName));
            Delete delete = new Delete(rowKey.getBytes());
            delete.addColumn(colFamily.getBytes(), col.getBytes());
            table.delete(delete);
            System.out.println("删除数据成功");
        } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

    /**
     * Read row data and print each cell
     *
     * @param tableName table name
     * @param rowKey    row key
     * @param colFamily column family
     * @param col       column name
     */
    public void getRow(String tableName, String rowKey, String colFamily, String col) {
        try (
                Connection conn = ConnectionFactory.createConnection(conf)
        ) {
            Table table = conn.getTable(TableName.valueOf(tableName));
            Get get = new Get(rowKey.getBytes());
            get.addColumn(colFamily.getBytes(), col.getBytes());
            Result result = table.get(get);
            Cell[] rawCells = result.rawCells();
            for (Cell cell : rawCells) {
                System.out.println("RowName:" + new String(CellUtil.cloneRow(cell)) + " ");
                System.out.println("Timetamp:" + cell.getTimestamp() + " ");
                System.out.println("column Family:" + new String(CellUtil.cloneFamily(cell)) + " ");
                System.out.println("row Name:" + new String(CellUtil.cloneQualifier(cell)) + " ");
                System.out.println("value:" + new String(CellUtil.cloneValue(cell)) + " ");
            }
            table.close();
        } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

}

HBaseUtil

HBase utility class.

package cn.javayuli.common.utils;

import cn.hutool.setting.dialect.Props;

/**
 * HBase utility class
 * @author hanguilin
 */
public class HBaseUtil {

    private static final Props HBASE_PROPS = new Props("classpath://hbase.properties");

    /**
     * Build a row key in the form call1_dateTime_call2_flag_duration
     *
     * @param call1    first phone number
     * @param dateTime call setup time (yyyyMMddHHmmss)
     * @param call2    second phone number
     * @param flag     caller/callee flag
     * @param duration call duration
     * @return the row key
     */
    public static String genRowKey (String call1, String dateTime, String call2, String flag, String duration) {
        return call1 + "_" + dateTime + "_" + call2 + "_" + flag + "_" + duration;
    }

    /**
     * Read a property from the HBase configuration file
     *
     * @param key property key
     * @return property value
     */
    public static String getHBaseProperties(String key) {
       return HBASE_PROPS.getStr(key);
    }
}

JDBCUtil

JDBC utility class.

package cn.javayuli.common.utils;


import java.sql.Connection;
import java.sql.DriverManager;

/**
 * @author hanguilin
 * JDBC utility class
 */
public class JDBCUtil {

    private static final String MYSQL_DRIVER_CLASS = "com.mysql.jdbc.Driver";
    private static final String MYSQL_URL = "jdbc:mysql://192.168.1.39:3306/telecom?useUnicode=true&characterEncoding=UTF-8";
    private static final String MYSQL_USERNAME = "root";
    private static final String MYSQL_PASSWORD = "root";

    public static Connection getConnection() {
        Connection conn = null;
        try {
            Class.forName(MYSQL_DRIVER_CLASS);
            conn = DriverManager.getConnection(MYSQL_URL, MYSQL_USERNAME, MYSQL_PASSWORD);
        } catch (Exception e) {
            e.printStackTrace();
        }

        return conn;

    }
}

TimeFormatUtil

Date and time formatting utility.

package cn.javayuli.common.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.text.SimpleDateFormat;

/**
 * Time utility class
 *
 * @author hanguilin
 */
public class TimeFormatUtil {

    private static final SimpleDateFormat SDF = new SimpleDateFormat("yyyyMMddHHmmss");

    private static final Logger LOGGER = LoggerFactory.getLogger(TimeFormatUtil.class);

    /**
     * Convert a formatted date string (yyyyMMddHHmmss) to a millisecond timestamp
     *
     * @param formatString formatted date string
     * @return timestamp in milliseconds, or null if parsing fails
     */
    public static String toTS(String formatString) {
        try {
            return String.valueOf(SDF.parse(formatString).getTime());
        } catch (ParseException e) {
            LOGGER.error(e.getMessage(), e);
            return null;
        }
    }
}

The resources folder needs hbase.properties, hbase-site.xml and log4j.properties.

hbase.properties

Used by HBaseUtil to look up the name of the data table to operate on:

tableName=call_record

hbase-site.xml

Copy over the hbase-site.xml configured on the server; the hbase-client library loads the hbase-site.xml it finds on the classpath.

log4j.properties

log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%5L)  :  %m%n

Log Producer Service

Add the dependency:

<dependency>
    <groupId>cn.javayuli</groupId>
    <artifactId>tcs-common</artifactId>
    <version>1.0</version>
</dependency>

Configure the build so that all required jars are packaged into a single fat jar:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

ProduceLog

Main class that produces the simulated log.

package cn.javayuli.producer;

import cn.javayuli.common.utils.JDBCUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.DecimalFormat;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;

/**
 * @author hanguilin
 *
 * Simulates generating call log records
 */
public class ProduceLog {

    /**
     * Mapping from phone number to contact name
     */
    private static final Map<String, String> USER_PHONE_MAP;

    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private static final DateTimeFormatter OUTPUT_DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    private static final Logger LOGGER = LoggerFactory.getLogger(ProduceLog.class);

    private static final String LOG_PREFIX = "TELECOM_CUSTOMER_SERVICE:";

    static {
        HashMap<String, String> contacts = Maps.newHashMap();
        Connection connection = JDBCUtil.getConnection();
        try (PreparedStatement preparedStatement = connection.prepareStatement("select telephone, name from tb_contacts")) {
            ResultSet resultSet = preparedStatement.executeQuery();
            while (resultSet.next()) {
                String telephone = resultSet.getString(1);
                String name = resultSet.getString(2);
                // contact info loaded from the database
                contacts.put(telephone, name);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        USER_PHONE_MAP = Collections.unmodifiableMap(contacts);
    }

    /**
     * Pick a random time within the given interval
     *
     * @param startDate interval start (yyyy-MM-dd HH:mm:ss)
     * @param endDate   interval end (yyyy-MM-dd HH:mm:ss)
     * @return random time formatted as yyyyMMddHHmmss
     */
    private static String randomDate(String startDate, String endDate) {
        LocalDateTime start = LocalDateTime.parse(startDate, DATE_TIME_FORMATTER);
        LocalDateTime end = LocalDateTime.parse(endDate, DATE_TIME_FORMATTER);
        long duration = Duration.between(start, end).toMillis();
        LocalDateTime plus = start.plus((long) (Math.random() * duration), ChronoUnit.MILLIS);
        return plus.format(OUTPUT_DATE_TIME_FORMATTER);
    }

    /**
     * Produce a single log record
     *
     * @return comma-separated record: caller,callee,setupTime,duration
     */
    private static String produceRecord() {
        int size = USER_PHONE_MAP.size();
        // random index in [0, size)
        int fromIdx = new Random().nextInt(size);
        // pick a random key by skipping a random number of map entries
        String callFrom = USER_PHONE_MAP.entrySet().stream().skip(fromIdx).findFirst().get().getKey();
        // re-pick the callee while it is the same as the caller
        int toIdx;
        do {
            toIdx = new Random().nextInt(size);
        } while (toIdx == fromIdx);
        String callTo = USER_PHONE_MAP.entrySet().stream().skip(toIdx).findFirst().get().getKey();
        // random call duration of up to 30 minutes (in seconds)
        int duration = new Random().nextInt(30 * 60) + 1;
        String durationString = new DecimalFormat("0000").format(duration);
        // call setup time
        String startDate = randomDate("2020-01-01 00:00:00", "2020-12-31 00:00:00");
        // join the fields with commas
        String log = Lists.newArrayList(callFrom, callTo, startDate, durationString).stream().collect(Collectors.joining(","));
        return log;
    }

    public static void main(String[] args) {
        // produce log records continuously
        while (true) {
            LOGGER.info(LOG_PREFIX + produceRecord());
        }
    }
}

log4j.properties

Log4j configuration that also writes the log output to /home/hadoop/applogs/telecomCustomerServiceLog/telecomCustomerService.log.

log4j.rootLogger=info, stdout, file
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%20t]  %-40c(line:%5L)  :  %m%n

log4j.appender.file = org.apache.log4j.FileAppender
log4j.appender.file.File = /home/hadoop/applogs/telecomCustomerServiceLog/telecomCustomerService.log
log4j.appender.file.Encoding=UTF-8
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%20t]  %-40c(line:%5L)  :  %m%n
log4j.appender.file.append = true

The output log format is:

TELECOM_CUSTOMER_SERVICE:15064972307,17519874292,20200222131726,1216
  • “TELECOM_CUSTOMER_SERVICE:”

A fixed prefix that makes it easy for Flume to filter the relevant lines.

  • 15064972307,17519874292

The first number is the caller, the second is the callee.

  • 20200222131726

The randomly generated call setup time, format yyyyMMddHHmmss.

  • 1216

The random call duration in seconds, chosen within 30 minutes and left-padded with zeros to four digits.

Data Storage Service

Once the producer writes the log file, Flume filters the new lines with the regex, publishes them to Kafka on the log topic, and this consumer service subscribes to the log topic and consumes the data.

Add the dependencies:

<dependency>
    <groupId>cn.javayuli</groupId>
    <artifactId>tcs-common</artifactId>
    <version>1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

HBaseConsumer

Consumes the Kafka messages and stores them in HBase.

package cn.javayuli.consumer;

import cn.hutool.setting.dialect.Props;
import cn.javayuli.common.constants.NameConstant;
import cn.javayuli.common.constants.StateConstant;
import cn.javayuli.common.dao.HBaseDao;
import cn.javayuli.common.utils.HBaseUtil;
import com.google.common.collect.Lists;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;

/**
 * Kafka consumer that writes call records into HBase
 *
 * @author hanguilin
 */
public class HBaseConsumer {

    private static final String LOG_PREFIX = "TELECOM_CUSTOMER_SERVICE:";

    public static void main(String[] args) {
        Props kafkaProps = new Props("classpath://kafka.properties");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProps);
        kafkaConsumer.subscribe(Collections.singletonList(kafkaProps.getStr("kafka.topic")));
        HBaseDao hBaseDao = new HBaseDao();
        // create the data table if needed
        String tableName = HBaseUtil.getHBaseProperties("tableName");
        hBaseDao.createTable(tableName, Lists.newArrayList(NameConstant.ACTIVE, NameConstant.PASSIVE));
        byte[] ACTIVE = Bytes.toBytes(NameConstant.ACTIVE);
        byte[] FLAG = Bytes.toBytes(StateConstant.ACTIVE);
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // 15490732767,19335715448,20200505161114,0271
                String value = record.value().split(LOG_PREFIX)[1].trim();
                String[] split = value.split(",");
                String rowKey = HBaseUtil.genRowKey(split[0], split[2], split[1], StateConstant.ACTIVE, split[3]);
                Put put = new Put(Bytes.toBytes(rowKey));
                put.addColumn(ACTIVE, Bytes.toBytes("call1"), Bytes.toBytes(split[0]));
                put.addColumn(ACTIVE, Bytes.toBytes("call2"), Bytes.toBytes(split[1]));
                put.addColumn(ACTIVE, Bytes.toBytes("date_time"), Bytes.toBytes(split[2]));
                put.addColumn(ACTIVE, Bytes.toBytes("duration"), Bytes.toBytes(split[3]));
                put.addColumn(ACTIVE, Bytes.toBytes("flag"), FLAG);
                hBaseDao.insertRow(tableName, put);
            }
        }
    }
}

The resources folder needs kafka.properties and log4j.properties.

kafka.properties

Kafka consumer configuration:

bootstrap.servers=linuxserver:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
group.id=telecomCustomerServiceGroup
auto.offset.reset=latest
enable.auto.commit=false

kafka.topic=log

log4j.properties

log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%5L)  :  %m%n

Coprocessor Service

The coprocessor concept in HBase

In short, a coprocessor is similar to an interceptor in Java: it lets you hook custom logic into various stages of request processing.

Note that the coprocessor API differs between HBase 1.x and HBase 2.x.

Official documentation: http://hbase.apache.org/2.2/book.html#cp

In this project, after a caller-side record is inserted into the active column family, the coprocessor inserts the corresponding callee-side record into the passive column family.

Add the dependency:

<dependency>
    <groupId>cn.javayuli</groupId>
    <artifactId>tcs-common</artifactId>
    <version>1.0</version>
</dependency>

Configure the build so that all required jars are packaged into a single fat jar:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

CalleeWriteObserver

package cn.javayuli.coprocessor.observer;

import cn.hutool.core.util.StrUtil;
import cn.hutool.setting.dialect.Props;
import cn.javayuli.common.constants.NameConstant;
import cn.javayuli.common.constants.StateConstant;
import cn.javayuli.common.dao.HBaseDao;
import cn.javayuli.common.utils.HBaseUtil;
import cn.javayuli.common.utils.TimeFormatUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Optional;

/**
 * HBase coprocessor that inserts a callee-side record whenever a caller-side record is written
 *
 * @author hanguilin
 */
public class CalleeWriteObserver implements RegionObserver, RegionCoprocessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(CalleeWriteObserver.class);

    private static final byte[] PASSIVE = Bytes.toBytes(NameConstant.PASSIVE);

    private static final byte[] FLAG = Bytes.toBytes(StateConstant.PASSIVE);

    private static final String TABLE = HBaseUtil.getHBaseProperties("tableName");

    /**
     * Must be overridden in HBase 2.x so that this class is registered as a RegionObserver
     *
     * @return this observer
     */
    @Override
    public Optional<RegionObserver> getRegionObserver() {
        return Optional.of(this);
    }

    /**
     * Invoked after a Put has been applied to the region
     *
     * @param c          observer context
     * @param put        the Put that was applied
     * @param edit       the WAL edit
     * @param durability durability setting
     */
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, Durability durability) {
       try {
           String currentTableName = c.getEnvironment().getRegionInfo().getTable().getNameAsString();

           if (!StrUtil.equals(currentTableName, TABLE)) {
               return;
           }
           // 15870580719_20210512142802_18323797211_1_0600
           String originRowKey = Bytes.toString(put.getRow());
           String[] splits = originRowKey.split("_");
           String flag = splits[3];
           if (!StrUtil.equals(StateConstant.ACTIVE, flag)) {
               return;
           }
           String rowKey = HBaseUtil.genRowKey(splits[2], splits[1], splits[0], StateConstant.PASSIVE, splits[4]);
           Put newPut = new Put(rowKey.getBytes());

           newPut.addColumn(PASSIVE, Bytes.toBytes("call1"), Bytes.toBytes(splits[2]));
           newPut.addColumn(PASSIVE, Bytes.toBytes("call2"), Bytes.toBytes(splits[0]));
           newPut.addColumn(PASSIVE, Bytes.toBytes("date_time"), Bytes.toBytes(splits[1]));
           newPut.addColumn(PASSIVE, Bytes.toBytes("duration"), Bytes.toBytes(splits[4]));
           newPut.addColumn(PASSIVE, Bytes.toBytes("flag"), FLAG);
           newPut.addColumn(PASSIVE, Bytes.toBytes("date_time_ts"), Bytes.toBytes(TimeFormatUtil.toTS(splits[1])));
           Table table = c.getEnvironment().getConnection().getTable(TableName.valueOf(TABLE));
           table.put(newPut);
           table.close();
       } catch (IOException e) {
           LOGGER.error(e.getMessage(), e);
       }
    }
}

Add the log configuration file under resources:

log4j.properties

log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%5L)  :  %m%n
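After packaging this module, the jar has to be uploaded to the HDFS path that HBaseDao references when attaching the coprocessor (the exact jar file name depends on how the assembly build is configured; the name below matches the path used in HBaseDao):

/home/hadoop/hadoop/bin/hdfs dfs -mkdir -p /user/hadoop/hbase/coprocessor
/home/hadoop/hadoop/bin/hdfs dfs -put tcs-consumer-coprocessor-1.0.jar /user/hadoop/hbase/coprocessor/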

Data Analysis Service

Offline analysis is done with MapReduce: the job reads the call records from HBase, aggregates them, and writes the results into the MySQL tables.

Add the dependencies:

<dependency>
    <groupId>cn.javayuli</groupId>
    <artifactId>tcs-common</artifactId>
    <version>1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-common</artifactId>
    <version>2.10.1</version>
</dependency>

AnalysisKey

Custom key type for the job.

package cn.javayuli.analysis.kv;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author hanguilin
 *
 * Custom key for the analysis job
 */
public class AnalysisKey implements WritableComparable<AnalysisKey> {

    /**
     * Phone number
     */
    private String telephone;
    /**
     * Date dimension (yyyy, yyyyMM or yyyyMMdd)
     */
    private String date;

    public AnalysisKey() {
    }

    public AnalysisKey(String telephone, String date) {
        this.telephone = telephone;
        this.date = date;
    }

    public String getTelephone() {
        return telephone;
    }

    public void setTelephone(String telephone) {
        this.telephone = telephone;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    @Override
    public int compareTo(AnalysisKey o) {
        int result = telephone.compareTo(o.getTelephone());
        if (result == 0) {
            result = date.compareTo(o.getDate());
        }
        return result;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(telephone);
        dataOutput.writeUTF(date);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        telephone = dataInput.readUTF();
        date = dataInput.readUTF();
    }
}

AnalysisValue

Custom value type for the job.

package cn.javayuli.analysis.kv;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author hanguilin
 *
 * Custom value for the analysis job
 */
public class AnalysisValue implements Writable {

    /**
     * Total number of calls
     */
    private String callSum;

    /**
     * Total call duration
     */
    private String callDurationSum;

    public AnalysisValue() {
    }

    public AnalysisValue(String callSum, String callDurationSum) {
        this.callSum = callSum;
        this.callDurationSum = callDurationSum;
    }

    public String getCallSum() {
        return callSum;
    }

    public void setCallSum(String callSum) {
        this.callSum = callSum;
    }

    public String getCallDurationSum() {
        return callDurationSum;
    }

    public void setCallDurationSum(String callDurationSum) {
        this.callDurationSum = callDurationSum;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(callSum);
        dataOutput.writeUTF(callDurationSum);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        callSum = dataInput.readUTF();
        callDurationSum = dataInput.readUTF();
    }
}

AnalysisMapper

The Mapper of the MapReduce job.

package cn.javayuli.analysis.mapper;

import cn.javayuli.analysis.kv.AnalysisKey;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * Mapper
 *
 * @author hanguilin
 */
public class AnalysisMapper extends TableMapper<AnalysisKey, Text> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // 19920860202_20201226131016_18503558939_0_1451
        String rowKey = Bytes.toString(key.get());
        String[] split = rowKey.split("_");

        String call1 = split[0];
        String dateTime = split[1];
        String call2 = split[2];
        String duration = split[4];

        String year = dateTime.substring(0, 4);
        String month = dateTime.substring(0, 6);
        String date = dateTime.substring(0, 8);

        // caller - year
        context.write(new AnalysisKey(call1, year), new Text(duration));
        // caller - month
        context.write(new AnalysisKey(call1, month), new Text(duration));
        // caller - day
        context.write(new AnalysisKey(call1, date), new Text(duration));

        // callee - year
        context.write(new AnalysisKey(call2, year), new Text(duration));
        // callee - month
        context.write(new AnalysisKey(call2, month), new Text(duration));
        // callee - day
        context.write(new AnalysisKey(call2, date), new Text(duration));
    }
}

AnalysisReducer

The Reducer of the MapReduce job.

package cn.javayuli.analysis.reducer;

import cn.javayuli.analysis.kv.AnalysisKey;
import cn.javayuli.analysis.kv.AnalysisValue;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author hanguilin
 *
 * Reducer that sums call count and call duration per key
 */
public class AnalysisReducer extends Reducer<AnalysisKey, Text, AnalysisKey, AnalysisValue> {

    @Override
    protected void reduce(AnalysisKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int sumCall = 0;
        int sumDuration = 0;
        for (Text value: values) {
            sumCall ++;
            sumDuration += Integer.valueOf(value.toString());
        }

        context.write(key, new AnalysisValue(sumCall + "", sumDuration + ""));
    }
}

AnalysisTool

Defines the MapReduce job configuration.

package cn.javayuli.analysis.tool;

import cn.javayuli.analysis.format.MysqlOutputFormat;
import cn.javayuli.analysis.kv.AnalysisKey;
import cn.javayuli.analysis.kv.AnalysisValue;
import cn.javayuli.analysis.mapper.AnalysisMapper;
import cn.javayuli.analysis.reducer.AnalysisReducer;
import cn.javayuli.common.constants.NameConstant;
import cn.javayuli.common.utils.HBaseUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.util.Tool;

/**
 * Tool that configures and runs the analysis job
 *
 * @author hanguilin
 */
public class AnalysisTool implements Tool {

    private static final String TABLE = HBaseUtil.getHBaseProperties("tableName");

    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(this.getClass());
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(NameConstant.ACTIVE));
        // mapper
        TableMapReduceUtil.initTableMapperJob(TABLE, scan, AnalysisMapper.class, AnalysisKey.class, Text.class, job);
        // reducer
        job.setReducerClass(AnalysisReducer.class);
        job.setOutputKeyClass(AnalysisKey.class);
        job.setOutputValueClass(AnalysisValue.class);
        // outputFormat
        job.setOutputFormatClass(MysqlOutputFormat.class);
        return job.waitForCompletion(true) ? JobStatus.State.SUCCEEDED.getValue() : JobStatus.State.FAILED.getValue();
    }

    @Override
    public void setConf(Configuration configuration) {

    }

    @Override
    public Configuration getConf() {
        return null;
    }
}

MysqlOutputFormat

OutputFormat implementation that writes the results to MySQL.

package cn.javayuli.analysis.format;

import cn.javayuli.analysis.kv.AnalysisKey;
import cn.javayuli.analysis.kv.AnalysisValue;
import cn.javayuli.common.utils.JDBCUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Writes reducer output to MySQL
 *
 * @author hanguilin
 */
public class MysqlOutputFormat extends OutputFormat<AnalysisKey, AnalysisValue> {

    private FileOutputCommitter committer = null;

    protected static class MysqlRecordWriter extends RecordWriter<AnalysisKey, AnalysisValue> {

        /**
         * MySQL connection
         */
        private Connection connection;
        /**
         * telephone -> contact id
         */
        private Map<String, Integer> userMap = new HashMap();
        /**
         * date string (yyyy / yyyyMM / yyyyMMdd) -> date dimension id
         */
        private Map<String, Integer> dateMap = new HashMap();

        public MysqlRecordWriter() {
            connection = JDBCUtil.getConnection();
            try (PreparedStatement contactStatement = connection.prepareStatement("select id, telephone from tb_contacts");
                 PreparedStatement dateStatement = connection.prepareStatement("select id, year, month, day from tb_dimension_date")){
                ResultSet contactRs = contactStatement.executeQuery();
                while (contactRs.next()) {
                    Integer id = contactRs.getInt(1);
                    String telephone = contactRs.getString(2);
                    userMap.put(telephone, id);
                }
                ResultSet dateRs = dateStatement.executeQuery();
                while (dateRs.next()) {
                    Integer id = dateRs.getInt(1);
                    String year = dateRs.getString(2);
                    String month = dateRs.getString(3);
                    String day = dateRs.getString(4);
                    if (month.length() == 1) {
                        month = "0" + month;
                    }
                    if (day.length() == 1) {
                        day = "0" + day;
                    }
                    dateMap.put(year + month + day, id);
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void write(AnalysisKey analysisKey, AnalysisValue analysisValue) {
            try (PreparedStatement preparedStatement = connection.prepareStatement("insert into tb_call (id_date_contact, id_date_dimension, id_contact, call_sum, call_duration_sum) values (?, ?, ?, ?, ?)")) {
                Integer idDateDimension = dateMap.get(analysisKey.getDate());
                Integer idContact = userMap.get(analysisKey.getTelephone());
                Integer callSum = Integer.valueOf(analysisValue.getCallSum());
                Integer callDurationSum = Integer.valueOf(analysisValue.getCallDurationSum());
                preparedStatement.setString(1, idDateDimension + "_" + idContact);
                preparedStatement.setInt(2, idDateDimension);
                preparedStatement.setInt(3, idContact);
                preparedStatement.setInt(4, callSum);
                preparedStatement.setInt(5, callDurationSum);
                preparedStatement.executeUpdate();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void close(TaskAttemptContext taskAttemptContext) {
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext) {
        return new MysqlRecordWriter();
    }

    @Override
    public void checkOutputSpecs(JobContext jobContext) {

    }

    public static Path getOutputPath(JobContext job) {
        String name = job.getConfiguration().get(FileOutputFormat.OUTDIR);
        return name == null ? null : new Path(name);
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) {
        return Optional.ofNullable(committer).orElseGet(() -> {
            Path outputPath = getOutputPath(taskAttemptContext);
            FileOutputCommitter fileOutputCommitter = null;
            try {
                fileOutputCommitter = new FileOutputCommitter(outputPath, taskAttemptContext);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return fileOutputCommitter;
        });
    }
}

AnalysisApplication

Execution entry point.

package cn.javayuli.analysis;

import cn.javayuli.analysis.tool.AnalysisTool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author hanguilin
 *
 * Runner for the analysis tool
 */
public class AnalysisApplication {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new AnalysisTool(), args);
    }
}
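A sketch of how the job might be submitted on the cluster (the jar name below is an assumption; use whatever artifact the tcs-analysis build actually produces and make sure its dependencies are on the classpath):

/home/hadoop/hadoop/bin/hadoop jar tcs-analysis-1.0.jar cn.javayuli.analysis.AnalysisApplication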

Web API Service

Reads the analysis results from MySQL and exposes them to the outside through REST endpoints.

Add the dependencies:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.4.5</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.4.2</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.21</version>
    </dependency>

</dependencies>

Project structure:

WebMvcConfig

CORS configuration.

package cn.javayuli.api.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

/**
 * @author hanguilin
 *
 * MVC configuration
 */
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {

    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/**")
                .allowedOrigins("*")
                .allowedMethods("*")
                .allowedHeaders("*")
                .allowCredentials(false);
    }
}

DataOut

Value object used to wrap a returned data item.

package cn.javayuli.api.vo;

/**
 * @author hanguilin
 *
 * Output data item
 */
public class DataOut {

    /**
     * Date
     */
    private String date;

    /**
     * Number of calls
     */
    private Integer callSum;

    /**
     * Total call duration
     */
    private Integer callDurationSum;

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public Integer getCallSum() {
        return callSum;
    }

    public void setCallSum(Integer callSum) {
        this.callSum = callSum;
    }

    public Integer getCallDurationSum() {
        return callDurationSum;
    }

    public void setCallDurationSum(Integer callDurationSum) {
        this.callDurationSum = callDurationSum;
    }
}

DataOutWrapper

Value object that wraps a list of DataOut items; a further encapsulation of DataOut.

package cn.javayuli.api.vo;

import java.util.List;

/**
 * @author hanguilin
 *
 * 值对象类
 */
public class DataOutWrapper {

    /**
     * 数据
     */
    List<DataOut> data;

    /**
     * 用户名
     */
    String name;

    public DataOutWrapper() {
    }

    public DataOutWrapper(List<DataOut> data, String name) {
        this.data = data;
        this.name = name;
    }

    public List<DataOut> getData() {
        return data;
    }

    public void setData(List<DataOut> data) {
        this.data = data;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

Call

Entity mapped to tb_call.

package cn.javayuli.api.entity;

import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;

/**
 * @author hanguilin
 *
 * 通话信息
 */
@TableName("tb_call")
public class Call {

    /**
     * 复合主键(联系人维度id,时间维度id)
     */
    @TableId
    private String idDateContact;

    /**
     * 时间维度id
     */
    private Integer idDateDimension;

    /**
     * 查询人的id
     */
    private Integer idContact;

    /**
     * 通话次数总和
     */
    private Integer callSum;

    /**
     * 通话时长总和
     */
    private Integer callDurationSum;

    public String getIdDateContact() {
        return idDateContact;
    }

    public void setIdDateContact(String idDateContact) {
        this.idDateContact = idDateContact;
    }

    public Integer getIdDateDimension() {
        return idDateDimension;
    }

    public void setIdDateDimension(Integer idDateDimension) {
        this.idDateDimension = idDateDimension;
    }

    public Integer getIdContact() {
        return idContact;
    }

    public void setIdContact(Integer idContact) {
        this.idContact = idContact;
    }

    public Integer getCallSum() {
        return callSum;
    }

    public void setCallSum(Integer callSum) {
        this.callSum = callSum;
    }

    public Integer getCallDurationSum() {
        return callDurationSum;
    }

    public void setCallDurationSum(Integer callDurationSum) {
        this.callDurationSum = callDurationSum;
    }
}

Contacts

Entity mapped to tb_contacts.

package cn.javayuli.api.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;

/**
 * @author hanguilin
 *
 * 联系人信息
 */
@TableName("tb_contacts")
public class Contacts {

    /**
     * 自增id
     */
    @TableId(type = IdType.AUTO)
    private Integer id;

    /**
     * 号码
     */
    private String telephone;

    /**
     * 用户名
     */
    private String name;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getTelephone() {
        return telephone;
    }

    public void setTelephone(String telephone) {
        this.telephone = telephone;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

DimensionDate

Entity mapped to tb_dimension_date.

package cn.javayuli.api.entity;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;

import java.util.stream.Collectors;

/**
 * @author hanguilin
 *
 * 时间维度
 */
@TableName("tb_dimension_date")
public class DimensionDate {

    /**
     * 自增id
     */
    @TableId(type = IdType.AUTO)
    private Integer id;

    /**
     * 年
     */
    private String year;

    /**
     * 月
     */
    private String month;

    /**
     * 日
     */
    private String day;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getYear() {
        return year;
    }

    public void setYear(String year) {
        this.year = year;
    }

    public String getMonth() {
        return month;
    }

    public void setMonth(String month) {
        this.month = month;
    }

    public String getDay() {
        return day;
    }

    public void setDay(String day) {
        this.day = day;
    }

    /**
     * 获取格式化的日期
     *
     * @return
     */
    public String getFormatDate() {
        if (month.length() == 1) {
            month = "0" + month;
        }
        if (day.length() == 1) {
            day = "0" + day;
        }
       return CollUtil.newArrayList(year, month, day).stream().filter(StrUtil::isNotEmpty).collect(Collectors.joining("-"));
    }
}

CallDao

DAO interface for the Call entity.

package cn.javayuli.api.dao;

import cn.javayuli.api.entity.Call;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;

/**
 * @author hanguilin
 *
 * 通话信息
 */
public interface CallDao extends BaseMapper<Call> {
}

ContactsDao

DAO interface for the Contacts entity.

package cn.javayuli.api.dao;

import cn.javayuli.api.entity.Contacts;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;

/**
 * @author hanguilin
 *
 * 联系人信息
 */
public interface ContactsDao extends BaseMapper<Contacts> {
}

DimensionDateDao

DAO interface for the DimensionDate entity.

package cn.javayuli.api.dao;

import cn.javayuli.api.entity.DimensionDate;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;

/**
 * @author hanguilin
 *
 * 时间维度
 */
public interface DimensionDateDao extends BaseMapper<DimensionDate> {
}

CallService

Service interface for the Call entity.

package cn.javayuli.api.service;

import cn.javayuli.api.entity.Call;
import com.baomidou.mybatisplus.extension.service.IService;

/**
 * @author hanguilin
 *
 * 通话信息
 */
public interface CallService extends IService<Call> {
}

ContactsService

Service interface for the Contacts entity.

package cn.javayuli.api.service;

import cn.javayuli.api.entity.Contacts;
import com.baomidou.mybatisplus.extension.service.IService;

/**
 * @author hanguilin
 *
 * 联系人信息
 */
public interface ContactsService extends IService<Contacts> {
}

DimensionDateService

Service interface for the DimensionDate entity.

package cn.javayuli.api.service;

import cn.javayuli.api.entity.DimensionDate;
import com.baomidou.mybatisplus.extension.service.IService;

/**
 * @author hanguilin
 *
 * 时间维度
 */
public interface DimensionDateService extends IService<DimensionDate> {
}

CallServiceImpl

Service implementation for the Call entity.

package cn.javayuli.api.service.impl;

import cn.javayuli.api.dao.CallDao;
import cn.javayuli.api.entity.Call;
import cn.javayuli.api.service.CallService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;

/**
 * @author hanguilin
 *
 * 通话信息
 */
@Service
public class CallServiceImpl extends ServiceImpl<CallDao, Call> implements CallService {
}

ContactsServiceImpl

Contacts实体对应的业务接口实现类。

package cn.javayuli.api.service.impl;

import cn.javayuli.api.dao.ContactsDao;
import cn.javayuli.api.entity.Contacts;
import cn.javayuli.api.service.ContactsService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;

/**
 * @author hanguilin
 *
 * 联系人信息
 */
@Service
public class ContactsServiceImpl extends ServiceImpl<ContactsDao, Contacts> implements ContactsService {
}

DimensionDateServiceImpl

DimensionDate实体对应的业务接口实现类。

package cn.javayuli.api.service.impl;

import cn.javayuli.api.dao.DimensionDateDao;
import cn.javayuli.api.entity.DimensionDate;
import cn.javayuli.api.service.DimensionDateService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;

/**
 * @author hanguilin
 *
 * 时间维度
 */
@Service
public class DimensionDateServiceImpl extends ServiceImpl<DimensionDateDao, DimensionDate> implements DimensionDateService {
}

EchartsController

接口类。

package cn.javayuli.api.controller;

import cn.javayuli.api.entity.Call;
import cn.javayuli.api.entity.Contacts;
import cn.javayuli.api.entity.DimensionDate;
import cn.javayuli.api.service.CallService;
import cn.javayuli.api.service.ContactsService;
import cn.javayuli.api.service.DimensionDateService;
import cn.javayuli.api.vo.DataOut;
import cn.javayuli.api.vo.DataOutWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * @author hanguilin
 *
 * 控制器
 */
@RestController
@RequestMapping("/web")
public class EchartsController {

    @Autowired
    private DimensionDateService dimensionDateService;

    @Autowired
    private CallService callService;

    @Autowired
    private ContactsService contactsService;

    /**
     * 根据用户和日期维度查询数据
     *
     * @param type 类型
     * @param phone 用户号码
     * @return 封装后的图表数据(按日期升序)及联系人姓名
     */
    @GetMapping("/data/{type}/{phone}")
    public DataOutWrapper getData(@PathVariable String type, @PathVariable String phone) {
        Contacts contacts = contactsService.getOne(Wrappers.lambdaQuery(Contacts.class).eq(Contacts::getTelephone, phone));
        if (contacts == null) {
            return null;
        }
        List<DimensionDate> dimensionDates;
        // 根据type去查询dimensionDate表中的数据
        switch (type) {
            case "year":
                dimensionDates = dimensionDateService.list(Wrappers.lambdaQuery(DimensionDate.class).eq(DimensionDate::getMonth, "").eq(DimensionDate::getDay, ""));
                break;
            case "month":
                dimensionDates = dimensionDateService.list(Wrappers.lambdaQuery(DimensionDate.class).ne(DimensionDate::getMonth, "").eq(DimensionDate::getDay, ""));
                break;
            case "day":
                dimensionDates = dimensionDateService.list(Wrappers.lambdaQuery(DimensionDate.class).ne(DimensionDate::getMonth, "").ne(DimensionDate::getDay, ""));
                break;
            default:
                throw new RuntimeException("no type to query");
        }
        // 将DimensionDate数据转为key为DimensionDate记录的id,value为DimensionDate的map
        Map<Integer, DimensionDate> dimensionDateMap = dimensionDates.stream().collect(Collectors.toMap(DimensionDate::getId, Function.identity()));
        Integer contactsId = contacts.getId();
        // 将DimensionDate的id拼接contactsId
        List<String> callIdList = dimensionDateMap.keySet().stream().map(o -> o + "_" + contactsId).collect(Collectors.toList());
        // 查找call数据
        List<Call> callList = callService.list(Wrappers.lambdaQuery(Call.class).in(Call::getIdDateContact, callIdList));
        List<DataOut> dataOuts = callList.stream().map(o -> {
            DimensionDate dimensionDate = dimensionDateMap.get(o.getIdDateDimension());
            DataOut dataOut = new DataOut();
            dataOut.setDate(dimensionDate.getFormatDate());
            dataOut.setCallSum(o.getCallSum());
            dataOut.setCallDurationSum(o.getCallDurationSum());
            return dataOut;
        }).sorted(Comparator.comparing(DataOut::getDate)).collect(Collectors.toList());
        return new DataOutWrapper(dataOuts, contacts.getName());
    }

    /**
     * 获取所有用户
     *
     * @return 所有联系人列表
     */
    @GetMapping("/data/contacts")
    public List<Contacts> getContacts(){
        return contactsService.list();
    }
}

WebApplication

SpringBoot项目启动类。

package cn.javayuli.api;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author hanguilin
 *
 * 启动类
 */
@SpringBootApplication
@MapperScan({"cn.javayuli.api.dao"})
public class WebApplication {

    public static void main(String[] args) {
        SpringApplication.run(WebApplication.class, args);
    }
}

Web界面服务

实现简单的查询和报表功能。使用Vue+ElementUI+Echarts+Axios。

index.html

<!DOCTYPE html>
<html lang="zh-CN">

<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>电信客服</title>
    <!-- vue引入组件库 -->
    <script src="https://unpkg.com/vue/dist/vue.js"></script>
    <!-- axios引入组件库 -->
    <script src="https://unpkg.com/axios/dist/axios.min.js"></script>
    <!-- elementui引入样式 -->
    <link rel="stylesheet" href="https://unpkg.com/element-ui/lib/theme-chalk/index.css">
    <!-- elementui引入组件库 -->
    <script src="https://unpkg.com/element-ui/lib/index.js"></script>
    <!-- echarts引入组件库 -->
    <script src="https://cdn.bootcdn.net/ajax/libs/echarts/5.1.0/echarts.js"></script>
    <style>
        #my-echarts {
            margin-top: 10vh;
        }
        .el-select {
            width: 90%
        }
        #echarts,
        #echarts2 {
            margin: 20px 0;
            height: 300px;
        }
    </style>
</head>

<body>
    <div id="my-echarts">
        <el-row>
            <el-col :span="8" :offset=8>
                <el-form ref="form" :model="form">
                    <el-row>
                        <el-col :span="10">
                            <el-select v-model="form.type" placeholder="请选择时间维度">
                                <el-option v-for="item in optionsType" :key="item.value" :label="item.label"
                                    :value="item.value"></el-option>
                            </el-select>
                        </el-col>
                        <el-col :span="10">
                            <el-select v-model="form.phone" filterable placeholder="请选择查询用户">
                                <el-option v-for="item in optionsContacts" :key="item.value" :label="item.label"
                                    :value="item.value"></el-option>
                            </el-select>
                        </el-col>
                        <el-col :span="4">
                            <el-button type="primary" icon="el-icon-search" @click="request">搜索</el-button>
                        </el-col>
                    </el-row>
                    <el-row>
                        <el-col :span=24>
                            <div id="echarts"></div>
                            <div id="echarts2"></div>
                        </el-col>
                    </el-row>
                </el-form>
            </el-col>
        </el-row>
    </div>

    <script>
        new Vue({
            el: '#my-echarts',
            data() {
                return {
                    form: {
                        type: 'month',
                        phone: '18944239644'
                    },
                    optionsContacts: [],
                    optionsType: [{
                        label: '年',
                        value: 'year'
                    }, {
                        label: '月',
                        value: 'month'
                    }, {
                        label: '日',
                        value: 'day'
                    }]
                }
            },
            mounted() {
                this.request()
                this.requestContacts()
            },
            methods: {
                initEcharts(id, xData, data, title, name) {
                    var chartDom = document.getElementById(id);
                    var myChart = echarts.init(chartDom);
                    var option;

                    option = {
                        title: {
                            text: title,
                            left: "left",
                            textStyle: {
                                fontSize: 20
                            }
                        },
                        dataZoom: [
                            {
                                id: 'dataZoomX',
                                type: 'slider',
                                xAxisIndex: [0],
                                filterMode: 'filter'
                            }
                        ],
                        xAxis: {
                            type: 'category',
                            data: xData
                        },
                        yAxis: {
                            name: name,
                            type: 'value'
                        },
                        tooltip: {
                            show: true,
                            trigger: 'axis'
                        },
                        series: [{
                            data: data,
                            type: 'line',
                            smooth: true,
                            name: name
                        }]
                    };

                    option && myChart.setOption(option);
                },
                request() {
                    axios.get(`http://localhost:8000/web/data/${this.form.type}/${this.form.phone}`).then(({ data }) => {
                        let dateArr = data.data.map(o => o.date)
                        let callSumArr = data.data.map(o => o.callSum)
                        let callSumDurationArr = data.data.map(o => o.callDurationSum)
                        this.initEcharts('echarts', dateArr, callSumArr, '通话次数统计', '次数')
                        this.initEcharts('echarts2', dateArr, callSumDurationArr, '通话时长统计', '秒数')
                    })
                },
                requestContacts() {
                    axios.get(`http://localhost:8000/web/data/contacts`).then(({ data }) => {
                        this.optionsContacts = data.map(e => {
                            return {
                                label: e.telephone + ' ' + e.name,
                                value: e.telephone
                            }
                        })
                    })
                }
            }
        })
    </script>
</body>

</html>

项目启动/部署

在前面的安装过程中,项目用到的框架(Hadoop、Zookeeper、Kafka、Flume、HBase)均已启动。

由于本例中Flume监听的是本地文件系统中的日志文件,所以生产日志服务需要和Flume部署在同一台服务器上。

协处理器

1、将tcs-consumer-coprocessor项目打成jar包

2、将tcs-consumer-coprocessor-1.0-jar-with-dependencies.jar上传至CentOS服务器中,并重命名

mv tcs-consumer-coprocessor-1.0-jar-with-dependencies.jar tcs-consumer-coprocessor-1.0.jar

3、在HDFS中创建目录

hdfs dfs -mkdir -p /user/hadoop/hbase/coprocessor

4、将jar包放入HDFS文件系统中

hdfs dfs -put tcs-consumer-coprocessor-1.0.jar /user/hadoop/hbase/coprocessor
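
jar包放入HDFS后,还需要保证目标表上挂载了该协处理器,否则它不会生效。如果项目中没有在建表时自动注册,可以参考下面基于HBase 2.x Java Admin API的示意代码手工挂载(表名、协处理器类名均为假设值,请按实际项目替换):

package cn.javayuli.tools;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

/**
 * 示意:为已存在的HBase表挂载HDFS上的协处理器jar(表名、类名为假设值)
 */
public class AttachCoprocessor {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Zookeeper地址,与HBase集群配置保持一致
        conf.set("hbase.zookeeper.quorum", "linuxserver");
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf("tb_call"); // 假设的表名
            TableDescriptor current = admin.getDescriptor(tableName);
            TableDescriptor updated = TableDescriptorBuilder.newBuilder(current)
                    .setCoprocessor(CoprocessorDescriptorBuilder
                            // 假设的协处理器类名,请替换为tcs-consumer-coprocessor中的实际类
                            .newBuilder("cn.javayuli.coprocessor.CalleeWriteObserver")
                            .setJarPath("hdfs://linuxserver:9000/user/hadoop/hbase/coprocessor/tcs-consumer-coprocessor-1.0.jar")
                            .setPriority(Coprocessor.PRIORITY_USER)
                            .build())
                    .build();
            admin.modifyTable(updated);
        }
    }
}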

生产日志

1、将tcs-producer项目打成jar包

2、将tcs-producer-1.0-jar-with-dependencies.jar上传至CentOS服务器中

3、运行程序

java -cp tcs-producer-1.0-jar-with-dependencies.jar 'cn.javayuli.producer.ProduceLog'

由于程序中是一个死循环,控制台会源源不断地打印日志。

运行此程序后,Flume通过监听日志文件就可以将日志发送到Kafka。

数据存储

tcs-consumer可以在本地直接运行main函数启动。

或者打成jar包上传至服务器,然后使用java -cp命令运行即可。

运行此程序后,会将Kafka中的消息经过处理后存入HBase。

此处向HBase插入数据容易出错,分享两篇对我很有帮助的文章:

1、HBase2.X的修复工具hbck2《Hbase HBCK2》

2、HBase中的Shell命令 《HBase Shell命令大全》

数据分析

tcs-analysis可以在本地Windows中运行,也可以打包到CentOS中使用java -cp运行。本例为了方便调试,直接在Windows的IDEA中运行。在Windows中运行MapReduce程序,需要一些配置:

1、将hadoop-2.10.1.tar.gz下载到windows中并解压

2、下载winutils,解压后如图:

本例中Hadoop版本为2.10.1,而winutils仓库中最接近的只有2.8.1版本,经测试可以正常使用。

将hadoop-2.8.1中的winutils.exe和hadoop.dll文件复制到hadoop-2.10.1/bin中。

3、配置环境变量

新增HADOOP_HOME,值为Hadoop解压后的文件夹(即hadoop-2.10.1文件夹),并在Path中添加%HADOOP_HOME%\bin。

重启你的编辑器(IDEA/Eclipse/***),并运行AnalysisApplication类中的main函数,此时就会将HBase中的数据经过MapReduce计算后存入MySQL中。
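
另外,如果不方便修改Windows系统环境变量,也可以在程序入口处通过系统属性指定hadoop.home.dir,效果等同于配置HADOOP_HOME(示意代码,路径为假设值):

/**
 * 示意:通过系统属性指定hadoop.home.dir,等价于配置HADOOP_HOME环境变量
 */
public class HadoopHomeConfig {

    public static void main(String[] args) {
        // 路径指向Windows中Hadoop的解压目录(假设值),需在提交MapReduce任务之前设置
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.10.1");

        // 随后再执行分析任务,例如:AnalysisApplication.main(args);
    }
}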

Web接口服务

tcs-api服务可以直接在编辑器中运行WebApplication类来启动服务,或者打成jar包之后使用java -jar命令来运行。

功能浏览

用浏览器打开tcs-web/index.html。可以按时间维度、用户维度查询通话次数统计及通话时长统计。
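
由于index.html是直接用浏览器以本地文件方式打开,而接口服务运行在localhost:8000,浏览器可能会因跨域限制拦截Axios请求。如果遇到这种情况,可以参考下面的示意配置在tcs-api中放开/web/**的跨域访问(CorsConfig为假设的类名,是否需要取决于实际运行环境):

package cn.javayuli.api.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

/**
 * 示意:全局跨域配置,允许任意来源以GET方式访问/web/**接口
 */
@Configuration
public class CorsConfig implements WebMvcConfigurer {

    @Override
    public void addCorsMappings(CorsRegistry registry) {
        registry.addMapping("/web/**")
                .allowedOriginPatterns("*")
                .allowedMethods("GET");
    }
}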

资源地址

GitHub:https://github.com/hanguilin/big-data/tree/main/TelecomCustomerService

Gitee:https://gitee.com/hanguilin/big-data/tree/main/TelecomCustomerService
