SQL Server 2022 数据同步到 Elasticsearch 思考

发布于:2025-04-07 ⋅ 阅读:(21) ⋅ 点赞:(0)

公司的老项目了,采用的是sqlserver 2022作为数据卡做的,但是产品对接客户,发现对搜索的要求很高,尤其是全文检索,考虑到ES采用倒排所以效率上的优势和整体开发的成本,大佬们商量之后,果断的采用了Elasticsearch作为搜索引擎的策略,那么剩下的就是如何将数据同步到ES的问题了,这个的一部分也就是我的工作了,所以分享一下自己工作过程当中的思路和遇到的问题。

一、整体同步思路

1. 同步架构选择

方案 优点 缺点 适用场景
CDC + Logstash 实时性好,低延迟 配置复杂 需要准实时同步
定时批量导出导入 实现简单 数据延迟大 非实时分析场景
触发器+消息队列 灵活可控 影响源库性能 高定制化需求
第三方工具(如Debezium) 开箱即用 额外成本 企业级解决方案

2. 采用方案:变更数据捕获(CDC) + Logstash 管道


二、具体实现案例

步骤1:启用CDC

-- 在数据库级别启用CDC
EXEC sys.sp_cdc_enable_db;
​
-- 对特定表启用CDC
EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',
    @source_name = 'Products',
    @role_name = NULL,
    @supports_net_changes = 1;

步骤2:配置Logstash管道

input {
  jdbc {
    jdbc_driver_library => "D:/sqljdbc_12.2/enu/mssql-jdbc-12.2.0.jre11.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://localhost:1433;databaseName=YourDB"
    jdbc_user => "sa"
    jdbc_password => "yourpassword"
    schedule => "* * * * *"  # 每分钟执行一次
    statement => "SELECT * FROM cdc.dbo_Products_CT WHERE __$start_lsn > ?"
    use_column_value => true
    tracking_column => "__$start_lsn"
    tracking_column_type => "numeric"
    last_run_metadata_path => "D:/logstash-8.12.0/products_last_run"
  }
}
​
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "sqlserver-products"
    document_id => "%{ProductID}"
    action => "update"
    doc_as_upsert => true
  }
}

案例2:批量全量+增量同步

使用Elasticsearch JDBC插件直接导入

# 全量导入
bin/elasticsearch-jdbc \
  -url "jdbc:sqlserver://localhost:1433;databaseName=YourDB" \
  -user sa -password yourpassword \
  -table "Products" \
  -index "products-index" \
  -type "product-type" \
  -id "ProductID"
​
# 增量导入(基于时间戳)
bin/elasticsearch-jdbc \
  -url "jdbc:sqlserver://localhost:1433;databaseName=YourDB" \
  -user sa -password yourpassword \
  -table "Products" \
  -index "products-index" \
  -type "product-type" \
  -id "ProductID" \
  -incremental "true" \
  -incremental_column "ModifiedDate" \
  -incremental_last_value "2024-01-01"

三、常见问题及解决方案

1. 性能问题

问题表现

  • SQL Server CPU使用率高

  • Elasticsearch索引速度慢

  • 网络带宽成为瓶颈

解决方案

# Logstash优化配置示例
input {
  jdbc {
    # 增加分页大小
    jdbc_paging_enabled => true
    jdbc_page_size => 50000
    # 使用fetch_size提高性能
    jdbc_fetch_size => 1000
  }
}
​
output {
  elasticsearch {
    # 启用批量提交
    flush_size => 1000
    # 增加工作线程
    workers => 4
  }
}

2. 数据一致性问题

问题表现

  • 同步过程中数据不一致

  • 漏同步或重复同步

  • 数据类型映射错误

解决方案

-- 1. 在SQL Server端添加版本控制字段
ALTER TABLE Products ADD SyncVersion ROWVERSION;
​
-- 2. 使用事务一致性快照
SET TRANSACTION ISOLATION LEVEL SNAPSHOT;
BEGIN TRANSACTION;
-- 查询数据
COMMIT TRANSACTION;

3. 网络和连接问题

问题表现

  • 连接超时

  • 断线后无法恢复

  • SSL/TLS配置问题

解决方案

input {
  jdbc {
    # 连接池配置
    connection_retry_attempts => 3
    connection_retry_attempts_wait_time => 10
    # 连接验证
    validate_connection => true
    # 超时设置
    jdbc_connection_timeout => 60
  }
}

4. 映射和转换问题

问题表现

  • 字段类型不匹配

  • 日期格式问题

  • 特殊字符处理

解决方案

filter {
  # 日期格式转换
  date {
    match => ["CreatedDate", "yyyy-MM-dd HH:mm:ss.SSS"]
    target => "CreatedDate"
  }
  
  # 字段类型转换
  mutate {
    convert => {
      "Price" => "float"
      "Stock" => "integer"
    }
  }
  
  # 处理NULL值
  if [Description] == NULL {
    mutate {
      add_field => { "Description" => "" }
    }
  }
}

5. 监控和错误处理

推荐方案

output {
  if "_jsonparsefailure" in [tags] {
    file {
      path => "D:/logstash-8.12.0/error_logs/%{+yyyy-MM-dd}-parse-errors.log"
    }
  }
  
  elasticsearch {
    # 主输出
  }
  
  # 监控管道性能
  pipeline {
    send_to => ["monitoring"]
  }
}

四、过去的思考

  1. 索引设计优化

    PUT /products-index
    {
      "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1,
        "refresh_interval": "30s"
      },
      "mappings": {
        "properties": {
          "ProductName": { "type": "text", "fields": { "keyword": { "type": "keyword" } } },
          "Price": { "type": "scaled_float", "scaling_factor": 100 },
          "CreatedDate": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||epoch_millis" }
        }
      }
    }
  2. 使用Ingest Pipeline预处理

    PUT _ingest/pipeline/sqlserver_pipeline
    {
      "description": "Process SQL Server data",
      "processors": [
        {
          "remove": {
            "field": ["__$start_lsn", "__$update_mask"]
          }
        },
        {
          "script": {
            "source": """
              if(ctx['IsActive'] == false) {
                ctx['tags'] = ['inactive'];
              }
            """
          }
        }
      ]
    }

网站公告

今日签到

点亮在社区的每一天
去签到