springcloud 整合Mqtt,kafka (rabbit与kafka整和方式相同)

原理说明:

springcloud整合mqtt主要是使用spring-boot-starter-integration、spring-integration-mqtt,springcloud整合kafka、rabbitmq主要是使用spring-cloud-starter-stream-kafka

springcloud整合mqtt

依赖:

<!--mqtt-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
    <optional>true</optional>
</dependency>

 

整合代码:

import cn.enncloud.iot.iotmqtttransferkafkahps.constant.AdapterProperties;
import cn.enncloud.iot.iotmqtttransferkafkahps.process.MessageMqttProcessHandler;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

import java.util.Optional;

@Slf4j
@Configuration
public class MqttConfig {
    @Autowired
    private MessageMqttProcessHandler messageProcess;
    @Autowired
    private AdapterProperties adapterProperties;
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { adapterProperties.getHost() });
        options.setUserName(adapterProperties.getUsername());
        options.setPassword(adapterProperties.getPassword().toCharArray());
        options.setCleanSession(adapterProperties.isCleanSession());
        factory.setConnectionOptions(options);
        return factory;
    }

    // publisher

    @Bean
    public IntegrationFlow mqttOutFlow() {
        //console input
//        return IntegrationFlows.from(CharacterStreamReadingMessageSource.stdin(),
//                e -> e.poller(Pollers.fixedDelay(1000)))
//                .transform(p -> p + " sent to MQTT")
//                .handle(mqttOutbound())
//                .get();
        return IntegrationFlows.from(outChannelMqtt())
                .handle(mqttOutbound())
                .get();
    }

    @Primary
    @Bean
    public MessageChannel outChannelMqtt() {
        return new DirectChannel();
    }

    @Bean
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(adapterProperties.getPublisher(), mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(adapterProperties.getTopic());
        return messageHandler;
    }

    // consumer

    @Bean
    public IntegrationFlow mqttInFlow() {
        return IntegrationFlows.from(mqttInbound())
//                .transform(p -> p + ", received from MQTT")
//                .handle(logger())
                .handle(accephandler())
                .get();
    }
    @Bean
//   这里注入处理逻辑服务
    public MessageHandler accephandler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                Optional optional = Optional.ofNullable(message.getPayload());
                if(optional.isPresent()){
                    messageProcess.doProcess(message);
                }else{
                    log.info("my"+message.getHeaders());
                }

            }

        };
    }


    private LoggingHandler logger() {
        LoggingHandler loggingHandler = new LoggingHandler("INFO");
        loggingHandler.setLoggerName("siSample");
        return loggingHandler;
    }

    @Bean
    public MessageProducerSupport mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(adapterProperties.getConsumer(),
                mqttClientFactory(), adapterProperties.getTopic());
        adapter.setCompletionTimeout(adapterProperties.getTimeout());
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(adapterProperties.getQos());
        return adapter;
    }
    @MessagingGateway(defaultRequestChannel = "outChannelMqtt")
    public interface MessageWriter{
        void write(String data);
    }



}

写一个发送消息demo

import cn.enncloud.iot.iotmqtttransferkafkahps.configuration.MqttConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class SendMessageController {

    @Autowired
    MqttConfig.MessageWriter messageWriter;


    @RequestMapping(value = "send",method = RequestMethod.GET)
    public void send(@RequestParam(value = "message") String message){
        log.info("收到meseage"+message);
        messageWriter.write(message);

    }

}

yml中配置:

enn:
  mqtt:
    # todo
    host: tcp://**:1883
    publisher: samplePublisher
    # todo
    consumer: dampleConsumerprod2
    # 共享订阅
    #    topic: $share/group1/allInOne
    # 本地共享订阅
    topic: $local/$share/group1/allInOne
    username: %%
    password: %%
    timeout: 5000
    qos: 2
    cleanSession: false

AdapterProperties.java获取yml中配置信息类

import lombok.Data;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * @description:    获取mqtt信息
 * @author:         zdl
 * @createDate:     2018/8/16 16:59
 * @updateUser:     zdl
 * @updateDate:     2018/8/16 16:59
 * @updateRemark:   修改内容
 * @version:        1.0
 */
@Component
@ConfigurationProperties(prefix = "enn.mqtt")
@Data
@ToString
public class AdapterProperties {

    private String host;
    private Integer pakageSize;

    private String username;
    private String password;
    private String topic;
    private String publisher;
    private String consumer;
    private Integer timeout;
    private Integer qos;
    private String pre;
    private boolean cleanSession;


}

SendMessageController.java具体声明的消费处理接收到mqtt信息类

import cn.enncloud.iot.iotmqtttransferkafkahps.configuration.MqttConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class SendMessageController {

    @Autowired
    MqttConfig.MessageWriter messageWriter;


    @RequestMapping(value = "send",method = RequestMethod.GET)
    public void send(@RequestParam(value = "message") String message){
        log.info("收到meseage"+message);
        messageWriter.write(message);

    }

 

springcloud整合kafka(rabbitmq相同)

依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<!--<dependency>-->
    <!--<groupId>org.springframework.cloud</groupId>-->
    <!--<artifactId>spring-cloud-stream-binder-kafka</artifactId>-->
<!--</dependency>-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

整合代码(已经包含了使用逻辑):

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface KafkaChannel {
    /**
     * 发消息的通道名称
     */
    String CIM_OUTPUT = "cim_output";

    /**
     * 消息的订阅通道名称
     */
//    String CIM_INPUT = "cim_input";

    /**
     * 发消息的通道
     *
     * @return
     */
    @Output(CIM_OUTPUT)
    MessageChannel sendCIMMessage();

    /**
     * 收消息的通道
     *
     * @return
     */
//    @Input(CIM_INPUT)
//    SubscribableChannel recieveCIMMessage();
}
@EnableBinding(value = Sink.class)
public class MessageKafkaCimHandler {
   
    @Autowired
    private MessageKafkaOutHandler messageKafkaOutHandler;
    

    private static final String CHARSET = "UTF-8";
    @StreamListener(Sink.INPUT)
    public void doProcess(Message<byte[]> message){           messageKafkaOutHandler.doProcess(MessageBuilder.withPayload(kafkaData).build());
}
}
import cn.enncloud.iot.iotmqtttransferkafkahps.configuration.KafkaChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@EnableBinding(value = KafkaChannel.class)
public class MessageKafkaOutHandler {
    @Qualifier(value = KafkaChannel.CIM_OUTPUT)
    @Autowired
    private MessageChannel cimOutput;
    public boolean doProcess(Message<?> message) throws Exception{
        return this.cimOutput.send(message);
    }
}

yml配置文件:

spring:
  cloud:
    stream:
      default-binder: kafka
      bindings:
        #cim_output自定义配置的,上看代码中的接口KafkaChannel .output,input不需要配置接口,默认提供,直接使用即可。
        cim_output:
          destination: topic1
          content-type: application/json
          binder: kafka
          producer:
            headerMode: raw
        output:
          destination: topic2
          content-type: application/json
          binder: kafka
          producer:
            headerMode: raw
        input:
          destination: topic2
          content-type: application/json
          binder: kafka
          group: hpsgroup
          consumer:
            headerMode: raw
      kafka:
        binder:
          brokers: *.*.*.1:9092,1.*.*.*:9092,1.*.*.*:9092
          zk-nodes: *.*.*.1:2181,1.*.*.*:2181,1.*.*.*:2181
  kafka:
    producer:
      client-id: mqtttransferkafka

最后提醒:

复制要注意把配置复制全了,都是血泪调试出来的必需品,少了可能就要开始汤坑了。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐