springboot2.1.3集群环境websocket配合kafka发送消息,kafka配置多个group

项目地址

websocket配置参考我的另一篇博文https://blog.csdn.net/qq_35433926/article/details/91864127
项目地址:https://gitee.com/xuelingkang/spring-boot-demo
完整配置参考com.example.kafka包

maven依赖

<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

yml添加自定义group配置

kafka:
  consumer:
    websocket:
      properties:
        group.id: spring-boot-demo-websocket-consumer-1

主配置类

package com.example.config;

import com.example.kafka.DefaultProducer;
import com.example.kafka.WebsocketConsumer;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableConfigurationProperties(KafkaConfig.WebsocketConsumerProperties.class)
@Slf4j
public class KafkaConfig {

    /** 广播消息主题 */
    public static final String BROADCAST_TOPIC = "broadcast";
    /** 点对点消息主题 */
    public static final String CHAT_TOPIC = "chat";
    @Autowired
    private KafkaProperties kafkaProperties;
    @Autowired
    private KafkaConfig.WebsocketConsumerProperties websocketConsumerProperties;

    // 自定义KafkaListenerContainerFactory,使用@KafkaListener注解监听主题时可以设置containerFactory为这个bean的名字
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> websocketListenerContainerFactory() {
        Map<String, Object> properties = kafkaProperties.buildConsumerProperties();
        properties.putAll(websocketConsumerProperties.getProperties());
        ConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(properties);
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(kafkaProperties.getListener().getConcurrency());
        return factory;
    }

    @Bean
    public DefaultProducer defaultProducer() {
        return new DefaultProducer();
    }

    @Bean
    public WebsocketConsumer websocketConsumer() {
        return new WebsocketConsumer();
    }

    @Data
    @ConfigurationProperties(prefix = "kafka.consumer.websocket")
    public class WebsocketConsumerProperties {
        private final Map<String, String> properties = new HashMap<>();
    }

}

consumer

package com.example.kafka;

import com.example.model.po.BroadcastMessage;
import com.example.model.po.ChatMessage;
import com.example.model.po.User;
import com.example.model.vo.BroadcastMessageVO;
import com.example.model.vo.ChatMessageVO;
import com.example.model.vo.UserVO;
import com.example.util.MessageHeadersBuilder;
import com.example.util.ModelUtil;
import com.example.websocket.SessionIdRegistry;
import io.jsonwebtoken.lang.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.simp.SimpMessagingTemplate;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static com.example.config.KafkaConfig.BROADCAST_TOPIC;
import static com.example.config.KafkaConfig.CHAT_TOPIC;

public class WebsocketConsumer {

    @Value("${websocket.destination.broadcast}")
    private String broadcast;
    @Value("${websocket.destination.chat}")
    private String chat;
    @Autowired
    private SimpMessagingTemplate template;
    @Autowired
    private SessionIdRegistry sessionIdRegistry;

    @KafkaListener(topics = {BROADCAST_TOPIC}, containerFactory = "websocketListenerContainerFactory")
    public void broadcastConsumer(List<BroadcastMessage> broadcastMessages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Consumer consumer) {
        if (Collections.isEmpty(broadcastMessages)) {
            return;
        }
        broadcastMessages.forEach(broadcastMessage -> {
            BroadcastMessageVO broadcastMessageVO = (BroadcastMessageVO) ModelUtil.copy(broadcastMessage,
                    new ModelUtil.Mapping(BroadcastMessage.class, BroadcastMessageVO.class, "toUsers"),
                    new ModelUtil.Mapping(User.class, UserVO.class, "password", "roles"));
            Set<String> sessionIds = new HashSet<>();
            broadcastMessage.getToUsers().forEach(user -> sessionIds.addAll(sessionIdRegistry.getSessionIds(user.getId())));
            sessionIds.forEach(sessionId -> template.convertAndSendToUser(
                    sessionId,
                    broadcast,
                    broadcastMessageVO,
                    new MessageHeadersBuilder()
                            .sessionId(sessionId)
                            .leaveMutable(true)
                            .build()));
        });
    }

    @KafkaListener(topics = {CHAT_TOPIC}, containerFactory = "websocketListenerContainerFactory")
    public void chatConsumer(List<ChatMessage> chatMessages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Consumer consumer) {
        if (Collections.isEmpty(chatMessages)) {
            return;
        }
        chatMessages.forEach(chatMessage -> {
            Set<String> sessionIds = new HashSet<>();
            sessionIds.addAll(sessionIdRegistry.getSessionIds(chatMessage.getToUserId()));
            sessionIds.addAll(sessionIdRegistry.getSessionIds(chatMessage.getSendUserId()));
            ChatMessageVO chatMessageVO = (ChatMessageVO) ModelUtil.copy(chatMessage,
                    new ModelUtil.Mapping(ChatMessage.class, ChatMessageVO.class),
                    new ModelUtil.Mapping(User.class, UserVO.class, "password", "roles"));
            sessionIds.forEach(sessionId -> template.convertAndSendToUser(
                    sessionId,
                    chat,
                    chatMessageVO,
                    new MessageHeadersBuilder()
                            .sessionId(sessionId)
                            .leaveMutable(true)
                            .build()));
        });
    }

}

启动项目命令

nohup java -Dfile.encoding=utf-8 -jar xxx.jar --kafka.consumer.websocket.properties.group.id=自定义分组 > ${log} 2>&1 &

集群的各个节点启动时使用不同的启动参数即可

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐