Debezium日常分享系列之:解码逻辑解码消息内容

发布于:2025-02-13 ⋅ 阅读:(11) ⋅ 点赞:(0)

Debezium日常分享系列之:解码逻辑解码消息内容

  • DecodeLogicalDecodingMessageContent SMT将PostgreSQL逻辑解码消息的二进制内容转换为结构化形式。
  • 当Debezium PostgreSQL连接器捕获逻辑解码消息时,它会将消息事件记录发送到Kafka。默认情况下,这些消息记录中的内容字段包含编码的二进制数据。为了方便其他Kafka消费者处理PostgreSQL事件消息,可以使用DecodeLogicalDecodingMessageContent SMT来解码原始消息的二进制内容,并将其转换为更易消费的格式。
  • 还可以将SMT与其他SMT一起使用,例如Debezium Outbox Event Router。

示例

为了使Debezium PostgreSQL连接器能够解码消息事件中的二进制内容,将DecodeLogicalDecodingMessageContent SMT添加到连接器的Kafka Connect配置中,示例如下:

"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
...
"transforms": "decodeLogicalDecodingMessageContent",
"transforms.decodeLogicalDecodingMessageContent.type": "io.debezium.connector.postgresql.transforms.DecodeLogicalDecodingMessageContent",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
...

以下示例显示了在应用转换之前和之后事件记录的键和值。

示例1. 应用DecodeLogicalDecodingMessageContent SMT的效果

处理前

SMT处理记录前的事件键

{
	"prefix": "test-prefix"   
}

SMT处理记录前的事件值

{
	"op": "m",  
	"ts_ms": 1723115240065,
	"source": {
		"version": "3.0.0-SNAPSHOT",
		"connector": "postgresql",
		"name": "connector-name",
		"ts_ms": 1723115239782,
		"snapshot": "false",
		"db": "source-db",
		"sequence": "[\"26997744\",\"26997904\"]",
		"ts_us": 1723115239782690,
		"ts_ns": 1723115239782690000,
		"schema": "",
		"table": "",
		"txId": 756,
		"lsn": 26997904,
		"xmin": null
	},
	"message": {     
		"prefix": "test-prefix",
		"content": "eyJpZCI6IDEsICJpdGVtIjogIkRlYmV6aXVtIGluIEFjdGlvbiIsICJzdGF0dXMiOiAiRU5URVJFRCIsICJxdWFudGl0eSI6IDIsICJ0b3RhbFByaWNlIjogMzkuOTh9"
	}
}

SMT处理记录后的事件键

null  

SMT处理记录后的事件值

{
	"op": "c",   
	"ts_ms": 1723115415729,
	"source": {
		"version": "3.0.0-SNAPSHOT",
		"connector": "postgresql",
		"name": "connector-name",
		"ts_ms": 1723115415640,
		"snapshot": "false",
		"db": "source-db",
		"sequence": "[\"26717416\",\"26717576\"]",
		"ts_us": 1723115415640161,
		"ts_ns": 1723115415640161000,
		"schema": "",
		"table": "",
		"txId": 745,
		"lsn": 26717576,
		"xmin": null
	},
	"after": {     
		"id": 1,
		"item": "Debezium in Action",
		"status": "ENTERED",
		"quantity": 2,
		"totalPrice": 39.98
	}
}

在上述示例中,SMT对原始事件记录应用以下更改:

  • 删除原始逻辑解码消息中包含前缀字段的键(“prefix”: “test-prefix”)。
  • 将op字段的值从m(message)转换为c(create),有效地将事件类型从message更改为INSERT。
  • 用包含逻辑解码消息解码内容的after字段替换消息字段。

在SMT应用这些更改后,记录可以更容易地被下游消费者或其他SMT处理,比如Debezium Outbox Event Router。

配置选项

以下表格列出了您可以在DecodeLogicalDecodingMessageContent SMT中使用的配置选项。

表1. DecodeLogicalDecodingMessageContent SMT配置选项

  • fields.null.include
  • 类型:boolean
  • 默认值:false
  • 指定解码过程如何处理源消息中具有null值的字段。默认情况下,该转换会删除具有null值的字段。