线程模型

kafka stream默认是单线程的。
这里写图片描述
如果启动多线程
这里写图片描述

每一个task拥有自己的state store。用于帮助kafka stream进行有状态的数据处理。state store以topic的方式存储在kafka broker端。
Kafka Stream的并行模型中,最小粒度为Task,而每个Task包含一个特定子Topology的所有Processor。因此每个Task所执行的代码完全一样,唯一的不同在于所处理的数据集互补。这一点跟Storm的Topology完全不一样。Storm的Topology的每一个Task只包含一个Spout或Bolt的实例。因此Storm的一个Topology内的不同Task之间需要通过网络通信传递数据,而Kafka Stream的Task包含了完整的子Topology,所以Task之间不需要传递数据,也就不需要网络通信。这一点降低了系统复杂度,也提高了处理效率。一个线程可以包含多个task。

概念

KStream和KTable只能从kafka topic中创造。

简单DEMO

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,"demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"10.237.64.46:9094,10.237.64.47:9094,10.237.64.48:9094");
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,"10.237.64.46:2181");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//key采用String
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,Serdes.ByteArray().getClass());//value采用byte[]
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("SOURCE", "ALL_PRODUCE_TOPIC")
                .addProcessor("PROCESS1", RelayProcesser::new /* the ProcessorSupplier that can generate MyProcessor1 */, "SOURCE")
                .addStateStore(Stores.create("STATISTICS").withStringKeys().withIntegerValues().inMemory().build(), "PROCESS1");
streams.start();

复杂特性

  • statestore
    有些stream应用不需要state,因为每条消息的处理都是独立的。然而维护stream处理的状态对于复杂的应用是非常有用的,比如可以对stream中的数据进行join、group和aggreagte,Kafka Stream DSL提供了这个功能。Kafka Stream使用state stores提供基于stream的数据存储和数据查询,Kafka Stream内嵌了多个state store,可以通过API访问到,这些state store的实现可以是持久化的KV存储引擎、内存HashMap或者其他数据结构。Kafka Stream提供了local state store的故障转移和自动发现。statestore会被默认持久在kafka的changelog topic中。
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐