kafka发送自定义消息体(集合,自定义对象)(学习笔记2020.3.25)
kafka发送自定义消息体(集合,自定义对象)(学习笔记2020.3.25)前言:前几天学习使用kafka一直都是使用对象转换为json字符串在发送的,突然想研究下怎么直接发送自定义(Object)对象。1. kafka的生产者序列化首先我的生产者工厂配置序列化代码是:泛型都是<String,Object>//key与value序列化方式co...
kafka
发送自定义消息体(集合,自定义对象)
(学习笔记2020.3.25)
前言:
前几天学习使用
kafka
一直都是使用对象转换为json
字符串在发送的,突然想研究下怎么直接发送自定义(Object)对象。
1. kafka
的生产者序列化
首先我的生产者工厂配置序列化代码是:
泛型都是<String,Object>
//key与value序列化方式
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
发送的消息体类型是:
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
//发送集合
kafkaTemplate.send("mykafkaDemo", Arrays.asList("你好","张韶涵"));
1.1 JsonSerializer
这个类,是作为发送消息体进行序列化发送的, 我们来看看他是怎么进行序列化的。
debug源码发现,是将对象序列化为字节数组进行发送。
2.kafka
的消费者反序列化
首先我的消费者工厂配置序列化代码是:
泛型都是<String,Object>
DefaultKafkaConsumerFactory<String, Object> DefaultKafkaConsumerFactory
= new DefaultKafkaConsumerFactory<>(properties,new StringDeserializer(),new JsonDeserializer<>());
2.1JsonDeserializer
这个类是作为将发送的字节数组进行反序列化的。
debug源码发现,是将字节数组进行反序列化为对象。
到此发送集合是没有任何问题
3. 发送自定义对象
定义一个Message对象…然后进行发送。
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
kafkaTemplate.send("mykafkaDemo",new Message("张韶涵", "台湾"));
首先将对象序列化为数组,没有任何问题。
将对象进行反序列化时候,报了不可信包错误。
Caused by: java.lang.IllegalArgumentException: The class 'com.zhihao.entity.Message' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*)
然后通过debug发现是在
DefaultJackson2JavaTypeMapper类中的toJavaType
方法里面调用getClassIdType
方法出现了异常, 然后继续深入getClassIdType
方法,发现是在if (!isTrustedPackage(classId))
判断是否是可信包,返回的结果进入了抛异常。
这里我们的包名与原本的可信包进行比较,如果是相等就返回true,不相等就返回false, 因为
(!isTrustedPackage(classId))
进行了取反,如果不相等就返回false就会抛出不可信包异常。
既然知道了结果,就找解决方法,经过
JsonDeserializer
里面发现有方法addTrustedPackages
可以添加可信包。
然后在消费者工厂配置value序列化
JsonDeserializer
方式进行修改:
JsonDeserializer<Object> objectJsonDeserializer = new JsonDeserializer<>();
//添加可信序列化包
objectJsonDeserializer.addTrustedPackages("com.zhihao.entity");
DefaultKafkaConsumerFactory<String, Object> DefaultKafkaConsumerFactory
= new DefaultKafkaConsumerFactory<>(properties,new StringDeserializer(), objectJsonDeserializer);
然后发现
(!isTrustedPackage(classId))
返回了true,不报错了,可以正常进行反序列化了。
并可以发现可信包里面又
java.util
与java.lang
,也叫是说明发送这个2个包下的都可信,可以进行正常反序列化。
并且经过错误提示,可以设置为信任所有包
objectJsonDeserializer.addTrustedPackages("*");
那将不进行可信包循环判断,直接返回
true
。
4. 映射类型(官方发送自定义消息体):
从2.2版开始,使用JSON时,您现在可以使用前面列表中的属性来提供类型映射。以前,您必须在序列化器和反序列化器中自定义类型映射器。映射由逗号分隔的
token:className
成对列表组成。在出站时,有效负载的类名将映射到相应的令牌。在入站时,类型标头中的令牌将映射到相应的类名称。模板,工厂,消费者工厂与监听容器的泛型都是
Object
下面的示例创建一组映射:
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "user:com.zhihao.User, cat:com.zhihao.Cat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeSerializer.TYPE_MAPPINGS, "user:com.zhihao.User, cat:com.zhihao.Cat");
如果使用Spring Boot,则可以在
application.properties
(或yaml)文件中提供这些属性
1
更多推荐
所有评论(0)