springcloud 整合Mqtt,kafka
springcloud 整合Mqtt,kafka (rabbit与kafka整合方式相同)原理说明:springcloud整合mqtt主要是使用spring-boot-starter-integration、spring-integration-mqtt,springcloud整合kafka、rabbitmq主要是使用spring-cloud-starter-stream-kafka...
·
springcloud 整合Mqtt,kafka (rabbit与kafka整合方式相同)
原理说明:
springcloud整合mqtt主要是使用spring-boot-starter-integration、spring-integration-mqtt;springcloud整合kafka主要是使用spring-cloud-starter-stream-kafka(整合rabbitmq时换成对应的spring-cloud-starter-stream-rabbit,其余写法相同)。
springcloud整合mqtt
依赖:
<!-- MQTT integration -->
<!-- Spring Integration starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- Stream support; presumably only needed for the console (stdin) variant of the outbound flow -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<!-- MQTT channel adapters (org.springframework.integration.mqtt.*) -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- Lombok annotations (@Slf4j, @Data, ...) -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
    <optional>true</optional>
</dependency>
整合代码:
import cn.enncloud.iot.iotmqtttransferkafkahps.constant.AdapterProperties;
import cn.enncloud.iot.iotmqtttransferkafkahps.process.MessageMqttProcessHandler;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import java.util.Optional;
/**
 * Spring Integration wiring for MQTT.
 *
 * <p>Declares one outbound flow (messages sent to {@code outChannelMqtt} are published
 * to the broker) and one inbound flow (messages received from the broker are handed to
 * {@link MessageMqttProcessHandler}). All connection settings come from
 * {@link AdapterProperties}.
 */
@Slf4j
@Configuration
public class MqttConfig {

    /** Business handler invoked for every message received from MQTT. */
    @Autowired
    private MessageMqttProcessHandler messageProcess;

    /** Connection and topic settings. */
    @Autowired
    private AdapterProperties adapterProperties;

    /**
     * Builds the Paho client factory shared by the inbound and outbound adapters.
     *
     * @return factory configured with broker URI, credentials and clean-session flag
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { adapterProperties.getHost() });
        options.setUserName(adapterProperties.getUsername());
        // The password is optional: calling toCharArray() on a missing value would
        // fail bean creation with a NullPointerException.
        String password = adapterProperties.getPassword();
        if (password != null) {
            options.setPassword(password.toCharArray());
        }
        options.setCleanSession(adapterProperties.isCleanSession());

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }

    // ---------------------------------------------------------------- publisher

    /**
     * Outbound flow: everything sent to {@link #outChannelMqtt()} is handled by
     * {@link #mqttOutbound()}.
     *
     * <p>The flow could instead be fed from a polled console (stdin) message source
     * for manual testing; that variant is not wired here.
     *
     * @return the outbound integration flow
     */
    @Bean
    public IntegrationFlow mqttOutFlow() {
        return IntegrationFlows.from(outChannelMqtt())
                .handle(mqttOutbound())
                .get();
    }

    /**
     * Channel that feeds the outbound flow; also the request channel of
     * {@link MessageWriter}.
     *
     * @return a new {@link DirectChannel}
     */
    @Primary
    @Bean
    public MessageChannel outChannelMqtt() {
        return new DirectChannel();
    }

    /**
     * Outbound MQTT handler publishing to the configured default topic.
     *
     * @return asynchronous Paho message handler using the publisher client id
     */
    @Bean
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(adapterProperties.getPublisher(), mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(adapterProperties.getTopic());
        return messageHandler;
    }

    // ----------------------------------------------------------------- consumer

    /**
     * Inbound flow: messages produced by {@link #mqttInbound()} are passed to
     * {@link #accephandler()}.
     *
     * @return the inbound integration flow
     */
    @Bean
    public IntegrationFlow mqttInFlow() {
        return IntegrationFlows.from(mqttInbound())
                .handle(accephandler())
                .get();
    }

    /**
     * Handler that delegates received messages to the injected processing service.
     *
     * @return handler forwarding messages with a payload to {@code messageProcess}
     */
    @Bean
    public MessageHandler accephandler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                if (message.getPayload() != null) {
                    messageProcess.doProcess(message);
                } else {
                    // Nothing to process; record the headers for diagnosis.
                    log.info("my{}", message.getHeaders());
                }
            }
        };
    }

    /**
     * INFO-level logging handler named {@code siSample}. Not referenced by any flow
     * in this class; it can be plugged into {@link #mqttInFlow()} for debugging.
     */
    private LoggingHandler logger() {
        LoggingHandler loggingHandler = new LoggingHandler("INFO");
        loggingHandler.setLoggerName("siSample");
        return loggingHandler;
    }

    /**
     * Inbound MQTT adapter subscribed to the configured topic.
     *
     * <p>NOTE(review): {@code timeout} and {@code qos} are {@code Integer} properties
     * and are unboxed here, so both must be present in the configuration.
     *
     * @return message-driven channel adapter using the consumer client id
     */
    @Bean
    public MessageProducerSupport mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                adapterProperties.getConsumer(), mqttClientFactory(), adapterProperties.getTopic());
        adapter.setCompletionTimeout(adapterProperties.getTimeout());
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(adapterProperties.getQos());
        return adapter;
    }

    /**
     * Gateway for publishing a string payload through {@code outChannelMqtt}.
     */
    @MessagingGateway(defaultRequestChannel = "outChannelMqtt")
    public interface MessageWriter {

        /**
         * Sends {@code data} to the gateway's request channel.
         *
         * @param data payload to publish
         */
        void write(String data);
    }
}
写一个发送消息demo
import cn.enncloud.iot.iotmqtttransferkafkahps.configuration.MqttConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * Demo REST endpoint that publishes a request parameter to MQTT.
 */
@Slf4j
@RestController
public class SendMessageController {

    /** Gateway declared in {@code MqttConfig}; sends to its default request channel. */
    @Autowired
    private MqttConfig.MessageWriter messageWriter;

    /**
     * Handles {@code GET send?message=...} by logging the value and passing it to the gateway.
     *
     * @param message text to publish
     */
    @RequestMapping(value = "send", method = RequestMethod.GET)
    public void send(@RequestParam(value = "message") String message) {
        // Parameterized logging: no string is built when INFO is disabled.
        log.info("收到meseage{}", message);
        messageWriter.write(message);
    }
}
yml中配置:
enn:
  mqtt:
    # TODO: set for your environment
    host: "tcp://**:1883"
    publisher: samplePublisher
    # TODO: set for your environment
    consumer: dampleConsumerprod2
    # Shared subscription
    # topic: $share/group1/allInOne
    # Local shared subscription
    topic: $local/$share/group1/allInOne
    # Placeholders are quoted: a plain YAML scalar cannot start with '%'
    username: "%%"
    password: "%%"
    timeout: 5000
    qos: 2
    cleanSession: false
AdapterProperties.java获取yml中配置信息类
import lombok.Data;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * MQTT settings bound from the {@code enn.mqtt} configuration prefix.
 *
 * <p>The password is excluded from {@code toString()} so that logging this bean does
 * not leak the broker credential.
 *
 * @author zdl
 * @version 1.0 (created 2018/8/16 16:59)
 */
@Component
@ConfigurationProperties(prefix = "enn.mqtt")
@Data
@ToString(exclude = "password")
public class AdapterProperties {

    /** Broker URI, e.g. {@code tcp://host:1883}. */
    private String host;

    /** Not referenced by the visible code; the spelling is kept because it is the bound property name. */
    private Integer pakageSize;

    /** Broker user name. */
    private String username;

    /** Broker password; may be absent. */
    private String password;

    /** Topic used both as the outbound default topic and as the inbound subscription. */
    private String topic;

    /** Client id of the outbound (publishing) MQTT client. */
    private String publisher;

    /** Client id of the inbound (subscribing) MQTT client. */
    private String consumer;

    /** Completion timeout passed to the inbound adapter; {@code null} when not configured. */
    private Integer timeout;

    /** QoS passed to the inbound adapter; {@code null} when not configured. */
    private Integer qos;

    /** Not referenced by the visible code. */
    private String pre;

    /** Value for the MQTT clean-session connect option. */
    private boolean cleanSession;
}
MessageMqttProcessHandler 是具体声明的消费处理接收到mqtt信息的类(注意:原文此处贴出的仍是上文的 SendMessageController.java,与发送消息 demo 内容相同):
import cn.enncloud.iot.iotmqtttransferkafkahps.configuration.MqttConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class SendMessageController {
@Autowired
MqttConfig.MessageWriter messageWriter;
@RequestMapping(value = "send",method = RequestMethod.GET)
public void send(@RequestParam(value = "message") String message){
log.info("收到meseage"+message);
messageWriter.write(message);
}
springcloud整合kafka(rabbitmq相同)
依赖:
<!-- Spring Cloud Stream core -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<!-- Alternative, disabled in this example: depend on the Kafka binder directly
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
-->
<!-- Kafka starter for Spring Cloud Stream -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
整合代码(已经包含了使用逻辑):
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
 * Binding interface that declares the custom outbound channel {@code cim_output}.
 */
public interface KafkaChannel {

    /** Name of the channel used for sending messages. */
    String CIM_OUTPUT = "cim_output";

    // The subscribing (input) side is disabled in this example:
    //   String CIM_INPUT = "cim_input";
    //   @Input(CIM_INPUT)
    //   SubscribableChannel recieveCIMMessage();

    /**
     * Channel used for sending messages.
     *
     * @return the output channel bound under {@link #CIM_OUTPUT}
     */
    @Output(CIM_OUTPUT)
    MessageChannel sendCIMMessage();
}
/**
 * Listens on the {@code Sink.INPUT} binding and forwards each message to
 * {@link MessageKafkaOutHandler}.
 */
@EnableBinding(value = Sink.class)
public class MessageKafkaCimHandler {

    /** Charset used to decode the raw payload bytes. */
    private static final String CHARSET = "UTF-8";

    @Autowired
    private MessageKafkaOutHandler messageKafkaOutHandler;

    /**
     * Decodes the payload and sends it on through the output handler.
     *
     * <p>NOTE(review): the original snippet referenced an undefined {@code kafkaData};
     * it is reconstructed here as the payload decoded with {@link #CHARSET}. Any
     * business transformation belongs between the decode and the forward.
     *
     * @param message message received from the input binding
     * @throws IllegalStateException if forwarding fails with a checked exception
     */
    @StreamListener(Sink.INPUT)
    public void doProcess(Message<byte[]> message) {
        String kafkaData = new String(message.getPayload(), java.nio.charset.Charset.forName(CHARSET));
        try {
            messageKafkaOutHandler.doProcess(MessageBuilder.withPayload(kafkaData).build());
        } catch (RuntimeException e) {
            // Let unchecked failures reach the binder's error handling unchanged.
            throw e;
        } catch (Exception e) {
            // The delegate declares a checked Exception; wrap it and keep the cause.
            throw new IllegalStateException("Failed to forward message to the Kafka output channel", e);
        }
    }
}
import cn.enncloud.iot.iotmqtttransferkafkahps.configuration.KafkaChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;
/**
 * Sends messages through the {@code cim_output} channel declared by {@link KafkaChannel}.
 */
@Slf4j
@Component
@EnableBinding(KafkaChannel.class)
public class MessageKafkaOutHandler {

    /** Output channel, selected by the qualifier {@link KafkaChannel#CIM_OUTPUT}. */
    @Autowired
    @Qualifier(KafkaChannel.CIM_OUTPUT)
    private MessageChannel cimOutput;

    /**
     * Sends the given message on the output channel.
     *
     * @param message message to send
     * @return the result of {@link MessageChannel#send(Message)}
     * @throws Exception declared for callers; nothing in this method throws a checked exception
     */
    public boolean doProcess(Message<?> message) throws Exception {
        final boolean accepted = cimOutput.send(message);
        return accepted;
    }
}
yml配置文件:
spring:
  cloud:
    stream:
      default-binder: kafka
      bindings:
        # cim_output is the custom binding declared by the KafkaChannel interface above.
        # output and input need no custom interface; they are provided by default
        # (presumably the built-in Source/Sink bindings) and can be used directly.
        cim_output:
          destination: topic1
          content-type: application/json
          binder: kafka
          producer:
            headerMode: raw
        output:
          destination: topic2
          content-type: application/json
          binder: kafka
          producer:
            headerMode: raw
        input:
          destination: topic2
          content-type: application/json
          binder: kafka
          group: hpsgroup
          consumer:
            headerMode: raw
      kafka:
        binder:
          # Masked addresses are quoted: a plain YAML scalar starting with '*' is read as an alias
          brokers: "*.*.*.1:9092,1.*.*.*:9092,1.*.*.*:9092"
          zk-nodes: "*.*.*.1:2181,1.*.*.*:2181,1.*.*.*:2181"
  # NOTE(review): nesting reconstructed as spring.kafka.producer.client-id; confirm against the project
  kafka:
    producer:
      client-id: mqtttransferkafka
最后提醒:
复制要注意把配置复制全了,都是血泪调试出来的必需品,少了可能就要开始趟坑了。
更多推荐
已为社区贡献1条内容
所有评论(0)