A Storm-Based E-Commerce Visualization Dashboard
I. Project Requirements
Compute the daily sales volume and total sales revenue of an e-commerce platform and visualize the results in real time.
II. Architecture
ZooKeeper + Storm + Kafka + Redis
A producer continuously generates order records and sends them to Kafka; Kafka feeds the data to Storm, which processes it and writes the results to Redis for storage.
III. Code Structure
Download link for the full project:
Link: https://pan.baidu.com/s/1LQmp4y2zbiafz4XgLlilig
Extraction code: gscx
The project consists of four parts.
1. Order entity class
The PaymentInfo class under domain defines the attributes and methods of an order. The fields used later are mainly the shop name, product name, product price, order city, and product category.
package domain;
import com.alibaba.fastjson.JSONObject;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.UUID;
public class PaymentInfo implements Serializable{
private static final long serialVersionUID = -7958315778386204397L;
private String orderId;//order id
private Date createOrderTime;//order creation time
private String paymentId;//payment id
private Date paymentTime;//payment time
private String productId;//product id
private String productName;//product name
private long productPrice;//product price
private long promotionPrice;//promotional price
private String shopId;//shop id
private String shopName;//shop name
private String shopMobile;//shop phone number
private long payPrice;//amount actually paid for the order
private int num;//number of items in the order
/**
* e.g. <Province>19</Province>
* <City>1657</City>
* <County>4076</County>
*/
private String province; //province
private String city; //city
private String county;//county
//e.g. 102,144,114
private String catagorys; //category
public String getProvince() {
return province;
}
public void setProvince(String province) {
this.province = province;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public String getCounty() {
return county;
}
public void setCounty(String county) {
this.county = county;
}
public String getCatagorys() {
return catagorys;
}
public void setCatagorys(String catagorys) {
this.catagorys = catagorys;
}
public PaymentInfo() {
}
public PaymentInfo(String orderId, Date createOrderTime, String paymentId, Date paymentTime, String productId, String productName, long productPrice, long promotionPrice, String shopId, String shopName, String shopMobile, long payPrice, int num) {
this.orderId = orderId;
this.createOrderTime = createOrderTime;
this.paymentId = paymentId;
this.paymentTime = paymentTime;
this.productId = productId;
this.productName = productName;
this.productPrice = productPrice;
this.promotionPrice = promotionPrice;
this.shopId = shopId;
this.shopName = shopName;
this.shopMobile = shopMobile;
this.payPrice = payPrice;
this.num = num;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public Date getCreateOrderTime() {
return createOrderTime;
}
public void setCreateOrderTime(Date createOrderTime) {
this.createOrderTime = createOrderTime;
}
public String getPaymentId() {
return paymentId;
}
public void setPaymentId(String paymentId) {
this.paymentId = paymentId;
}
public Date getPaymentTime() {
return paymentTime;
}
public void setPaymentTime(Date paymentTime) {
this.paymentTime = paymentTime;
}
public String getProductId() {
return productId;
}
public void setProductId(String productId) {
this.productId = productId;
}
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
public long getProductPrice() {
return productPrice;
}
public void setProductPrice(long productPrice) {
this.productPrice = productPrice;
}
public long getPromotionPrice() {
return promotionPrice;
}
public void setPromotionPrice(long promotionPrice) {
this.promotionPrice = promotionPrice;
}
public String getShopId() {
return shopId;
}
public void setShopId(String shopId) {
this.shopId = shopId;
}
public String getShopName() {
return shopName;
}
public void setShopName(String shopName) {
this.shopName = shopName;
}
public String getShopMobile() {
return shopMobile;
}
public void setShopMobile(String shopMobile) {
this.shopMobile = shopMobile;
}
public long getPayPrice() {
return payPrice;
}
public void setPayPrice(long payPrice) {
this.payPrice = payPrice;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
@Override
public String toString() {
return "PaymentInfo{" +
"orderId='" + orderId + '\'' +
", createOrderTime=" + createOrderTime +
", paymentId='" + paymentId + '\'' +
", paymentTime=" + paymentTime +
", productId='" + productId + '\'' +
", productName='" + productName + '\'' +
", productPrice=" + productPrice +
", promotionPrice=" + promotionPrice +
", shopId='" + shopId + '\'' +
", shopName='" + shopName + '\'' +
", shopMobile='" + shopMobile + '\'' +
", payPrice=" + payPrice +
", num=" + num +
'}';
}
// fill this object with random order data and return it as a JSON string
public String random() {
this.orderId = UUID.randomUUID().toString().replaceAll("-", "");
this.paymentId = UUID.randomUUID().toString().replaceAll("-", "");
this.productPrice = new Random().nextInt(1000);
this.promotionPrice = new Random().nextInt(500);
this.payPrice = new Random().nextInt(480);
String[] shop = new String[]{"左岸男装","HALO王妃","MOON衣橱","8号鞋仓","潮童领地","全饰界","女王当铺","伊甸园","缘来饰你","爱尚","百衣百顺","新奥五金","简森建材","企鹅装饰建材店","鸿宝家私","洁尚家","欧瑞时代家具","富美家具城","理家家居","童龄坊"};
int index2 = (int) (Math.random() * shop.length);
// the shop name is stored in shopId so the dashboard can display a readable label
this.shopId = shop[index2];
// this.shopId = new Random().nextInt(20)+"";
String[] product = new String[]{"笔类","薄","本","册","日用文具","办公设备","复印机","考勤机","腰带","手链","手镯","戒指","脚链","饰链","充电器","照明","灯具"," 钟表仪器","仪表","游戏机","卡激光产品门钟","门铃","防盗器","手套","毛巾","浴巾","帽子","衬衣","毛衣","西服","裤子","休闲服装","婚纱","净水器","饮水机","榨汁机","网球","台球","枪杆","棋类","乐器","填充玩具","绒毛玩具","电子玩具","电动玩具"};
int index3 = (int) (Math.random() * product.length);
// likewise, the product name is stored in productId
this.productId = product[index3];
// this.productId = new Random().nextInt(50);
// this.catagorys = new Random().nextInt(10000)+","+new Random().nextInt(10000)+","+new Random().nextInt(10000);
String[] category = new String[]{"食品类","服装类","日用品类","家具类","家用电器类","纺织品类"};
int index1 = (int) (Math.random() * category.length);
this.catagorys = category[index1];
this.province = new Random().nextInt(23)+"";
String[] city = new String[]{"杭州", "苏州", "上海", "天津", "深圳", "成都", "郑州", "宁波", "合肥", "重庆", "广州", "大连", "青岛", "北京", "义乌", "东莞", "长沙", "贵阳", "珠海", "威海", "泉州", "赤峰", "厦门", "福州", "抚顺", "汕头", "海口", "岳阳", "武汉", "唐山", "石家庄", "哈尔滨", "兰州", "呼和浩特", "南昌", "佛山", "烟台"};
int index = (int) (Math.random() * city.length);
this.city = city[index];
// this.city = new Random().nextInt(265)+"";
this.county = new Random().nextInt(1489)+"";
String date = "2021-11-11 12:11:11";
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
this.createOrderTime = simpleDateFormat.parse(date);
} catch (Exception e) {
e.printStackTrace();
}
// serialize this order to a JSON string with fastjson
return JSONObject.toJSONString(this);
// return new Gson().toJson(this);
}
}
2. Simulated message producer
PayMentInfoProducer under the producer package simulates a producer that continuously generates random order data. The IPs and ports of the Kafka cluster must be configured here.
package producer;
import domain.PaymentInfo;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class PayMentInfoProducer {
public static void main(String[] args){
// 1. prepare the producer configuration
Properties props = new Properties();
props.put("bootstrap.servers", "storm-master:9092,storm-slave1:9092,storm-slave2:9092");
/**
* When the producer sets acks to "all" (or "-1"), min.insync.replicas specifies the minimum number
* of replicas that must acknowledge a write for it to be considered successful.
* If this minimum cannot be met, the producer raises an exception (NotEnoughReplicas or NotEnoughReplicasAfterAppend).
* Used together, min.insync.replicas and acks provide stronger durability guarantees.
* A typical setup is a topic with replication factor 3, min.insync.replicas set to 2, and acks "all",
* which makes the producer throw an exception if a majority of replicas do not receive the write.
*/
props.put("acks", "all");
/**
* A value greater than zero makes the client resend any record whose send fails.
*/
props.put("retries", 0);
/**
* Whenever multiple records are sent to the same partition, the producer tries to batch them
* into fewer requests, which helps both client and broker performance.
* This setting controls the default batch size in bytes.
*/
props.put("batch.size", 16384);
/**
* In some situations the client may want to reduce the number of requests even under moderate load.
* This setting adds a small artificial delay: instead of sending a record immediately,
* the producer waits up to the given delay so that other records can be sent in the same batch.
*/
props.put("linger.ms", 1);
/**
* The total bytes of memory the producer can use to buffer records waiting to be sent to the server.
* If records are produced faster than they can be delivered, the producer blocks and eventually
* throws an exception after max.block.ms.
* This roughly corresponds to the total memory the producer will use, but it is not a hard bound,
* since not all producer memory is used for buffering; some additional memory is used for
* compression (if enabled) and for maintaining in-flight requests.
*/
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2. create the KafkaProducer
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
while (true){
// 3. send a randomly generated order
kafkaProducer.send(new ProducerRecord<String, String>("itcast_order",new PaymentInfo().random()));
System.out.println("sent an order record");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
3. Message consumer
The two classes under the kafkaStorm package.
ProcessOrderBolt
This bolt implements the processing done when writing to Redis and defines the metrics that are stored, including:
- total sales revenue of each shop
- number of orders of each shop
- total sales revenue of the platform
- number of orders on the platform
- the product dimension
- number of orders of each product
- number of orders of each product category
- number of orders of each city
package kafkaStorm;
import domain.PaymentInfo;
import producer.PayMentInfoProducer;
import util.JedisUtil;
import com.alibaba.fastjson.JSONObject;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
public class ProcessOrderBolt extends BaseBasicBolt{
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
// with the default KafkaSpout record translator the tuple is (topic, partition, offset, key, value), so field 4 is the message value
Object value = input.getValue(4);
// use fastjson to turn the JSON string back into a PaymentInfo object
PaymentInfo parseObject = JSONObject.parseObject(value.toString(), PaymentInfo.class);
// once we have the PaymentInfo we can compute the metrics we need
// shop dimension
String shopId = parseObject.getShopId();
Jedis conn = JedisUtil.getConn();
// total sales revenue of each shop
conn.incrBy("shop_price"+shopId,parseObject.getPayPrice());
// conn.incrBy("itcast:order:"+shopId+":user:2018-02-26",1);
// number of orders of each shop
conn.incrBy("shop_num"+shopId,1);
// platform-wide statistics
// total sales revenue of the platform
conn.incrBy("total_price",parseObject.getPayPrice());
// conn.incrBy("itcast:order:total:user:date",1);
// number of orders on the platform
conn.incrBy("total_num",1);
// product dimension
String productId = parseObject.getProductId();
// total sales revenue of each product
conn.incrBy("product_price"+productId,parseObject.getPayPrice());
// conn.incrBy("itcast:order:"+productId+":user:date",1);
// number of orders of each product
conn.incrBy("product_num"+productId,1);
// number of orders in each city
String city = parseObject.getCity();
conn.incrBy("city"+city,1);
// number of orders in each category
String category = parseObject.getCatagorys();
conn.incrBy("category"+category,1);
conn.close();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
RealBoardTopology
Builds and submits the topology; the Kafka cluster IPs and ports must be configured here.
package kafkaStorm;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
public class RealBoardTopology {
public static void main(String[] args) throws Exception {
// read the data from Kafka
TopologyBuilder builder = new TopologyBuilder();
KafkaSpoutConfig.Builder<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder("storm-master:9092,storm-slave1:9092,storm-slave2:9092", "itcast_order");
kafkaSpoutConfig.setGroupId("itcast_order_group");
KafkaSpoutConfig<String, String> build = kafkaSpoutConfig.build();
KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(build);
builder.setSpout("kafkaSpout",kafkaSpout,5);
builder.setBolt("processBolt",new ProcessOrderBolt(),8).localOrShuffleGrouping("kafkaSpout");
Config config = new Config();
if(args.length > 0){
config.setNumWorkers(2);
config.setDebug(false);
StormSubmitter.submitTopology(args[0],config,builder.createTopology());
}else{
LocalCluster cluster = new LocalCluster();
config.setDebug(true);
cluster.submitTopology("realBoard",config,builder.createTopology());
}
}
}
4. Redis utility class
The JedisUtil class under the util package; the Redis server IP and port must be configured here.
package util;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class JedisUtil {
private static JedisPool pool = null;
/**
* Get the Jedis connection pool
*/
public static JedisPool getPool(){
if(pool == null){
//create the pool configuration
JedisPoolConfig config = new JedisPoolConfig();
//maximum number of connections
config.setMaxTotal(20);
//maximum number of idle connections
config.setMaxIdle(5);
//create the Redis connection pool
// always pass the timeout when creating the JedisPool; without it Redis connections are very likely to time out
pool = new JedisPool(config,"172.25.0.5",6379,3000);
}
return pool;
}
/**
* Get a Jedis connection
*/
public static Jedis getConn(){
return getPool().getResource();
}
/**
* Test the connection
* @param args
*/
public static void main(String[] args) {
Jedis jedis = getPool().getResource();
jedis.incrBy("mine", 5);
jedis.close();
}
}
IV. Environment Setup
1. Storm cluster setup
Blog post: https://blog.csdn.net/weixin_43622131/article/details/111057523
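For orientation, the cluster configuration comes down to a handful of entries in conf/storm.yaml on every node. The following is only a minimal sketch, assuming the hosts are named storm-master, storm-slave1 and storm-slave2 as in the code above; storm.local.dir and the ports are placeholder values, so adjust them to your environment:
storm.zookeeper.servers:
  - "storm-master"
  - "storm-slave1"
  - "storm-slave2"
nimbus.seeds: ["storm-master"]
storm.local.dir: "/opt/storm/data"
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
ui.port: 8080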
2. Kafka setup
(1) Download and extract
Download link:
Link: https://pan.baidu.com/s/1nnlJB0vnLO7_NBKZgJ9BtA
Extraction code: w4a6
(2) Install ZooKeeper
ZooKeeper was already set up as part of the Storm tutorial above.
(3) Configure Kafka
Configure the master node first.
The configuration file is server.properties.
Set zookeeper.connect to the IPs and ports of the ZooKeeper cluster.
Set broker.id on the master node to 1.
Once the master is configured, copy the whole Kafka directory to the slave nodes; on each slave you only need to change broker.id, numbering them 2, 3, and so on.
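As a point of reference, the relevant part of server.properties on the master might look roughly like this (a sketch, not the exact file; the hostnames follow the ones used in the producer code, and log.dirs is an assumed path):
broker.id=1
listeners=PLAINTEXT://storm-master:9092
log.dirs=/opt/kafka/kafka-logs
zookeeper.connect=storm-master:2181,storm-slave1:2181,storm-slave2:2181
On the slaves, only broker.id (2, 3, ...) and the listeners hostname change.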
Start Kafka:
First start the ZooKeeper cluster, then run the following command from the bin directory of the Kafka installation (on every machine in the cluster):
./kafka-server-start.sh -daemon ../config/server.properties &
3. Redis cluster setup
Blog post: https://blog.csdn.net/weixin_43622131/article/details/105820078
V. Starting the Environment
1. Start ZooKeeper
Run the following command on every node; it is located in the bin directory of the ZooKeeper installation:
./zkServer.sh start
2. Start Kafka
Run the following command on every node; it is located in the bin directory of the Kafka installation:
./kafka-server-start.sh -daemon ../config/server.properties &
3. Start Redis
For simplicity, a standalone Redis instance is used here.
Starting a single instance is enough:
redis-server redis1.conf
Access the database with the following command:
redis-cli -c -h 172.25.0.5 -p 6379
4. Start Storm
On the master, start:
storm nimbus &
storm ui &
storm logviewer &
On each slave, start:
storm supervisor &
storm logviewer &
Once everything has started, check that the expected processes are running on the master and on each slave.
VI. Running the Project
First package the project into a jar.
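If the project is built with Maven (an assumption; the downloaded project may use a different build setup), packaging can be as simple as:
mvn clean package -DskipTests
The resulting jar is then copied to the cluster, e.g. as /home/zf/storm_bigwork.jar used below.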
1. Submit the topology to the Storm cluster
./storm jar /home/zf/storm_bigwork.jar kafkaStorm.RealBoardTopology realboard2
To run the project, the jar below also has to be added to Storm's lib directory.
Download link:
Link: https://pan.baidu.com/s/1kU7q3YdTwfsPWOWyjYCPqA
Extraction code: jdf0
2. Start the producer
java -cp /home/zf/storm_bigwork.jar producer.PayMentInfoProducer start
3. Check the results
Inspect the data written to Redis.
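For example, the counters written by ProcessOrderBolt can be read back directly with redis-cli (the key names come from the bolt code above):
redis-cli -h 172.25.0.5 -p 6379 GET total_price
redis-cli -h 172.25.0.5 -p 6379 GET total_num
redis-cli -h 172.25.0.5 -p 6379 KEYS "shop_price*"
redis-cli -h 172.25.0.5 -p 6379 KEYS "city*"
If the counters keep growing while the producer is running, the pipeline is working.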
VII. Real-Time Data Visualization
Demo video link:
Link: https://pan.baidu.com/s/1-JK6qmzA-SvP2uGhiOFCVg
Extraction code: p0n1
On the first page, the center shows the platform's total sales revenue, the left side lists the top ten shops by revenue, and the right side lists the top ten products by sales volume.
On the second page, the map on the left shows the sales volume of each city, and the chart on the right shows the share of each product category.
The earlier sections already process the stream and store the results in Redis; the following sections read that data from Redis and display it in real time on the frontend.
1. Backend: read the data from Redis
Flask is used here. It reads the values from Redis, sorts them, keeps the top ten, and exposes routes from which the frontend fetches the data.
Code link:
Link: https://pan.baidu.com/s/13RXzINNBKnb_mMcbeMqt6Q
Extraction code: 477u
import redis
from flask import Flask, jsonify
from flask_cors import CORS

app = Flask(__name__)
CORS(app)
# decode_responses=True makes redis-py return str instead of bytes
r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0, decode_responses=True, encoding='UTF-8')

# total sales revenue of the platform
@app.route('/total_price', methods=['GET', 'POST'])
def sum_total():
    total_price = r.get("total_price")
    return total_price

# total sales revenue of each shop (top ten)
@app.route('/shop_price', methods=['GET', 'POST'])
def sum_shop():
    # keys look like "shop_price<shop name>", so strip the 10-character prefix
    keys = r.keys()
    shop_key = []
    for i in keys:
        if i[:10] == "shop_price":
            shop_key.append(i)
    data = []
    for i in shop_key:
        tmp = {}
        tmp["stock"] = str(i[10:])
        tmp["fundPost"] = int(r.get(i))
        data.append(tmp)
    data = sorted(data, key=lambda x: x['fundPost'], reverse=True)
    return jsonify(data[:10])

# sales count of each product (top ten)
@app.route('/product_num', methods=['GET', 'POST'])
def product_num():
    keys = r.keys()
    product_key = []
    for i in keys:
        if i[:11] == "product_num":
            product_key.append(i)
    data = []
    for i in product_key:
        tmp = {}
        tmp["stock"] = str(i[11:])
        tmp["fundPost"] = int(r.get(i))
        data.append(tmp)
    data = sorted(data, key=lambda x: x['fundPost'], reverse=True)
    return jsonify(data[:10])

# sales count of each city
@app.route('/city_num', methods=['GET', 'POST'])
def city_num():
    keys = r.keys()
    city_key = []
    for i in keys:
        if i[:4] == "city":
            city_key.append(i)
    data = {}
    for i in city_key:
        data[i[4:]] = r.get(i)
    return jsonify(data)

# sales count of each product category
@app.route('/category_num', methods=['GET', 'POST'])
def category_num():
    keys = r.keys()
    category_key = []
    for i in keys:
        if i[:8] == "category":
            category_key.append(i)
    data = []
    for i in category_key:
        tmp = {}
        tmp["name"] = i[8:]
        tmp["value"] = r.get(i)
        data.append(tmp)
    return jsonify(data)

if __name__ == '__main__':
    app.run(host='127.0.0.1', port=5000)
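Once the Flask app is running, the routes can be sanity-checked directly, for example:
curl http://127.0.0.1:5000/total_price
curl http://127.0.0.1:5000/shop_price
curl http://127.0.0.1:5000/category_num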
2. Frontend: fetch the data from the backend and visualize it
Code link:
Link: https://pan.baidu.com/s/1zmGEXMjP47TYKblp9nMAdQ
Extraction code: y2n2
The visualization is built with ECharts, and the data is requested from the backend with Ajax, used as follows:
$.ajax({
    type: 'GET',
    url: 'http://127.0.0.1:5000/total_price',
    async: false,
    success: function(data) {
        let option = myChart.getOption();
        option.series[3].startAngle = option.series[3].startAngle - 1;
        option.series[6].data[0].value = Number(data);
        myChart.setOption(option);
    }
})
Real-time refreshing is implemented with JavaScript's setInterval; see the full code for the remaining details.
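As a minimal sketch of that refresh loop (the 1000 ms interval and the single-chart update are illustrative; the real page refreshes several charts):
// poll the backend every second and push the new value into the chart
setInterval(function() {
    $.ajax({
        type: 'GET',
        url: 'http://127.0.0.1:5000/total_price',
        success: function(data) {
            let option = myChart.getOption();
            option.series[6].data[0].value = Number(data);
            myChart.setOption(option);
        }
    });
}, 1000);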
Putting the whole thing together does take some time, but if you stick with it you will get it working; feel free to reach out to me if you hit small bugs.
VIII. System Highlights
- The entire pipeline runs on the cluster
- The code is clean, well structured, and easy to extend
- Good real-time behavior