SpringCloudStream是一个构建消息驱动的微服务框架。SpringCloudStream构建在SpringBoot之上用以创建工业级的应用程序,并且通过Spring Integration提供了和消息代理的连接。SpringCloudStream提供了多个厂商消息中间件的个性化配置,并引入发布订阅、消费组和分区的语义概念。在你的程序中添加@EnableBinding注解后,被@StreamListener修饰的方法即可连接到消息代理,接收流处理事件。

原生Spring Integration支持

生产者:
package original;

import java.util.UUID;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.MessageBuilder;

import bean.TimeInfo;

@EnableBinding(Source.class)
public class DataProducer {

    /**
     * Declares a polled message source bound to the default {@code output} channel.
     * Every 10 seconds (at most one message per poll) it emits a {@link TimeInfo}
     * payload carrying the current epoch millis and a random UUID label.
     */
    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
    public MessageSource<TimeInfo> timerMessageSource() {
        return () -> {
            TimeInfo payload = TimeInfo.builder()
                    .time(Long.toString(System.currentTimeMillis()))
                    .label(UUID.randomUUID().toString())
                    .build();
            return MessageBuilder.withPayload(payload).build();
        };
    }
}
消费者:
package original;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

import bean.SinkTimeInfo;

@EnableBinding(Sink.class)
public class DataConsumer {

    /**
     * Listens on the default {@code input} channel and writes each received
     * {@link SinkTimeInfo} to standard out.
     */
    @StreamListener(Sink.INPUT)
    public void loggerSink(SinkTimeInfo sinkTimeInfo) {
        String message = "Received: " + sinkTimeInfo.toString();
        System.out.println(message);
    }
}

自定义input和output通道

output通道
package custom;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * Custom binding contract exposing a single outbound channel named {@code output};
 * the name is matched against the {@code spring.cloud.stream.bindings} config.
 */
public interface ProducerSource {

    /** Logical name of the outbound channel. */
    String OUTPUT = "output";

    @Output(OUTPUT)
    MessageChannel output();
}
生产者
package custom;

import java.util.Random;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import bean.Order;

@EnableBinding(ProducerSource.class)
@Component
public class Producer {

    @Autowired
    private ProducerSource source;

    @Scheduled(fixedRate = 3000)
    public void produceHotDrinks() {
        try {
            source.output().send(MessageBuilder
                    .withPayload(Order.builder().flag("Hot").num(new Random().nextInt(100)).build()).build());
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    @Scheduled(fixedRate = 3000)
    public void produceColdDrinks() {
        source.output().send(MessageBuilder
                .withPayload(Order.builder().flag("Cold").num(new Random().nextInt(100)).build()).build());
    }
input通道
package custom;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * Custom binding contract exposing a single inbound channel named {@code input};
 * the name is matched against the {@code spring.cloud.stream.bindings} config.
 */
public interface ConsumerSink {

    /** Logical name of the inbound channel. */
    String INPUT = "input";

    @Input(INPUT)
    SubscribableChannel input();
}
消费者
package custom;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

import bean.Order;

@EnableBinding(ConsumerSink.class)
public class Consumer {

    @StreamListener(ConsumerSink.INPUT)
    public void listen(Order order) {
        System.out.println("Order Received: " + order);

    }

kafka配置

---
# Producer-side profile: binds both the output and input channels to the
# Kafka topic "wd", serializing payloads as JSON.
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: wd
          content-type: application/json
        input:
          destination: wd
          content-type: application/json

      kafka:
        binder:
          # ZooKeeper and broker addresses for the Kafka binder.
          zkNodes: 192.168.164.129
          brokers: 192.168.164.129
          # NOTE(review): channel-level consumer properties normally live under
          # spring.cloud.stream.kafka.bindings.input.consumer, not under binder —
          # verify this resetOffsets setting is actually picked up here.
          input:
            consumer:
              resetOffsets: true
---
# Consumer-only profile: binds just the input channel to the same topic.
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: wd
          content-type: application/json

      kafka:
        binder:
          zkNodes: 192.168.164.129
          brokers: 192.168.164.129
          # NOTE(review): same placement concern as above — resetOffsets is usually
          # a kafka.bindings.<channel>.consumer property; confirm before relying on it.
          input:
            consumer:
              resetOffsets: true

https://blog.codecentric.de/en/2016/04/event-driven-microservices-spring-cloud-stream/

http://spring-cloud.io/reference/stream/#

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐