最近准备用flink开发实时交易量统计功能,架构是由业务系统通过kafka发送交易日志,flink服务收集kafka数据统计后存储到es中。

scala map转jsonString的时候,结果格式
[{"_1":“name”,"_2":“value”}]

期望格式
{“name”,“value”}

原版
var OBJECT_MAPPER = new ObjectMapper();
var codeMap = new mutable.HashMap[String, Long];
elements.foreach(it => {
codeMap.put(it.getRtnCode, codeMap.getOrElse(it.getRtnCode, 0L).+(1L));
})
OBJECT_MAPPER.writeValueAsString(codeMap)

修改后版本
var OBJECT_MAPPER = new ObjectMapper();
var codeMap: util.Map[String, Long] = new util.HashMap();
elements.foreach(it => {
if (codeMap.containsKey(it.getRtnCode)) {
codeMap.put(it.getRtnCode, codeMap.get(it.getRtnCode).+(1L));
} else {
codeMap.put(it.getRtnCode, 1L);
}
})
OBJECT_MAPPER.writeValueAsString(codeMap)

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐