前言

很多时候,比如用structure streaming消费kafka数据,默认可能是得到key,value字段,key是偏移量,value是一个byte数组。很可能value其实是一个Json字符串。这个时候我们该如何用SQL操作这个json里的东西呢?另外,如果我处理完的数据,我想写入到kafka,但是我想把整条记录作为json格式写入到Kafka,又该怎么写这个SQL呢?

get_json_object

第一个就是get_json_object,具体用法如下:

select get_json_object('{"k": "foo", "v": 1.0}','$.k') as k

需要给定get_json_object 一个json字段名(或者字符串),然后通过类似jsonPath的方式去拿具体的值。
这个方法其实有点麻烦,如果要提取里面的是个字段,我就要写是个类似的东西,很复杂。

from_json

具体用法如下:

select a.k from  (
select from_json('{"k": "foo", "v": 1.0}','k STRING, v STRING',map("","")) as a
)

这个方法可以给json定义一个Schema,这样在使用时,就可以直接使用a.k这种方式了,会简化很多。

to_json

该方法可以把对应字段转化为json字符串,比如:

select to_json(struct(*)) AS value

可以把所有字段转化为json字符串,然后表示成value字段,接着你就可以把value字段写入Kafka了。是不是很简单。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐