Kafka构建TB级异步消息系统(发送系统通知)

在这里插入图片描述

补:kafka崩溃问题

windows下的kafka经常会崩溃,如果看报错日志就会说:某条数据被锁死了,有冲突。
解决:直接而将kafka-logs删了就可!
在这里插入图片描述

1.封装事件对象

Event

/*对应kafka的事件——事件对象*/
@Data
@ToString
public class Event {

    private String topic;           //主题,就是事件的类型
    private int userId;             //事件触发的人id
    private int entityType;         //实体类型——事件触发什么操作(点赞、关注、回复),所以要知道事件发生在哪个实体身上,所以要实体类型
    private int entityId;           //实体id
    private int entityUserId;       //实体作者
    //除此以外,事件对象要具有通用性;有可能在一些其他的业务中还要处理一些数据,所以用map来封装那些数据,从而
    // 拥有一定扩展性(现在只处理这三种事件,以后可能会处理更多的事件(点赞、关注、评论))
    private Map<String,Object> data = new HashMap<>();


    public String getTopic() {
        return topic;
    }

    public Event setTopic(String topic) {
        this.topic = topic;
        return this;
    }

    public int getUserId() {
        return userId;
    }

    public Event setUserId(int userId) {
        this.userId = userId;
        return this;
    }

    public int getEntityType() {
        return entityType;
    }

    public Event setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    public int getEntityId() {
        return entityId;
    }

    public Event setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    public int getEntityUserId() {
        return entityUserId;
    }

    public Event setEntityUserId(int entityUserId) {
        this.entityUserId = entityUserId;
        return this;
    }

    public Map<String, Object> getData() {
        return data;
    }

    public Event setData(String key, Object value) {
        this.data = data;
        return this;
    }
}

2.开发事件的生产者

EventProducer

/*事件生产者*/
@Component
public class EventProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    //处理事件 :就是发送消息
    public void fireEvent(Event event){
        //将事件发布到指定的主题上
        kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
        //以上第二个参数:将event转换为一个json字符串,消费者得到这个字符串后,能将其还原成event对象,这样就得到了event所有数据并对其进一步处理

    }
}

3.开发事件的消费者

EventConsumer

/*事件消费者:不用主动调,只要有数据,会自动的,只需调生产者在对应接口上即可!*/
@Component
public class EventConsumer implements CommunityConstant {

    //打印日志
    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

    /*处理一个事件,其实最终就是要给某一个人发送一条消息,发消息其实就是往message表插一条数据*/
    @Autowired
    private MessageService messageService;

    /*一个方法可以消费一个或多个主题;一个主体也能被多个方法消费————多对多的关系*/
    //当前我的业务是:点赞、关注、评论。这三个通知形式是很相近的,所以写到一个方法里处理这三个主题
    @KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})
    //定义消费者的方法
    public void handleCommentMessage(ConsumerRecord record){
        if (record == null || record.value()==null){
            logger.error("消息内容为空!");
            return;
        }
        //将传过来的json字符串转为对象。字符串从record中取,转为后面的class参数对象
        Event event = JSONObject.parseObject(record.value().toString(), Event.class);
        if (event == null){
            logger.error("消息格式错误!必须为JSON");
            return;
        }

        /*上面内容和格式都判断通过,开始正式业务*/
        //发送站内通知
       Message message = new Message();            //构造一个Message对象
        message.setFromId(SYSTEM_USER_ID);          //消息发布者:就是系统给用户发,系统在表里是id为1的用户,所以将1定义为常量1
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());//这里不是xxx_xxx的会话信息id,而是主题名
        message.setCreateTime(new Date());
                //内容content就存 事件触发拼接信息
        Map<String, Object> content = new HashMap<>();
        content.put("userId",event.getUserId()); //触发事件人的id
        content.put("entityType",event.getEntityType());
        content.put("entityId",event.getEntityId());
                //对其他数据进行判断
        if (!event.getData().isEmpty()){
            for(Map.Entry<String, Object> entry : event.getData().entrySet()){
                content.put(entry.getKey(),entry.getValue());
            }
        }
        //将content以字符串格式存入
        message.setContent(JSONObject.toJSONString(content));
        messageService.addMessage(message);
    }
}

4.常量接口

在这里插入图片描述

5.生产者部署到三个对应的controller接口

5.1 CommentController

@Controller
@RequestMapping("/comment")
public class CommentController implements CommunityConstant {

    @Autowired
    private CommentService commentService;
    @Autowired
    private HostHolder hostHolder;  //获取当前用户信息
    @Autowired
    private EventProducer eventProducer; //注入生产者
    @Autowired
    private DiscussPostService discussPostService;

    /*1.插入评论*/
    //比如进入一个帖子,我发布完评论后,应该重定向到这个帖子接着查看详情页面;所以最终重定向的地方需要用到帖子id,所以我在添加的路径上把id也传过来
    //@PathVariable("discussPostId") 用来接收参数
    @RequestMapping(path = "/add/{discussPostId}",method = RequestMethod.POST)
    public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment){
        comment.setUserId(hostHolder.getUsers().getId());  //如果用户未登录,这里就会报错。不碍事,后面统一异常处理
        comment.setStatus(0);
        comment.setCreateTime(new Date());
        //实现添加业务
        commentService.addComment(comment);

        /*触发评论事件-kafka*/
        Event event = new Event()
                .setTopic(TOPIC_COMMENT)
                .setUserId(hostHolder.getUsers().getId())
                .setEntityType(comment.getEntityType())
                .setEntityId(comment.getEntityId())
                .setData("postId",discussPostId); //被评论帖子的id
        if (comment.getEntityType()==ENTITY_TYPE_POST){
            //实体类型等于帖子,即对帖子的评论;
            DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        }else if (comment.getEntityType()==ENTITY_TYPE_COMMENT){
            //实体类型等于帖子,即对用户的评论;
            Comment target = commentService.findCommentById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        }
        //调生产者就完成了!
        eventProducer.fireEvent(event);

        return "redirect:/discuss/detail/"+discussPostId;
    }

}

在这里插入图片描述

5.2 LikeController

@Controller
public class LikeController implements CommunityConstant {

    @Autowired
    private LikeService likeService;
    @Autowired
    private HostHolder hostHolder;
    @Autowired
    private EventProducer eventProducer;

    @RequestMapping(path ="/like",method = RequestMethod.POST)
    @ResponseBody
    public String like(int entityType,int entityId,int entityUserId,int postId){
        //获取当前用户 _ 不用判断登录状态,因为有拦截器,添加一下即可。后期security也可以的。
        User user = hostHolder.getUsers();

        //点赞
        likeService.like(user.getId(),entityType,entityId,entityUserId);

        //统计点赞数量
        long likeCount = likeService.findEntityLikeCount(entityType, entityId);
        //统计点赞状态
        int likeStatus = likeService.findEntityLikeStatus(user.getId(), entityType, entityId);
        //最终要把这两个值传给页面,封装一下再传
        //返回的结果
        Map<String,Object> map = new HashMap<>();
        map.put("likeCount",likeCount);
        map.put("likeStatus",likeStatus);

        /*触发点赞事件-kafka*/
        //这个点赞是有双重能力的方法,点一下喜欢,再点一下取消喜欢,所以只需要通知喜欢进行
        if (likeStatus == 1){
            Event event = new Event()
                    .setTopic(TOPIC_LIKE)
                    .setUserId(hostHolder.getUsers().getId())
                    .setEntityType(entityType)
                    .setEntityId(entityId)
                    .setEntityUserId(entityUserId)
                    .setData("postId",postId);
            eventProducer.fireEvent(event);
        }

        return CommunityUtil.getJSONString(0,null,map);
    }
}

在这里插入图片描述

5.3 FollowController

	//一、关注
    @RequestMapping(path = "/follow",method = RequestMethod.POST)
    @ResponseBody
    //点击关注,应该是一个异步请求,局部刷新
    //关注肯定是当前用户关注某一个实体,所以用户id不用传,取当前用户即可。
    public String follow(int entityType,int entityId){
        //1.获取当前用户(如果没登录,就不行。所以这个方法用拦截器做个检查,登录后才能访问)
        User user = hostHolder.getUsers();
        //2.关注
        followService.follow(user.getId(),entityType,entityId);

        /*触发关注事件——kafka*/
        Event event = new Event()
                .setTopic(TOPIC_FOLLOW)
                .setUserId(hostHolder.getUsers().getId())
                .setEntityType(entityType)
                .setEntityId(entityId)
                .setEntityUserId(entityId);
        eventProducer.fireEvent(event);

        return CommunityUtil.getJSONString(0,"已关注!");
    }

6.测试

登录yty1账户对aaa账户的帖子进行:关注、点赞、评论。
看看数据库中message表会不会插入三条数据。

操作结果:
在这里插入图片描述
这里的from_id为1等于系统,to_id=0是bug,现在已修复代码BUG!

期间一定要注意kafka有没有自动关闭,windows下不稳定。不稳定直接删除kafka-logs目录即可!

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐