Kafka消息广播解决平台集群部署websocket消息不推送问题
解决集群部署情况下,websocket消息推送问题
·
Kafka消息广播解决后端服务集群部署WebSocket消息不推送问题
1.当前平台现状
消息推送流程
- 用户登录,建立socket连接,将socket通道ID存储到Redis;
- socket服务端将socket连接通道缓存到本地HashMap中,其中key使用步骤1中socket通道ID,value基于Netty框架ChannelHandlerContext实现;
- 调用消息推送接口时,根据消息接收人ID获取socket通道ID,从通道缓存HashMap中获取连接通道,推送消息到客户端
存在问题
集群部署情况下,如云平台启动平台服务负载的多个副本,那么连接通道只会在用户登录时存储到某一个实例的本地HashMap,而其他副本无法同步(WebSocket的连接通道是一个与服务器端强关联的对象,无法通过Redis缓存进行共享),当消息推送接口请求分发到其他副本时,无法获取连接通道,因此无法推送消息,从而导致消息丢失。
2.解决方案
本文使用以下方案实现:
- 消息推送时,进行一次平台内部的消息广播,通知所有平台副本进行判断,通过消息接收人ID从Redis获取socket通道ID,然后尝试从通道缓存HashMap中获取连接通道,判断是否存在,如果不存在则表明socket连接不在当前副本;如果存在,表明socket连接在当前副本,进行消息推送。
- 消息广播基于kafka实现,消费端通过随机UUID产生消费者组ID,确保platform服务的每一个集群中的实例启动时,消费者组ID都是唯一的,广播消息会被所有副本消费一次,从而实现消息广播。
消费者端代码示例:
//UUID设置消费者组ID
@KafkaListener(groupId = QueueConstants.Message.EXCHANGE_MESSAGE + "-" + "#{T(java.util.UUID).randomUUID()}", topics = QueueConstants.Message.ROUTING_KEY_MESSAGE+"."+"broadcast")
public class KafkaMessageQueueConsumerBroadCast <M extends Message<?>> implements IQueueConsumer<M> {
private static final String CACHE_SOCKET_KEY = "socket.cache";
private static final Logger logger = LoggerFactory.getLogger(KafkaMessageQueueConsumerBroadCast.class);
//消费广播消息,向接收人客户端推送socket消息
@Override
@KafkaHandler
public void popup(M message) {
String content = message.getTemplate().getContent();
SocketMessagePo smPo = JacksonUtil.getDTO(content,SocketMessagePo.class);
RedisTemplate<String, Serializable> redisTemplate = (RedisTemplate) AppUtil.getBean("redisTemplate");
if(redisTemplate.opsForHash().hasKey(CACHE_SOCKET_KEY, smPo.getToUserId())){
try{
IMDataProto.IMData.MsgData.Builder builder = IMDataProto.IMData.MsgData.newBuilder();
builder.setId(smPo.getId());
builder.setToUserId(smPo.getToUserId());
builder.setMsgType(smPo.getType());
builder.setCustomType(smPo.getCustomType());
builder.setMsgBody(smPo.getBody());
builder.setSendTime(smPo.getSendTime().getTime());
if (StringUtil.isNotBlank(smPo.getExpand())) {
Map<String, Object> ext = JacksonUtil.toMap(smPo.getExpand());
ext.forEach((k, v) -> builder.putExpand(k, v.toString()));
}
IMDataProto.IMData.MsgData.FromUser.Builder fromUser = IMDataProto.IMData.MsgData.FromUser.newBuilder();
fromUser.setId(smPo.getFromUserId());
builder.setFromUser(fromUser.build());
SocketUserInfo socketUserInfo = (SocketUserInfo) redisTemplate.opsForHash().get(CACHE_SOCKET_KEY, smPo.getToUserId());
if(logger.isDebugEnabled()) {
logger.debug("获得缓存中的用户信息 => userId: [{}], token: [{}]", socketUserInfo.getUserId(), socketUserInfo.getToken());
}
String channelId = socketUserInfo.getChannelId();
if(logger.isDebugEnabled()) {
logger.debug("channelId: [{}], isNotBlank: [{}]", socketUserInfo.getChannelId(), StringUtil.isNotBlank(channelId));
}
//用户保持socket连接的推送消息
if(StringUtil.isNotBlank(channelId) && NettyConstants.CHANNEL_MAP.containsKey(channelId)){
logger.info("--------->推送消息,用户id {},channelId {}",smPo.getToUserId(),channelId);
ChannelHandlerContext ctx = NettyConstants.CHANNEL_MAP.get(channelId);
NettyMessageUtils.sendMessage(ctx, null, NettyConstants.DataType.MSG_PARAM, builder);
}else{
logger.debug("--------->当前应用实例本地无socket连接通道缓存,不推送消息");
}
}catch (Exception e) {
logger.error("推送信息异常:{}", JacksonUtil.toJsonString(smPo), e);
}
}
}
}
3.总结
集群情况下,从本地缓存获取连接通道的方式是行不通的,本文采用kafka消息广播方式,让集群所有实例都进行本地缓存获取连接通道的操作,确保拥有连接通道缓存的实例进行消息推送,从而确保消息不丢失。
消息广播实现除了kafka外,其他消息中间件或Redis也可以。
更多推荐
已为社区贡献1条内容
所有评论(0)