ABP vNext + Debezium CDC: A Minimally Invasive Path from the Transactional Database to an Event Stream

Published: 2025-08-18




TL;DR 🧠

  • Zero intrusion into business code: only enable logical replication on the database (e.g. Postgres wal_level=logical); Debezium captures row-level changes from the change log, naturally atomic with the transaction
  • At-least-once semantics: consumers must be idempotent (Inbox + Upsert) and commit offsets manually, only after the business operation succeeds
  • Security by minimization: PG15 publication row/column filters trim data at the source; the connector adds table/column allowlists + SMT masking 🔒
  • Evolution friendly: trigger incremental snapshots via a signal table to backfill history; flatten events with ExtractNewRecordState and handle deletes with the newer delete.tombstone.handling.mode
  • Observability: Connect/Debezium expose JMX metrics; add heartbeat topics to detect latency, with optional forwarding to Prometheus 📈

1) Why CDC (instead of writing events directly) ❓

  • Minimal intrusion: get a row-level change stream without touching legacy-system or third-party code
  • Backfill without interruption: an initial snapshot on first start plus incremental snapshots at runtime (signal-triggered)
  • Boundaries: delivery semantics are at-least-once; large transactions may delay capture, so monitor and tune ⚠️

2) Target architecture and data flow (overview) 🗺️

[Architecture overview] PostgreSQL (WAL / logical replication) → Debezium (Kafka Connect or Debezium Server, the CDC layer) → Kafka/Redpanda (one topic per table, plus _debezium-heartbeat* heartbeat topics) → ABP CDC-Consumer (Inbox idempotency + multi-tenancy) → sinks: read models / cache / search / reports, plus audit and replay tools.

Key points

  • Each table usually maps to one topic; RegexRouter can be used to unify the naming prefix
  • The heartbeat topic is subscribed to / observed on the consumer side for liveness and end-to-end latency checks

3) Minimally invasive preparation (PostgreSQL) 🛠️

3.1 Database parameters (postgresql.conf)

  • wal_level=logical
  • max_wal_senders>=4, max_replication_slots>=4 (tune to your scale)
  • Replication user: grant only the required databases/tables plus the replication privilege (avoid superuser); a sketch follows 🔒
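A minimal sketch of a least-privilege replication role (the role name cdc_user is an assumption; the Compose demo below simply uses the postgres superuser for brevity):

-- Hypothetical role; adjust names and grants to your environment
CREATE ROLE cdc_user WITH LOGIN REPLICATION PASSWORD 'change-me';
GRANT CONNECT ON DATABASE inventory TO cdc_user;
GRANT USAGE ON SCHEMA public TO cdc_user;
-- Debezium needs SELECT on the captured tables for snapshots
GRANT SELECT ON public.orders, public.order_items TO cdc_user;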

3.2 PG15 publication row/column filters (strongly recommended)

-- Publish only some columns of orders, filtered by tenant
CREATE PUBLICATION cdc_pub FOR TABLE
  public.orders (id, tenant_id, email, amount) WHERE (tenant_id IN ('A','B')),
  public.order_items;

Key points and limitations

  • The syntax requires parentheses: WHERE (...)
  • Row filtering is applied on the server side (reducing egress and network overhead)
  • Filter expressions must evaluate deterministically (avoid non-deterministic functions)
  • The identity columns required for UPDATE/DELETE must be among the published columns (the primary key usually suffices); the query below shows what is actually published
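To verify what actually gets published (on PG15+, pg_publication_tables also exposes the published columns and the row filter):

SELECT pubname, schemaname, tablename, attnames, rowfilter
FROM pg_publication_tables
WHERE pubname = 'cdc_pub';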

3.3 Signal table (no default name → create and configure it yourself) 🔔

Debezium has no default signal table name; this article's example uses public.debezium_signals. You can choose your own name, but you must point the connector at the fully qualified name via signal.data.collection, and the signal table must be added to the publication (especially when publication.autocreate.mode=filtered/disabled).

CREATE TABLE IF NOT EXISTS public.debezium_signals(
  id   varchar(64) primary key,
  type varchar(32) not null,
  data varchar(2048)
);

ALTER PUBLICATION cdc_pub ADD TABLE public.debezium_signals;

4) One-shot Compose stack (PostgreSQL + Redpanda + Kafka Connect + Console) ⚙️

⚠️ Redpanda's --mode dev-container / --overprovisioned flags are for local development/CI only; in production, size CPU/memory/disk and replication per the official guidelines

docker-compose.yml

version: "3.8"
services:
  postgres:
    image: debezium/postgres:16
    environment:
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=inventory
    ports: ["5432:5432"]

  redpanda:
    image: redpandadata/redpanda:latest
    command: >
      redpanda start --mode dev-container --smp 1 --memory 1G
      --overprovisioned --check=false --node-id 0
    ports: ["9092:9092","8081:8081"] # 8081: Schema Registry (when Avro/Protobuf is enabled)

  connect:
    image: debezium/connect:2.6
    environment:
      - BOOTSTRAP_SERVERS=redpanda:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=_connect_configs
      - OFFSET_STORAGE_TOPIC=_connect_offsets
      - STATUS_STORAGE_TOPIC=_connect_status
      # JSON (schemaless) option ↓
      - KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - KEY_CONVERTER_SCHEMAS_ENABLE=false
      - VALUE_CONVERTER_SCHEMAS_ENABLE=false
      # For Avro, switch to the following (and enable Schema Registry on 8081):
      # - KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
      # - VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
      # - KEY_CONVERTER_SCHEMAS_ENABLE=true
      # - VALUE_CONVERTER_SCHEMAS_ENABLE=true
      # - KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://redpanda:8081
      # - VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://redpanda:8081
      - JMX_PORT=9010
    ports: ["8083:8083","9010:9010"]
    depends_on: [redpanda]

  console:
    image: redpandadata/console:latest
    volumes:
      - ./console.yaml:/etc/console/config.yaml:ro
    command: ["--config.filepath=/etc/console/config.yaml"]
    ports: ["8080:8080"]
    depends_on: [redpanda, connect]

console.yaml

kafka:
  brokers:
    - redpanda:9092

schemaRegistry:
  enabled: true
  urls:
    - http://redpanda:8081   # enable when using Avro/Protobuf; can be ignored for plain JSON

connect:
  clusters:
    - name: local
      url: http://connect:8083

5) Initialize tables and sample data 🧪

CREATE TABLE public.orders(
  id bigserial PRIMARY KEY,
  tenant_id text not null,
  email text,
  phone text,
  amount numeric(12,2) not null,
  updated_at timestamptz default now()
);

CREATE TABLE public.order_items(
  id bigserial PRIMARY KEY,
  order_id bigint not null references public.orders(id),
  sku text, qty int not null, price numeric(12,2) not null
);

INSERT INTO public.orders(tenant_id,email,phone,amount)
VALUES ('A','a@x.com','13800000001',99.9),('B','b@x.com','13800000002',120.5);

6) Register the source connector (allowlist + masking + unwrap + routing + signals + heartbeat) 📡

PUT /connectors/{name}/config is recommended: it creates or updates the connector idempotently

inventory-connector.json

{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "plugin.name": "pgoutput",

  "database.hostname": "postgres",
  "database.port": "5432",
  "database.user": "postgres",
  "database.password": "pass",
  "database.dbname": "inventory",

  "topic.prefix": "cdc",
  "publication.name": "cdc_pub",
  "publication.autocreate.mode": "disabled",

  "table.include.list": "public.orders,public.order_items",
  "include.schema.changes": "true",
  "tombstones.on.delete": "true",

  "snapshot.mode": "initial",
  "snapshot.fetch.size": "10240",
  "provide.transaction.metadata": "true",
  "decimal.handling.mode": "string",

  "signal.enabled.channels": "source",
  "signal.data.collection": "public.debezium_signals",

  "heartbeat.interval.ms": "10000",
  "heartbeat.topics.prefix": "_debezium-heartbeat",

  "transforms": "unwrap,mask,route",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.add.fields": "op,table,lsn,source.ts_ms",
  "transforms.unwrap.delete.tombstone.handling.mode": "rewrite",

  "transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.mask.fields": "email,phone",

  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "cdc\\.public\\.(.*)",
  "transforms.route.replacement": "cdc.inventory.$1"
}

Compatibility note (documentation only; do not put this into the JSON):

If your Debezium version does not recognize transforms.unwrap.delete.tombstone.handling.mode, fall back to the older key transforms.unwrap.delete.handling.mode with the same value (e.g. rewrite)

Submit:

curl -X PUT http://localhost:8083/connectors/pg-inventory/config \
  -H "Content-Type: application/json" \
  -d @inventory-connector.json
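After submitting, the standard Connect REST API can confirm that the connector and its task are RUNNING:

curl -s http://localhost:8083/connectors/pg-inventory/status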

Tips

  • RegexRouter's (.*) capture group → $1 reuses the table name
  • MaskField supports nulling out, a fixed replacement string, a fixed-length mask (e.g. *), and similar strategies, applied according to your configuration (it does not always just "star out" values); for highly sensitive fields, prefer column filtering at the source
  • heartbeat.* lets you quickly see liveness and latency on the Console side 🫀
  • Make sure the signal table is included in the publication, otherwise incremental-snapshot signals will not be captured

7) Incremental snapshots 🔁

INSERT INTO public.debezium_signals(id,type,data)
VALUES ('snap1','execute-snapshot',
        '{"data-collections":["public.orders"],"type":"incremental"}');
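If a long-running incremental snapshot needs to be cancelled, Debezium also accepts a stop-snapshot signal through the same table (a sketch mirroring the example above):

INSERT INTO public.debezium_signals(id,type,data)
VALUES ('snap1-stop','stop-snapshot',
        '{"data-collections":["public.orders"],"type":"incremental"}');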

Incremental snapshot flow

[Sequence] DBA/application inserts a row into public.debezium_signals → the insert flows through the WAL → the Debezium connector parses the execute-snapshot signal → reads the specified tables in batches → emits snapshot events to Kafka while continuing to stream regular change events.

8) ABP CDC-Consumer module 🧑‍💻

8.1 Inbox (deduplication / idempotency)

CREATE TABLE IF NOT EXISTS app_inbox(
  id bigserial primary key,
  message_id text not null,
  consumer_name text not null,
  tenant_id uuid null,
  processed_at timestamptz not null default now(),
  unique(message_id, consumer_name)
);

Suggested idempotency key: prefer LSN + table name/primary key, i.e. <table>/<pk>/<lsn>; if unavailable, fall back to <topic>-<partition>-<offset>. The IInboxStore used by the consumer is sketched right below.
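A minimal sketch of the IInboxStore abstraction the consumer depends on. The interface name comes from the consumer code; the Npgsql-based implementation, class name, and connection handling are assumptions (in a real ABP module you would typically back this with the repository/DbContext of the same unit of work):

using System;
using System.Threading;
using System.Threading.Tasks;
using Npgsql;

public interface IInboxStore
{
    Task<bool> ExistsAsync(string messageId, string consumerName, CancellationToken ct);
    Task MarkProcessedAsync(string messageId, string consumerName, Guid? tenantId, CancellationToken ct);
}

// Sketch: relies on the unique(message_id, consumer_name) constraint of app_inbox
public sealed class NpgsqlInboxStore : IInboxStore
{
    private readonly string _connectionString;
    public NpgsqlInboxStore(string connectionString) => _connectionString = connectionString;

    public async Task<bool> ExistsAsync(string messageId, string consumerName, CancellationToken ct)
    {
        await using var conn = new NpgsqlConnection(_connectionString);
        await conn.OpenAsync(ct);
        await using var cmd = new NpgsqlCommand(
            "select 1 from app_inbox where message_id = @m and consumer_name = @c", conn);
        cmd.Parameters.AddWithValue("m", messageId);
        cmd.Parameters.AddWithValue("c", consumerName);
        return await cmd.ExecuteScalarAsync(ct) is not null;
    }

    public async Task MarkProcessedAsync(string messageId, string consumerName, Guid? tenantId, CancellationToken ct)
    {
        await using var conn = new NpgsqlConnection(_connectionString);
        await conn.OpenAsync(ct);
        await using var cmd = new NpgsqlCommand(
            "insert into app_inbox(message_id, consumer_name, tenant_id) values (@m, @c, @t) on conflict do nothing", conn);
        cmd.Parameters.AddWithValue("m", messageId);
        cmd.Parameters.AddWithValue("c", consumerName);
        cmd.Parameters.AddWithValue("t", (object?)tenantId ?? DBNull.Value);
        await cmd.ExecuteNonQueryAsync(ct);
    }
}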

8.2 Consumer code

using System;
using System.Globalization;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Volo.Abp.MultiTenancy;
using Volo.Abp.Uow;

public sealed class CdcConsumer : BackgroundService
{
    private readonly IConsumer<string, string> _consumer;
    private readonly IInboxStore _inbox;
    private readonly IUnitOfWorkManager _uow;
    private readonly ICurrentTenant _currentTenant;
    private readonly ILogger<CdcConsumer> _logger;

    public CdcConsumer(IInboxStore inbox, IUnitOfWorkManager uow,
        ICurrentTenant currentTenant, ILogger<CdcConsumer> logger, IConfiguration cfg)
    {
        _inbox = inbox; _uow = uow; _currentTenant = currentTenant; _logger = logger;

        var kafkaCfg = new ConsumerConfig {
            BootstrapServers = cfg["Kafka:BootstrapServers"] ?? "localhost:9092",
            GroupId = cfg["Kafka:GroupId"] ?? "orders-consumer",
            EnableAutoCommit = false,                         // manual commit
            AutoOffsetReset = AutoOffsetReset.Earliest,       // start from earliest on first run
            EnablePartitionEof = true
        };
        _consumer = new ConsumerBuilder<string, string>(kafkaCfg).Build();
    }

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        _consumer.Subscribe(new[] { "cdc.inventory.orders" });

        while (!ct.IsCancellationRequested)
        {
            ConsumeResult<string,string>? cr = null;
            try
            {
                cr = _consumer.Consume(ct);
                if (cr.IsPartitionEOF) continue;

                // Tombstone after a delete (null value): nothing to process, just commit
                if (cr.Message.Value is null) { _consumer.Commit(cr); continue; }

                var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
                var payload = JsonSerializer.Deserialize<JsonElement>(cr.Message.Value, options);

                // Extract lsn / table / op; ExtractNewRecordState adds these fields with the
                // default "__" prefix (see add.fields / add.fields.prefix)
                var hasLsn = payload.TryGetProperty("__lsn", out var lsnProp);
                var op = payload.TryGetProperty("__op", out var opProp) ? opProp.GetString() : null;
                var table = payload.TryGetProperty("__table", out var t) ? t.GetString() : "orders";

                // Robustly read the primary key id (number or string)
                if (!payload.TryGetProperty("id", out var idProp) ||
                    (idProp.ValueKind != JsonValueKind.Number && idProp.ValueKind != JsonValueKind.String))
                {
                    _logger.LogWarning("Missing/invalid 'id' in payload, send to DLQ");
                    await SendToDlqAsync(cr!);      // TODO: implement DLQ (see the sketch after the registration snippet)
                    _consumer.Commit(cr);
                    continue;
                }
                long id = idProp.ValueKind == JsonValueKind.Number
                    ? idProp.GetInt64()
                    : long.Parse(idProp.GetString()!, CultureInfo.InvariantCulture);

                var inboxId = hasLsn
                    ? $"{table}/{id}/{lsnProp.GetInt64()}"
                    : $"{cr.Topic}-{cr.Partition.Value}-{cr.Offset.Value}";

                if (await _inbox.ExistsAsync(inboxId, "orders-consumer", ct))
                { _consumer.Commit(cr); continue; }

                using var uow = _uow.Begin(requiresNew: true);

                // Multi-tenancy: pick up tenant_id if the event carries it
                Guid? tenantId = null;
                if (payload.TryGetProperty("tenant_id", out var ten) && Guid.TryParse(ten.GetString(), out var tid))
                    tenantId = tid;

                using (_currentTenant.Change(tenantId))
                {
                    // Delete vs upsert branch
                    if (string.Equals(op, "d", StringComparison.OrdinalIgnoreCase))
                    {
                        await DeleteOrderAsync(id, ct);   // your delete / soft-delete implementation
                    }
                    else
                    {
                        var dto = JsonSerializer.Deserialize<OrderCdcDto>(cr.Message.Value, options);
                        await UpsertOrderAsync(dto!, ct); // idempotent upsert
                    }
                    }

                    await _inbox.MarkProcessedAsync(inboxId, "orders-consumer", tenantId, ct);
                }

                await uow.CompleteAsync(ct);
                _consumer.Commit(cr); // ✅ commit the offset only after success
            }
            catch (ConsumeException ex)
            {
                _logger.LogError(ex, "Kafka consume error");
                // Do not commit → the message will be redelivered; consider adding backoff/retries
            }
            catch (JsonException ex)
            {
                _logger.LogError(ex, "JSON parse error, send to DLQ");
                if (cr != null) { await SendToDlqAsync(cr); _consumer.Commit(cr); }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Unhandled error, will retry");
                // Do not commit → at-least-once + Inbox idempotency yields eventual consistency
            }
        }
    }

    private Task SendToDlqAsync(ConsumeResult<string,string> cr)
    {
        // TODO: write to a dead-letter topic or table
        return Task.CompletedTask;
    }

    public override void Dispose()
    {
        _consumer.Close();
        _consumer.Dispose();
        base.Dispose();
    }
}

Registration:

// Startup/Module
services.AddHostedService<CdcConsumer>();
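The SendToDlqAsync placeholder above can be filled in many ways; a minimal producer-based sketch follows. The topic name, class name, and header layout are assumptions, not ABP or Debezium conventions:

using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;

public sealed class KafkaDlqWriter
{
    private const string DlqTopic = "cdc.inventory.orders.dlq"; // hypothetical DLQ topic name
    private readonly IProducer<string, string> _producer;

    public KafkaDlqWriter(IConfiguration cfg)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = cfg["Kafka:BootstrapServers"] ?? "localhost:9092",
            EnableIdempotence = true
        };
        _producer = new ProducerBuilder<string, string>(config).Build();
    }

    public async Task WriteAsync(ConsumeResult<string, string> cr, CancellationToken ct)
    {
        var message = new Message<string, string>
        {
            Key = cr.Message.Key,
            Value = cr.Message.Value,
            Headers = cr.Message.Headers ?? new Headers()
        };
        // Record the origin so the message can be replayed or inspected later
        message.Headers.Add("dlq-source",
            Encoding.UTF8.GetBytes($"{cr.Topic}-{cr.Partition.Value}-{cr.Offset.Value}"));
        await _producer.ProduceAsync(DlqTopic, message, ct);
    }
}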

Consumer processing flow

[Flowchart] Kafka message → parse JSON and extract lsn, table, id, op → if the Inbox already has it, commit the offset and stop → otherwise begin a UoW (with multi-tenant scope), run delete/soft-delete when op is a delete or upsert the read model otherwise → mark the Inbox → complete the UoW → commit the offset. Deserialization failures are written to the DLQ and the offset is committed.

9) Schema evolution strategy 🔄

  • New columns: the consumer treats new columns as optional and falls back to null/defaults
  • Rename/drop: add the new column first, dual-write, and keep a transition period
  • Type changes: set decimal.handling.mode=string upstream and parse on the consumer to avoid precision loss (see the DTO sketch after this list)
  • Transaction metadata: provide.transaction.metadata=true produces a transaction topic, useful for audit and replay
  • (Optional) Schema Registry: if you adopt Avro/Protobuf, use the registry for schema constraints and version governance
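A sketch of the OrderCdcDto that the consumer deserializes into, following these evolution rules (the class and its exact property set are assumptions based on the orders table above; only the flattened-event field names are taken from the examples):

using System.Globalization;
using System.Text.Json.Serialization;

public sealed class OrderCdcDto
{
    [JsonPropertyName("id")]         public long Id { get; set; }
    [JsonPropertyName("tenant_id")]  public string? TenantId { get; set; }

    // Masked by the MaskField SMT; may arrive nulled or replaced
    [JsonPropertyName("email")]      public string? Email { get; set; }
    [JsonPropertyName("phone")]      public string? Phone { get; set; }

    // decimal.handling.mode=string → keep as string and parse explicitly to avoid precision loss
    [JsonPropertyName("amount")]     public string? Amount { get; set; }

    // Columns added later should be nullable so that older events still deserialize
    [JsonPropertyName("updated_at")] public string? UpdatedAt { get; set; }

    public decimal AmountValue =>
        decimal.TryParse(Amount, NumberStyles.Any, CultureInfo.InvariantCulture, out var v) ? v : 0m;
}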

10) Security filtering and compliance 🔐

  1. Trim at the source: PG15 publication row/column filters (e.g. by tenant or business domain)
  2. Connector allowlists: table.include.list / column.include.list / column.exclude.list
  3. Masking: MaskField supports nulling out, a fixed string, or a fixed-length mask (e.g. *); for highly sensitive data, prefer column filtering at the source plus least privilege
  4. ACLs: minimize the replication user's privileges; on the messaging side, set ACLs per topic / consumer group

11) Observability (JMX/Prometheus) and SLOs 📊

  • JMX metrics: with JMX_PORT enabled, scrape Connect/Debezium metrics via jconsole/jmx_exporter (source lag, error rate, queue sizes, snapshot status)
  • Heartbeat topics: topics with the _debezium-heartbeat prefix are used for liveness during idle periods and end-to-end latency checks
  • Rule-of-thumb targets: capture latency p95 < 2s (load dependent), consumption p95 < 200ms

Observability at a glance

[Diagram] Debezium/Connect exposes JMX → scraped by Prometheus/JMX Exporter → Grafana dashboards; the heartbeat topic is observed via Redpanda Console.

12) Failures and replay ♻️

  • Connector side: watch for WAL backlog and disconnects; restart tasks or tune batching/heartbeats when needed
  • Consumer side: deserialization/validation failures → DLQ (dead-letter topic/table); after fixing, replay by time window or primary-key range
  • Offset management: to replay from scratch, create a new consumer group or reset offsets (one way is shown below); rely on Inbox idempotency so duplicates never take effect twice
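One way to reset offsets for a full replay, assuming the standard Kafka CLI tools are available (e.g. inside the connect container) and the consumer is stopped first:

kafka-consumer-groups.sh --bootstrap-server redpanda:9092 \
  --group orders-consumer --topic cdc.inventory.orders \
  --reset-offsets --to-earliest --execute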

13) Load test and acceptance ✅

Quick load test (insert 10k rows)

for i in {1..10000}; do
  psql "host=localhost port=5432 dbname=inventory user=postgres password=pass" \
  -c "insert into public.orders(tenant_id,email,amount) values ('A', 'user${i}@x.com', random()*1000)"
done
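The loop above opens one connection per row; for quicker test data, a single-statement variant using generate_series (same table, insert-only):

psql "host=localhost port=5432 dbname=inventory user=postgres password=pass" -c "
  insert into public.orders(tenant_id,email,amount)
  select 'A', 'user' || g || '@x.com', random()*1000
  from generate_series(1,10000) g;"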

Acceptance bar

  • Source DB CPU/IO jitter stays under control (the WAL overhead of logical replication is contained)
  • Debezium capture latency is low and stable (heartbeat topic included)
  • Consumer side has zero duplicate effects (verified via the Inbox unique constraint; see the check below)
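A simple verification for the insert-only load test (run the first query on the source database and the second on the consumer database; the counts should converge once the backlog drains):

-- Source database
SELECT count(*) FROM public.orders;

-- Consumer database
SELECT count(*) FROM app_inbox WHERE consumer_name = 'orders-consumer';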

14) Trade-offs vs. the Outbox pattern (comparison) ⚖️

| Approach | Intrusiveness | Consistency semantics | Extra overhead | Best fit |
| --- | --- | --- | --- | --- |
| CDC | Minimal | At-least-once (needs idempotency) | Relies on DB logs | Legacy/third-party databases whose code cannot change |
| Outbox | Medium | Written inside the transaction + at-least-once | Code/table overhead | New systems / services with a controllable write path |
| CDC + Outbox | Medium-high | Complementary | Highest | Gradual refactoring / dual-track transition |

15) Debezium Server (an alternative to Kafka Connect, pick one) 🪶

If you prefer not to run Kafka Connect, use Debezium Server instead (a single process, configured via application.properties with sinks such as Kafka/NATS/Kinesis…). It is lighter to deploy, but you give up part of the Connect ecosystem (SMTs, management UIs); choose based on your scenario.


