spring cloud stream+Kafka 实现消息传递
引入依赖implementation("org.springframework.cloud:spring-cloud-starter-bus-kafka")开始codingbootstrap.ymlstream:bindings:input:destination: kafka-topic-01consumer:concurrency: 1partitio
·
引入依赖
implementation("org.springframework.cloud:spring-cloud-starter-bus-kafka")
开始coding
bootstrap.yml
stream:
bindings:
input:
destination: kafka-topic-01
consumer:
concurrency: 1
partitioned: false
output:
destination: kafka-topic-01
producer:
partitionCount: 1
kafka.yml
spring:
kafka:
bootstrap-servers: 123.57.*.*:9092, 123.57.*.*:9093
properties:
security:
protocol: SASL_PLAINTEXT
sasl:
mechanism: PLAIN
jaas:
config: org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafkapswd";
cloud:
bus:
enabled: true
trace:
enabled: true
application.yml (这个配置是我个人项目问题)
spring:
aop:
auto: fasle
KafkaChannel.java (自定义 输入输出通道)
package com.*.*.*
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface KafkaChannel {
String INPUT = "input";
String OUTPUT = "output";
/**
* 注解@Input声明了它是一个输入类型的通道,名字是input
*
* @return
*/
@Input(KafkaChannel.INPUT)
SubscribableChannel input();
/**
/*
* 注解@Output声明了它是一个输出类型的通道,名字是output,
*
* @return
*/
@Output(KafkaChannel.OUTPUT)
MessageChannel output();
}
SendService.java (消息发送端)
package com.microcardio.multiparameter.service.kafka;
import com.microcardio.multiparameter.config.kafka.KafkaChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
/**
* MPS
* 2020/6/1 17:02
*
* @author aaaak
* @since
**/
@EnableBinding(KafkaChannel.class)
public class SendService {
@Autowired
private KafkaChannel channel;
/**
* 发送消息
* @param msg
*/
public void sendMessage(String msg) {
try {
channel.output().send(MessageBuilder.withPayload(msg).build());
} catch (Exception e) {
e.printStackTrace();
}
}
}
MsgListener.java (消息监听 接收端)
package com.*.*.*
import com.microcardio.multiparameter.config.kafka.KafkaChannel;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
/**
* MPS
* 2020/6/1 23:02
*
* @author aaaak
* @since
**/
@EnableBinding(KafkaChannel.class)
public class MsgListener {
/**
* 监听 KafkaChannel 消息通道
* @param payload 消息内容
*/
@StreamListener(KafkaChannel.INPUT)
public void messageListener(String payload) {
System.out.println("Received: " + payload);
}
}
KafkaController.java
package com.*.*.*
import com.microcardio.multiparameter.json.JsonResult;
import com.microcardio.multiparameter.service.kafka.SendService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InjectionPoint;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.logging.Logger;
/**
* MPS
* 2020/6/1 23:02
*
* @author aaaak
* @since
**/
@Slf4j
@RestController
@RequestMapping(value = "/kafka")
@Configuration
public class KafkaController {
// @Resource
// private KafkaTemplate<String, Object> kafkaTemplate;
//
// @GetMapping(value = "/kafka/send")
// public JsonResult<Void> testKafkaSend(@RequestParam String message){
// kafkaTemplate.send("data-connection", message);
// return JsonResult.success(null);
// }
// @KafkaListener(id = "consumer_aaak", topics = "data-connection",groupId = "data-connetction")
// public void listen(String input) {
// log.info("input value: {}" , input);
// }
@Autowired
SendService sendService;
@GetMapping(value = "/kafka/send1")
public JsonResult<Void> testKafkaSend1(@RequestParam String message){
sendService.sendMessage(message);
return JsonResult.success(null);
}
}
更多推荐
已为社区贡献2条内容
所有评论(0)