The Debezium MySQL connector running on a Kafka Connect distributed worker failed with the following status:

{"name":"debezium-mysql-source-dev","connector":{"state":"RUNNING","worker_id":"192.168.1.200:18083"},"tasks":[{"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException: Unexpected exception while parsing statement /* ApplicationName=IntelliJ IDEA 2017.2.4 */ ALTER TABLE fund_account ALTER COLUMN is_use_pay SET DEFAULT 0 at line 1, column 99
    at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:200)
    at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:178)
    at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:452)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1055)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:913)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.debezium.text.ParsingException: Unexpected exception while parsing statement /* ApplicationName=IntelliJ IDEA 2017.2.4 */ ALTER TABLE fund_account ALTER COLUMN is_use_pay SET DEFAULT 0 at line 1, column 99
    at io.debezium.relational.ddl.LegacyDdlParser.parsingFailed(LegacyDdlParser.java:555)
    at io.debezium.relational.ddl.LegacyDdlParser.parse(LegacyDdlParser.java:227)
    at io.debezium.relational.ddl.LegacyDdlParser.parse(LegacyDdlParser.java:200)
    at io.debezium.connector.mysql.MySqlSchema.applyDdl(MySqlSchema.java:297)
    at io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:637)
    at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:436)
    ... 5 more
Caused by: java.lang.NullPointerException
    at io.debezium.connector.mysql.MySqlDdlParser.parseAlterSpecification(MySqlDdlParser.java:1296)
    at io.debezium.connector.mysql.MySqlDdlParser.parseAlterSpecificationList(MySqlDdlParser.java:1254)
    at io.debezium.connector.mysql.MySqlDdlParser.parseAlterTable(MySqlDdlParser.java:1237)
    at io.debezium.connector.mysql.MySqlDdlParser.parseAlter(MySqlDdlParser.java:1199)
    at io.debezium.connector.mysql.MySqlDdlParser.parseNextStatement(MySqlDdlParser.java:177)
    at io.debezium.relational.ddl.LegacyDdlParser.parse(LegacyDdlParser.java:219)
    ... 9 more
","id":0,"worker_id":"192.168.1.100:18083"}],"type":"source"}

Here is the connector's configuration:

{
 "name":"debezium-mysql-source-dev",
 "config":{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "192.168.1.204",
    "database.port": "3306",
    "database.user": "***",
    "database.password": "****",
    "database.server.id": "184054",
    "database.server.name": "dev",
    "table.whitelist":"simu_ann_dev.announcement_member,simu_ann_dev.announcement_follower,simu_affair_dev.affair_member,simu_affair_dev.role",
    "database.history.kafka.bootstrap.servers": "192.168.1.204:9092,192.168.1.100:9092,192.168.1.200:9092",
    "database.history.kafka.topic": "schema-changes.dev" ,
    "include.schema.changes": "true" ,
    "mode":"incrementing",
    "snapshot.mode":"schema_only_recovery",
    "incrementing.column.name":"id"
  }
}

My first attempt was to set "include.schema.changes" to false, but the task still failed.

Next I looked for a more detailed error in connectDistributed.out on the worker node where the connector's task was running:

INFO: Connected to 192.168.1.204:3306 at mysql-bin.000098/467648 (sid:184054, cid:59755)
[2018-08-07 19:05:15,309] INFO Connected to MySQL binlog at 192.168.1.204:3306, starting at binlog file 'mysql-bin.000098', pos=467648, skipping 2 events plus 0 rows (io.debezium.connector.mysql.BinlogReader:941)
[2018-08-07 19:05:15,309] INFO WorkerSourceTask{id=debezium-mysql-source-dev-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:165)
[2018-08-07 19:05:15,311] INFO Creating thread debezium-mysqlconnector-dev-binlog-client (io.debezium.util.Threads:247)
[2018-08-07 19:05:15,364] INFO Cluster ID: rse1l3gPRJW9-aJLJMB8YA (org.apache.kafka.clients.Metadata:265)
[2018-08-07 19:05:15,370] ERROR Error during binlog processing. Last offset stored = null, binlog reader near position = mysql-bin.000098/467713 (io.debezium.connector.mysql.BinlogReader:967)
[2018-08-07 19:05:15,371] ERROR Failed due to error: Error processing binlog event (io.debezium.connector.mysql.BinlogReader:179)
org.apache.kafka.connect.errors.ConnectException: Unexpected exception while parsing statement /* ApplicationName=IntelliJ IDEA 2017.2.4 */ ALTER TABLE fund_account ALTER COLUMN is_use_pay SET DEFAULT 0 at line 1, column 99
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:200)
        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:178)
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:452)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1055)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:913)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:559)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:793)
        at java.lang.Thread.run(Thread.java:748)
Caused by: io.debezium.text.ParsingException: Unexpected exception while parsing statement /* ApplicationName=IntelliJ IDEA 2017.2.4 */ ALTER TABLE fund_account ALTER COLUMN is_use_pay SET DEFAULT 0 at line 1, column 99
        at io.debezium.relational.ddl.LegacyDdlParser.parsingFailed(LegacyDdlParser.java:555)
        at io.debezium.relational.ddl.LegacyDdlParser.parse(LegacyDdlParser.java:227)
        at io.debezium.relational.ddl.LegacyDdlParser.parse(LegacyDdlParser.java:200)
        at io.debezium.connector.mysql.MySqlSchema.applyDdl(MySqlSchema.java:298)
        at io.debezium.connector.mysql.BinlogReader.handleQueryEvent(BinlogReader.java:637)
        at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:436)
        ... 5 more
Caused by: java.lang.NullPointerException
        at io.debezium.connector.mysql.MySqlDdlParser.parseAlterSpecification(MySqlDdlParser.java:1296)
        at io.debezium.connector.mysql.MySqlDdlParser.parseAlterSpecificationList(MySqlDdlParser.java:1254)
        at io.debezium.connector.mysql.MySqlDdlParser.parseAlterTable(MySqlDdlParser.java:1237)
        at io.debezium.connector.mysql.MySqlDdlParser.parseAlter(MySqlDdlParser.java:1199)
        at io.debezium.connector.mysql.MySqlDdlParser.parseNextStatement(MySqlDdlParser.java:177)
        at io.debezium.relational.ddl.LegacyDdlParser.parse(LegacyDdlParser.java:219)
        ... 9 more
[2018-08-07 19:05:15,373] INFO Error processing binlog event, and propagating to Kafka Connect so it stops this connector. Future binlog events read before connector is shutdown will be ignored. (io.debezium.connector.mysql.BinlogReader:457)

The log names the binlog file and position, so I inspected that binlog file on the MySQL host (the binlog location is configured in MySQL's /etc/my.cnf).

root@ubuntu-204:/var/log/mysql# mysqlbinlog  --no-defaults --start-position=467648  --stop-position 469713 ./mysql-bin.000098
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/;
/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/;
DELIMITER /*!*/;
# at 4
#180806  6:40:30 server id 1  end_log_pos 123 CRC32 0x2da62c9f  Start: binlog v 4, server v 5.7.21-log created 180806  6:40:30
BINLOG '
3nxnWw8BAAAAdwAAAHsAAAAAAAQANS43LjIxLWxvZwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAEzgNAAgAEgAEBAQEEgAAXwAEGggAAAAICAgCAAAACgoKKioAEjQA
AZ8spi0=
'/*!*/;
# at 467648
#180806 12:16:35 server id 1  end_log_pos 467713 CRC32 0x9678a828   Anonymous_GTID  last_committed=832  sequence_number=833 rbr_only=no
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 467713
#180806 12:16:35 server id 1  end_log_pos 467906 CRC32 0x193b98c0   Query   thread_id=55848 exec_time=0 error_code=0
use `menkor_dev`/*!*/;
SET TIMESTAMP=1533528995/*!*/;
SET @@session.pseudo_thread_id=55848/*!*/;
SET @@session.foreign_key_checks=1, @@session.sql_auto_is_null=0, @@session.unique_checks=1, @@session.autocommit=1/*!*/;
SET @@session.sql_mode=1436549152/*!*/;
SET @@session.auto_increment_increment=1, @@session.auto_increment_offset=1/*!*/;
/*!\C utf8mb4 *//*!*/;
SET @@session.character_set_client=45,@@session.collation_connection=45,@@session.collation_server=224/*!*/;
SET @@session.lc_time_names=0/*!*/;
SET @@session.collation_database=DEFAULT/*!*/;
/* ApplicationName=IntelliJ IDEA 2017.2.4 */ ALTER TABLE fund_account ALTER COLUMN is_use_pay SET DEFAULT 0
/*!*/;
# at 467906
#180806 12:26:09 server id 1  end_log_pos 467971 CRC32 0x28ebc3fa   Anonymous_GTID  last_committed=833  sequence_number=834 rbr_only=yes
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 467971
#180806 12:26:09 server id 1  end_log_pos 468059 CRC32 0xf281f0a1   Query   thread_id=55839 exec_time=0 error_code=0
SET TIMESTAMP=1533529569/*!*/;
/*!\C utf8mb4 *//*!*/;
SET @@session.character_set_client=224,@@session.collation_connection=224,@@session.collation_server=224/*!*/;
....

The statement that triggers the error is "/* ApplicationName=IntelliJ IDEA 2017.2.4 */ ALTER TABLE fund_account ALTER COLUMN is_use_pay SET DEFAULT 0".
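The binlog file name and offset used above can also be pulled out of the worker's ERROR line programmatically. The following is a minimal sketch, assuming the log line keeps the "binlog reader near position = <file>/<offset>" wording shown earlier; the class and method names are hypothetical and are not part of Debezium.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BinlogPositionExtractor {
    // Matches the "binlog reader near position = <file>/<offset>" fragment of the ERROR line.
    private static final Pattern NEAR_POSITION =
            Pattern.compile("binlog reader near position = ([^/\\s]+)/(\\d+)");

    /** Returns {file, offset}, or null when the line carries no position fragment. */
    public static String[] extract(String logLine) {
        Matcher m = NEAR_POSITION.matcher(logLine);
        return m.find() ? new String[] { m.group(1), m.group(2) } : null;
    }

    /** Builds a mysqlbinlog command that starts reading at the extracted offset. */
    public static String toCommand(String[] position) {
        return "mysqlbinlog --no-defaults --start-position=" + position[1]
                + " ./" + position[0];
    }

    public static void main(String[] args) {
        String line = "[2018-08-07 19:05:15,370] ERROR Error during binlog processing. "
                + "Last offset stored = null, binlog reader near position = "
                + "mysql-bin.000098/467713 (io.debezium.connector.mysql.BinlogReader:967)";
        String[] position = extract(line);
        if (position != null) {
            System.out.println(toCommand(position));
        }
    }
}
```

Run the resulting command from the binlog directory on the MySQL host, as in the session above.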

That still gave me no real lead, so I read the source code at the location of the exception:

    protected void parseAlterSpecification(Marker start, TableEditor table, Consumer<TableId> newTableName) {
        parseTableOptions(start, table);
        if (tokens.canConsume("ADD")) {
            if (tokens.matches("COLUMN", "(") || tokens.matches('(')) {
                tokens.canConsume("COLUMN");
                parseCreateDefinitionList(start, table);
            } else if (tokens.canConsume("PARTITION", "(")) {
                parsePartitionDefinition(start, table);
                tokens.consume(')');
            } else {
                parseCreateDefinition(start, table, true);
            }
        } else if (tokens.canConsume("DROP")) {
            if (tokens.canConsume("PRIMARY", "KEY")) {
                table.setPrimaryKeyNames();
            } else if (tokens.canConsume("FOREIGN", "KEY")) {
                tokens.consume(); // foreign key symbol
            } else if (tokens.canConsumeAnyOf("INDEX", "KEY")) {
                tokens.consume(); // index name
            } else if (tokens.canConsume("PARTITION")) {
                parsePartitionNames(start);
            } else {
                if(!isNextTokenQuotedIdentifier()) {
                    tokens.canConsume("COLUMN");
                }
                String columnName = parseColumnName();
                table.removeColumn(columnName);
                tokens.canConsume("RESTRICT");
            }
        } else if (tokens.canConsume("ALTER")) {
            if (!isNextTokenQuotedIdentifier()) {
                tokens.canConsume("COLUMN");
            }
            String columnName = tokens.consume(); // column name
            if (!tokens.canConsume("DROP", "DEFAULT")) {
                tokens.consume("SET");
                // the line below is the one that throws the NullPointerException
                ColumnEditor columnEditor = table.columnWithName(columnName).edit();
                parseDefaultClause(start, columnEditor);
            }
        } else if (tokens.canConsume("CHANGE")) {
            if (!isNextTokenQuotedIdentifier()) {
                tokens.canConsume("COLUMN");
            }
            String oldName = parseColumnName();
            String newName = parseColumnName();
            parseCreateColumn(start, table, oldName, newName);
        } else if (tokens.canConsume("MODIFY")) {
            if (!isNextTokenQuotedIdentifier()) {
                tokens.canConsume("COLUMN");
            }
            String columnName = parseColumnName();
            parseCreateColumn(start, table, columnName, null);
        } else if (tokens.canConsumeAnyOf("ALGORITHM", "LOCK")) {
            tokens.canConsume('=');
            tokens.consume();
        } else if (tokens.canConsume("DISABLE", "KEYS")
                || tokens.canConsume("ENABLE", "KEYS")) {
            // nothing
        } else if (tokens.canConsume("RENAME", "INDEX")
                || tokens.canConsume("RENAME", "KEY")) {
            tokens.consume(); // old
            tokens.consume("TO");
            tokens.consume(); // new
        } else if (tokens.canConsume("RENAME")) {
            tokens.canConsumeAnyOf("AS", "TO");
            TableId newTableId = parseQualifiedTableName(start);
            newTableName.accept(newTableId);
        } else if (tokens.canConsume("ORDER", "BY")) {
            consumeCommaSeparatedValueList(start); // this should not affect the order of the columns in the table
        } else if (tokens.canConsume("CONVERT", "TO", "CHARACTER", "SET")
                || tokens.canConsume("CONVERT", "TO", "CHARSET")) {
            tokens.consume(); // charset name
            if (tokens.canConsume("COLLATE")) {
                tokens.consume(); // collation name
            }
        } else if (tokens.canConsume("CHARACTER", "SET")
                || tokens.canConsume("CHARSET")
                || tokens.canConsume("DEFAULT", "CHARACTER", "SET")
                || tokens.canConsume("DEFAULT", "CHARSET")) {
            tokens.canConsume('=');
            String charsetName = tokens.consume(); // charset name
            table.setDefaultCharsetName(charsetName);
            if (tokens.canConsume("COLLATE")) {
                tokens.canConsume('=');
                tokens.consume(); // collation name (ignored)
            }
        } else if (tokens.canConsume("DISCARD", "TABLESPACE") || tokens.canConsume("IMPORT", "TABLESPACE")) {
            // nothing
        } else if (tokens.canConsume("FORCE")) {
            // nothing
        } else if (tokens.canConsume("WITH", "VALIDATION") || tokens.canConsume("WITHOUT", "VALIDATION")) {
            // nothing
        } else if (tokens.canConsume("DISCARD", "PARTITION") || tokens.canConsume("IMPORT", "PARTITION")) {
            if (!tokens.canConsume("ALL")) {
                tokens.consume(); // partition name
            }
            tokens.consume("TABLESPACE");
        } else if (tokens.canConsume("COALLESCE", "PARTITION")) {
            tokens.consume(); // number
        } else if (tokens.canConsume("REORGANIZE", "PARTITION")) {
            parsePartitionNames(start);
            tokens.consume("INTO", "(");
            do {
                parsePartitionDefinition(start, table);
            } while (tokens.canConsume(','));
            tokens.consume(')');
        } else if (tokens.canConsume("EXCHANGE", "PARTITION")) {
            tokens.consume(); // partition name
            tokens.consume("WITH", "TABLE");
            parseSchemaQualifiedName(start); // table name
            if (tokens.canConsumeAnyOf("WITH", "WITHOUT")) {
                tokens.consume("VALIDATION");
            }
        } else if (tokens.matches(TokenStream.ANY_VALUE, "PARTITION")) {
            tokens.consumeAnyOf("TRUNCATE", "CHECK", "ANALYZE", "OPTIMIZE", "REBUILD", "REPAIR");
            tokens.consume("PARTITION");
            if (!tokens.canConsume("ALL")) {
                parsePartitionNames(start);
            }
        } else if (tokens.canConsume("REMOVE", "PARTITIONING")) {
            // nothing
        } else if (tokens.canConsume("UPGRADE", "PARTITIONING")) {
            // nothing
        }
    }

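I added some logging at that spot in the source (the message prefixes 列名 and 列信息 mean "column name" and "column info"):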

                logger.info("列名="+columnName);
                Column column = table.columnWithName(columnName);
                logger.info("列信息="+column);
                ColumnEditor columnEditor = column.edit();

I then built a jar from the modified source, uploaded it to the Kafka Connect plugin directory, and ran the Debezium MySQL connector again. It produced these log lines:

[2018-08-07 20:40:10,287] INFO 列名=is_use_pay (io.debezium.connector.mysql.MySqlDdlParser:1296)
[2018-08-07 20:40:10,288] INFO 列信息=null (io.debezium.connector.mysql.MySqlDdlParser:1298)

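So the NullPointerException comes from the column lookup returning null: table.columnWithName(columnName) finds no column named is_use_pay, and calling edit() on the result fails. Why the column was missing was still unclear at this point.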
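The failure mode is easy to reproduce in isolation. The sketch below is not Debezium code and not the project's actual fix; `NullSafeColumnLookup` is a hypothetical stand-in for the parser's table model, used only to show how a guard on the lookup turns the NullPointerException into a skipped statement.

```java
import java.util.HashMap;
import java.util.Map;

public class NullSafeColumnLookup {
    // Hypothetical stand-in for the parser's table model: column name -> default value.
    private final Map<String, String> columns = new HashMap<>();

    /** Registers a column that the model knows about, with no default yet. */
    public void addColumn(String name) {
        columns.put(name, null);
    }

    /**
     * Applies "ALTER COLUMN <name> SET DEFAULT <value>" to the model.
     * Returns false when the column is unknown, instead of dereferencing null.
     */
    public boolean setDefault(String columnName, String value) {
        if (!columns.containsKey(columnName)) {
            return false; // unknown column: skip the change rather than fail
        }
        columns.put(columnName, value);
        return true;
    }

    public String defaultOf(String columnName) {
        return columns.get(columnName);
    }

    public static void main(String[] args) {
        // A table the model has never seen, as with fund_account in the failing statement.
        NullSafeColumnLookup unknownTable = new NullSafeColumnLookup();
        System.out.println(unknownTable.setDefault("is_use_pay", "0"));

        // A table whose column is known.
        NullSafeColumnLookup knownTable = new NullSafeColumnLookup();
        knownTable.addColumn("is_use_pay");
        System.out.println(knownTable.setDefault("is_use_pay", "0"));
    }
}
```

Whether silently skipping is the right behavior depends on whether the table matters to the connector; for a monitored table, a missing column more likely signals an incomplete schema history and should be reported.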

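I had been busy and had not read the logs closely. On a second, careful pass I noticed that the failing binlog statement targets fund_account in the menkor_dev database, a table outside my table.whitelist. That presumably explains the null: the connector has no column definition for a table it does not monitor. It is still surprising that a DDL parse failure on an unmonitored table brings down the whole connector. Fortunately my use case does not depend on tracking table schema changes, so I changed the connector configuration as follows, which works around the problem for now: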

{
 "name":"debezium-mysql-source-dev",
 "config":{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "192.168.1.204",
    "database.port": "3306",
    "database.user": "*****",
    "database.password": "*****",
    "database.server.id": "184054",
    "database.server.name": "dev",
    "table.whitelist":"simu_ann_dev.announcement_member,simu_ann_dev.announcement_follower,simu_affair_dev.affair_member,simu_affair_dev.role",
    "database.history.kafka.bootstrap.servers": "192.168.1.204:9092,192.168.1.100:9092,192.168.1.200:9092",
    "database.history.kafka.topic": "schema-changes.dev" ,
    "include.schema.changes": "true" ,
    "mode":"incrementing",
    "incrementing.column.name":"id",
    "database.history.store.only.monitored.tables.ddl":"true",
    "database.history.skip.unparseable.ddl":"true"
  }
}
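To confirm that a table named in a failing DDL statement really lies outside table.whitelist, the check can be sketched as below. This assumes the documented behavior that table.whitelist is a comma-separated list of regular expressions matched against the fully-qualified databaseName.tableName identifier; matching the whole identifier case-insensitively is an assumption of this sketch, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class WhitelistCheck {
    /** Compiles a comma-separated whitelist into one pattern per entry. */
    public static List<Pattern> compile(String whitelist) {
        return Arrays.stream(whitelist.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .map(entry -> Pattern.compile(entry, Pattern.CASE_INSENSITIVE))
                .collect(Collectors.toList());
    }

    /** True when any whitelist entry matches the whole "database.table" identifier. */
    public static boolean isMonitored(List<Pattern> patterns, String tableId) {
        return patterns.stream().anyMatch(p -> p.matcher(tableId).matches());
    }

    public static void main(String[] args) {
        List<Pattern> patterns = compile(
                "simu_ann_dev.announcement_member,simu_ann_dev.announcement_follower,"
                + "simu_affair_dev.affair_member,simu_affair_dev.role");
        // The failing ALTER TABLE ran after "use `menkor_dev`" in the binlog.
        System.out.println(isMonitored(patterns, "menkor_dev.fund_account"));
        System.out.println(isMonitored(patterns, "simu_affair_dev.role"));
    }
}
```

With the whitelist from the configuration above, menkor_dev.fund_account is not matched, which is why "database.history.store.only.monitored.tables.ddl" and "database.history.skip.unparseable.ddl" are relevant here.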

When things calm down I should study Debezium properly.
