I. Overview

This article shows how to compute real-time data with Storm + Logstash + Kafka, and how to render the results as a heatmap with the AMap (高德地图) API.

Background:

In some situations we need to know how people are currently moving around, for example to monitor in real time how crowded a tourist attraction is. GPS positioning could be used to locate visitors within an area, but it has a drawback: not every visitor has GPS enabled. In that case cell "signaling" can be used to obtain per-device location information. Signaling means that every powered-on phone constantly reports its position to the nearest base stations. This is why, when traveling, you receive a "Welcome to such-and-such city" text message whenever you enter a new region: mobile and telecom operators use signaling to track each device's location. (It also shows how hard personal privacy is to protect in the big-data era.)

1. Project architecture

(Figure: project architecture diagram)

Here we use Logstash to collect the log data; it plays a role similar to Flume. Since this is an experimental project, the data is simulated with a Python script. Logstash ships the data into Kafka, Storm consumes it in real time and writes the computed results into a MySQL database, and a backend application then reads the results for further use and visualization.
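Each simulated record is a single tab-separated line of the form phone, "longitude, latitude", timestamp, matching the generator script below. For example (the timestamp value is illustrative):

13869555210	116.389275, 39.925818	2018-09-01 10:20:30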

2. Environment and software versions
  • storm-1.1.1
  • zookeeper-3.4.5-cdh5.3.6
  • logstash-2.4.1
  • kafka_2.11-0.9.0.0

II. Hands-On Implementation

1. Simulating the data
# coding=UTF-8

import random
import time

phone = [
    "13869555210",
    "18542360152",
    "15422556663",
    "18852487210",
    "13993584664",
    "18754366522",
    "15222436542",
    "13369568452",
    "13893556666",
    "15366698558"
]

location = [
    "116.191031, 39.988585",
    "116.389275, 39.925818",
    "116.287444, 39.810742",
    "116.481707, 39.940089",
    "116.410588, 39.880172",
    "116.394816, 39.91181",
    "116.416002, 39.952917"
]


def sample_phone():
    return random.choice(phone)


def sample_location():
    return random.choice(location)


def generator_log(count=10):
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # Append tab-separated records to the log file that Logstash watches;
    # "with" ensures the file is closed (the original never closed it)
    with open("/opt/storm_project/datas/logs/access.log", "a+") as f:
        while count >= 1:
            query_log = "{phone}\t{location}\t{date}".format(
                phone=sample_phone(), location=sample_location(), date=time_str)
            f.write(query_log + "\n")
            count = count - 1


if __name__ == '__main__':
    generator_log(100)
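Running the script once appends 100 records to access.log. To keep a continuous stream flowing during testing, rerun it periodically; assuming it is saved as generator_log.py (hypothetical file name), a simple shell loop will do:

while true; do python generator_log.py; sleep 5; done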

2. Logstash configuration

Create a configuration file storm_pro.conf under the Logstash installation directory:

input {
  file {
    path => '/opt/storm_project/datas/logs/access.log'
  }
}

output {
  kafka {
    topic_id => "storm_project"
    batch_size => 1
    bootstrap_servers => "hadoop-senior.shinelon.com:9092"
    codec => plain {
      format => "%{message}"
    }
  }
}

Note: path above is the file the data is read from; create it yourself if it does not exist. topic_id must match the Kafka topic created below.
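Also note that, by default, the Logstash file input tails the file and only ships lines appended after Logstash starts; to re-read an existing file from the top on startup, add start_position => "beginning" to the file block.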

3. Kafka configuration

Edit the configuration file server.properties under the Kafka installation directory:

broker.id=0

############################# Socket Server Settings #############################

listeners=PLAINTEXT://:9092

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=hadoop-senior.shinelon.com

# ZooKeeper connection string
zookeeper.connect=hadoop-senior.shinelon.com:2181

Note: Kafka depends on ZooKeeper, so zookeeper.connect must point at a running ZK ensemble.

4. Storm topology code
package cn.just.shinelon.integration;

import cn.just.shinelon.utils.DateUtil;
import com.google.common.collect.Maps;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.kafka.*;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;
import java.util.UUID;

public class KafkaTopology {


    /**
     * Source of the default MultiScheme used by the Kafka spout
     * (RawMultiScheme): it emits the raw message bytes under the single
     * output field "bytes", which is why PrintBolt reads
     * tuple.getBinaryByField("bytes").
     *
     * public class RawMultiScheme implements MultiScheme {
     *     public RawMultiScheme() {
     *     }
     *
     *     public Iterable<List<Object>> deserialize(ByteBuffer ser) {
     *         return Arrays.asList(Utils.tuple(new Object[]{Utils.toByteArray(ser)}));
     *     }
     *
     *     public Fields getOutputFields() {
     *         return new Fields(new String[]{"bytes"});
     *     }
     * }
     */
    public static class PrintBolt extends BaseRichBolt{
        private OutputCollector outputCollector;


        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.outputCollector=outputCollector;
        }

        @Override
        public void execute(Tuple tuple) {
            try {
            byte[] bytes=tuple.getBinaryByField("bytes");
            String input = new String(bytes);
            String[] logs = input.split("\t");
            String phone = logs[0];
            String tmp = logs[1];
            // longitude
            Double longitude = Double.parseDouble(tmp.split(",")[0]);
            // latitude
            Double latitude = Double.parseDouble(tmp.split(",")[1]);
            // event time in epoch milliseconds; the query layer later aggregates over a recent window
            long timestamp = DateUtil.getInstance().getTime(logs[2]);
            System.out.println(phone + ", " + longitude + "," + latitude + ", " + timestamp);
            // emit fields in the declared order: time, latitude, longitude
            outputCollector.emit(new Values(timestamp, latitude, longitude));
            outputCollector.ack(tuple);
            } catch (Exception e) {
                e.printStackTrace();
                outputCollector.fail(tuple);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
                outputFieldsDeclarer.declare(new Fields("time","latitude","longitude"));
        }
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        // JDBC (HikariCP) connection settings
        Map hikariConfigMap = Maps.newHashMap();
        hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
        hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/storm");
        hikariConfigMap.put("dataSource.user","root");
        hikariConfigMap.put("dataSource.password","123456");
        ConnectionProvider connectionProvider;
        connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
        // target table
        String tableName = "location";
        JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);

        JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                .withInsertQuery("insert into location values(?,?,?)")
                .withQueryTimeoutSecs(30);

        // ZooKeeper address used by the Kafka spout
        BrokerHosts hosts = new ZkHosts("hadoop-senior.shinelon.com:2181");
        String topicName="storm_project";
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());

        // consume from the latest offset; by default the spout would start from the earliest
        spoutConfig.startOffsetTime=kafka.api.OffsetRequest.LatestTime();

        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        builder.setSpout("KafkaSpout",kafkaSpout);
        builder.setBolt("PrintBolt",new PrintBolt()).shuffleGrouping("KafkaSpout");
        builder.setBolt("JdbcInsertBolt",userPersistanceBolt).shuffleGrouping("PrintBolt");

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("KafkaTopology",new Config(),builder.createTopology());
    }
}
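The DateUtil helper imported above is not shown in the original post. A minimal sketch consistent with how it is used here (a singleton that parses the "yyyy-MM-dd HH:mm:ss" log timestamp into epoch milliseconds) might look like this:

package cn.just.shinelon.utils;

import java.text.ParseException;
import java.text.SimpleDateFormat;

public class DateUtil {
    private static final DateUtil INSTANCE = new DateUtil();

    private DateUtil() {
    }

    public static DateUtil getInstance() {
        return INSTANCE;
    }

    // Parse a "yyyy-MM-dd HH:mm:ss" timestamp into epoch milliseconds.
    // SimpleDateFormat is not thread-safe, so create one per call for simplicity.
    public long getTime(String dateStr) throws ParseException {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return format.parse(dateStr).getTime();
    }
}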
5. Database design
create database storm;

use storm;

create table location(
time bigint,
latitude double,
longitude double
)charset utf8;
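Once the topology is running, a quick query confirms that rows are arriving, e.g.:

select time, latitude, longitude from location order by time desc limit 10;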
6. Starting the cluster

First start Kafka (note: ZooKeeper must be running first).
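If ZooKeeper is not already running, start it first from the ZooKeeper installation directory:

bin/zkServer.sh start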

Start Kafka:

nohup bin/kafka-server-start.sh config/server.properties &

Create the topic:

bin/kafka-topics.sh --create --zookeeper hadoop-senior.shinelon.com:2181 --replication-factor 1 --partitions 1 --topic storm_project

Note: the topic name must match the one configured in Logstash.

Start Logstash:

bin/logstash -f storm_pro.conf
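To confirm that events are actually reaching Kafka before starting the topology, you can tail the topic with the console consumer bundled with Kafka 0.9:

bin/kafka-console-consumer.sh --zookeeper hadoop-senior.shinelon.com:2181 --topic storm_project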

With Kafka and Logstash running, start the Storm topology and then run the Python data generator; the computed results will appear in the database:
(screenshot: rows in the location table)

III. Visualizing the Data

The rendered heatmap is shown below:
(screenshot: AMap heatmap of the simulated data)

The front-end page is as follows:

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8"/>
<title>高德地图</title>
    <link rel="stylesheet" href="http://cache.amap.com/lbs/static/main1119.css"/>
</head>
<body>
     <script src="js/echarts.min.js"></script>
    <script src="js/jquery.min.js"></script>
    <script src="http://webapi.amap.com/maps?v=1.4.9&amp;key=d16808eab90b7545923a1c2f4bb659ef"></script>
<div id="container"></div>

<script>
     var map = new AMap.Map("container", {
         resizeEnable: true,
         center: [116.418261, 39.921984],
         zoom: 11
     });
 
     var heatmap;
     // Fetch aggregated points from the backend synchronously so that
     // `points` is populated before the heatmap is created below.
     var points = (function () {
         var city = [];
         $.ajax({
             type: "POST",
             url: "../get_map",
             dataType: 'json',
             async: false,
             success: function (result) {
                 for (var i = 0; i < result.length; i++) {
                     city.push({"lng": result[i].longitude, "lat": result[i].latitude, "count": result[i].count});
                 }
             }
         });
         return city;
     })();

     /* Static sample data for testing the heatmap without the backend:
        [
         {"lng":116.191031,"lat":39.988585,"count":1000},
         {"lng":116.389275,"lat":39.925818,"count":110},
         {"lng":116.287444,"lat":39.810742,"count":1200},
         {"lng":116.481707,"lat":39.940089,"count":130},
         {"lng":116.410588,"lat":39.880172,"count":140},
         {"lng":116.394816,"lat":39.91181,"count":15552},
         {"lng":116.416002,"lat":39.952917,"count":16}
        ]
     */
     map.plugin(["AMap.Heatmap"],function() {      //加载热力图插件
         heatmap = new AMap.Heatmap(map,{
        	 raduis:50,
        	 opacity:[0,0.7]
         });    //在地图对象叠加热力图
         heatmap.setDataSet({data:points,max:100}); //设置热力图数据集
         //具体参数见接口文档
     }); 
     
// Alternative: a 3D map
// var map = new AMap.Map('container', {
//     pitch: 75,       // map pitch angle, valid range 0-83 degrees
//     viewMode: '3D'   // 3D render mode
// });
</script>

</body>
</html>

The Spring Boot DAO layer is as follows:


package cu.just.spark.dao;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;

import org.springframework.stereotype.Component;

import cu.just.spark.domain.Location;
import cu.just.spark.utils.MysqlUtil;

/**
 * @author shinelon
 */
@Component
public class LocationDao {

    public List<Location> map() throws Exception {
        List<Location> list = new ArrayList<Location>();
        Connection connection = null;
        PreparedStatement psmt = null;
        try {
            connection = MysqlUtil.getConnection();
            // Aggregate points from the last 10 minutes; `time` is stored in epoch milliseconds
            psmt = connection.prepareStatement("select longitude,latitude,count(*) from location where "
                    + "time>unix_timestamp(date_sub(current_timestamp(),interval 10 minute))*1000 "
                    + "group by longitude,latitude");
            ResultSet resultSet = psmt.executeQuery();
            while (resultSet.next()) {
                Location location = new Location();
                location.setLongitude(resultSet.getDouble(1));
                location.setLatitude(resultSet.getDouble(2));
                location.setCount(resultSet.getInt(3));
                list.add(location);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            MysqlUtil.release();
        }
        return list;
    }

}

The entity class:


package cu.just.spark.domain;

public class Location {
	private Integer count;
	private double latitude;
	private double longitude;
	
	public Integer getCount() {
		return count;
	}
	public void setCount(Integer count) {
		this.count = count;
	}
	public double getLatitude() {
		return latitude;
	}
	public void setLatitude(double latitude) {
		this.latitude = latitude;
	}
	public double getLongitude() {
		return longitude;
	}
	public void setLongitude(double longitude) {
		this.longitude = longitude;
	}
}

The utility class:

package cu.just.spark.utils;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class MysqlUtil {
    // JDBC URL for the project database that holds the location table
    private static final String DB_URL = "jdbc:mysql://localhost:3306/storm?user=root&password=123456";
    private static Connection connection;
    private static PreparedStatement pstm;
    private static ResultSet resultSet;

    public static Connection getConnection() {
        try {
            Class.forName("com.mysql.jdbc.Driver");
            connection = DriverManager.getConnection(DB_URL);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connection;
    }

    public static void release() {
        try {
            if (resultSet != null) {
                resultSet.close();
            }
            if (pstm != null) {
                pstm.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                connection = null;    // help GC
            }
        }
    }

}

The controller layer:

package cu.just.spark.controller;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView;

import cu.just.spark.dao.LocationDao;
import cu.just.spark.domain.Location;

@RestController
public class MapController {
	
	@Autowired
	public LocationDao locationDao;
	
	@RequestMapping("/storm")
	public ModelAndView storm() {
		return new ModelAndView("map");
	}
	
	@RequestMapping("/get_map")
	@ResponseBody
	public List<Location> getMap() throws Exception{
		return locationDao.map();
	}

}
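With the application running (assuming Spring Boot's default port 8080), the endpoint can be exercised directly:

curl http://localhost:8080/get_map
# e.g. [{"count":23,"latitude":39.925818,"longitude":116.389275}, ...]

Opening /storm in a browser then renders the heatmap page.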

Project source code: linked from the original post.
