Consuming data from Kafka with Flink -- parsing protobuf
1. Overview:
A Scala-based Flink job consumes data from Kafka and then parses it with protobuf. By default the Kafka consumer deserializes records as String, so here we have to tell it the format of the incoming data ourselves.
package cetc.kakfa2flink

import java.io.IOException
import java.util.Properties

import com.hxy.protobuf.DSFusion
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.util.serialization.{AbstractDeserializationSchema, SimpleStringSchema}

/**
  * @description: consume protobuf-encoded records from Kafka and parse them
  * @author: fangchangtan
  * @create: 2018-11-14 15:22
  **/
object Kafka2FlinkTest {

  private val ZOOKEEPER_HOST = "192.168.xx.xx1:2181,192.168.xx.xx2:2181,192.168.xx.xx3:2181"
  private val KAFKA_BROKER = "192.168.xx.xx1:9092,192.168.xx.xx2:9092,192.168.xx.xx3:9092"
  private val TRANSACTION_GROUP = "group_id_xx"        // consumer group id
  private val TOPIC = "TOPIC_MQ2KAFKA_DyFusion2"       // topic to consume

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.enableCheckpointing(5000)

    // configure Kafka consumer
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", KAFKA_BROKER)
    // only required for Kafka 0.8
    properties.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    properties.setProperty("group.id", TRANSACTION_GROUP)

    // instead of the default SimpleStringSchema(), use a byte-array schema
    // so the raw protobuf bytes reach the job untouched
    val myConsumer: FlinkKafkaConsumer08[Array[Byte]] =
      new FlinkKafkaConsumer08[Array[Byte]](TOPIC, new ByteArrayDeserializationSchema[Array[Byte]](), properties)
    val transaction: DataStream[Array[Byte]] = env.addSource(myConsumer)

    val basicBeans: DataStream[BasicBean] = transaction.map(arrByte => {
      // parse the raw bytes with the generated protobuf class
      val dFusionMessage = DSFusion.DSFusionMessage.parseFrom(arrByte)
      val basicBean: BasicBean = BasicBean(dFusionMessage.getFusionNum, dFusionMessage.getMmsi)
      println("*****[basicBean] fusionNum:" + basicBean.fusionNum + ";mmsi:" + basicBean.mmsi)
      basicBean
    })
    // a sink is needed so the topology has an operator to execute
    basicBeans.print()
    // transaction.setParallelism(4).writeAsText("E:\\123\\kafka2flink_test\\aa.txt")

    env.execute()
  }

  // pass the raw bytes through unchanged; parsing happens downstream
  class ByteArrayDeserializationSchema[T] extends AbstractDeserializationSchema[Array[Byte]] {
    @throws[IOException]
    override def deserialize(message: Array[Byte]): Array[Byte] = message
  }

  // abstract parent class
  abstract class OriBean

  // subclass 1: basic fields only
  case class BasicBean(fusionNum: Int, mmsi: Int) extends OriBean

  // subclass 2: basic fields plus dynamic position fields
  case class DyStaticBean(fusionNum: Int, mmsi: Int, utc: Int,
                          lon: Double, lat: Double, cSpeed: Double, cCourse: Int) extends OriBean
}
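For context, the protobuf bytes on this topic could be produced roughly as below. This is only a minimal sketch: the broker address and the builder setters (setFusionNum, setMmsi) are assumptions inferred from the getters used in the consumer above, not code from the original article.

import java.util.Properties
import com.hxy.protobuf.DSFusion
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProtoProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "192.168.xx.xx1:9092") // assumed broker
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)

    // build a message and serialize it to raw protobuf bytes;
    // setFusionNum/setMmsi are assumed from the getters used by the consumer
    val message = DSFusion.DSFusionMessage.newBuilder()
      .setFusionNum(1)
      .setMmsi(123456789)
      .build()
    producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("TOPIC_MQ2KAFKA_DyFusion2", message.toByteArray))
    producer.close()
  }
}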
2. When the data in Kafka is not String but protobuf
When Flink reads an input stream from Kafka, the schema provided by default is for String:
val myConsumer = new FlinkKafkaConsumer08[String]("topicName", new SimpleStringSchema(), properties)
If the data stored in Kafka is not JSON but protobuf, you need a schema that hands over the raw bytes. You can implement one yourself; it only takes one line of real code:
class ByteArrayDeserializationSchema[T] extends AbstractDeserializationSchema[Array[Byte]] {
  @throws[IOException]
  override def deserialize(message: Array[Byte]): Array[Byte] = message
}
Then use it like this (note the type parameter is now Array[Byte], not String):
val myConsumer = new FlinkKafkaConsumer08[Array[Byte]]("topicName", new ByteArrayDeserializationSchema[Array[Byte]](), properties)
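Alternatively, the protobuf parsing can be moved into the deserialization schema itself, so the source already emits typed messages instead of raw bytes. A minimal sketch, assuming the same generated DSFusion classes and the properties/env defined above:

import java.io.IOException
import com.hxy.protobuf.DSFusion
import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema

// deserializes each Kafka record directly into the generated protobuf message type
class DSFusionDeserializationSchema extends AbstractDeserializationSchema[DSFusion.DSFusionMessage] {
  @throws[IOException]
  override def deserialize(message: Array[Byte]): DSFusion.DSFusionMessage =
    DSFusion.DSFusionMessage.parseFrom(message)
}

// usage: the stream now carries DSFusionMessage objects, no extra map to parse bytes
val protoConsumer = new FlinkKafkaConsumer08[DSFusion.DSFusionMessage](
  "topicName", new DSFusionDeserializationSchema(), properties)
val messages: DataStream[DSFusion.DSFusionMessage] = env.addSource(protoConsumer)

Keep in mind that Flink treats the protobuf class as a generic type and falls back to Kryo serialization for it, whereas keeping raw Array[Byte] in the stream, as the article does, avoids that.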