Spring Boot Custom Kafka Serialization and Flink Custom Kafka Deserialization
In real-time computing scenarios we usually reach for a combination such as Storm+Kafka, Spark+Kafka, or Flink+Kafka. Among these, Flink is currently the most popular big-data computing framework and holds several advantages over the alternatives.
In the Flink+Kafka streaming setup, Kafka serializes and deserializes messages as strings by default; that is, the producer and consumer exchange plain String payloads. When an object needs to be passed, one option is to convert it to JSON, but this article shows how to transfer objects directly through a custom Kafka serializer, skipping the object → JSON → object round trip.
Custom serialization on the Kafka producer side
The producer's custom serializer is configured inside a Spring Boot project.
Maven dependency for the custom Kafka serialization (the original listed `spring-boot-starter-amqp`, which is the RabbitMQ starter; `KafkaTemplate` and Kafka auto-configuration come from `spring-kafka`):
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
The custom serializer class
The code below is Kotlin; it reads much like Java, so it should be easy to follow.
import com.junwei.pojo.TravelerData
import org.apache.kafka.common.serialization.Serializer
import java.io.ByteArrayOutputStream
import java.io.ObjectOutputStream

class TravelerDataSerializer : Serializer<TravelerData> {

    // Serialize the POJO with Java's built-in object streams;
    // TravelerData must implement Serializable for this to work.
    override fun serialize(topic: String?, data: TravelerData?): ByteArray? {
        if (null == data) {
            return null
        }
        val output = ByteArrayOutputStream()
        // use {} closes (and therefore flushes) the object stream before we read the
        // buffer; without it, buffered block data could be missing from the result
        ObjectOutputStream(output).use { it.writeObject(data) }
        return output.toByteArray()
    }

    override fun close() {
    }

    override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
    }
}
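The serializer relies only on Java's built-in object streams, so the object-to-bytes step can be exercised without Kafka at all. A minimal Java sketch of the same logic (the `Traveler` class here is a stand-in for the project's `TravelerData` POJO, not the real class):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializeSketch {

    // Stand-in for TravelerData; Java serialization requires Serializable.
    static class Traveler implements Serializable {
        final String userId;
        final String travelId;
        Traveler(String userId, String travelId) {
            this.userId = userId;
            this.travelId = travelId;
        }
    }

    // Same logic as the serializer above: null stays null, otherwise the
    // object is written to an in-memory stream and the buffer is returned.
    static byte[] serialize(Object data) throws IOException {
        if (data == null) {
            return null;
        }
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(data); // close() flushes before we read the buffer
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = serialize(new Traveler("u1", "t1"));
        // Every Java serialization stream starts with the magic bytes 0xAC 0xED
        System.out.println(bytes.length > 4 && (bytes[0] & 0xFF) == 0xAC); // prints true
    }
}
```

The resulting byte array is exactly what the consumer-side deserializer receives as its payload.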
application.yml配置
Sptring:
kafka:
topic: traveler-data
bootstrap-servers: bigdata01:9092,bigdata02:9092,bigdata03:9092
producer:
retries: 1
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 这里配置的是自定义序列化的全类名
value-serializer: com.junwei.browse.util.TravelerDataSerializer
The Kafka producer
import com.junwei.pojo.TravelerData
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

// @Component alone is enough here; the extra @Configuration was redundant
@Component
class KafkaUtil {

    // The value type matches the custom serializer configured in application.yml
    @Autowired
    lateinit var kafkaTemplate: KafkaTemplate<String, TravelerData>

    @Value("\${spring.kafka.topic:0}")
    private lateinit var topic: String

    fun sendMsg(message: TravelerData) {
        kafkaTemplate.send(topic, message)
    }
}
Sending data through the producer
@ApiOperation(value = "Look up attraction details by id")
@GetMapping("{id}")
fun searchById(@PathVariable id: String, request: HttpServletRequest): Result<*> {
    val userId = HeaderUtil.getUserIdFromToken(request)
    val travelInfo = travelInfoService.searchById(id, userId)
    if (travelInfo != null) {
        // Send the object itself to Kafka; the custom serializer handles the bytes
        kafkaUtil.sendMsg(TravelerData(userId, id, travelInfo.title, travelInfo.city, travelInfo.topic))
    }
    return if (travelInfo != null) Result.success(travelInfo) else Result.fail()
}
Custom deserialization on the Kafka consumer side
The consumer's custom deserialization is configured inside a Flink project.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.junwei</groupId>
    <artifactId>flink-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.10.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.12</scala.version>
        <kafka.version>1.1.1</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.junwei</groupId>
            <artifactId>common-pojo</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Declared once (the original listed maven-compiler-plugin twice):
                 the default Java compile executions are disabled because
                 scala-maven-plugin compiles the mixed sources, and the
                 source/target level lives in the same declaration. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
                <executions>
                    <execution>
                        <id>default-compile</id>
                        <phase>none</phase>
                    </execution>
                    <execution>
                        <id>default-testCompile</id>
                        <phase>none</phase>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <version>2.8</version>
                <configuration>
                    <downloadSources>true</downloadSources>
                    <projectnatures>
                        <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
                        <projectnature>org.eclipse.jdt.core.javanature</projectnature>
                    </projectnatures>
                    <buildcommands>
                        <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
                    </buildcommands>
                    <classpathContainers>
                        <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                    </classpathContainers>
                    <excludes>
                        <exclude>org.scala-lang:scala-library</exclude>
                        <exclude>org.scala-lang:scala-compiler</exclude>
                    </excludes>
                    <sourceIncludes>
                        <sourceInclude>**/*.scala</sourceInclude>
                        <sourceInclude>**/*.java</sourceInclude>
                    </sourceIncludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>1.7</version>
                <executions>
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.junwei.manager.TravelerDataKafkaConsumer</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
The custom Kafka deserializer
import java.io.{ByteArrayInputStream, ObjectInputStream}
import java.util

import com.junwei.pojo.TravelerData
import org.apache.kafka.common.serialization.Deserializer

class TravelerDataDeserializer extends Deserializer[TravelerData] {

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  override def deserialize(topic: String, data: Array[Byte]): TravelerData = {
    val byteArray = new ByteArrayInputStream(data)
    val objectInput = new ObjectInputStream(byteArray)
    objectInput.readObject().asInstanceOf[TravelerData]
  }

  override def close(): Unit = {}
}
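One caveat: `deserialize` above feeds `data` straight into the streams, so a null payload (e.g. a Kafka tombstone record) would throw a NullPointerException. A framework-free Java sketch of the same bytes-to-object step with a null guard (class names here are stand-ins, not the project's code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class DeserializeSketch {

    // Stand-in for TravelerData
    static class Traveler implements Serializable {
        final String userId;
        Traveler(String userId) { this.userId = userId; }
    }

    // bytes -> object, returning null for null/empty payloads instead of throwing
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        if (data == null || data.length == 0) {
            return null;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    // Matching object -> bytes step, as on the producer side
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(o);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        Traveler t = (Traveler) deserialize(serialize(new Traveler("u1")));
        System.out.println("u1".equals(t.userId)); // prints true: the round trip preserves fields
    }
}
```

The same guard could be added to the Scala deserializer by returning null (or skipping the record) when `data` is null.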
Flink's Kafka connector consumes raw bytes and applies its own DeserializationSchema, so the same byte-to-object logic is also wrapped in a schema class:
import java.io.{ByteArrayInputStream, ObjectInputStream}

import com.junwei.pojo.TravelerData
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}

class TravelerDataSchema extends DeserializationSchema[TravelerData] {

  override def deserialize(message: Array[Byte]): TravelerData = {
    val byteArray = new ByteArrayInputStream(message)
    val objectInput = new ObjectInputStream(byteArray)
    objectInput.readObject().asInstanceOf[TravelerData]
  }

  // The stream never ends on a sentinel element
  override def isEndOfStream(nextElement: TravelerData): Boolean = false

  override def getProducedType: TypeInformation[TravelerData] =
    TypeInformation.of(new TypeHint[TravelerData] {})
}
Kafka consumer configuration
import java.util.Properties

import com.junwei.constant.Constant
import com.junwei.pojo.TravelerData
import com.junwei.serialization.{TravelerDataDeserializer, TravelerDataSchema}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

object KafkaConfig {

  def getKafkaTravelerConsumer(groupId: String, topic: String): FlinkKafkaConsumer[TravelerData] = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", Constant.KAFKA_IP_PORT)
    properties.setProperty("zookeeper.connect", Constant.ZK_IP_PORT)
    properties.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    // The custom deserializer class; note that FlinkKafkaConsumer itself reads raw
    // bytes and converts them with the DeserializationSchema passed below, so this
    // property is not what performs the conversion here
    properties.setProperty("value.deserializer", classOf[TravelerDataDeserializer].getName)
    // Start from the latest offset when no committed offset exists
    properties.setProperty("auto.offset.reset", "latest")
    properties.setProperty("group.id", groupId)
    // The custom schema turns each byte[] record back into a TravelerData object
    new FlinkKafkaConsumer[TravelerData](topic, new TravelerDataSchema(), properties)
  }
}
Flink Job
The Flink job below classifies and aggregates the consumed data per user.
import com.junwei.config.KafkaConfig
import com.junwei.entity.{CityData, ResultData, TopicData, TravelsData}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TravelerDataKafkaConsumer {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource(KafkaConfig.getKafkaTravelerConsumer("0", "traveler-data"))
      // Thanks to the custom deserialization, each record arrives as a TravelerData object
      .map(it => (it.getUserId, it.getTravelId, it.getTravelName, it.getTravelCity.split("·")(0).substring(2), it.getTravelTopic))
      .keyBy(_._1)
      .process(new KeyedProcessFunction[String, (String, String, String, String, String), (Boolean, String, ResultData)] {

        // Per-user aggregation state
        var resultData: ValueState[ResultData] = _

        override def open(parameters: Configuration): Unit = {
          resultData = getRuntimeContext.getState(new ValueStateDescriptor[ResultData]("resultData", classOf[ResultData]))
        }

        override def processElement(value: (String, String, String, String, String),
                                    ctx: KeyedProcessFunction[String, (String, String, String, String, String),
                                      (Boolean, String, ResultData)]#Context,
                                    out: Collector[(Boolean, String, ResultData)]): Unit = {
          var data = resultData.value()
          val name = List[TravelsData](TravelsData(value._2, value._3, 1))
          val topic = value._5.split(",").map(it => TopicData(it, 1)).toList
          val city = List[CityData](CityData(value._4, 1))
          var insertFlag = false
          if (null == data) {
            // First record for this user: initialize the state
            insertFlag = true
            data = ResultData(value._1, topic, city, name)
          } else {
            // Merge the new record into the existing counts: union, group by key, sum
            insertFlag = false
            data.cityDataList = data.cityDataList.union(city)
              .groupBy(_.cityName).map(it => CityData(it._1, it._2.map(_.count).sum)).toList
            data.topicDataList = data.topicDataList.union(topic)
              .groupBy(_.topicName).map(it => TopicData(it._1, it._2.map(_.count).sum)).toList
            data.traversDataList = data.traversDataList.union(name)
              .groupBy(_.travelId).map(it => TravelsData(it._1, it._2.head.travelName, it._2.map(_.count).sum)).toList
          }
          resultData.update(data)
          out.collect(insertFlag, resultData.value().userId, resultData.value())
        }
      })
      .print("result")
    env.execute("traveler")
  }
}
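The state update in `processElement` applies one pattern three times: union the stored list with the new entries, group by key, and sum the counts. That merge can be illustrated in isolation with plain Java streams (names here are illustrative, not from the project):

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MergeCounts {

    // Merge two (name -> count) maps by summing counts per key,
    // mirroring the union + groupBy + sum steps in processElement.
    static Map<String, Integer> merge(Map<String, Integer> state, Map<String, Integer> incoming) {
        return Stream.concat(state.entrySet().stream(), incoming.entrySet().stream())
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Integer::sum));
    }

    public static void main(String[] args) {
        Map<String, Integer> state = Map.of("Beijing", 2);
        Map<String, Integer> incoming = Map.of("Beijing", 1, "Shanghai", 1);
        // Counts for the same key are added; new keys are simply carried over
        System.out.println(merge(state, incoming));
    }
}
```

The Scala version keeps lists of case classes rather than maps, but the per-key summing behavior is the same.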
That completes the custom Kafka serialization and deserialization setup. With the key code above, you should be able to implement it yourself.