Snaapy java.io.IOException: FAILED_TO_UNCOMPRESS(5)
最近需要从第三方的kafka消息集群中的获取消息,使用spring整合kafka后,写了一个监听器,如下:@Componentpublic class TestListener implements MessageListener<byte[], byte[]> {private static Logger logger = LogManager.getLogge...
·
最近需要从第三方的kafka消息集群中的获取消息,使用spring整合kafka后,写了一个监听器,如下:
@Component
public class TestListener implements MessageListener<byte[], byte[]> {
private static Logger logger = LogManager.getLogger(TestListener.class);
@Override
public void onMessage (ConsumerRecord<byte[], byte[]> record) {
byte[] keyByte = record.key();
byte[] valueByte = record.value();
String key = null;
String value = null;
try {
//keyByte = Snappy.uncompress(keyByte);//解压缩
//valueByte = Snappy.uncompress(valueByte);//解压缩
key = new String(keyByte, Charset.forName("UTF-8"));
value = new String(valueByte, Charset.forName("UTF-8"));
} catch (IOException e) {
logger.info("出错了");
e.printStackTrace();
}
long offset = record.offset();
logger.info("收到消息了");
logger.info("========recordid====" + offset);
logger.info("========key====" + key);
logger.info("========value====" + value);
}
}
测试的时候发现,接收的json字符串中有乱码,如下:
�I{\"vin\":\"W2GD24FDE4EF00009\",\"fuel_type\":0," +"\"srs_status\":1,\"vehicle_alarm\":0}
咨询了对方技术之后,他们告诉我,kafka对数据进行了压缩,压缩方式是Snappy,于是,就把上面注释掉的代码打开。
但是进行测试的时候,一直报下面这个错误:
java.io.IOException: FAILED_TO_UNCOMPRESS(5)
对方告诉我,数据进行了Snappy压缩,但是我的解压一直报错,最后debug发现解压缩key的时候报的错,对方确实对数据进行了压缩,但是只压缩了value,没有压缩key,也就是说我不应该解压缩key,去掉下面这句话就没问题了。
keyByte = Snappy.uncompress(keyByte);//解压缩
总而言之,对接之前要好好沟通。
更多推荐
已为社区贡献1条内容
所有评论(0)