Debezium日常分享系列之:Debezium3.1版本之增量快照
按需快照
默认情况下,连接器仅在首次启动后执行初始快照操作。在此初始快照之后,在正常情况下,连接器不会重复快照过程。连接器捕获的任何未来的更改事件数据都仅通过流处理过程进入。
然而,在某些情况下,连接器在初始快照期间获得的数据可能会变得过时、丢失或不完整。为了提供一种重新捕获表数据的机制,Debezium 包含了一个执行按需快照的选项。你可能希望在以下任何更改发生在你的 Debezium 环境中后执行按需快照:
- 连接器配置被修改以捕获不同的表集。
- Kafka 主题被删除且必须重建。
- 由于配置错误或其他问题导致数据损坏。
你可以通过发起所谓的按需快照,重新运行之前已捕获快照的表的快照。按需快照需要使用 。你通过向 Debezium 信号表发送信号请求来启动按需快照。
当你对现有表启动按需快照时,连接器会将内容附加到该表已存在的主题上。如果先前存在的主题已被移除,当 启用时,Debezium 可以自动创建一个主题。
按需快照信号指定了要包含在快照中的表。快照可以捕获数据库的全部内容,或者只捕获数据库中的一组表。此外,快照还可以捕获数据库中表(们)的一部分内容。
你通过向信号表发送执行快照消息来指定要捕获的表。设置执行快照信号的类型为增量或阻塞,并提供要包含在快照中的表的名称,如下表所述:
表 2. 临时执行快照信号记录的示例
字段 | 默认值 | 描述 |
---|---|---|
type | incremental | 指定要运行的快照类型。目前,您可以请求增量快照或阻塞快照。 |
data-collections | N/A | 包含正则表达式数组,匹配要包含在快照中的表的全限定名称。<br>对于 SQL Server 连接器,使用以下格式指定表的全限定名称:database.schema.table |
additional-conditions | N/A | 一个可选数组,指定连接器评估以确定要包含在快照中的记录子集的一组附加条件。<br>每个附加条件是一个对象,指定用于过滤临时快照捕获的数据的标准。您可以为每个附加条件设置以下参数:data-collection:应用过滤器的表的完全限定名称。您可以为每个表应用不同的过滤器。filter:指定数据库记录中必须存在的列值,以便快照包含该记录,例如,“color=‘blue’”。您分配给过滤器参数的值与在设置阻塞快照的 snapshot.select.statement.overrides 属性时,在 SELECT 语句的 WHERE 子句中可能指定的值类型相同。 |
surrogate-key | N/A | 一个可选字符串,指定连接器在快照过程中用作表主键的列名。 |
触发一次临时增量快照
您可以通过在信号表中添加一个带有 execute-snapshot 信号类型的条目,或通过向 Kafka 信号主题发送信号消息来启动一次临时增量快照。连接器处理该消息后,将开始快照操作。快照过程会读取每个表的第一个和最后一个主键值,并使用这些值作为每个表的起始点和结束点。根据表中的条目数量和配置的块大小,Debezium 将表分成多个块,并依次逐个进行快照。
触发临时阻塞快照
您可以通过在信号表或信号主题中添加带有 execute-snapshot
信号类型的消息来启动临时阻塞快照。连接器处理消息后,将开始快照操作。连接器会暂时停止流式传输,然后对指定的表进行快照,其过程与初始快照相同。快照完成后,连接器将恢复流式传输。
增量快照
SQL Server 排序规则
每个 SQL Server 服务器或数据库都配置为使用特定的排序规则,这决定了字符数据如何存储、排序、比较和显示。某些排序规则集(如SQL Server 排序规则 (SQL_*))的排序规则与 Unicode 排序算法不兼容。在某些情况下,不兼容的排序规则可能会导致连接器在执行即时快照时丢失数据。例如,如果 SQL Server 配置为以 Unicode 格式发送字符串(即连接属性 sendStringParametersAsUnicode 设置为 true),连接器在快照期间可能会跳过记录。为了防止在即时快照期间丢失数据,可以将连接字符串属性 driver.sendStringParametersAsUnicode 的值设置为 false。
为了提供管理快照的灵活性,Debezium 包含了一种补充的快照机制,称为增量快照。增量快照依赖于 Debezium 的向 Debezium 连接器发送信号的机制。增量快照基于 DDD-3 设计文档。
在增量快照中,与初始快照一次性捕获数据库的完整状态不同,Debezium 会分阶段、以一系列可配置的块来捕获每个表。你可以指定希望快照捕获的表以及每块的大小。块大小决定了每次从数据库获取数据时快照收集的行数。增量快照的默认块大小为 1024 行。
随着增量快照的进行,Debezium 使用水印来跟踪其进度,记录它捕获的每一行表数据。这种分阶段的数据捕获方法相比标准的初始快照过程具有以下优势:
- 您可以在流式数据捕获的同时并行运行增量快照,而无需等到快照完成后再开始流式捕获。连接器在整个快照过程中继续从变更日志中捕获近乎实时的事件,两项操作互不干扰。
- 如果增量快照的进度被中断,您可以从中断的地方恢复,而不会丢失任何数据。恢复后,快照将从停止的地方开始,而不是重新从头开始捕获整个表。
- 您可以随时按需运行增量快照,并根据需要重复该过程以适应数据库更新。例如,您可能在修改连接器配置以添加一个表到其属性后重新运行快照。
增量快照过程
当你运行一个增量快照时,Debezium会根据主键对每个表进行排序,然后根据配置的块大小将表分成若干块。按块处理,它会捕获每块中的每一行记录。对于捕获的每一行,快照会发出一个READ事件。该事件表示在快照开始时该行的值。
随着快照的进行,其他进程可能会继续访问数据库,从而可能修改表记录。为了反映这些变化,INSERT、UPDATE或DELETE操作会像往常一样提交到事务日志。同样,持续的Debezium流处理过程会继续检测这些变更事件,并将相应的变更事件记录发送到Kafka。
如何 Debezium 解决具有相同主键的记录之间的冲突
在某些情况下,流处理过程中发出的 UPDATE 或 DELETE 事件会乱序接收。也就是说,流处理过程可能会在一个包含该行的 READ 事件的快照捕获之前,先发出一个修改表行的事件。当快照最终发出该行对应的 READ 事件时,其值已经被覆盖。为了确保乱序到达的增量快照事件能够按正确的逻辑顺序处理,Debezium 采用了缓冲方案来解决冲突。只有在解决了快照事件和流事件之间的冲突后,Debezium 才会将事件记录发送到 Kafka。
快照窗口
为了协助解决晚到的读取(READ)事件与修改同一表行的流式事件之间的冲突,Debezium 采用了所谓的快照窗口。快照窗口定义了增量快照捕获指定表分块数据的时间间隔。在某个分块的快照窗口开启之前,Debezium 按照其常规行为,直接将事务日志中的事件发送到目标 Kafka 主题。但从该特定分块的快照开启时刻起,直到它关闭为止,Debezium 会执行一个去重步骤,以解决具有相同主键的事件之间的冲突。
对于每个数据集合,Debezium 发出两种类型的事件,并将这些事件的记录存储在一个目标 Kafka 主题中。从表中直接捕获的快照记录作为 READ 操作发出。同时,随着用户继续更新数据集合中的记录,每次提交都会反映在事务日志中,Debezium 为每次更改发出 UPDATE 或 DELETE 操作。
当快照窗口打开,Debezium 开始处理快照分块时,它会将快照记录交付到内存缓冲区。在快照窗口期间,缓冲区中的 READ 事件的主键会被与传入的流式事件的主键进行比较。如果没有找到匹配项,流式事件记录将直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃缓冲的 READ 事件,并将流式记录写入目标主题,因为流式事件逻辑上覆盖了静态快照事件。在该分块的快照窗口关闭后,缓冲区中仅剩下没有相关事务日志事件的 READ 事件。Debezium 将这些剩余的 READ 事件发送到表的 Kafka 主题。
连接器对每个快照分块重复这一过程。
注意:
要使 Debezium 能够执行增量快照,您必须授予连接器写入信号表的权限。
只有可以配置为只读增量快照的连接器(MariaDB, MySQL, or PostgreSQL )才不需要写入权限。目前,您可以使用以下任一方法来启动增量快照:
- 向源数据库上的信令表发送临时快照信号。
- 向配置的 Kafka 信令主题发送消息。
SQL Server 的 Debezium 连接器在增量快照运行期间不支持模式更改。
触发增量快照
要启动增量快照,您可以向源数据库的信号表发送一个即时快照信号。您可以通过SQL INSERT查询提交快照信号。
当Debezium检测到信号表中的变化后,它会读取该信号,并执行请求的快照操作。
您提交的查询指定了要包含在快照中的表,并且可以选择性地指定快照操作的类型。目前,Debezium支持增量和阻塞两种类型的快照。
要指定要包含在快照中的表,请提供一个列出这些表的数据集合数组,或者用于匹配表的正则表达式数组,例如:
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
增量快照信号的数据集合数组没有默认值。如果数据集合数组为空,Debezium会将其解释为无需执行任何操作,因此不会进行快照。
注意:
如果您要包含在快照中的表名包含点(.)、空格或某些其他非字母数字字符,您必须用双引号将表名转义。
例如,要在db1数据库的public模式中包含名为My.Table的表,使用以下格式:“db1.public.“My.Table””。
先决条件
- 已启用信号。
- 源数据库上存在信号数据集合。
- 信号数据集合在 signal.data.collection 属性中指定。
使用源信令通道触发增量快照
发送 SQL 查询,将临时增量快照请求添加到信令表中:
INSERT INTO <signalTable> (id, type, data)
VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
例如:
INSERT INTO db1.myschema.debezium_signal (id, type, data)
VALUES ('ad-hoc-1',
'execute-snapshot',
'{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"],
"type":"incremental",
"additional-conditions":[{"data-collection": "db1.schema1.table1" ,"filter":"color=''blue''"}]}');
命令中的 id
、type
和 data
参数的值对应于 信令表的字段。
下表描述了示例中的参数:
表 3. 发送增量快照信号到信令表的 SQL 命令中的字段说明
Item | Value | Description |
---|---|---|
1 | database.schema.debezium_signal | 指定源数据库上信令表的完全限定名称。 |
2 | ad-hoc-1 | id 参数指定一个任意字符串,该字符串被指定为信号请求的 id 标识符。使用此字符串将日志消息标识到信令表中的条目。Debezium 不使用此字符串。相反,在快照期间,Debezium 会生成自己的 id 字符串作为水印信号。 |
3 | execute-snapshot | 类型参数指定信号想要触发的操作。 |
4 | data-collections | 信号数据字段的必需组件,用于指定表名或正则表达式的数组,以匹配要包含在快照中的表名。该数组列出了使用 database.schema.table 格式的正则表达式,以匹配表的完全限定名称。此格式与您用于指定连接器信号表名称的格式相同。 |
5 | incremental | 信号数据字段的可选类型组件,用于指定要运行的快照操作的类型。有效值为增量和阻塞。如果您未指定值,则连接器默认执行增量快照。 |
6 | additional-conditions | 可选数组,用于指定一组附加条件,连接器将评估这些条件以确定要包含在快照中的记录子集。每个附加条件都是一个具有数据收集和过滤器属性的对象。您可以为每个数据收集指定不同的过滤器。* 数据收集属性是过滤器适用的数据收集的完全限定名称。 |
使用附加条件运行临时增量快照
如果您希望快照仅包含表中内容的子集,则可以通过将附加条件参数附加到快照信号来修改信号请求。
典型快照的 SQL 查询采用以下形式:
SELECT * FROM <tableName> ....
通过添加附加条件参数,您可以将 WHERE 条件附加到 SQL 查询,如下例所示:
SELECT * FROM <data-collection> WHERE <filter> ....
以下示例显示了一个 SQL 查询,用于将带有附加条件的临时增量快照请求发送到信令表:
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
例如,假设您有一个包含以下列的产品表:
- id(主键)
- color
- quantity
如果希望产品表的增量快照仅包含 color=blue 的数据项,则可以使用以下 SQL 语句来触发快照:
INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue"}]}');
附加条件参数还允许您传递基于多个列的条件。例如,使用上例中的产品表,您可以提交一个查询来触发增量快照,该快照仅包含颜色=蓝色且数量>10 的商品的数据:
INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue AND quantity>10"}]}');
以下示例显示了连接器捕获的增量快照事件的 JSON。
示例 1. 增量快照事件消息
{
"before":null,
"after": {
"pk":"1",
"value":"New data"
},
"source": {
...
"snapshot":"incremental" 1
},
"op":"r", 2
"ts_ms":"1620393591654",
"ts_us":"1620393591654547",
"ts_ns":"1620393591654547920",
"transaction":null
}
表 4. 增量快照事件消息中字段的描述
Item | Field name | Description |
---|---|---|
1 | snapshot | 指定要运行的快照操作的类型。目前,唯一有效的选项是阻止和增量。在提交给信令表的 SQL 查询中指定类型值是可选的。如果您未指定值,则连接器将运行增量快照。 |
2 | op | 指定事件类型。快照事件的值为 r,表示读取操作。 |
使用 Kafka 信号通道触发增量快照
您可以向配置的 Kafka 主题发送消息,请求连接器运行临时增量快照。
Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。
消息的值是一个带有类型和数据字段的 JSON 对象。
信号类型为 execute-snapshot,数据字段必须具有以下字段:
表 5. 执行快照数据字段
Field | Default | Value |
---|---|---|
type | incremental | 要执行的快照类型。目前 Debezium 支持增量和阻塞类型。 |
data-collections | N/A | 与要包含在快照中的表的完全限定名称匹配的逗号分隔正则表达式数组。使用与 signal.data.collection 配置选项所需的相同格式指定名称。 |
additional-conditions | N/A | 可选的附加条件数组,用于指定连接器评估的条件,以指定要包含在快照中的记录子集。每个附加条件都是一个对象,用于指定用于筛选临时快照捕获的数据的条件。您可以为每个附加条件设置以下参数:data-collection:: 过滤器适用的表的完全限定名称。您可以对每个表应用不同的过滤器。filter:: 指定数据库记录中必须存在的列值,快照才能将其包含,例如“color=‘blue’”。您分配给 filter 参数的值与您在为阻塞快照设置 snapper.select.statement.overrides 属性时在 SELECT 语句的 WHERE 子句中指定的值类型相同。 |
示例 2. 执行快照 Kafka 消息
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
临时增量快照与附加条件
Debezium 使用 additional-conditions
字段来选择表内容的子集。
通常,当 Debezium 运行快照时,它会执行类似以下的 SQL 查询:
SELECT * FROM <tableName> ...
当快照请求包含 additional-conditions
属性时,该属性的数据收集和过滤参数将被附加到 SQL 查询中,例如:
SELECT * FROM <data-collection> WHERE <filter> ...
例如,假设有一个 products
表,包含 id
(主键)、color
和 brand
列,如果希望快照只包含 color='blue'
的内容,在请求快照时,可以添加 additional-conditions
属性来过滤内容:
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue'"}]}}`
您还可以使用 additional-conditions 属性传递基于多列的条件。例如,使用与上例相同的产品表,如果您希望快照仅包含 color=‘blue’ 和 brand=‘MyBrand’ 的产品表中的内容,则可以发送以下请求:
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
停止增量快照
在某些情况下,可能需要停止增量快照。例如,你可能会发现快照配置不正确,或者你希望确保资源可用于其他数据库操作。你可以通过向源数据库的信号表发送信号来停止正在进行的快照。
你可以通过发送SQL INSERT
查询将停止快照的信号提交到信号表。停止快照的信号指定了快照操作的类型为增量,并且可以选择性地指定你想从当前正在运行的快照中排除的表。当 Debezium 检测到信号表中的变化后,它会读取该信号,并在快照操作进行时停止增量快照。
附加资源
还可以通过向发送 JSON 消息来停止增量快照。
先决条件:
- 源数据库上存在一个信令数据集合。
- 信令数据集合在属性中指定。
使用源信令通道停止增量快照
向信令表发送 SQL 查询以停止临时增量快照:
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');
例如,
INSERT INTO db1.myschema.debezium_signal (id, type, data)
values ('ad-hoc-1',
'stop-snapshot',
'{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"],
"type":"incremental"}');
signal命令中的id、type、data参数的值与信令表的字段对应。
示例中的参数说明如下表:
表 6. 向信令表发送停止增量快照信号的 SQL 命令中字段说明
Item | Value | Description |
---|---|---|
1 | database.schema.debezium_signal | 指定源数据库上信令表的完全限定名称。 |
2 | ad-hoc-1 | id 参数指定一个任意字符串,该字符串被指定为信号请求的 id 标识符。使用此字符串将日志消息标识到信令表中的条目。Debezium 不使用此字符串。 |
3 | stop-snapshot | 指定类型参数指定信号想要触发的操作。 |
4 | data-collections | 信号数据字段的可选组件,用于指定要从快照中删除的表名或正则表达式的数组。该数组列出了与表的完全限定名称匹配的正则表达式,格式为 database.schema.table如果从数据字段中省略此组件,信号将停止正在进行的整个增量快照。 |
5 | incremental | 信号数据字段的必需组件,用于指定要停止的快照操作的类型。目前,唯一有效的选项是增量。如果您未指定类型值,则信号无法停止增量快照。 |
使用 Kafka 信令通道停止增量快照
您可以向配置的 Kafka 信令主题发送信号消息以停止临时增量快照。
Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。
消息的值是一个具有类型和数据字段的 JSON 对象。
信号类型为 stop-snapshot,数据字段必须具有以下字段:
表 7. 执行快照数据字段
Field | Default | Value |
---|---|---|
type | incremental | 要执行的快照类型。目前 Debezium 仅支持增量类型。 |
data-collections | N/A | 一个可选的逗号分隔正则表达式数组,用于匹配要从快照中删除的表的完全限定名称,该数组包含表名或正则表达式,用于匹配要从快照中删除的表名。使用 database.schema.table 格式指定表名。 |
以下示例显示了典型的停止快照 Kafka 消息:
Key = `test_connector`
Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], "type": "INCREMENTAL"}}`