[每周一更]-(第145期):分表数据扩容处理:原理与实战

发布于:2025-06-15 ⋅ 阅读:(14) ⋅ 点赞:(0)

在这里插入图片描述

一、为什么需要分表扩容?

随着系统业务量增长,单表数据不断膨胀,可能会出现:

  • 查询性能下降(索引失效、全表扫描)
  • 写入速度变慢(锁竞争、I/O 瓶颈)
  • 数据库单点风险增加

因此,分表是一种常用的“水平扩展(Horizontal Scaling)”策略。


二、分表 vs 分库:先分表,还是同步进行?

  • 分表:一个数据库中,把某个大表按某种规则拆成多张小表。
  • 分库分表:数据分散到多个数据库实例的多个表中,适合更大规模场景。

初期大多采用先分表策略,便于控制和迁移。


三、分表策略有哪些?

  1. 按范围分表(Range Sharding)
    例:按时间、ID段划分,如 user_202401、user_202402
    • 优点:易于维护,查询有边界
    • 缺点:冷热数据不均衡,某些表会成为热点
  2. 按哈希分表(Hash Sharding)
    例:user_%(user_id % 4) → user_0、user_1、user_2、user_3
    • 优点:数据分布均匀
    • 缺点:不易范围查询,扩容麻烦
  3. 按业务维度分表(如地区、商户、租户)
    适合多租户系统,实现物理隔离或逻辑隔离

四、扩容的挑战与处理方式

1. 扩容时机如何判断?

  • 单表记录数达到上限(如千万级)
  • 查询、写入明显变慢
  • 某个表频繁成为访问瓶颈

2. 扩容策略:如何处理已有数据?

  • 新旧表并行:老表保留不动,新增数据写入新表
  • 全量迁移:将旧数据迁移到新表结构,做过渡处理(高风险)
  • 分区管理:借助中间层(如 ShardingSphere、MyCAT)统一管理分表逻辑

3. 扩容过程需解决的问题:

  • 数据迁移的一致性与幂等处理
  • 历史数据查询兼容
  • 应用层改造:支持多表路由
  • 灰度发布与回滚策略

五、如何设计支持“动态扩容”的分表系统?

  • 抽象出路由层(sharding router):根据 key 决定数据落表
  • 保持扩容前后的路由规则兼容性(如:支持2路转4路)
  • 使用配置中心动态维护分表映射关系
  • 日志追踪/审计:便于调试和排查错误路由

六、典型实践案例简述

示例:用户表 user 表按 user_id 取模分 4 表,未来需扩到 8 表。

迁移步骤大致如下:

  1. 开新表 user_4 ~ user_7
  2. 建立新分表规则(如 %8)
  3. 数据批量迁移(可用临时双写、或异步复制)
  4. 应用灰度切换至新规则
  5. 验证无误后,淘汰旧分表规则

七、总结建议

  • 提前设计好路由层与分表逻辑,避免耦合业务代码
  • 监控分表热点情况,及时预警扩容
  • 如条件允许,可借助 中间件统一管理分表,减少扩容痛点

八、示例

(1)实现分表路由策略逻辑的示例

适合你在文章中展示如何根据业务主键(如 user_id)将数据路由到对应的子表。

我们以「按哈希分表」策略为例,即:

tableIndex = user_id % tableCount

1. 分表路由器结构定义

package sharding

import (
	"fmt"
)

type ShardRouter struct {
	TableBaseName string // 基础表名,如 "user"
	TableCount    int    // 表的总数,如 4 表:user_0 ~ user_3
}

// NewShardRouter 创建一个分表路由器
func NewShardRouter(baseName string, count int) *ShardRouter {
	return &ShardRouter{
		TableBaseName: baseName,
		TableCount:    count,
	}
}

2. 获取目标表名

// GetTableName 返回指定 userID 应该落到哪张表
func (r *ShardRouter) GetTableName(userID int64) string {
	index := userID % int64(r.TableCount)
	return fmt.Sprintf("%s_%d", r.TableBaseName, index)
}

3. 用法示例

package main

import (
	"fmt"
	"your_project/sharding"
)

func main() {
	router := sharding.NewShardRouter("user", 4)

	testUserIDs := []int64{1001, 2022, 3033, 4044, 5055}
	for _, uid := range testUserIDs {
		table := router.GetTableName(uid)
		fmt.Printf("UserID: %d => Table: %s\n", uid, table)
	}
}
输出示例:
UserID: 1001 => Table: user_1
UserID: 2022 => Table: user_2
UserID: 3033 => Table: user_1
UserID: 4044 => Table: user_0
UserID: 5055 => Table: user_3

扩展建议

如果你计划支持 动态扩容(如从 4 表扩到 8 表),可以这样设计:

type VersionedShardRouter struct {
	VersionMapping map[int]*ShardRouter // version => router
	CurrentVersion int
}

// 可按时间或ID范围映射使用不同版本的路由

或者,如果未来要支持跨库分表:

type ShardTarget struct {
	DBName    string
	TableName string
}
func (r *ShardRouter) Route(userID int64) ShardTarget {
	// hash or range logic, return db+table
}

(2)自动迁移脚本基础版

下面看一个实际可能用到的需求,数据表扩容,用过分表的可能都清楚,比如早起分了10个表,随着数据量增加发现又得到了单表的限制,需要再次分表来扩容,那下边我们看看如何处理。

我们以:

  • 原表为:user_0 ~ user_3(4 表)
  • 扩容到新表:user_0 ~ user_7(8 表)
  • user_id % 8 的新规则重新分布数据。
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

const (
	oldTableCount = 4
	newTableCount = 8
	baseTableName = "user"
	batchSize     = 1000
)

func main() {
	dsn := "user:password@tcp(127.0.0.1:3306)/your_db?charset=utf8mb4&parseTime=True&loc=Local"
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		log.Fatalf("Connect error: %v", err)
	}
	defer db.Close()

	for i := 0; i < oldTableCount; i++ {
		oldTable := fmt.Sprintf("%s_%d", baseTableName, i)
		fmt.Printf("Migrating from table: %s\n", oldTable)
		migrateTable(db, oldTable)
	}
}

func migrateTable(db *sql.DB, oldTable string) {
	var lastID int64 = 0

	for {
		// 读取一批数据
		query := fmt.Sprintf("SELECT id, user_id, name, email FROM %s WHERE id > ? ORDER BY id ASC LIMIT ?", oldTable)
		rows, err := db.Query(query, lastID, batchSize)
		if err != nil {
			log.Fatalf("Query error from %s: %v", oldTable, err)
		}
		defer rows.Close()

		type User struct {
			ID     int64
			UserID int64
			Name   string
			Email  string
		}

		var users []User
		for rows.Next() {
			var u User
			if err := rows.Scan(&u.ID, &u.UserID, &u.Name, &u.Email); err != nil {
				log.Fatalf("Scan error: %v", err)
			}
			users = append(users, u)
			lastID = u.ID
		}
		if len(users) == 0 {
			break
		}

		// 插入到新表
		for _, u := range users {
			newTable := fmt.Sprintf("%s_%d", baseTableName, u.UserID%newTableCount)
			insertSQL := fmt.Sprintf("INSERT INTO %s (id, user_id, name, email) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE name=VALUES(name), email=VALUES(email)", newTable)
			_, err := db.Exec(insertSQL, u.ID, u.UserID, u.Name, u.Email)
			if err != nil {
				log.Printf("Insert error into %s: %v", newTable, err)
			}
		}
		fmt.Printf("Migrated batch up to ID %d from %s\n", lastID, oldTable)
	}
}

脚本要点说明

点位 说明
分批处理 避免内存和锁压力,提升容错能力
ON DUPLICATE KEY 保证幂等执行,可重复运行
多协程可扩展 若你数据量大,可以将每张旧表并发迁移
支持中断续跑 可持久化 lastID 或用状态表做断点续传
避免写入死锁 插入顺序与主键升序有助于减少锁冲突

迁移后建议的验证

  1. 统计校验:

    SELECT COUNT(*) FROM user_0 + user_1 + ... + user_3;
    SELECT COUNT(*) FROM user_0 + ... + user_7;
    
  2. spot-check 用户:

    SELECT * FROM user_5 WHERE user_id = 10055;
    
  3. 应用层切换到新路由策略前,建议“新旧表双写”一段时间以保障数据一致性。


(3)升级改进方案设计

增加支持断点续传、每张表迁移进度、多协程处理、迁移日志等功能

我们引入一个迁移状态表migration_progress)来记录每张旧表的迁移进度(即每张表上次迁移到的最大 id 值)。

1. 迁移状态表结构

CREATE TABLE migration_progress (
    table_name   VARCHAR(64) PRIMARY KEY,
    last_id      BIGINT NOT NULL,
    updated_at   DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

2.日志表设计(可选持久化到数据库)

如果你想将日志存入数据库:

CREATE TABLE migration_log (
    id           BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name   VARCHAR(64),
    batch_start  BIGINT,
    batch_end    BIGINT,
    record_count INT,
    status       VARCHAR(20),
    message      TEXT,
    created_at   DATETIME DEFAULT CURRENT_TIMESTAMP
);

推荐使用文件日志为主,数据库日志为补充。

3.支持断点续传的多协程分表迁移脚本

package main

import (
	"database/sql"
	"fmt"
	"log"
	"sync"
	"time"

	_ "github.com/go-sql-driver/mysql"
)

const (
	oldTableCount = 4
	newTableCount = 8
	baseTableName = "user"
	batchSize     = 1000
	maxGoroutines = 4
)

type User struct {
	ID     int64
	UserID int64
	Name   string
	Email  string
}

func main() {
	dsn := "user:password@tcp(127.0.0.1:3306)/your_db?charset=utf8mb4&parseTime=True&loc=Local"
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		log.Fatalf("Connect error: %v", err)
	}
	defer db.Close()

	var wg sync.WaitGroup
	sem := make(chan struct{}, maxGoroutines)

	for i := 0; i < oldTableCount; i++ {
		wg.Add(1)
		sem <- struct{}{}
		go func(index int) {
			defer wg.Done()
			defer func() { <-sem }()
			oldTable := fmt.Sprintf("%s_%d", baseTableName, index)
			log.Printf("Starting migration from %s\n", oldTable)
			if err := migrateTable(db, oldTable); err != nil {
				log.Printf("Migration failed for %s: %v", oldTable, err)
			}
			log.Printf("Finished migration from %s\n", oldTable)
		}(i)
	}

	wg.Wait()
	log.Println("All migrations completed.")
}

func migrateTable(db *sql.DB, oldTable string) error {
	lastID, err := getLastMigratedID(db, oldTable)
	if err != nil {
		return err
	}

	for {
		query := fmt.Sprintf("SELECT id, user_id, name, email FROM %s WHERE id > ? ORDER BY id ASC LIMIT ?", oldTable)
		rows, err := db.Query(query, lastID, batchSize)
		if err != nil {
			logError(oldTable, lastID, lastID, 0, "query_error", err.Error())
			return fmt.Errorf("query failed: %w", err)
		}

		var users []User
		var batchStartID = lastID
		for rows.Next() {
			var u User
			if err := rows.Scan(&u.ID, &u.UserID, &u.Name, &u.Email); err != nil {
				rows.Close()
				logError(oldTable, batchStartID, u.ID, len(users), "scan_error", err.Error())
				return fmt.Errorf("scan failed: %w", err)
			}
			users = append(users, u)
			lastID = u.ID
		}
		rows.Close()

		if len(users) == 0 {
			break
		}

		var successCount int
		for _, u := range users {
			newTable := fmt.Sprintf("%s_%d", baseTableName, u.UserID%newTableCount)
			insertSQL := fmt.Sprintf(
				"INSERT INTO %s (id, user_id, name, email) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE name=VALUES(name), email=VALUES(email)",
				newTable,
			)
			_, err := db.Exec(insertSQL, u.ID, u.UserID, u.Name, u.Email)
			if err != nil {
				log.Printf("Insert failed to %s: %v", newTable, err)
				logError(oldTable, batchStartID, lastID, successCount, "insert_error", err.Error())
				continue
			}
			successCount++
		}

		err = updateLastMigratedID(db, oldTable, lastID)
		if err != nil {
			logError(oldTable, batchStartID, lastID, successCount, "update_progress_error", err.Error())
			return fmt.Errorf("update progress failed: %w", err)
		}

		log.Printf("[%s] Migrated batch: %d records, ID %d to %d", oldTable, successCount, batchStartID, lastID)
		logSuccess(oldTable, batchStartID, lastID, successCount)

		time.Sleep(10 * time.Millisecond)
	}

	return nil
}

func getLastMigratedID(db *sql.DB, tableName string) (int64, error) {
	var lastID int64
	err := db.QueryRow("SELECT last_id FROM migration_progress WHERE table_name = ?", tableName).Scan(&lastID)
	if err == sql.ErrNoRows {
		// 初始化
		_, err := db.Exec("INSERT INTO migration_progress (table_name, last_id) VALUES (?, 0)", tableName)
		if err != nil {
			return 0, fmt.Errorf("init progress row failed: %w", err)
		}
		return 0, nil
	} else if err != nil {
		return 0, fmt.Errorf("query progress failed: %w", err)
	}
	return lastID, nil
}

func updateLastMigratedID(db *sql.DB, tableName string, lastID int64) error {
	_, err := db.Exec("UPDATE migration_progress SET last_id = ? WHERE table_name = ?", lastID, tableName)
	return err
}


func logSuccess(table string, startID, endID int64, count int) {
	logger.Printf("[SUCCESS] table=%s, id_range=%d-%d, count=%d\n",
		table, startID, endID, count)
}

func logError(table string, startID, endID int64, count int, code string, msg string) {
	logger.Printf("[ERROR] table=%s, id_range=%d-%d, count=%d, code=%s, msg=%s\n",
		table, startID, endID, count, code, msg)
}

3.辅助函数:迁移进度读取与更新

func getLastMigratedID(db *sql.DB, tableName string) (int64, error) {
	var lastID int64
	err := db.QueryRow("SELECT last_id FROM migration_progress WHERE table_name = ?", tableName).Scan(&lastID)
	if err == sql.ErrNoRows {
		// 初始化
		_, err := db.Exec("INSERT INTO migration_progress (table_name, last_id) VALUES (?, 0)", tableName)
		if err != nil {
			return 0, fmt.Errorf("init progress row failed: %w", err)
		}
		return 0, nil
	} else if err != nil {
		return 0, fmt.Errorf("query progress failed: %w", err)
	}
	return lastID, nil
}

func updateLastMigratedID(db *sql.DB, tableName string, lastID int64) error {
	_, err := db.Exec("UPDATE migration_progress SET last_id = ? WHERE table_name = ?", lastID, tableName)
	return err
}

脚本优势
能力 描述
并发迁移 提升速度,充分利用多核和数据库连接池
支持断点续传 每张旧表独立记录进度,不怕重启或失败
幂等可重复运行 使用 ON DUPLICATE KEY 保证多次运行不重复插入
细粒度进度监控 可实时查看每张表迁移到哪个 ID

示例查看进度

SELECT * FROM migration_progress ORDER BY updated_at DESC;


网站公告

今日签到

点亮在社区的每一天
去签到