Kafka消息广播解决后端服务集群部署WebSocket消息不推送问题

1.当前平台现状

消息推送流程

  1. 用户登录,建立socket连接,将socket通道ID存储到Redis;
  2. socket服务端将socket连接通道缓存到本地HashMap中,其中key使用步骤1中socket通道ID,value基于Netty框架ChannelHandlerContext实现;
  3. 调用消息推送接口时,根据消息接收人ID获取socket通道ID,从通道缓存HashMap中获取连接通道,推送消息到客户端

存在问题

集群部署情况下,如云平台启动平台服务负载的多个副本,那么连接通道只会在用户登录时存储到某一个实例的本地HashMap,而其他副本无法同步(WebSocket的连接通道是一个与服务器端强关联的对象,无法通过Redis缓存进行共享),当消息推送接口请求分发到其他副本时,无法获取连接通道,因此无法推送消息,从而导致消息丢失。

2.解决方案

本文使用以下方案实现:

  1. 消息推送时,进行一次平台内部的消息广播,通知所有平台副本进行判断,通过消息接收人ID从Redis获取socket通道ID,然后尝试从通道缓存HashMap中获取连接通道,判断是否存在,如果不存在则表明socket连接不在当前副本;如果存在,表明socket连接在当前副本,进行消息推送。
  2. 消息广播基于kafka实现,消费端通过随机UUID产生消费者组ID,确保platform服务的每一个集群中的实例启动时,消费者组ID都是唯一的,广播消息会被所有副本消费一次,从而实现消息广播。

消费者端代码示例:

//UUID设置消费者组ID

@KafkaListener(groupId = QueueConstants.Message.EXCHANGE_MESSAGE +  "-" + "#{T(java.util.UUID).randomUUID()}", topics = QueueConstants.Message.ROUTING_KEY_MESSAGE+"."+"broadcast")
public class KafkaMessageQueueConsumerBroadCast <M extends Message<?>> implements IQueueConsumer<M> {
    private static final String CACHE_SOCKET_KEY = "socket.cache";

    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageQueueConsumerBroadCast.class);

    //消费广播消息,向接收人客户端推送socket消息
    @Override
    @KafkaHandler
    public void popup(M message) {

        String content = message.getTemplate().getContent();
        SocketMessagePo smPo = JacksonUtil.getDTO(content,SocketMessagePo.class);
        RedisTemplate<String, Serializable> redisTemplate = (RedisTemplate) AppUtil.getBean("redisTemplate");
        if(redisTemplate.opsForHash().hasKey(CACHE_SOCKET_KEY, smPo.getToUserId())){
            try{
                IMDataProto.IMData.MsgData.Builder builder = IMDataProto.IMData.MsgData.newBuilder();
                builder.setId(smPo.getId());
                builder.setToUserId(smPo.getToUserId());
                builder.setMsgType(smPo.getType());
                builder.setCustomType(smPo.getCustomType());
                builder.setMsgBody(smPo.getBody());
                builder.setSendTime(smPo.getSendTime().getTime());
                if (StringUtil.isNotBlank(smPo.getExpand())) {
                    Map<String, Object> ext = JacksonUtil.toMap(smPo.getExpand());
                    ext.forEach((k, v) -> builder.putExpand(k, v.toString()));
                }

                IMDataProto.IMData.MsgData.FromUser.Builder fromUser = IMDataProto.IMData.MsgData.FromUser.newBuilder();
                fromUser.setId(smPo.getFromUserId());

                builder.setFromUser(fromUser.build());

                SocketUserInfo socketUserInfo = (SocketUserInfo) redisTemplate.opsForHash().get(CACHE_SOCKET_KEY, smPo.getToUserId());
                if(logger.isDebugEnabled()) {
                    logger.debug("获得缓存中的用户信息 => userId: [{}], token: [{}]", socketUserInfo.getUserId(), socketUserInfo.getToken());
                }
                String channelId = socketUserInfo.getChannelId();
                if(logger.isDebugEnabled()) {
                    logger.debug("channelId: [{}], isNotBlank: [{}]", socketUserInfo.getChannelId(), StringUtil.isNotBlank(channelId));
                }
                //用户保持socket连接的推送消息
                if(StringUtil.isNotBlank(channelId) && NettyConstants.CHANNEL_MAP.containsKey(channelId)){
                    logger.info("--------->推送消息,用户id {},channelId {}",smPo.getToUserId(),channelId);
                    ChannelHandlerContext ctx = NettyConstants.CHANNEL_MAP.get(channelId);
                    NettyMessageUtils.sendMessage(ctx, null, NettyConstants.DataType.MSG_PARAM, builder);
                }else{
                    logger.debug("--------->当前应用实例本地无socket连接通道缓存,不推送消息");
                }

            }catch (Exception e) {
                logger.error("推送信息异常:{}", JacksonUtil.toJsonString(smPo), e);
            }
    }

    }

   
}

3.总结

集群情况下,从本地缓存获取连接通道的方式是行不通的,本文采用kafka消息广播方式,让集群所有实例都进行本地缓存获取连接通道的操作,确保拥有连接通道缓存的实例进行消息推送,从而确保消息不丢失。

消息广播实现除了kafka外,其他消息中间件或Redis也可以。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐