For Kafka installation and deployment, see: Kafka installation and deployment

1. Add the Kafka Maven dependency to pom.xml

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

2. Add the Kafka binder parameters to the properties configuration file

spring.cloud.stream.kafka.binder.brokers=127.0.0.1:9092
spring.cloud.stream.kafka.binder.zk-nodes=127.0.0.1:2181
spring.cloud.stream.kafka.binder.minPartitionCount=1
spring.cloud.stream.kafka.binder.autoCreateTopics=true
spring.cloud.stream.kafka.binder.autoAddPartitions=true

3. Input channel definition, used by the consumer

(1) Add the input channel configuration to the properties file

spring.cloud.stream.bindings.testa.destination=test_spring_stream
spring.cloud.stream.bindings.testa.group=group-1
spring.cloud.stream.bindings.testa.consumer.concurrency=1
spring.cloud.stream.bindings.testa.consumer.partitioned=false

(2) Define the input channel and bind it to the configuration

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface Sink {

	// input channel 1
	String INPUT_1 = "testa";

	@Input(Sink.INPUT_1)
	SubscribableChannel input1();

}
INPUT_1 = "testa" must match the channel name testa used in the configuration file.

4. Output channel definition, used by the producer

(1) Add the output channel configuration to the properties file

spring.cloud.stream.bindings.sourceA.destination=test_spring_stream
spring.cloud.stream.bindings.sourceA.producer.partitionCount=1

(2) Define the output channel and bind it to the configuration

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface Source {

	// output channel 1
	String OUTPUT_1 = "sourceA";

	@Output(Source.OUTPUT_1)
	MessageChannel output1();

}
OUTPUT_1 = "sourceA" must match the channel name sourceA used in the configuration file.

5. Producer-side code

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(Source.class)
public class KafkaSender {

    private final Logger logger = LoggerFactory.getLogger(KafkaSender.class);

    @Autowired
    private Source source;

    public void sendMessage(String message) {
        try {
            source.output1().send(MessageBuilder.withPayload("message: " + message).build());
        } catch (Exception e) {
            logger.error("Failed to send message, cause: ", e);
        }
    }
}
Call the sendMessage method to send a message.
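For illustration, sendMessage could be triggered from a simple REST endpoint. This is a hypothetical sketch, not part of the original tutorial: the controller class, endpoint path, and parameter name are all assumptions, and it requires spring-boot-starter-web on the classpath.

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical controller: GET /send?msg=hello publishes "hello" to the output channel.
@RestController
public class SendController {

    @Autowired
    private KafkaSender kafkaSender;

    @GetMapping("/send")
    public String send(@RequestParam String msg) {
        kafkaSender.sendMessage(msg);
        return "sent";
    }
}
```

Any other entry point (a scheduled task, a CommandLineRunner) would work the same way: inject KafkaSender and call sendMessage.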

6. Consumer-side code

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@EnableBinding(Sink.class)
public class KafkaReceiver {

	private final Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);

	@StreamListener(Sink.INPUT_1)
	public void receive(String vote) {
		logger.info("receive message : " + vote);
	}

}
Messages are received through the receive method.
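To tie the pieces together, a minimal Spring Boot entry point is all that is still needed; the binding interfaces and the sender/receiver beans above are picked up automatically by component scanning. The application class name below is an assumption for illustration.

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Hypothetical main class; on startup the Kafka binder connects using the
// spring.cloud.stream.* properties configured in steps 2-4.
@SpringBootApplication
public class StreamDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamDemoApplication.class, args);
    }
}
```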

