spring cloud stream-Kafka Streams的简单使用

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>

spring cloud stream-Apache Kafka的简单使用

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

定义发射器/接收器

public interface Consumer {
	String FILE_UPLOAD = "fileUpload";
	String FINISHED_UPLOAD = "finishedUpload";
	// 接收消息的通道,对应配置
	@Input(FINISHED_UPLOAD)
	SubscribableChannel finishedUpload();
	// 发消息的通道
	@Output(FILE_UPLOAD)
	MessageChannel fileUpload();
}
public interface Producer {
	String FILE_UPLOAD = "fileUpload";
	String FINISHED_UPLOAD = "finishedUpload";
	// 接收消息的通道
	@Input(FILE_UPLOAD)
	SubscribableChannel fileUpload();
	// 发消息的通道
	@Output(FINISHED_UPLOAD)
	MessageChannel finishedUpload();
}

消息发送/消息处理

public class FileUploadController {
	@Autowired
	private Consumer consumer;
	@SysLog("文件上传" )
	@PostMapping
	public R save() {
		log.info("文件上传成功");
		log.info("通过kafka发送消息");
		// 有返回值,如果要关注发送结果,则判断返回值
		// 一般消息体不会这么简单
		consumer.fileUpload().send(MessageBuilder.withPayload("upgrade").build());
		return R.ok();
	}
}
@Transactional
@Component
@Slf4j
public class UploadListenner {
	@Autowired
	@Qualifier(Producer.FINISHED_UPLOAD)
	private MessageChannel finishedUploadMessageChannel;
	@StreamListener(Producer.FILE_UPLOAD)
	@SendTo(Producer.FINISHED_UPLOAD)
	public String processFileUpload(String message) {
		log.info("Consumer"+ message);
		log.info("Consumer upgrading .....");
		return message;
	}
}

监听结果

@Component
@Slf4j
public class UploadListenner {
	@StreamListener(Consumer.FINISHED_UPLOAD)
	public void listenFileUpload(String message) {
		log.info("Producer:",message);
	}
}

配置

生产者
stream:
      kafka:
        binder:
          brokers: 192.168.204.131
      bindings:
        finishedUpload:
          group: @artifactId@
消费者
 stream:
      kafka:
        binder:
         brokers: 192.168.204.131
      bindings:
        fileUpload:
            group: @artifactId@

主程序

@EnableBinding(Consumer.class)
public class MessageProducerApplication {
		public static void main(String[] args) {
			SpringApplication.run(ProducerApplication.class, args);
		}
}
@EnableBinding(Producer.class)
public class MessageConsumerApplication {
		public static void main(String[] args) {
			SpringApplication.run(MessageConsumerApplication.class, args);
		}
}
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐