Before running: start ZooKeeper, Kafka, and Redis.

Watch out for jar compatibility. Code: Traffic.zip
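The APIs below (the legacy kafka.producer classes and the StringDecoder flavor of KafkaUtils.createDirectStream) date from the Spark 1.x / Kafka 0.8 era, so the jars must match. A minimal build.sbt sketch; every version number here is an assumption and should be adjusted to your cluster:

name := "Traffic"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  // Spark core + streaming (assumed 1.6.x, whose Kafka connector targets the 0.8 API)
  "org.apache.spark" %% "spark-core"            % "1.6.3",
  "org.apache.spark" %% "spark-streaming"       % "1.6.3",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3",
  // Legacy Scala producer API (kafka.producer.Producer / KeyedMessage)
  "org.apache.kafka" %% "kafka"                 % "0.8.2.2",
  // JSON: org.json for the producer's chained put(); json-lib for JSONObject.fromObject in the consumer
  "org.json"         %  "json"                  % "20160810",
  "net.sf.json-lib"  %  "json-lib"              % "2.4" classifier "jdk15",
  // Jedis pool used by RedisClient
  "redis.clients"    %  "jedis"                 % "2.9.0"
)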

Real-time road conditions on a map:

The Kafka producer can be plain application code; here it is written in Scala and replays camera records from a file as JSON events:

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.spark.{SparkConf, SparkContext}
import org.json.JSONObject // assumption: any JSON library whose put() chains (e.g. org.json) will do

// Object name is assumed; the original source only shows main().
object KafkaEventProducer {

  def main(args: Array[String]): Unit = {
    val topic = "car_events"
    val brokers = "node2:9092,node3:9092,node4:9092"

    // Legacy (Kafka 0.8) producer configuration
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)

    val sparkConf = new SparkConf().setAppName("Beijing traffic").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)

    // val filePath = "D:/traffic/trafficlf_all_column_all.txt"
    val filePath = "data/2014082013_all_column_test.txt"

    // Skip comment lines (starting with ";"), split the CSV fields,
    // and collect to the driver so the records can be replayed one by one.
    val records = sc.textFile(filePath)
      .filter(!_.startsWith(";"))
      .map(_.split(","))
      .collect()

    for (record <- records) {
      // prepare event data
      val event = new JSONObject()
      event
        .put("camera_id", record(0))
        .put("car_id", record(2))
        .put("event_time", record(4))
        .put("speed", record(6))
        .put("road_id", record(13))

      // produce event message
      producer.send(new KeyedMessage[String, String](topic, event.toString))
      // producer.send(new KeyedMessage[String, String](topic, "yuntian", event.toString))
      println("Message sent: " + event)

      // throttle to simulate a live feed
      Thread.sleep(200)
    }
  }
}
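The kafka.producer classes above were removed in newer Kafka releases. If your broker is modern, the same send loop can use the current client API instead; a minimal sketch (not the original project's code), assuming the org.apache.kafka:kafka-clients artifact is on the classpath:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical name, for illustration only
object NewApiProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "node2:9092,node3:9092,node4:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // send() is asynchronous and returns a Future[RecordMetadata]
    producer.send(new ProducerRecord[String, String]("car_events", "{\"camera_id\":\"...\"}"))
    producer.flush()
    producer.close()
  }
}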


The following code acts as the consumer and stores the results in Redis:

import java.text.SimpleDateFormat
import java.util.Calendar

import kafka.serializer.StringDecoder
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Object name is assumed; the original source only shows main().
object CarEventCountAnalytics {

  def main(args: Array[String]): Unit = {
    // master URL can be overridden from the command line
    var masterUrl = "local[1]"
    if (args.length > 0) {
      masterUrl = args(0)
    }
    val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")
    val ssc = new StreamingContext(conf, Seconds(5))
    val topics = Set("car_events")
    val brokers = "node2:9092,node3:9092,node4:9092"

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder")
    val dbIndex = 1

    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    // parse each Kafka message value into a JSON object
    val events = kafkaStream.flatMap(line => {
      val data = JSONObject.fromObject(line._2)
      println(data)
      Some(data)
    })

    // (camera_id, speed) -> (camera_id, (speed, 1)),
    // then sum speeds and counts over a 20s window sliding every 10s
    val carSpeed = events.map(x => (x.getString("camera_id"), x.getInt("speed")))
      // val carSpeed = events.map(x => x.getString("road_id") -> (x.getInt("speed"), 1))
      //   .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues((x: Int) => (x, 1))
      // .reduceByKeyAndWindow((a, b) => (a._1 + b._1, a._2 + b._2), Seconds(10))
      .reduceByKeyAndWindow((a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2), Seconds(20), Seconds(10))
    // carSpeed.map { case (key, value) => (key, value._1 / value._2.toFloat) }

    carSpeed.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        val jedis = RedisClient.pool.getResource
        // each record is (camera_id, (total_speed, count))
        partitionOfRecords.foreach(pair => {
          val camera_id = pair._1
          val total = pair._2._1
          val count = pair._2._2
          val now = Calendar.getInstance().getTime()
          // create the date/time formatters
          val minuteFormat = new SimpleDateFormat("HHmm")
          val dayFormat = new SimpleDateFormat("yyyyMMdd")
          val time = minuteFormat.format(now)
          val day = dayFormat.format(now)
          if (count != 0) {
            // val averageSpeed = total / count
            jedis.select(dbIndex) // dbIndex selects which Redis database number to use
            jedis.hset(day + "_" + camera_id, time, total + "_" + count)
            // fetch data from redis
            // val temp = jedis.hget(day + "_" + camera_id, time)
            // println(temp)
          }
        })
        // RedisClient encapsulates the Redis connection details
        RedisClient.pool.returnResource(jedis)
      })
    })

    println("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")

    ssc.start()
    ssc.awaitTermination()
  }
}
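Each Redis hash is keyed by yyyyMMdd_cameraId, each field is the HHmm minute, and each value is total_count, so the per-minute average speed can be recovered afterwards. A minimal read-back sketch (a hypothetical helper, not part of the original project) that reuses the RedisClient object defined below:

// Hypothetical helper: recovers the average speed the consumer stored above.
// Key layout: "yyyyMMdd_cameraId" -> { "HHmm" -> "total_count" }.
object AverageSpeedReader {
  def averageSpeed(day: String, cameraId: String, minute: String): Option[Float] = {
    val jedis = RedisClient.pool.getResource
    try {
      jedis.select(1) // the same dbIndex the consumer writes to
      Option(jedis.hget(day + "_" + cameraId, minute)).map { value =>
        val Array(total, count) = value.split("_")
        total.toFloat / count.toFloat
      }
    } finally {
      RedisClient.pool.returnResource(jedis)
    }
  }
}

// usage (placeholder arguments): AverageSpeedReader.averageSpeed("20140820", "<camera_id>", "1330")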


RedisClient.scala:

 

import org.apache.commons.pool2.impl.GenericObjectPoolConfig // commons-pool2, as used by Jedis 2.x
import redis.clients.jedis.JedisPool

object RedisClient extends Serializable {
  val redisHost = "node2"
  val redisPort = 6379
  val redisTimeout = 30000

  // lazily create one shared connection pool per JVM
  lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)

  // destroy the pool when the JVM shuts down
  lazy val hook = new Thread {
    override def run = {
      println("Execute hook thread: " + this)
      pool.destroy()
    }
  }
  sys.addShutdownHook(hook.run)
}
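One caveat with the foreachPartition body in the consumer: if a write throws, returnResource is never reached and the connection leaks out of the pool. A hedged variant (a hypothetical helper, simplified to take the day/time strings as parameters) that pairs getResource with try/finally:

import org.apache.spark.rdd.RDD

// Hypothetical helper: writes one batch of (camera_id, (total, count)) records,
// returning the Jedis connection even when a write fails.
object SafeRedisWriter {
  def write(rdd: RDD[(String, (Int, Int))], dbIndex: Int, day: String, time: String): Unit = {
    rdd.foreachPartition { partitionOfRecords =>
      val jedis = RedisClient.pool.getResource
      try {
        jedis.select(dbIndex)
        partitionOfRecords.foreach { case (cameraId, (total, count)) =>
          if (count != 0) jedis.hset(day + "_" + cameraId, time, total + "_" + count)
        }
      } finally {
        // runs even if the loop above throws
        RedisClient.pool.returnResource(jedis)
      }
    }
  }
}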
