本栏目讲解 kafka 相关的知识,包括简介、集群部署、架构及 spring 整合



简介

1、概述

  • :一个分布式的基于发布/订阅模式的消息队列

2、优点

  • 解耦
  • 异步通信
  • 峰值处理
  • 缓冲
  • 可恢复性

3、模式

  • 点对点模式
  • 发布/订阅模式

4、高效读写

  • 顺序写磁盘:写数据过程是追加到log文件末端
  • 零拷贝:直接从系统内存中返回给客户端

集群部署

# 解压
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
# 修改解压后的文件夹名称
mv kafka_2.11-0.11.0.0/ kafka
# 修改配置文件
vim config/server.properties

###############################################
# broker 的全局唯一编号,不能重复
broker.id=0
# 删除 topic 功能使能
delete.topic.enable=true
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘 IO 的现成数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# kafka 运行日志存放的路径
log.dirs=/opt/module/kafka/logs
# topic 在当前 broker 上的分区个数
num.partitions=1
# 用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
# 配置连接 Zookeeper 集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

# 注意:配置其他的broker时,其broker.id不同
##############################################

# 配置环境变量
vim /etc/profile
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile

# 启动zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# 启动kafka
bin/kafka-server-start.sh -daemon config/server.properties
# 关闭kafka
bin/kafka-server-stop.sh stop

操作命令

# 查看所有topic
bin/kafka-topic.sh --zookeeper hadoop102:2181 --list
# 创建topic
bin/kafka-topic.sh --zookeeper hadoop102:2181 --create --replication-factor 3 
 --partitions --topic topicName
# 删除topic
bin/kafka-topic.sh --zookeeper hadoop102:2181 --delete --topic topicName
# 发送消息
bin/kafka-console-producer.sh --broker-list hadoop102:2181 --topic topicName 
# 消费消息
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic topicName

Spring 整合 Kafka

  • 导入依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.9.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.1</version>
</dependency>
  • 配置信息
spring:
  kafka:
    producer:
      bootstrap-servers: 127.0.0.1:9092
      transaction-id-prefix: kafka_tx.
    consumer:
      bootstrap-servers: 127.0.0.1:9092
  • 消息监听
/**
 * 监听消息
 */
@Component
public class ConsumerListener {

    @KafkaListener(topics = "msg1", groupId = "group.msg")
    public String messageListener2(String message, Acknowledgment acknowledgment) {
        System.out.println("获取到的数据:" + message);
        acknowledgment.acknowledge();
        return message;
    }

    @KafkaListener(topics = "msg2", topicPartitions = {@TopicPartition(topic = "msg2", 
    			partitions = {"0"})}, containerFactory = "kafkaListenerContainerFactory")
    public void messageListener1(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        System.out.println("获取到的数据:" + record.key() + "=" + record.value());
        acknowledgment.acknowledge();
    }

}
  • 发送消息
@SpringBootTest(classes = KafkaDemoApplication.class)
class PublishMessageTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static final String MSG1 = "msg1";
    private static final String MSG2 = "msg2";

    /**
     * 异步发送不考虑回调,常用作收集操作日志、用户浏览痕迹等不重要的信息
     */
    @Test
    public void asynPublishMessage1() {
        kafkaTemplate.send(MSG1, "Hello");
    }

    /**
     * 异步发送带回调
     */
    @Test
    public void asynPublishMessage2() {
        ProducerRecord<String, String> record = new ProducerRecord<>(MSG2, "key1", "value1");
        kafkaTemplate.send(record).addCallback(
        					new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("发送消息失败,失败原因:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("发送消息成功," + result);
            }
        });
    }

    /**
     * 同步发送
     */
    @Test
    public void syncPublishMessage() {
        try {
            final SendResult<String, String> result = kafkaTemplate.send(MSG1, "Java").get();
            System.out.println("发送结果:" + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    /**
     * 事务可以保证消息不丢失也不重复
     */
    @Test
    @Transactional
    public void transactionPublishMessage() {
        kafkaTemplate.send(MSG1, "World");
    }

}
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐