实时处理Kafka发来的日志信息
package com.trigl.spark.streaming import java.text.SimpleDateFormat import com.trigl.spark.util.{DataUtil, LauncherMultipleTextOutputFormat}
·
package com.trigl.spark.streaming

import java.text.SimpleDateFormat

import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.commons.lang3.StringUtils
import org.apache.log4j.{Level,Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds,StreamingContext, Time}

import com.trigl.spark.util.{DataUtil,LauncherMultipleTextOutputFormat}
/**
 * Real-time ETL of desktop (launcher) click logs.
 *
 * Reads raw log lines from Kafka with Spark Streaming, normalizes every line
 * with `parseLog` and saves the non-blank results under `HDFS_DIR`.
 *
 * @author 白鑫
 */
object LauncherStreaming {

  /** HDFS directory handed to `saveAsHadoopFile` for every batch. */
  val HDFS_DIR = "/changmi/launcher/click"

  /** ZooKeeper quorum used by the Kafka cluster. */
  private val ZkQuorum = "hxf:2181,cfg:2181,jqs:2181,jxf:2181,sxtb:2181"

  /** Kafka `group.id` of this consumer. */
  private val ConsumerGroup = "launcher-streaming"

  /** Kafka topic carrying the launcher click logs. */
  private val Topic = "launcher_click"

  /**
   * Starts the streaming job and blocks until it terminates.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    // Configure Kryo on the SparkConf itself instead of through a JVM-wide system property.
    val sparkConf = new SparkConf()
      .setAppName("LauncherStreaming")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    // One batch every 60 seconds.
    val ssc = new StreamingContext(sparkConf, Seconds(60))

    // Raw log line layout: "<ip>|<seconds.millis>|<payload>", for example
    //   "183.160.102.250|1493169060.849|UEsDBBQACAgIAGBJmkoAAAAA..."
    // The payload looks like Base64 text in which '/' was replaced by '*'
    // (see parseLog) -- TODO confirm against DataUtil.unzip.
    val kafkaStream = KafkaUtils.createStream(
      ssc,
      ZkQuorum,
      ConsumerGroup,
      // The map value is the number of consumer threads for the topic, NOT a
      // partition id. The previous literal listed the same key twice
      // ("launcher_click" -> 0, "launcher_click" -> 1), which collapses to the
      // single entry below, so the effective setting is unchanged.
      Map(Topic -> 1),
      StorageLevel.MEMORY_AND_DISK_SER
    ).map(_._2) // keep only the message value, i.e. the log line

    kafkaStream.foreachRDD { (rdd: RDD[String], _: Time) =>
      // Parse the raw logs and drop every record whose key or value is blank
      // (parseLog yields a blank value for unparsable or empty records).
      val result = rdd
        .map(parseLog)
        .filter { case (key, value) => StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value) }

      // Save to HDFS.
      // NOTE(review): every batch targets the same HDFS_DIR; this presumably relies on
      // LauncherMultipleTextOutputFormat tolerating an existing directory and deriving
      // the file name from the key -- that class is not visible here, confirm.
      result.saveAsHadoopFile(
        HDFS_DIR,
        classOf[String],
        classOf[String],
        classOf[LauncherMultipleTextOutputFormat[String, String]])
    }

    ssc.start()
    // Block until the streaming context is stopped.
    ssc.awaitTermination()
  }

  /**
   * Parses one raw log line into a `(key, record)` pair.
   *
   * Input layout is `ip|seconds.millis|payload`. After `DataUtil.unzip` the
   * payload is expected to look like
   * `imeiList&brand&model|package&fileCrc&timestampList;package&fileCrc&timestampList;...`
   * for example
   * `865066038753146-865066038753153&meizu&m5note|com.android.dialer&1c5441a9&1493109297305,1493110895807;cn.kuwo.player&d1d03a24&1493109460580;`
   *
   * The produced record is
   * `imeis|ip|serverTimestamp|serverTime|brand|model|apps`, where `imeis` are
   * joined with `;`, blank fields are replaced by a single space, and every
   * app entry gets `&<clickCount>` appended (the number of its timestamps).
   *
   * @param log raw log line
   * @return `(key, record)`; `key` is the server date formatted as `yyyy/MM/dd/`
   *         (empty if the timestamp could not be parsed) and `record` is the
   *         normalized line, or `""` when the line is malformed or has no
   *         IMEI / app data. Non-fatal failures are printed to stderr, never thrown.
   */
  def parseLog(log: String): (String, String) = {
    // Deliberately a var: once the timestamp has been parsed the key is returned
    // even if a later step fails, which is what callers have always observed.
    var key = ""
    val record =
      try {
        // Split on the first two '|' only. A line with fewer than three fields
        // raises a MatchError, which is handled like any other malformed input.
        val Array(ip, rawTimestamp, payload) = log.split("\\|", 3)

        // "1493169060.849" -> 1493169060849 (13-digit epoch millis).
        // Assumes the fractional part always has exactly three digits.
        val serverTimestamp = rawTimestamp.replace(".", "").toLong
        val serverDate = new java.util.Date(serverTimestamp)

        // SimpleDateFormat is not thread-safe, so instances are created per call.
        val serverTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(serverDate)
        key = new SimpleDateFormat("yyyy/MM/dd/").format(serverDate)

        // '*' stands in for '/' in the transmitted payload; restore it before decoding.
        val decoded = DataUtil.unzip(payload.replace('*', '/'))

        val sections = decoded.split("\\|")
        val phoneFields = sections(0).split("&")
        val imeis = phoneFields(0).split("-").mkString(";")
        val phoneBrand = phoneFields(1)
        val phoneModel = phoneFields(2)

        // Append the click count (number of timestamps) to every app entry.
        val apps = sections(1).split(";").map { app =>
          val clickCount = app.split("&")(2).split(",").length
          s"$app&$clickCount"
        }

        // Emit a record only when there is both IMEI data and app data.
        if (imeis.nonEmpty && apps.nonEmpty)
          Seq(
            imeis,
            orBlank(ip),
            serverTimestamp.toString,
            orBlank(serverTime),
            orBlank(phoneBrand),
            orBlank(phoneModel),
            apps.mkString(";")
          ).mkString("|")
        else
          ""
      } catch {
        // NonFatal lets InterruptedException and VM errors propagate instead of
        // being swallowed together with ordinary parse failures.
        case NonFatal(e) =>
          e.printStackTrace()
          ""
      }
    (key, record)
  }

  /** Returns `value` unchanged, or a single space when it is blank. */
  private def orBlank(value: String): String =
    if (StringUtils.isNotBlank(value)) value else " "
}
更多推荐


所有评论(0)