之前的项目中用到RabbitMQ比较多,也有用到RocketMQ,,虽然项目中没有用到过Kafka,不过自己在空闲时间学习过,而且在面试中也会问到,因为还是有不少公司用到Kafka,所以做个总结,一方面是做为面试参考,还有就是以后项目如果用到,也可以作为实践参考。

     Apache Kafka是一个分布式流处理平台,主要用于构建实时数据管道和流式应用程序。它具有高吞吐量、可扩展性、持久性、容错性等特点,并且能够高效地处理高负载数据。

    

目录

1.Kafka 基本概念

2. Kafka 架构

3.Kafka 数据持久化

4. Kafka 消息传递保证

 5.Kafka 消费者API

6. Kafka 生产者API

7. Kafka 连接器和流处理 (Kafka Connect & Kafka Streams)

8,Kafka安全性

 9.Kafka 监控和运维 (Kafka Monitoring & Operations)

10. Kafka 集群管理 (Kafka Cluster Management)

11. Kafka 性能优化 (Kafka Performance Optimization)

12. Kafka 集成和生态系统 (Kafka Integration & Ecosystem)

13. Kafka 高级特性 (Kafka Advanced Features)

14. Kafka 消息流处理模式 (Kafka Stream Processing Patterns)

15.Kafka 消息序列化与反序列化

16. Kafka 消息传递语义 (Kafka Messaging Semantics)

17. Kafka 集群扩展性 (Kafka Cluster Scalability)

18. Kafka 测试和调试 (Kafka Testing & Debugging)

19.Spring Boot整合Kafka


    1.Kafka 基本概念
概念描述
主题 (Topic)Kafka中消息的分类,类似于数据库中的表。
记录 (Record)Kafka中消息的基本单位,包含键、值和时间戳。
偏移量 (Offset)记录在日志中的位置,唯一标识一条记录。
分区 (Partition)主题的子集,可以分布在不同的Broker上,用于实现负载均衡和并行处理。
2. Kafka 架构
组件描述示例
生产者 (Producer)负责发送消息到Kafka集群的客户端。一个应用服务器,将用户活动数据发送到Kafka。
消费者 (Consumer)负责从Kafka集群读取消息的客户端。一个数据分析系统,从Kafka读取数据进行实时分析。
代理 (Broker)Kafka集群中的一个节点,负责维护主题和日志。一个运行中的Kafka实例,存储消息数据。
Zookeeper集中管理集群的元数据,包括Broker状态、主题和分区信息。一个运行中的Zookeeper服务,协调Kafka集群的运行。
主题 (Topic)Kafka中消息的分类,存储消息记录。如“user-activities”主题存储用户活动数据。
分区 (Partition)主题的子集,可以分布在不同的Broker上,用于实现负载均衡。“user-activities”主题可能被分为多个分区以支持高吞吐量。
3.Kafka 数据持久化
概念描述示例
日志 (Log)Kafka中存储消息的地方,由一系列有序的记录组成。Kafka集群中每个主题的每个分区都有一个日志。
段文件 (Segment File)日志由多个段文件组成,每个段文件包含一系列的记录和索引。一个日志可能包含多个段文件,如log.0、log.1等。
索引 (Index)帮助快速定位消息,每个段文件都有与之对应的索引文件。索引文件允许消费者快速定位到特定的消息偏移量。
持久性 (Persistence)Kafka通过日志的持久化存储,确保消息不会丢失。Kafka配置允许设置日志的持久化策略,如保留时间或大小。
清理策略 (Retention Policy)控制日志的存储时间或大小,以避免无限制增长。可以设置基于时间的保留策略,如保留7天的数据。
压缩 (Compression)减少日志存储大小,提高存储效率。Kafka支持对日志段进行压缩,如使用Snappy或Gzip压缩算法。
4. Kafka 消息传递保证
保证类型描述如何实现
可靠性 (Reliability)确保消息从生产者成功发送到Kafka,并且不会丢失。使用acks参数,设置为all以确保所有副本都已确认消息。
持久性 (Durability)确保消息一旦被确认,就不会丢失,即使发生故障。Kafka将消息持久化到磁盘,并且通过副本机制保证数据不丢失。
顺序性 (Ordering)确保消息在单个分区内的顺序性。通过为每个生产者分配一个唯一的序列号来保证消息顺序。
消息确认 (Message Acknowledgment)生产者收到确认,表示消息已经成功发送。消费者通过自动提交或手动提交偏移量来确认消息。
偏移量管理 (Offset Management)控制消息的消费进度。消费者可以手动管理偏移量,以控制从何处开始读取消息。
副本 (Replication)提高消息的可用性和容错性。主题的每个分区都可以配置多个副本,分布在不同的Broker上。
故障转移 (Failover)在领导者Broker故障时,自动将请求转移到另一个副本上。使用Zookeeper进行领导者选举,实现故障转移。
 5.Kafka 消费者API
API 类型描述特点
高层次消费者 (High-Level Consumer)提供了更简单的API,自动管理偏移量和分区分配。

- 自动提交偏移量

- 消费者组管理

- 简化的错误处理

- 易于使用,适合大多数场景

低层次消费者 (Low-Level Consumer)提供了更底层的控制,允许手动管理偏移量和分区分配。

- 手动提交偏移量

- 更细粒度的控制

- 适合需要精细控制的场景

消费者组 (Consumer Group)一组共享同一消费者组ID的消费者,共同消费主题中的所有消息。

- 支持消息的广播和汇聚

- 允许消费者并行消费

- 保证消息至少被消费一次

偏移量提交 (Offset Committing)控制消费者偏移量的提交,可以自动或手动。- 自动提交:消费者定期自动提交偏移量- 手动提交:消费者代码中显式提交偏移量
再平衡 (Rebalance)当消费者组中的成员发生变化时,重新分配分区给消费者。

- 确保所有分区都有活跃的消费者

- 可能影响消息的实时性

反序列化器 (Deserializer)用于将字节消息转换为特定类型的对象。

- 支持自定义反序列化器

- Kafka提供默认的字符串和字节反序列化器

轮询 (Polling)消费者从Broker拉取消息的过程。

- 通过轮询调用获取消息

- 可以设置超时时间以控制轮询行为

6. Kafka 生产者API
API 类型描述特点
高层次生产者 (High-Level Producer)提供了简单易用的API,支持异步发送和批处理。

- 异步发送

- 批处理优化性能

- 自动管理消息确认

- 易于实现高吞吐量

低层次生产者 (Low-Level Producer)提供了底层的控制,允许手动管理消息的发送和确认。

- 手动控制消息确认

- 更细粒度的控制

- 适合需要精细控制的场景

同步发送 (Synchronous Send)生产者在发送消息后等待服务器的响应。

- 确保消息发送成功

- 可能影响吞吐量

异步发送 (Asynchronous Send)生产者发送消息后立即返回,不等待服务器响应。

- 提高吞吐量

- 需要额外处理消息确认

批处理 (Batching)生产者将多个消息打包在一起发送,减少网络请求。

- 减少网络负载

- 提高性能

压缩 (Compression)减少发送到Broker的消息大小。

- 支持Gzip、Snappy、LZ4压缩算法

- 减少网络和存储使用

确认机制 (Acknowledgment)控制消息确认的行为。

acks=0:不确认

 acks=1:领导者确认

acks=all:所有副本确认

重试策略 (Retry Policy)定义生产者在发送失败时的重试行为。- 设置重试次数和重试间隔
7. Kafka 连接器和流处理 (Kafka Connect & Kafka Streams)
组件描述特点
Kafka Connect用于连接Kafka与外部系统(如数据库、键值存储等)的框架。

- 提供了无服务器的数据流传输

- 支持自定义连接器开发

- 可以自动化数据同步任务

Kafka Streams用于构建流处理应用程序的库,提供了流处理的各种操作。

- 支持状态流处理

- 易于实现复杂流处理逻辑

- 与Kafka生态无缝集成

连接器 (Connector)Kafka Connect使用的插件,用于特定的数据源或目的地。

- 例如:JDBC连接器用于数据库

- 可以处理数据的导入和导出

流处理拓扑 (Stream Topology)Kafka Streams中定义的数据处理流程。

- 包括数据源、处理器、数据汇聚等

- 支持复杂的数据处理逻辑

状态存储 (State Store)Kafka Streams中用于存储状态数据的持久化层。

- 支持RocksDB等存储引擎

- 保证状态的持久性和一致性

窗口 (Windowing)Kafka Streams中用于处理基于时间或计数的窗口数据。

- 支持滑动窗口和跳窗

- 用于实现各种基于时间的聚合操作

处理时间 (Processing Time)Kafka Streams中基于事件处理的时间。

- 与事件时间相比,处理时间通常更快

- 但可能受到系统负载的影响

事件时间 (Event Time)Kafka Streams中基于事件自身时间戳的时间。

- 允许更准确的时间相关处理

- 支持时间相关的语义,如水印

8,Kafka安全性
安全特性描述如何实现
身份验证 (Authentication)验证客户端身份的过程。

- SASL/PLAIN、SASL/SCRAM、Kerberos等机制

- SSL/TLS客户端认证

授权 (Authorization)控制客户端对资源的访问权限。

- ACLs (Access Control Lists)

- Kafka授权模型

加密 (Encryption)保护传输中的数据,防止数据在传输过程中被窃听。

- SSL/TLS加密传输

- 数据加密在库中存储

传输安全 (Transport Security)确保数据在客户端和Broker之间的传输安全。- 使用SSL/TLS协议加密网络连接
逻辑安全 (Logical Security)确保Kafka集群的逻辑组件安全。- 例如,确保Zookeeper的安全
网络隔离 (Network Isolation)防止未授权的网络访问。- 使用防火墙、私有网络等
审计 (Auditing)记录和监控系统活动,用于事后审查和问题诊断。- 记录关键操作和访问事件
密钥管理 (Key Management)管理用于加密和解密的密钥。- 使用密钥管理系统,如Keycloak、KMS等
端到端加密 (End-to-End Encryption)从生产者到消费者的数据全程加密。

- 客户端加密消息

- 消费者解密消息

 9.Kafka 监控和运维 (Kafka Monitoring & Operations)
监控运维特性描述工具/方法
指标监控 (Metrics Monitoring)监控Kafka集群的运行指标,如吞吐量、延迟等。

- JMX (Java Management Extensions)

- Prometheus + Grafana

日志管理 (Logging)收集和分析Kafka的日志信息,用于问题诊断。

- 使用日志收集器,如Fluentd、Logstash

- ELK (Elasticsearch, Logstash, Kibana) 栈

性能调优 (Performance Tuning)调整Kafka配置以优化性能。

- 调整Broker、生产者、消费者的配置参数

- 根据实际负载进行调优

故障排查 (Troubleshooting)诊断和解决Kafka运行中的问题。

- 使用Kafka管理工具

- 分析Broker、Zookeeper日志

集群管理 (Cluster Management)管理Kafka集群的生命周期,如扩容、缩容等。

- 使用Kafka管理脚本

- Kafka管理界面,如Confluent Control Center

数据备份 (Data Backup)备份Kafka中的数据,防止数据丢失。

- 使用MirrorMaker跨集群复制

- 定期备份日志文件

高可用性 (High Availability)确保Kafka集群在部分节点故障时仍能正常运行。

- 配置副本和分区

- 使用Raft等协议

灾难恢复 (Disaster Recovery)在发生严重故障时恢复Kafka集群。

- 制定和测试灾难恢复计划

- 使用备份数据恢复集群

安全运维 (Secure Operations)在运维过程中保护Kafka集群的安全。

- 使用安全的远程访问工具

- 定期更新和打补丁

10. Kafka 集群管理 (Kafka Cluster Management)
管理任务描述考虑因素
集群规划 (Cluster Planning)确定集群的大小、Broker数量、硬件规格等。

- 预期负载

- 数据量

- 性能要求

集群部署 (Cluster Deployment)实际部署Kafka集群到服务器或云环境。

- 自动化部署工具,如Ansible、Terraform

- 容器化部署,如Kubernetes

集群配置 (Cluster Configuration)设置Broker、Zookeeper、日志保留策略等配置。

- 根据具体需求调整配置参数

- 考虑安全性和性能

集群监控 (Cluster Monitoring)实施监控策略,实时了解集群状态。

- 使用专业监控工具,如Kafka Manager、Confluent Control Center

- 集成到现有的监控系统中

集群维护 (Cluster Maintenance)定期进行维护工作,如升级、扩容、缩容等。- 零停机时间升级<br>- 数据迁移和负载均衡
集群故障转移 (Cluster Failover)实现故障转移机制,确保高可用性。

- 使用Raft等协议

- 配置副本和分区

集群安全 (Cluster Security)加固集群,防止未授权访问。

- 配置ACLs

- 使用SSL/TLS加密

- 定期安全审计

集群备份与恢复 (Backup & Recovery)实施备份策略,确保数据不丢失,并能够恢复。

- 定期备份数据

- 测试恢复流程

集群优化 (Cluster Optimization)根据监控数据对集群进行优化。

- 调整JVM参数

- 优化网络和磁盘I/O

集群文档 (Cluster Documentation)记录集群的架构、配置、操作手册等。

- 便于新成员快速上手

- 便于问题排查

11. Kafka 性能优化 (Kafka Performance Optimization)
优化领域描述优化策略
网络优化 (Network Optimization)提升网络传输效率,减少延迟。

- 使用高速网络硬件

- 优化网络配置,如MTU (Maximum Transmission Unit)

磁盘I/O优化 (Disk I/O Optimization)提高磁盘读写性能。

- 使用SSD代替HDD

优化文件系统和存储配置

JVM调优 (JVM Tuning)调整Java虚拟机参数,提升Kafka性能。

- 调整堆大小

- 垃圾回收策略优化

批处理 (Batching)生产者将多个消息打包在一起发送,减少网络请求。

- 调整批处理大小和延迟参数

- 优化批处理策略

压缩 (Compression)使用压缩减少传输和存储的数据量。

- 选择适当的压缩算法

- 调整压缩级别

索引优化 (Indexing Optimization)优化日志索引以加快消息查找速度。- 调整索引参数,如索引段的大小
分区优化 (Partitioning Optimization)合理划分分区,提高并行处理能力。- 根据数据量和消费者数量调整分区数量
消费者优化 (Consumer Optimization)提升消费者处理消息的效率。

- 使用多线程或消费者组

- 优化拉取和提交偏移量的策略

副本优化 (Replica Optimization)优化副本管理,提高数据的可靠性和可用性。- 调整副本数量和副本分配策略
垃圾回收 (Garbage Collection)优化垃圾收集过程,减少GC (Garbage Collection) 暂停时间。

- 使用G1垃圾收集器

- 调整GC参数

负载均衡 (Load Balancing)确保集群负载均衡,避免热点问题。- 使用分区键和消息路由优化
12. Kafka 集成和生态系统 (Kafka Integration & Ecosystem)
组件/工具描述用途
Kafka ConnectKafka与外部系统之间数据流的连接器。

- 数据导入导出

- 支持各种数据源和目标系统

Kafka Streams用于构建流处理应用程序的库。

- 实时流处理

- 状态管理

- 复杂事件处理

Apache Flink流处理框架,与Kafka集成,处理实时数据流。

- 流批一体处理

- 容错机制

- 窗口操作

Apache Spark大数据处理框架,支持Kafka作为数据源和数据汇聚点。

- 批处理

- 流处理

- 机器学习

Apache NiFi数据流自动化系统,用于数据的流动、处理和分发。

- 数据管道

- 数据分发

- 数据处理

Apache Camel集成框架,提供Kafka组件,用于构建集成路由。

- ETL (Extract, Transform, Load) 过程

- 企业集成模式

Confluent Control CenterKafka管理工具,提供集群管理、监控和操作界面。

- 集群监控

- 配置管理

- 用户界面

Kafka MirrorMaker用于在Kafka集群之间复制数据的工具。

- 数据异地冗余

- 跨数据中心同步

Schema Registry用于管理Kafka消息模式的中心化服务。

- 保证消息兼容性

- 版本控制

Kafka REST Proxy允许HTTP请求访问Kafka集群的服务。

- 提供RESTful接口

- 简化客户端开发

Kafka Security Modules提供额外的安全特性,如加密、认证和授权。

- 增强集群安全性

- 支持多种安全协议

13. Kafka 高级特性 (Kafka Advanced Features)
Transactional Producer支持事务性消息的生产者,确保消息的原子性。

- 金融交易

- 数据库变更日志

Exactly-Once Semantics确保消息精确一次处理,避免重复或丢失。

- 计费系统

- 订单处理

Idempotent Producer保证消息不重复发送,即使在重试情况下。

- 防止重复处理

- 幂等性保证

Compaction Topics对于键值消息,Kafka可以进行日志压缩。

- 存储最新状态

- 时间序列数据

Retention Policy控制消息在Kafka中的保留时间或大小。

- 日志清理

- 数据保留

Log SegmentsKafka将日志分割成多个段,以优化存储和管理。

- 分段存储

- 高效索引

Message Timestamps消息可以携带时间戳,支持时间相关的操作。

- 事件时间处理

- 窗口函数

Custom Partitioner用户可以自定义分区器,以控制消息的路由。

- 高级路由逻辑

- 特定业务需求

Interceptors允许在消息发送和接收路径上插入自定义逻辑。

- 监控

- 修改消息

- 安全性检查

Quarantine隔离无法处理的消息,防止影响正常消息流。

- 错误处理

- 消息过滤

14. Kafka 消息流处理模式 (Kafka Stream Processing Patterns)
处理模式描述示例
Map对消息进行转换,生成新的键值对。将原始数据转换为所需的格式或类型。
Filter根据条件过滤消息,只传递符合条件的消息。仅允许特定用户的行为数据通过。
Aggregate对消息进行聚合操作,如求和、平均等。按小时聚合网站访问量。
Window对在特定时间范围内的消息进行操作。计算每个5分钟窗口内的交易总量。
Join将两个流基于共同的键连接起来。将用户点击流和用户信息流连接,以添加用户详细信息。
Session Window基于会话的窗口,会话之间由非活动间隔分隔。根据用户的活动间隔聚合用户行为。
Caching缓存消息或中间结果,提高处理速度。缓存热门查询结果以快速响应。
Stateful Processing使用状态进行处理,可以记住之前的状态。根据用户的历史行为进行个性化推荐。
Complex Event Processing (CEP)检测消息流中的复杂模式。检测特定股票的连续价格变动模式。
Sink将处理后的消息发送到外部系统。将流处理结果写入数据库或搜索索引。
15.Kafka 消息序列化与反序列化
序列化/反序列化类型描述适用场景
String Serializer使用字符串序列化器,消息内容会被转换成字符串。

- 简单的文本消息

- 日志数据

String Deserializer使用字符串反序列化器,消息内容会被解析为字符串。

- 需要以文本形式处理消息

- 日志解析

Avro Serializer使用Avro序列化器,一种二进制格式,支持模式。

- 需要模式验证的数据

- 跨语言支持

Avro Deserializer使用Avro反序列化器,解析Avro格式的消息。- 与Avro Serializer配合使用
JSON Serializer使用JSON序列化器,将消息内容转换为JSON格式。

- 需要结构化数据

- 易于人读和机器解析

JSON Deserializer使用JSON反序列化器,将JSON格式的消息解析为对象。- 与JSON Serializer配合使用
Protobuf Serializer使用Protocol Buffers序列化器,一种高效的二进制格式。

- 高性能要求

- 跨平台支持

Protobuf Deserializer使用Protocol Buffers反序列化器,解析二进制消息。- 与Protobuf Serializer配合使用
Custom Serializer自定义序列化器,根据需要实现序列化逻辑。

- 特殊格式数据

- 高度定制化需求

Custom Deserializer自定义反序列化器,根据需要实现反序列化逻辑。

- 特殊格式数据

- 高度定制化需求

16. Kafka 消息传递语义 (Kafka Messaging Semantics)
语义类型描述特点
At most once消息可能会丢失,但不会重复。

- 最少的保证

- 高吞吐量

acks=1

At least once消息不会丢失,但可能会重复。

- 保证不丢失

- 可能重复

acks=all

Exactly once消息不会丢失且不会重复。

- 精确处理

- 事务性

- 复杂性较高

enable.idempotence=true

In-order delivery消息在单个分区内是有序的。

- 保证顺序性

- 单个分区内消息顺序

Out-of-order delivery消息可能会乱序,特别是在多个分区中。

- 吞吐量可能更高

- 消息可能乱序

17. Kafka 集群扩展性 (Kafka Cluster Scalability)
扩展性方面描述考虑因素
水平扩展 (Horizontal Scaling)通过增加Broker数量来扩展集群。

- 增加更多的服务器

- 重新分配分区

垂直扩展 (Vertical Scaling)通过增加单个Broker的硬件资源来扩展集群。

- 更强大的CPU

- 更多的内存

- 更快的磁盘

分区扩展 (Partition Scaling)增加主题的分区数量来提高并行处理能力。

- 更高的吞吐量

- 更复杂的协调

消费者扩展 (Consumer Scaling)增加消费者数量或消费者组来提高消息处理能力。

- 消费者组的再平衡

- 负载均衡

流处理器扩展 (Stream Processor Scaling)增加流处理器的数量来提高流处理应用程序的吞吐量。

- Kafka Streams应用程序

- 增加实例

数据保留策略 (Data Retention Policy)调整数据保留策略以优化存储资源使用。

- 保留时间

- 保留大小

控制器扩展 (Controller Scaling)控制器负责管理分区和副本的状态,其性能影响集群的稳定性。

- 控制器选举

- 集群元数据管理

网络优化 (Network Optimization)优化网络配置以提高数据传输效率。

- 网络带宽

- 网络延迟

存储优化 (Storage Optimization)优化存储系统以提高I/O性能和降低成本。

- 使用SSD

- RAID配置

18. Kafka 测试和调试 (Kafka Testing & Debugging)
测试/调试工具或方法描述用途
Kafka Console ProducerKafka提供的命令行生产者工具,用于发送测试消息。

- 测试主题

- 发送样例数据

Kafka Console ConsumerKafka提供的命令行消费者工具,用于读取和查看消息。

- 读取主题消息

- 调试消费者代码

Kafka Manager提供了一个Web界面用于管理Kafka集群。

- 监控集群状态

- 主题、分区管理

Kafka Tools包括了一系列用于测试和调试的工具。

- 检查日志

- 验证消息

Kafka Logs直接查看Kafka的日志文件以获取详细的错误信息。

- 诊断问题

- 查看Broker状态

JMX Metrics使用JMX (Java Management Extensions) 来监控和调试Kafka。

- 实时监控性能指标

- 动态调整配置

Kafka's built-in monitoringKafka自身提供了一些监控工具和指标。

- 内置的性能监控

- 健康检查

Integration Tests对Kafka与其他系统集成进行测试。

- 验证连接器功能

- 测试流处理应用程序

Stress Testing对Kafka进行压力测试,以评估其在高负载下的表现。

- 确定性能瓶颈

- 测试集群稳定性

Debugging with IDEs使用集成开发环境进行代码级别的调试。

- 开发者调试

- 查找代码缺陷

Profiling使用性能分析工具来识别性能瓶颈。

- 优化代码

- 提升效率

19.Spring Boot整合Kafka

Spring Boot整合Kafka主要涉及以下几个步骤:

  1. 添加依赖:在Spring Boot项目的pom.xml文件中添加Spring Kafka的相关依赖。

  2. 配置Kafka:在application.propertiesapplication.yml中配置Kafka的连接信息,包括Bootstrap Servers。

  3. 创建Producer配置:配置Kafka Producer并创建Kafka Template,用于发送消息。

  4. 创建Consumer配置:配置Kafka Consumer,创建用于接收消息的监听器。

  5. 发送和接收消息:使用Kafka Template发送消息,使用@KafkaListener注解或消息监听器容器来接收消息。

  6. 管理监听器:可以控制监听器的启动、停止和恢复。

  7. 高级特性:使用Spring Kafka提供的高级特性,如事务性消息、自定义分区器、消息转发等

具体步骤:

       首先需要安装Kafka服务,网上教程挺多,确保有对应的服务。

      19.1 添加依赖:

    在pom.xml中添加如下依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.6.RELEASE</version>
</dependency>

    19.2 配置Kafka

   在application.properties中添加:

spring:
  kafka:
    consumer:
      key-deserializer:  org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer:  org.apache.kafka.common.serialization.StringDeserializer
      group-id: myGroup
      enable-auto-commit: false
      auto-commit-interval: 1000
      max-poll-records: 1
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        acks: all
        retries: 3
        batch.size: 16384
        linger.ms: 1
        buffer.memory: 33554432
        max.block.ms: 6000
        compression.type: none
        max.request.size: 1048576
    bootstrap-servers: 127.0.0.1:9092

   19.3  创建生产者:发送消息到Kafka

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
 
@Service
public class KafkaProducer {
 
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

 19.4  创建消费者:从Kafka消费消息

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
 
@Component
public class KafkaConsumer {
 
    @KafkaListener(topics = "myTopic", groupId = "myGroup")
    public void listen(String message) {
        System.out.println("Received message in group myGroup: " + message);
    }
}

19.5 启动类:确保@EnableKafka注解已经添加

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;
 
@SpringBootApplication
@EnableKafka
public class KafkaApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐