Debezium日常分享系列之:MongoDB 新文档状态提取
- 变更事件结构
- 行为
- 配置
- 数组编码
- 嵌套结构展平
- MongoDB $unset 处理
- 确定原始操作
- 添加元数据字段
- 选择性应用转换的选项
- 配置选项
- 已知限制
Debezium MongoDB 连接器会发出数据变更消息,以表示 MongoDB 集合中发生的每个操作。这些事件消息的复杂结构忠实地反映了原始数据库事件的详细信息。然而,某些下游消费者可能无法以原始格式处理这些消息。例如,为了表示数据集合中的嵌套文档,连接器会以包含嵌套字段的格式发出事件消息。为了支持接收器连接器或其他无法处理原始消息层级格式的消费者,您可以使用 Debezium MongoDB 事件扁平化 (ExtractNewDocumentState) 单消息转换 (SMT)。SMT 简化了原始消息的结构,并可以通过其他方式修改消息,使数据更易于处理。
事件扁平化转换是一种 Kafka Connect SMT。
变更事件结构
Debezium MongoDB 连接器会生成具有复杂结构的变更事件。每条事件消息包含以下部分:
源元数据
包括但不限于以下字段:
- 更改集合中数据的操作类型(创建/插入、更新或删除)。
- 发生变更的数据库和集合的名称。
- 标识变更时间的时间戳。
- 可选的事务信息。
文档数据
before data
- 在运行 MongoDB 6.0 及更高版本的环境中,当 Debezium 连接器的 capture.mode 设置为以下值之一时,此字段会出现:
- change_streams_with_pre_image
- change_streams_update_full_with_pre_image
after data
表示当前操作后文档中存在的值的 JSON 字符串。事件消息中是否存在 after 字段取决于事件类型和连接器配置。MongoDB 插入操作的创建事件始终包含 after 字段,无论 capture.mode 的设置如何。对于更新事件,仅当 capture.mode 设置为以下值之一时,才会显示 after 字段:
- change_streams_update_full
- change_streams_update_full_with_pre_image。
注意:
变更事件消息中的后续值不一定代表事件发生后文档的状态。该值并非动态计算;相反,连接器捕获变更事件后,会查询集合以获取文档的当前值。
例如,假设多个操作 a、b 和 c 快速连续地修改了文档。当连接器处理变更 a 时,它会查询集合以获取完整文档。与此同时,发生了变更 b 和 c。当连接器收到对变更 a 的完整文档查询的响应时,它可能会收到基于变更 b 或 c 的后续变更的文档版本。
以下片段显示了连接器在 MongoDB 插入操作后发出的创建更改事件的基本结构:
{"op": "c","after": "{\"field1\":\"newvalue1\",\"field2\":\"newvalue1\"}","source": { ... }
}
上例中 after 字段的复杂格式提供了有关源数据库中发生的更改的详细信息。但是,某些使用者无法处理包含嵌套值的消息。要将原始消息的复杂嵌套字段转换为更简单、更通用的结构,请使用 MongoDB 的事件展平 SMT。SMT 会展平消息中嵌套字段的结构,如下例所示:
{"field1" : "newvalue1","field2" : "newvalue2"
}
行为
MongoDB 的事件展平 SMT 会从 Debezium MongoDB 连接器发出的创建或更新变更事件消息中提取 after 字段。SMT 处理原始变更事件消息后,会生成一个仅包含 after 字段内容的简化版本。
根据您的用例,您可以将 ExtractNewDocumentState SMT 应用于 Debezium MongoDB 连接器,或应用于接收 Debezium 连接器所生成消息的接收器连接器。如果将 SMT 应用于 Debezium MongoDB 连接器,SMT 会在连接器发出的消息发送到 Apache Kafka 之前对其进行修改。为了确保 Kafka 保留完整的 Debezium 变更事件消息的原始格式,请将 SMT 应用于接收器连接器。
当您使用事件展平 SMT 处理 MongoDB 连接器发出的消息时,SMT 会将原始消息中记录的结构转换为正确类型的 Kafka Connect 记录,以便典型的接收器连接器可以使用。例如,SMT 会将表示原始消息中后续信息的 JSON 字符串转换为任何消费者都可以处理的架构结构。
您也可以选择为 MongoDB 配置事件展平 SMT,以便在处理过程中以其他方式修改消息。更多信息,请参阅配置主题。
配置
为接收连接器配置 MongoDB 的事件扁平化 (ExtractNewDocumentState) SMT,用于消费 Debezium MongoDB 连接器发出的消息。
基本配置
要获取 SMT 的默认行为,请将 SMT 添加到接收连接器的配置中,无需指定任何选项,如下例所示:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
与任何 Kafka Connect 连接器配置一样,您可以将 transforms= 设置为多个以逗号分隔的 SMT 别名。Kafka Connect 会按照您指定的转换的列出顺序应用它们。
您可以为使用 MongoDB 事件展平 SMT 的连接器设置多个选项。以下示例展示了为连接器设置 delete.tombstone.handling.mode 和 add.headers 选项的配置:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.delete.tombstone.handling.mode=drop
transforms.unwrap.add.headers=op
数组编码
默认情况下,事件展平 SMT 会将 MongoDB 数组转换为与 Apache Kafka Connect 或 Apache Avro 模式兼容的数组。虽然 MongoDB 数组可以包含多种类型的元素,但 Kafka 数组中的所有元素必须属于同一类型。
为了确保 SMT 以符合您环境需求的方式对数组进行编码,您可以指定 array.encoding 配置选项。以下示例展示了设置数组编码的配置:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.array.encoding=<array|document>
根据配置,SMT 会使用以下编码方法之一处理源消息中每个数组实例:
数组编码
- 如果将 array.encoding 设置为 array(默认值),SMT 编码会使用数组数据类型对原始消息中的数组进行编码。为确保正确处理,数组实例中的所有元素必须属于同一类型。此选项有一定的限制,但它使下游客户端能够轻松处理数组。
文档编码
- 如果将 array.encoding 设置为 document,SMT 会将源消息中的每个数组转换为结构体,其方式类似于 BSON 序列化。主结构体包含名为 _0、_1、_2 等的字段,其中每个字段名称代表原始数组中元素的索引。SMT 会用其在源数组中检索到的等效元素的值填充每个索引字段。索引名称以下划线为前缀,因为 Avro 编码禁止字段名称以数字字符开头。
以下示例显示了 Debezium MongoDB 连接器如何表示包含异构数据类型的数组的数据库文档:
例 1. 示例:包含多种数据类型的数组的文档编码
{"_id": 1,"a1": [{"a": 1,"b": "none"},{"a": "c","d": "something"}]
}
如果array.encoding设置为document,SMT会将上述文档转换为以下格式:
{"_id": 1,"a1": {"_0": {"a": 1,"b": "none"},"_1": {"a": "c","d": "something"}}
}
文档编码选项使 SMT 能够处理由异构元素组成的任意数组。但是,在使用此选项之前,请务必验证接收器连接器和其他下游消费者是否能够处理包含多种数据类型的数组。
嵌套结构展平
当数据库操作涉及嵌入式文档时,Debezium MongoDB 连接器会发出 Kafka 事件记录,该记录的结构反映了原始文档的层级结构。也就是说,事件消息将嵌套文档表示为一组嵌套字段结构。在下游连接器无法处理包含嵌套结构的消息的环境中,您可以配置事件展平 SMT,以展平消息中的层级结构。扁平消息结构更适合表式存储。
要配置 SMT 展平嵌套结构,请将 flatten.struct 配置选项设置为 true。在转换后的消息中,字段名称的构造与文档源一致。SMT 通过将父文档字段的名称与嵌套文档字段的名称连接起来,重命名每个展平字段。flatten.struct.delimiter 选项定义的分隔符用于分隔名称的各个组成部分。struct.delimiter 的默认值为下划线 (_)。
以下示例显示了指定 SMT 是否展平嵌套结构的配置:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.flatten.struct=<true|false>
transforms.unwrap.flatten.struct.delimiter=<string>
以下示例展示了 MongoDB 连接器发出的事件消息。该消息包含文档 a 的字段,该文档包含两个嵌套文档 b 和 c 的字段:
{"_id": 1,"a": {"b": 1,"c": "none"},"d": 100
}
以下示例中的消息显示了 MongoDB 的 SMT 将上一条消息中的嵌套结构展平后的输出:
{"_id": 1,"a_b": 1,"a_c": "none","d": 100
}
在生成的消息中,原始消息中嵌套的字段 b 和 c 被展平并重命名。重命名的字段由父文档 a 的名称与嵌套文档的名称 a_b 和 a_c 连接而成。新字段名称的各个部分由下划线分隔,具体定义在 struct.delimiter 配置属性的设置中。
MongoDB $unset 处理
在 MongoDB 中,$unset 运算符和 $rename 运算符均会从文档中删除字段。由于 MongoDB 集合是无模式的,因此在更新操作从文档中删除字段后,无法从更新后的文档中推断出缺失字段的名称。为了支持接收器连接器或其他可能需要已删除字段信息的消费者,Debezium 会发出包含 removedFields 元素的更新消息,该元素列出了已删除字段的名称。
以下示例展示了导致字段 a 被删除的操作的更新消息的一部分:
"payload": {"op": "u","ts_ms": "...","ts_us" : "...","ts_ns" : "...","before": "{ ... }","after": "{ ... }","updateDescription": {"removedFields": ["a"],"updatedFields": null,"truncatedArrays": null}
}
在上面的示例中,before 和 after 分别表示源文档更新前后的状态。只有当连接器的 capture.mode 设置为以下列表所述时,这些字段才会出现在连接器发出的事件消息中:
before 字段
提供文档更改前的状态。只有当 capture.mode 设置为以下值之一时,此字段才会出现:
- change_streams_with_pre_image
- change_streams_update_full_with_pre_image。
after 字段
提供文档更改后的完整状态。只有当 capture.mode 设置为以下值之一时,此字段才会出现:
- change_streams_update_full
- change_streams_update_full_with_pre_image。
假设一个连接器配置为捕获完整文档,当 ExtractNewDocumentState SMT 收到 $unset 事件的更新消息时,SMT 会通过表示已删除的字段具有空值来重新编码该消息,如下例所示:
{"id": 1,"a": null
}
对于未配置为捕获完整文档的连接器,当 SMT 收到 $unset 操作的更新事件时,它会生成以下输出消息:
{"a": null
}
确定原始操作
SMT 展平事件消息后,生成的消息将不再指示生成该事件的操作类型是创建、更新还是初始快照读取。通常,您可以通过配置连接器以公开伴随删除的逻辑删除或重写事件的信息来识别删除操作。有关配置连接器以在事件消息中公开逻辑删除和重写信息的更多信息,请参阅 delete.tombstone.handling.mode 属性。
要在事件消息中报告数据库操作的类型,SMT 可以将 op 字段添加到以下元素之一:
- 事件消息正文。
- 消息头。
例如,要添加显示原始操作类型的头属性,请添加转换,然后将 add.headers 属性添加到连接器配置中,如下例所示:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.headers=op
基于上述配置,SMT 会在消息中添加 op 头,并为其分配一个字符串值来标识操作类型,从而报告事件类型。分配的字符串值基于原始 MongoDB 变更事件消息中的 op 字段值。
添加元数据字段
MongoDB 的事件展平 SMT 可以将原始变更事件消息中的元数据字段添加到简化消息中。添加的元数据字段以双下划线(“__”)为前缀。向事件记录添加元数据可以包含诸如发生变更事件的集合名称之类的内容,也可以包含特定于连接器的字段,例如副本集名称。目前,SMT 仅支持从以下变更事件子结构中添加字段:source、transaction 和 updateDescription。
例如,您可以指定以下配置,将变更事件的副本集名称 (rs) 和集合名称添加到最终的展平事件记录中:
transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.fields=rs,collection
上述配置会导致以下内容被添加到扁平记录中:
{ "__rs" : "rs0", "__collection" : "my-collection", ... }
如果希望 SMT 为删除事件添加元数据字段,请将 delete.tombstone.handling.mode 选项的值设置为 rewrite。
选择性应用转换的选项
除了数据库更改发生时 Debezium 连接器发出的更改事件消息外,该连接器还会发出其他类型的消息,包括心跳消息以及有关架构更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计用于处理的更改事件消息的结构不同,因此最好将连接器配置为选择性应用 SMT,以便它仅处理预期的数据更改消息。
配置选项
下表描述了 MongoDB 事件展平 SMT 的配置选项。
属性 | 默认值 | 描述 |
---|---|---|
array.encoding | array | 指定 SMT 对从原始事件消息中读取的数组进行编码时使用的格式。请设置以下选项之一:array SMT 使用数组数据类型将 MongoDB 数组编码为与 Apache Kafka Connect 或 Apache Avro 模式兼容的格式。如果设置此选项,请验证每个数组实例中的元素是否属于同一类型。虽然 MongoDB 允许数组包含多种数据类型,但某些下游客户端无法处理数组。document SMT 将每个 MongoDB 数组转换为结构体的结构体,其方式类似于 BSON 序列化。主结构体包含名为 _0、_1、_2 等的字段。为了符合 Avro 命名标准,SMT 在每个索引字段的数字名称前添加下划线。每个数字字段名称代表原始数组中元素的索引。 SMT 用从源文档中检索到的指定数组元素的值填充每个索引字段。 |
flatten.struct | false | SMT 通过连接消息中嵌套属性的名称(以可配置的分隔符分隔)来展平原始事件消息中的结构(struct),以形成一个简单的字段名称。 |
flatten.struct.delimiter | 当 flatten.struct 设置为 true 时,指定转换在从输入记录连接的字段名称之间插入的分隔符,以在输出记录中生成字段名称。 | |
delete.tombstone.handling.mode | tombstone | Debezium 会为每个 DELETE 操作生成一条变更事件记录。此设置决定 MongoDB 事件展平 SMT 如何处理流中的 DELETE 事件。请设置以下选项之一:drop SMT 会从流中移除 DELETE 事件和“TOMBSTONE”记录。tombstone(默认)SMT 会在流中保留TOMBSTONE 记录。TOMBSTONE 记录仅包含以下值:“value”: “null”。rewrite SMT 会在流中保留变更事件记录并进行以下更改:向记录添加一个 value 字段,该字段包含原始记录 before 字段中的键/值对。向记录的值添加 __deleted: true。删除 TOMBSTONE 记录。此设置提供了另一种指示记录已被删除的方式。rewrite-with-tombstoneSMT 的行为与选择重写选项时的行为相同,只是它还会保留 TOMBSTONE 记录。 |
delete.tombstone.handling.mode.rewrite-with-id | false | 当设置为 true 并且 delete.tombstone.handling.mode 被重写时,从键中复制 id 字段并将其作为 _id 包含在删除事件的有效负载中。 |
add.headers.prefix | __ (double-underscore) | 设置此可选字符串作为标题的前缀。 |
add.headers | No default | 指定您希望 SMT 添加到简化消息头的元数据字段列表(不带空格),这些字段以逗号分隔。当原始消息包含重复的字段名称时,您可以通过提供结构体名称和字段名称来标识要修改的特定字段,例如 source.ts_ms。您也可以选择覆盖字段的原始名称,并通过在列表中添加以下格式的条目来为其分配新名称:<field_name>:<new_field_name>。例如:version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP 您指定的新名称值区分大小写。当 SMT 将元数据字段添加到简化消息的标头时,它会在每个元数据字段名称前添加双下划线。对于结构体规范,SMT 还会在结构体名称和字段名称之间插入下划线。如果您指定的字段不在变更事件原始消息中,SMT 不会将该字段添加到标头。 |
add.fields.prefix | __ (double-underscore) | 指定作为字段名称前缀的可选字符串。 |
add.fields | No default | 将此选项设置为以逗号分隔的列表,其中包含要添加到简化 Kafka 消息的 value 元素的元数据字段,不带空格。当原始消息包含重复的字段名称时,您可以通过提供结构体名称和字段名称来标识要修改的特定字段,例如 source.ts_ms。您也可以选择覆盖字段的原始名称,并通过向列表中添加以下格式的条目来为其分配新名称:<field_name>:<new_field_name>。例如:version:VERSION, connector:CONNECTOR,source.ts_ms:EVENT_TIMESTAMP 您指定的新名称值区分大小写。当 SMT 将元数据字段添加到简化消息的 value 元素时,它会在每个元数据字段名称前添加双下划线。对于结构体规范,SMT 还会在结构体名称和字段名称之间插入下划线。如果您指定的字段在原始变更事件消息中不存在,SMT 仍会将指定的字段添加到修改后消息的 value 元素中。 |
已知限制
由于 MongoDB 是无模式数据库,为了确保在使用 Debezium 将更改流式传输到基于模式的数据关系数据库时列定义一致,集合中同名字段必须存储相同类型的数据。
请配置 SMT,使其生成与接收器连接器兼容格式的消息。如果接收器连接器需要“扁平”消息结构,但它收到一条将源 MongoDB 文档中的数组编码为结构体结构的消息,则接收器连接器无法处理该消息。