json日志使用拦截器,字段取出放到header里


a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = "自定义字段":"(\\w+)"
a1.sources.r1.interceptors.i1.serializers = s1

a1.sources.r1.interceptors.i1.serializers.s1.name = topic

topic不要自定义,1.7 源码固定是 "topic",...如果自定义的话..淡淡的忧伤.....

...........

#设置Kafka的Topic

a1.sinks.k1.topic=%{topic}

............

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐