References: https://blog.csdn.net/u011019141/article/details/108743342
https://blog.csdn.net/qq12547345/article/details/119531607
https://blog.csdn.net/JinXYan/article/details/90813592
Deploy the Kafka service with docker-compose.
The docker-compose.yml is shown below (Kafka depends on ZooKeeper, so ZooKeeper is deployed at the same time):

version: "3.7" 
services:
    zookeeper_server:
        image: wurstmeister/zookeeper 
        container_name: zookeeper_server
        ports: 
            - 2181:2181
        volumes:
            - ./data:/data
        logging:
            options:
                max-size: "50M" # maximum size of a single container log file
                max-file: "100"
            driver: json-file

    kafka_server:
        image: wurstmeister/kafka
        container_name: kafka_server
        ports: 
            - 9092:9092
        environment:
            KAFKA_ZOOKEEPER_CONNECT: zookeeper_server:2181
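            # two listeners: INSIDE (9093) for clients on the docker network,
            # OUTSIDE (9092) advertised as localhost for clients on the host machine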
            KAFKA_LISTENERS: INSIDE://:9093,OUTSIDE://:9092
            KAFKA_ADVERTISED_LISTENERS: INSIDE://:9093,OUTSIDE://localhost:9092
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
            KAFKA_INTER_BROKER_LISTENER_NAME: OUTSIDE
        volumes:
            - ./kafka-logs:/kafka
        logging:
            options:
                max-size: "50M" # maximum size of a single container log file
                max-file: "100"
            driver: json-file     
        depends_on:
          - zookeeper_server   
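
Bring both containers up with "docker-compose up -d" before starting the Spring Boot application.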

If the application reports connection errors after startup, check that the hosts file contains the entry 127.0.0.1 localhost (it usually does already); the client connects to localhost:9092, which is what the OUTSIDE listener advertises.

I originally wanted separate publisher and subscriber modules, but the examples found online ran into all sorts of problems when reproduced, so for now here is a single demo that actually runs.

Project structure: (screenshot omitted)
Dependency file: pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.demo.springcloud_05_kafka</groupId>
    <artifactId>kafka_publisher</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka_publisher</name>
    <description>kafka</description>
    <properties>
        <spring.cloud.version>Hoxton.SR9</spring.cloud.version>
        <spring.boot.version>2.3.2.RELEASE</spring.boot.version>
        <spring.cloud.alibaba.version>2.2.6.RELEASE</spring.cloud.alibaba.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.0</version>
            <optional>true</optional>
        </dependency>

        <!--kafka-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

    </dependencies>


    <dependencyManagement>
        <dependencies>
            <!--spring-boot-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!-- spring-cloud -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring.cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>


    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Configuration file: application.yml

server:
  port: 8581
spring:
  application:
    name: kafka_app
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          zk-nodes: localhost:2181
          autoCreateTopics: true
          requiredAcks: 1
          autoAddPartitions: true
      bindings:
        input:
          destination: stream-demo
        output:      # the default output channel provided by Stream
          destination: stream-demo    # destination topic the messages are sent to
          content-type: text/plain    # content type of outgoing messages; the sender must set it, the receiver does not
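
For reference, the binding names "input" and "output" used above are the channel names defined by the default Sink and Source interfaces that ship with Spring Cloud Stream (abbreviated copies below; each lives in its own file under org.springframework.cloud.stream.messaging):

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface Source {
    String OUTPUT = "output";

    @Output(Source.OUTPUT)
    MessageChannel output();
}

public interface Sink {
    String INPUT = "input";

    @Input(Sink.INPUT)
    SubscribableChannel input();
}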

Main application class: KafkaPublisherApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@SpringBootApplication
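// @EnableBinding is not declared here because KafkaMessageSender and
// KafkaMessageReceiveListener below each declare their own bindings.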
//@EnableBinding(Source.class)
public class KafkaPublisherApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaPublisherApplication.class, args);
    }
}

Message receiver: KafkaMessageReceiveListener.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;

import java.util.Date;

@Slf4j
@EnableBinding(value = Sink.class)
public class KafkaMessageReceiveListener {

    /**
     * Receive messages from the default input channel.
     * @param message the received message (payload plus Kafka headers)
     */
    @StreamListener(Sink.INPUT)
    public void receive(Message<String> message){
        log.info("{} received message: channel = input, data = {}", new Date(), message);
    }
}
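
The injected Message<String> carries both the payload and the headers added by the Kafka binder. A minimal alternative sketch of the listener that unwraps them explicitly (it would replace the method above in the same @Slf4j class; "kafka_receivedTopic" is assumed to be the header name used by the Kafka binder with default header mapping):

    @StreamListener(Sink.INPUT)
    public void receive(Message<String> message) {
        String payload = message.getPayload();                           // the text sent by the producer
        Object topic = message.getHeaders().get("kafka_receivedTopic");  // topic name added by the binder
        log.info("received payload = {} from topic = {}", payload, topic);
    }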

Message sender: KafkaMessageSender.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@EnableBinding(Source.class)
public class KafkaMessageSender {
    @Autowired
    private Source channel;

    public void sendToDefaultChannel(String message) {
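        // publish the payload to the default "output" channel, bound to topic stream-demo in application.yml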
        channel.output().send(MessageBuilder.withPayload(message).build());
    }

}
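
If messages with the same key should always land on the same partition, the key can be attached as a header. A hypothetical extra method for KafkaMessageSender, a sketch only, assuming the binder's default key handling (byte[] key taken from the KafkaHeaders.MESSAGE_KEY header of spring-kafka, which the Kafka binder pulls in):

    // extra imports needed:
    // import org.springframework.kafka.support.KafkaHeaders;
    // import java.nio.charset.StandardCharsets;
    public void sendWithKey(String key, String message) {
        channel.output().send(MessageBuilder.withPayload(message)
                // the binder is assumed to read the record key from this header
                .setHeader(KafkaHeaders.MESSAGE_KEY, key.getBytes(StandardCharsets.UTF_8))
                .build());
    }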

Controller that triggers sends: KafkaSenderController.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class KafkaSenderController {
    @Autowired
    private KafkaMessageSender sender;

    @GetMapping("/send")
    public void testKafkaMessageSend(String message) {
        log.info("message:{}",message);
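        // the same payload is sent four times, so four records should arrive on the topic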
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
    }
}

Then start the application and open http://localhost:8581/send?message=%221234521313123 in a browser;
the received messages will show up in the application log.
