This article briefly explains how to install Kafka on Ubuntu and use it to send and receive messages.

1. Installing Kafka

Visit the official Apache Kafka website, then click QUICKSTART.

Then click Download.

Finally, click the download link to download the installation package.

If the download is slow, the installation package has also been uploaded to Baidu Netdisk:

Link: https://pan.baidu.com/s/1nZ1duIt64ZVUsimaQ1meZA?pwd=3aoh
Extraction code: 3aoh

2. Starting Kafka

Once the download from the previous step is complete, start Kafka by following the instructions on the Quickstart page.

1. Using a remote connection tool such as FinalShell or Xshell, upload kafka_2.13-3.6.0.tgz to the /usr directory on the server.

2. Switch to the /usr directory, extract kafka_2.13-3.6.0.tgz, and change into the extracted directory (the commands in the following steps are run from there):

cd /usr

tar -zxvf kafka_2.13-3.6.0.tgz

cd kafka_2.13-3.6.0

3. Start ZooKeeper

Edit the configuration file config/zookeeper.properties and change the data directory:

dataDir=/usr/local/zookeeper

Then start the ZooKeeper instance bundled with Kafka with the following command:

bin/zookeeper-server-start.sh config/zookeeper.properties

4. Start Kafka

Edit the configuration file config/server.properties and change the directory where Kafka stores its log segments (this is Kafka's message data, not application log output):

log.dirs=/usr/local/kafka/logs

Then open a new connection window and start Kafka with the following command:

bin/kafka-server-start.sh config/server.properties

3. Sending and Receiving Messages with Kafka

Create a topic

bin/kafka-topics.sh --create --topic hello --bootstrap-server localhost:9092
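
The topic can also be created programmatically, which becomes useful once the Java client from section 4 is set up. A minimal sketch using Kafka's AdminClient, assuming a broker at localhost:9092 and a single-broker setup (1 partition, replication factor 1); the class name TopicCreator is made up for illustration:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class TopicCreator {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1 matches the single-broker quickstart setup
            NewTopic topic = new NewTopic("hello", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}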

Produce messages

Send messages to the topic just created. You can send several messages, one per line, and press Ctrl+C to finish (note that --bootstrap-server replaces the deprecated --broker-list option found in older examples):

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello

Consume messages

Consume the latest messages

Open a new connection window and run the following command to pull messages from the hello topic:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello

Consume messages from the beginning
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic hello

Consume from a specified offset

Specify which message to start consuming from. The offset passed to --offset is zero-based, and --partition must be given along with it:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --partition 0 --offset 1 --topic hello

Group consumption
Each consumer can specify a consumer group. In Kafka, a given message can be consumed by only one consumer within the same consumer group; consumers that belong to other groups can still consume that same message.
Set the group when starting a consumer with the following command (the console consumer also accepts --group helloGroup as a shorthand):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=helloGroup --topic hello

4. Using Kafka in Java

Search Maven Central for the Kafka dependency and pick a version:

https://central.sonatype.com/search?q=kafka

Then create a Maven project named kafka in IntelliJ IDEA and add the Kafka client dependency to pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- kafka-clients is the client library; the kafka_2.12/2.13 artifacts are the broker itself -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.6.0</version>
        </dependency>
    </dependencies>
</project>

Create the message producer

Producer factory class
package producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * Message producer factory class
 * @author heyunlin
 * @version 1.0
 */
public class MessageProducerFactory {

    private static final String BOOTSTRAP_SERVERS = "192.168.254.128:9092";

    public static Producer<String, String> getProducer() {
        // Set producer-related properties
        Properties props = new Properties();
        // Kafka broker address (host:port)
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Key serializer class
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer class
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        return new KafkaProducer<>(props);
    }

}
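
The factory above sets only the required properties. Reliability and batching options can be added the same way; a sketch of common settings, using values that match the Spring Boot YAML in section 5 (these lines would go next to the other props.put calls inside getProducer):

// Optional producer tuning, mirroring the application.yml in section 5:
props.put(ProducerConfig.ACKS_CONFIG, "1");                // wait for the partition leader's acknowledgment only
props.put(ProducerConfig.RETRIES_CONFIG, 3);               // retry transient send failures
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // maximum bytes per batch
props.put(ProducerConfig.LINGER_MS_CONFIG, 0);             // do not delay sends to fill batches
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // total memory for records awaiting send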

Test sending messages
package producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * @author heyunlin
 * @version 1.0
 */
public class MessageProducer {

    private static final String TOPIC = "hello";

    public static void main(String[] args) {
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "1", "Message From Producer.");
        Producer<String, String> producer = MessageProducerFactory.getProducer();

        // Fire-and-forget send: send() returns a Future and does not block
        producer.send(record);

        // Send with a callback that is invoked once the broker acknowledges (or rejects) the record
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                String topic = recordMetadata.topic();
                long offset = recordMetadata.offset();
                int partition = recordMetadata.partition();
                String metadata = recordMetadata.toString();

                System.out.println("topic = " + topic);
                System.out.println("offset = " + offset);
                System.out.println("message = " + message);
                System.out.println("partition = " + partition);
            }
        });

        // close() flushes buffered records; without it the program can exit before anything is actually sent
        producer.close();
    }

}
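
Note that send() on its own is asynchronous: it places the record in an in-memory buffer and returns a Future immediately. For a genuinely synchronous send, block on that Future; a minimal sketch that would replace the plain producer.send(record) call (requires importing java.util.concurrent.ExecutionException):

// Blocks until the broker acknowledges the record or the send fails
try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("acknowledged at offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}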

Create the message consumer

Consumer factory class
package consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

/**
 * Message consumer factory class
 * @author heyunlin
 * @version 1.0
 */
public class MessageConsumerFactory {

    private static final String BOOTSTRAP_SERVERS = "192.168.254.128:9092";

    public static Consumer<String, String> getConsumer() {
        // Set consumer-related properties
        Properties props = new Properties();

        // Kafka broker address
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Every consumer must belong to a consumer group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloGroup");
        // Disable auto-commit because MessageConsumer commits offsets manually
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Key deserializer class
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Value deserializer class
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        return new KafkaConsumer<>(props);
    }

}
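
To reproduce the group semantics from section 3 in Java (a message goes to only one consumer per group, but to every group), it helps to make the group id a parameter. A hypothetical overload of getConsumer, not part of the original project, that would sit inside MessageConsumerFactory next to the existing method:

    // Hypothetical overload: same properties, but the caller chooses the group id.
    // Two consumers created with the same groupId split the topic's partitions;
    // consumers with different groupIds each receive every message.
    public static Consumer<String, String> getConsumer(String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        return new KafkaConsumer<>(props);
    }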

Test consuming messages
package consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.time.Duration;
import java.util.Collections;

/**
 * @author heyunlin
 * @version 1.0
 */
public class MessageConsumer {

    private static final String TOPIC = "hello";

    public static void main(String[] args) {
        Consumer<String, String> consumer = MessageConsumerFactory.getConsumer();
        consumer.subscribe(Collections.singletonList(TOPIC));

        while (true) {
            // Wait up to 100 ms for new records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + ": " + record.value());
            }

            // Commit the offsets so these records are not redelivered after a restart or rebalance
            consumer.commitSync(); // synchronous commit
            // consumer.commitAsync(); // asynchronous commit
        }
    }

}
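
The --partition/--offset consumption from section 3 also has a Java counterpart: instead of subscribing, assign the partition explicitly and seek to the desired offset. A minimal sketch (add an import for org.apache.kafka.common.TopicPartition; the offset is zero-based, as before):

// Assign partition 0 of the topic directly (no consumer-group rebalancing)
// and start reading from offset 1 -- the Java equivalent of
// kafka-console-consumer.sh --partition 0 --offset 1
TopicPartition partition = new TopicPartition(TOPIC, 0);
consumer.assign(Collections.singletonList(partition));
consumer.seek(partition, 1);

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    System.out.println(record.offset() + ": " + record.value());
}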

5. Integrating Kafka with Spring Boot

Preparation

Create a Spring Boot project named springboot-kafka in IntelliJ IDEA and add the spring-kafka dependency to pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Then edit application.yml and add the Kafka-related configuration:

spring:
  kafka:
    bootstrap-servers: 192.168.254.128:9092
    producer:
      acks: 1
      retries: 3
      batch-size: 16384
      properties:
        linger:
          ms: 0
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: helloGroup
      enable-auto-commit: false
      auto-commit-interval: 1000
      auto-offset-reset: latest
      properties:
        request:
          timeout:
            ms: 18000
        session:
          timeout:
            ms: 12000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Create the message producer

package com.example.springboot.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author heyunlin
 * @version 1.0
 */
@RestController
@RequestMapping(path = "/producer", produces = "application/json;charset=utf-8")
public class KafkaProducer {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    public KafkaProducer(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @RequestMapping(value = "/sendMessage", method = RequestMethod.GET)
    public String sendMessage(String message) {
        kafkaTemplate.send("hello", message);

        return "发送成功~";
    }

}
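
KafkaTemplate.send() is also asynchronous. To log the send result, attach a callback; a sketch assuming spring-kafka 3.x, where send() returns a CompletableFuture<SendResult<K, V>> (2.x versions return a ListenableFuture with an addCallback method instead):

// Asynchronous send with a completion callback (spring-kafka 3.x API)
kafkaTemplate.send("hello", message).whenComplete((result, ex) -> {
    if (ex == null) {
        System.out.println("sent to offset " + result.getRecordMetadata().offset());
    } else {
        ex.printStackTrace();
    }
});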

Create the message consumer

package com.example.springboot.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author heyunlin
 * @version 1.0
 */
@Component
public class KafkaConsumer {

    @KafkaListener(topics = "hello")
    public void receiveMessage(ConsumerRecord<String, String> record) {
        String topic = record.topic();
        long offset = record.offset();
        int partition = record.partition();

        System.out.println("topic = " + topic);
        System.out.println("offset = " + offset);
        System.out.println("partition = " + partition);
    }

}
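
Because application.yml sets enable-auto-commit: false, offsets are committed by Spring's listener container after each poll by default. For explicit per-record commits, one option is to set spring.kafka.listener.ack-mode to manual_immediate and inject an Acknowledgment; a hedged sketch, not part of the original project:

// Requires spring.kafka.listener.ack-mode: manual_immediate in application.yml;
// the offset is committed only when acknowledge() is called.
@KafkaListener(topics = "hello")
public void receiveAndAck(ConsumerRecord<String, String> record,
                          org.springframework.kafka.support.Acknowledgment ack) {
    System.out.println("message = " + record.value());
    ack.acknowledge(); // commit this record's offset immediately
}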

Then visit http://localhost:8080/producer/sendMessage?message=hello to send a message to the hello topic. The console prints the record details, confirming that the listener received the sent message.

 

The projects covered in this article have been uploaded to Gitee; grab them as needed:

Basic Java Kafka project: https://gitee.com/he-yunlin/kafka.git

Spring Boot Kafka example project: https://gitee.com/he-yunlin/springboot-kafka.git
