一、项目需求

对某一电商平台每一天的销售量,销售总额进行统计并进行实时可视化展示

二、项目架构模型

zookeeper+storm+kafka+redis
生产者不断生成订单的信息发送给kafka,然后kafka将数据发送给storm,storm进行处理后发送到redis进行存储
在这里插入图片描述

三、代码结构

整个项目的下载链接:
链接:https://pan.baidu.com/s/1LQmp4y2zbiafz4XgLlilig
提取码:gscx

整个项目分为四个部分
在这里插入图片描述

1.订单实体类

domain下的PaymentInfo类,这里定义了一个订单的各种属性和方法,后面用到的主要是商铺名称、商品名称、商品价格、订单的城市以及商品的类别。

package domain;

import com.alibaba.fastjson.JSONObject;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.UUID;

public class PaymentInfo implements Serializable{

    private static final long serialVersionUID = -7958315778386204397L;
    private String orderId;//订单编号
    private Date createOrderTime;//订单创建时间
    private String paymentId;//支付编号
    private Date paymentTime;//支付时间
    private String productId;//商品编号
    private String productName;//商品名称
    private long productPrice;//商品价格
    private long promotionPrice;//促销价格
    private String shopId;//商铺编号
    private String shopName;//商铺名称
    private String shopMobile;//店铺电话
    private long payPrice;//订单支付价格
    private int num;//订单数量

    /**
     * <Province>19</Province>
     * <City>1657</City>
     * <County>4076</County>
     */
    private String province; //省
    private String city; //市
    private String county;//县

    //102,144,114
    private String catagorys;  //分类

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getCounty() {
        return county;
    }

    public void setCounty(String county) {
        this.county = county;
    }

    public String getCatagorys() {
        return catagorys;
    }

    public void setCatagorys(String catagorys) {
        this.catagorys = catagorys;
    }

    public PaymentInfo() {
    }

    public PaymentInfo(String orderId, Date createOrderTime, String paymentId, Date paymentTime, String productId, String productName, long productPrice, long promotionPrice, String shopId, String shopName, String shopMobile, long payPrice, int num) {
        this.orderId = orderId;
        this.createOrderTime = createOrderTime;
        this.paymentId = paymentId;
        this.paymentTime = paymentTime;
        this.productId = productId;
        this.productName = productName;
        this.productPrice = productPrice;
        this.promotionPrice = promotionPrice;
        this.shopId = shopId;
        this.shopName = shopName;
        this.shopMobile = shopMobile;
        this.payPrice = payPrice;
        this.num = num;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Date getCreateOrderTime() {
        return createOrderTime;
    }

    public void setCreateOrderTime(Date createOrderTime) {
        this.createOrderTime = createOrderTime;
    }

    public String getPaymentId() {
        return paymentId;
    }

    public void setPaymentId(String paymentId) {
        this.paymentId = paymentId;
    }

    public Date getPaymentTime() {
        return paymentTime;
    }

    public void setPaymentTime(Date paymentTime) {
        this.paymentTime = paymentTime;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public long getProductPrice() {
        return productPrice;
    }

    public void setProductPrice(long productPrice) {
        this.productPrice = productPrice;
    }

    public long getPromotionPrice() {
        return promotionPrice;
    }

    public void setPromotionPrice(long promotionPrice) {
        this.promotionPrice = promotionPrice;
    }

    public String getShopId() {
        return shopId;
    }

    public void setShopId(String shopId) {
        this.shopId = shopId;
    }

    public String getShopName() {
        return shopName;
    }

    public void setShopName(String shopName) {
        this.shopName = shopName;
    }

    public String getShopMobile() {
        return shopMobile;
    }

    public void setShopMobile(String shopMobile) {
        this.shopMobile = shopMobile;
    }

    public long getPayPrice() {
        return payPrice;
    }

    public void setPayPrice(long payPrice) {
        this.payPrice = payPrice;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    @Override
    public String toString() {
        return "PaymentInfo{" +
                "orderId='" + orderId + '\'' +
                ", createOrderTime=" + createOrderTime +
                ", paymentId='" + paymentId + '\'' +
                ", paymentTime=" + paymentTime +
                ", productId='" + productId + '\'' +
                ", productName='" + productName + '\'' +
                ", productPrice=" + productPrice +
                ", promotionPrice=" + promotionPrice +
                ", shopId='" + shopId + '\'' +
                ", shopName='" + shopName + '\'' +
                ", shopMobile='" + shopMobile + '\'' +
                ", payPrice=" + payPrice +
                ", num=" + num +
                '}';
    }

    public String random() {
        this.orderId = UUID.randomUUID().toString().replaceAll("-", "");
        this.paymentId = UUID.randomUUID().toString().replaceAll("-", "");
        this.productPrice = new Random().nextInt(1000);
        this.promotionPrice = new Random().nextInt(500);
        this.payPrice = new Random().nextInt(480);
        String[] shop = new String[]{"左岸男装","HALO王妃","MOON衣橱","8号鞋仓","潮童领地","全饰界","女王当铺","伊甸园","缘来饰你","爱尚","百衣百顺","新奥五金","简森建材","企鹅装饰建材店","鸿宝家私","洁尚家","欧瑞时代家具","富美家具城","理家家居","童龄坊"};
        int index2 = (int) (Math.random() * shop.length);
        this.shopId = shop[index2];
//        this.shopId = new Random().nextInt(20)+"";
        String[] product = new String[]{"笔类","薄","本","册","日用文具","办公设备","复印机","考勤机","腰带","手链","手镯","戒指","脚链","饰链","充电器","照明","灯具"," 钟表仪器","仪表","游戏机","卡激光产品门钟","门铃","防盗器","手套","毛巾","浴巾","帽子","衬衣","毛衣","西服","裤子","休闲服装","婚纱","净水器","饮水机","榨汁机","网球","台球","枪杆","棋类","乐器","填充玩具","绒毛玩具","电子玩具","电动玩具"};
        int index3 = (int) (Math.random() * product.length);
        this.productId = product[index3];
//        this.productId = new Random().nextInt(50);
//        this.catagorys = new Random().nextInt(10000)+","+new Random().nextInt(10000)+","+new Random().nextInt(10000);
        String[] category = new String[]{"食品类","服装类","日用品类","家具类","家用电器类","纺织品类"};
        int index1 = (int) (Math.random() * category.length);
        this.catagorys = category[index1];
        this.province = new Random().nextInt(23)+"";
        String[] city = new String[]{"杭州", "苏州", "上海", "天津", "深圳", "成都", "郑州", "宁波", "合肥", "重庆", "广州", "大连", "青岛", "北京", "义乌", "东莞", "长沙", "贵阳", "珠海", "威海", "泉州", "赤峰", "厦门", "福州", "抚顺", "汕头", "海口", "岳阳", "武汉", "唐山", "石家庄", "哈尔滨", "兰州", "呼和浩特", "南昌", "佛山", "烟台"};
        int index = (int) (Math.random() * city.length);
        this.city = city[index];
//        this.city = new Random().nextInt(265)+"";
        this.county = new Random().nextInt(1489)+"";

        String date = "2021-11-11 12:11:11";
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            this.createOrderTime = simpleDateFormat.parse(date);
        } catch (Exception e) {
            e.printStackTrace();
        }
        JSONObject obj = new JSONObject();
        String jsonString = obj.toJSONString(this);
        return jsonString;
        //  return new Gson().toJson(this);
    }

}

2.模拟消息生产者

producer目录下的PayMentInfoProducer,用来模拟生产者不断随机产生订单数据,需要配置kafka集群的端口和IP

package producer;

import domain.PaymentInfo;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PayMentInfoProducer {

    public static void main(String[] args){
        //1、准备配置文件
        Properties props = new Properties();
        props.put("bootstrap.servers", "storm-master:9092,storm-slave1:9092,storm-slave2:9092");
        /**
         * 当生产者将ack设置为“全部”(或“-1”)时,min.insync.replicas指定必须确认写入被认为成功的最小副本数。
         * 如果这个最小值不能满足,那么生产者将会引发一个异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。
         * 当一起使用时,min.insync.replicas和acks允许您执行更大的耐久性保证。
         * 一个典型的情况是创建一个复制因子为3的主题,将min.insync.replicas设置为2,并使用“全部”选项来产生。
         * 这将确保生产者如果大多数副本没有收到写入引发异常。
         */
        props.put("acks", "all");
        /**
         * 设置一个大于零的值,将导致客户端重新发送任何失败的记录
         */
        props.put("retries", 0);
        /**
         * 只要有多个记录被发送到同一个分区,生产者就会尝试将记录一起分成更少的请求。
         * 这有助于客户端和服务器的性能。该配置以字节为单位控制默认的批量大小。
         */
        props.put("batch.size", 16384);
        /**
         *在某些情况下,即使在中等负载下,客户端也可能希望减少请求的数量。
         * 这个设置通过添加少量的人工延迟来实现这一点,即不是立即发出一个记录,
         * 而是等待达到给定延迟的记录,以允许发送其他记录,以便发送可以一起批量发送
         */
        props.put("linger.ms", 1);
        /**
         * 生产者可用于缓冲等待发送到服务器的记录的总字节数。
         * 如果记录的发送速度比发送给服务器的速度快,那么生产者将会阻塞,max.block.ms之后会抛出异常。
         * 这个设置应该大致对应于生产者将使用的总内存,但不是硬性限制,
         * 因为不是所有生产者使用的内存都用于缓冲。
         * 一些额外的内存将被用于压缩(如果压缩被启用)以及用于维护正在进行的请求。
         */
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //2、创建KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        while (true){
            //3、发送数据
            kafkaProducer.send(new ProducerRecord<String, String>("itcast_order",new PaymentInfo().random()));
            System.out.println("发送数据");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


}

3.消息消费者

kafkaStorm下面的两个类

ProcessOrderBolt

这里写了在存入redis数据库时进行的操作,以及定义了存入的数据格式,包括:

  • 每个店铺的总销售额
  • 每个店铺的销售数量
  • 平台总销售额度
  • 平台商品销售数量
  • 商品的维度
  • 每个商品的销售数量
  • 每类商品的销售量
  • 每个城市的销售量
package kafkaStorm;

import domain.PaymentInfo;
import producer.PayMentInfoProducer;
import util.JedisUtil;
import com.alibaba.fastjson.JSONObject;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;

public class ProcessOrderBolt extends BaseBasicBolt{


    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        Object value = input.getValue(4);
        //通过fastjson,将我们的json字符串转为对象
        JSONObject obj = new JSONObject();
        PaymentInfo parseObject = obj.parseObject(value.toString(), PaymentInfo.class);
        //拿到了paymentInfo就可以从paymentinf里面获取我们需要计算的一些指标

        // 计算店铺的维度
        String shopId = parseObject.getShopId();
        Jedis conn = JedisUtil.getConn();
        // 每个店铺的总销售额
        conn.incrBy("shop_price"+shopId,parseObject.getPayPrice());
//        conn.incrBy("itcast:order:"+shopId+":user:2018-02-26",1);
        // 每个店铺的销售数量
        conn.incrBy("shop_num"+shopId,1);

        //平台维度的数据统计

        // 平台总销售额度
        conn.incrBy("total_price",parseObject.getPayPrice());
//        conn.incrBy("itcast:order:total:user:date",1);
        // 平台商品销售数量
        conn.incrBy("total_num",1);



        //商品的维度
        String productId = parseObject.getProductId();
        // 每个商品的总销售额
        conn.incrBy("product_price"+productId,parseObject.getPayPrice());
//        conn.incrBy("itcast:order:"+productId+":user:date",1);
        // 每个商品的销售数量
        conn.incrBy("product_num"+productId,1);

        // 每个城市的销售量
        String city = parseObject.getCity();
        conn.incrBy("city"+city,1);

        // 每类商品的销售量
        String category = parseObject.getCatagorys();
        conn.incrBy("category"+category,1);
        conn.close();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}


RealBoardTopology

构建拓扑结构,需要配置kafka集群的端口和IP

package kafkaStorm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

public class RealBoardTopology {


    public static void main(String[] args) throws Exception {
        //从kafka当中获取数据
        TopologyBuilder builder = new TopologyBuilder();
        KafkaSpoutConfig.Builder<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder("storm-master:9092,storm-slave1:9092,storm-slave2:9092", "itcast_order");
        kafkaSpoutConfig.setGroupId("itcast_order_group");
        KafkaSpoutConfig<String, String> build = kafkaSpoutConfig.build();
        KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(build);
        builder.setSpout("kafkaSpout",kafkaSpout,5);
        builder.setBolt("processBolt",new ProcessOrderBolt(),8).localOrShuffleGrouping("kafkaSpout");
        Config config = new Config();
        if(args.length > 0){
            config.setNumWorkers(2);
            config.setDebug(false);
            StormSubmitter.submitTopology(args[0],config,builder.createTopology());
        }else{
            LocalCluster cluster = new LocalCluster();
            config.setDebug(true);
            cluster.submitTopology("realBoard",config,builder.createTopology());
        }
    }

}

4.Redis使用工具类

util目录下的JedisUtil类,需要配置redis服务器的端口和IP

package util;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class JedisUtil {


    private static JedisPool pool = null;

    /**
     * 获取jedis连接池
     * */
    public static JedisPool getPool(){
        if(pool == null){
            //创建jedis连接池配置
            JedisPoolConfig config = new JedisPoolConfig();
            //最大连接数
            config.setMaxTotal(20);
            //最大空闲连接
            config.setMaxIdle(5);
            //创建redis连接池
            // 使用jedisPool的时候,timeout一定要给出来,如果不给,redis很大概率会报错,超时
            pool = new JedisPool(config,"172.25.0.5",6379,3000);
        }
        return pool;
    }

    /**
     * 获取jedis连接
     * */
    public static Jedis getConn(){
        return getPool().getResource();
    }

    /**
     * 测试连接
     * @param args
     */
    public static void main(String[] args) {
        Jedis jedis = getPool().getResource();
        jedis.incrBy("mine", 5);
        jedis.close();
    }


}

四、环境搭建

1.storm集群搭建

博客:https://blog.csdn.net/weixin_43622131/article/details/111057523

2.kafka搭建

(1)下载并解压

下载链接:
链接:https://pan.baidu.com/s/1nnlJB0vnLO7_NBKZgJ9BtA
提取码:w4a6

(2)安装zookeeper

在上述配置storm教程中已经配置了zookeeper

(3)配置kafka

首先配置主节点:
配置文件为server.properties
在这里插入图片描述
在zookeeper.connect中配置上zookeeper集群的IP和端口
在这里插入图片描述


将主节点的broker.id设置为1
在这里插入图片描述


配置完成,将整个kafka发送到从节点,从节点的配置文件只需要修改broker.id,从2开始依次设置即可


启动kafka:
首先启动zookeeper集群,然后进入kafka安装目录的bin目录执行下面命令(集群中的机器都要执行)

 ./kafka-server-start.sh -daemon ../config/server.properties &

3.redis集群搭建

博客:https://blog.csdn.net/weixin_43622131/article/details/105820078

五、环境启动

1.启动zookeeper

每个节点都要执行下面命令,该命令在zookeeper安装目录下的bin目录中

./zkServer.sh start 

2.启动kafka

每个节点都要执行下面命令,该命令在kafka安装目录下的bin目录中

 ./kafka-server-start.sh -daemon ../config/server.properties &

3.启动redis

这里为了方便使用了redis的单机版
启动一个即可

redis-server  redis1.conf

使用下面命令访问数据库

redis-cli -c -h 172.25.0.5 -p 6379

4.启动storm

在master上启动

storm nimbus &
 
storm ui &
 
storm logviewer &


在slave上启动

storm supervisor &
 
storm logviewer &

全部启动成功后查看进程:
master
在这里插入图片描述


slave
在这里插入图片描述

六、运行项目

首先将项目打成jar包

1.提交拓扑到storm集群

 ./storm jar /home/zf/storm_bigwork.jar  kafkaStorm.RealBoardTopology realboard2

要运行此项目需要在storm的lib目录下添加该jar包
在这里插入图片描述
下载链接:

链接:https://pan.baidu.com/s/1kU7q3YdTwfsPWOWyjYCPqA
提取码:jdf0

2.启动生产者

java -cp /home/zf/storm_bigwork.jar producer.PayMentInfoProducer start

3.查看效果

在这里插入图片描述


查看数据
在这里插入图片描述

在这里插入图片描述

七、进行可视化实时数据展示

展示视频链接:
链接:https://pan.baidu.com/s/1-JK6qmzA-SvP2uGhiOFCVg
提取码:p0n1
第一页中间是该平台的总销售额,左边是销售额前十名的店铺,右边是销售量前十名的商品
第二页左边的地图展示的是每个城市的销售量,右边展示的是各种类别产品所占的比重

在这里插入图片描述

前面已经实现了将流数据处理后放入redis数据库中,下面是从数据库中读取数据并通过前端进行实时展示

1.后端从数据库读取数据

这里使用了flask框架,从redis数据库中读取数据进行排序后取前十,设置好路由供前端获取数据
代码链接:
链接:https://pan.baidu.com/s/13RXzINNBKnb_mMcbeMqt6Q
提取码:477u

from flask import Flask
from flask_cors import CORS
from flask import render_template
import redis
from flask import Flask, jsonify,request

app = Flask(__name__)
CORS(app)
r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0,decode_responses=True, charset='UTF-8', encoding='UTF-8')

# 获取总销售额
@app.route('/total_price', methods=['GET', 'POST'])
def sum_total():
    # r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
    total_price = r.get("total_price")
    # return render_template('index.html', total_price=total_price)
    # print("success")
    return total_price

# 获取各个店铺的总销售额
@app.route('/shop_price', methods=['GET', 'POST'])
def sum_shop():
    # r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
    # keys = r.keys()
    # print(keys[0][:4])
    keys = r.keys()
    shop_key = []
    for i in keys:
        # print(i[:10])
        if i[:10] == "shop_price":
            shop_key.append(i)
    data = []
    for i in shop_key:
        tmp = {};
        tmp["stock"] = str(i[10:])
        tmp["fundPost"] = int(r.get(i))
        data.append(tmp)
    data = sorted(data, key=lambda x: x['fundPost'], reverse=True)
    return jsonify(data[:10])

# 获取各个商品的销售量
@app.route('/product_num', methods=['GET', 'POST'])
def product_num():
    # r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
    # keys = r.keys()
    # print(keys[0][:4])
    keys = r.keys()
    product_key = []
    for i in keys:
        # print(i[:10])
        if i[:11] == "product_num":
            product_key.append(i)
    data = []
    for i in product_key:
        tmp = {};
        tmp["stock"] = str(i[11:])
        tmp["fundPost"] = int(r.get(i))
        data.append(tmp)
    data = sorted(data, key=lambda x: x['fundPost'], reverse=True)
    return jsonify(data[:10])

# 获取各个城市的销售量
@app.route('/city_num', methods=['GET', 'POST'])
def city_num():
    keys = r.keys()
    city_key = []
    keys = r.keys()
    for i in keys:
        # print(i[:10])
        if i[:4] == "city":
            city_key.append(i)
    data = {}

    for i in city_key:
        # print(i[4:])
        data[i[4:]] = r.get(i)
    # print(len(data))
    return jsonify(data)

# 获取每类商品的销售量
@app.route('/category_num', methods=['GET', 'POST'])
def category_num():
    keys = r.keys()
    category_key = []
    for i in keys:
        # print(i[:10])
        if i[:8] == "category":
            category_key.append(i)
    data = []
    for i in category_key:
        tmp = {};
        tmp["name"] = i[8:];
        tmp["value"] = r.get(i)
        data.append(tmp)
    return jsonify(data)
if __name__ == '__main__':
    app.run(host='127.0.0.1', port=5000)

在这里插入图片描述

2.前端从后端请求数据并进行可视化

代码链接:
链接:https://pan.baidu.com/s/1zmGEXMjP47TYKblp9nMAdQ
提取码:y2n2

使用的可视化框架是echarts,请求后端数据使用的是Ajax,用法如下

$.ajax({
					type: 'GET',
					url: 'http://127.0.0.1:5000/total_price',
					async: false,
					success: function(data) {
						let option = myChart.getOption();

						option.series[3].startAngle = option.series[3].startAngle - 1;
						option.series[6].data[0].value = Number(data);
						myChart.setOption(option);
					}
				})

实现数据的实时刷新是通过javascript的setInterval函数来实现的,剩下的具体实现可以看看完整的代码。
整体实现下来确实需要花费一些时间,但是坚持做下来是一定能成功的,如果遇到一些小bug可以与我交流。

八、系统的特点

  1. 全流程都在集群上运行
  2. 代码整洁规范,可扩展性好
  3. 实时效果较好
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐