思考这么一个业务场景:本系统A是一个上游服务,A需要执行某个操作(DB操作、Redis操作之类),当A完成了这个本地操作后,需要将这一操作的结果通知给多个下游服务B、C、D、E...

要满足:(1)如果本系统A的本地操作失败,则不能通知下游;(2)并且,这个结果对部分下游系统如B、C来说非常重要,对另一部分下游系统如D、E来说并不重要,因此对于下游系统B和C,希望本系统A的消息能被正确的投递到并且被下游系统B和C正确的消费;

举2个例子便于理解

(1)系统A是业务订单模块,当前系统收到支付系统的订单支付结果,需要更新业务订单状态,并且将完成支付后需要给用户发送用户权益的通知发送给权益发放系统B;此外,活动系统C也需要监听业务订单结果,来判断用户是否完成某项消费额度的活动;

(2)系统A是ToB的开放平台,CP通过系统A上传一些重要的合作素材,如APP应用信息、APP预约信息、合作的活动信息,而承接这些合作素材的业务方有多个(多个C端系统来承接来自系统A的重要物料变更信息),即系统A需要将素材物料同步给多个下游系统;

看到上面的问题,有没有联想到一个概念——分布式事务消息事务

这里的消息事务理解为广义的消息事务,而非仅限于消息中间件RocketMQ、Kafka之类的消息事务机制,而是一种设计思路;本篇基于上述的2个示例,给出相关的表设计、关键代码;关于消息中间件的消息事务的原理可参考下面的文章:

事务消息大揭秘!RocketMQ、Kafka、Pulsar全方位对比

RocketMQ消息事务

1. 什么是消息事务

(1)什么是事务Trasaction?

事务是一个程序执行单元,里面的所有操作要么全部执行成功,要么全部执行失败;一个事务有四个基本特性,也就是我们常说的(ACID)。

1. Atomicity(原子性):事务是一个不可分割的整体,事务内所有操作要么全做成功,要么全失败。

2. Consistency(一致性):事务执行前后,数据从一个状态到另一个状态必须是一致的(A向B转账,不能出现A扣了钱,B却没收到)。

3. Isolation(隔离性):多个并发事务之间相互隔离,不能互相干扰。

4. Durablity(持久性):事务完成后,对数据的更改是永久保存的,不能回滚。

(2)分布式事务

分布式事务是指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。分布式事务通常用于在分布式系统中保证不同节点之间的数据一致性。

分布式事务的解决方案一般有以下几种(这里只是讲思想而不是具体方案):

  • XA(2PC/3PC)

最具有代表性的是由Oracle Tuxedo系统提出的XA分布式事务协议;XA中大致分为两部分:事务管理器本地资源管理器。其中本地资源管理器往往由数据库实现,比如Oracle、DB2这些商业数据库都实现了XA接口,而事务管理器作为全局的调度者,负责各个本地资源的提交和回滚。

XA协议通常包含两阶段提交(2PC)和三阶段提交(3PC)两种实现。二阶段提交看似能够提供原子性的操作,但它存在着一些缺陷,三段提交(3PC)是对两段提交(2PC)的一种升级优化,有兴趣的可以深入了解一下,这里不再赘述。两阶段提交顾名思义就是要进行两个阶段的提交:第一阶段,准备阶段(投票阶段);第二阶段,提交阶段(执行阶段)。实现过程如下所示:

  • TCC

TCC(Try-Confirm-Cancel)是Try、Commit、Cancel三种指令的缩写,又被称补偿事务,其逻辑模式类似于XA两阶段提交,事务处理流程也很相似,但2PC是应用于在DB层面,TCC则可以理解为在应用层面的2PC,是需要我们编写业务逻辑来实现。

TCC核心思想是:“针对每个操作都要注册一个与其对应的确认(Try)和补偿(Cancel)”。

  • 消息事务

所谓的消息事务就是基于消息队列的两阶段提交,本质上是对消息队列的一种特殊利用,它是将本地事务和发消息放在了一个分布式事务里,保证要么本地操作成功成功并且对外发消息成功,要么两者都失败。

基于消息队列的两阶段提交往往用在高并发场景下,将一个分布式事务拆成一个消息事务(A系统的本地操作+发消息)+B系统的本地操作,其中B系统的操作由消息驱动,只要消息事务成功,那么A操作一定成功,消息也一定发出来了,这时候B会收到消息去执行本地操作,如果本地操作失败,消息会重投,直到B操作成功,这样就变相地实现了A与B的分布式事务。

虽然上面的方案能够完成A和B的操作,但是A和B并不是强一致的,而是最终一致(Eventually consistent)的。而这也是满足BASE理论的要求的。

这里引申一下,BASE是Basically Available(基本可用)、Soft state(软状态)和Eventually consistent(最终一致性)三个短语的缩写。BASE理论是对CAP(指的是在一个分布式系统中,Consistency(一致性)、Availability(可用性)、Partition Tolerance(分区容错性),不能同时成立)中AP(CAP已经被证实一个分布式系统最多只能同时满足CAP三项中的两项)的一个扩展,通过牺牲强一致性来获得可用性,当出现故障允许部分不可用但要保证核心功能可用,允许数据在一段时间内是不一致的,但最终达到一致状态。满足BASE理论的事务,我们称之为“柔性事务”。

本篇讲述的实现就是基于最终一致性的解决方案;

2. 表设计及关键代码示例

(1)业务订单模块

对于上游的支付系统的回调消息,我们应该尽可能的处理它;也就是说在处理消息的流程中,如除非是当前系统的线程耗尽、数据库异常这些极端情况,我们都应该正常的接收这个消息并且给支付系统回执;

因为我们不能完全信任上游系统的可靠性;即如果我们处理失败抛出异常,未按时给予上游支付系统回执,上游系统可能并不能按照预期给我们重新发送回调消息,这跟上游系统的稳定性、重试机制都有关系;甚至,我们还需要额外的定时任务,根据业务订单去主动查询订单系统,将业务订单的状态主动拉下来;

此外,我们应该尽可能减少本系统之外的依赖,即不能因为处理回执消息的流程中嵌入的对下游系统的调用,在下游系统调用失败时,导致我们本次处理上游支付回执的失败;往往是在上游消息处理流程中,尽可能减少对下游系统的调用,甚至是完全去除外部依赖,仅留下本地的DB操作;

相对强一致性来说,支付链路的设计更需要最终一致性;因为用户资产的扣减往往在支付渠道(蚂蚁金服、微信支付、银行),链路很长;此外,支付成功,但是由于业务系统故障处理消息失败需要给用户返还资产(如退款操作),操作对接非常复杂并且非常影响业务系统所在公司的口碑,因此在订单系统中往往需要保证最终一致性,即扣款成功后,临时的业务系统异常仅仅只会导致发货延迟,不会很大程度伤害用户和公司口碑;

下面介绍相关代码:

1. 支付回执的接口:

@Slf4j
@RestController
@RequestMapping("/api/order")
@Validated
public class OrderController {

    /**
     * 处理成功
     */
    private static final String SUCCESS = "success";

    /**
     * 处理失败
     */
    private static final String FAIL = "fail";

    @DubboReference
    private OrderFacade orderFacade;

    /**
     * 支付回调(支付、代扣)
     *
     * @param payResultReqDTO
     * @return
     */
    @CrossOrigin(origins = "*")
    @SentinelResource(value = "payResult")
    @RequestMapping(value = "/payResult", method = RequestMethod.POST)
    @ResponseBody
    public String handlePayResult(PayResultReqDTO payResultReqDTO) {
        log.warn("[callback]receive payResult notify. [payResultReqDTO={}]", JSON.toJSONString(payResultReqDTO));
        FacadeResultDTO<Boolean> result = orderFacade.handlePayResult(payResultReqDTO);
        if (result.isSuccess() && result.getData().equals(Boolean.TRUE)) {
            return SUCCESS;
        }
        return FAIL;
    }

    /**
     * 合约回调(签约、解约)
     *
     * @param agreementResultReqDTO
     * @return
     */
    @CrossOrigin(origins = "*")
    @SentinelResource(value = "agreementResult")
    @RequestMapping(value = "/agreementResult", method = RequestMethod.POST)
    @ResponseBody
    public String handleAgreementResult(AgreementResultReqDTO agreementResultReqDTO) {
        log.warn("[callback]receive agreementResult notify. [agreementResultReqDTO={}]", JSON.toJSONString(agreementResultReqDTO));
        FacadeResultDTO<Boolean> result = orderFacade.handleAgreementResult(agreementResultReqDTO);
        if (result.isSuccess() && result.getData().equals(Boolean.TRUE)) {
            return SUCCESS;
        }
        return FAIL;
    }
}

2. 订单模块的处理回调接口

    /**
     * 处理支付回调 需要本地事务保证(事务会传递)
     *
     * @param payResultReqDTO
     * @return
     */
    @Transactional
    @Override
    public boolean handlePayResult(PayResultReqDTO payResultReqDTO) {
        log.warn("[paySystem callback]handlePayResult [payResultReqDTO={}]", JSON.toJSONString(payResultReqDTO));
        try {
            // 基础参数校验
            boolean checkPayResultReq = checkPayResultReq(payResultReqDTO);
            if (!checkPayResultReq) {
                return false;
            }
            // 判断支付状态
            String tradeStatus = payResultReqDTO.getTradeStatus();
            // tradeStatus不是终态,返回处理失败
            if (!OrderConstants.TRADE_STATUS_SUCCESS.equals(tradeStatus) && !OrderConstants.TRADE_STATUS_FAIL.equals(tradeStatus)) {
                log.warn("[paySystem callback]tradeStatus is not 0000/0002! payResultReqDTO={}", JSON.toJSONString(payResultReqDTO));
                return false;
            }
            // 查询本地订单
            MemberOrderDO orderDO = memberOrderDAO.selectByOrderNo(payResultReqDTO.getCpOrderNumber());
            if (orderDO == null) {
                log.warn("[paySystem callback]order not found in DB! payResultReqDTO={}", JSON.toJSONString(payResultReqDTO));
                return false;
            }
            // (已经处理过的)支付成功或失败不处理
            if (MemberOrderStatusEnum.PAID.getStatus().equals(orderDO.getStatus()) || MemberOrderStatusEnum.PAY_FAILED.getStatus().equals(orderDO.getStatus())) {
                log.warn("[paySystem callback]memberOrder has already been handled. [payResultReqDTO={}] status={}", JSON.toJSONString(payResultReqDTO), orderDO.getStatus());
                return true;
            }
            try {
                if (OrderConstants.TRADE_STATUS_SUCCESS.equals(tradeStatus)) {
                    // 处理支付金额回调,若实际支付金额未取到,则取订单金额
                    String payAmount = StringUtils.isBlank(payResultReqDTO.getTradeAmount()) ? payResultReqDTO.getOrderAmount() : payResultReqDTO.getTradeAmount();
                    return handleMemberOrderAfterPaySuc(orderDO, payResultReqDTO.getOrderNumber(), payAmount, payResultReqDTO.getPayTime(), payResultReqDTO.getDiscount(), buildPayExtraInfo(payResultReqDTO.getPayExtInfo()));
                } else if (OrderConstants.TRADE_STATUS_FAIL.equals(tradeStatus)) {
                    // 支付失败
                    return handleMemberOrderAfterPayFail(orderDO);
                }
            } catch (Exception e) {
                TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                log.error("[SERIOUS_DB][paySystem callback]handlePayResult error rollback! [payResultReqDTO={}] e:{}", JSON.toJSONString(payResultReqDTO), e);
            }
        } catch (Exception e) {
            log.error("[SERIOUS_BUSINESS][paySystem callback]handlePayResult error! [payResultReqDTO={}] e:{}", JSON.toJSONString(payResultReqDTO), e);
        }
        return false;
    }
	
	/**
     * 支付成功后执行操作 更新业务订单状态并入库本地消息表
     */
    private boolean handleMemberOrderAfterPaySuc(MemberOrderDO memberOrderDO, String payOrderNo, String payAmount, String payTime, String deductAmount, PayExtraInfo payExtraInfo) {
        fillMemberOrderParam(memberOrderDO, payOrderNo, payAmount, payTime, deductAmount, payExtraInfo);
        memberOrderDAO.updatePayResult(memberOrderDO);
        Map<String, String> msgBody = buildPaySucMsgBody(memberOrderDO);
        MessageDeliverDO messageDeliverDO = buildMessageDeliverDO(memberOrderDO.getOrderNo(), MessageDeliverOrderTypeEnum.MEMBER_ORDER.getType(), msgBody);
        // 如果是代扣支付单则更新代扣单状态
        if (MemberOrderTypeEnum.WITHHOLD_ORDER.getType().equals(memberOrderDO.getOrderType())) {
            withholdDAO.updateOrderStatus(memberOrderDO.getOrderNo(), WithholdStatusEnum.PAY_SUC.getStatus());
        }
        messageDeliverDAO.insert(messageDeliverDO);
        activityService.makeBenefitUsed(memberOrderDO.getOpenid(), memberOrderDO.getOrderNo());
        postProcessHandleMemberOrder(memberOrderDO);
        return true;
    }
	
	/**
     * 会员订单后置处理
     */
    private void postProcessHandleMemberOrder(MemberOrderDO memberOrderDO) {
        if (MemberOrderStatusEnum.PAID.getStatus().equals(memberOrderDO.getStatus())) {
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                @Override
                public void afterCommit() {
                    // 支付成功发消息给核心应用 进行权益发放
                    Map<String, String> msgBody = buildPaySucMsgBody(memberOrderDO);
                    boolean sendMsgResult = notifyService.sendPayMsg(msgBody);
                    if (sendMsgResult) {
                        log.warn("send paySucMsg to core sus.[orderNo={} msg={}]", memberOrderDO.getOrderNo(), JSON.toJSONString(msgBody));
                    } else {
                        log.warn("send paySucMsg to core failed.[orderNo={} msg={}]", memberOrderDO.getOrderNo(), JSON.toJSONString(msgBody));
                    }
                    if (StringUtils.isNotBlank(memberOrderDO.getAgreementNo())) {
                        // 支付成功后,查询是否有需要投递的签约信息,用于处理微信纯签约,新用户购买,在收到支付回调之后,需要进行处理
                        MessageDeliverDO messageDeliverDO = messageDeliverDAO.queryByOrderAndType(memberOrderDO.getAgreementNo(), MessageDeliverOrderTypeEnum.AGREEMENT.getType());
                        if (messageDeliverDO != null) {
                            // 发送签约信息
                            Map<String, String> signedMsgBody = JSON.parseObject(messageDeliverDO.getMsgBody(), Map.class);
                            boolean sendSignedMsgResult = notifyService.sendSignMsg(signedMsgBody);
                            log.warn("pay_suc_send_signSucMsg_to_core_sus.[orderNo={} res={} msg={}]", memberOrderDO.getOrderNo(), sendSignedMsgResult, JSON.toJSONString(sendSignedMsgResult));
                        }
                    }
                }
            });
        }
    }

3. 本地订单消息表:

CREATE TABLE `message_deliver` (
	`id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键id',
	`create_time` TIMESTAMP NOT NULL DEFAULT '1971-01-01 08:00:00' COMMENT '记录创建时间',
	`update_time` TIMESTAMP NOT NULL DEFAULT '1971-01-01 08:00:00' COMMENT '记录更新时间',
	`order_no` VARCHAR(64) NOT NULL COMMENT '单据号' ,
	`order_type` INT(10) NOT NULL COMMENT '递送消息单据类型,MEMBER_ORDER(1, "会员订单"),AGREEMENT(2, "签约单"),BLIND_BOX_ORDER(3, "盲盒订单");',
	`msg_body` VARCHAR(512) NULL COMMENT '消息体' ,
	`retry_times` INT(10) NOT NULL DEFAULT '0' COMMENT '重试次数',
	PRIMARY KEY (`id`) USING BTREE,
	UNIQUE INDEX `uniq_order_no` (`order_no`) USING BTREE
)
COMMENT='消息投递表'
COLLATE='utf8mb4_general_ci'
ENGINE=InnoDB;

4. 定时任务驱动

    /**
     * 执行消息投递任务
     *
     * @return
     */
    @Override
    public ExecuteResult processSendMsgTask() {
        long pageNum = 1;
        while (true) {
            log.warn("MessageDeliverTask begin. [pageNum={}]", pageNum);
            long start = (pageNum - 1) * PAGE_SIZE;
            List<MessageDeliverDO> messageDeliverDOs = messageDeliverDAO.queryByPage(start, PAGE_SIZE);
            if (CollectionUtils.isEmpty(messageDeliverDOs)) {
                log.warn("MessageDeliverTask end. [pageNum={}]", pageNum);
                break;
            }
            LocalDateTime now = LocalDateTime.now();
            int retryLimit = ConfigManager.getInteger(OrderConstants.MESSAGE_DELIVER_RETRY_TIME_LIMIT, OrderConstants.MESSAGE_DELIVER_RETRY_TIME_LIMIT_DEFAULT);

            List<Long> ids = Lists.newArrayList();
            messageDeliverDOs.forEach(o -> {
                LocalDateTime createTime = DateUtil.asLocalDateTime(o.getCreateTime());
                Duration duration = Duration.between(createTime, now);
                // 过滤10秒内发且重试次数10次及以上的消息
                if (o.getRetryTimes() >= retryLimit || duration.getSeconds() < ConfigManager.getInteger(OrderConstants.MSG_DELIVER_TIME_LIMIT, OrderConstants.MSG_DELIVER_TIME_LIMIT_DEFAULT)) {
                    if(MessageDeliverOrderTypeEnum.BLIND_BOX_ORDER.getType().equals(o.getOrderType())){
                        log.warn("blind_box__info_upper_limit. [orderNo={}]", o.getOrderNo());
                    }
                    return;
                }
                ids.add(o.getId());
                Map<String, String> msgBody = JSON.parseObject(o.getMsgBody(), Map.class);
                if (MessageDeliverOrderTypeEnum.MEMBER_ORDER.getType().equals(o.getOrderType())) {
                    notifyService.sendPayMsg(msgBody);
                } else if (MessageDeliverOrderTypeEnum.AGREEMENT.getType().equals(o.getOrderType())) {
                    notifyService.sendSignMsg(msgBody);
                } else if (MessageDeliverOrderTypeEnum.BLIND_BOX_ORDER.getType().equals(o.getOrderType())) {
                    rmqMessageDeliverFacade.sendOrderPaidMsg(msgBody);
                }
                log.warn("MessageDeliverTask send msg suc. [msgBody={}]", o.getMsgBody());
            });
            batchUpdateStatus(ids);
            pageNum++;
        }
        return ExecuteResult.successResult("MessageDeliverTask execute suc.");
    }

(2)物料同步下游

先介绍下业务场景,本系统为"开发者平台",即链接CP合作者和C端业务的桥梁,也是多个C端业务的上游;CP在本系统将物料上传,经过审核后生效,然后本系统需要将经过审核的重要同步给下游,而这个物料对部分下游来说是具有重要的业务价值;

我的方案是消息事务,审核通过的物料和审核通过的消息在本地入库,通过定是任务驱动投递给多个下游,投递的实现可以是HTTP/DUBBO/MQ,对于认为当前消息非常重要的下游来说,我们要保证消息被正确的消费,否则会尽最大努力尝试投递若干次直到次数上限,然后发出告警;对于这些业务方,需要回执接口来告知消息被正确的消费掉了;

也就是本地消息表+ACK模式;

方案评审时,也有小白询问为何不做成同步调用多个下游业务方的DUBBO接口,我也是做了回复:

不准备这么做:

1.总流程为"CP配置和上传物料 同步给下游",使用消息可以解耦

下游的逻辑是否可能侵入上游的代码中?

2.同步下游是否成功与主流程无关,做成异步

调用下游接口失败怎么办?主流程终止?下游恢复后怎么处理?开发手动同步?

3.可靠性-失败恢复后自动重试 此次资源同步为重要消息

下游短时间内不可用,如发版、DB连接问题导致的,下游系统恢复后,消息可被正常消费,不需要开发手动处理

下面写记录下个人的思考,也是我写在代码处的注释

1. 对于总流程为"(1)本系统业务操作->(2)同步给下游系统",使用消息可以解耦,本系统不用写调用下游dubbo的代码,这么做的优势在相同消息投递多个业务方时更加明显;

2. 另外消息的一个天然的优势就是——不需要接口协议,这个优点在下游业务方多的情况下也非常突出;此外,如下游业务方dubbo升级时,本系统也完全不需要感知;

3. 思考一个问题:如果使用dubbo同步调用方式,那么各个业务方都有失败的处理逻辑,如果在当前系统内各自去写一遍,是否令人头疼?

4. 再考虑一个问题:同步下游是否成功与主流程无关,或者这么理解:下游业务方的系统异常不应该导致本系统"已经成功"的业务操作失败,所以要做成异步,但是这个消息要求最终一定要投递到业务方并且被正确处理并告知到本系统;C端的较多场景都需要考虑这点,一个重要的原因就是"用户体验"和"用户口碑",对于非转账类业务,一般都要设计的不能那么强一致性;

5. 从鲁棒性/可靠性上考虑:本系统内成功调用下游接口失败又不能让主流程终止,但下游恢复后又希望这一整个流程可以自动恢复(至少不需要开发手动操作),怎么做?——将消息持久化并且通过任务驱动,业务方通过ACK回执告知重要消息被正确的处理;下游的不可用,往往是短时间内发生的,如发布上线、流量切换、DB连接、redis抖动等问题导致的;通过上面的方式,在下游系统自恢复后,消息可被正常消费,不需要开发同学手动处理;

6. 本次使用的模板设计模式可以将投递消息方式(包括消息体的通用处理逻辑)和处理回执的方式抽象出来,交由消息绑定的实现类去执行,具备一定的通用性;

不过还是那句话——总没有完美的技术方案,现在看似合理的方案从某些角度或多或少也存在一些问题;希望开拓视野,技术进步时能选择当下较为合理的方案;

/**
 * @description 消息投递任务服务(内部使用)
 * 思考:——by Akira
 * 1. 对于总流程为"(1)本系统业务操作->(2)同步给下游系统",使用消息可以解耦,本系统不用写调用下游dubbo的代码,这么做的优势在相同消息投递多个业务方时更加明显;
 * 2. 另外消息的一个天然的优势就是——不需要接口协议,这个优点在下游业务方多的情况下也非常突出;此外,如下游业务方dubbo升级时,本系统也完全不需要感知;
 * 3. 思考一个问题:如果使用dubbo同步调用方式,那么各个业务方都有失败的处理逻辑,如果在当前系统内各自去写一遍,是否令人头疼?
 * 4. 再考虑一个问题:同步下游是否成功与主流程无关,或者这么理解:下游业务方的系统异常不应该导致本系统"已经成功"的业务操作失败,所以要做成异步,但是这个消息要求最终一定要投递到业务方并且被正确处理并告知到本系统;
 * C端的较多场景都需要考虑这点,一个重要的原因就是"用户体验"和"用户口碑",对于非转账类业务,一般都要设计的不能那么强一致性;
 * 5. 从鲁棒性/可靠性上考虑:本系统内成功调用下游接口失败又不能让主流程终止,但下游恢复后又希望这一整个流程可以自动恢复(至少不需要开发手动操作),怎么做?
 * 将消息持久化并且通过任务驱动,业务方通过ACK回执告知重要消息被正确的处理;
 * 下游的不可用,往往是短时间内发生的,如发布上线、流量切换、DB连接、redis抖动等问题导致的;通过上面的方式,在下游系统自恢复后,消息可被正常消费,不需要开发同学手动处理;
 * 6. 本次使用的模板设计模式可以将投递消息方式(包括消息体的通用处理逻辑)和处理回执的方式抽象出来,交由消息绑定的实现类去执行,具备一定的通用性;
 * <p>
 * 不过还是那句话——总没有完美的技术方案,现在看似合理的方案从某些角度或多或少也存在一些问题;
 * 希望开拓视野,技术进步时能选择当下较为合理的方案;
 * </>
 */
public interface MessageDeliverTaskFacade {

    /**
     * 生成投递任务 尝试执行
     *
     * @param reqDTO
     * @return
     */
    FacadeDefaultDTO<Boolean> generateTask(@Valid MessageDeliverTaskReqDTO reqDTO);

    /**
     * 查询投递任务
     *
     * @param reqDTO
     * @return
     */
    FacadeDefaultDTO<List<MessageDeliverTaskReqDTO>> queryTask(@Valid MessageDeliverTaskQryReqDTO reqDTO);

}

第一版代码如下(还需优化):

1. 消息投递表设计

CREATE TABLE `message_deliver_task` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键id',
  `task_no` varchar(128) NOT NULL COMMENT '唯一键,通知任务流水号',
  `message_code` varchar(255) NOT NULL COMMENT '消息code,标识消息业务类型,对应不同的投递和处理回调的方法',
  `scene_type` varchar(255) NOT NULL DEFAULT '' COMMENT '业务场景类型,消息体信息,冗余在列便于检索',
  `deliver_start_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '投递起始时间,精度为小时,默认当前时间',
  `message_body` text COMMENT '消息体',
  `retry_times` int(10) NOT NULL DEFAULT '0' COMMENT '重试次数',
  `need_callback` int(1) NOT NULL DEFAULT '0' COMMENT '是否需要回执标识,0-不需要,1-需要',
  `message_status` int(10) NOT NULL DEFAULT '0' COMMENT '消息状态,DELETE(-1, "已删除");SENDING(0, "投递中");HANDLED(1, "已处理");',
  `create_time` datetime NOT NULL DEFAULT '1971-01-01 08:00:00' COMMENT '记录创建时间',
  `update_time` datetime NOT NULL DEFAULT '1971-01-01 08:00:00' COMMENT '记录更新时间',
  `extend` varchar(1024) NOT NULL DEFAULT '' COMMENT '扩展字段',
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE KEY `uniq_task_no` (`task_no`) USING BTREE,
  KEY `idx_status_callback_retry` (`message_status`,`need_callback`,`retry_times`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=13 DEFAULT CHARSET=utf8mb4 COMMENT='消息投递任务表';

2. 消息投递和处理回执的Service

/**
 * @description 消息投递任务请求
 */
@Data
public class MessageDeliverTaskReqDTO implements Serializable {

    /**
     * 消息任务流水号 参考格式messageCode:sceneType:MD5(messageBody)
     */
    @NotBlank
    private String taskNo;

    /**
     * 消息code 处理类型
     */
    @NotBlank
    private String messageCode;

    /**
     * 消息业务场景 标识消息
     */
    @NotBlank
    private String sceneType;

    /**
     * 消息内容 处理类型
     */
    @NotNull
    private Object messageBody;

    /**
     * 消息起始投递时间 默认now
     */
    private Date deliverStartTime;

    /**
     * 消息重试次数 默认10
     */
    private Integer retryTimes;

    /**
     * 消息是否需要回执
     */
    @NotNull
    private Boolean needCallback;
}
    /**
     * 生成投递任务 尝试执行
     *
     * @param reqDTO
     * @return
     */
    @Override
    public boolean generateTask(MessageDeliverTaskReqDTO reqDTO) {
        try {
            final MessageHandlerTemplate handler = messageHandlerFactory.getHandler(reqDTO.getMessageCode());
            handler.tryDeliverMessage(reqDTO);
            log.warn("generateTask_suc. [reqDTO={}]", JSON.toJSONString(reqDTO));
        } catch (Exception e) {
            log.error("generateTask_error. [reqDTO={} e:{}]", JSON.toJSONString(reqDTO), e);
            return Boolean.FALSE;
        }
        return Boolean.TRUE;
    }

    /**
     * 处理回执
     *
     * @param reqDTO
     * @return
     */
    @Override
    public boolean handleCallback(MessageCallBackReqDTO reqDTO) {
        try {
            final MessageHandlerTemplate handler = messageHandlerFactory.getHandler(reqDTO.getMessageCode());
            handler.tryHandleCallback(reqDTO);
            log.warn("handleCallback_suc. [reqDTO={}]", JSON.toJSONString(reqDTO));
        } catch (Exception e) {
            log.error("handleCallback_error. [reqDTO={}]", JSON.toJSONString(reqDTO));
            return Boolean.FALSE;
        }
        return Boolean.TRUE;
    }

3. 处理器工厂

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @description 启动页关联内容校验器工厂
 */
@Slf4j
@Component
public class MessageHandlerFactory {

    /**
     * MessageHandlerTemplate实例列表
     */
    @Autowired
    private List<MessageHandlerTemplate> messageHandlerTemplates;

    /**
     * refType和校验器映射
     */
    private static Map<String, MessageHandlerTemplate> HANDLERS = new ConcurrentHashMap<>();

    /**
     * 初始化
     */
    @PostConstruct
    public void init() {
        if (CollectionUtils.isNotEmpty(messageHandlerTemplates)) {
            for (MessageHandlerTemplate messageHandlerTemplate : messageHandlerTemplates) {
                HANDLERS.put(messageHandlerTemplate.getMessageCode(), messageHandlerTemplate);
            }
        }
    }

    /**
     * 获取对应CheckService
     */
    public MessageHandlerTemplate getHandler(String messageCode) {
        return Optional.ofNullable(HANDLERS.get(messageCode))
                .orElseThrow(() -> {
                    log.error("cannot_find_handler_by_messageCode. [messageCode={}]", messageCode);
                    return new RuntimeException("cannot_find_handler_by_messageCode");
                });
    }

}

4. 消息投递和回执处理的抽象模板

/**
 * @description 消处理器抽象模板
 */
@Slf4j
@Component
public abstract class MessageHandlerTemplate {

    @Resource
    private MessageDeliverTaskDAO messageDeliverTaskDAO;

    @Resource
    private MessageDeliverBeanConverter beanConverter;

    @Resource
    private DataSourceTransactionManager transactionManager;

    /**
     * 异步发消息的线程池
     */
    private final ExecutorService executorService = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000),
            r -> {
                Thread thread = new Thread(r);
                thread.setName("messageHandlerTemplateThread");
                //设置异常捕获器
                thread.setUncaughtExceptionHandler((t, e) -> log.error("[message]async_deliver_message_error! e:{}", e.getMessage()));
                return thread;
            }, new ThreadPoolExecutor.AbortPolicy());


    /**
     * 返回消息码类型(关联类型),参考:xx.facade.messagedeliver.enums.MessageCodeEnum
     *
     * @return
     */
    protected abstract String getMessageCode();

    /**
     * 投递消息 方式可以是:(1)异步 MQ redis队列 / (2)同步:DUBBO http / (3)带业务逻辑的混合操作
     *
     * @param reqDTO
     * @param operateDomain
     * @throws BusinessException
     */
    protected abstract void deliverMessage(MessageDeliverTaskReqDTO reqDTO, MessageDeliverTaskDO operateDomain) throws BusinessException;

    /**
     * 处理消息回执 处理数据库记录
     *
     * @param callBackReqDTO
     * @param operateDomain
     * @throws BusinessException
     */
    protected abstract void handleCallback(MessageCallBackReqDTO callBackReqDTO, MessageDeliverTaskDO operateDomain) throws BusinessException;

    /**
     * 模板-投递消息 加事务
     */
    @Transactional(rollbackFor = Exception.class)
    public void tryDeliverMessage(MessageDeliverTaskReqDTO reqDTO) {
        final Date now = new Date();
        final String taskNo = reqDTO.getTaskNo();

        // 1.查询任务
        MessageDeliverTaskDO recordQryInDB = messageDeliverTaskDAO.queryByTaskNo(taskNo);
        final Long recordId;
        // 不存在,则插入记录
        if (recordQryInDB == null) {
            recordQryInDB = initRecordInDB(reqDTO, now);
            messageDeliverTaskDAO.insertSelective(recordQryInDB);
        }
        // lambda变量需要为隐式的final
        final MessageDeliverTaskDO recordInDB = recordQryInDB;
        // 2.执行器 异步尝试执行投递任务deliverMessage()
        if (meetDeliverCondition(recordInDB, now)) {
            executorService.submit(() -> {
                // 同类调用事务失效处理
//                ((MessageHandlerTemplate) AopContext.currentProxy()).deliverMessageThenUpdateRetryTimes(reqDTO, recordId);
                deliverMessageThenUpdateRetryTimes(reqDTO, recordInDB);
                log.warn("tryDeliverMessage_deliverMessageThenUpdateRetryTimes_suc. [taskNo={}]", reqDTO.getTaskNo());
            });
        }
    }

    /**
     * 构建入库参数
     *
     * @param reqDTO
     * @param now
     * @return
     */
    private MessageDeliverTaskDO initRecordInDB(MessageDeliverTaskReqDTO reqDTO, Date now) {
        MessageDeliverTaskDO recordInDB = beanConverter.convert2MessageDeliverTaskDO(reqDTO);
        recordInDB.setDeliverStartTime(Optional.ofNullable(reqDTO.getDeliverStartTime()).orElse(now));
        recordInDB.setRetryTimes(Optional.ofNullable(reqDTO.getRetryTimes()).orElse(0));
        recordInDB.setNeedCallback(Boolean.TRUE.equals(reqDTO.getNeedCallback()) ? NeedCallBackEnum.NEED.getType() : NeedCallBackEnum.NOT_NEED.getType());
        recordInDB.setMessageStatus(MessageStatusEnum.SENDING.getType());
        recordInDB.setCreateTime(now);
        recordInDB.setUpdateTime(now);
        recordInDB.setMessageBody(JSON.toJSONString(reqDTO.getMessageBody()));
        return recordInDB;
    }

    /**
     * 执行投递并将重试次数+1 编程式事务
     *
     * @param reqDTO
     * @param recordInDB
     */
    public void deliverMessageThenUpdateRetryTimes(MessageDeliverTaskReqDTO reqDTO, MessageDeliverTaskDO recordInDB) {

        // 同类方法调用使用 编程式事务
        TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
        transactionTemplate.execute(transactionStatus -> {
            final Long recordId = recordInDB.getId();
            // [投递消息-子类实现]
            deliverMessage(reqDTO, recordInDB);

            // 更新 不需要回执则更形成`已处理`
            final Integer messageStatus = Boolean.FALSE.equals(reqDTO.getNeedCallback()) ? MessageStatusEnum.HANDLED.getType() : null;
            final int update = messageDeliverTaskDAO.plusRetryTime(recordId, messageStatus);
            log.warn("deliverMessageThenUpdateRetryTimes_suc. [recordId={} reqDTO={}]", recordId, JSON.toJSONString(reqDTO));
            return update > 0;
        });

    }

    /**
     * 模板-处理消息回调 加事务
     *
     * @param callBackReqDTO
     */
    @Transactional(rollbackFor = Exception.class)
    public void tryHandleCallback(MessageCallBackReqDTO callBackReqDTO) {
        // 查询任务
        MessageDeliverTaskDO recordInDB = messageDeliverTaskDAO.queryByTaskNo(callBackReqDTO.getTaskNo());
        // 不存在,则返回异常
        if (recordInDB == null) {
            log.error("invalid_callbackReq_no_record_inDB. [reqDTO={}]", JSON.toJSONString(callBackReqDTO));
            throw new BusinessException(FacadeResultCodeEnum.BAD_PARAMS.getCode(), "无效的taskNo");
        }

        final Date now = new Date();
        // 校验状态,
        final Integer messageStatusInDB = recordInDB.getMessageStatus();
        // 1.若是'已处理'直接返回成功
        if (MessageStatusEnum.HANDLED.getType().equals(messageStatusInDB)) {
            log.warn("tryHandleCallback_already_handled. [callBackReqDTO={}]", JSON.toJSONString(callBackReqDTO));
            return;
        }
        // 2.若是'未处理' 从消息处理器工厂获取执行器 执行同步处理回调handleCallback() 执行成功后更新记录返回成功
        if (MessageStatusEnum.SENDING.getType().equals(messageStatusInDB)) {
            MessageDeliverTaskDO updateDomain = new MessageDeliverTaskDO();

            // [处理回调-子类实现]
            handleCallback(callBackReqDTO, updateDomain);

            // 更新状态和次数
            updateDomain.setId(recordInDB.getId());
            updateDomain.setUpdateTime(now);
            updateDomain.setMessageStatus(MessageStatusEnum.HANDLED.getType());
            messageDeliverTaskDAO.updateByPrimaryKeySelective(updateDomain);
            log.warn("tryHandleCallback_handle_suc. [callBackReqDTO={}]", JSON.toJSONString(callBackReqDTO));
            return;
        }
        // error data
        log.error("message_cannot_be_handled_in_status. [callBackReqDTO={} messageStatusInDB={}]", JSON.toJSONString(callBackReqDTO), MessageStatusEnum.getByType(messageStatusInDB));
    }

    /*private methods*/

    /**
     * 校验执行条件,如状态、时间、重试次数
     *
     * @param recordInDB
     * @param now
     * @return
     */
    private static boolean meetDeliverCondition(MessageDeliverTaskDO recordInDB, Date now) {
        if (recordInDB != null) {
            if (!MessageStatusEnum.SENDING.getType().equals(recordInDB.getMessageStatus())) {
                log.warn("not_sending_status. [id={} status={}]", recordInDB.getId(), recordInDB.getMessageStatus());
                return Boolean.FALSE;
            }
            if (now.before(recordInDB.getDeliverStartTime())) {
                log.warn("not_sending_time. [id={} deliverStartTime={}]", recordInDB.getId(), DateUtil.getFormatTime(recordInDB.getDeliverStartTime(), DateUtil.DATE_FORMAT_3));
                return Boolean.FALSE;
            }
            if (recordInDB.getRetryTimes() >= ConfigManager.getInteger(Constants.MSG_DELIVER_RETRY_TIMES, Constants.MSG_DELIVER_RETRY_TIMES_DEFAULT)) {
                log.warn("upper_retry_limit. [id={} retryTimes={}]", recordInDB.getId(), recordInDB.getRetryTimes());
                return Boolean.FALSE;
            }
            log.warn("meetDeliverCondition. [id={} taskNo={}]", recordInDB.getId(), recordInDB.getTaskNo());
            return Boolean.TRUE;
        }
        return Boolean.FALSE;
    }
}

只需要实现这个抽象模板并声明成Bean,即可自动加入到处理器工厂中,根据消息code标识;

5. 定时任务驱动

/**
     * 遍历发送中状态的任务 执行任务、重试次数+1
     *
     * @return
     */
    private ExecutionResult execMessageDeliverTask() {
        // ...
        log.warn("execMessageDeliverTask_start.");
        final long startTimeMillis = System.currentTimeMillis();
        final Date now = new Date();
        final int maxRetryTimes = ConfigManager.getInteger(Constants.MSG_DELIVER_RETRY_TIMES, Constants.MSG_DELIVER_RETRY_TIMES_DEFAULT);
        // 发送中/未达到重试上限
        final List<Long> recordIds = messageDeliverTaskDAO.queryRrdIdsByIdxStatusCallbackRetry(MessageStatusEnum.SENDING.getType(), null, maxRetryTimes);
        if (CollectionUtils.isEmpty(recordIds)) {
            log.warn("need_not_execMessageDeliverTask.");
            return ExecutionResult.SUCCESS;
        }
        // 逻辑分页
        int pageSize = 200;
        int size = recordIds.size();
        int pages = (size % pageSize == 0 ? size / pageSize : size / pageSize + 1);
        for (int pageNum = 1; pageNum <= pages; pageNum++) {
            int fromIndex = (pageNum - 1) * pageSize;
            int currentPage = Math.min(pageNum * pageSize, size);
            final List<Long> idsInPage = recordIds.subList(fromIndex, currentPage);
            log.warn("do_execMessageDeliverTask. [pageNum={}].", pageNum);
            List<MessageDeliverTaskDO> tasksInPage = messageDeliverTaskDAO.queryByIds(idsInPage);
            // 达到执行时间
            List<MessageDeliverTaskDO> sendAbleRrd = tasksInPage.stream()
                    .filter(task -> !now.before(task.getDeliverStartTime()))
                    .collect(Collectors.toList());
            // 串行异步执行
            if (CollectionUtils.isNotEmpty(sendAbleRrd)) {
                sendAbleRrd.forEach(deliverTaskDO -> {
                    try {
                        final MessageHandlerTemplate handler = messageHandlerFactory.getHandler(deliverTaskDO.getMessageCode());
                        final MessageDeliverTaskReqDTO reqDTO = beanConverter.convert2MessageDeliverTaskReqDTO(deliverTaskDO);
                        reqDTO.setNeedCallback(NeedCallBackEnum.needCallBack(deliverTaskDO.getNeedCallback()));
                        CompletableFuture.runAsync(() -> handler.deliverMessageThenUpdateRetryTimes(reqDTO, deliverTaskDO));
                    } catch (Exception e) {
                        log.error("do_execMessageDeliverTask_task_exec_fail. [deliverTaskDO={}]", JSON.toJSONString(deliverTaskDO));
                    }
                });
            }
        }
        log.warn("execMessageDeliverTask_finish. [cost={}ms]", System.currentTimeMillis() - startTimeMillis);
        return ExecutionResult.SUCCESS;
    }

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐