疫情防控交流社区平台——5.1 Kafka构建TB级异步消息系统(发送系统通知)
疫情防控交流社区平台——5.1 Redis一站式高性能存储(发送系统通知)-1.创建消费者、生产者-2.将生产者部署到需要的controller上-3.测试
·
文章目录
Kafka构建TB级异步消息系统(发送系统通知)
补:kafka崩溃问题
windows下的kafka经常会崩溃,如果看报错日志就会说:某条数据被锁死了,有冲突。
解决
:直接而将kafka-logs删了就可!
1.封装事件对象
Event
/*对应kafka的事件——事件对象*/
@Data
@ToString
public class Event {
private String topic; //主题,就是事件的类型
private int userId; //事件触发的人id
private int entityType; //实体类型——事件触发什么操作(点赞、关注、回复),所以要知道事件发生在哪个实体身上,所以要实体类型
private int entityId; //实体id
private int entityUserId; //实体作者
//除此以外,事件对象要具有通用性;有可能在一些其他的业务中还要处理一些数据,所以用map来封装那些数据,从而
// 拥有一定扩展性(现在只处理这三种事件,以后可能会处理更多的事件(点赞、关注、评论))
private Map<String,Object> data = new HashMap<>();
public String getTopic() {
return topic;
}
public Event setTopic(String topic) {
this.topic = topic;
return this;
}
public int getUserId() {
return userId;
}
public Event setUserId(int userId) {
this.userId = userId;
return this;
}
public int getEntityType() {
return entityType;
}
public Event setEntityType(int entityType) {
this.entityType = entityType;
return this;
}
public int getEntityId() {
return entityId;
}
public Event setEntityId(int entityId) {
this.entityId = entityId;
return this;
}
public int getEntityUserId() {
return entityUserId;
}
public Event setEntityUserId(int entityUserId) {
this.entityUserId = entityUserId;
return this;
}
public Map<String, Object> getData() {
return data;
}
public Event setData(String key, Object value) {
this.data = data;
return this;
}
}
2.开发事件的生产者
EventProducer
/*事件生产者*/
@Component
public class EventProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
//处理事件 :就是发送消息
public void fireEvent(Event event){
//将事件发布到指定的主题上
kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
//以上第二个参数:将event转换为一个json字符串,消费者得到这个字符串后,能将其还原成event对象,这样就得到了event所有数据并对其进一步处理
}
}
3.开发事件的消费者
EventConsumer
/*事件消费者:不用主动调,只要有数据,会自动的,只需调生产者在对应接口上即可!*/
@Component
public class EventConsumer implements CommunityConstant {
//打印日志
private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
/*处理一个事件,其实最终就是要给某一个人发送一条消息,发消息其实就是往message表插一条数据*/
@Autowired
private MessageService messageService;
/*一个方法可以消费一个或多个主题;一个主体也能被多个方法消费————多对多的关系*/
//当前我的业务是:点赞、关注、评论。这三个通知形式是很相近的,所以写到一个方法里处理这三个主题
@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})
//定义消费者的方法
public void handleCommentMessage(ConsumerRecord record){
if (record == null || record.value()==null){
logger.error("消息内容为空!");
return;
}
//将传过来的json字符串转为对象。字符串从record中取,转为后面的class参数对象
Event event = JSONObject.parseObject(record.value().toString(), Event.class);
if (event == null){
logger.error("消息格式错误!必须为JSON");
return;
}
/*上面内容和格式都判断通过,开始正式业务*/
//发送站内通知
Message message = new Message(); //构造一个Message对象
message.setFromId(SYSTEM_USER_ID); //消息发布者:就是系统给用户发,系统在表里是id为1的用户,所以将1定义为常量1
message.setToId(event.getEntityUserId());
message.setConversationId(event.getTopic());//这里不是xxx_xxx的会话信息id,而是主题名
message.setCreateTime(new Date());
//内容content就存 事件触发拼接信息
Map<String, Object> content = new HashMap<>();
content.put("userId",event.getUserId()); //触发事件人的id
content.put("entityType",event.getEntityType());
content.put("entityId",event.getEntityId());
//对其他数据进行判断
if (!event.getData().isEmpty()){
for(Map.Entry<String, Object> entry : event.getData().entrySet()){
content.put(entry.getKey(),entry.getValue());
}
}
//将content以字符串格式存入
message.setContent(JSONObject.toJSONString(content));
messageService.addMessage(message);
}
}
4.常量接口
5.生产者部署到三个对应的controller接口
5.1 CommentController
@Controller
@RequestMapping("/comment")
public class CommentController implements CommunityConstant {
@Autowired
private CommentService commentService;
@Autowired
private HostHolder hostHolder; //获取当前用户信息
@Autowired
private EventProducer eventProducer; //注入生产者
@Autowired
private DiscussPostService discussPostService;
/*1.插入评论*/
//比如进入一个帖子,我发布完评论后,应该重定向到这个帖子接着查看详情页面;所以最终重定向的地方需要用到帖子id,所以我在添加的路径上把id也传过来
//@PathVariable("discussPostId") 用来接收参数
@RequestMapping(path = "/add/{discussPostId}",method = RequestMethod.POST)
public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment){
comment.setUserId(hostHolder.getUsers().getId()); //如果用户未登录,这里就会报错。不碍事,后面统一异常处理
comment.setStatus(0);
comment.setCreateTime(new Date());
//实现添加业务
commentService.addComment(comment);
/*触发评论事件-kafka*/
Event event = new Event()
.setTopic(TOPIC_COMMENT)
.setUserId(hostHolder.getUsers().getId())
.setEntityType(comment.getEntityType())
.setEntityId(comment.getEntityId())
.setData("postId",discussPostId); //被评论帖子的id
if (comment.getEntityType()==ENTITY_TYPE_POST){
//实体类型等于帖子,即对帖子的评论;
DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId());
event.setEntityUserId(target.getUserId());
}else if (comment.getEntityType()==ENTITY_TYPE_COMMENT){
//实体类型等于帖子,即对用户的评论;
Comment target = commentService.findCommentById(comment.getEntityId());
event.setEntityUserId(target.getUserId());
}
//调生产者就完成了!
eventProducer.fireEvent(event);
return "redirect:/discuss/detail/"+discussPostId;
}
}
5.2 LikeController
@Controller
public class LikeController implements CommunityConstant {
@Autowired
private LikeService likeService;
@Autowired
private HostHolder hostHolder;
@Autowired
private EventProducer eventProducer;
@RequestMapping(path ="/like",method = RequestMethod.POST)
@ResponseBody
public String like(int entityType,int entityId,int entityUserId,int postId){
//获取当前用户 _ 不用判断登录状态,因为有拦截器,添加一下即可。后期security也可以的。
User user = hostHolder.getUsers();
//点赞
likeService.like(user.getId(),entityType,entityId,entityUserId);
//统计点赞数量
long likeCount = likeService.findEntityLikeCount(entityType, entityId);
//统计点赞状态
int likeStatus = likeService.findEntityLikeStatus(user.getId(), entityType, entityId);
//最终要把这两个值传给页面,封装一下再传
//返回的结果
Map<String,Object> map = new HashMap<>();
map.put("likeCount",likeCount);
map.put("likeStatus",likeStatus);
/*触发点赞事件-kafka*/
//这个点赞是有双重能力的方法,点一下喜欢,再点一下取消喜欢,所以只需要通知喜欢进行
if (likeStatus == 1){
Event event = new Event()
.setTopic(TOPIC_LIKE)
.setUserId(hostHolder.getUsers().getId())
.setEntityType(entityType)
.setEntityId(entityId)
.setEntityUserId(entityUserId)
.setData("postId",postId);
eventProducer.fireEvent(event);
}
return CommunityUtil.getJSONString(0,null,map);
}
}
5.3 FollowController
//一、关注
@RequestMapping(path = "/follow",method = RequestMethod.POST)
@ResponseBody
//点击关注,应该是一个异步请求,局部刷新
//关注肯定是当前用户关注某一个实体,所以用户id不用传,取当前用户即可。
public String follow(int entityType,int entityId){
//1.获取当前用户(如果没登录,就不行。所以这个方法用拦截器做个检查,登录后才能访问)
User user = hostHolder.getUsers();
//2.关注
followService.follow(user.getId(),entityType,entityId);
/*触发关注事件——kafka*/
Event event = new Event()
.setTopic(TOPIC_FOLLOW)
.setUserId(hostHolder.getUsers().getId())
.setEntityType(entityType)
.setEntityId(entityId)
.setEntityUserId(entityId);
eventProducer.fireEvent(event);
return CommunityUtil.getJSONString(0,"已关注!");
}
6.测试
登录yty1账户对aaa账户的帖子进行:关注、点赞、评论。
看看数据库中message表会不会插入三条数据。
操作结果:
这里的from_id为1等于系统,to_id=0是bug,现在已修复代码BUG!
期间一定要注意kafka有没有自动关闭,windows下不稳定。不稳定直接删除kafka-logs目录即可!
更多推荐
已为社区贡献2条内容
所有评论(0)