- DecodeLogicalDecodingMessageContent SMT将PostgreSQL逻辑解码消息的二进制内容转换为结构化形式。
- 当Debezium PostgreSQL连接器捕获逻辑解码消息时,它会将消息事件记录发送到Kafka。默认情况下,这些消息记录中的内容字段包含编码的二进制数据。为了方便其他Kafka消费者处理PostgreSQL事件消息,可以使用DecodeLogicalDecodingMessageContent SMT来解码原始消息的二进制内容,并将其转换为更易消费的格式。
- 还可以将SMT与其他SMT一起使用,例如Debezium Outbox Event Router。
示例
为了使Debezium PostgreSQL连接器能够解码消息事件中的二进制内容,将DecodeLogicalDecodingMessageContent SMT添加到连接器的Kafka Connect配置中,示例如下:
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
...
"transforms": "decodeLogicalDecodingMessageContent",
"transforms.decodeLogicalDecodingMessageContent.type": "io.debezium.connector.postgresql.transforms.DecodeLogicalDecodingMessageContent",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
...
以下示例显示了在应用转换之前和之后事件记录的键和值。
示例1. 应用DecodeLogicalDecodingMessageContent SMT的效果
处理前
SMT处理记录前的事件键
{
"prefix": "test-prefix"
}
SMT处理记录前的事件值
{
"op": "m",
"ts_ms": 1723115240065,
"source": {
"version": "3.0.0-SNAPSHOT",
"connector": "postgresql",
"name": "connector-name",
"ts_ms": 1723115239782,
"snapshot": "false",
"db": "source-db",
"sequence": "[\"26997744\",\"26997904\"]",
"ts_us": 1723115239782690,
"ts_ns": 1723115239782690000,
"schema": "",
"table": "",
"txId": 756,
"lsn": 26997904,
"xmin": null
},
"message": {
"prefix": "test-prefix",
"content": "eyJpZCI6IDEsICJpdGVtIjogIkRlYmV6aXVtIGluIEFjdGlvbiIsICJzdGF0dXMiOiAiRU5URVJFRCIsICJxdWFudGl0eSI6IDIsICJ0b3RhbFByaWNlIjogMzkuOTh9"
}
}
SMT处理记录后的事件键
null
SMT处理记录后的事件值
{
"op": "c",
"ts_ms": 1723115415729,
"source": {
"version": "3.0.0-SNAPSHOT",
"connector": "postgresql",
"name": "connector-name",
"ts_ms": 1723115415640,
"snapshot": "false",
"db": "source-db",
"sequence": "[\"26717416\",\"26717576\"]",
"ts_us": 1723115415640161,
"ts_ns": 1723115415640161000,
"schema": "",
"table": "",
"txId": 745,
"lsn": 26717576,
"xmin": null
},
"after": {
"id": 1,
"item": "Debezium in Action",
"status": "ENTERED",
"quantity": 2,
"totalPrice": 39.98
}
}
在上述示例中,SMT对原始事件记录应用以下更改:
- 删除原始逻辑解码消息中包含前缀字段的键(“prefix”: “test-prefix”)。
- 将op字段的值从m(message)转换为c(create),有效地将事件类型从message更改为INSERT。
- 用包含逻辑解码消息解码内容的after字段替换消息字段。
在SMT应用这些更改后,记录可以更容易地被下游消费者或其他SMT处理,比如Debezium Outbox Event Router。
配置选项
以下表格列出了您可以在DecodeLogicalDecodingMessageContent SMT中使用的配置选项。
表1. DecodeLogicalDecodingMessageContent SMT配置选项
- fields.null.include
- 类型:boolean
- 默认值:false
- 指定解码过程如何处理源消息中具有null值的字段。默认情况下,该转换会删除具有null值的字段。