storm实时看板案例

1.项目需求

根据订单mq,快速计算购物网站当天的订单量、销售金额。

2.项目架构模型

支付系统+kafka+storm/Jstorm集群+redis集群

  • 支付系统发送mq到kafka集群中,编写storm程序消费kafka的数据并计算实时的订单数量、订单数量
  • 将计算的实时结果保存在redis中
  • 外部程序访问redis的数据实时展示结果

这里写图片描述

3.订单数据模型

订单编号、订单时间、支付编号、支付时间、商品编号、商家名称、商品价格、优惠价格、支付金额

4.指标需求

  • 平台运维角度统计指标
    平台总销售额度
    redisRowKey设计 itcast:order:total:price:date
    平台今天下单人数
    redisRowKey设计 itcast:order:total:user:date
    平台商品销售数量
    redisRowKey设计 itcast:order:num:user:date

  • 商品销售角度统计指标
    每个商品的总销售额
    Redis的rowKey设计itcast:order:productId:price:date
    每个商品的购买人数
    Redis的rowKey设计itcast:order:productId:user:date
    每个商品的销售数量
    Redis的rowKey设计itcast:order:productId:num:date

  • 店铺销售角度统计指标
    每个店铺的总销售额
    Redis的rowKey设计itcast:order:shopId:price:date
    每个店铺的购买人数
    Redis的rowKey设计itcast:order:shopId:user:date
    每个店铺的销售数量
    Redis的rowKey设计itcast:order:shopId:num:date

5.代码实现

  • kafka中的topic创建
bin/kafka-topics.sh  --create --replication-factor 2 --topic itcast_order --zookeeper node-1:2181,node-2:2181,node-3:2181 --partitions 5
  • 创建maven工程添加依赖
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.1</version>
            <!--本地模式注释-->
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.41</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.itcast.storm.realboard.kafkaStorm.RealBoardTopology</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>
  • 订单实体类
package cn.itcast.storm.realboard.domain;

import com.alibaba.fastjson.JSONObject;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.UUID;

public class PaymentInfo implements Serializable{

    private static final long serialVersionUID = -7958315778386204397L;
    private String orderId;//订单编号
    private Date createOrderTime;//订单创建时间
    private String paymentId;//支付编号
    private Date paymentTime;//支付时间
    private String productId;//商品编号
    private String productName;//商品名称
    private long productPrice;//商品价格
    private long promotionPrice;//促销价格
    private String shopId;//商铺编号
    private String shopName;//商铺名称
    private String shopMobile;//店铺电话
    private long payPrice;//订单支付价格
    private int num;//订单数量

    /**
     * <Province>19</Province>
     * <City>1657</City>
     * <County>4076</County>
     */
    private String province; //省
    private String city; //市
    private String county;//县

    //102,144,114
    private String catagorys;  //分类

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getCounty() {
        return county;
    }

    public void setCounty(String county) {
        this.county = county;
    }

    public String getCatagorys() {
        return catagorys;
    }

    public void setCatagorys(String catagorys) {
        this.catagorys = catagorys;
    }

    public PaymentInfo() {
    }

    public PaymentInfo(String orderId, Date createOrderTime, String paymentId, Date paymentTime, String productId, String productName, long productPrice, long promotionPrice, String shopId, String shopName, String shopMobile, long payPrice, int num) {
        this.orderId = orderId;
        this.createOrderTime = createOrderTime;
        this.paymentId = paymentId;
        this.paymentTime = paymentTime;
        this.productId = productId;
        this.productName = productName;
        this.productPrice = productPrice;
        this.promotionPrice = promotionPrice;
        this.shopId = shopId;
        this.shopName = shopName;
        this.shopMobile = shopMobile;
        this.payPrice = payPrice;
        this.num = num;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Date getCreateOrderTime() {
        return createOrderTime;
    }

    public void setCreateOrderTime(Date createOrderTime) {
        this.createOrderTime = createOrderTime;
    }

    public String getPaymentId() {
        return paymentId;
    }

    public void setPaymentId(String paymentId) {
        this.paymentId = paymentId;
    }

    public Date getPaymentTime() {
        return paymentTime;
    }

    public void setPaymentTime(Date paymentTime) {
        this.paymentTime = paymentTime;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public long getProductPrice() {
        return productPrice;
    }

    public void setProductPrice(long productPrice) {
        this.productPrice = productPrice;
    }

    public long getPromotionPrice() {
        return promotionPrice;
    }

    public void setPromotionPrice(long promotionPrice) {
        this.promotionPrice = promotionPrice;
    }

    public String getShopId() {
        return shopId;
    }

    public void setShopId(String shopId) {
        this.shopId = shopId;
    }

    public String getShopName() {
        return shopName;
    }

    public void setShopName(String shopName) {
        this.shopName = shopName;
    }

    public String getShopMobile() {
        return shopMobile;
    }

    public void setShopMobile(String shopMobile) {
        this.shopMobile = shopMobile;
    }

    public long getPayPrice() {
        return payPrice;
    }

    public void setPayPrice(long payPrice) {
        this.payPrice = payPrice;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    @Override
    public String toString() {
        return "PaymentInfo{" +
                "orderId='" + orderId + '\'' +
                ", createOrderTime=" + createOrderTime +
                ", paymentId='" + paymentId + '\'' +
                ", paymentTime=" + paymentTime +
                ", productId='" + productId + '\'' +
                ", productName='" + productName + '\'' +
                ", productPrice=" + productPrice +
                ", promotionPrice=" + promotionPrice +
                ", shopId='" + shopId + '\'' +
                ", shopName='" + shopName + '\'' +
                ", shopMobile='" + shopMobile + '\'' +
                ", payPrice=" + payPrice +
                ", num=" + num +
                '}';
    }

    public String random() {
        this.orderId = UUID.randomUUID().toString().replaceAll("-", "");
        this.paymentId = UUID.randomUUID().toString().replaceAll("-", "");
        this.productPrice = new Random().nextInt(1000);
        this.promotionPrice = new Random().nextInt(500);
        this.payPrice = new Random().nextInt(480);
        this.shopId = new Random().nextInt(200000)+"";

        this.catagorys = new Random().nextInt(10000)+","+new Random().nextInt(10000)+","+new Random().nextInt(10000);
        this.province = new Random().nextInt(23)+"";
        this.city = new Random().nextInt(265)+"";
        this.county = new Random().nextInt(1489)+"";

        String date = "2017-11-11 12:22:12";
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            this.createOrderTime = simpleDateFormat.parse(date);
        } catch (Exception e) {
            e.printStackTrace();
        }
        JSONObject obj = new JSONObject();
        String jsonString = obj.toJSONString(this);
        return jsonString;
        //  return new Gson().toJson(this);
    }

}
  • 模拟消息生产者代码实现
package cn.itcast.storm.realboard.producer;

import cn.itcast.storm.realboard.domain.PaymentInfo;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PayMentInfoProducer {

    public static void main(String[] args){
        //1、准备配置文件
        Properties props = new Properties();
        props.put("bootstrap.servers", "node-1:9092,node-2:9092,node-3:9092");
        /**
         * 当生产者将ack设置为“全部”(或“-1”)时,min.insync.replicas指定必须确认写入被认为成功的最小副本数。
         * 如果这个最小值不能满足,那么生产者将会引发一个异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。
         * 当一起使用时,min.insync.replicas和acks允许您执行更大的耐久性保证。
         * 一个典型的情况是创建一个复制因子为3的主题,将min.insync.replicas设置为2,并使用“全部”选项来产生。
         * 这将确保生产者如果大多数副本没有收到写入引发异常。
         */
        props.put("acks", "all");
        /**
         * 设置一个大于零的值,将导致客户端重新发送任何失败的记录
         */
        props.put("retries", 0);
        /**
         * 只要有多个记录被发送到同一个分区,生产者就会尝试将记录一起分成更少的请求。
         * 这有助于客户端和服务器的性能。该配置以字节为单位控制默认的批量大小。
         */
        props.put("batch.size", 16384);
        /**
         *在某些情况下,即使在中等负载下,客户端也可能希望减少请求的数量。
         * 这个设置通过添加少量的人工延迟来实现这一点,即不是立即发出一个记录,
         * 而是等待达到给定延迟的记录,以允许发送其他记录,以便发送可以一起批量发送
         */
        props.put("linger.ms", 1);
        /**
         * 生产者可用于缓冲等待发送到服务器的记录的总字节数。
         * 如果记录的发送速度比发送给服务器的速度快,那么生产者将会阻塞,max.block.ms之后会抛出异常。
         * 这个设置应该大致对应于生产者将使用的总内存,但不是硬性限制,
         * 因为不是所有生产者使用的内存都用于缓冲。
         * 一些额外的内存将被用于压缩(如果压缩被启用)以及用于维护正在进行的请求。
         */
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //2、创建KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        while (true){
            //3、发送数据
            kafkaProducer.send(new ProducerRecord<String, String>("itcast_order",new PaymentInfo().random()));
            System.out.println("发送数据");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


}
  • 消息消费者代码实现
    • Kafka与storm整合
package cn.itcast.storm.realboard.kafkaStorm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

public class RealBoardTopology {


    public static void main(String[] args) throws Exception {
        //从kafka当中获取数据
        TopologyBuilder builder = new TopologyBuilder();
        KafkaSpoutConfig.Builder<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder("node-1:9092,node-2:9092,node-3:9092", "itcast_order");
        kafkaSpoutConfig.setGroupId("itcast_order_group");
        KafkaSpoutConfig<String, String> build = kafkaSpoutConfig.build();
        KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(build);
        builder.setSpout("kafkaSpout",kafkaSpout,5);
        builder.setBolt("processBolt",new ProcessOrderBolt(),8).localOrShuffleGrouping("kafkaSpout");
        Config config = new Config();
        if(args.length > 0){
            config.setNumWorkers(2);
            config.setDebug(false);
            StormSubmitter.submitTopology(args[0],config,builder.createTopology());
        }else{
            LocalCluster cluster = new LocalCluster();
            config.setDebug(true);
            cluster.submitTopology("realBoard",config,builder.createTopology());
        }
    }

}
  • 消费kafka消息bolt
package cn.itcast.storm.realboard.kafkaStorm;

import cn.itcast.storm.realboard.domain.PaymentInfo;
import cn.itcast.storm.realboard.producer.PayMentInfoProducer;
import cn.itcast.storm.realboard.util.JedisUtil;
import com.alibaba.fastjson.JSONObject;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;

public class ProcessOrderBolt extends BaseBasicBolt{


    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        Object value = input.getValue(4);
        //通过fastjson,将我们的json字符串转为对象
        JSONObject obj = new JSONObject();
        PaymentInfo parseObject = obj.parseObject(value.toString(), PaymentInfo.class);
        //拿到了paymentInfo就可以从paymentinf里面获取我们需要计算的一些指标

        //计算店铺的维度
        /**
         * 每个店铺的总销售额
         Redis的rowKey设计itcast:order:shopId:price:date
         每个店铺的购买人数
         Redis的rowKey设计itcast:order:shopId:user:date
         每个店铺的销售数量
         Redis的rowKey设计itcast:order:shopId:num:date
         */
        String shopId = parseObject.getShopId();
        Jedis conn = JedisUtil.getConn();
        //获取我们的销售总额,存入redis当中去
        conn.incrBy("itcast:order:"+shopId+":price:2018-02-26",parseObject.getPayPrice());
        conn.incrBy("itcast:order:"+shopId+":user:2018-02-26",1);
        conn.incrBy("itcast:order:"+shopId+":num:2018-02-26",1);

        //平台维度的数据统计
        /**
         * 平台总销售额度
         redisRowKey设计  itcast:order:total:price:date
         平台今天下单人数
         redisRowKey设计  itcast:order:total:user:date
         平台商品销售数量
         redisRowKey设计  itcast:order:total:num:date
         */
        conn.incrBy("itcast:order:total:price:date",parseObject.getPayPrice());
        conn.incrBy("itcast:order:total:user:date",1);
        conn.incrBy("itcast:order:total:num:date",1);


        /**
         * 商品的维度
         * 每个商品的总销售额
         Redis的rowKey设计itcast:order:productId:price:date
         每个商品的购买人数
         Redis的rowKey设计itcast:order:productId:user:date
         每个商品的销售数量
         Redis的rowKey设计itcast:order:productId:num:date
         */
        String productId = parseObject.getProductId();
        conn.incrBy("itcast:order:"+productId+":price:date",parseObject.getPayPrice());
        conn.incrBy("itcast:order:"+productId+":user:date",1);
        conn.incrBy("itcast:order:"+productId+":num:date",1);

        conn.close();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}
  • Redis使用工具类
package cn.itcast.storm.realboard.util;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class JedisUtil {


    private static JedisPool pool = null;

    /**
     * 获取jedis连接池
     * */
    public static JedisPool getPool(){
        if(pool == null){
            //创建jedis连接池配置
            JedisPoolConfig config = new JedisPoolConfig();
            //最大连接数
            config.setMaxTotal(20);
            //最大空闲连接
            config.setMaxIdle(5);
            //创建redis连接池
            // 使用jedisPool的时候,timeout一定要给出来,如果不给,redis很大概率会报错,超时
            pool = new JedisPool(config,"192.168.52.140",6379,3000);
        }
        return pool;
    }

    /**
     * 获取jedis连接
     * */
    public static Jedis getConn(){
        return getPool().getResource();
    }

    /**
     * 测试连接
     * @param args
     */
    public static void main(String[] args) {
        Jedis jedis = getPool().getResource();
        jedis.incrBy("mine", 5);
        jedis.close();
    }


}

6.本地测试

  • 启动消息生产者PayMentInfoProducer
    这里写图片描述
    这里写图片描述
  • 启动storm RealBoardTopology
    这里写图片描述

这里数据的可视化展示略过,感兴趣的朋友可以自行实现。

7.集群测试

  • 打成jar包上传到集群

  • 运行jar包

apache-storm-1.1.1/bin/storm jar realBoard.jar cn.itcast.storm.realboard.kafkaStorm.RealBoardTopology realboard

这里写图片描述

喜欢就点赞评论+关注吧

这里写图片描述

感谢阅读,希望能帮助到大家,谢谢大家的支持!

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐