Flink Kafka consumer (Scala): custom deserialization
A custom deserialization class for a Flink Kafka consumer source, implemented by extending KafkaDeserializationSchema
Without further ado, here's the code:
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import java.util.Properties
object Consumer {
  def main(args: Array[String]): Unit = {
    // Create the Flink execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Use event time as the stream time characteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Kafka config
    val props = new Properties()
    props.setProperty("bootstrap.servers", "hosts") // Kafka broker hosts
    props.setProperty("group.id", "groupid") // consumer group id
    props.setProperty("retries", "10") // number of retries
    props.setProperty("retry.backoff.ms", "100") // interval between retries
    // Kafka consumer using the custom deserialization schema
    val mykafka = new FlinkKafkaConsumer[User]("topicname", new MyKafkaDes, props)
    // Flink source
    val source = env.addSource(mykafka)
    source.print() // print to stdout
    env.execute("jobname")
  }
}
// Each Kafka record is deserialized into one of these
case class User(id: Long, name: String, age: Int)

// Custom deserialization schema: parses each record value as a CSV line "id,name,age"
class MyKafkaDes extends KafkaDeserializationSchema[User] {
  // The stream is unbounded, so no element marks its end
  override def isEndOfStream(t: User): Boolean = false

  // Decode the record value as a string and map the three fields onto a User
  override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): User = {
    val str = new String(consumerRecord.value())
    val fields = str.split(",")
    User(fields(0).toLong, fields(1), fields(2).toInt)
  }

  // Tell Flink which type this schema produces so it can pick a serializer
  override def getProducedType: TypeInformation[User] = TypeExtractor.getForClass(classOf[User])
}
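For reference, each record value is expected to be a UTF-8 CSV line such as 1,Alice,25, which the schema maps to User(1, "Alice", 25). Below is a minimal sketch for sanity-checking the deserializer locally, outside Flink, by hand-building a ConsumerRecord; the topic name and field values are placeholders:

import org.apache.kafka.clients.consumer.ConsumerRecord

object MyKafkaDesTest {
  def main(args: Array[String]): Unit = {
    // Build a fake Kafka record with a null key and a CSV payload (placeholder values)
    val record = new ConsumerRecord[Array[Byte], Array[Byte]](
      "topicname", 0, 0L, null, "1,Alice,25".getBytes("UTF-8"))
    // Run it through the schema; prints User(1,Alice,25)
    println(new MyKafkaDes().deserialize(record))
  }
}

Note that deserialize assumes a well-formed three-field line; a malformed or empty record value will throw and fail the job, so production code would typically guard the split and the numeric conversions.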