本文总结了kafka的基本使用,包含下载安装,基本的消息发送与消费,并介绍kafka中的一些概念以及与springboot的集成。

下载

下载地址

Apache Kafkahttps://kafka.apache.org/downloads

我们在Binary downloads里选择一个版本下载即可。

安装与启动

解压下载下来的压缩包,压缩包中包含了kafka和zookeeper程序,如果你本身已经有zookeeper,可以不使用kafka中自带的zookeeper,这里我们就用kafka自带的zookeeper来做演示。

按先后顺序分别启动zookeeper和kafka,其中zookeeper的配置文件在%kafka_path%/config/zookeeper.properties中,kafka的配置文件在%kafka_path%/config/server.properties中,启动时需要指定配置文件路径。

打开cmd窗口,进入到kafka安装目录,执行如下命令启动zookeeper

linux系统:bin/zookeeper-server-start.sh config/zookeeper.properties
windows系统:bin\windows\zookeeper-server-start.bat config\zookeeper.properties

打开另一个cmd窗口,进入到kafka安装目录,执行如下命令启动kafka

linux系统:bin/kafka-server-start.sh config/server.properties
windows系统:bin\windows\kafka-server-start.bat config\server.properties

从配置文件可以知道zookeeper和kafka的日志文件路径如下,打开对应目录可以发现已经生成了对应的目录。

  • zookeeper数据文件保存路径

  • kafka日志文件保存路径

  • 启动后自动生成的两个数据目录

配置修改

下面介绍几个比较重要的kafka配置,这些配置可以在%kafka_path%/config/server.properties中修改或查看。

listeners=PLAINTEXT://:9092   kafka服务对外提供服务的ip和端口。配置格式为 protocol1://hostname1:port1,protocol2://hostname2:port2,其中protocol代表协议类型,Kafka当前支持的协议类型有PLAINTEXT、SSL、SASL_SSL等,如果未开启安全认证,则使用简单的PLAINTEXT即可。hostname代表主机名,port代表服务端口。

log.dirs=/tmp/kafka-logs  kafka日志文件保存位置,kafka的消息会持久化保存在硬盘,这个配置指定消息保存位置。

zookeeper.connect=localhost:2181  zookeeper连接地址,kafka需要依赖zookeeper,这个配置指定zookeeper地址。比如我们可以将地址指定为localhost:2181/kafka,这样kafka的数据都会保存在zookeeper的/kafka节点下。

重要概念

kafka有几个重要的概念,需要先了解一下。

broker: 服务代理节点,可以理解为一个kafka服务器,比如你的机器上启动了一个kafka,这就是一个broker,多个kafka broker组成集群。

topic: 主题,生产者把消息发送到某个topic,消费者订阅某个topic进行消费。

partition: 分区,分区是kafka中逻辑上的一个概念,一个主题可以保存在多个分区,一个分区只属于一个主题。不同分区下的消息是不同的,也就是主题的消息是分布在不同分区的。当我们发送一条消息到某个topic时,kafka会根据一定的规则来计算出这个消息要保存在哪个分区。有了分区的概念,kafka就可以将不同分区保存在不同服务器,这样可以减轻单个服务器的压力。topic和分区的关系可以通过下图来理解。从生产者角度来看,如果对分区没有特别要求,只需要指定将消息发到哪个topic即可,kafka会自动将消息均衡到不同分区。

消费者组:kafka创建消费者时,必须指定消费者组,对于topic中的某个消息,会发向所有消费者组,但是只会选择消费者组中一个消费者进行发送。也就是说不同消费者组之间的消费是互不影响的,同一个消费者组中的消费者分摊消费消息。消息是按topic的分区分摊给同一个组中的消费者的,比如一个topic有4个分区,一个消费者组中有两个消费者,那么每个消费者就负责消费某2个分区中的消息。消费者组,消费者以及分区的关系可以通过下图理解。

生产消费者开发(java)

在%kafka_path%/bin目录下有一些脚本,可用来测试消息的发送接收等,有兴趣的可以研究下,本文只介绍如何用java来连接kafka服务器进行发送接收消息。

添加依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>

发送消息

    private static void sendMsg() throws InterruptedException {
        Properties props = new Properties();
        // 设置kafka地址,kafka默认端口9092
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 设置key和value的序列化方式,我们发送字符串消息,选择StringSerializer即可。
        // key的作用是用来决定消息发送到哪个分区用的
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "client.id.demo");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        String topic = "topic-demo1";
        String msg = "hello kafka";
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
        producer.send(record);
        // 等待一会再停止程序,否则可能客户端还没来得及把消息发出去,程序就停止了,导致消息发送失败。
        Thread.sleep(5000);

//        // 如果不采用sleep方式,也可以采用如下方式,producer在发送时会返回一个Future对象,
//        // 可以通过这个对象获得发送结果,我们只需要阻塞获取下发送结果也可以避免程序停止导致消息发送失败。
//        try {
//            Future<RecordMetadata> send = producer.send(record);
//            send.get();
//        } catch (ExecutionException e) {
//            e.printStackTrace();
//        }

        System.out.println("end");
        producer.close();
    }

接收消息

    private static void consumeMsg() {
        Properties props = new Properties();
        // kafka broker地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 指定key,value序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 指定消息者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group01");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "topic-demo1";
        // 指定要订阅的topic
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            // 采用拉模式拉取消息
            System.out.println("开始拉取消息");
            // Duration.ofMillis(5000)用来指定如果暂时没有消息,阻塞一定时间,在这段时间内有消息到来,就会被接收到。
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
            for (ConsumerRecord<String, String> record : records) {
                String message =
                        "topic: " + record.topic() + ", " +
                        "分区:" + record.partition() + ", " +
                        "消息偏移: " + record.offset() + ", " +
                        "消息key: " + record.key() + ", " +
                        "消息内容: " + record.value() + ", ";
                System.out.println(message);
            }
        }
    }

我们先启动消费者,然后再启动生产者,就可以看到消费者成功消费了消息。

快问快答

通过上面的示例,我们已经完成了基本的生产与消费消息,不过kafka中还有很多细节需要开发时注意,我们以快问快答的形式来说明一下。

topic何时创建?

如果broker端配置参数auto.create.topics.enable设置为true(默认值就是true),那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为num.partitions (默认值为1)、副本因子为default.replication.factor(默认值为1)的主题。那么如果我们想自己提前创建好topic,并自己指定分区数和副本因子,可以使用kafka自带的工具kafka-topics.bat(linux中是kafka-topics.sh),命令使用方式如下:

# 创建topic

kafka-topics.bat --bootstrap-server localhost:9092 --create --topic topic-demo1 --partitions 4 --replication-factor 1

# 查看topic

1. 查看所有topic 

kafka-topics.bat --bootstrap-server localhost:9092 -list

2. 查看某个topic的详细信息

kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic topic-demo1

# 删除topic(注意这个命令我在windows下测试有bug,删除topic会导致kafka崩溃,在linux下测试没问题)

kafka-topics.bat --bootstrap-server localhost:9092 --delete --topic topic-demo1

 

消息存储在何处?

既然kafka消息是持久化的,那么kafka的消息肯定是存储在磁盘中。前面我们提到了log.dirs这个配置,这个配置所指定的目录就是kafka消息保存位置。我本地测试时kafka日志保存位置为D:\tmp\kafka-logs。

当没有任何topic时,kafka-logs目录如下。

当创建了一个分区数为2,名为demo1的topic后,kafka-logs目录如下。可以看到kafka为每个分区创建了一个目录。

具体消息在kafka中如何存储,是一个比较复杂而深入的话题,本文不做讨论,有需要可以查看相关文档或书籍。

消费者停机期间发送的消息,消费者启动后还能否消费到?

我们通过实际测试来看下,首先我新建了一个分区数为1的topic,名为demo2,然后先不启动消费者,我向demo2发送了两条消息hello111和hello222,此时再启动消费者,然后再发送一条hello333的消息,下面是运行结果,可以看到,消费者只消费到了偏移为2的hello333这一条消息,也就是没有消费到未启动时发送的消息。

 此时我们再关闭消息者,然后向demo2发送一条消息hello444,然后再启动消费者,可以看到消费者消费到了消息。

也就是说,如果我们一个消费者组还从来没从一个topic中消费过消息,那么刚启动时是消费不到历史消息的,如果消费过,那么中间消费者停止,然后后来又启动,消费者停止期间发送的消息再启动后是可以消费到的。

kafka中每条消息都是有个编号的,每个分区的消息按收到的顺序依次递增编号,我们在收到的消息中打印的offset就是这个消息的偏移量,也就是这个消息是分区中的第几条消息。kafka会记录下某个消费者组消费到了哪条消息,称为这个消费者组的消费位移,下次消费者启动时就可以从记录的消费位移开始消费,而当一个新的消费者组消费时,kafka中没有消费位移的信息,所以也就没法消费历史消息。

上面提到的情况是我们按照默认配置时的情况,其实kafka提供了一个配置参数auto.offset.reset来控制如何进行消费。auto.offset.reset的取值有3个,分别为latest、earliest、none,默认值为latest,也就是当一个消费者找不到消费位移信息时,会从最新的消息(也就是消费者启动后发送的消息)开始消费,earliest就是从最早的消息开始消费,这种情况下就可以消费到在消费者启动之前发送的消息。none表示当找不到消费位移信息时,kafka会报错。

kakfa还可以指定从任意位置的消息开始消费,对应java客户端的consumer.seek()方法,限于篇幅,本文不再展开,有兴趣的可以去研究下。

springboot集成kafka

添加依赖

如果想要在springboot中集成kafka,只需要在新建springboot项目时,选择上Spring for Apache Kafka即可。

或者直接在maven中添加如下依赖。其中spring-kafka-test是springboot内置的kafka服务,如果你本机已经安装了kafka,可以不需要这个依赖。

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置kafka

在application.properties中配置kafka

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

# 更多kafka配置可参考org.springframework.boot.autoconfigure.kafka.KafkaProperties

生产者

配置好kafka连接信息后,我们直接在类中注入KafkaTemplate即可发送消息。

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {

    private KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

消费者

@Component
public class KafkaConsumer {

    // topics参数指定要消费的主题
    @KafkaListener(topics = "topic-demo1")
    public void listen(String message) {
        System.out.println("收到消息");
        System.out.println(message);
    }

//    如果你想获得更多关于消息的信息,也可以把方法参数设置为org.apache.kafka.clients.consumer.ConsumerRecord
//    @KafkaListener(topics = "topic-demo1")
//    public void listen(ConsumerRecord<String, String> record) {
//        System.out.println("收到消息");
//        String message =
//                "topic: " + record.topic() + ", " +
//                        "分区:" + record.partition() + ", " +
//                        "消息偏移: " + record.offset() + ", " +
//                        "消息key: " + record.key() + ", " +
//                        "消息内容: " + record.value() + ", ";
//        System.out.println(message);
//    }
}

上面只是一个简单的示例,如果需要更多功能,可以查阅相关文档。sping kafka文档Spring for Apache Kafka

参考资料

《深入理解Kafka:核心设计与实践原理》 朱忠华  电子工业出版社

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐