7,kafka stream :kafka 并行度 ,kafka 流式处理
一 ,并行度 :同时进行的程序,处理同一组数据1 ,实时性 : kafka , storm ,spark ,flink微批次处理 : 好多条数据一起处理。代表 : spark非微批次处理 : 每条数据处理一次,实时性更好。代表 : storm , kafkaflink :对于 spark 做了一些改进,形成了这么个东西。2 ,怎样增加 kafka 并行度 : 基础知识每个消...
·
一 ,并行度 :同时进行的程序,处理同一组数据
1 ,实时性 : kafka , storm ,spark ,flink
- 微批次处理 : 好多条数据一起处理。
代表 : spark - 非微批次处理 : 每条数据处理一次,实时性更好。
代表 : storm , kafka - flink :
对于 spark 做了一些改进,形成了这么个东西。
2 ,怎样增加 kafka 并行度 : 基础知识
- 每个消费者处理一个分区的数据。
- 每个 topic 可以有多个分区。
- 一个组中的消费者,只能有一个人去吹一个分区的数据。
3 ,怎样增加 kafka 并行度 : 解决
- 把一个 topic 分成多个分区。
- 把多个消费者组成一个组。
- 让这个消费者组,去处理这个 topic 的数据。
4 ,怎样增加 kafka 并行度 : 效果
- 一个组的消费者,处理了一个 topic 的数据。
- 一个 topic 中的数据,存放在不同的 partition 中。
- 不同的消费者可以实时的处理同一个 topic 中的,不同的 partition 中的数据。
二 ,kafka streams 介绍
1 ,Kafka Streams :
Apache Kafka 开源项目的一个组成部分。是一个功能强大,易于使用的库。用于在 Kafka 上构建高可分布式、拓展性,容错的应用程序。
2 ,注意 :
他只是 kafka 的一个库,不是一个独立的框架。
3 ,Kafka Streams 特点
- 功能强大
高扩展性,弹性,容错 - 轻量级
无需专门的集群
一个库,而不是框架 - 完全集成
100% 的 Kafka 0.10.0 版本兼容
易于集成到现有的应用程序 - 实时性
毫秒级延迟
并非微批处理
窗口允许乱序数据
允许迟到数据
4 ,为什么要有 Kafka Stream : 反方请提问
- 当前已经有非常多的流式处理系统,最知名且应用最多的开源流式处理系统有 Spark Streaming 和 Apache Storm 。
- Apache Storm 发展多年,应用广泛,提供记录级别的处理能力,当前也支持 SQL on Stream 。
- Spark Streaming 基于 Apache Spark , 可以非常方便与图计算,SQL 处理等集成,功能强大,对于熟悉其它 Spark 应用开发的用户而言使用门槛低。
- 另外,目前主流的 Hadoop 发行版,如 Cloudera 和 Hortonworks ,都集成了 Apache Storm 和 Apache Spark , 使得部署更容易。
- 既然 Apache Spark 与 Apache Storm 拥用如此多的优势,那为何还需要 Kafka Stream 呢?
5 ,为什么要有 Kafka Stream :正方请回答
- 主要有如下原因。
- Spark 和 Storm 都是流式处理框架,而 Kafka Stream 提供的是一个基于 Kafka 的流式处理类库。框架要求开发者按照特定的方式去开发逻辑部分,供框架调用。开发者很难了解框架的具体运行方式,从而使得调试成本高,并且使用受限。而 Kafka Stream 作为流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试。
- 虽然 Cloudera 与 Hortonworks 方便了 Storm 和 Spark 的部署,但是这些框架的部署仍然相对复杂。而 Kafka Stream 作为类库,可以非常方便的嵌入应用程序中,它对应用的打包和部署基本没有任何要求。
- 就流式处理系统而言,基本都支持 Kafka 作为数据源。例如 Storm 具有专门的 kafka-spout ,而 Spark 也提供专门的 spark-streaming-kafka 模块。事实上, Kafka 基本上是主流的流式处理系统的标准数据源。换言之,大部分流式系统中都已部署了 Kafka ,此时使用 Kafka Stream 的成本非常低。
- 使用 Storm 或 Spark Streaming 时 , 需要为框架本身的进程预留资源,如 Storm 的 supervisor 和 Spark on YARN 的 node manager 。即使对于应用实例而言,框架本身也会占用部分资源,如 Spark Streaming 需要为 shuffle 和 storage 预留内存。但是 Kafka 作为类库不占用系统资源。
- 由于 Kafka 本身提供数据持久化,因此 Kafka Stream 提供滚动部署和滚动升级以及重新计算的能力。
- 由于 Kafka Consumer Rebalance 机制, Kafka Stream 可以在线动态调整并行度。
三 ,Kafka Stream 数据清洗案例 :
1 ,需求 :
实时处理单词带有”>>>”前缀的内容。例如输入 “atguigu>>>ximenqing”,最终处理成 “ximenqing”
2 ,需求分析 :
3 ,准备工作 : 两个 topic :
1. bin/kafka-topics.sh --zookeeper node01:2181 --create --replication-factor 3 --partitions 1 --topic first
2. bin/kafka-topics.sh --zookeeper node01:2181 --create --replication-factor 3 --partitions 1 --topic second
4 , 准备工作 : 引入依赖 :
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.heima</groupId>
<artifactId>kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- 新 API -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
<!-- 旧 API -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>0.11.0.0</version>
</dependency>
<!-- kafka streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.11.0.0</version>
</dependency>
</dependencies>
</project>
5 ,拓扑结构类 :
package stream;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
public class Application {
public static void main(String[] args) {
// 输入的 topic
String from = "first";
// 输出的 topic
String to = "second";
// 设置参数
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
StreamsConfig config = new StreamsConfig(settings);
// 构建拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", from).addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {
public Processor<byte[], byte[]> get() {
// 具体分析处理 ( 我们写的逻辑处理类 )
return new LogProcessor();
}
}, "SOURCE").addSink("SINK", to, "PROCESS");
// 创建kafka stream
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
}
}
6 ,逻辑处理类 :
package stream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
public class LogProcessor implements Processor<byte[], byte[]> {
private ProcessorContext context;
// 初始化方法
public void init(ProcessorContext context) {
this.context = context;
}
// 处理逻辑
public void process(byte[] key, byte[] value) {
String input = new String(value);
// 如果包含“>>>”则只保留该标记后面的内容
if (input.contains(">>>")) {
input = input.split(">>>")[1].trim();
// 输出到下一个topic
context.forward("logProcessor".getBytes(), input.getBytes());
}else{
context.forward("logProcessor".getBytes(), input.getBytes());
}
}
public void punctuate(long timestamp) { }
public void close() { }
}
7 ,启动 :
- 启动流处理类 :
- 启动生产者 :在 node01 机器上,启动 first
cd /export/servers/kafka_2.11-0.11.0.0
bin/kafka-console-producer.sh --broker-list node01:9092 --topic first
- 启动消费者 : 在 node02 上,监控 second
cd /export/servers/kafka_2.11-0.11.0.0
bin/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic second
8 ,测试 :
- 在 node01 给 first 发数据 : 普通数据
- 在 node02 看 second 收到什么 : 没有变化
- 在 node01 给 first 发送敏感数据 :
- 在 node02 的 second 看到了数据的变化 :
更多推荐
已为社区贡献2条内容
所有评论(0)