文章下半部分:Debezium connector for MySQL 配置部署

Debezium connector for MySQL

MySQL 的 binlog 会按照事务提交的顺序记录所有的操作变更。这些变更既包含 表 schema的变更也包含 数据的变更。MySQL 使用binlog来复制和恢复数据。

Debezium MySQL 连接器读取 binlog,为行级INSERT,UPDATEDELETE操作生成更改事件,并将更改事件发送到 Kafka 主题。客户端应用程序读取这些 Kafka 主题。

由于 MySQL 通常设置为在指定时间段后清除 binlog,因此 MySQL 连接器会对您的每个数据库执行初始一致快照。MySQL 连接器从创建快照的位置读取 binlog。

有关与此连接器兼容的 MySQL 数据库版本的信息,请参阅 https://debezium.io/releases/。

1. connector 是怎么工作的?

从整体了解连接器支持的 MySQL (集群运行模式)拓扑 对于规划您的应用程序很有价值。为了优化配置和运行 Debezium MySQL 连接器,了解连接器如何跟踪表结构、对外公开模式更改、执行快照以及确定 Kafka 主题名称会很有帮助。

Debezium MySQL 连接器尚未在 MariaDB 上进行测试,但来自社区的多份报告表明该连接器已成功用于该数据库。计划在未来的 Debezium 版本中提供对 MariaDB 的官方支持。

1.1 支持的 MySQL (集群运行模式)拓扑

Debezium MySQL 连接器支持以下 MySQL 拓扑:

Standalone 独立实例

当使用单个 MySQL 服务器时,服务器必须启用 binlog(并且可选地启用 GTID),以便 Debezium MySQL 连接器可以跟踪监控数据库服务器。这通常是可以接受的,因为二进制日志也可以用作增量备份。在这种情况下,MySQL 连接器始终连接并跟随这个独立的 MySQL 服务器实例。

Primary and replica 主库和副本

Debezium MySQL 连接器可以跟随主服务器之一或副本之一(如果该副本启用了其 binlog),但连接器仅看到该服务器可见的集群中的更改。通常,除了 在多主拓扑之外,这基本不是问题【因为只能跟踪一个实例,再多主模式种,需要跟踪多个实例这是Debezium 不支持的】。

连接器将其位置记录在服务器的 binlog 中,这在集群中的每台服务器上都是不同的。因此,连接器必须只跟随一个 MySQL 服务器实例。如果该服务器出现故障,则必须重新启动或恢复该服务器,然后连接器才能继续。

High available clusters 高可用集群

MySQL 存在多种高可用性解决方案,这些方案能容忍问题和故障并且几乎立即可以从问题和故障中恢复变得更加容易。大多数 HA MySQL 集群使用 GTID,以便副本能够跟踪任何主服务器上的所有更改。

Multi-primary 多主

网络数据库 (NDB) 集群复制使用一个或多个 MySQL 副本节点,每个节点从多个主服务器复制。这是聚合多个 MySQL 集群的复制的强大方法。此拓扑需要使用 GTID。

Debezium MySQL 连接器可以使用这些多主 MySQL 副本作为源,并且只要新副本赶上旧副本,就可以故障转移到不同的多主 MySQL 副本。也就是说,新副本具有在第一个副本上看到的所有事务。即使连接器仅使用数据库或表的一个子集,这也有效,因为可以将连接器配置为在尝试重新连接到新的多主 MySQL 副本并找到二进制日志的正确偏移位置。

Hosted 托管云服务

支持 Debezium MySQL 连接器以使用托管选项,例如 Amazon RDS 和 Amazon Aurora。

因为这些托管选项不允许全局读锁,所以使用表级锁来创建一致快照

1.2 表模式历史主题 schema history topic

当数据库客户端查询数据库时,客户端使用数据库的当前 schema。但是,数据库schema可以随时更改,这意味着连接器必须能够识别在每个记录插入、更新或删除操作时的schema是什么。此外,连接器不能只使用当前schema,因为连接器可能正在处理的记录是在 schema 变更之前生成的。

为了确保正确处理schema变更后发生的数据变更,MySQL 在 binlog 中不仅包括对数据的行级变更,还包括应用于数据库的 DDL 语句。当连接器读取 binlog 并遇到这些 DDL 语句时,它会解析它们并更新每个表schema的内存表示。连接器使用此schema表示来识别每次插入、更新或删除操作时的表schema,并产生适当的更改事件。在一个单独的数据库历史 Kafka 主题中,连接器记录所有 DDL 语句以及每个 DDL 语句出现在 binlog 中的位置。

当连接器在崩溃或优雅停止后重新启动时,连接器会从特定位置,即从特定时间点开始读取 binlog。连接器通过读取数据库历史 Kafka 主题并解析所有 DDL 语句,直到连接器启动的 binlog 中的点,来重建此时存在的表结构。

数据库历史主题仅供连接器使用。连接器可以选择将模式更改事件发送到针对消费者应用程序的不同主题

当 MySQL 连接器捕获应用了架构更改工具(例如gh-ost或)的表中的更改pt-online-schema-change时,会在迁移过程中创建辅助表。需要配置连接器以捕获对这些帮助表的更改。如果消费者不需要为帮助表生成的记录,则可以应用单个消息转换将它们过滤掉。

查看接收 Debezium 事件记录的默认主题名称。

1.3 schema change topic

您可以配置 Debezium MySQL 连接器以生成模式更改事件,这些事件描述应用于数据库中捕获的表的模式更改。连接器将模式更改事件写入名为 的 Kafka 主题<serverName>,其中serverName是连接器配置属性中database.server.name指定的逻辑服务器名称。连接器发送到模式更改主题的消息包含有效负载,并且(可选)还包含更改事件消息的schema。

schema 更改事件消息的有效负载包括以下元素:

ddl

提供导致schema 更改的 SQL CREATEALTERDROP语句。

databaseName

应用 DDL 语句的数据库的名称。databaseName的值用作消息键。

pos

语句出现在 binlog 中的位置。

tableChanges

schema 更改后整个表schema 的结构化表示。该tableChanges字段包含一个数组,其中包含表中每一列的条目。由于结构化表示以 JSON 或 Avro 格式呈现数据,因此消费者可以轻松读取消息,而无需先通过 DDL 解析器对其进行处理。

重要提示对于处于捕获模式的表,连接器不仅将模式更改的历史记录存储在模式更改主题中,还会存储在内部数据库历史记录主题中。内部数据库历史主题仅供连接器使用,不适合消费应用程序直接使用。确保需要有关架构更改通知的应用程序仅使用来自架构更改主题的信息。
重要提示永远不要对数据库历史主题进行分区。要使数据库历史主题正确运行,它必须保持连接器向其发出的事件记录的一致的全局顺序。为确保主题不会在分区之间拆分,请使用以下方法之一设置主题的分区计数:
- 如果您手动创建数据库历史主题,请将分区计数指定为1
- 如果您使用 Apache Kafka 代理自动创建数据库历史主题,则会创建主题,请将Kafkanum.partitions配置选项的值设置为1
警告连接器向其模式更改主题发出的消息格式处于孵化状态,如有更改,恕不另行通知。

示例:发送到 MySQL 连接器schema 更改主题的消息

以下示例显示了 JSON 格式的典型schema 更改消息。该消息包含表模式的逻辑表示。

{
  "schema": {
  ...
  },
  "payload": {
        "source": {  // (1)
        "version": "2.0.0.Alpha1",
        "connector": "mysql",
        "name": "dbserver1",
        "ts_ms": 0,
        "snapshot": "false",
        "db": "inventory",
        "sequence": null,
        "table": "customers",
        "server_id": 0,
        "gtid": null,
        "file": "mysql-bin.000003",
        "pos": 219,
        "row": 0,
        "thread": null,
        "query": null
    },
    "databaseName": "inventory", // (2)
    "schemaName": null,
    "ddl": "ALTER TABLE customers ADD COLUMN middle_name VARCHAR(2000)", // (3)
    "tableChanges": [ // (4)
        {
        "type": "ALTER", // (5)
        "id": "\"inventory\".\"customers\"",  // (6)
        "table": { // (7)
            "defaultCharsetName": "latin1",
            "primaryKeyColumnNames": [  // (8)
                "id"
            ],
            "columns": [ // (9)
                {
                "name": "id",
                "jdbcType": 4,
                "nativeType": null,
                "typeName": "INT",
                "typeExpression": "INT",
                "charsetName": null,
                "length": 11,
                "scale": null,
                "position": 1,
                "optional": false,
                "autoIncremented": true,
                "generated": true
            },
            {
                "name": "first_name",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "latin1",
                "length": 255,
                "scale": null,
                "position": 2,
                "optional": false,
                "autoIncremented": false,
                "generated": false
            },                        {
                "name": "last_name",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "latin1",
                "length": 255,
                "scale": null,
                "position": 3,
                "optional": false,
                "autoIncremented": false,
                "generated": false
            },
            {
                "name": "email",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "latin1",
                "length": 255,
                "scale": null,
                "position": 4,
                "optional": false,
                "autoIncremented": false,
                "generated": false
            },
            {
                "name": "middle_name",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "latin1",
                "length": 2000,
                "scale": null,
                "position": 5,
                "optional": true,
                "autoIncremented": false,
                "generated": false
            }
          ]
        }
      }
    ]
  },
  "payload": {
    "databaseName": "inventory",
    "ddl": "CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512), weight FLOAT ); ALTER TABLE products AUTO_INCREMENT = 101;",
    "source" : {
      "version": "2.0.0.Alpha1",
      "name": "mysql-server-1",
      "server_id": 0,
      "ts_ms": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": null,
      "table": null,
      "query": null
    }
  }
}

表 1. 发送到schema 更改主题的消息中的字段描述

编号字段名称描述
1sourcesource字段的结构与连接器写入特定于表的主题的标准数据更改事件完全相同。此字段可用于关联不同主题的事件。
2databaseName
schemaName
标识包含更改的数据库和schema 。该databaseName字段的值用作记录的消息键。
3ddl此字段包含负责schema 更改的 DDL。该ddl字段可以包含多个 DDL 语句。每个语句都适用于数据库中的databaseName字段。多个 DDL 语句按照它们应用于数据库的顺序出现。

客户端可以提交多个适用于多个数据库的 DDL 语句。如果 MySQL 以原子方式应用它们,则连接器按顺序获取 DDL 语句,按数据库对它们进行分组,并为每个组创建一个模式更改事件。如果 MySQL 单独应用它们,连接器会为每个语句创建一个单独的模式更改事件。
4tableChanges包含由 DDL 命令生成的架构更改的一个或多个项目的数组。
5type描述变化的种类。该值为以下之一:
CREATE 表已创建。
ALTER 表已修改。
DROP 表已删除。
6id创建、更改或删除的表的完整标识符。在表重命名的情况下,此标识符是表名的组合。*<old>*,*<new>*
7table表示应用更改后的表元数据。
8primaryKeyColumnNames组成表的主键的列的列表。
9columns已更改表中每一列的元数据。

另请参阅:模式历史主题

1.4 快照

首次启动 Debezium MySQL 连接器时,它会执行数据库的初始一致快照。以下流程描述了连接器如何创建此快照。此流程适用于默认快照模式,即initial. 有关其他快照模式的信息,请参阅MySQL 连接器snapshot.mode配置属性

表 2. 使用全局读锁执行初始快照的工作流程

步骤动作
1获取阻止其他数据库客户端写入的全局读锁。

快照本身不会阻止其他客户端应用可能会干扰连接器尝试读取 binlog 位置和表模式的 DDL。连接器在读取 binlog 位置时保持全局读锁,并如后面的步骤所述释放锁。
2启动具有可重复读取语义的事务,以确保事务中的所有后续读取都针对一致的快照完成。
3读取当前的 binlog 位置。
4读取连接器配置为捕获更改的数据库和表的架构。
5释放全局读锁。其他数据库客户端现在可以写入数据库。
6如果适用,将 DDL 更改写入架构更改主题,包括所有必要DROP…​CREATE…​DDL 语句。
7扫描数据库表。对于每一行,连接器将CREATE事件发送到相关的特定于表的 Kafka 主题。
8提交事务。
9在连接器偏移中记录完成的快照。

连接器重新启动

如果连接器在执行初始快照时发生故障、停止或重新平衡,则在连接器重新启动后,它会执行新的快照。在初始快照完成后,Debezium MySQL 连接器从 binlog 中的相同位置重新启动,因此它不会错过任何更新。

如果连接器停止的时间足够长,MySQL 可能会清除旧的二进制日志文件,连接器的位置就会丢失。如果位置丢失,连接器将恢复为其起始位置的*初始快照。*有关对 Debezium MySQL 连接器进行故障排除的更多提示,请参阅出现问题时的行为

不允许全局读锁

某些环境不允许全局读锁。如果 Debezium MySQL 连接器检测到不允许全局读锁,则连接器使用表级锁代替并使用此方法执行快照。这要求 Debezium 连接器的数据库用户具有LOCK TABLES权限。

表 3. 使用表级锁执行初始快照的工作流程

步骤动作
1获取表级锁。
2启动具有可重复读取语义的事务,以确保事务中的所有后续读取都针对一致的快照完成。
3读取和过滤数据库和表的名称。
4读取当前的 binlog 位置。
5读取连接器配置为捕获更改的数据库和表的架构。
6如果适用,将 DDL 更改写入架构更改主题,包括所有必要DROP…​CREATE…​DDL 语句。
7扫描数据库表。对于每一行,连接器将CREATE事件发送到相关的特定于表的 Kafka 主题。
8提交事务。
9释放表级锁。
10在连接器偏移中记录完成的快照。
1.4.1 临时即席快照

默认情况下,连接器仅在首次启动后才运行初始快照操作。在这个初始快照之后,在正常情况下,连接器不会重复快照过程。连接器捕获的任何未来更改事件数据仅通过流式处理进入。

但是,在某些情况下,连接器在初始快照期间获得的数据可能会变得陈旧、丢失或不完整。为了提供一种重新捕获表数据的机制,Debezium 包含一个执行临时快照的选项。数据库中的以下更改可能会导致执行临时快照:

  • 修改连接器配置以捕获一组不同的表。

  • Kafka 主题被删除,必须重建。

  • 由于配置错误或其他问题而发生数据损坏。

您可以通过启动所谓的ad-hoc 快照为之前捕获快照的表重新运行快照。即席快照需要使用signaling tables (信号表)。您可以通过向 Debezium 信号表发送信号请求来启动临时快照。

当您启动现有表的临时快照时,连接器会将内容附加到表已存在的主题中。如果删除了以前存在的主题,如果启用了自动主题创建,Debezium 可以自动创建主题。

即席快照信号指定要包含在快照中的表。快照可以捕获数据库的全部内容,或仅捕获数据库中表的子集。

您可以通过向信令表发送execute-snapshot消息来指定要捕获的表。将execute-snapshot信号的类型设置为incremental,并提供要包含在快照中的表的名称,如下表所述:

Table 4. Example of an ad hoc execute-snapshot signal record

字段默认值说明
typeincremental指定要运行的快照类型。
设置类型是可选的。目前,您只能请求incremental快照。
data-collectionsN/A一个数组,其中包含要生成快照的表的完全限定名称。名称的格式与配置选项
的格式相同。signal.data.collection

触发临时快照

您可以通过将具有信号execute-snapshot类型的条目添加到信号表来启动临时快照。连接器处理完消息后,将开始快照操作。快照进程读取第一个和最后一个主键值,并将这些值用作每个表的起点和终点。根据表中的条目数和配置的块大小,Debezium 将表划分为块,并继续对每个块进行快照,一次一个。

目前,execute-snapshot操作类型仅触发增量快照。有关详细信息,请参阅增量快照

1.4.2 增量快照

为了提供管理快照的灵活性,Debezium 包含一个补充快照机制,称为增量快照。增量快照依靠 Debezium 机制向 Debezium 连接器发送信号。增量快照基于DDD-3设计文档。

在增量快照中,Debezium 不是像在初始快照中那样一次捕获数据库的完整状态,而是在一系列可配置的块中分阶段捕获每个表。您可以指定您希望快照捕获的表和每个块的大小。块大小决定了快照在数据库上的每次提取操作期间收集的行数。增量快照的默认块大小为 1 KB。

随着增量快照的进行,Debezium 使用watermarks 来跟踪其进度,维护它捕获的每个表行的记录。与标准初始快照过程相比,这种分阶段捕获数据的方法具有以下优势:

  • 您可以在流式数据捕获的同时运行增量快照,而不是将流式传输推迟到快照完成。连接器在整个快照过程中继续从更改日志中捕获近乎实时的事件,并且两个操作都不会阻塞另一个操作。

  • 如果增量快照的进度中断,您可以恢复它而不会丢失任何数据。进程恢复后,快照从它停止的点开始,而不是从头重新捕获表。

  • 您可以随时按需运行增量快照,并根据需要重复该过程以适应数据库更新。例如,您可以在修改连接器配置以将表添加到其table.include.list属性后重新运行快照。

增量快照过程

当您运行增量快照时,Debezium 按主键对每个表进行排序,然后根据配置的块大小将表拆分为块。逐块工作,然后捕获块中的每个表行。对于它捕获的每一行,快照都会发出一个READ事件。该事件表示块的快照开始时行的值。

随着快照的进行,其他进程可能会继续访问数据库,可能会修改表记录。为反映此类更改,INSERTUPDATEDELETE操作将照常提交到事务日志。同样,正在进行的 Debezium 流式处理继续检测这些更改事件并将相应的更改事件记录发送到 Kafka。

Debezium 如何解决具有相同主键的记录之间的冲突

在某些情况下,流式处理发出的UPDATEDELETE事件被乱序接收。也就是说,流式处理可能会在快照捕获包含该行的READ事件的块之前发出一个修改表行的事件。当快照最终为该行发出相应的READ事件时,它的值已经被取代。为了确保以正确的逻辑顺序处理乱序到达的增量快照事件,Debezium 采用了一种缓冲方案来解决冲突。只有在解决了快照事件和流事件之间的冲突后,Debezium 才会向 Kafka 发出事件记录。

快照窗口

为了帮助解决延迟到达事件和修改同一表行的流事件之间的冲突READ,Debezium 采用了所谓的快照窗口。快照窗口划分了增量快照捕获指定表块数据的时间间隔。在一个块的快照窗口打开之前,Debezium 遵循其通常的行为并从事务日志直接向下游发送事件到目标 Kafka 主题。但是从特定块的快照打开的那一刻起,直到它关闭,Debezium 执行重复数据删除步骤以解决具有相同主键的事件之间的冲突。

对于每个数据集合,Debezium 发出两种类型的事件,并将它们的记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为READ操作发出。同时,随着用户不断更新数据集合中的记录,事务日志也更新以反映每次提交,Debezium 会针对每次更改发出UPDATE或操作。DELETE

当快照窗口打开时,Debezium 开始处理快照块,它将快照记录传递到内存缓冲区。在快照窗口期间,READ缓冲区中事件的主键与传入流事件的主键进行比较。如果未找到匹配项,则将流式事件记录直接发送到 Kafka。如果 Debezium 检测到匹配,它会丢弃缓冲的READ事件,并将流式记录写入目标主题,因为流式事件在逻辑上取代了静态快照事件。块的快照窗口关闭后,缓冲区仅包含READ不存在相关事务日志事件的事件。Debezium 将这些剩余READ事件发送到表的 Kafka 主题。

连接器对每个快照块重复该过程。

触发增量快照

目前,启动增量快照的唯一方法是将临时快照信号发送到源数据库上的信令表。INSERT您将信号作为 SQL查询提交给表。Debezium 检测到信号表中的变化后,它会读取信号,并运行请求的快照操作。

您提交的查询指定要包含在快照中的表,并且可以选择指定快照操作的类型。目前,快照操作的唯一有效选项是默认值incremental.

要指定要包含在快照中的表,请提供一个data-collections列出这些表的数组,例如,
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

增量快照信号的data-collections数组没有默认值。如果data-collections数组为空,Debezium 检测到不需要任何操作并且不执行快照。

先决条件

程序

  1. 发送 SQL 查询以将临时增量快照请求添加到信令表:

    INSERT INTO _<signalTable>_ (id, type, data) VALUES (_'<id>'_, _'<snapshotType>'_, '{"data-collections": ["_<tableName>_","_<tableName>_"],"type":"_<snapshotType>_"}');
    

    例如,

    INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.table1", "schema2.table2"],"type":"incremental"}');
    

    id命令中的、type和参数的值data对应于信令表的字段

    下表描述了这些参数:

    表 5. 向信令表发送增量快照信号的 SQL 命令字段说明

    价值描述
    myschema.debezium_signal指定源数据库上的信令表的完全限定名称
    ad-hoc-1id参数指定一个任意字符串,该字符串被分配为id信号请求的标识符。
    使用此字符串将日志消息标识到信令表中的条目。Debezium 不使用此字符串。相反,在快照期间,Debezium 会生成自己的id字符串作为水印信号。
    execute-snapshot指定type参数指定信号要触发的操作。
    data-collections信号字段的必需组件,data它指定要包含在快照中的表名数组。
    该数组按表的完全限定名称列出表,使用的格式与您在signal.data.collection配置属性中指定连接器信号表的名称时使用的格式相同。
    incremental信号字段的可选type组件data,指定要运行的快照操作的种类。
    目前,唯一有效的选项是默认值incremental.
    在您提交给信令表的 SQL 查询中指定一个type值是可选的。
    如果您未指定值,则连接器将运行增量快照。

以下示例显示了连接器捕获的增量快照事件的 JSON。

示例:增量快照事件消息

{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" 
    },
    "op":"r", 
    "ts_ms":"1620393591654",
    "transaction":null
}
行号字段名称描述
1snapshot指定要运行的快照操作的类型。
目前,唯一有效的选项是默认值incremental.
在您提交给信令表的 SQL 查询中指定一个type值是可选的。
如果您未指定值,则连接器将运行增量快照。
2op指定事件类型。
快照事件的值为r,表示READ操作。
1.4.3 只读增量快照

MySQL 连接器允许使用与数据库的只读连接来运行增量快照。要运行具有只读访问权限的增量快照,连接器使用已执行的全局事务 ID (GTID) 设置为高 watermarks和低watermarks。通过将二进制日志 (binlog) 事件的 GTID 或服务器的心跳与低watermarks和高watermarks进行比较来更新块窗口的状态。

要切换到只读实现,请将read.only属性的值设置为true

先决条件

  • 启用 MySQL GTID

  • 如果连接器从多线程副本(即,值replica_parallel_workers大于的副本0)读取,则必须设置以下选项之一:

    • replica_preserve_commit_order=ON

    • slave_preserve_commit_order=ON

1.4.4 即席只读增量快照

当 MySQL 连接为只读时,信令表机制](https://debezium.io/documentation/reference/2.0/connectors/mysql.html#mysql-property-signal-kafka-topic)还可以通过向 Kafka 主题发送消息 并指定 signal.kafka.topic属性来运行快照。

Kafka 消息的键必须与database.server.name连接器配置选项的值匹配。

该值是一个带有typedata字段的 JSON 对象。

信号类型是execute-snapshotdata字段必须有以下字段:

表 6. 执行快照数据字段

字段默认描述
typeincremental要执行的快照的类型。目前仅incremental支持。
有关详细信息,请参阅下一节。
data-collections不适用要快照的表的限定名称数组。名称的格式与signal.data.collection配置选项
的格式相同。

执行快照 Kafka 消息的示例:

键 = test_connector
值 = {"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}

1.5 快照事件的操作类型

MySQL 连接器将快照事件比如将 READ操作定义为("op" : "r") 发出。如果您希望连接器将快照事件作为CREATE( c) 事件发出,请配置 DebeziumReadToInsertEvent单消息转换 (SMT) 以修改事件类型。

以下示例显示了如何配置 SMT:

示例:使用ReadToInsertEventSMT 更改快照事件的类型

transforms=snapshotasinsert,...
transforms.snapshotasinsert.type=io.debezium.connector.mysql.transforms.ReadToInsertEvent

1.6 主题名称

默认情况下,MySQL 连接器将表中发生的所有 INSERT, UPDATE, 和 DELETE操作更改事件写入特定于该表的单个 Apache Kafka 主题。

连接器使用以下约定来命名更改事件主题:

serverName.databaseName.tableName

假设fulfillment是服务器名称,inventory是数据库名称,并且数据库包含名为orderscustomers和的表products。Debezium MySQL 连接器向三个 Kafka 主题发出事件,每个主题对应一个数据库中的表:

fulfillment.inventory.orders
fulfillment.inventory.customers
fulfillment.inventory.products

以下列表提供了默认名称组件的定义:

serverName

database.server.name由连接器配置属性指定的服务器的逻辑名称。

schemaName

发生操作的模式的名称。

tableName

发生操作的表的名称。

连接器应用类似的命名约定来标记其内部数据库历史主题、模式更改主题事务元数据主题

如果默认主题名称不符合您的要求,您可以配置自定义主题名称。要配置自定义主题名称,请在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅主题路由

1.7 事务元数据 Transaction metadata

Debezium 可以生成表示事务边界和增强的数据更改事件消息的事件。

注意事项Debezium 接收事务元数据的时间限制
Debezium 仅注册和接收部署连接器后发生的事务的元数据。部署连接器之前发生的事务的元数据不可用。

Debezium 为每个事务中的BEGINEND分隔符生成事务边界事件。事务边界事件包含以下字段:

status

BEGINEND

id

唯一事务标识符的字符串表示形式。

event_count(用于END活动)

事务发出的事件总数。

data_collections(用于END活动)

一对data_collectionevent_count元素的数组。表示连接器针对源自数据集合的更改发出的事件数。

例子

{
  "status": "BEGIN",
  "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "s1.a",
      "event_count": 1
    },
    {
      "data_collection": "s2.a",
      "event_count": 1
    }
  ]
}

除非通过transaction.topic选项覆盖,否则连接器会向主题发出事务事件。*<database.server.name>*.transaction

更改增强的数据事件

启用事务元数据后,数据消息Envelope会增加一个新transaction字段。此字段以字段组合的形式提供有关每个事件的信息:

  • id- 唯一交易标识符的字符串表示

  • total_order- 事件在事务产生的所有事件中的绝对位置

  • data_collection_order- 事件在事务发出的所有事件中的每个数据收集位置

以下是消息的示例:

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
    "total_order": "1",
    "data_collection_order": "1"
  }
}

对于没有启用 GTID 的系统,事务标识符是使用 binlog 文件名和 binlog 位置的组合构建的。例如,如果事务 BEGIN 事件对应的 binlog 文件名和位置分别为 mysql-bin.000002 和 1913,则 Debezium 构造的事务标识符将为file=mysql-bin.000002,pos=1913.

2. 数据更改事件

Debezium MySQL 连接器为每个行级INSERTUPDATEDELETE操作生成数据更改事件。每个事件都包含一个键和一个值。键和值的结构取决于已更改的表。

Debezium 和 Kafka Connector 是围绕连续的事件消息流设计的。但是,这些事件的结构可能会随着时间的推移而发生变化,这对于消费者来说可能难以处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果您使用的是模式注册表,消费者可以使用它从注册表中获取模式的模式 ID。这使得每个事件都是独立的。

以下 JSON框架 显示了更改事件的基本四个部分。但是,如何配置您选择在应用程序中使用的 Kafka Connector 转换器决定了这四个部分在更改事件中的表示。仅当您将转换器配置为生成该字段时,该schema字段才处于更改事件中。同样,仅当您将转换器配置为生成事件键和值时,事件键和事件有效负载才在更改事件中。如果您使用 JSON 转换器并将其配置为生成所有四个基本更改事件部分,则更改事件具有以下结构:

{
 "schema": { // (1)
   ...
  },
 "payload": { // (2)
   ...
 },
 "schema": { // (3)
   ...
 },
 "payload": { // (4)
   ...
 },
}

表 7. 变更事件基本内容概览

编号字段名称描述
1schema第一个schema字段是事件键的一部分。它指定了一个 Kafka Connect 模式,该模式描述了事件键payload部分中的内容。换句话说,第一个schema字段描述了主键的结构,如果表没有主键,则描述唯一键的结构,用于已更改的表。
可以通过设置message.key.columns连接器配置属性来覆盖表的主键。在这种情况下,第一个模式字段描述了由该属性标识的键的结构。
2payload第一个payload字段是事件键的一部分。它具有前一个字段描述的结构,schema并且包含已更改行的键。
3schema第二个schema字段是事件值的一部分。它指定了描述事件值payload部分内容的 Kafka Connect 模式。换句话说,第二个schema描述了被改变的行的结构。通常,此模式包含嵌套模式。
4payload第二个payload字段是事件值的一部分。它具有前一个字段描述的结构,schema并且包含已更改行的实际数据。

默认情况下,连接器将事件记录流更改为名称与事件源表相同的主题。请参阅主题名称

警告事项MySQL 连接器确保所有 Kafka Connect 模式名称都遵循Avro 模式名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 az、AZ 或 _。逻辑服务器名称中的每个剩余字符以及数据库和表名称中的每个字符都必须是拉丁字母、数字或下划线,即 az、AZ、0-9 或 _。如果存在无效字符,则将其替换为下划线字符。

如果逻辑服务器名称、数据库名称或表名称包含无效字符,并且用于区分名称的唯一字符无效并因此替换为下划线,这可能会导致意外冲突。

2.1 事件的键 Change event keys

更改事件的键包含更改表键的模式和更改行的实际键。PRIMARY KEY在连接器创建事件时,模式及其相应的有效负载都包含已更改表(或唯一约束)中每一列的字段。

请考虑下customers表,随后是此表的更改事件键的示例。

CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;

捕获对表的更改的每个更改事件customers都具有相同的事件键架构。只要customers表具有先前的定义,捕获customers表更改的每个更改事件都具有以下关键结构。在 JSON 中,它看起来像这样:

{
 "schema": { // (1)
    "type": "struct",
    "name": "mysql-server-1.inventory.customers.Key", // (2)
    "optional": false, // (3)
    "fields": [ // (4)
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
 "payload": { // (5)
    "id": 1001
  }
}

表 8. 更改事件键的说明

编号字段名称描述
1schema键的模式部分指定了描述键payload部分内容的 Kafka Connect 模式。
2mysql-server-1.inventory.customers.Key定义密钥有效负载结构的架构名称。此架构描述了已更改表的主键结构。键架构名称的格式为connector-name数据库名称表名Key. 在这个例子中:
- mysql-server-1是生成此事件的连接器的名称。
- inventory是包含已更改表的数据库。
- customers是更新的表。
3optionalpayload指示事件键是否必须在其字段中包含值。在此示例中,密钥的有效负载中的值是必需的。当表没有主键时,键的有效负载字段中的值是可选的。
4fields指定 中预期的payload每个字段,包括每个字段的名称、类型以及是否需要。
5payload包含为其生成此更改事件的行的键。在此示例中,键包含一个id值为 的字段1001

2.2 事件的值 Change event values

更改事件中的值比键复杂一些。与键一样,值也有一个schema部分和一个payload部分。该schema部分包含描述该部分Envelope结构的架构payload,包括其嵌套字段。创建、更新或删除数据的操作的更改事件都有一个带有信封结构的值负载。

考虑用于显示更改事件键示例的相同示例表:

CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;

对此表的更改的更改事件的值部分描述为:

2.2.1 创建事件

以下示例显示了连接器为在customers表中创建数据的操作生成的更改事件的值部分:

{
  "schema": { // (1)
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value", // (2)
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source", // (3)
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "mysql-server-1.inventory.customers.Envelope" // (4)
  },
  "payload": { // (5)
    "op": "c", // (6)
    "ts_ms": 1465491411815, // (7)
    "before": null, // (8)
    "after": { // (9)
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { // (10)
      "version": "2.0.0.Alpha1",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 0,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "thread": 7,
      "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"
    }
  }
}

表 9.创建事件值字段的描述

编号字段名称描述
1schema值的 schema,它描述了值的有效负载的结构。在连接器为特定表生成的每个更改事件中,更改事件的值的schema都是相同的。
2name在该schema 配置中,每个name字段都为值的有效负载中的字段指定schema。
mysql-server-1.inventory.customers.Value是有效负载beforeafter字段的schema。此schema特定于customers表。

模式名称beforeafter字段的格式为logicalName*.*tableName.Value,这确保模式名称在数据库中是唯一的。这意味着当使用Avro 转换器时,每个逻辑源中每个表的生成 Avro 模式都有自己的演变和历史。
3nameio.debezium.connector.mysql.Source是有效负载source字段的架构。此模式特定于 MySQL 连接器。连接器将它用于它生成的所有事件。
4namemysql-server-1.inventory.customers.Envelope是有效负载整体结构的架构,其中mysql-server-1是连接器名称,inventory是数据库,customers是表。
5payload该值的实际数据。这是更改事件提供的信息。

看起来事件的 JSON 表示比它们描述的行大得多。这是因为 JSON 表示必须包括消息的模式和有效负载部分。但是,通过使用Avro 转换器,您可以显着减小连接器流式传输到 Kafka 主题的消息的大小。
6op强制字符串,描述导致连接器生成事件的操作类型。在本例中,c表示操作创建了一行。有效值为:
- c= 创建
- u= 更新
- d= 删除
- r= 读取(仅适用于快照)
7ts_ms显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source对象中,ts_ms指示在数据库中进行更改的时间。通过比较 forpayload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和 Debezium 之间的延迟。
8before一个可选字段,指定事件发生之前行的状态。当该op字段c用于创建时,如本例所示,该before字段是null因为此更改事件用于新内容。
9after一个可选字段,指定事件发生后行的状态。在此示例中,该after字段包含新行的idfirst_namelast_nameemail列的值。
10source描述事件源元数据的必填字段。此字段包含可用于将此事件与其他事件进行比较的信息,包括事件的来源、事件发生的顺序以及事件是否属于同一事务的一部分。源元数据包括:

- Debezium 版本
- 连接器名称
- 记录事件的 binlog 名称
- 二进制日志位置
- 活动中的行
- 如果事件是快照的一部分
- 包含新行的数据库和表的名称
- 创建事件的 MySQL 线程的 ID(仅限非快照)
- MySQL 服务器 ID(如果可用)
- 在数据库中进行更改时的时间戳

如果启用了binlog_rows_query_log_eventsMySQL 配置选项并启用了连接器配置include.query属性,则该source字段还提供该query字段,该字段包含导致更改事件的原始 SQL 语句。
2.2.2 更新事件

示例表中更新的更改事件的值与该表的创建customers事件具有相同的模式。同样,事件值的有效负载具有相同的结构。但是,事件值有效负载在更新事件中包含不同的值。以下是连接器为表中的更新生成的事件中的更改事件值示例:customers

{
  "schema": { ... },
  "payload": {
    "before": { // (1)
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": { // (2)
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { // (3)
      "version": "2.0.0.Alpha1",
      "name": "mysql-server-1",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 1465581029100,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 484,
      "row": 0,
      "thread": 7,
      "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
    },
    "op": "u", // (4)
    "ts_ms": 1465581029523 // (5)
  }
}

表 10.更新事件值字段的描述

编号字段名称描述
1before一个可选字段,指定事件发生之前行的状态。在更新事件值中,该before字段包含每个表列的字段以及数据库提交之前该列中的值。在本例中,first_name值为Anne.
2after一个可选字段,指定事件发生后行的状态。您可以比较beforeafter结构来确定对此行的更新是什么。在示例中,first_name值为 now Anne Marie
3source描述事件源元数据的必填字段。字段结构与创建事件中的source字段相同,但有些值不同,例如示例更新事件来自binlog中的不同位置。源元数据包括:

- Debezium 版本
- 连接器名称
- 记录事件的 binlog 名称
- 二进制日志位置
- 活动中的行
- 如果事件是快照的一部分
- 包含更新行的数据库和表的名称
- 创建事件的 MySQL 线程的 ID(仅限非快照)
- MySQL 服务器 ID(如果可用)
- 在数据库中进行更改时的时间戳

如果启用了binlog_rows_query_log_eventsMySQL 配置选项并启用了连接器配置include.query属性,则该source字段还提供该query字段,该字段包含导致更改事件的原始 SQL 语句。
4op描述操作类型的强制字符串。在更新事件值中,op字段值为u,表示该行因更新而更改。
5ts_ms显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source对象中,ts_ms指示在数据库中进行更改的时间。通过比较 forpayload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和 Debezium 之间的延迟。
注意事项更新行的主键/唯一键的列会更改行键的值。当一个键发生变化时,Debezium 输出三个事件:一个DELETE事件和一个墓碑事件,该事件具有该行的旧键,然后是一个具有该行的新键的事件。详细信息在下一节中。
2.2.3 主键更新

更改行的主键字段的UPDATE操作称为主键更改。对于主键更改,代替UPDATE事件记录,连接器会发出DELETE旧键的CREATE事件记录和新(更新的)键的事件记录。这些事件具有通常的结构和内容,此外,每个事件都有一个与主键更改相关的消息头:

  • 事件DELETE记录具有__debezium.newkey消息头。此标头的值是更新行的新主键。

  • 事件CREATE记录具有__debezium.oldkey消息头。此标头的值是更新行所具有的先前(旧)主键。

2.2.4 删除事件

删除更改事件中的值与同一表的创建更新schema事件具有相同的部分。示例表的删除事件中的部分如下所示:payload``customers

{
  "schema": { ... },
  "payload": {
    "before": { // (1)
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": null, // (2)
    "source": { // (3)
      "version": "2.0.0.Alpha1",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 1465581902300,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 805,
      "row": 0,
      "thread": 7,
      "query": "DELETE FROM customers WHERE id=1004"
    },
    "op": "d", // (4)
    "ts_ms": 1465581902461 // (5)
  }
}

表 11.删除事件值字段的描述

编号字段名称描述
1before可选字段,指定事件发生前行的状态。在删除事件值中,该before字段包含在使用数据库提交删除之前该行中的值。
2after可选字段,指定事件发生后行的状态。在删除事件值中,after字段为null,表示该行不再存在。
3source描述事件源元数据的必填字段。在删除事件值中,source字段结构与同一表的创建更新事件相同。许多source字段值也相同。在删除事件值中,ts_mspos字段值以及其他值可能已更改。但是删除事件值source中的字段提供了相同的元数据:

- Debezium 版本
- 连接器名称
- 记录事件的 binlog 名称
- 二进制日志位置
- 活动中的行
- 如果事件是快照的一部分
- 包含更新行的数据库和表的名称
- 创建事件的 MySQL 线程的 ID(仅限非快照)
- MySQL 服务器 ID(如果可用)
- 在数据库中进行更改时的时间戳

如果启用了binlog_rows_query_log_eventsMySQL 配置选项并启用了连接器配置include.query属性,则该source字段还提供该query字段,该字段包含导致更改事件的原始 SQL 语句。
4op描述操作类型的强制字符串。op字段值为d,表示该行已被删除。
5ts_ms显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source对象中,ts_ms指示在数据库中进行更改的时间。通过比较 forpayload.source.ts_ms的值payload.ts_ms,您可以确定源数据库更新和 Debezium 之间的延迟。

删除更改事件记录为消费者提供了处理删除该行所需的信息。包含旧值是因为某些消费者可能需要它们才能正确处理删除。

MySQL 连接器事件旨在与Kafka 日志压缩一起使用。只要至少保留每个键的最新消息,日志压缩就可以删除一些较旧的消息。这让 Kafka 可以回收存储空间,同时确保主题包含完整的数据集并可用于重新加载基于键的状态。

2.2.5 墓碑事件

当删除一行时,删除事件值仍然适用于日志压缩,因为 Kafka 可以删除所有具有相同键的早期消息。但是,要让 Kafka 删除具有相同键的所有消息,消息值必须是null. 为了实现这一点,在 Debezium 的 MySQL 连接器发出删除事件后,连接器会发出一个特殊的墓碑事件,该事件具有相同的键但有一个null值。

3. 数据类型映射

Debezium MySQL 连接器表示对行的更改,事件的结构类似于行所在的表。该事件包含每个列值的字段。该列的 MySQL 数据类型决定了 Debezium 如何表示事件中的值。

存储字符串的列在 MySQL 中使用字符集和排序规则定义。MySQL 连接器在读取 binlog 事件中列值的二进制表示时使用列的字符集。

连接器可以将 MySQL 数据类型映射到文字语义类型。

  • 文字类型:如何使用 Kafka Connect 模式类型表示值

  • 语义类型:Kafka Connect 模式如何捕获字段的含义(模式名称)

3.1 基本类型

下表显示了连接器如何映射基本 MySQL 数据类型。

表 12. 基本类型映射的描述

MySQL 类型文字类型语义类型
BOOLEAN, BOOLBOOLEANn/a
BIT(1)BOOLEANn/a
BIT(>1)BYTESio.debezium.data.Bits
lengthschema 参数包含一个表示位数的整数 。byte[]包含little-endian形式的位,并调整大小以包含指定数量的位。例如,n位在哪里:
numBytes = n/8 + (n%8== 0 ? 0 : 1)
TINYINTINT16n/a
SMALLINT[(M)]INT16n/a
MEDIUMINT[(M)]INT32n/a
INT, INTEGER[(M)]INT32n/a
BIGINT[(M)]INT64n/a
REAL[(M,D)]FLOAT32n/a
FLOAT[(M,D)]FLOAT64n/a
DOUBLE[(M,D)]FLOAT64n/a
CHAR(M)]STRINGn/a
VARCHAR(M)]STRINGn/a
BINARY(M)]BYTES或者STRINGn/a
原始字节(默认)、base64 编码的字符串或十六进制编码的字符串,取决于binary.handling.mode连接器配置属性设置。
VARBINARY(M)]BYTES或者STRINGn/a
原始字节(默认)、base64 编码的字符串或十六进制编码的字符串,取决于binary.handling.mode连接器配置属性设置。
TINYBLOBBYTES或者STRINGn/a
原始字节(默认)、base64 编码的字符串或十六进制编码的字符串,取决于binary.handling.mode连接器配置属性设置。
TINYTEXTSTRINGn/a
BLOBBYTES或者STRINGn/a
原始字节(默认)、base64 编码的字符串或十六进制编码的字符串,取决于binary.handling.mode连接器配置属性设置。
仅支持最大为 2GB 的值。建议使用声明检查模式将大列值外部化。
TEXTSTRINGn/a
仅支持最大为 2GB 的值。建议使用声明检查模式将大列值外部化。
MEDIUMBLOBBYTES或者STRINGn/a
原始字节(默认)、base64 编码的字符串或十六进制编码的字符串,取决于binary.handling.mode连接器配置属性设置。
MEDIUMTEXTSTRINGn/a
LONGBLOBBYTES或者STRINGn/a
原始字节(默认)、base64 编码的字符串或十六进制编码的字符串,取决于binary.handling.mode连接器配置属性设置。
仅支持最大为 2GB 的值。建议使用声明检查模式将大列值外部化。
LONGTEXTSTRINGn/a
仅支持最大为 2GB 的值。建议使用声明检查模式将大列值外部化。
JSONSTRINGio.debezium.data.Json
包含JSON文档、数组或标量的字符串表示形式。
ENUMSTRINGio.debezium.data.Enum
allowedschema 参数包含逗号分隔的允许值列表 。
SETSTRINGio.debezium.data.EnumSet
allowedschema 参数包含逗号分隔的允许值列表 。
YEAR[(2|4)]INT32io.debezium.time.Year
TIMESTAMP[(M)]STRINGio.debezium.time.ZonedTimestamp
采用ISO 8601格式,精度为微秒。MySQL 允许M0-6.

3.2 时间类型

排除TIMESTAMP数据类型,MySQL 时态类型取决于time.precision.mode连接器配置属性的值。对于TIMESTAMP默认值指定为CURRENT_TIMESTAMP或的列NOW,该值1970-01-01 00:00:00用作 Kafka Connect 架构中的默认值。

MySQL 允许、 和列使用零值DATE,因为零值有时优于空值。当列定义允许空值时,MySQL 连接器将零值表示为空值,或者当列不允许空值时,将零值表示为纪元日。DATETIME``TIMESTAMP

没有时区的时间值

DATETIME类型表示本地日期和时间,例如“2018-01-13 09:48:27”。如您所见,没有时区信息。此类列使用 UTC 根据列的精度转换为纪元毫秒或微秒。该TIMESTAMP类型表示没有时区信息的时间戳。MySQL 在写入时将其从服务器(或会话的)当前时区转换为 UTC,在读回值时将其从 UTC 转换为服务器(或会话的)当前时区。例如:

  • DATETIME值为. 2018-06-20 06:37:03_1529476623000

  • TIMESTAMP值为. 2018-06-20 06:37:03_2018-06-20T13:37:03Z

io.debezium.time.ZonedTimestamp根据服务器(或会话的)当前时区,此类列将转换为 UTC 中的等效项。默认从服务器查询时区。如果失败,则必须由数据库connectionTimeZoneMySQL 配置选项明确指定。例如,如果数据库的时区(全局或通过connectionTimeZone选项为连接器配置)是“America/Los_Angeles”,则 TIMESTAMP 值“2018-06-20 06:37:03”ZonedTimestamp由值“2018-06-20T13:37:03Z”。

运行 Kafka Connect 和 Debezium 的 JVM 的时区不会影响这些转换。

有关与时间值相关的属性的更多详细信息,请参见MySQL 连接器配置属性的文档。

time.precision.mode=adaptive_time_microseconds(默认)

MySQL 连接器根据列的数据类型定义确定文字类型和语义类型,以便事件准确表示数据库中的值。所有时间字段都以微秒为单位。只有在 to 范围内的正字段TIME值才能被正确捕获。00:00:00.000000``23:59:59.999999

表 13 . 映射时time.precision.mode=adaptive_time_microseconds

MySQL 类型文字类型语义类型
DATEINT32io.debezium.time.Date
表示自纪元以来的天数。
TIME[(M)]INT64io.debezium.time.MicroTime
以微秒为单位表示时间值,不包括时区信息。MySQL 允许M0-6.
DATETIME, DATETIME(0), DATETIME(1), DATETIME(2), DATETIME(3)INT64io.debezium.time.Timestamp
表示经过纪元的毫秒数,不包括时区信息。
DATETIME(4), DATETIME(5), DATETIME(6)INT64io.debezium.time.MicroTimestamp
表示经过纪元的微秒数,不包括时区信息。

time.precision.mode=连接

MySQL 连接器使用定义的 Kafka Connect 逻辑类型。这种方法不如默认方法精确,如果数据库列的小数秒精度值大于3. 00:00:00.000只能处理to范围内的值23:59:59.999time.precision.mode=connect仅当您可以确保TIME表中的值永远不会超过支持的范围时才设置。该connect设置预计将在 Debezium 的未来版本中删除。

表 14. 映射时time.precision.mode=connect

MySQL 类型文字类型语义类型
DATEINT32org.apache.kafka.connect.data.Date
表示自纪元以来的天数。
TIME[(M)]INT64org.apache.kafka.connect.data.Time
表示自午夜以来的时间值(以微秒为单位),不包括时区信息。
DATETIME[(M)]INT64org.apache.kafka.connect.data.Timestamp
表示自纪元以来的毫秒数,不包括时区信息。

3.3 小数类型

Debezium 连接器根据decimal.handling.mode连接器配置属性的设置处理小数。

decimal.handling.mode=精确

表 15. 映射时decimal.handing.mode=precise

MySQL 类型文字类型语义类型
NUMERIC[(M[,D])]BYTESorg.apache.kafka.connect.data.Decimal
schema 参数包含一个整数,scale表示小数点移动了多少位。
DECIMAL[(M[,D])]BYTESorg.apache.kafka.connect.data.Decimal
schema 参数包含一个整数,scale表示小数点移动了多少位。

decimal.handling.mode=double

MySQL 类型文字类型语义类型
NUMERIC[(M[,D])]FLOAT64不适用
DECIMAL[(M[,D])]FLOAT64不适用

十进制处理模式=字符串

表 17. 映射时decimal.handing.mode=string

MySQL 类型文字类型语义类型
NUMERIC[(M[,D])]STRING不适用
DECIMAL[(M[,D])]STRING不适用

3.4 布尔值

MySQLBOOLEAN以特定方式在内部处理该值。该BOOLEAN列在内部映射到TINYINT(1)数据类型。在流式传输期间创建表时,它使用正确的BOOLEAN映射,因为 Debezium 接收原始 DDL。在快照期间,Debezium 执行以获取为和列SHOW CREATE TABLE返回的表定义。Debezium 然后无法获得原始类型映射,因此映射到.TINYINT(1)``BOOLEAN``TINYINT(1)``TINYINT(1)

操作员可以配置开箱即用的TinyIntOneToBooleanConverter自定义转换器TINYINT(1),它将所有列映射到,BOOLEAN或者如果selector设置了参数,则可以使用逗号分隔的正则表达式枚举列的子集。

以下是一个示例配置:

converters=boolean
boolean.type=io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
boolean.selector=db1.table1.*, db1.table2.column1

3.5空间类型

目前,Debezium MySQL 连接器支持以下空间数据类型。

表 18. 空间类型映射的描述

MySQL 类型文字类型语义类型
GEOMETRY,<br>LINESTRING,<br>POLYGON,<br>MULTIPOINT,<br>MULTILINESTRING,<br>MULTIPOLYGON,<br>GEOMETRYCOLLECTIONSTRUCTio.debezium.data.geometry.Geometry
包含具有两个字段的结构:

- srid (INT32: 定义存储在结构中的几何对象类型的空间参考系统 ID
- wkb (BYTES): 以 Well-Known-Binary (wkb) 格式编码的几何对象的二进制表示。有关详细信息,请参阅开放地理空间联盟

4.设置 MySQL

在安装和运行 Debezium 连接器之前,需要执行一些 MySQL 设置任务。

4.1创建用户

Debezium MySQL 连接器需要 MySQL 用户帐户。此 MySQL 用户必须对 Debezium MySQL 连接器捕获更改的所有数据库具有适当的权限。

先决条件

  • 一个 MySQL 服务器。

  • SQL 命令的基本知识。

程序

  1. 创建 MySQL 用户:

    mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
    
  2. 授予用户所需的权限:

    mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
    

    下表描述了权限。

    重要提示如果是数据库不允许全局读取锁定的云服务(例如 Amazon RDS 或 Amazon Aurora),则使用表级锁定来创建一致快照。在这种情况下,您还需要向您创建的用户授予LOCK TABLES权限。有关更多详细信息,请参阅快照
  3. 最终确定用户的权限:

    mysql> FLUSH PRIVILEGES;
    

表 19. 用户权限的描述

关键词描述
SELECT使连接器能够从数据库中的表中选择行。这仅在执行快照时使用。
RELOAD允许连接器使用该FLUSH语句来清除或重新加载内部缓存、刷新表或获取锁。这仅在执行快照时使用。
SHOW DATABASESSHOW DATABASE通过发出语句使连接器能够查看数据库名称。这仅在执行快照时使用。
REPLICATION SLAVE使连接器能够连接并读取 MySQL 服务器 binlog。
REPLICATION CLIENT允许连接器使用以下语句:

- SHOW MASTER STATUS
- SHOW SLAVE STATUS
- SHOW BINARY LOGS

连接器总是需要这个。
ON标识权限适用的数据库。
TO 'user'指定要授予权限的用户。
IDENTIFIED BY 'password'指定用户的 MySQL 密码。

4.2启用二进制日志

您必须为 MySQL 复制启用二进制日志记录。二进制日志记录复制工具的事务更新以传播更改。

先决条件

  • 一个 MySQL 服务器。

  • 适当的 MySQL 用户权限。

程序

  1. 检查该log-bin选项是否已打开:

    // for MySql 5.x
    mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
    FROM information_schema.global_variables WHERE variable_name='log_bin';
    // for MySql 8.x
    mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
    FROM performance_schema.global_variables WHERE variable_name='log_bin';
    
  2. 如果是OFF,请使用以下属性配置您的 MySQL 服务器配置文件,如下表所述:

    server-id         = 223344
    log_bin           = mysql-bin
    binlog_format     = ROW
    binlog_row_image  = FULL
    expire_logs_days  = 10
    
  3. 通过再次检查 binlog 状态来确认您的更改:

    // for MySql 5.x
    mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
    FROM information_schema.global_variables WHERE variable_name='log_bin';
    // for MySql 8.x
    mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
    FROM performance_schema.global_variables WHERE variable_name='log_bin';
    

表 20. MySQL binlog 配置属性的描述

字段描述
server-id对于 MySQL 集群中的每个服务器和复制客户端,的值server-id必须是唯一的。在 MySQL 连接器设置期间,Debezium 为连接器分配一个唯一的服务器 ID。
log_bin的值log_bin是二进制日志文件序列的基本名称。
binlog_formatbinlog-format必须设置为ROWrow
binlog_row_imagebinlog_row_image必须设置为FULLfull
expire_logs_days这是自动删除 binlog 文件的天数。默认值为0,表示不自动删除。设置值以匹配您的环境需求。请参阅MySQL 清除 binlog 文件

4.3 启用 GTID

全局事务标识符 (GTID) 唯一标识集群内服务器上发生的事务。尽管 Debezium MySQL 连接器不需要,但使用 GTID 可以简化复制并使您能够更轻松地确认主服务器和副本服务器是否一致。

GTID 在 MySQL 5.6.5 及更高版本中可用。有关更多详细信息,请参阅MySQL 文档

先决条件

  • 一个 MySQL 服务器。

  • SQL 命令的基本知识。

  • 访问 MySQL 配置文件。

程序

  1. 启用gtid_mode

    mysql> gtid_mode=ON
    
  2. 启用enforce_gtid_consistency

    mysql> enforce_gtid_consistency=ON
    
  3. 确认更改:

    mysql> show global variables like '%GTID%';
    

结果

+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| enforce_gtid_consistency | ON    |
| gtid_mode                | ON    |
+--------------------------+-------+

表 21. GTID 选项的描述

选项描述
gtid_mode布尔值,指定是否启用 MySQL 服务器的 GTID 模式。

- ON= 启用
- OFF= 禁用
enforce_gtid_consistency布尔值,指定服务器是否通过允许执行可以以事务安全方式记录的语句来强制执行 GTID 一致性。使用 GTID 时需要。

- ON= 启用
- OFF= 禁用

4.4 配置会话超时

当为大型数据库制作初始一致快照时,您建立的连接可能会在读取表时超时。您可以通过在 MySQL 配置文件中配置interactive_timeout和来防止这种行为。wait_timeout

先决条件

  • 一个 MySQL 服务器。

  • SQL 命令的基本知识。

  • 访问 MySQL 配置文件。

程序

  1. 配置interactive_timeout

    mysql> interactive_timeout=<duration-in-seconds>
    
  2. 配置wait_timeout

    mysql> wait_timeout=<duration-in-seconds>
    

表 22. MySQL 会话超时选项的描述

选项描述
interactive_timeout服务器在关闭交互式连接之前等待其活动的秒数。有关详细信息,请参阅MySQL 的文档。
wait_timeout服务器在关闭非交互式连接之前等待其活动的秒数。有关详细信息,请参阅MySQL 的文档。

4.5 启用查询日志事件

您可能希望查看SQL每个 binlog 事件的原始语句。在 MySQL 配置文件中启用该binlog_rows_query_log_events选项允许您执行此操作。

此选项在 MySQL 5.6 及更高版本中可用。

先决条件

  • 一个 MySQL 服务器。

  • SQL 命令的基本知识。

  • 访问 MySQL 配置文件。

程序

  • 启用binlog_rows_query_log_events

    mysql> binlog_rows_query_log_events=ON
    

    binlog_rows_query_log_events设置为启用/禁用对SQL在 binlog 条目中包含原始语句的支持的值。

    • ON= 启用

    • OFF= 禁用

如果需要配置安装请参考

本文的下半部分: # Debezium connector for MySQL 配置部署

或查看英文原文
点击查看原文: Debezium connector for MySQL :: Debezium Documentation

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐