Spring Boot 2.1.3 集群环境下 WebSocket 配合 Kafka 发送消息:为 Kafka 配置多个 group
Spring Boot 2.1.3 集群环境下 WebSocket 配合 Kafka 发送消息,为 Kafka 配置多个 group。本文目录:项目地址、maven依赖、yml添加自定义group配置、主配置类、consumer、启动项目命令。WebSocket 配置参考我的另一篇博文:https://blog.csdn.net/qq_35433926/article/details/91864127 ;项目地址:https://gite...
·
项目地址
WebSocket 配置请参考我的另一篇博文:https://blog.csdn.net/qq_35433926/article/details/91864127
项目地址:https://gitee.com/xuelingkang/spring-boot-demo
完整配置参考com.example.kafka包
maven依赖
<!-- Spring for Apache Kafka (provides @KafkaListener and the listener container factories used below). -->
<!-- NOTE(review): no <version> is declared here; presumably it is managed by the Spring Boot parent/BOM - confirm. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
yml添加自定义group配置
# Custom consumer settings for the websocket listener.
# The nesting must resolve to the key "kafka.consumer.websocket.properties.group.id".
kafka:
  consumer:
    websocket:
      properties:
        # Consumer group id used by the websocket listener container factory on this node.
        group.id: spring-boot-demo-websocket-consumer-1
主配置类
package com.example.config;
import com.example.kafka.DefaultProducer;
import com.example.kafka.WebsocketConsumer;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
 * Kafka configuration for the websocket demo.
 *
 * <p>Declares the topic names, a listener container factory whose consumer properties are
 * Spring Boot's consumer properties overlaid with the custom
 * {@code kafka.consumer.websocket.properties} entries, and the producer/consumer beans.
 */
@Configuration
@EnableConfigurationProperties(KafkaConfig.WebsocketConsumerProperties.class)
@Slf4j
public class KafkaConfig {

    /** Topic for broadcast messages. */
    public static final String BROADCAST_TOPIC = "broadcast";

    /** Topic for point-to-point (chat) messages. */
    public static final String CHAT_TOPIC = "chat";

    @Autowired
    private KafkaProperties kafkaProperties;

    @Autowired
    private KafkaConfig.WebsocketConsumerProperties websocketConsumerProperties;

    /**
     * Custom {@link KafkaListenerContainerFactory}. When listening to a topic with
     * {@code @KafkaListener}, set {@code containerFactory} to this bean's name to use it.
     *
     * @return a concurrent listener container factory built from the merged consumer properties
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> websocketListenerContainerFactory() {
        Map<String, Object> properties = kafkaProperties.buildConsumerProperties();
        // Applied last so that the custom entries (e.g. group.id) replace same-named defaults.
        properties.putAll(websocketConsumerProperties.getProperties());

        ConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(properties);
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(kafkaProperties.getListener().getConcurrency());
        return factory;
    }

    /**
     * @return the {@link DefaultProducer} bean
     */
    @Bean
    public DefaultProducer defaultProducer() {
        return new DefaultProducer();
    }

    /**
     * @return the {@link WebsocketConsumer} bean that hosts the {@code @KafkaListener} methods
     */
    @Bean
    public WebsocketConsumer websocketConsumer() {
        return new WebsocketConsumer();
    }

    /**
     * Properties bound from the {@code kafka.consumer.websocket} prefix.
     *
     * <p>Declared {@code static} so that it can be instantiated without an enclosing
     * {@link KafkaConfig} instance; as a non-static inner class its only constructor would
     * require one, while {@link KafkaConfig} itself injects this bean.
     */
    @Data
    @ConfigurationProperties(prefix = "kafka.consumer.websocket")
    public static class WebsocketConsumerProperties {

        /** Extra Kafka consumer properties, keyed by Kafka property name (e.g. {@code group.id}). */
        private final Map<String, String> properties = new HashMap<>();
    }
}
consumer
package com.example.kafka;
import com.example.model.po.BroadcastMessage;
import com.example.model.po.ChatMessage;
import com.example.model.po.User;
import com.example.model.vo.BroadcastMessageVO;
import com.example.model.vo.ChatMessageVO;
import com.example.model.vo.UserVO;
import com.example.util.MessageHeadersBuilder;
import com.example.util.ModelUtil;
import com.example.websocket.SessionIdRegistry;
import io.jsonwebtoken.lang.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static com.example.config.KafkaConfig.BROADCAST_TOPIC;
import static com.example.config.KafkaConfig.CHAT_TOPIC;
/**
 * Consumes messages from the Kafka topics and forwards them to websocket sessions
 * through {@link SimpMessagingTemplate}.
 *
 * <p>Both listeners use the {@code websocketListenerContainerFactory} container factory.
 */
public class WebsocketConsumer {

    /** Websocket destination for broadcast messages, read from {@code websocket.destination.broadcast}. */
    @Value("${websocket.destination.broadcast}")
    private String broadcast;

    /** Websocket destination for chat messages, read from {@code websocket.destination.chat}. */
    @Value("${websocket.destination.chat}")
    private String chat;

    @Autowired
    private SimpMessagingTemplate template;

    @Autowired
    private SessionIdRegistry sessionIdRegistry;

    /**
     * Handles messages from {@code BROADCAST_TOPIC}: converts each message to its VO and sends it
     * to every session id registered for each of the message's target users.
     *
     * @param broadcastMessages received messages; a {@code null} or empty list is ignored
     * @param topic             topic the messages were received from (not used in the body)
     * @param consumer          the Kafka consumer (not used in the body)
     */
    @KafkaListener(topics = {BROADCAST_TOPIC}, containerFactory = "websocketListenerContainerFactory")
    public void broadcastConsumer(List<BroadcastMessage> broadcastMessages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Consumer<?, ?> consumer) {
        if (Collections.isEmpty(broadcastMessages)) {
            return;
        }
        broadcastMessages.forEach(broadcastMessage -> {
            // "toUsers" on the message and "password"/"roles" on users are passed to the mapping
            // as extra arguments. NOTE(review): presumably these are fields excluded from the
            // copy - confirm against ModelUtil.Mapping.
            BroadcastMessageVO broadcastMessageVO = (BroadcastMessageVO) ModelUtil.copy(broadcastMessage,
                    new ModelUtil.Mapping(BroadcastMessage.class, BroadcastMessageVO.class, "toUsers"),
                    new ModelUtil.Mapping(User.class, UserVO.class, "password", "roles"));
            // A set, so a session id reachable through several users is sent to only once.
            Set<String> sessionIds = new HashSet<>();
            broadcastMessage.getToUsers().forEach(user -> sessionIds.addAll(sessionIdRegistry.getSessionIds(user.getId())));
            sendToSessions(sessionIds, broadcast, broadcastMessageVO);
        });
    }

    /**
     * Handles messages from {@code CHAT_TOPIC}: sends each message to the sessions of both the
     * receiving user and the sending user.
     *
     * @param chatMessages received messages; a {@code null} or empty list is ignored
     * @param topic        topic the messages were received from (not used in the body)
     * @param consumer     the Kafka consumer (not used in the body)
     */
    @KafkaListener(topics = {CHAT_TOPIC}, containerFactory = "websocketListenerContainerFactory")
    public void chatConsumer(List<ChatMessage> chatMessages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Consumer<?, ?> consumer) {
        if (Collections.isEmpty(chatMessages)) {
            return;
        }
        chatMessages.forEach(chatMessage -> {
            Set<String> sessionIds = new HashSet<>();
            sessionIds.addAll(sessionIdRegistry.getSessionIds(chatMessage.getToUserId()));
            sessionIds.addAll(sessionIdRegistry.getSessionIds(chatMessage.getSendUserId()));
            ChatMessageVO chatMessageVO = (ChatMessageVO) ModelUtil.copy(chatMessage,
                    new ModelUtil.Mapping(ChatMessage.class, ChatMessageVO.class),
                    new ModelUtil.Mapping(User.class, UserVO.class, "password", "roles"));
            sendToSessions(sessionIds, chat, chatMessageVO);
        });
    }

    /**
     * Sends {@code payload} to {@code destination} once per session id, using the session id as
     * the user name and attaching headers built for that session.
     */
    private void sendToSessions(Set<String> sessionIds, String destination, Object payload) {
        sessionIds.forEach(sessionId -> template.convertAndSendToUser(
                sessionId,
                destination,
                payload,
                new MessageHeadersBuilder()
                        .sessionId(sessionId)
                        .leaveMutable(true)
                        .build()));
    }
}
启动项目命令
# Start the jar in the background, detached from the terminal (nohup ... &).
# --kafka.consumer.websocket.properties.group.id is passed as an application argument; replace the
# placeholder value with this node's group name. stdout and stderr are both redirected to ${log}.
# NOTE(review): ${log} is not defined in this snippet - it must be set before running the command.
nohup java -Dfile.encoding=utf-8 -jar xxx.jar --kafka.consumer.websocket.properties.group.id=自定义分组 > ${log} 2>&1 &
集群的各个节点启动时,为 group.id 指定不同的值即可。这样每个节点属于不同的消费组,都能收到主题中的全部消息,再各自推送给连接到本节点的 WebSocket 会话。
更多推荐
已为社区贡献2条内容
所有评论(0)