1. Overview:

A Flink job written in Scala consumes data from Kafka and then parses it with protobuf. By default the Kafka consumer deserializes records as strings, so here we have to tell it explicitly what format the incoming data has.
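For reference, a minimal sketch of the SBT dependencies such a job needs. The artifact versions below are illustrative assumptions, not taken from the original project; match them to the Flink and protobuf versions actually in use.

// build.sbt (versions are assumptions, adjust to your cluster)
libraryDependencies ++= Seq(
  "org.apache.flink"    %% "flink-streaming-scala"     % "1.4.2", // DataStream API for Scala
  "org.apache.flink"    %% "flink-connector-kafka-0.8" % "1.4.2", // provides FlinkKafkaConsumer08
  "com.google.protobuf"  % "protobuf-java"             % "3.5.1"  // runtime for the generated DSFusion classes
)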

package cetc.kakfa2flink

import java.io.IOException
import java.util.Properties

import com.hxy.protobuf.DSFusion
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.util.serialization.{AbstractDeserializationSchema, SimpleStringSchema}

/**
  * @description: consume protobuf-encoded messages from Kafka and parse them with the generated DSFusion classes
  * @author: fangchangtan
  * @create: 2018-11-14 15:22
  **/
object Kafka2FlinkTest {
  private val ZOOKEEPER_HOST = "192.168.xx.xx1:2181,192.168.xx.xx2:2181,192.168.xx.xx3:2181"
  private val KAFKA_BROKER = "192.168.xx.xx1:9092,192.168.xx.xx2:9092,192.168.xx.xx3:9092"
  private val TRANSACTION_GROUP  = "group_id_xx" // consumer group id
  private val TOPIC = "TOPIC_MQ2KAFKA_DyFusion2" // topic to consume


  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.enableCheckpointing(5000)


    // configure Kafka consumer
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", KAFKA_BROKER)
    // only required for Kafka 0.8
    properties.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
    properties.setProperty("group.id", TRANSACTION_GROUP )

    // instead of the default SimpleStringSchema, use a byte-array schema so the protobuf bytes arrive untouched
    val myConsumer: FlinkKafkaConsumer08[Array[Byte]] =
      new FlinkKafkaConsumer08[Array[Byte]](TOPIC, new ByteArrayDeserializationSchema[Array[Byte]](), properties)
    
    val transaction: DataStream[Array[Byte]] = env.addSource(myConsumer)
    transaction.map(arrByte => {
      val dFusionMessage = DSFusion.DSFusionMessage.parseFrom(arrByte)
      val basicBean: BasicBean = BasicBean(dFusionMessage.getFusionNum, dFusionMessage.getMmsi)
      println("*****[basicBean] fusionNum:" + basicBean.fusionNum + "; mmsi:" + basicBean.mmsi)
    })
    
//    transaction .setParallelism(4).writeAsText("E:\\123\\kafka2flink_test\\aa.txt")
    env.execute()
  }

  class ByteArrayDeserializationSchema[T] extends AbstractDeserializationSchema[Array[Byte]]{
    @throws[IOException]
    override def deserialize(message: Array[Byte]): Array[Byte] = message
  }


  // abstract base class for the parsed beans
  abstract class OriBean
  // subclass 1: basic fields only
  case class BasicBean(fusionNum: Int, mmsi: Int) extends OriBean
  // subclass 2: basic plus dynamic fields
  case class DyStaticBean(fusionNum: Int, mmsi: Int, utc: Int,
                          lon: Double, lat: Double, cSpeed: Double, cCourse: Int) extends OriBean

}
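In the job above the map function only prints the parsed bean and then discards it. If downstream operators or sinks need the parsed data, the map can return the bean instead. A minimal sketch that could replace the map call in main, using only the fields shown above:

    // return the parsed case class so it can flow to downstream operators
    val beans: DataStream[BasicBean] = transaction.map { bytes =>
      val msg = DSFusion.DSFusionMessage.parseFrom(bytes)
      BasicBean(msg.getFusionNum, msg.getMmsi)
    }
    beans.print() // print() acts as the sink here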

2. When the data in Kafka is protobuf rather than plain strings

When Flink reads an input stream from Kafka, the schema provided by default is for String data:

val myConsumer = new FlinkKafkaConsumer08[String]("topic-name", new SimpleStringSchema(), properties)

If the data stored in Kafka is not JSON/plain strings but protobuf, it has to be received with a binary schema. You can implement one yourself; it is very simple, the body is a single line of code:

 class ByteArrayDeserializationSchema[T] extends AbstractDeserializationSchema[Array[Byte]]{
  @throws[IOException]
  override def deserialize(message: Array[Byte]): Array[Byte] = message
}

It is then used like this:

val myConsumer = new FlinkKafkaConsumer08[Array[Byte]]("topic-name", new ByteArrayDeserializationSchema[Array[Byte]](), properties)
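Alternatively, the protobuf parsing can live inside the deserialization schema itself, so the consumer emits already-parsed messages. A minimal sketch, assuming the same generated DSFusion classes from section 1:

import java.io.IOException
import com.hxy.protobuf.DSFusion
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema

// deserialize Kafka records straight into the generated protobuf message type
class ProtobufDeserializationSchema extends AbstractDeserializationSchema[DSFusion.DSFusionMessage] {
  @throws[IOException]
  override def deserialize(message: Array[Byte]): DSFusion.DSFusionMessage =
    DSFusion.DSFusionMessage.parseFrom(message)
}

// the consumer is then typed to the protobuf message instead of Array[Byte]
val typedConsumer = new FlinkKafkaConsumer08[DSFusion.DSFusionMessage](
  "topic-name", new ProtobufDeserializationSchema(), properties)

Note that Flink treats the generated message as a generic type and serializes it with Kryo, so parsing the bytes in a map and converting to a plain case class, as in section 1, is often the simpler option.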