设计一个MySQL数据库和Clickhouse数据仓库的表数据比较的详细程序流程,两张表是相同的结构,都有整型主键id字段,需要每次从数据库分批取得2000条数据,用于比较,比较操作的同时可以再取2000条数据,等上一次比较完成之后,开始比较,直到比较完所有的数据。比较操作需要比较两条两个数据库中,一个数据库有,另一个数据库没有的记录,以id是否存在作为标准,还需要比较两边都有记录的所有数据字段是否都相同,如果有不相同指出这条数据的id,以及不相同数据所在字段的名称和在两个数据库中的值,分批比较时需要考虑所有可能的情况。
以下是一个详细的设计方案,用于比较MySQL数据库和ClickHouse数据仓库中结构相同的表,支持并行数据获取和分批比较:
系统设计
并行处理架构:
- 主线程:负责数据比较
- 工作线程:负责预取下一批数据
- 使用队列实现线程间通信
关键组件:
- 数据获取器(DataFetcher)
- 数据比较器(DataComparator)
- 结果处理器(ResultHandler)
- 批处理控制器(BatchController)
详细程序流程
import queue
import threading
from typing import Dict, List, Tuple, Optional
# 假设已实现数据库连接池
from db_connectors import MySQLPool, ClickHousePool
class BatchData:
def __init__(self):
self.mysql_data: Dict[int, dict] = {
} # {id: {field1:val1,...}}
self.ch_data: Dict[int, dict] = {
}
self.last_id: int = 0 # 本批次最大ID
class DataComparator:
def __init__(self, result_handler):
self.result_handler = result_handler
def compare_batch(self, batch: BatchData):
# 1. 检查ID存在性差异
mysql_ids = set(batch.mysql_data.keys())
ch_ids = set(batch.ch_data.keys())
# 1.1 MySQL有但ClickHouse缺失
for mid in mysql_ids - ch_ids:
self.result_handler.record_missing(mid, source="clickhouse")
# 1.2 ClickHouse有但MySQL缺失
for cid in ch_ids - mysql_ids:
self.result_handler.record_missing(cid, source="mysql")
# 2. 比较共有ID的字段值
common_ids = mysql_ids & ch_ids
for cid in common_ids:
mysql_row = batch.mysql_data[cid]
ch_row = batch.ch_data[cid]
self._compare_rows(cid, mysql_row, ch_row)
def _compare_rows(self, id: int, mysql_row: dict, ch_row: dict):
# 跳过主键字段
fields = set(mysql_row.keys()) - {
"id"}
for field in fields:
mysql_val = mysql_row[field]
ch_val = ch_row[field]
# 处理类型差异(如MySQL的Decimal转Float)
if isinstance(mysql_val, Decimal):
mysql_val = float(mysql_val)
if isinstance(ch_val, Decimal):
ch_val = float(ch_val)
# 特殊处理浮点数精度
if isinstance(mysql_val, float) and isinstance(ch_val, float):
if abs(mysql_val - ch_val) > 1e-9:
self.result_handler.record_mismatch(
id, field, mysql_val, ch_val
)
elif mysql_val != ch_val:
self.result_handler.record_mismatch(
id, field, mysql_val, ch_val
)
class DataFetcher:
BATCH_SIZE = 2000
def __init__(self, mysql_pool, ch_pool):
self.mysql_pool = mysql_pool
self.ch_pool = ch_pool
def fetch_batch(self, last_id: int) -> Optional[BatchData]:
batch = BatchData()
# 从MySQL获取数据
with self.mysql_pool.connection() as conn:
cursor = conn.cursor(dictionary=True)
cursor.execute(
"SELECT * FROM target_table "
"WHERE id > %s ORDER BY id LIMIT %s",
(last_id, self.BATCH_SIZE)
)
for row in cursor:
batch.mysql_data[row['id']] = row
batch