项目中需要使用Flink消费Kafka中的数据,然后使用事务的方式写入到MySQL里面。网上找到了一大堆相关的例子,但是没有一个是能非常稳定的运行的......开发加调试,搞了将近两天的时间,哎,其实主要还是应该怪自己太菜了。所以本文就是解析下Flink二阶段提交的源码,然后给出自己的二阶段提交的实现。如果文章中有错误,非常欢迎大家的指正,多多交流!

TwoPhaseCommitSinkFunction解析:

Flink二阶段提交是指:

    preCommit:预提交。之前说过,各个Operator在snapshotState()方法中调用此方法。

    commit:真正执行提交。所有Opeartor成功执行完preCommit()之后,Flink在notifyCheckpointComplete()中调用此方法,即jobmanager收到各个operator都完成checkpoint的之后,统一调用此方法。

    如果有一个preCommit执行失败了,其他preCommit也会中止,Flink按照重试策略回滚到最近成功完成的checkpoint然后重新执行。

    但是系统要保证preCommit执行完成之后,Commit一定可以执行成功,否则可能会出现一些意想不到的异常。因为二阶段提交协议,本身就是无法处理这种情况的,有可能出现数据不一致(有些commit成功,有些commit失败)的情况。(ps: 所以后面才出现了三阶段提交协议,稍微改善了这种情况)

    源码中的二阶段提交类如下所示,可以看到事务相关的变量是TXT和CONTEXT,它们都是保存在State中的,多个事务是放在了一个LinkedHashMap中按放入顺序执行:

    和之前介绍checkpoint时提到的流程一样,这个类里面主要执行的是这几个方法:initializeState、snapshotState、notifyCheckpointComplete、invoke。

    initializeState是在Flink程序刚启动的时候执行,snapshotState和notifyCheckpointComplete是在Flink做checkpoint时执行,先执行前者再执行后者。invoke的入参是输入的一条条数据,让用户自己来决定如何处理输入的数据。各个方法内部会执行如下功能:

    initializeState: 

        获取上面存储了事务数据相关的ListState。

        提交所有执行了preCommit,但是未commit的事务。

        终止所有尚未preCommit的事务(因为会从之前的checkpoint恢复嘛,所以本次的数据当然不能提交了)。

       最后会调用beginTransactionInternal(),开启一个新的事务。

    snapshotState: 里面会先执行preCommit()方法,先进行一些事务前的准备。然后将事务放到一个队列里(目的时顺序执行)。 接着执行beginTransactionI(),开启一个事务。最后将上述信息全部放到State中保存,一边容灾。

    notifyCheckpointComplete:里面就是事务commit的处理逻辑(如下面的接入,从队列中逐个获取待执行的事务按顺序执行)

    beginTransactionI():

    该方法和preCommit()差不多,都是事务前的一些工作,注释推荐说这个方法用于 Method that starts a new transaction.

    invoke():

    接受这个方法的输入一条条数据,然后由你自己的程序判断该如何处理这些数据。

    可以看出,Flink的官方提供的二阶段方法设计的非常详细的。但是对于Flink写入MySQL的场景来说,其实并不需要按照Flink官方建议的这样写的那么详细的步骤来做。

网上找到的Flink二阶段方式写入MySQL的代码:

完整代码我就不贴全了,反正都是类似的代码,这里也只摘抄了核心的部分:

package com.fwmagic.flink.sink;

import com.fwmagic.flink.util.DBConnectUtil;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 自定义kafka to mysql,继承TwoPhaseCommitSinkFunction,实现两阶段提交。
 * 功能:保证kafak to mysql 的Exactly-Once
 */
public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction {

    public MySqlTwoPhaseCommitSink() {
        super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    /**
     * 执行数据入库操作
     * @param connection
     * @param objectNode
     * @param context
     * @throws Exception
     */
    @Override
    protected void invoke(Connection connection, ObjectNode objectNode, Context context) throws Exception {
        System.err.println("start invoke.......");
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        System.err.println("===>date:" + date + " " + objectNode);
        String value = objectNode.get("value").toString();
        String sql = "insert into `t_test` (`value`,`insert_time`) values (?,?)";
        PreparedStatement ps = connection.prepareStatement(sql);
        ps.setString(1, value);
        ps.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
        //执行insert语句
        ps.execute();
        //手动制造异常
        if(Integer.parseInt(value) == 15) System.out.println(1/0);
    }

    /**
     * 获取连接,开启手动提交事物(getConnection方法中)
     * @return
     * @throws Exception
     */
    @Override
    protected Connection beginTransaction() throws Exception {
        String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";
        Connection connection = DBConnectUtil.getConnection(url, "root", "123456");
        System.err.println("start beginTransaction......."+connection);
        return connection;
    }

    /**
     * 预提交,这里预提交的逻辑在invoke方法中
     * @param connection
     * @throws Exception
     */
    @Override
    protected void preCommit(Connection connection) throws Exception {
        System.err.println("start preCommit......."+connection);

    }

    /**
     * 如果invoke执行正常则提交事物
     * @param connection
     */
    @Override
    protected void commit(Connection connection) {
        System.err.println("start commit......."+connection);
        DBConnectUtil.commit(connection);

    }
    
    @Override
    protected void recoverAndCommit(Connection connection) {
        System.err.println("start recoverAndCommit......."+connection);

    }


    @Override
    protected void recoverAndAbort(Connection connection) {
        System.err.println("start abort recoverAndAbort......."+connection);
    }

    /**
     * 如果invoke执行异常则回滚事物,下一次的checkpoint操作也不会执行
     * @param connection
     */
    @Override
    protected void abort(Connection connection) {
        System.err.println("start abort rollback......."+connection);
        DBConnectUtil.rollback(connection);
    }

}

    不好用的原因在于,运行的不稳定,我测试的结果是大概只能稳定运行约40分钟,然后就会爆出各种问题。

    看了参考中的“3”,大神说的还是挺对的,个人猜测的原因是代码中使用的是数据库连接而不是连接池,连接一直是打开的状态,可能出现了坏连接...

    不过至于评论中说的连接序列化问题,个人感觉这个不是问题,而且连接被序列化也是没办法的,TXT和CONTEXT都是要保存在State里的,连接肯定会被序列化。即使连接被序列化了导致连接不能用了也没关系,MySQL如果事务不提交,连接断开或者连接超时的时候是会自动回滚的,所以也不需要走recoverAndAbort()逻辑。

个人修改的Flink二阶段方式写入MySQL的代码:

其实就是基于上面的代码做了一些调整,目前长时间跑之后没有发现问题:

public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction {

    public MySqlTwoPhaseCommitSink() {
        super(new KryoSerializer<>(MyContentTransaction.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    /**
     * 建立数据库连接池
     * @return
     * @throws Exception
     */
    @Override
    protected MyContentTransaction beginTransaction() throws Exception {
        //  建立mysql数据库连接池,网上方法一大堆,这里就不写了

       HikariUtis.initMySQLConnectionPool();

       return new MyContentTransaction();
    }

    /**
     * 存储每一条数据,每一条数据都要提交到mysql中
     * @param connection
     * @param objectNode
     * @param context
     * @throws Exception
     */
    @Override
    protected void invoke(MyContentTransaction myContentTransaction, String value, Context context) throws Exception {
       myContentTransaction.store(value);
    }

    /**
     * 预提交。方法中什么都不需要做
     * @param connection
     * @throws Exception
     */
    @Override
    protected void preCommit(MyContentTransaction myContentTransaction) throws Exception {
        LOG.info("start preCommit......."+connection);

    }

    /**
     * 提交事物
     * @param connection
     */
    @Override
    protected void commit(MyContentTransaction myContentTransaction) {
        myContentTransaction.commit();

    }
    
    @Override
    protected void recoverAndCommit(Connection connection) {

        //程序异常恢复时,重新提交没有提交的事务
        System.err.println("start recoverAndCommit......."+connection);

    }

    /**
     * 如果invoke执行时发生异常,则回滚事务
     */
    @Override
    protected void abort(MyContentTransaction myContentTransaction) {
        myContentTransaction.rollback();
    }

}

public class MyContentTransaction {

    private List storeData=new ArrayList();

    private transient Connection connection;

    // 存储需要insert的事务数据

    private store(String value){

        storeData.add(value);

    }

   

    // 提交事务

    private void commit(){

        connection = HikariUtis.getConnection(); // 获取一个MySQL连接

        connection.setAutoCommit(false);

        // 然后遍历 storeData中的数据,拼接成insert的PreparedStatement,最后执行connection.commit()提交事务。(代码就不写了,手敲累死了...)

    }

    // 回滚事务

    private void rollback(){

        connection.rollback();

    }

}

参考:

    https://zhuanlan.zhihu.com/p/35616810(分布式一致性之两阶段提交协议、三阶提交协议)

    https://blog.51cto.com/simplelife/2401521(个人觉得不好用的写法)

    https://www.jianshu.com/p/5bdd9a0d7d02(评论中有大神说明了上面的写法为什么不好,不过个人也觉得评论中的说明并不完全对)

    https://bbs.csdn.net/topics/392297263(MySQL事务不提交,自动回滚)

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐