kafka数据缓存到redis的全路径操作流程
第一步:配置redis客户端spark中配置redis客户端的代码参考: import org.apache.commons.pool2.impl.GenericObjectPoolConfigimport redis.clients.jedis.JedisPool object RedisClient extends Serializable { val redis...
第一步:配置redis客户端
spark中配置redis客户端的代码参考:
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool
object RedisClient extends Serializable {
val redisHost = "192.168.16.100"
val redisPort = 6379
val redisTimeout = 30000
lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)
lazy val hook = new Thread {
override def run = {
println("Execute hook thread: " + this)
pool.destroy()
}
}
sys.addShutdownHook(hook.run)
}
若出错可能缺少jar包,需要引入common-pool2-2.2.jar 和 jedis-2.6.jar
第二步:数据输入到kafka中,本列使用sparkstream
①Kafka生产数据
package Traffic
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.spark.{SparkConf, SparkContext}
import org.codehaus.jettison.json.JSONObject
/**
* Created by Administrator on 2017/10/14.
* 功能:SparkStream作为kafka的生产者,将制定文件数据打到kafka中
*
*/
object KafkaEventProducer {
def main(args: Array[String]): Unit = {
//创建topic
val topic="car_event"
val brokers="192.168.17.108:9092"
val props=new Properties()
//把broker put进去
props.put("metadata.broker.list",brokers)
//把kafka编译器放进去
props.put("serializer.class","kafka.serializer.StringEncoder")
//配置kafka的config(配置)
val kafkaconfig=new ProducerConfig(props)
val producer=new Producer[String,String](kafkaconfig)
//配置spark的config
val conf=new SparkConf().setAppName("KafkaEventProducer").setMaster("local[2]")
val sc=new SparkContext(conf)
//从path中加载数据
// val filePath="data/shuju.txt"
val filePath="c://test//shuju.txt"
//加载数据并进行切分
val records=sc.textFile(filePath)
.filter(!_.startsWith(";"))
.map(_.split(",")).collect()
//对数据进行预处理形成Json形式
for(temp <-records)
{
val event=new JSONObject()
//因为要put很多数据,这样看起来很规范
event
.put("camer_id",temp(0)) //相机编号
.put("car_id",temp(2)) //车牌号
.put("event_time",temp(4)) //时间
.put("car_speed",temp(6)) //速度
.put("car_speed",temp(13)) //车道编号
//生产event信息 topic 是往哪个topic中生产数据 event.toString是生产的真正的内容
producer.send(new KeyedMessage[String,String](topic,event.toString))
println("Message Sent: "+event)
Thread.sleep(200) //休息200微秒
}
sc.stop()
}
}
需要commons-pool2-2.2.jar,jedis-2.6.1.jar和json-lib-2.3-jdk15.jar
②启动kafka 创建car_event 和 topic
start-kafka.sh
kafka-topics.sh --create --zookeeper hadoop:2181 --topic car_event --partitions 1 --replication-factor 1
Created topic "car_event".
第三小步:启动car_event的topic的消费者,此步仅仅是为了验证数据的
Kafka-console-consumer.sh --topic car_event --zookeeper hadoop:2181
第三步:idea中部署kafka打入redis的代码,如下所示:
package Traffic
import java.text.SimpleDateFormat
import java.util.Calendar
import kafka.serializer.{StringDecoder, StringEncoder}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import net.sf.json.JSONObject
/**
* Created by Administrator on 2017/10/14.
* 功能: 从kafka中获取数据写入到redis中
*
*/
object CarEventAnalysis {
def main(args: Array[String]): Unit = {
//配置SparkStrteaming
val conf=new SparkConf().setAppName("CarEventAnalysis").setMaster("local[2]")
val sc=new SparkContext(conf)
val ssc=new StreamingContext(sc,Seconds(5))
val dbindex=1 //指定是用哪个数据库进行连接
//从kafka中读取数据(用直连的方法)
val topics=Set("car_event")
// 只要和brokers相关的都要写全
val brokers="192.168.17.108:9092"
//配置kafka参数
val kafkaParams=Map[String,String](
"metadata.broker.list"->brokers,
"serializer.class"->"kafka.serializer.StringEncoder"
)
//创建一个流 这是一个模板代码 参数中的两个String代表的是kafka的键值对的数据,及key和value
val kafkaStream=KafkaUtils.createDirectStream[String,String,
StringDecoder,StringDecoder](ssc,kafkaParams,topics)
//从kafka中将数据读出
val events=kafkaStream.flatMap(line=>{
//转换为object
val data=JSONObject.fromObject(line._2) // ._2是真正的数据
// println(data)
//必须用Some修饰data option有两个子类 none 代表无值 some代表有值
// 加上some表示一定有值,后面有x.getString和x.getInt,保证程序能知道有值
Some(data)
})
//从kafka中取出卡口编号和速度数据
val carspeed=events.map(x=>(x.getString("camer_id"),x.getInt("car_speed")))
//把数据变成(camer_id,(car_speed,1))
.mapValues((x:Int)=>(x,1.toInt))
//每隔10秒计算一次前20秒的速度(4个rdd) Tuple2表示两个参数
// (速度,数量) (速度,数量)
.reduceByKeyAndWindow((a:Tuple2[Int,Int], b:Tuple2[Int,Int]) =>
{(a._1 + b._1,a._2 + b._2)},Seconds(20),Seconds(10))
// carspeed 速度之和 数量之和
// carspeed.map{case(key,value)=>(key,value._1/value._2.toFloat)}
carspeed.foreachRDD(rdd=>{
rdd.foreachPartition(partitionofRecords=>{
//得到连接池的一个资源
val jedis=RedisClient.pool.getResource
// camer_id 卡口以及总的速度
partitionofRecords.foreach(pair=>{
val camer_id=pair._1 //卡口
val total_speed=pair._2._1 //总的速度
val count=pair._2._2 //总的数量
val now=Calendar.getInstance().getTime() //获取当前的时间
val minuteFormat=new SimpleDateFormat("HHmm") //获取分钟格式
val dayFormat=new SimpleDateFormat("yyyyMMdd") //获取天格式
val time = minuteFormat.format(now) //获取分钟
val day = dayFormat.format(now) //获取天
//开始往redis中插入数据
if(count!=0){
jedis.select(dbindex) //用选择的数据库
// set进去一个map
jedis.hset(day + "_" + camer_id, time ,total_speed + "_" + count)
// 从redis中取数据
val foreachdata=jedis.hget(day + "_" + camer_id, time)
println(foreachdata)
}
})
RedisClient.pool.returnResource(jedis)
})
})
println("----------计算开始---------------------------")
ssc.start()
ssc.awaitTermination()
}
}
第四步: idea中运行第三步部署好的kafka打入redis的代码
记得要引入ezmorph-1.0.6.jar, commons-collections-3.2.jar ,
commons-lang-2.3.jar,commons-pool2-2.2.jar, 共四个jar
第五步:运行第二步的往kafka中打数据的程序
第六步:登录到redis的客户端,验证数据是否存入redis中
redis-cli -p 12002
Select 数据库名称
更多推荐
所有评论(0)