这部分文档介绍了如何用Kafka的接口实现流式数据处理。

kafka流模式开发
1 概述
kafka Streams是一个客户端库(client library),用于处理和分析储存在Kafka中的数据,并把处理结果写回Kafka或发送到外部系统的最终输出点。它建立在一些很重要的概念上,比如事件时间和消息时间的准确区分,开窗支持,简单高效的应用状态管理。Kafka Streams的门槛很低:你可以快速编写一个小规模的原型运行在一台独立主机中;然后你只需要在其他主机主机上部署应用的实例,就可以完成到大规模生产环境的扩展。Kafka Streams利用Kafka的并行模型,可以透明处理同一应用的多实例负载均衡。
kafka Streams的特点:
*被设计为一个简单轻量级的客户端库,可以嵌入到Java应用,整合到已有的包、部署环境或者其他用户的流应用处理工具。
*除了Kafka自身做为内部消息层外,没有其他系统依赖。使用Kafka分区模型来水平扩展并保证绝对的顺序性。
*支持本地状态容错,可以执行非常快速有效的有状态操作,比如joins和windowed aggregations(窗口聚合)。
*采用“一次处理一条记录(one-record-at-a-time)”的方式达到低处理延迟,支持基于开窗操作的事件消息(event-time)。
*提供必要的流处理基础件,包括一个高级Streams DSL和一个底层处理API(Processor API)。
2 开发指南
2.1 核心概念
2.1.1 流处理过程拓扑图
*一个流(stream)是Kafka中最重要的抽象概念:它代表了一个无界,持续更新的数据集。一个流是一个有序,可重复读取,容错的不可变数据记录序列,一个数据记录被定义为一个键值对(key-value pair)。
*一个流处理应用,用Kafka Streams开发,定义了经过若干个处理拓扑(processor topologies)的计算逻辑,每个处理拓扑是一个通过流(线,edge)连接到流处理实例(点,node)的图。
*一个流处理实例(processor)是一个处理拓扑的节点;其含义是,通过从拓扑图中它的上游处理节点每次接收一条输入记录,执行一步流数据的变换,可能是请求操作流数据,也有可能随后生产若干条记录给到下游处理实例。
2.1.2 时间
流处理中一个临界面就是时间概念,以及它是怎么定义和整合的。比如,像开窗(windowing)这样的操作定义是基于时间边界的。
流中常用的消息概念有:
*事件时间————当事件或数据记录产生的时间点,最初被称为"at the source"(起源)。
*处理时间————当事件或数据记录被流处理应用开始处理的时间点,也就是记录开始被消费的时间。处理时间会比源事件时间晚若干毫秒、小时,甚至若干天。
*存储时间————当事件或者数据记录被Kafka broker储存到一个主题分区的时间。和事件时间不同的是,存储时间是发生在Kafka broker把记录添加到目标主题时,而不是记录创建时。和处理时间不同的是,处理时间发生在流处理应用处理记录时。比如,如果一个记录从来没被处理过,那它就没有处理时间的概念,但是它还是有存储时间。
选择事件时间还是存储时间,是通过Kafka配置文件确定的(不是Kafka Streams):在Kafka 0.10.x之前,时间戳会自动嵌入到Kafka消息中。通过Kafka的配置项,这些时间戳可以代表事件时间或存储时间。该项可以配置在broker级或单个topic。默认Kafka Streams中时间戳提取器会把嵌入的时间戳原样提取。所以,你应用中有效的时间含义依赖于Kafka中这些嵌入时间戳的配置。
Kafka Streams把每一个时间戳关联到每个数据记录通过接口TimestampExtractor。该接口的具体实现会检索或计算时间戳,数据记录确实产生内容的时间被当做嵌入时间戳时代表事件时间语义,或者用其他方法如当前时钟时间获取的处理时时间,会代表处理时间语义。开发者可以鉴于此依照业务需要使用不同时间概念。比如,单个记录(per-record)时间戳描述了按照时间访问流的进度(虽然流中的记录可能是无序的),然后被依赖于时间的操作(如joins)利用。
最后,无论何时一个Kafka Streams应用写记录到Kafka,都会给新记录关联一个时间戳。关联时间戳的方法依赖于context对象:
*当通过处理输入记录而产生新输出记录时,比如,用context.forward()触发process()方法调用,输出记录会直接继承输入记录的时间戳。
*当通过周期函数产生新输出记录时(如punctuate),输出记录的时间戳被定义为当前流任务的内部时间(通过context.timestamp())。
*为了聚合性,更新记录聚合的结果时间戳就是最新输入记录到达时触发的更新时间。
2.1.3 状态
某些流处理应用不需要状态,也就是一个消息处理过程不依赖于取他消息的处理过程。但是,可以保持状态会提供更多更复杂的流处理过程:你可以组合(join)输入流,分组并聚合数据记录。很多这种有状态的操作都可以通过Kafka Streams DSL得到。
Kafka Streams提供了所谓的状态存储(state stores),可以被流处理应用用于保存和查询数据。当实现有状态操作时,这是非常有用的功能。每个Kafka Streams任务会嵌入若干个状态存储,通过API访问存储的状态可以保存或查询处理过程需要的数据。这些状态存储可以保存为持久化键值对,一个内存哈希表,或者其他实用的数据结构。Kafka Streams提供了本地状态存储的容错和自动还原。
Kafka Streams允许直接只读查询(read-only query)状态存储,可以通过方法、线程、处理过程或和创建数据存储的应用无关的应用。这个功能被称为“交互式查询” (Interactive Query)。所有的存储都被命名,而且交互式查询底层实现只开放了读操作。


如前所述,一个Kafka Streams应用的计算逻辑被定义为一个处理拓扑。当前Kafka Streams提供了两组API用于定义处理拓扑。
2.2 底层处理API
2.2.1 Processor类
开发人员可以定制自己的业务处理逻辑,通过继承Process类。该接口提供了process和punctuate方法。process方法会在每条记录上执行;punctuate方法会被周期性调用。另外,processor接口可以保持当前ProcessorContext实例变量(在init方法中初始化),用context来设定punctuate调用周期(context().schedule),转发修改/新键值对到下游Processor实例(context().forwar),提交当前处理进度(context().commit),等等。

    public class MyProcessor extends Processor {
        private ProcessorContext context;
        private KeyValueStore kvStore;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            this.context.schedule(1000);
            this.kvStore = (KeyValueStore) context.getStateStore("Counts");
        }

        @Override
        public void process(String dummy, String line) {
            String[] words = line.toLowerCase().split(" ");

            for (String word : words) {
                Integer oldValue = this.kvStore.get(word);

                if (oldValue == null) {
                    this.kvStore.put(word, 1);
                } else {
                    this.kvStore.put(word, oldValue + 1);
                }
            }
        }

        @Override
        public void punctuate(long timestamp) {
            KeyValueIterator iter = this.kvStore.all();

            while (iter.hasNext()) {
                KeyValue entry = iter.next();
                context.forward(entry.key, entry.value.toString());
            }

            iter.close();
            context.commit();
        }

        @Override
        public void close() {
            this.kvStore.close();
        }
    };


上面的示例中执行了如下的操作:
*init:设定punctuate调用周期为1秒,获取本地状态存储并命名为“Counts”.
*process: 根据每条收到的记录,把输入字符串值分割为单词,把他们的计数更新到状态存储(我们在下一节讨论该功能)。
*punctuate:迭代本地状态存储,发送计数集合到下游处理器,提交当前流状态。
2.2.2 处理拓扑(Processor Topology)
实现自定义Processor的同时,开发人员可以用TopologyBuilder构建一个处理拓扑,把各个Processor过程连接在一起:
  TopologyBuilder builder = new TopologyBuilder();

    builder.addSource("SOURCE", "src-topic")

        .addProcessor("PROCESS1", MyProcessor1::new /* the ProcessorSupplier that can generate MyProcessor1 */, "SOURCE")
        .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
        .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")

        .addSink("SINK1", "sink-topic1", "PROCESS1")
        .addSink("SINK2", "sink-topic2", "PROCESS2")
        .addSink("SINK3", "sink-topic3", "PROCESS3");


上面代码中创建拓扑有几个步骤,下面简略说明一下:
*首先,调用addSource方法将一个源节点(命名为“SOURCE”)添加到拓扑中,并和一个Kafka主题“src-topic”关联,。
*其次,调用addProcessor添加三个处理节点,在这里,第一个处理实例是“SOURCE”节点的孩子,但是是其他两个实例的父亲。
*最后,调用addSink添加三个槽(sink)节点到已经部署好的拓扑中,每一个从不同父Processor节点来的管道都写入不同的主题。
2.2.3 本地状态存储
注意,ProcessorAPI不限制应用仅仅访问当前到达的记录,也可以访问之前保存了之前到达记录的本地状态存储,用于聚合或窗口组合等有状态处理操作。为了利用本地状态的优势,开发者使用TopologyBuilder.addStateStore方法在构建处理拓扑时创建本地状态,并把它和需要访问它的处理节点关联起来;或者用TopologyBuilder.connectProcessorAndStateStores方法连接已创建的本地状态存储和已存在的处理节点。
TopologyBuilder builder = new TopologyBuilder();

    builder.addSource("SOURCE", "src-topic")

        .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
        // create the in-memory state store "COUNTS" associated with processor "PROCESS1"
        .addStateStore(Stores.create("COUNTS").withStringKeys().withStringValues().inMemory().build(), "PROCESS1")
        .addProcessor("PROCESS2", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
        .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")

        // connect the state store "COUNTS" with processor "PROCESS2"
        .connectProcessorAndStateStores("PROCESS2", "COUNTS");

        .addSink("SINK1", "sink-topic1", "PROCESS1")
        .addSink("SINK2", "sink-topic2", "PROCESS2")
        .addSink("SINK3", "sink-topic3", "PROCESS3");



2.3 高级Streams DSL
使用Streams DSL构建一个处理拓扑,开发者需要使用KStreamBuilder类,该类继承自TopologyBuilder类。streams/examples包内有一个简单的包含源码的示例。本节剩余部分,会通过一些实例代码来展现使用Streams DSL创建一个拓扑的关键步骤,不过我们还是建议开发者阅读完整的实例以了解所有的细节。
2.3.1 KStream类和KTable类
DSL用到了两个主要的抽象概念。一个KStream实例是一个记录流的抽象,记录流中每条数据记录代表了一个无界数据集中的一个独立数据。一个KTable实例是一个更新日志流的抽象,更新日志流中每一条数据代表了一个更新。更准确的说,数据记录中的值代表了同一个记录关键字的最新更新值,如果有相同关键字记录的话(如果关键字不存在,那么更新动作会创建一个)。为了说明KStream和KTable的区别,我们有下面两个记录发往流:("alice", 1) --> ("alice", 3)。如果这两条记录保存在KStream实例,流处理应用累加他们的值会得到结果4。如果这两条记录保存在KTable实例,得到的结果是3,因为后一个记录会被当做是前一个记录的更新。
2.3.2 从Kafka创建流数据源
无论是记录流(用KStream定义)还是更新日志流(用KTable定义),都可以被创建为一个流数据源,数据来自若干个Kafka主题(KTable只能创建单主题的数据源)。
	KStreamBuilder builder = new KStreamBuilder();
	KStream source1 = builder.stream("topic1", "topic2");
	KTable source2 = builder.table("topic3", "stateStoreName");


2.3.3 数据流开窗
某个流处理过程可能需要把数据记录按时间分组,也就是按时间把流分为多个窗口。通过join和聚合操作会用到这个。Kafka Streams目前定义了如下几种窗口:
*Hopping time window 时间跨越窗口,基于时间间隔,模拟了大小固定、(可能)重叠的窗口。一个跨越窗口由两个属性确定:窗口大小和跨越步长(前进间隔)(即“hop”跳)。前进间隔指定了一个窗口每次相对于前一个窗口向前移动的距离。比如,你可以配置一个长度5分钟的跨越窗口,前进间隔是1分钟。跨越窗口可能覆盖了一个记录,该记录属于若干个这样的窗口。
*Tumbling time windows 是一个特殊的跨越窗口,所以也是基于时间间隔。它模拟了大小固定、不可重叠、无间隙的一类窗口。一个trumbing窗口由一个属性确定:窗口大小。投入你们 trumbing窗口是一个窗口大小等于前进步长的跨越窗口。因为它不会重叠,一条记录也仅属于唯一的窗口。
*sliding window,滑动窗口,模拟了大小固定并沿着时间轴连续滑动的窗口。这里,有两条数据记录存在于同一个窗口,他们时间戳不同但是都在窗口大小内。所以,滑动窗口没有和某个时间点对齐,而是和数据记录时间戳对齐。在Kafka流中,滑动窗口只有在join操作时才用到,可以用JoinWindows类来定义。
2.3.4 join操作
一个join(合并)操作就是合并两个数据流,基于他们数据的键,然后生成一个新流。一个记录流上的join操作通常需要基于窗口操作(即分段执行),因为用于执行join操作的记录数量可能会无限增长。Kafka Streams定义了如下几个join操作:
*KStream-to-KStream  Joins:就是windowed join(窗口合并),因为用于计算join操作的内存大小和状态可能是无限增长的。这里,假设从需要和其他记录流进行join操作的流,新接收到一条记录,按照指定的窗口间隔生产一个结果,用于每个符合用户提供的ValueJoiner类要求的键值对。一个从join操作返回的新的KStream实例代表了join操作的结果。
*KTable-to-KTable Joins:这个join操作用于和关系数据库中对应记录保持一致。这里,两个更新日志流先实例化到本地状态存储中。当收到其中某个流的新记录,就把记录合并到另一个流的实例化状态存储中,然后生产一个符合用户提供的ValueJoiner类的键值对结果。join操作返回一个新的KTable实例代表了流合并的结果,它仍然是一个更新日志流。
*KSream-to-KTable Joins:允许你根据记录流(KStream)接收到的新数据,在更新日志流(KTable)中执行表查询。一种应用是可以用最新的用户资料信息(KTable)补充用户行为信息(KStream)。只有当从记录流接收到记录时才会触发join操作,然后通过ValueJoiner生产结果,反过来不成立(也就是从更新日志流接收到的记录只能用于更新实例化状态存储)。该操作返回新的KStream实例代表了流合并的结果。
根据操作对象不同,join支持如下操作:inner joins,outer joins, 和left joins。他们的语义和关系数据库中相同。
2.3.5 转换一个流
KStream和KTable各自提供了一系列转换操作。每个操作都会生成一个或多个KStream或KTable对象,可以被传入已连接的底层处理拓扑中的处理过程。所有这样转换方法可以链式组合为复杂的处理拓扑。KStream和KTable是强类型,但是所有这些转换操作都被定义为模板方法,用户可以指定输入输出的数据类型。
在这些转换中,filter、map、mapValues等等,都是无状态转换操作,都可以在KStream和KTable中调用,用户只需要传入一个自定义函数作为它们形参,比如Predicate传入到filter,KeyValueMapper传入到map,等等:
// written in Java 8+, using lambda expressions
    KStream mapped = source1.mapValue(record -> record.get("category"));
无状态转换,顾名思义,就是不依赖于处理过程的状态,所以在实现时不需要关联流处理实例(stream processor)的状态存储。有状态转换,换句话说,就是处理时出现访问关联状态然后产生输出。比如,join和aggregate操作,通常需要一个窗口状态保存所有接收到的记录(在窗口范围内)。然后这些个操作访问存储中积累的记录,用他们做业务逻辑。
// written in Java 8+, using lambda expressions
    KTable<Long> counts = source1.groupByKey().aggregate(
        () -> 0L,  // initial value
        (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value
        TimeWindows.of("counts", 5000L).advanceBy(1000L), // intervals in milliseconds
        Serdes.Long() // serde for aggregated value
    );


    KStream joined = source1.leftJoin(source2,
        (record1, record2) -> record1.get("user") + "-" + record2.get("region");
    );


2.3.6 把流写回Kafka
处理完数据后,用户可以选择(持续的)把最终结果流写入一个Kafka主题,通过KStream.to和KTable.to方法。
joined.to("topic4");
如果你的应用需要持续读取并处理那些通过to方法写入到主题的记录,有一个办法是构造一个新的流从输出主题读取数据;Kafka Streams提供了一个便利的方法叫through:
    // equivalent to
    //
    // joined.to("topic4");
    // materialized = builder.stream("topic4");
    KStream materialized = joined.through("topic4");


除了定义拓扑之外,开发者还要在运行拓扑前配置文件StreamsConfig。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐