[SpringCloud-SpringCloudStream] 简单通过Spring-cloud-stream组件使用kafka
一.pom依赖<dependency><groupId>org.springframework.cloud</groupId&am
·
1.消费者
一.pom依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>1.2.1.RELEASE</version>
</dependency>
也可以根据springcloud适配版本,省略版本号
二.Kafka消费者配置
#kafka对应的地址
spring.cloud.stream.kafka.binder.brokers = 192.168.xx.xxx:9092
#kafka的zookeeper对应的地址
spring.cloud.stream.kafka.binder.zkNodes = 192.168.xx.xxx:2181
#监听kafka的topic
spring.cloud.stream.bindings.xxxxxx.destination = topic-test
#消费者分组
spring.cloud.stream.bindings.xxxxxx.group = test-group
#接收原始消息
spring.cloud.stream.bindings.xxxxxx.consumer.headerMode = raw
其中“xxxxxx”是自定义字段,需要和第三步中消费者代码的String INPUT = "xxxxxx";保持一致;
三.消费者代码
public interface MqSinkI {
String INPUT = "xxxxxx";
/**
* 消费者接口
*
* @return org.springframework.messaging.SubscribableChannel 接口对象
*/
@Input(MqSinkI.INPUT)
SubscribableChannel input();
}
@EnableBinding(value = {MqSinkI.class})
public class MqSinkReceiver {
@Autowired
MqListener mqListener;
@StreamListener(MqSinkI.INPUT)
public void messageListen(JSONObject jsonParam) {
System.out.println("收到信息:" + jsonParam.toString());
//处理请求的类,对消息进行处理
mqListener.listen(jsonParam);
}
}
@Component
public class MqListener {
public void listen(JSONObject jsonParam) {
System.out.println("收到:" + jsonParam);
}
}
四.小结
- 注意配置项里面的xxx字段要和代码定义的字符串常量保持一致
- MqListener 类可以可以省去,处理逻辑直接可以写在MqSinkReceiver 类的messageListen里面;
- 配置中的地址和主题都可以配置多个
- spring.cloud.stream.bindings.xxxxxx.consumer.headerMode = raw配置项
可能会影响消息的接收格式,如果不添加这条配置,接收引擎的消息可能会有问题,如果其他生产者按照第2点的方式生产消息,则可以不使用这条配置。
2.生产者
一.pom依赖
与消费者一样
二.Kafka生产者配置
#kafka对应的地址
spring.cloud.stream.kafka.binder.brokers=192.168.11.199:9092
#kafka的zookeeper对应的地址
spring.cloud.stream.kafka.binder.zkNodes=192.168.11.199:2181
spring.cloud.stream.bindings.oooooooo.destination=topic-test
spring.cloud.stream.bindings.oooooooo.content-type=application/json
三.生产者代码
public interface MySource {
String OUTPUT = "oooooooo";
String OUTPUT1 = "myOutputTest1";
@Output(MySource.OUTPUT)
MessageChannel output();
}
@EnableBinding(MySource.class)
public class SendService {
@Autowired
private MySource mySource;
public void sendMessage(String msg) {
try {
mySource.output().send(MessageBuilder.withPayload(msg).build());
} catch (Exception e) {
e.printStackTrace();
}
}
}
@RestController
public class ProducerController {
@Autowired
private SendService service;
@RequestMapping(value = "/kafka")
public void send() {
while (true) {
//发送消息到指定topic
JSONObject obj = new JSONObject();
obj.put("time", (new Date()).toString());
System.out.println("生产者发送:" + obj.toString());
service.sendMessage(obj.toString());
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
}
}
}
}
3.源码
更多推荐
已为社区贡献4条内容
所有评论(0)