Spark Streaming consuming from Kafka reports: Kafka ConsumerRecord is not serializable. Use .map to extract fields before calling .persist or .window
2020-04-15 15:56:50,026 ERROR --- [ streaming-job-executor-0] org.apache.spark.streaming.kafka010.KafkaRDD (line: 70) : Kafka ConsumerRecord is not serializable. Use .map to extract fields before calling .persist or .window
2020-04-15 15:57:00,052 ERROR --- [ streaming-job-executor-0] org.apache.spark.streaming.kafka010.KafkaRDD (line: 70) : Kafka ConsumerRecord is not serializable. Use .map to extract fields before calling .persist or .window
2020-04-15 15:57:10,012 ERROR --- [ streaming-job-executor-0] org.apache.spark.streaming.kafka010.KafkaRDD (line: 70) : Kafka ConsumerRecord is not serializable. Use .map to extract fields before calling .persist or .window
2020-04-15 15:57:20,130 ERROR --- [ streaming-job-executor-0] org.apache.spark.streaming.kafka010.KafkaRDD (line: 70) : Kafka ConsumerRecord is not serializable. Use .map to extract fields before calling .persist or .window
2020-04-15 15:57:30,077 ERROR --- [ streaming-job-executor-0] org.apache.spark.streaming.kafka010.KafkaRDD (line: 70) : Kafka ConsumerRecord is not serializable. Use .map to extract fields before calling .persist or .window
1. The error messages are as shown above; the same ERROR repeats on every 10-second batch.
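The error text itself states the general rule: ConsumerRecord is not serializable, so extract the fields you need with .map before any operation that has to store or move records, such as .persist or .window. A minimal illustrative sketch (the stream here is a hypothetical DStream[ConsumerRecord[String, String]] from KafkaUtils.createDirectStream):

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

def demo(stream: DStream[ConsumerRecord[String, String]]): Unit = {
  // Wrong: persisting or windowing ConsumerRecords directly is what
  // triggers the "Kafka ConsumerRecord is not serializable" error.
  // stream.persist()
  // stream.window(Seconds(30))

  // Right: map to plain serializable values first, then persist/window.
  val values: DStream[(String, String)] =
    stream.map(record => (record.key(), record.value()))
  values.persist()
  val windowed = values.window(Seconds(30))
}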
2. Inspecting the code showed that the DataFrame derived from the RDD was never cached; adding cache() to the DataFrame resolves the error.
The original code:
import com.alibaba.fastjson.JSON   // JSON parser used below (assumed to be Alibaba fastjson)
import spark.implicits._           // needed for .toDF; `spark` is the active SparkSession

// `user_visit_his` is assumed to be a case class along the lines of:
//   case class user_visit_his(uid: String, cid: String, optType: Int, optValue: Int)
val user_visit_history = rdd
  .map { record =>
    // `record` is a Kafka ConsumerRecord; pull plain fields out of its JSON value
    val event    = JSON.parseObject(record.value().toString)
    val uid      = event.getString("uid")
    val cid      = event.getString("cid")
    val optType  = event.getInteger("optType")
    val optValue = event.getInteger("optValue")
    user_visit_his(uid, cid, optType, optValue)
  }
  // keep only events with optType 4 and a non-negative optValue
  .filter(record => record.optType == 4 && record.optValue >= 0)
  .toDF("user_id", "video_id", "optType", "optValue")
3. Adding one line for user_visit_history fixes the problem:
user_visit_history.cache()
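Caching at the DataFrame level is exactly what the error message asks for: by the time cache() runs, the fields have already been extracted with .map, so what gets persisted are plain serializable rows rather than ConsumerRecord objects. For context, here is a sketch of how the fix sits inside the streaming job; the stream setup, the case class definition, and the downstream actions are illustrative assumptions:

import com.alibaba.fastjson.JSON
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

case class user_visit_his(uid: String, cid: String, optType: Int, optValue: Int)

def process(stream: DStream[ConsumerRecord[String, String]], spark: SparkSession): Unit = {
  import spark.implicits._

  stream.foreachRDD { rdd =>
    val user_visit_history = rdd
      .map { record =>
        val event = JSON.parseObject(record.value().toString)
        user_visit_his(
          event.getString("uid"),
          event.getString("cid"),
          event.getInteger("optType"),
          event.getInteger("optValue"))
      }
      .filter(r => r.optType == 4 && r.optValue >= 0)
      .toDF("user_id", "video_id", "optType", "optValue")

    // The fix: cache the derived DataFrame (plain rows), never the
    // ConsumerRecord RDD itself.
    user_visit_history.cache()

    // ... run the job's actions against user_visit_history here ...

    user_visit_history.unpersist()  // release the cached batch when done
  }
}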