Spring Cloud 集成 kafka
1、在pom.xml里面添加kafka的maven依赖org.springframework.cloudspring-cloud-starter-stream-kafka2、在properties 配置文件里面添加 kafka binder 参数spring.cloud.stream.kafka.binder.brokers=127.0.0.1:9092spring.clo
·
kafka的安装部署请参考:kafka安装部署
1、在pom.xml里面添加kafka的maven依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
2、在properties 配置文件里面添加 kafka binder 参数
spring.cloud.stream.kafka.binder.brokers=127.0.0.1:9092
spring.cloud.stream.kafka.binder.zk-nodes=127.0.0.1:2181
spring.cloud.stream.kafka.binder.minPartitionCount=1
spring.cloud.stream.kafka.binder.autoCreateTopics=true
spring.cloud.stream.kafka.binder.autoAddPartitions=true
3、输入通道定义,供消费者使用
(1)在properties配置文件里面添加输入通道配置信息
spring.cloud.stream.bindings.testa.destination=test_spring_stream
spring.cloud.stream.bindings.testa.group=group-1
spring.cloud.stream.bindings.testa.consumer.concurrency=1
spring.cloud.stream.bindings.testa.consumer.partitioned=false
(2)定义输入通道并绑定输入通道配置信息
public interface Sink {
//接收队列1
String INPUT_1 = "testa";
@Input(Sink.INPUT_1)
SubscribableChannel input1();
}
INPUT_1 = "testa" 跟配置文件里面的通道名称 testa 保持一致
4、输出通道定义,供生产者使用
(1)在properties配置文件里面添加输出通道配置信息
spring.cloud.stream.bindings.sourceA.destination=test_spring_stream
spring.cloud.stream.bindings.sourceA.producer.partitionCount=1
(2)定义输出通道并绑定输出通道配置信息
public interface Source {
//发送队列1
String OUTPUT_1 = "sourceA";
@Output(Source.OUTPUT_1)
MessageChannel output1();
}
OUTPUT_1 = "sourceA" 跟配置文件里面的通道名称 sourceA 保持一致
5、生产者端代码
@EnableBinding(Source.class)
public class KafkaSender {
private final Logger logger = LoggerFactory.getLogger(KafkaSender.class);
@Autowired
private Source source;
public void sendMessage(String message) {
try {
source.output1().send(MessageBuilder.withPayload("message: " + message).build());
} catch (Exception e) {
logger.info("消息发送失败,原因:"+e);
e.printStackTrace();
}
}
}
调用sendMessage方法发送消息
6、消费者端代码
@EnableBinding(Sink.class)
public class KafkaReceiver {
private final Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);
@StreamListener(Sink.INPUT_1)
private void receive(String vote) {
logger.info("receive message : " + vote);
}
}
通过receive方法接收消息
更多推荐
已为社区贡献3条内容
所有评论(0)