使用kafka和storm简单模拟天猫双十一实时统计订单各个维度信息
程序说明:根据双十一当天的订单mq,快速计算当天的订单量、销售金额。思路:1,支付系统发送mq到kafka集群中,编写storm程序消费kafka的数据并计算实时的订单数量、销售金额;2,将计算的实时结果保存在redis中;3,外部程序实时展示结果。程序设计:数据产生:编写kafka数据生产者,模拟订单系统发送mq;数据输入:使用PaymentInfoSpout消费kafka中的数据...
·
程序说明:
- 根据双十一当天的订单mq,快速计算当天的订单量、销售金额
- 思路:
- 1,支付系统发送mq到kafka集群中,编写storm程序消费kafka的数据并计算实时的订单数量、销售金额
- 2,将计算的实时结果保存在redis中
- 3,外部程序实时展示结果
程序设计
- 数据产生:编写kafka数据生产者,模拟订单系统发送mq
- 数据输入:使用PaymentInfoSpout消费kafka中的数据
- 数据计算:使用FilterMessageBlot对数据进行过滤清洗
- 数据存储:使用Save2RedisBlot对数据进行存储
- 数据展示:编写java app客户端,对数据进行展示,展示方式为打印在控制台。
maven依赖
<!-- Maven dependencies for the Kafka + Storm + Redis Double-11 demo. -->
<dependencies>
<!-- Redis client (Jedis / JedisPool) used to store the running totals. -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.1</version>
</dependency>
<!-- JUnit. NOTE(review): no <scope> is declared, so this lands on the compile classpath; consider <scope>test</scope>. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
<!-- Gson: serializes PaymentInfo to JSON in the producer and parses it back in the bolts. -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.3.1</version>
</dependency>
<!-- jsoup. NOTE(review): not referenced by any of the classes in this article; possibly removable - confirm. -->
<dependency>
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>
<version>1.8.3</version>
</dependency>
<!-- Storm core (backtype.storm.* API). The provided scope is commented out so the topology can run in LocalCluster;
     re-enable it when packaging a jar for StormSubmitter, because the cluster supplies storm-core itself. -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.5</version>
<!--<scope>provided</scope>-->
</dependency>
<!-- ZooKeeper pinned explicitly; the copy Kafka would pull in transitively is excluded below. -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<!-- Kafka 0.8 (Scala 2.8.2 build): provides the old producer API and the high-level ZooKeeper-based consumer. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.8.2</artifactId>
<version>0.8.1</version>
<exclusions>
<!-- jmxtools / jmxri / jms: Sun artifacts commonly excluded because they are usually not resolvable
     from Maven Central (presumably pulled in via an old log4j) - confirm for your repository setup. -->
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
<!-- Use the explicitly declared zookeeper 3.4.6 above instead of Kafka's transitive version. -->
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
模拟订单实体类:PaymentInfo.java
package storm.tmall.other;
import com.google.gson.Gson;
import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.UUID;
/**
 * Simplified payment record used by the Double-11 simulation.
 *
 * <p>Real business data is more complex (parent/child orders, several products per order,
 * one payment covering several orders, ...). Price fields are plain {@code long} amounts;
 * the unit is not fixed by this class (NOTE(review): presumably yuan or cents - confirm).
 */
public class PaymentInfo implements Serializable {
    private String orderId;          // order id
    private Date createOrderTime;    // order creation time
    private String paymentId;        // payment id
    private Date paymentTime;        // payment time
    private String productId;        // product id
    private String productName;      // product name
    private long productPrice;       // list price of the product
    private long promotionPrice;     // promotional (discounted) price
    private String shopId;           // shop id
    private String shopName;         // shop name
    private String shopMobile;       // shop contact phone
    private long payPrice;           // amount actually paid for the order
    private int num;                 // quantity in the order

    public PaymentInfo() {
    }

    public PaymentInfo(String orderId, Date createOrderTime, String paymentId, Date paymentTime, String productId, String productName, long productPrice, long promotionPrice, String shopId, String shopName, String shopMobile, long payPrice, int num) {
        this.orderId = orderId;
        this.createOrderTime = createOrderTime;
        this.paymentId = paymentId;
        this.paymentTime = paymentTime;
        this.productId = productId;
        this.productName = productName;
        this.productPrice = productPrice;
        this.promotionPrice = promotionPrice;
        this.shopId = shopId;
        this.shopName = shopName;
        this.shopMobile = shopMobile;
        this.payPrice = payPrice;
        this.num = num;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Date getCreateOrderTime() {
        return createOrderTime;
    }

    public void setCreateOrderTime(Date createOrderTime) {
        this.createOrderTime = createOrderTime;
    }

    public String getPaymentId() {
        return paymentId;
    }

    public void setPaymentId(String paymentId) {
        this.paymentId = paymentId;
    }

    public Date getPaymentTime() {
        return paymentTime;
    }

    public void setPaymentTime(Date paymentTime) {
        this.paymentTime = paymentTime;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public long getProductPrice() {
        return productPrice;
    }

    public void setProductPrice(long productPrice) {
        this.productPrice = productPrice;
    }

    public long getPromotionPrice() {
        return promotionPrice;
    }

    public void setPromotionPrice(long promotionPrice) {
        this.promotionPrice = promotionPrice;
    }

    public String getShopId() {
        return shopId;
    }

    public void setShopId(String shopId) {
        this.shopId = shopId;
    }

    public String getShopName() {
        return shopName;
    }

    public void setShopName(String shopName) {
        this.shopName = shopName;
    }

    public String getShopMobile() {
        return shopMobile;
    }

    public void setShopMobile(String shopMobile) {
        this.shopMobile = shopMobile;
    }

    public long getPayPrice() {
        return payPrice;
    }

    public void setPayPrice(long payPrice) {
        this.payPrice = payPrice;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    @Override
    public String toString() {
        return "PaymentInfo{" +
                "orderId='" + orderId + '\'' +
                ", createOrderTime=" + createOrderTime +
                ", paymentId='" + paymentId + '\'' +
                ", paymentTime=" + paymentTime +
                ", productId='" + productId + '\'' +
                ", productName='" + productName + '\'' +
                ", productPrice=" + productPrice +
                ", promotionPrice=" + promotionPrice +
                ", shopId='" + shopId + '\'' +
                ", shopName='" + shopName + '\'' +
                ", shopMobile='" + shopMobile + '\'' +
                ", payPrice=" + payPrice +
                ", num=" + num +
                '}';
    }

    /**
     * Overwrites this instance with random simulated order data and returns it as JSON.
     *
     * <p>Sets {@code orderId} and {@code paymentId} to fresh dash-less UUIDs and draws prices so that
     * {@code 0 <= payPrice <= promotionPrice <= productPrice <= 999}. {@code createOrderTime} is fixed
     * at 2015-11-11 12:22:12 in the JVM's default time zone. All other fields keep their current values.
     *
     * @return this instance serialized with Gson
     */
    public String random() {
        Random random = new Random();
        this.orderId = UUID.randomUUID().toString().replace("-", "");
        this.paymentId = UUID.randomUUID().toString().replace("-", "");
        // Each price is bounded by the previous one; independent draws could yield a promotion
        // or paid price higher than the list price, which is not a valid order.
        this.productPrice = random.nextInt(1000);
        this.promotionPrice = random.nextInt((int) this.productPrice + 1);
        this.payPrice = random.nextInt((int) this.promotionPrice + 1);
        String date = "2015-11-11 12:22:12";
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            this.createOrderTime = simpleDateFormat.parse(date);
        } catch (ParseException e) {
            // The input is a constant that matches the pattern, so this signals a programming error.
            throw new IllegalStateException("Invalid hard-coded order time: " + date, e);
        }
        return new Gson().toJson(this);
    }
}
Kafka生产者生成随机数据:PaymentInfoProducer.java
package storm.tmall.other;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
/**
 * Standalone program (deployed separately from the topology) that endlessly publishes random
 * order messages, as JSON strings, to the Kafka topic {@code paymentInfo}, simulating the
 * order/payment system.
 */
public class PaymentInfoProducer {
    private final static String TOPIC = "paymentInfo";

    public static void main(String[] args) {
        Properties props = new Properties();
        // Message values are plain strings (JSON).
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list",
                "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        // Each send waits for the partition leader's acknowledgement.
        props.put("request.required.acks", "1");
        Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props));
        try {
            // Runs until the process is killed or a send throws; messages are sent without a key.
            while (true) {
                producer.send(new KeyedMessage<Integer, String>(TOPIC, genPaymentInfo()));
            }
        } finally {
            // Release network resources if the loop is left through an exception.
            producer.close();
        }
    }

    /**
     * Builds one random order message.
     *
     * <p>Must return real JSON: the consuming bolts parse each message with Gson, and an empty
     * string would deserialize to {@code null}.
     *
     * @return a random {@link PaymentInfo} serialized as JSON
     */
    private static String genPaymentInfo() {
        return new PaymentInfo().random();
    }
}
kafka数据消费者代码,没有用kafkaSpout,而是自己实现消费者代码:PaymentInfoSpout.java
package storm.tmall;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
public class PaymentInfoSpout extends BaseRichSpout {
private static final String TOPIC = "paymentInfo";
private Properties props;
private ConsumerConnector consumer;
private SpoutOutputCollector collector;
//ArrayBlockingQueue是一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。
// 队列的头部 是在队列中存在时间最长的元素
private ArrayBlockingQueue<String> paymentInfoQueue = new ArrayBlockingQueue<String>(100);
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("paymentInfo"));
}
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
props = new Properties();
props.put("zookeeper.connect", "hadoop1:2181,hadoop1:2181,hadoop1:2181");
props.put("group.id", "testGroup");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(TOPIC, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
paymentInfoQueue.add(new String(it.next().message()));
}
}
public void nextTuple() {
try {
collector.emit(new Values(paymentInfoQueue.take()));
try {
Thread.sleep(1000);
} catch (Exception e) {
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
对订单数据进行简单的过滤清洗:FilterMessageBlot.java
package storm.tmall;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.google.gson.Gson;
import storm.tmall.other.PaymentInfo;
import java.util.Calendar;
import java.util.Date;
import java.util.Map;
public class FilterMessageBlot extends BaseBasicBolt {
public void execute(Tuple input, BasicOutputCollector collector) {
//读取订单数据
String paymentInfoStr = input.getStringByField("paymentInfo");
//将订单数据解析成JavaBean
PaymentInfo paymentInfo = new Gson().fromJson(paymentInfoStr, PaymentInfo.class);
// 过滤订单时间,如果订单时间在2015.11.11这天才进入下游开始计算
Date date = paymentInfo.getCreateOrderTime();
if (Calendar.getInstance().get(Calendar.DAY_OF_MONTH) != 31) {
return;
}
collector.emit(new Values(paymentInfoStr));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("message"));
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
super.prepare(stormConf, context);
}
}
生成维度统计保存到redis中:Save2RedisBlot.java
package storm.tmall;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import com.google.gson.Gson;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import storm.tmall.other.PaymentInfo;
import java.util.Map;
/**
 * Terminal bolt that parses each filtered JSON order message (input field {@code "message"}),
 * adds it to running totals in the Redis instance at 127.0.0.1:6379, and prints the current
 * totals to stdout.
 *
 * <p>Redis counters updated with INCRBY: {@code orderTotalNum}, {@code orderTotalPrice},
 * {@code orderPromotionPrice}, {@code orderTotalRealPay}, {@code userNum}.
 */
public class Save2RedisBlot extends BaseBasicBolt {
    /** Redis socket timeout in milliseconds (2 s, the Jedis default). */
    private static final int REDIS_TIMEOUT_MS = 2000;
    // Gson is thread-safe and reusable. It is static, so it is not serialized with the bolt.
    private static final Gson GSON = new Gson();

    // Created in prepare() on the worker and destroyed in cleanup().
    private JedisPool pool;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        JedisPoolConfig config = new JedisPoolConfig();
        // Maximum number of idle Jedis connections kept in the pool.
        config.setMaxIdle(5);
        // Maximum number of connections the pool hands out via getResource() (-1 = unlimited).
        // Once exhausted, getResource() waits.
        config.setMaxTotal(1000 * 100);
        // Longest getResource() waits for a free connection before throwing.
        config.setMaxWaitMillis(3000);
        // Validate connections on borrow and on return, so a borrowed connection is usable.
        config.setTestOnBorrow(true);
        config.setTestOnReturn(true);
        // Too small a timeout causes "java.net.SocketTimeoutException: Read timed out". The
        // former 50 ms value was far below the 2 s default.
        pool = new JedisPool(config, "127.0.0.1", 6379, REDIS_TIMEOUT_MS);
        super.prepare(stormConf, context);
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String paymentInfoStr = input.getStringByField("message");
        PaymentInfo paymentInfo = GSON.fromJson(paymentInfoStr, PaymentInfo.class);
        // try-with-resources returns the connection to the pool after every tuple. Without it,
        // each tuple leaks a connection until the pool or Redis runs out.
        try (Jedis jedis = pool.getResource()) {
            if (paymentInfo != null) {
                // Number of orders.
                jedis.incrBy("orderTotalNum", 1);
                // Total list-price sales.
                jedis.incrBy("orderTotalPrice", paymentInfo.getProductPrice());
                // Total sales at promotional prices.
                jedis.incrBy("orderPromotionPrice", paymentInfo.getPromotionPrice());
                // Total amount actually paid.
                jedis.incrBy("orderTotalRealPay", paymentInfo.getPayPrice());
                // NOTE(review): incremented once per order, so this counts orders rather than
                // distinct users (PaymentInfo carries no user id).
                jedis.incrBy("userNum", 1);
            }
            System.out.println("订单总数:" + jedis.get("orderTotalNum") +
                    " 销售额" + jedis.get("orderTotalPrice") +
                    " 交易额" + jedis.get("orderPromotionPrice") +
                    " 实际支付:" + jedis.get("orderTotalRealPay") +
                    " 下单用户:" + jedis.get("userNum"));
        }
    }

    /** Releases all pooled Redis connections when the bolt is shut down. */
    @Override
    public void cleanup() {
        if (pool != null) {
            pool.destroy();
        }
    }

    /** Terminal bolt: declares no output fields. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
storm启动类:Double11ToplogyMain.java
package storm.tmall;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
/**
 * Launcher for the Double-11 topology: Kafka spout, then the filter bolt, then the Redis bolt.
 *
 * <p>When a first argument is given, the topology is submitted to the Storm cluster under that
 * name with 2 workers. Otherwise it runs in an in-process {@link LocalCluster} named "double11"
 * with task parallelism capped at 3, and keeps running until the JVM is stopped.
 */
public class Double11ToplogyMain {
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setDebug(false);

        boolean submitToCluster = args != null && args.length > 0;
        if (!submitToCluster) {
            // Local simulation of a cluster.
            conf.setMaxTaskParallelism(3);
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("double11", conf, wireComponents().createTopology());
            return;
        }

        // Real cluster submission.
        conf.setNumWorkers(2);
        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, wireComponents().createTopology());
    }

    /** Declares the spout and bolts with their parallelism hints and shuffle groupings. */
    private static TopologyBuilder wireComponents() {
        TopologyBuilder topology = new TopologyBuilder();
        topology.setSpout("readPaymentInfo", new PaymentInfoSpout(), 1);
        topology.setBolt("processIndex", new FilterMessageBlot(), 2)
                .shuffleGrouping("readPaymentInfo");
        topology.setBolt("saveResult2Redis", new Save2RedisBlot(), 2)
                .shuffleGrouping("processIndex");
        return topology;
    }
}
更多推荐
已为社区贡献12条内容
所有评论(0)