一 ,并行度 :同时进行的程序,处理同一组数据

1 ,实时性 : kafka , storm ,spark ,flink

  1. 微批次处理 : 好多条数据一起处理。
    代表 : spark
  2. 非微批次处理 : 每条数据处理一次,实时性更好。
    代表 : storm , kafka
  3. flink :
    对于 spark 做了一些改进,形成了这么个东西。

2 ,怎样增加 kafka 并行度 : 基础知识

  1. 每个消费者处理一个分区的数据。
  2. 每个 topic 可以有多个分区。
  3. 一个组中的消费者,只能有一个人去吹一个分区的数据。

3 ,怎样增加 kafka 并行度 : 解决

  1. 把一个 topic 分成多个分区。
  2. 把多个消费者组成一个组。
  3. 让这个消费者组,去处理这个 topic 的数据。

4 ,怎样增加 kafka 并行度 : 效果

  1. 一个组的消费者,处理了一个 topic 的数据。
  2. 一个 topic 中的数据,存放在不同的 partition 中。
  3. 不同的消费者可以实时的处理同一个 topic 中的,不同的 partition 中的数据。

二 ,kafka streams 介绍

1 ,Kafka Streams :

Apache Kafka 开源项目的一个组成部分。是一个功能强大,易于使用的库。用于在 Kafka 上构建高可分布式、拓展性,容错的应用程序。

2 ,注意 :

他只是 kafka 的一个库,不是一个独立的框架。

3 ,Kafka Streams 特点

  1. 功能强大
    高扩展性,弹性,容错
  2. 轻量级
    无需专门的集群
    一个库,而不是框架
  3. 完全集成
    100% 的 Kafka 0.10.0 版本兼容
    易于集成到现有的应用程序
  4. 实时性
    毫秒级延迟
    并非微批处理
    窗口允许乱序数据
    允许迟到数据

4 ,为什么要有 Kafka Stream : 反方请提问

  1. 当前已经有非常多的流式处理系统,最知名且应用最多的开源流式处理系统有 Spark Streaming 和 Apache Storm 。
  2. Apache Storm 发展多年,应用广泛,提供记录级别的处理能力,当前也支持 SQL on Stream 。
  3. Spark Streaming 基于 Apache Spark , 可以非常方便与图计算,SQL 处理等集成,功能强大,对于熟悉其它 Spark 应用开发的用户而言使用门槛低。
  4. 另外,目前主流的 Hadoop 发行版,如 Cloudera 和 Hortonworks ,都集成了 Apache Storm 和 Apache Spark , 使得部署更容易。
  5. 既然 Apache Spark 与 Apache Storm 拥用如此多的优势,那为何还需要 Kafka Stream 呢?

5 ,为什么要有 Kafka Stream :正方请回答

  1. 主要有如下原因。
  2. Spark 和 Storm 都是流式处理框架,而 Kafka Stream 提供的是一个基于 Kafka 的流式处理类库。框架要求开发者按照特定的方式去开发逻辑部分,供框架调用。开发者很难了解框架的具体运行方式,从而使得调试成本高,并且使用受限。而 Kafka Stream 作为流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试。
  3. 虽然 Cloudera 与 Hortonworks 方便了 Storm 和 Spark 的部署,但是这些框架的部署仍然相对复杂。而 Kafka Stream 作为类库,可以非常方便的嵌入应用程序中,它对应用的打包和部署基本没有任何要求。
  4. 就流式处理系统而言,基本都支持 Kafka 作为数据源。例如 Storm 具有专门的 kafka-spout ,而 Spark 也提供专门的 spark-streaming-kafka 模块。事实上, Kafka 基本上是主流的流式处理系统的标准数据源。换言之,大部分流式系统中都已部署了 Kafka ,此时使用 Kafka Stream 的成本非常低。
  5. 使用 Storm 或 Spark Streaming 时 , 需要为框架本身的进程预留资源,如 Storm 的 supervisor 和 Spark on YARN 的 node manager 。即使对于应用实例而言,框架本身也会占用部分资源,如 Spark Streaming 需要为 shuffle 和 storage 预留内存。但是 Kafka 作为类库不占用系统资源。
  6. 由于 Kafka 本身提供数据持久化,因此 Kafka Stream 提供滚动部署和滚动升级以及重新计算的能力。
  7. 由于 Kafka Consumer Rebalance 机制, Kafka Stream 可以在线动态调整并行度。

三 ,Kafka Stream 数据清洗案例 :

1 ,需求 :

实时处理单词带有”>>>”前缀的内容。例如输入 “atguigu>>>ximenqing”,最终处理成 “ximenqing”

2 ,需求分析 :

在这里插入图片描述

3 ,准备工作 : 两个 topic :

1. bin/kafka-topics.sh --zookeeper node01:2181 --create --replication-factor 3 --partitions 1 --topic first
2. bin/kafka-topics.sh --zookeeper node01:2181 --create --replication-factor 3 --partitions 1 --topic second

4 , 准备工作 : 引入依赖 :

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.heima</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!-- 新 API -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <!-- 旧 API -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <!-- kafka streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>0.11.0.0</version>
        </dependency>
    </dependencies>
</project>

5 ,拓扑结构类 :

package stream;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class Application {

    public static void main(String[] args) {
        //  输入的 topic
        String from = "first";
        //  输出的 topic
        String to = "second";

        //  设置参数
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
        StreamsConfig config = new StreamsConfig(settings);

        //  构建拓扑
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("SOURCE", from).addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {
                    public Processor<byte[], byte[]> get() {
                        //  具体分析处理 ( 我们写的逻辑处理类 )
                        return new LogProcessor();
                    }
                }, "SOURCE").addSink("SINK", to, "PROCESS");
        //  创建kafka stream
        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
    }
}

6 ,逻辑处理类 :

package stream;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class LogProcessor implements Processor<byte[], byte[]> {

    private ProcessorContext context;

    //  初始化方法
    public void init(ProcessorContext context) {
        this.context = context;
    }

    //  处理逻辑
    public void process(byte[] key, byte[] value) {
        String input = new String(value);
        //  如果包含“>>>”则只保留该标记后面的内容
        if (input.contains(">>>")) {
            input = input.split(">>>")[1].trim();
            // 输出到下一个topic
            context.forward("logProcessor".getBytes(), input.getBytes());
        }else{
            context.forward("logProcessor".getBytes(), input.getBytes());
        }
    }

    public void punctuate(long timestamp) { }

    public void close() { }
}

7 ,启动 :

  1. 启动流处理类 :
    在这里插入图片描述
  2. 启动生产者 :在 node01 机器上,启动 first
cd /export/servers/kafka_2.11-0.11.0.0
bin/kafka-console-producer.sh --broker-list node01:9092 --topic first
  1. 启动消费者 : 在 node02 上,监控 second
cd /export/servers/kafka_2.11-0.11.0.0
bin/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic second

8 ,测试 :

  1. 在 node01 给 first 发数据 : 普通数据
    在这里插入图片描述
  2. 在 node02 看 second 收到什么 : 没有变化
    在这里插入图片描述
  3. 在 node01 给 first 发送敏感数据 :
    在这里插入图片描述
  4. 在 node02 的 second 看到了数据的变化 :
    在这里插入图片描述
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐