1、项目介绍
某互联网公司面向APP应用开发者提供的,以品牌互推、流量互导、广告分账为主的一站式服务平台。

将通话记录数据由原来的oracle系统改造成使用大数据架构解决方案。主要使用hbase做通话数据的存储方案。需要将原有oracle数据导入到hbase中,以及新生成数据通过flume收集到kafka,再通过消费者存储到hbase数据库。

hadoop+hbase+flume+zookeeper实现电信级海量通话日志数据的存储,随机访问与实时读写。通过hash技术对rowkey进行分析处理,解决hbase的热点问题,协同coprocessor,解决系统的高吞吐量和查询负载问题以及如何避免中间结果导致通知风暴或死递归问题。

1、某电信公司使用hadoop+hbase+flume+kafka + zookeeper实现海量通话日志的存储、随机访问与实时读写功能。
2、系统中采用hbase实现bigtable技术,通过盐析rowkey结合表区域的预切割,实现数据在集群上的均衡负载。
3、通过row级bloomfilter与coprocessor协同,实现通话记录中主被叫查询的透明处理与高速响应。
4、系统采用spark SQL+hive+hbase对通话数据进行准实时聚合分析处理。
5、Spark集群通过thriftServer服务器部署层分布式查询引擎,同前端web项目进行透明化整合。

2、业务介绍

参与项目的改造设计
按照文档要求,实现hbase协处理器处理,以及rowkey的盐析处理
按照部分统计分析设计要求,实现业务代码,例如按季度、年份、月份实现通话数据量的查询分析统计
数据可视化

3、功能实现
这里写图片描述

3.1 日志收集及存储

通话日志生成(模拟)程序
cn.ctgu.callloggen.App.java

package cn.ctgu.callloggen;

import cn.ctgu.callloggen.udp.HeartBeatThread;

import java.io.FileWriter;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.*;

/*
* 该函数主要用于模拟生成通信日志
*
* */

public class App {
    static Random r=new Random();

    public static List<String>phoneNumbers=new ArrayList<String>();
    public static Map<String,String> callers =new HashMap<String,String>();
    static{
        callers.put("15810092493", "史玉龙");
        callers.put("18000696806", "赵贺彪");
        callers.put("15151889601", "张倩 ");
        callers.put("13269361119", "王世昌");
        callers.put("15032293356", "张涛");
        callers.put("17731088562", "张阳");
        callers.put("15338595369", "李进全");
        callers.put("15733218050", "杜泽文");
        callers.put("15614201525", "任宗阳");
        callers.put("15778423030", "梁鹏");
        callers.put("18641241020", "郭美彤");
        callers.put("15732648446", "刘飞飞");
        callers.put("13341109505", "段光星");
        callers.put("13560190665", "唐会华");
        callers.put("18301589432", "杨力谋");
        callers.put("13520404983", "温海英");
        callers.put("18332562075", "朱尚宽");
        callers.put("18620192711", "刘能宗");
        phoneNumbers.addAll(callers.keySet());
    }

    public static void main(String[] args) throws Exception {
        if(args==null || args.length==0){
            System.out.println("no args");
            System.exit(-1);
        }
        genCallLog(args[0]);
     //   new HeartBeatThread().start();
    }
    public static void genCallLog(String logFile) throws Exception {
        //创建通话日志文件
//        FileWriter fw=new FileWriter("J:\\Program\\java\\CallLogSystem\\calllog.log",true);
        FileWriter fw=new FileWriter(logFile,true);

        while(true){
            //取主叫号码
            String caller=phoneNumbers.get(r.nextInt(callers.size()));
            //主叫名字
            String callerName=callers.get(caller);

            //取被叫
            String callee=null;
            String calleeName=null;
            while(true){
                callee=phoneNumbers.get(r.nextInt(callers.size()));
                if(!callee.equals(caller)){
                    break;
                }
            }
            calleeName=callers.get(callee);
            //通话时长
            int duration=r.nextInt(PropertiesUtil.getInt("call.duration.max"))+1;
            //格式化时长(避免在hbase中出现序号为8的出现在189前面,hbase存储的是字节)
            DecimalFormat df=new DecimalFormat();
            df.applyPattern(PropertiesUtil.getString("call.duration.format"));
            String durStr=df.format(duration);


            //通话时间
            int year=PropertiesUtil.getInt("call.year");
            //月份(0~11)
            int month=r.nextInt(12);
            //天(1~31)
            int day=r.nextInt(29)+1;
            int hour=r.nextInt(24);
            int min=r.nextInt(60);
            int sec=r.nextInt(60);

            //标准化时间
            Calendar c=Calendar.getInstance();
            c.set(Calendar.YEAR,year);
            c.set(Calendar.MONTH,month);
            c.set(Calendar.DAY_OF_MONTH,day);
            c.set(Calendar.HOUR_OF_DAY,hour);
            c.set(Calendar.MINUTE,min);
            c.set(Calendar.SECOND,sec);
            Date date=c.getTime();

            //如果时间超过今天就重新取时间
            Date now=new Date();
            if(date.compareTo(now)>0){
                continue;
            }

            SimpleDateFormat dfs=new SimpleDateFormat();
            dfs.applyPattern(PropertiesUtil.getString("call.time.format"));
            //通话时间
            String dateStr=dfs.format(date);


            // String log=caller+","+callerName+","+callee+","+calleeName+","+dateStr+","+duration;
            String log=caller+","+callee+","+dateStr+","+durStr;
            System.out.println(log);
            //将通话日志写入文件
            fw.write(log+"\r\n");
            fw.flush();
            Thread.sleep(PropertiesUtil.getInt("gen.data.interval.ms"));
        }
    }

}

PropertiesUtil.java

package cn.ctgu.callloggen;

import java.io.InputStream;
import java.util.Properties;

/**
 *
 */
public class PropertiesUtil {

    static Properties prop ;
    static{
        try {
            InputStream in = ClassLoader.getSystemResourceAsStream("gendata.conf");
            prop = new Properties();
            prop.load(in);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static String getProp(String key){
        return prop.getProperty(key) ;
    }

    public static String getString(String key){
        return prop.getProperty(key) ;
    }

    public static int getInt(String key){
        return Integer.parseInt(prop.getProperty(key)) ;
    }
}

gendata.conf

log.file=/home/hadoop/callLog/callLog.log
call.duration.max=600
call.duration.format=000
call.year=2018
call.time.format=yyyy/MM/dd HH:mm:ss
gen.data.interval.ms=2000

flume的日志收集配置文件(将日志存储在kafka中)
calllog.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1


a1.sources.r1.type=exec
a1.sources.r1.command=tail -c +0 -F /home/hadoop/calllog/calllog-gen-data/callLog.log

a1.channels.c1.type=memory

a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic=calllog
a1.sinks.k1.kafka.bootstrap.servers=s2:9092 s3:9092 s4:9092
a1.sinks.k1.kafka.flumeBatchSize=20
a1.sinks.k1.kafka.producer.acks=1

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

Kafka将数据通过消费者存储到hbase数据库

HbaseConsumer.java

package cn.ctgu.calllog.consumer;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;


import java.util.HashMap;
import java.util.List;
import java.util.Map;


/*
*
* Hbase消费者,从kafka提取数据,存储到hbase中
*
* */
public class HbaseConsumer {
    public static void main(String[] args) throws Exception {

        HbaseDao dao=new HbaseDao();
        //创建消费者配置对象
        ConsumerConfig config=new ConsumerConfig(PropertiesUtil.props);

        //获得主题
        String topic=PropertiesUtil.getProp("topic");
        //
        Map<String,Integer>map=new HashMap<String, Integer>();
        map.put(topic,new Integer(1));
        Map<String,List<KafkaStream<byte[],byte[]>>>msgs= Consumer.createJavaConsumerConnector(new ConsumerConfig(PropertiesUtil.props)).createMessageStreams(map);

        List<KafkaStream<byte[],byte[]>>msgList=msgs.get(topic);

        String msg=null;
        for (KafkaStream<byte[],byte[]>stream:msgList){
            ConsumerIterator<byte[],byte[]>it=stream.iterator();
            while (it.hasNext()){
                byte[]message=it.next().message();
                //取得kafka的消息
                msg=new String(message);
                //写入hbase中
                dao.put(msg);
            }
        }

    }
}

HbaseDao.java

package cn.ctgu.calllog.consumer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;


import java.text.DecimalFormat;

/*
*
* hbase数据访问对象
* */
public class HbaseDao {
    private DecimalFormat df=new DecimalFormat();
    private Table table=null;
    private int partitions;
    private String flag;
    public HbaseDao(){
        try {
            Configuration conf= HBaseConfiguration.create();
            Connection conn= ConnectionFactory.createConnection(conf);
            TableName name=TableName.valueOf(PropertiesUtil.getProp("table.name"));
            table=conn.getTable(name);
            df.applyPattern(PropertiesUtil.getProp("hashcode.pattern"));
            partitions=Integer.parseInt(PropertiesUtil.getProp("partition.number"));
            flag=PropertiesUtil.getProp("caller.flag");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /*
    *
    * put数据到hbase
    * */
    public void put(String log){
        if(log==null || log.equals("")){
            return;
        }
        try {
            //解析日志
            String[]arr=log.split(",");

            if(arr!=null && arr.length==4){
                String caller=arr[0];
                String callee=arr[1];
                String callTime=arr[2];
                callTime=callTime.replace("/","");//删除/
                callTime=callTime.replace(" ","");//删除空格
                callTime=callTime.replace(":","");//

                String callDuration=arr[3];
                //构造put对象
                String rowkey=genRowKey(getHashcode(caller,callTime),caller,callTime,flag,callee,callDuration);
                //
                Put put=new Put(Bytes.toBytes(rowkey));
                put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("caller"),Bytes.toBytes(caller));
                put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("callee"),Bytes.toBytes(callee));
                put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("callTime"),Bytes.toBytes(callTime));
                put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("callDuration"),Bytes.toBytes(callDuration));
                table.put(put);
            }
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    public String getHashcode(String caller,String callTime){
        int len=caller.length();
        //取出后四位电话号码
        String last4code=caller.substring(len-4);
        //取出时间单位,年份和月份(取前不取后)
        String mon=callTime.substring(0,6);
        //
        int hashcode=(Integer.parseInt(mon)^Integer.parseInt(last4code))%partitions;
        return df.format(hashcode);
    }
    //生成rowkey
    public String genRowKey(String hash,String caller,String time,String flag,String callee,String duration){
        return hash+","+caller+","+time+","+flag+","+callee+","+duration;
    }
}

PropertiesUtil.java

package cn.ctgu.calllog.consumer;

import java.io.InputStream;
import java.util.Properties;

public class PropertiesUtil {
    public static Properties props;
    static {
        try {
            //加载外部属性文件
            InputStream in=ClassLoader.getSystemResourceAsStream("kafka.properties");
            props=new Properties();
            props.load(in);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    public static String getProp(String key){
        return props.getProperty(key);
    }
}

kafka.properties

zookeeper.connect=s1:2181,s2:2181,s3:2181
group.id=g4
zookeeper.session.timeout.ms=500
zookeeper.sync.time.ms=250
auto.commit.interval.ms=1000
auto.offset.reset=smallest
#主题
topic=calllog
#表名
table.name=ns1:calllogs
#分区数
partition.number=100
#主叫标记
caller.flag=0
#hash区域的模式
hashcode.pattern=00

hbase协处理器功能模块

1、postPut辅助前端查询,在执行插入数据到hbase中后,协处理器将被叫记录也存储到hbase中的f2列,方便前端需要查询。
2、postGetOp辅助前端查询,在执行查询操作后,返回主叫结果,同时将主叫对应的被叫号码信息也返回。
3、postScannerNext执行扫描操作后,将f1列的被叫号码记录的主叫号码信息放入到f1列中。

CallLogRegionObserver.java

package cn.ctgu.calllog.coprossor;


import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/*
*
* 协处理器
*
* */
public class CallLogRegionObserver extends BaseRegionObserver {
    //被叫引用id
    private static final String REF_ROW_ID = "refrowid" ;
    //通话记录表名
    private static final String CALL_LOGTABLE_NAME="ns1:calllogs";
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        super.postPut(e,put,edit,durability);
        //
        String tableName0= TableName.valueOf(CALL_LOGTABLE_NAME).getNameAsString();

        //得到当前的Tablename对象
        String tableName1=e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();

        //判断是否是ns1:calllogs表
        if(!tableName0.equals(tableName1)){
            return;
        }
        //得到主叫的rowkey
        String rowkey= Bytes.toString(put.getRow());
        //如果被叫就放行
        String[]arr=rowkey.split(",");
        if(arr[3].equals("1")){
            return;
        }
        //
        String caller=arr[1];   //主叫
        String callTime=arr[2]; //通话时间
        String callee=arr[4];   //被叫
        String callDuration=arr[5]; //通话时长

        //被叫hashcode(同一个号码存在主叫和被叫,这个位置就是取出记录中被叫号码信息存在主叫号码的第二列)
        String hashcode=CallLogUtil.getHashcode(callTime,callee,100);
        //被叫rowkey
        String calleeRowKey=hashcode+","+callee+"," + callTime + ",1," + caller + "," + callDuration;
        Put newPut=new Put(Bytes.toBytes(calleeRowKey));
        newPut.addColumn(Bytes.toBytes("f2"),Bytes.toBytes(REF_ROW_ID),Bytes.toBytes(rowkey));
        //将被叫插入到f2列(主叫在f1列)
        TableName tn=TableName.valueOf(CALL_LOGTABLE_NAME);
        Table t=e.getEnvironment().getTable(tn);
        t.put(newPut);


    }

    /*
    *
    * 重写方法,完成被叫查询,返回主叫结果
    * */
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException {
        //获得表名
        String tableName=e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
        //判断表名是否是ns1:calllogs
        if(!tableName.equals("ns1:calllogs")){
            super.postGetOp(e,get,results);
        }
        else {
            //得到rowkey
            String rowkey=Bytes.toString(get.getRow());
            //
            String[] arr=rowkey.split(",");
            //主叫
            if(arr[3].equals("0")){
                super.postGetOp(e,get,results);
            }
            //被叫
            else{
                //得到主叫方的rowkey
                String refrowid=Bytes.toString(CellUtil.cloneValue(results.get(0)));
                //
                Table tt=e.getEnvironment().getTable(TableName.valueOf("ns1:calllogs"));
                Get g=new Get(Bytes.toBytes(refrowid));
                Result r=tt.get(g);
                List<Cell>newList=r.listCells();
                results.clear();
                results.addAll(newList);
            }
        }
    }
    /*
    *
    * 扫描后的操作
    * 扫描后协处理器将被叫存储在f1中
    *
    * */

    @Override
    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
        boolean b=super.postScannerNext(e,s,results,limit,hasMore);
        //新集合
        List<Result>newList=new ArrayList<Result>();
        //获得表名
        String tableName=e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
        //判断表名是否是ns1:calllogs
        if (tableName.equals(CALL_LOGTABLE_NAME)){
            Table tt=e.getEnvironment().getTable(TableName.valueOf(CALL_LOGTABLE_NAME));
            for (Result r:results){
                //rowkey
                String rowkey=Bytes.toString(r.getRow());
                String flag=rowkey.split(",")[3];
                //主叫
                if (flag.equals("0")){
                    newList.add(r);
                }
                //被叫
                else{
                    //取出主叫号码
                    byte[]refrowkey=r.getValue(Bytes.toBytes("f2"),Bytes.toBytes(REF_ROW_ID));
                    Get newGet=new Get(refrowkey);
                    newList.add(tt.get(newGet));
                }

            }
            results.clear();
            results.addAll(newList);
        }
        return b;
    }
}

CallLogUtil.java

package cn.ctgu.calllog.coprossor;

import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;

/**
 * Created by Administrator on 2017/4/13.
 */
public class CallLogUtil {

    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
    private static SimpleDateFormat sdfFriend = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");

    //格式化
    private static DecimalFormat df = new DecimalFormat();

    /**
     * 获取hash值,默认分区数100
     */
    public static String getHashcode(String caller, String callTime,int partitions) {
        int len = caller.length();
        //取出后四位电话号码
        String last4Code = caller.substring(len - 4);
        //取出时间单位,年份和月份.
        String mon = callTime.substring(0, 6);
        //
        int hashcode = (Integer.parseInt(mon) ^ Integer.parseInt(last4Code)) % partitions;
        return df.format(hashcode);
    }

    /**
     * 起始时间
     */
    public static String getStartRowkey(String caller, String startTime, int partitions){
        String hashcode = getHashcode(caller, startTime,partitions);
        return hashcode + "," + caller + "," + startTime ;
    }

    /**
     * 结束时间
     */
    public static String getStopRowkey(String caller, String startTime,String endTime, int partitions){
        String hashcode = getHashcode(caller, startTime,partitions);
        return hashcode + "," + caller + "," + endTime ;
    }

    /**
     * 对时间进行格式化
     */
    public static String formatDate(String timeStr){
        try {
            return sdfFriend.format(sdf.parse(timeStr));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null ;
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.ctgu</groupId>
    <artifactId>CallLogCoproessorModule</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.3</version>
        </dependency>
    </dependencies>

</project>

3.2 业务代码实现

3.2.1 查询通话记录

给予起始和结束时间查询某个电话号码的通话记录
findCalllog.jsp

<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c" %>
<html>
<head>
    <title>查询通话记录</title>
    <link rel="stylesheet" type="text/css" href="../css/my.css">
</head>
<body>
<form action='<c:url value="/callLog/findCallLog" />' method="post">
    <table>
        <tr>
            <td>电话号码 :</td>
            <td><input type="text" name="caller"></td>
        </tr>
        <tr>
            <td>起始时间 :</td>
            <td><input type="text" name="startTime"></td>
        </tr>
        <tr>
            <td>结束时间:</td>
            <td><input type="text" name="endTime"></td>
        </tr>
        <tr>
            <td colspan="2">
                <input type="submit" value="查询"/>
            </td>
        </tr>
    </table>
</form>
</body>
</html>

查询所有的通话详单,每隔2秒刷新一次
callLogList.jsp

<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c" %>
<html>
<head>
    <title>通话记录</title>
    <link rel="stylesheet" type="text/css" href="../css/my.css">
    <script type="text/javascript" src="../js/jquery-3.2.0.min.js"></script>
    <script type="text/javascript">

        //定义函数
        function refreshTable() {
        //    $("#btnCleanTable").click(function(){
                $("#t1 tbody").empty();/*清除表格里面的内容*/
                $.getJSON("/callLog/json/findAll",function (data) {/*动态取服务器其中的数据,也就是ajax*/
                    $.each(data,function (i,obj) {
                        var str="<tr><td>"+obj.caller+"</td>";
                        str=str+"<td>"+obj.callerName+"</td>";
                        str=str+"<td>"+obj.callee+"</td>";
                        str=str+"<td>"+obj.calleeName+"</td>";
                        str=str+"<td></td>";
                        str=str+"<td>"+obj.callTime+"</td>";
                        str=str+"<td>"+obj.callDuration+"</td>";
                        str=str+"</tr>";
                        $("#t1 tbody").append(str);/*将数据一行一行的插入表格中*/
                    });
                })
        //    });
        }

        $(function(){
            setInterval(refreshTable(),2000)/*每个1s调用一次上面的函数,也就是刷新一次表格的数据*/
        })
    </script>

</head>
<body>
<input id="btnCleanTable" type="button" value="清除表格"><br>
<table id="t1" border="1px" class="t-1" style="width: 800px">
    <thead>
        <tr>
            <td>电话1</td>
            <td>名称1</td>
            <td>电话2</td>
            <td>名称2</td>
            <td>主(被)叫</td>
            <td>通话时间</td>
            <td>通话时长</td>
        </tr>
    </thead>
    <tbody>
        <c:forEach items="${callLogs}" var="log">
            <tr>
                <td><c:out value="${log.caller}"/></td>
                <td><c:out value="${log.callerName}"/></td>
                <td><c:out value="${log.callee}"/></td>
                <td><c:out value="${log.calleeName}"/></td>
                <td>
                    <c:if test="${log.caller == param.caller}">主叫</c:if>
                    <c:if test="${log.caller != param.caller}">被叫</c:if>
                </td>
                <td><c:out value="${log.callTime}"/></td>
                <td><c:out value="${log.callDuration}"/></td>
            </tr>
        </c:forEach>
        <tr>
            <td colspan="7" style="text-align: right">
            </td>
        </tr>
    </tbody>
</table>
</body>
</html>

使用hive查询最近通话记录

<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c" %>
<html>
<head>
    <title>查询最近通话记录</title>
    <link rel="stylesheet" type="text/css" href="../css/my.css">
</head>
<body>
<form action='<c:url value="/callLog/findLatestCallLog" />' method="post">
    <table>
        <tr>
            <td>电话号码 :</td>
            <td><input type="text" name="caller"></td>
        </tr>

        <tr>
            <td colspan="2">
                <input type="submit" value="查询"/>
            </td>
        </tr>
    </table>
</form>
</body>
</html>
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c" %>
<html>
<head>
    <title>通话记录</title>
    <link rel="stylesheet" type="text/css" href="../css/my.css">
</head>
<body>
<c:if test="${log == null}">
    无记录!
</c:if>
<c:if test="${log != null}">
    <table id="t1" border="1px" class="t-1" style="width: 800px">
        <tr>
            <td>电话1</td>
            <td><c:out value="${log.caller}" /></td>
        </tr>
        <tr>
            <td>电话2</td>
            <td><c:out value="${log.callee}"/></td>
        </tr>
        <tr>
            <td>时间</td>
            <td><c:out value="${log.callTime}"/></td>
        </tr>
        <tr>
            <td>时长</td>
            <td><c:out value="${log.callDuration}"/></td>
        </tr>

    </table>
</c:if>

</body>
</html>

查询指定人员,指定月份的通话记录

<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c" %>
<html>
<head>
    <title>通话记录统计结果</title>
    <link rel="stylesheet" type="text/css" href="../css/my.css">
    <script src="../js/jquery-3.2.0.min.js"></script>
    <script src="../js/echarts.js"></script>
    <script>
        $(function () {
            var myChart = echarts.init(document.getElementById('main'));
            var option = {
                title: {
                    text: '<c:out value="${title}" />'
                },
                tooltip: {},
                legend: {
                    data: ['通话次数']
                },
                xAxis: {
                    data: [<c:forEach items="${list}" var="e">'<c:out value="${e.yearMonth}"/>',</c:forEach>]
                },
                yAxis: {},
                series: [{
                    name: '通话次数',
                    type: 'bar',
                    data: [<c:forEach items="${list}" var="e"><c:out value="${e.count}"/>, </c:forEach>]
                }]
            };
            myChart.setOption(option);
        })
    </script>
</head>
<body>
    <form action='<c:url value="/callLog/statCallLog" />' method="post">
        电话号码 : <input type="text" name="caller"><br>
        年 份:  <input type="text" name="year"><br>
        <input type="submit" name="查询">
    </form>

    <div id="main" style="border:1px solid blue;width:600px;height:400px;">
    </div>
</body>
</html>

控制层代码实现

package cn.ctgu.ssm.web.controller;

import cn.ctgu.ssm.domain.CallLog;
import cn.ctgu.ssm.domain.CallLogRange;
import cn.ctgu.ssm.hive.CallLogStat;
import cn.ctgu.ssm.service.CallLogService;
import cn.ctgu.ssm.service.PersonService;
import cn.ctgu.ssm.service.hive.HiveCallLogService;
import cn.ctgu.ssm.util.CallLogUtil;
import com.alibaba.fastjson.JSON;
import com.sun.javafx.sg.prism.NGShape;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.OutputStream;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by Administrator on 2017/4/11.
 */
@Controller
public class CallLogController {

    @Resource(name="personService")
    private PersonService ps ;

    @Resource(name="HiveCallLogService")
    private HiveCallLogService hcs;

    @Resource(name="callLogService")
    private CallLogService cs ;

    @RequestMapping("/callLog/findAll")
    public String findAll(Model m){
        List<CallLog> list = cs.findAll();
        m.addAttribute("callLogs",list);
        return "callLog/callLogList" ;
    }
    /**
     * 进入查询通话记录的页面,form
     */
    @RequestMapping("/callLog/toFindCallLogPage")
    public String toFindCallLogPage(){
        return "callLog/findCallLog" ;
    }

    @RequestMapping(value = "/callLog/findCallLog",method = RequestMethod.POST)
    public String findCallLog(Model m , @RequestParam("caller") String caller, @RequestParam("startTime") String startTime, @RequestParam("endTime") String endTime) throws ParseException {
        List<CallLogRange> list = CallLogUtil.getCallLogRanges(startTime, endTime);
        List<CallLog> logs = cs.findCallogs(caller,list);
        m.addAttribute("callLogs", logs);
        return "callLog/callLogList" ;
    }

    /*
    *
    * 向客户端返回json格式数据
    * */
    @RequestMapping(value="/callLog/json/findAll")
    public String findAllJson(HttpServletResponse response){

        List<CallLog>list=cs.findAll();
        String json= JSON.toJSONString(list);
        //内容类型
        response.setContentType("application/json");
        //通知浏览器接收到数据类型字符集
        response.setCharacterEncoding("utf-8");
        try {
            OutputStream out=response.getOutputStream();
            //json文件采用编码的字符集
            out.write(json.getBytes("utf-8"));
            out.flush();
            out.close();
        }catch (IOException e){
            e.printStackTrace();
        }
        return null;

    }


    /**
     * 进入查询通话记录的页面,form
     */
    @RequestMapping(value="/callLog/toFindLatestCallLog")
    public String toFindLatestCallLog(){
        return "callLog/findLatestCallLog" ;
    }
    /*
    * 查询最近的通话记录
    * */
    @RequestMapping(value = "/callLog/findLatestCallLog",method = RequestMethod.POST)
    public String findLatestCallLog(Model m , @RequestParam("caller") String caller) throws ParseException {
        CallLog log=hcs.findLatestCallLog(caller);
        if (log!=null) {
            m.addAttribute("log", log);
        }
        return "callLog/latestCallLog" ;
    }
    /*
    *
    * 统计指定人员,指定月份的通话次数
    *
    * */
    @RequestMapping("/callLog/toStatCallLog")
    public String toStatCallLog(){
        return "callLog/statCallLog";
    }

    @RequestMapping("/callLog/statCallLog")
    public String statCallLog(Model m,@RequestParam("caller")String caller, @RequestParam("year")String year) {
        List<CallLogStat> list = hcs.statCallLogCount(caller, year);
        if (list != null && !list.isEmpty()) {
            List<String> months = new ArrayList<String>();
            List<Integer> counts = new ArrayList<Integer>();
            for (CallLogStat cls : list) {
                months.add(cls.getYearMonth());
                counts.add(cls.getCount());
            }
            m.addAttribute("title", caller + "在" + year + "年各月份通话次数统计");
            m.addAttribute("list", list);
        }
        return "callLog/statCallLog";
    }
}

工具类代码
CallLogUtil.java

package cn.ctgu.ssm.util;

import cn.ctgu.ssm.domain.CallLogRange;

import java.text.DecimalFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;

public class CallLogUtil {
    private static SimpleDateFormat sdf=new SimpleDateFormat("yyyyMMddHHmmss");
    private static SimpleDateFormat sdfFriend=new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    //格式化
    private static DecimalFormat df=new DecimalFormat();
    //获取hash值,默认分区数为100
    public static String getHashcode(String caller, String callTime, int partitions){
        int len=caller.length();
        //取出后四位电话号码
        String last4code=caller.substring(len-4);
        //取出时间单位,年份和月份
        String mon=callTime.substring(0,6);
        //计算hashcode
        int hashcode=(Integer.parseInt(mon)^Integer.parseInt(last4code))%partitions;
        return df.format(hashcode);
    }
    //开始时间rowkey
    public static String getStartRowKey(String caller, String startTime, int partitions){
        String hashcode=getHashcode(caller,startTime,partitions);
        return hashcode+","+caller+","+startTime;
    }
    //结束时间rowkey
    public static String getStopRowKey(String caller, String startTime,String endTime,int partitions){
        String hashcode=getHashcode(caller,startTime,partitions);
        return hashcode+","+caller+","+endTime;
    }
    /*
    *
    * 计算查询时间范围
    * */
    public static List<CallLogRange> getCallLogRanges(String startStr,String endStr) throws ParseException {
        try {
            //设置年月日的格式化方式
            SimpleDateFormat sdfYMD = new SimpleDateFormat("yyyyMMdd");
            //设置年月的格式化方式
            SimpleDateFormat sdfYM = new SimpleDateFormat("yyyyMM");
            //在前面加两个0
            DecimalFormat df00 = new DecimalFormat("00");

            //用来存储日志时间范围
            List<CallLogRange> list = new ArrayList<CallLogRange>();
            //字符串时间,取出年份和月份
           // String startStr = "20140203";
            String startPrefix = startStr.substring(0, 6);

          //  String endStr = "20160304";
            String endPrefix = endStr.substring(0, 6);
            //取出结束时间的日
            int endDay = Integer.parseInt(endStr.substring(6, 8));

            //结束点,之所以加1是因为包含开始不包含结束
            String endPoint = endPrefix + df00.format(endDay + 1);


            //日历对象
            Calendar c = Calendar.getInstance();

            //当结束时间和开始时间是同年月的时候
            if (startPrefix.equals(endPrefix)) {
                CallLogRange range = new CallLogRange();
                range.setStartPoint(startStr);//设置起始点

                range.setEndPoint(endPoint);
                list.add(range);
            } else {
                //1、起始月
                CallLogRange range = new CallLogRange();
                range.setStartPoint(startStr);

                //设置日历的时间对象
                c.setTime(sdfYMD.parse(startStr));
                //在设置了时间对象的基础上月份加1
                c.add(Calendar.MONTH, 1);
                range.setEndPoint(sdfYM.format(c.getTime()));
                list.add(range);

                //是否是最后一月
                while (true) {
                    //到了结束月份
                    if (endStr.startsWith(sdfYM.format(c.getTime()))) {
                        range = new CallLogRange();
                        range.setStartPoint(sdfYM.format(c.getTime()));
                        range.setEndPoint(endPoint);
                        list.add(range);
                        break;
                    } else {
                        range = new CallLogRange();
                        //起始时间
                        range.setStartPoint(sdfYM.format(c.getTime()));
                        //增加月份
                        c.add(Calendar.MONTH, 1);
                        range.setEndPoint(sdfYM.format(c.getTime()));
                        list.add(range);
                    }
                }
            }
            return list;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
    /*
    *
    * 对时间进行格式化
    *
    * */
    public static String formatDate(String timeStr){
        try {
            return sdfFriend.format(sdf.parse(timeStr));
        }catch (Exception e){
            e.printStackTrace();
        }
        return null;
    }
}

PropertiesUtil.java

package cn.ctgu.ssm.util;

import java.io.InputStream;
import java.util.Properties;

/**
 *
 */
public class PropertiesUtil {

    static Properties prop ;
    static{
        try {
            InputStream in = ClassLoader.getSystemResourceAsStream("gendata.conf");
            prop = new Properties();
            prop.load(in);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static String getProp(String key){
        return prop.getProperty(key) ;
    }

    public static String getString(String key){
        return prop.getProperty(key) ;
    }

    public static int getInt(String key){
        return Integer.parseInt(prop.getProperty(key)) ;
    }
}

Service层代码
CallLogService.java

package cn.ctgu.ssm.service;

import cn.ctgu.ssm.domain.CallLog;
import cn.ctgu.ssm.domain.CallLogRange;

import java.util.List;

/**
 *
 */
public interface CallLogService {
    public List<CallLog> findAll();

    /*
    *
    * 按照范围查询通话记录
    * */
    public List<CallLog> findCallogs(String caller,List<CallLogRange> list);
}

PersonService.java

package cn.ctgu.ssm.service;

import cn.ctgu.ssm.domain.Person;


public interface PersonService extends BaseService<Person>{
    public String selectNameByPhone(String phone);
}

CallLogServiceImpl.java

package cn.ctgu.ssm.service.impl;

import cn.ctgu.ssm.domain.CallLog;
import cn.ctgu.ssm.domain.CallLogRange;
import cn.ctgu.ssm.service.CallLogService;
import cn.ctgu.ssm.service.PersonService;
import cn.ctgu.ssm.util.CallLogUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * 呼叫日志
 */
@Service("callLogService")
public class CallLogServiceImpl implements CallLogService {

    @Resource(name="personServiceCache")
    private PersonService ps;

    private Table table ;
    public CallLogServiceImpl(){
        try {
            Configuration conf = HBaseConfiguration.create();
            Connection conn = ConnectionFactory.createConnection(conf);
            TableName name = TableName.valueOf("ns1:calllogs");
            table = conn.getTable(name);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     *查询所有log
     */
    public List<CallLog> findAll() {
        List<CallLog> list = new ArrayList<CallLog>();
        try {
            Scan scan = new Scan();
            ResultScanner rs = table.getScanner(scan);
            Iterator<Result> it = rs.iterator();
            byte[] f = Bytes.toBytes("f1");

            byte[] caller = Bytes.toBytes("caller");
            byte[] callee = Bytes.toBytes("callee");
            byte[] callTime = Bytes.toBytes("callTime");
            byte[] callDuration = Bytes.toBytes("callDuration");
            CallLog log = null ;
            while(it.hasNext()){
                log = new CallLog();
                Result r = it.next();
                //设置用户名
                String callerStr=Bytes.toString(r.getValue(f, caller));
                log.setCaller(callerStr);
                log.setCallerName(ps.selectNameByPhone(callerStr));

                //设置用户名
                String calleeStr=Bytes.toString(r.getValue(f, callee));
                log.setCallee(calleeStr);
                log.setCalleeName(ps.selectNameByPhone(calleeStr));


                log.setCallTime(Bytes.toString(r.getValue(f, callTime)));
                log.setCallDuration(Bytes.toString(r.getValue(f, callDuration)));
                list.add(log);
            }
            return list ;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /*
    *
    * 按照范围查询通话记录
    * */
    public List<CallLog> findCallogs(String call,List<CallLogRange> ranges) {
        List<CallLog> logs = new ArrayList<CallLog>();
        try {
            for(CallLogRange range:ranges){
                Scan scan = new Scan();
                //设置扫描起始行
                scan.setStartRow(Bytes.toBytes(CallLogUtil.getStartRowKey(call,range.getStartPoint(),100)));
                //设置扫描结束行
                scan.setStopRow(Bytes.toBytes(CallLogUtil.getStopRowKey(call,range.getStartPoint(),range.getEndPoint(),100)));


                ResultScanner rs = table.getScanner(scan);
                Iterator<Result> it = rs.iterator();
                byte[] f = Bytes.toBytes("f1");

                byte[] caller = Bytes.toBytes("caller");
                byte[] callee = Bytes.toBytes("callee");
                byte[] callTime = Bytes.toBytes("callTime");
                byte[] callDuration = Bytes.toBytes("callDuration");
                CallLog log = null ;
                while(it.hasNext()){
                    log = new CallLog();
                    Result r = it.next();
                    //获取rowkey
                    String rowkey=Bytes.toString(r.getRow());
                    String flag=rowkey.split(",")[3];
                    log.setFlag(flag.equals("0")?true:false);

                    //caller
                    log.setCaller(Bytes.toString(r.getValue(f, caller)));
                    //callee
                    log.setCallee(Bytes.toString(r.getValue(f, callee)));
                    //calltime
                    log.setCallTime(Bytes.toString(r.getValue(f, callTime)));
                    //callDuration
                    log.setCallDuration(Bytes.toString(r.getValue(f, callDuration)));
                    logs.add(log);
                }
            }
            return logs ;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

PersonServiceImpl.java

package cn.ctgu.ssm.service.impl;

import cn.ctgu.ssm.dao.BaseDao;
import cn.ctgu.ssm.dao.impl.PersonDaoImpl;
import cn.ctgu.ssm.domain.Person;
import cn.ctgu.ssm.service.PersonService;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Service("personService")
public class PersonServiceImpl extends BaseServiceImpl<Person> implements PersonService{

    @Resource(name="userDao")
    public void setDao(BaseDao<Person>dao){
        super.setDao(dao);
    }
    public String selectNameByPhone(String phone) {
        return ((PersonDaoImpl)getDao()).selectNameByPhone(phone);
    }

}

PersonServiceCacheImpl.java

package cn.ctgu.ssm.service.impl;

import cn.ctgu.ssm.dao.BaseDao;
import cn.ctgu.ssm.dao.impl.PersonDaoImpl;
import cn.ctgu.ssm.domain.Person;
import cn.ctgu.ssm.service.PersonService;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 *
 */
@Service("personServiceCache")
public class PersonServiceCacheImpl extends BaseServiceImpl<Person> implements PersonService{
    public static Map<String, String> callers = new HashMap<String, String>();

    static {
        callers.put("15810092493", "史玉龙");
        callers.put("18000696806", "赵贺彪");
        callers.put("15151889601", "张倩 ");
        callers.put("13269361119", "王世昌");
        callers.put("15032293356", "张涛");
        callers.put("17731088562", "张阳");
        callers.put("15338595369", "李进全");
        callers.put("15733218050", "杜泽文");
        callers.put("15614201525", "任宗阳");
        callers.put("15778423030", "梁鹏");
        callers.put("18641241020", "郭美彤");
        callers.put("15732648446", "刘飞飞");
        callers.put("13341109505", "段光星");
        callers.put("13560190665", "唐会华");
        callers.put("18301589432", "杨力谋");
        callers.put("13520404983", "温海英");
        callers.put("18332562075", "朱尚宽");
        callers.put("18620192711", "刘能宗");
    }

    @Resource(name="userDao")
    public void setDao(BaseDao<Person> dao) {
        super.setDao(dao);
    }

    public String selectNameByPhone(String phone){
        return callers.get(phone);
    }
}

HiveCallLogService.java

package cn.ctgu.ssm.service.hive;

import cn.ctgu.ssm.domain.CallLog;
import cn.ctgu.ssm.hive.CallLogStat;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;

@Service("HiveCallLogService")
public class HiveCallLogService {

    @Resource(name="sparkSQLDataSource")
    private DataSource ds;

    //hiveserver2连接串
    private static String url="jdbc:hive2://s1:10000/";
    //驱动程序类
    private static String driverClass="org.apache.hive.jdbc.HiveDriver";

    static {
        try {
            Class.forName(driverClass);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    /*
    *
    * 通过hive查询最近的通话记录
    *
    * */
    public CallLog findLatestCallLog(String phoneNum){
        try {
            Connection conn= DriverManager.getConnection(url);
            Statement st=conn.createStatement();
            String sql="select * from mydb2.ext_calllogs_in_hbase where id like '%"+ phoneNum+"%' order by callTime desc limit 1" ;
            ResultSet rs=st.executeQuery(sql);
            CallLog log=null;
            while (rs.next()){
                log=new CallLog();
                log.setCaller(rs.getString("caller"));
                log.setCallee(rs.getString("callee"));
                log.setCallTime(rs.getString("callTime"));
                log.setCallDuration(rs.getString("callduartion"));
            }
            rs.close();
           return log;
        }catch (Exception e){
            e.printStackTrace();
        }
        return null;
    }
    /*
    *
    * 查询指定人员指定年份中各个月的通话次数
    *
    * */
   /* public List<CallLogStat> statCallLogCount(String caller, String year){
        List<CallLogStat>list=new ArrayList<CallLogStat>();
        try{
            Connection conn=DriverManager.getConnection(url);
            Statement st=conn.createStatement();
            String sql="select count(*) ,substr(calltime,1,6) from mydb2.ext_calllogs_in_hbase " +
                    "where caller = '" + caller+"' and substr(calltime,1,4) == '" + year
                    + "' group by substr(calltime,1,6)";
            ResultSet rs=st.executeQuery(sql);
            CallLog log=null;
            if(rs.next()){
                CallLogStat logSt=new CallLogStat();
                logSt.setCount(rs.getInt(1));
                logSt.setYearMonth(rs.getString(2));
                list.add(logSt);
            }
            rs.close();
            return list;
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }*/

    //sparkSQL进行thriftserver进行集成
    public List<CallLogStat> statCallLogCount(String caller, String year){

        try {
//            Class.forName("org.apache.hive.jdbc.HiveDriver");
//            Connection conn = DriverManager.getConnection("jdbc:hive2://s201:10000");
            Connection conn = ds.getConnection();
            String sql = "select count(*) ,substr(calltime,1,6) from mydb.ext_calllogs_in_hbase " +
                    "where caller = '" + caller + "' and substr(calltime,1,4) == '" + year
                    + "' group by substr(calltime,1,6) order by substr(calltime,1,6) desc";
            Statement st = conn.createStatement();
            ResultSet rs = st.executeQuery(sql);

            List<CallLogStat> list = new ArrayList<CallLogStat>();
            while (rs.next()) {
                long count = rs.getLong(1);
                String ym = rs.getString(2);
                list.add(new CallLogStat(ym, (int)count));
            }
            rs.close();
            st.close();
            conn.close();
            return list ;
        } catch (Exception e) {
            e.printStackTrace();
        }

        return null ;
    }
}

dao层代码
PersonDaoImpl.java

package cn.ctgu.ssm.dao.impl;

import cn.ctgu.ssm.dao.BaseDao;
import cn.ctgu.ssm.domain.Person;
import org.mybatis.spring.support.SqlSessionDaoSupport;
import org.springframework.stereotype.Repository;

import java.util.List;

@Repository("personDao")
public class PersonDaoImpl extends SqlSessionDaoSupport implements BaseDao<Person>{
    public void insert(Person person) {
        getSqlSession().insert("person.insert",person);
    }

    public void update(Person person) {

    }

    public void delete(Integer id) {

    }

    public Person selectOne(Integer id) {
        return null;
    }

    public List<Person> selectAll() {
        return getSqlSession().selectList("persons.selectAll");
    }

    public List<Person> selectPage(int offset, int len) {
        return null;
    }

    public int selectCount() {
        return 0;
    }
    //按照号码查询
    public String selectNameByPhone(String phone){
        return getSqlSession().selectOne("persons.selectNameByPhone",phone);
    }
}

UserDaoImpl.java

package cn.ctgu.ssm.dao.impl;

import cn.ctgu.ssm.dao.BaseDao;
import cn.ctgu.ssm.domain.User;
import org.apache.ibatis.session.RowBounds;
import org.mybatis.spring.support.SqlSessionDaoSupport;
import org.springframework.stereotype.Repository;

import java.util.List;

/**
 */
@Repository()
public class UserDaoImpl extends SqlSessionDaoSupport implements BaseDao<User> {

    public void insert(User user) {
        getSqlSession().insert("users.insert",user);
    }

    public void update(User user) {
        getSqlSession().update("users.update", user);
    }

    public void delete(Integer id ) {
        getSqlSession().delete("users.delete", id);
    }

    public User selectOne(Integer id) {
        return getSqlSession().selectOne("users.selectOne",id) ;
    }

    public List<User> selectAll() {
        return getSqlSession().selectList("users.selectAll");
    }

    public List<User> selectPage(int offset, int len) {
        return getSqlSession().selectList("users.selectPage",new RowBounds(offset, len));
    }

    public int selectCount() {
        return getSqlSession().selectOne("users.selectCount");
    }
}

domain层代码
CallLog.java

package cn.ctgu.ssm.domain;

import cn.ctgu.ssm.util.CallLogUtil;

/**
 * CallLog 呼叫电话日志
 */
public class CallLog {
    private String caller ;
    //主叫名字
    private String callerName;
    private String callee ;

    //被叫名称
    private String calleeName;
    private String callTime ;
    private String callDuration ;
    //是否是主叫
    private boolean flag;

    public String getCallerName() {
        return callerName;
    }

    public void setCallerName(String callerName) {
        this.callerName = callerName;
    }

    public String getCalleeName() {
        return calleeName;
    }

    public void setCalleeName(String calleeName) {
        this.calleeName = calleeName;
    }

    public boolean isFlag() {
        return flag;
    }

    public void setFlag(boolean flag) {
        this.flag = flag;
    }



    public String getCaller() {
        return caller;
    }

    public void setCaller(String caller) {
        this.caller = caller;
    }

    public String getCallee() {
        return callee;
    }

    public void setCallee(String callee) {
        this.callee = callee;
    }

    public String getCallTime() {
        if(callTime!=null){
            return CallLogUtil.formatDate(callTime);
        }
        return null;

    }

    public void setCallTime(String callTime) {
        this.callTime = callTime;
    }

    public String getCallDuration() {
        return callDuration;
    }

    public void setCallDuration(String callDuration) {
        this.callDuration = callDuration;
    }
}

CallLogRange.java

package cn.ctgu.ssm.domain;

import cn.ctgu.ssm.util.CallLogUtil;

/**
 * CallLog 呼叫电话日志
 */
public class CallLog {
    private String caller ;
    //主叫名字
    private String callerName;
    private String callee ;

    //被叫名称
    private String calleeName;
    private String callTime ;
    private String callDuration ;
    //是否是主叫
    private boolean flag;

    public String getCallerName() {
        return callerName;
    }

    public void setCallerName(String callerName) {
        this.callerName = callerName;
    }

    public String getCalleeName() {
        return calleeName;
    }

    public void setCalleeName(String calleeName) {
        this.calleeName = calleeName;
    }

    public boolean isFlag() {
        return flag;
    }

    public void setFlag(boolean flag) {
        this.flag = flag;
    }



    public String getCaller() {
        return caller;
    }

    public void setCaller(String caller) {
        this.caller = caller;
    }

    public String getCallee() {
        return callee;
    }

    public void setCallee(String callee) {
        this.callee = callee;
    }

    public String getCallTime() {
        if(callTime!=null){
            return CallLogUtil.formatDate(callTime);
        }
        return null;

    }

    public void setCallTime(String callTime) {
        this.callTime = callTime;
    }

    public String getCallDuration() {
        return callDuration;
    }

    public void setCallDuration(String callDuration) {
        this.callDuration = callDuration;
    }
}

Person.java

package cn.ctgu.ssm.domain;

public class Person {
    private Integer id;
    private String name;
    private String phone;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }
}

CallLogStat.java

package cn.ctgu.ssm.hive;

/*
*
* 通话记录统计
* */
public class CallLogStat {
    private String yearMonth;
    private int count;

    public CallLogStat(String yearMonth, int count) {
        this.yearMonth = yearMonth;
        this.count = count;
    }
    public String getYearMonth() {
        return yearMonth;
    }

    public void setYearMonth(String yearMonth) {
        this.yearMonth = yearMonth;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }
}

4、技术及难点总结
1、flume宕机的时候如何防止重复收集先前的日志记录
可以通过在存储过程设置标记位,将其标记存储到hbase中,如果下次收集日志时判断是否存在,如果存在则跳过不存储,否则存储。

2、避免热点问题以及最快的查询效率
1)热点问题的解决是通过hashcode的方式将其打散平均存储在不同的RegionServer上同时为了加快查询时间,也就是同一年同一个月的数据最好存储在同一台RegionServer上我们采用盐析的方式来解决这一问题:即取出主叫号码和同年月份组合成一个ID,并取它们的哈希码,然后除以RegionServer数,这样就保证了同一个号码的同年月份的通话记录在同一台RegionServer上

int hash=(callerId+callTimes.substring(0,6)).hashCode();
hash=(hash&Integer.MAX_VALUE)%100;//防止出现负数

2、为了加快查询的速度,可以将有用信息设计在rowkey中,比如主叫号码+通话时间+被叫号码+通话时长
这样就只需要查询rowKey即可,而不需要再查询其他的信息列。(优化手段)

//生成rowkey
    public String genRowKey(String hash,String caller,String time,String flag,String callee,String duration){
        return hash+","+caller+","+time+","+flag+","+callee+","+duration;
    }

3、为什么不直接从flume到入到hbase中,而是通过kafka
kafka具有n-1容错机制,且能用于多个系统同时消费数据。存储在hbase中则不能供多个系统同时消费数据。

前端数据是交换机上下来的数据写入到本地日志文件中,主要包含通话信息记录,内容涉及到主叫、被叫、时长、通话时间、主叫所在地区、被叫所在地区等等。

本地的日志文件通过flume实时收集数据到hbase。

由于通话日志中只有主叫信息,该系统采用协处理实现被叫通话记录的同步插入并使用极少的冗余数据实现类二级索引,并对Get请求进行重写设计,实现主被叫查询时,服务器端回传相同的通话详情,对客户端实现透明化处理。

为避免hbase系统中设计的重灾区,即热点问题,系统采用主叫方特征数据与通话时间特征数据的hash计算后对rowkey进行加盐处理,可实现系统的高吞吐量查询和负载均衡处理。

本系统在hadoop的hdfs和Yarn层面以及hbase均采用外部化ZooKeeper集群实现统一的HA管理,可轻松解决系统的容灾问题。其中Hadoop的HDFS的HA集群采用更加流行而且成熟的基于QJM的架构方案处理。

本系统可对公检法系统提供接口,集合手机号码绑定身份信息,快速可疑人员进行定位和跟踪。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐