Kafka-stream介绍
线程模型kafka stream默认是单线程的。如果启动多线程#
·
线程模型
kafka stream默认是单线程的。
如果启动多线程
每一个task拥有自己的state store。用于帮助kafka stream进行有状态的数据处理。state store以topic的方式存储在kafka broker端。
Kafka Stream的并行模型中,最小粒度为Task,而每个Task包含一个特定子Topology的所有Processor。因此每个Task所执行的代码完全一样,唯一的不同在于所处理的数据集互补。这一点跟Storm的Topology完全不一样。Storm的Topology的每一个Task只包含一个Spout或Bolt的实例。因此Storm的一个Topology内的不同Task之间需要通过网络通信传递数据,而Kafka Stream的Task包含了完整的子Topology,所以Task之间不需要传递数据,也就不需要网络通信。这一点降低了系统复杂度,也提高了处理效率。一个线程可以包含多个task。
概念
KStream和KTable只能从kafka topic中创造。
简单DEMO
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"demo");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"10.237.64.46:9094,10.237.64.47:9094,10.237.64.48:9094");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,"10.237.64.46:2181");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//key采用String
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,Serdes.ByteArray().getClass());//value采用byte[]
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", "ALL_PRODUCE_TOPIC")
.addProcessor("PROCESS1", RelayProcesser::new /* the ProcessorSupplier that can generate MyProcessor1 */, "SOURCE")
.addStateStore(Stores.create("STATISTICS").withStringKeys().withIntegerValues().inMemory().build(), "PROCESS1");
streams.start();
复杂特性
- statestore
有些stream应用不需要state,因为每条消息的处理都是独立的。然而维护stream处理的状态对于复杂的应用是非常有用的,比如可以对stream中的数据进行join、group和aggreagte,Kafka Stream DSL提供了这个功能。Kafka Stream使用state stores提供基于stream的数据存储和数据查询,Kafka Stream内嵌了多个state store,可以通过API访问到,这些state store的实现可以是持久化的KV存储引擎、内存HashMap或者其他数据结构。Kafka Stream提供了local state store的故障转移和自动发现。statestore会被默认持久在kafka的changelog topic中。
更多推荐
已为社区贡献9条内容
所有评论(0)