spring cloud stream-Kafka Streams的简单使用
spring cloud stream-Kafka Streams的简单使用<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-kafka-streams</artifactId><...
·
spring cloud stream-Kafka Streams的简单使用
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
spring cloud stream-Apache Kafka的简单使用
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
定义发射器/接收器
public interface Consumer {
String FILE_UPLOAD = "fileUpload";
String FINISHED_UPLOAD = "finishedUpload";
// 接收消息的通道,对应配置
@Input(FINISHED_UPLOAD)
SubscribableChannel finishedUpload();
// 发消息的通道
@Output(FILE_UPLOAD)
MessageChannel fileUpload();
}
public interface Producer {
String FILE_UPLOAD = "fileUpload";
String FINISHED_UPLOAD = "finishedUpload";
// 接收消息的通道
@Input(FILE_UPLOAD)
SubscribableChannel fileUpload();
// 发消息的通道
@Output(FINISHED_UPLOAD)
MessageChannel finishedUpload();
}
消息发送/消息处理
public class FileUploadController {
@Autowired
private Consumer consumer;
@SysLog("文件上传" )
@PostMapping
public R save() {
log.info("文件上传成功");
log.info("通过kafka发送消息");
// 有返回值,如果要关注发送结果,则判断返回值
// 一般消息体不会这么简单
consumer.fileUpload().send(MessageBuilder.withPayload("upgrade").build());
return R.ok();
}
}
@Transactional
@Component
@Slf4j
public class UploadListenner {
@Autowired
@Qualifier(Producer.FINISHED_UPLOAD)
private MessageChannel finishedUploadMessageChannel;
@StreamListener(Producer.FILE_UPLOAD)
@SendTo(Producer.FINISHED_UPLOAD)
public String processFileUpload(String message) {
log.info("Consumer"+ message);
log.info("Consumer upgrading .....");
return message;
}
}
监听结果
@Component
@Slf4j
public class UploadListenner {
@StreamListener(Consumer.FINISHED_UPLOAD)
public void listenFileUpload(String message) {
log.info("Producer:",message);
}
}
配置
生产者
stream:
kafka:
binder:
brokers: 192.168.204.131
bindings:
finishedUpload:
group: @artifactId@
消费者
stream:
kafka:
binder:
brokers: 192.168.204.131
bindings:
fileUpload:
group: @artifactId@
主程序
@EnableBinding(Consumer.class)
public class MessageProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
@EnableBinding(Producer.class)
public class MessageConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(MessageConsumerApplication.class, args);
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)