Spring Cloud Stream + Kafka
消息驱动,微服务框架。SpringCloudStream,SpringBoot,kafka
·
Spring Cloud Stream 是一个用于构建消息驱动微服务的框架。它构建在 Spring Boot 之上,用以创建工业级的应用程序,并借助 Spring Integration 提供与消息代理的连接。Spring Cloud Stream 为多家厂商的消息中间件提供了个性化配置,并引入了发布订阅、消费组和分区的语义概念。在你的程序中添加 @EnableBinding 注解后,被 @StreamListener 修饰的方法即可连接到消息代理,接收并处理流事件。
原生Spring Integration支持
生产者:
package original;
import java.util.UUID;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.MessageBuilder;
import bean.TimeInfo;
/**
 * Message producer bound to the {@link Source} output channel.
 *
 * <p>Exposes a polled {@link MessageSource} bean; each poll yields one
 * {@link TimeInfo} message carrying the current epoch-millisecond timestamp
 * (as a string) and a freshly generated random UUID label.
 */
@EnableBinding(Source.class)
public class DataProducer {

    /**
     * Builds the message source that feeds {@code Source.OUTPUT}.
     *
     * <p>The poller is configured with {@code fixedDelay = "10000"} and
     * {@code maxMessagesPerPoll = "1"}, i.e. at most one message per poll.
     *
     * @return a message source producing a new {@link TimeInfo} payload on every call
     */
    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
    public MessageSource<TimeInfo> timerMessageSource() {
        return () -> {
            // Snapshot the values first so the builder chain stays short.
            String timestamp = Long.toString(System.currentTimeMillis());
            String label = UUID.randomUUID().toString();
            TimeInfo payload = TimeInfo.builder().time(timestamp).label(label).build();
            return MessageBuilder.withPayload(payload).build();
        };
    }
}
消费者
package original;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import bean.SinkTimeInfo;
/**
 * Message consumer bound to the {@link Sink} input channel.
 *
 * <p>Every payload received on {@code Sink.INPUT} is printed to standard output.
 */
@EnableBinding(Sink.class)
public class DataConsumer {

    /**
     * Prints the received payload, prefixed with {@code "Received: "}.
     *
     * @param sinkTimeInfo the payload delivered on the input channel
     */
    @StreamListener(Sink.INPUT)
    public void loggerSink(SinkTimeInfo sinkTimeInfo) {
        String description = sinkTimeInfo.toString();
        System.out.println("Received: " + description);
    }
}
自定义input和output通道
output通道
package custom;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
 * Custom binding interface declaring a single outbound channel.
 */
public interface ProducerSource {

    /** Name of the outbound channel; also the key used under the stream bindings configuration. */
    String OUTPUT = "output";

    /**
     * Returns the outbound channel registered under {@link #OUTPUT}.
     *
     * @return the channel on which messages are sent
     */
    @Output(OUTPUT)
    MessageChannel output();
}
生产者
package custom;
import java.util.Random;
import java.util.UUID;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import bean.Order;
@EnableBinding(ProducerSource.class)
@Component
public class Producer {
@Autowired
private ProducerSource source;
@Scheduled(fixedRate = 3000)
public void produceHotDrinks() {
try {
source.output().send(MessageBuilder
.withPayload(Order.builder().flag("Hot").num(new Random().nextInt(100)).build()).build());
} catch (Exception e) {
System.out.println(e);
}
}
@Scheduled(fixedRate = 3000)
public void produceColdDrinks() {
source.output().send(MessageBuilder
.withPayload(Order.builder().flag("Cold").num(new Random().nextInt(100)).build()).build());
}
input通道
package custom;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
 * Custom binding interface declaring a single inbound channel.
 */
public interface ConsumerSink {

    /** Name of the inbound channel; also the key used under the stream bindings configuration. */
    String INPUT = "input";

    /**
     * Returns the inbound channel registered under {@link #INPUT}.
     *
     * @return the channel from which messages are received
     */
    @Input(INPUT)
    SubscribableChannel input();
}
消费者
package custom;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import bean.Order;
@EnableBinding(ConsumerSink.class)
public class Consumer {
@StreamListener(ConsumerSink.INPUT)
public void listen(Order order) {
System.out.println("Order Received: " + order);
}
kafka配置
---
# Binds both the "output" and "input" channels to the same destination "wd",
# exchanging JSON payloads.
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: wd
          content-type: application/json
        input:
          destination: wd
          content-type: application/json
      kafka:
        binder:
          zkNodes: 192.168.164.129
          brokers: 192.168.164.129
        # Kafka-specific consumer settings are keyed per channel under
        # spring.cloud.stream.kafka.bindings.<channel>.consumer
        # NOTE(review): nesting was lost in the original text — confirm against
        # the binder version in use.
        bindings:
          input:
            consumer:
              resetOffsets: true
---
# Consumer-only variant: binds just the "input" channel to destination "wd",
# receiving JSON payloads.
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: wd
          content-type: application/json
      kafka:
        binder:
          zkNodes: 192.168.164.129
          brokers: 192.168.164.129
        # Kafka-specific consumer settings are keyed per channel under
        # spring.cloud.stream.kafka.bindings.<channel>.consumer
        # NOTE(review): nesting was lost in the original text — confirm against
        # the binder version in use.
        bindings:
          input:
            consumer:
              resetOffsets: true
参考资料:https://blog.codecentric.de/en/2016/04/event-driven-microservices-spring-cloud-stream/
更多推荐
已为社区贡献3条内容
所有评论(0)