引入依赖

    implementation("org.springframework.cloud:spring-cloud-starter-bus-kafka")

开始coding

bootstrap.yml

    # Spring Cloud Stream binding definitions. The keys are nested under
    # spring.cloud so that they resolve to spring.cloud.stream.bindings.*.
    spring:
      cloud:
        stream:
          bindings:
            # Inbound binding; the name matches KafkaChannel.INPUT ("input").
            input:
              destination: kafka-topic-01
              consumer:
                concurrency: 1
                partitioned: false
            # Outbound binding; the name matches KafkaChannel.OUTPUT ("output").
            output:
              destination: kafka-topic-01
              producer:
                partitionCount: 1

kafka.yml

# Kafka connection settings plus the Spring Cloud Bus switches used by this example.
spring:
  kafka:
    # Two broker endpoints; the host octets are masked in this listing.
    bootstrap-servers: 123.57.*.*:9092, 123.57.*.*:9093
    # Everything below is nested under spring.kafka.properties.
    properties:
      security:
        protocol: SASL_PLAINTEXT
      sasl:
        mechanism: PLAIN
        jaas:
          # NOTE(review): the username and password are inlined here in plain text;
          # consider supplying them from outside the file.
          config: org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafkapswd";
  cloud:
    bus:
      enabled: true
      trace:
        enabled: true

application.yml（此配置仅针对我个人项目中遇到的问题）
在这里插入图片描述

spring:
  aop:
    # Sets spring.aop.auto to false for this project.
    auto: false

KafkaChannel.java (自定义 输入输出通道)

package com.*.*.*

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface KafkaChannel {

    /** Name of the inbound binding. */
    String INPUT = "input";

    /** Name of the outbound binding. */
    String OUTPUT = "output";

    /**
     * Declares the input-type channel, registered under the name {@link #INPUT}.
     *
     * @return the subscribable channel declared as the input
     */
    @Input(INPUT)
    SubscribableChannel input();

    /**
     * Declares the output-type channel, registered under the name {@link #OUTPUT}.
     *
     * @return the message channel declared as the output
     */
    @Output(OUTPUT)
    MessageChannel output();
}

SendService.java (消息发送端)

package com.microcardio.multiparameter.service.kafka;

import com.microcardio.multiparameter.config.kafka.KafkaChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;

/**
 * Message sender: publishes text messages on the output channel declared by
 * {@link KafkaChannel}.
 *
 * <p>MPS, 2020/6/1 17:02
 *
 * @author aaaak
 */
@EnableBinding(KafkaChannel.class)
public class SendService {

    // Fully qualified so that the import list of this file does not have to change.
    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(SendService.class.getName());

    @Autowired
    private KafkaChannel channel;

    /**
     * Sends {@code msg} as the payload of a new message on the output channel.
     *
     * <p>Sending is best-effort: a rejected message or an exception is logged and
     * never propagated to the caller.
     *
     * @param msg the message payload
     */
    public void sendMessage(String msg) {
        try {
            // send(...) reports non-fatal failures through its boolean result.
            boolean sent = channel.output().send(MessageBuilder.withPayload(msg).build());
            if (!sent) {
                LOG.warning("Message was not accepted by the output channel");
            }
        } catch (Exception e) {
            // Keep the original no-throw contract, but log the failure with its cause.
            LOG.log(java.util.logging.Level.SEVERE, "Failed to send message", e);
        }
    }

}

MsgListener.java (消息监听 接收端)

package com.*.*.*


import com.microcardio.multiparameter.config.kafka.KafkaChannel;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

/**
 * Message receiver: listens on the input channel declared by {@link KafkaChannel}.
 *
 * <p>MPS, 2020/6/1 23:02
 *
 * @author aaaak
 */
@EnableBinding(KafkaChannel.class)
public class MsgListener {

    /**
     * Handles a message arriving on {@link KafkaChannel#INPUT} by printing it to
     * standard output.
     *
     * @param payload the message content
     */
    @StreamListener(KafkaChannel.INPUT)
    public void messageListener(String payload) {
        final String line = "Received: " + payload;
        System.out.println(line);
    }

}

KafkaController.java

package com.*.*.*

import com.microcardio.multiparameter.json.JsonResult;
import com.microcardio.multiparameter.service.kafka.SendService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InjectionPoint;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.logging.Logger;

/**
 * REST entry point that hands request text over to {@link SendService}.
 *
 * <p>MPS, 2020/6/1 23:02
 *
 * @author aaaak
 */
@Slf4j
@Configuration
@RestController
@RequestMapping("/kafka")
public class KafkaController {

    // Disabled alternative that used KafkaTemplate / @KafkaListener directly;
    // kept here for reference only.
    //
    // @Resource
    // private KafkaTemplate<String, Object> kafkaTemplate;
    //
    // @GetMapping(value = "/kafka/send")
    // public JsonResult<Void> testKafkaSend(@RequestParam String message){
    //     kafkaTemplate.send("data-connection", message);
    //     return JsonResult.success(null);
    // }
    //
    // @KafkaListener(id = "consumer_aaak", topics = "data-connection",groupId = "data-connetction")
    // public void listen(String input) {
    //     log.info("input value: {}" , input);
    // }

    @Autowired
    SendService sendService;

    /**
     * Passes {@code message} to {@link SendService#sendMessage(String)} and then
     * returns a success result with no payload.
     *
     * <p>NOTE(review): the class-level mapping is "/kafka" and this method's mapping
     * is "/kafka/send1", so the "/kafka" segment is declared twice — confirm that
     * this is intended.
     *
     * @param message the text to send, taken from the request parameter
     * @return the value of {@code JsonResult.success(null)}
     */
    @GetMapping("/kafka/send1")
    public JsonResult<Void> testKafkaSend1(@RequestParam String message) {
        sendService.sendMessage(message);
        return JsonResult.success(null);
    }

}
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐