12.5 地图实时路况数据分析
摘要:运行前需启动ZooKeeper、Kafka和Redis,并注意jar包的版本兼容(完整代码见Traffic.zip)。下文先用Scala程序充当Kafka生产者发送车辆事件,再用Spark Streaming消费这些事件并把统计结果写入Redis。
运行之前:需先启动ZooKeeper、Kafka和Redis。
注意jar包的版本兼容。完整代码:Traffic.zip
地图实时路况:
可以使用下面的Scala代码充当Kafka生产者(读取测试数据文件,并将每条记录作为JSON事件发送到car_events主题):
/**
 * Replays recorded traffic data into Kafka as JSON events, simulating a live feed.
 *
 * Reads the comma-separated records in `filePath` (lines starting with ";" are skipped),
 * builds one JSON event per record (camera_id, car_id, event_time, speed, road_id) and
 * sends it to the `car_events` topic, pausing 200 ms between messages.
 * Records with too few fields to read `road_id` are skipped instead of crashing the job.
 * The Kafka producer and the SparkContext are always released on exit.
 *
 * @param args unused
 */
def main(args: Array[String]): Unit = {
  val topic = "car_events"
  val brokers = "node2:9092,node3:9092,node4:9092"
  // Highest field index read below (road_id); records with fewer fields are skipped.
  val maxFieldIndex = 13

  val props = new Properties()
  props.put("metadata.broker.list", brokers)
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  val producer = new Producer[String, String](new ProducerConfig(props))

  val sparkConf = new SparkConf().setAppName("Beijing traffic").setMaster("local[4]")
  val sc = new SparkContext(sparkConf)
  try {
    // val filePath = "D:/traffic/trafficlf_all_column_all.txt"
    val filePath = "data/2014082013_all_column_test.txt"
    // collect() pulls every record to the driver; acceptable for the small test file only.
    val records = sc.textFile(filePath)
      .filter(!_.startsWith(";"))
      .map(_.split(","))
      .filter(_.length > maxFieldIndex)
      .collect()

    for (record <- records) {
      // Prepare the event payload.
      val event = new JSONObject()
      event
        .put("camera_id", record(0))
        .put("car_id", record(2))
        .put("event_time", record(4))
        .put("speed", record(6))
        .put("road_id", record(13))
      // Produce the event message (no key).
      // Keyed alternative: producer.send(new KeyedMessage[String, String](topic, "yuntian", event.toString))
      producer.send(new KeyedMessage[String, String](topic, event.toString))
      println("Message sent: " + event)
      // Throttle to mimic a real-time stream.
      Thread.sleep(200)
    }
  } finally {
    producer.close()
    sc.stop()
  }
}
下面的代码充当Kafka消费者,并将统计结果存储到Redis数据库:
/**
 * Spark Streaming job that consumes car events from Kafka and stores windowed per-camera
 * speed totals in Redis.
 *
 * Every 10 seconds it aggregates the last 20 seconds of events into
 * (camera_id, (sum of speeds, number of events)). It writes each pair to Redis database
 * `dbIndex` as hash `<yyyyMMdd>_<camera_id>`, field `<HHmm>`, value `<total>_<count>`.
 * Readers compute the average speed as total / count.
 * Messages that cannot be parsed (bad JSON, missing camera_id/speed) are logged and dropped.
 *
 * @param args optional; args(0) overrides the Spark master URL (default "local[1]")
 */
def main(args: Array[String]): Unit = {
  val masterUrl = args.headOption.getOrElse("local[1]")
  val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")
  val ssc = new StreamingContext(conf, Seconds(5))
  val topics = Set("car_events")
  val brokers = "node2:9092,node3:9092,node4:9092"
  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")
  // Redis logical database that receives the results.
  val dbIndex = 1
  val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics)

  // Kafka (key, json) -> (camera_id, speed).
  // Parsing inside Try keeps one bad message from failing the whole batch.
  val cameraSpeeds = kafkaStream.flatMap { case (_, json) =>
    val parsed = scala.util.Try {
      val data = JSONObject.fromObject(json)
      println(data)
      (data.getString("camera_id"), data.getInt("speed"))
    }
    parsed.failed.foreach(e => println("Skipping malformed event: " + json + " (" + e + ")"))
    parsed.toOption
  }

  // (camera_id, speed) -> (camera_id, (speed, 1)) -> windowed (camera_id, (speed sum, event count))
  val carSpeed = cameraSpeeds
    .mapValues((speed: Int) => (speed, 1))
    .reduceByKeyAndWindow(
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2),
      Seconds(20), Seconds(10))

  carSpeed.foreachRDD(rdd => {
    // This part runs on the driver once per batch. Every camera in the same window therefore
    // gets the same day/minute key, independent of executor clocks.
    val now = Calendar.getInstance().getTime()
    val day = new SimpleDateFormat("yyyyMMdd").format(now)
    val time = new SimpleDateFormat("HHmm").format(now)

    rdd.foreachPartition(partitionOfRecords => {
      // RedisClient wraps the Redis connection settings and the shared connection pool.
      val jedis = RedisClient.pool.getResource
      try {
        // Choose which Redis logical database to write to (once per connection).
        jedis.select(dbIndex)
        partitionOfRecords.foreach { case (cameraId, (total, count)) =>
          if (count != 0) {
            jedis.hset(day + "_" + cameraId, time, total + "_" + count)
          }
        }
      } finally {
        // Always hand the connection back, even if a write failed.
        // NOTE(review): on Jedis >= 2.6, jedis.close() is the non-deprecated equivalent.
        RedisClient.pool.returnResource(jedis)
      }
    })
  })

  println("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
  ssc.start()
  ssc.awaitTermination()
}
RedisClient.scala:
/**
 * Per-JVM holder for the Redis connection settings and a shared, lazily created JedisPool.
 *
 * A JVM shutdown hook, registered when this object is first initialized, destroys the pool.
 * If the pool was never created, the hook does nothing. This avoids building a pool at
 * shutdown only to destroy it.
 */
object RedisClient extends Serializable {
  val redisHost = "node2"
  val redisPort = 6379
  // Connection/socket timeout in milliseconds, as passed to JedisPool.
  val redisTimeout = 30000

  // Set once `pool` has actually been constructed; read by the shutdown hook.
  @volatile private var poolCreated = false

  lazy val pool: JedisPool = {
    val created = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)
    poolCreated = true
    created
  }

  lazy val hook = new Thread {
    override def run = {
      println("Execute hook thread: " + this)
      if (poolCreated) pool.destroy()
    }
  }
  sys.addShutdownHook(hook.run)
}
更多推荐
所有评论(0)