第99讲:使用sparkStreaming实战对论坛网站动态行为的多维度分析下
/** 第99讲,消费者消费SparkStreamingDataManuallyProducerForKafka类中逻辑级别产生的数据,这里pv,uv,注册人数,跳出率的方式*/package com.dt.streaming;import java.util.HashMap;import java.util.HashSet;import java.ut
·
有兴趣想学习国内整套Spark+Spark Streaming+Machine learning最顶级课程的,可加我qq 471186150。共享视频,性价比超高!/* * 第99讲,消费者消费SparkStreamingDataManuallyProducerForKafka类中逻辑级别产生的数据,这里pv,uv,注册人数,跳出率的方式 */
package com.dt.streaming; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaPairInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils; import kafka.serializer.StringDecoder; import scala.Tuple2; public class OnlineBBSUserLogss { public static void main(String[] args) { /* * 第99讲,消费者消费SparkStreamingDataManuallyProducerForKafka类中逻辑级别产生的数据,这里pv,uv,注册人数,跳出率的方式 */ /*SparkConf conf = new SparkConf().setMaster("local[2]"). setAppName("WordCountOnline");*/ SparkConf conf = new SparkConf().setMaster("spark://Master:7077"). setAppName("OnlineBBSUserLogs"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5)); Map<String, String> kafkaParameters = new HashMap<String, String>(); kafkaParameters.put("metadata.broker.list","Master:9092,Worker1:9092,Worker2:9092"); Set topics = new HashSet<String>(); topics.add("UserLogs"); JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(jsc, String.class, String.class, StringDecoder.class,StringDecoder.class, kafkaParameters, topics); //在线PV计算 onlinePagePV(lines); //在线UV计算 onlineUV(lines); //在线计算注册人数 onlineRegistered(lines); //在线计算跳出率 onlineJumped(lines); //在线不同模块的PV onlineChannelPV(lines); /* * Spark Streaming执行引擎也就是Driver开始运行,Driver启动的时候是位于一条新的线程中的,当然其内部有消息循环体,用于 * 接受应用程序本身或者Executor中的消息; */ jsc.start(); jsc.awaitTermination(); jsc.close(); } private static void onlineChannelPV(JavaPairInputDStream<String, String> lines) { lines.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() { @Override public Tuple2<String, Long> call(Tuple2<String,String> t) throws Exception { String[] logs = t._2.split("\t"); String channelID =logs[4]; return new Tuple2<String,Long>(channelID, 1L); } }).reduceByKey(new Function2<Long, Long, Long>() { //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce) @Override public Long call(Long v1, Long v2) throws Exception { return v1 + v2; } }).print(); } private static void onlineJumped(JavaPairInputDStream<String, String> lines) { lines.filter(new Function<Tuple2<String,String>, Boolean>() { @Override public Boolean call(Tuple2<String, String> v1) throws Exception { String[] logs = v1._2.split("\t"); String action = logs[5]; if("View".equals(action)){ return true; } else { return false; } } }).mapToPair(new PairFunction<Tuple2<String,String>, Long, Long>() { @Override public Tuple2<Long, Long> call(Tuple2<String,String> t) throws Exception { String[] logs = t._2.split("\t"); // Long usrID = Long.valueOf(logs[2] != null ? logs[2] : "-1"); Long usrID = Long.valueOf("null".equals(logs[2]) ? "-1" : logs[2]); return new Tuple2<Long,Long>(usrID, 1L); } }).reduceByKey(new Function2<Long, Long, Long>() { //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce) @Override public Long call(Long v1, Long v2) throws Exception { return v1 + v2; } }).filter(new Function<Tuple2<Long,Long>, Boolean>() { @Override public Boolean call(Tuple2<Long, Long> v1) throws Exception { if(1 == v1._2){ return true; } else { return false; } } }).count().print(); } private static void onlineRegistered(JavaPairInputDStream<String, String> lines) { lines.filter(new Function<Tuple2<String,String>, Boolean>() { @Override public Boolean call(Tuple2<String, String> v1) throws Exception { String[] logs = v1._2.split("\t"); String action = logs[5]; if("Register".equals(action)){ return true; } else { return false; } } }).count().print(); } /** * 因为要计算UV,所以需要获得同样的Page的不同的User,这个时候就需要去重操作,DStreamzhong有distinct吗?当然没有(截止到Spark 1.6.1的时候还没有该Api) * 此时我们就需要求助于DStream魔术般的方法tranform,在该方法内部直接对RDD进行distinct操作,这样就是实现了用户UserID的去重,进而就可以计算出UV了。 * @param lines */ private static void onlineUV(JavaPairInputDStream<String, String> lines) { /* * 第四步:接下来就像对于RDD编程一样基于DStream进行编程!!!原因是DStream是RDD产生的模板(或者说类),在Spark Streaming具体 * 发生计算前,其实质是把每个Batch的DStream的操作翻译成为对RDD的操作!!! *对初始的DStream进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算 * */ JavaPairDStream<String, String> logsDStream = lines.filter(new Function<Tuple2<String,String>, Boolean>() { @Override public Boolean call(Tuple2<String, String> v1) throws Exception { String[] logs = v1._2.split("\t"); String action = logs[5]; if("View".equals(action)){ return true; } else { return false; } } }); logsDStream.map(new Function<Tuple2<String,String>,String>(){ @Override public String call(Tuple2<String, String> v1) throws Exception { String[] logs =v1._2.split("\t"); Long usrID = Long.valueOf(logs[2] != null ? logs[2] : "-1"); Long pageID = Long.valueOf(logs[3]); return pageID+"_"+usrID; } }).transform(new Function<JavaRDD<String>,JavaRDD<String>>(){ @Override public JavaRDD<String> call(JavaRDD<String> v1) throws Exception { // TODO Auto-generated method stub return v1.distinct(); } }).mapToPair(new PairFunction<String, Long, Long>() { @Override public Tuple2<Long, Long> call(String t) throws Exception { String[] logs = t.split("_"); Long pageId = Long.valueOf(logs[0]); return new Tuple2<Long,Long>(pageId, 1L); } }).reduceByKey(new Function2<Long, Long, Long>() { //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce) @Override public Long call(Long v1, Long v2) throws Exception { return v1 + v2; } }).print(); } private static void onlinePagePV(JavaPairInputDStream<String, String> lines) { /* * 第四步:接下来就像对于RDD编程一样基于DStream进行编程!!!原因是DStream是RDD产生的模板(或者说类),在Spark Streaming具体 * 发生计算前,其实质是把每个Batch的DStream的操作翻译成为对RDD的操作!!! *对初始的DStream进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算 * */ JavaPairDStream<String, String> logsDStream = lines.filter(new Function<Tuple2<String,String>, Boolean>() { @Override public Boolean call(Tuple2<String, String> v1) throws Exception { String[] logs = v1._2.split("\t"); String action = logs[5]; if("View".equals(action)){ return true; } else { return false; } } }); /* * 第四步:对初始的DStream进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算 * */ JavaPairDStream<Long, Long> pairs = logsDStream.mapToPair(new PairFunction<Tuple2<String,String>, Long, Long>() { @Override public Tuple2<Long, Long> call(Tuple2<String, String> t) throws Exception { String[] logs = t._2.split("\t"); Long pageId = Long.valueOf(logs[3]); return new Tuple2<Long,Long>(pageId, 1L); } }); /* * 第四步:对初始的DStream进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算 * */ JavaPairDStream<Long, Long> wordsCount = pairs.reduceByKey(new Function2<Long, Long, Long>() { //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce) @Override public Long call(Long v1, Long v2) throws Exception { return v1 + v2; } }); /* * 此处的print并不会直接出发Job的执行,因为现在的一切都是在Spark Streaming框架的控制之下的,对于Spark Streaming * 而言具体是否触发真正的Job运行是基于设置的Duration时间间隔的 * * 诸位一定要注意的是Spark Streaming应用程序要想执行具体的Job,对Dtream就必须有output Stream操作, * output Stream有很多类型的函数触发,类print、saveAsTextFile、saveAsHadoopFiles等,最为重要的一个 * 方法是foraeachRDD,因为Spark Streaming处理的结果一般都会放在Redis、DB、DashBoard等上面,foreachRDD * 主要就是用用来完成这些功能的,而且可以随意的自定义具体数据到底放在哪里!!! * * 在企業生產環境下,一般會把計算的數據放入Redis或者DB中,采用J2EE等技术进行趋势的绘制等,这就像动态更新的股票交易一下来实现 * 在线的监控等; */ wordsCount.print(); } }
/** * 99讲:论坛数据自动生成代码,该生成的数据会作为Producer的方式发送给Kafka,然后SparkStreaming程序会从 * Kafka中在线Pull到论坛或者网站的用户在线行为信息,进而进行多维度的在线分析 * 这里产生数据,就会发送给kafka,kafka那边启动消费者,就会接收到数据,这一步是用来测试生成数据和消费数据没有问题的,确定没问题后要关闭消费者, * 启动99讲的类作为消费者,就会按pv,uv等方式处理这些数据。因为一个topic只能有一个消费者,所以启动程序前必须关闭kakka方式启动的消费者*
*/
package com.dt.streaming; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Properties; import java.util.Random; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * 99讲:论坛数据自动生成代码,该生成的数据会作为Producer的方式发送给Kafka,然后SparkStreaming程序会从 * Kafka中在线Pull到论坛或者网站的用户在线行为信息,进而进行多维度的在线分析 * 这里产生数据,就会发送给kafka,kafka那边启动消费者,就会接收到数据,这一步是用来测试生成数据和消费数据没有问题的,确定没问题后要关闭消费者, * 启动99讲的类作为消费者,就会按pv,uv等方式处理这些数据。因为一个topic只能有一个消费者,所以启动程序前必须关闭kakka方式启动的消费者 * 数据格式如下: * date:日期,格式为yyyy-MM-dd * timestamp:时间戳 * userID:用户ID * pageID:页面ID * chanelID:板块的ID * action:点击和注册 */ public class SparkStreamingDataManuallyProducerForKafkas extends Thread { //具体的论坛频道 static String[] channelNames = new String[]{ "Spark","Scala","Kafka","Flink","Hadoop","Storm", "Hive","Impala","HBase","ML" }; //用户的两种行为模式 static String[] actionNames = new String[]{"View", "Register"}; private String topic; //发送给Kafka的数据的类别 private Producer<Integer, String> producerForKafka; private static String dateToday; private static Random random; public SparkStreamingDataManuallyProducerForKafkas(String topic){ dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date()); this.topic = topic; random = new Random(); Properties conf = new Properties(); conf.put("metadata.broker.list","Master:9092,Worker1:9092,Worker2:9092"); conf.put("serializer.class", "kafka.serializer.StringEncoder"); producerForKafka = new Producer<Integer, String>(new ProducerConfig(conf)) ; } @Override public void run() { int counter = 0; while(true){ counter++; String userLog = userlogs(); System.out.println("product:"+userLog); producerForKafka.send(new KeyedMessage<Integer, String>(topic, userLog)); if(0 == counter%500){ counter = 0; try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public static void main( String[] args ) { new SparkStreamingDataManuallyProducerForKafka("UserLogs").start(); } private static String userlogs() { StringBuffer userLogBuffer = new StringBuffer(""); int[] unregisteredUsers = new int[]{1, 2, 3, 4, 5, 6, 7, 8}; long timestamp = new Date().getTime(); Long userID = 0L; long pageID = 0L; //随机生成的用户ID if(unregisteredUsers[random.nextInt(8)] == 1) { userID = null; } else { userID = (long) random.nextInt((int) 2000); } //随机生成的页面ID pageID = random.nextInt((int) 2000); //随机生成Channel String channel = channelNames[random.nextInt(10)]; //随机生成action行为 String action = actionNames[random.nextInt(2)]; userLogBuffer.append(dateToday) .append("\t") .append(timestamp) .append("\t") .append(userID) .append("\t") .append(pageID) .append("\t") .append(channel) .append("\t") .append(action); //这里不要加\n换行符,因为kafka自己会换行,再append一个换行符,消费者那边就会处理不出数据 return userLogBuffer.toString(); } }
更多推荐
已为社区贡献4条内容
所有评论(0)