引言
在分布式系统中,数据分片是解决数据规模增长的重要手段。ShardingSphere作为分布式数据库中间件,提供了强大的分片查询能力。本文将深入ShardingSphere源码,通过一个完整的查询示例,详细解析其如何从不同库中查询数据,特别关注广播表这一关键特性。
一、ShardingSphere查询处理流程概览
ShardingSphere的查询处理遵循经典的"解析→路由→执行→归并"四阶段模型:
- SQL解析:将SQL语句解析为抽象语法树(AST)
- SQL路由:根据分片规则确定SQL要访问的数据源和表
- SQL执行:并行执行路由后的SQL
- 结果归并:合并多数据源返回的结果集
下面通过一个完整的查询示例,深入源码分析这个过程。
二、完整查询示例:跨分片查询订单数据
假设我们有一个订单系统,按用户ID分片,配置如下:
# 分片规则配置
rules:
- !SHARDING
tables:
t_order:
actualDataNodes: ds${0..1}.t_order${0..1}
databaseStrategy:
standard:
shardingColumn: user_id
shardingAlgorithmName: database-inline
tableStrategy:
standard:
shardingColumn: order_id
shardingAlgorithmName: table-inline
t_dict:
broadcast: true # 配置为广播表
shardingAlgorithms:
database-inline:
type: INLINE
props:
algorithm-expression: ds${user_id % 2}
table-inline:
type: INLINE
props:
algorithm-expression: t_order${order_id % 2}
现在我们执行一个查询:
SELECT o.*, d.dict_name
FROM t_order o
JOIN t_dict d ON o.status = d.dict_code
WHERE user_id = 1001 AND order_id > 2000
LIMIT 10;
这个查询涉及分片表t_order
和广播表t_dict
的关联,让我们深入源码,看看ShardingSphere如何处理。
三、源码深度解析
1. SQL解析阶段
SQL解析是查询处理的第一步,ShardingSphere使用Antlr4构建的解析引擎将SQL转换为抽象语法树。
核心类与流程:
// ShardingSphere-JDBC 5.3.2版本核心解析入口
public final class SQLParserEngine {
private final DatabaseType databaseType;
private final Map<String, SQLParser> parsers;
public SQLStatement parse(final String sql, final boolean useCache) {
// 根据数据库类型选择解析器
SQLParser sqlParser = SQLParserFactory.newInstance(databaseType, sql);
// 解析SQL语句,生成AST
SQLVisitorRule sqlVisitorRule = SQLVisitorRuleEngine.getSQLVisitorRule(databaseType, sqlParser.getSQLStatementType());
SQLVisitor<Object> visitor = SQLVisitorFactory.newInstance(databaseType, sqlVisitorRule, sqlParser.getParseTree(), parsers);
// 返回解析后的SQL语句对象
return (SQLStatement) sqlParser.getParseTree().accept(visitor);
}
}
对于我们的查询,解析器会生成一个包含以下信息的SelectStatement对象:
- 查询表名:t_order, t_dict
- 查询条件:user_id = 1001 AND order_id > 2000
- 连接条件:o.status = d.dict_code
- 分页信息:LIMIT 10
2. SQL路由阶段
路由阶段是ShardingSphere的核心,它决定SQL将访问哪些数据源和表。特别关注广播表的处理逻辑。
核心类与流程:
// 标准分片路由引擎
public final class StandardRoutingEngine implements RoutingEngine {
private final ShardingRule shardingRule;
private final TableRule tableRule;
private final SQLStatementContext sqlStatementContext;
@Override
public RouteResult route(final List<Object> parameters) {
// 1. 提取分片条件
Collection<ShardingCondition> shardingConditions = createShardingConditions(parameters);
// 2. 计算数据源分片
Collection<String> routedDataSources = routeDataSources(shardingConditions);
// 3. 计算表分片
Collection<RouteUnit> routeUnits = new LinkedList<>();
for (String each : routedDataSources) {
routeUnits.addAll(routeTables(each, shardingConditions));
}
return new RouteResult(routeUnits);
}
// 广播表路由特殊处理
private Collection<RouteUnit> routeBroadcastTable(final String logicTable, final String dataSource) {
Collection<RouteUnit> result = new LinkedList<>();
// 广播表在每个数据源都有完整副本
// 直接使用逻辑表名作为物理表名
result.add(new RouteUnit(new DataSourceName(dataSource), new TableUnit(logicTable, logicTable)));
return result;
}
// 处理多表关联查询
private Collection<RouteUnit> routeTables(final String routedDataSource, final Collection<ShardingCondition> shardingConditions) {
Collection<RouteUnit> result = new LinkedList<>();
// 遍历SQL中涉及的所有表
for (String logicTable : sqlStatementContext.getTablesContext().getTableNames()) {
if (isBroadcastTable(logicTable)) {
// 广播表处理:直接路由到当前数据源
result.addAll(routeBroadcastTable(logicTable, routedDataSource));
} else {
// 分片表处理
TableRule tableRule = shardingRule.getTableRule(logicTable);
ShardingAlgorithm tableShardingAlgorithm = tableRule.getTableShardingStrategy().getShardingAlgorithm();
Collection<String> availableTargetNames = tableRule.getActualTables(routedDataSource);
// 执行分片算法
Collection<String> routedTables = tableShardingAlgorithm.doSharding(availableTargetNames, shardingConditions);
// 创建路由单元
for (String each : routedTables) {
result.add(new RouteUnit(new DataSourceName(routedDataSource), new TableUnit(logicTable, each)));
}
}
}
return result;
}
// 判断是否为广播表
private boolean isBroadcastTable(final String tableName) {
return shardingRule.getBroadcastTables().contains(tableName);
}
}
对于我们的查询,路由过程如下:
分片表t_order的路由:
- 提取分片键值:user_id=1001, order_id>2000
- 计算数据源:1001 % 2 = 1,路由到ds1
- 由于order_id是范围条件,无法精确计算,需要扫描ds1中的所有表:t_order0和t_order1
广播表t_dict的路由:
- 识别t_dict为广播表
- 直接路由到当前数据源ds1,物理表名为t_dict
最终路由单元:
- ds1.t_order0, ds1.t_dict
- ds1.t_order1, ds1.t_dict
3. SQL执行阶段
执行阶段负责将路由后的SQL发送到对应的数据源执行。
核心类与流程:
// SQL执行引擎
public final class StatementExecutor {
private final ConnectionMode connectionMode;
private final ShardingRuntimeContext runtimeContext;
private final List<Object> parameters;
public <T> List<T> executeQuery(final ExecuteCallback<T> executeCallback) throws SQLException {
// 1. 创建执行单元
Collection<ExecutionUnit> executionUnits = createExecutionUnits();
// 2. 执行查询
return executeGroup(executionUnits, executeCallback);
}
private Collection<ExecutionUnit> createExecutionUnits() {
Collection<ExecutionUnit> result = new LinkedList<>();
// 遍历路由单元
for (RouteUnit each : routeContext.getRouteUnits()) {
// 创建物理SQL
String sql = sqlRewriteContext.getSql();
Map<String, Object> attributes = sqlRewriteContext.getAttributes();
// 创建执行单元
result.add(new ExecutionUnit(each.getDataSourceMapper().getActualName(),
new SQLUnit(sql, parameters, attributes)));
}
return result;
}
private <T> List<T> executeGroup(final Collection<ExecutionUnit> executionUnits, final ExecuteCallback<T> executeCallback) throws SQLException {
// 分组执行(按数据源分组)
Map<String, List<SQLUnit>> sqlUnitGroups = groupSQLUnitsByDataSource(executionUnits);
List<T> result = new ArrayList<>(sqlUnitGroups.size());
// 并行执行各组SQL
for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
result.addAll(executeGroupInternal(entry.getKey(), entry.getValue(), executeCallback));
}
return result;
}
private <T> List<T> executeGroupInternal(final String dataSourceName, final List<SQLUnit> sqlUnits,
final ExecuteCallback<T> executeCallback) throws SQLException {
// 获取数据源
DataSource dataSource = runtimeContext.getDataSourceMap().get(dataSourceName);
// 创建JDBC连接
try (Connection connection = dataSource.getConnection()) {
// 执行SQL并获取结果
List<T> result = new ArrayList<>(sqlUnits.size());
for (SQLUnit each : sqlUnits) {
try (PreparedStatement preparedStatement = connection.prepareStatement(each.getSql())) {
// 设置参数
setParameters(preparedStatement, each.getParameters());
// 执行查询
result.add(executeCallback.execute(preparedStatement));
}
}
return result;
}
}
}
对于我们的查询,执行过程如下:
- 生成两个物理SQL:
- SELECT o.*, d.dict_name FROM ds1.t_order0 o JOIN ds1.t_dict d ON o.status = d.dict_code WHERE user_id = 1001 AND order_id > 2000 LIMIT 10;
- SELECT o.*, d.dict_name FROM ds1.t_order1 o JOIN ds1.t_dict d ON o.status = d.dict_code WHERE user_id = 1001 AND order_id > 2000 LIMIT 10;
- 从连接池获取ds1的连接
- 并行执行这两个SQL
- 获取两个结果集
4. 结果归并阶段
归并阶段负责将多个数据源返回的结果合并为一个统一的结果集。
核心类与流程:
// 归并引擎
public final class MergeEngine {
private final DatabaseType databaseType;
private final SelectStatementContext selectStatementContext;
private final List<QueryResult> queryResults;
public QueryResult merge() throws SQLException {
// 1. 创建归并器链
List<ResultMerger> resultMergers = createResultMergers();
// 2. 执行归并
QueryResult result = queryResults.isEmpty() ? EmptyQueryResult.INSTANCE : queryResults.get(0);
for (ResultMerger each : resultMergers) {
result = each.merge(result, queryResults, selectStatementContext);
}
return result;
}
private List<ResultMerger> createResultMergers() {
List<ResultMerger> result = new LinkedList<>();
// 处理聚合函数
if (!selectStatementContext.getProjectionsContext().getAggregationProjections().isEmpty()) {
result.add(new AggregationDistinctMerger());
result.add(new AggregationMerger());
}
// 处理分组
if (!selectStatementContext.getGroupByContext().getItems().isEmpty()) {
result.add(new GroupByMerger());
}
// 处理排序
if (!selectStatementContext.getOrderByContext().getItems().isEmpty()) {
result.add(new OrderByMerger());
}
// 处理分页
if (null != selectStatementContext.getLimit()) {
result.add(new LimitMerger());
}
return result;
}
}
对于我们的查询,归并过程如下:
- 由于查询没有聚合函数和分组,直接进入排序阶段
- 假设查询没有指定ORDER BY,ShardingSphere会按主键排序
- 使用优先级队列对两个结果集进行归并排序
- 应用LIMIT 10,返回前10条记录
四、广播表的核心机制详解
1. 广播表的定义与用途
广播表是ShardingSphere中的一种特殊表,它在每个数据源中都有完整的副本。广播表适用于以下场景:
- 数据量较小的字典表(如地区表、状态码表)
- 经常与分片表进行关联查询的表
- 数据变更不频繁的基础数据表
2. 广播表的源码实现
2.1 配置解析
// 分片规则配置解析
public final class ShardingRuleConfigurationYamlSwapper implements TypeBasedYamlSwapper<YamlShardingRuleConfiguration, ShardingRuleConfiguration> {
@Override
public ShardingRuleConfiguration swapToObject(final YamlShardingRuleConfiguration yamlConfig) {
ShardingRuleConfiguration result = new ShardingRuleConfiguration();
// 解析广播表配置
if (null != yamlConfig.getBroadcastTables()) {
result.getBroadcastTables().addAll(yamlConfig.getBroadcastTables());
}
// 其他配置解析...
return result;
}
}
2.2 广播表路由优化
// 广播表路由优化
public final class BroadcastRoutingEngine implements RoutingEngine {
private final ShardingRule shardingRule;
private final Collection<String> logicTables;
@Override
public RouteResult route(final List<Object> parameters) {
RouteResult result = new RouteResult();
// 获取所有数据源
Collection<String> dataSources = shardingRule.getDataSourceMap().keySet();
// 为每个广播表在每个数据源创建路由单元
for (String logicTable : logicTables) {
for (String dataSource : dataSources) {
result.getRouteUnits().add(new RouteUnit(
new DataSourceName(dataSource),
new TableUnit(logicTable, logicTable)));
}
}
return result;
}
}
2.3 广播表数据一致性保证
ShardingSphere通过以下机制保证广播表的数据一致性:
// 广播表数据一致性保证
public final class BroadcastTableConsistencyChecker implements ConsistencyChecker {
@Override
public boolean check(final String dataSourceName, final String logicTableName) {
// 获取所有数据源
Map<String, DataSource> dataSourceMap = shardingRuntimeContext.getDataSourceMap();
// 获取广播表的所有物理表
Collection<String> actualTables = shardingRuntimeContext.getRuleMetaData().getRules().stream()
.filter(each -> each instanceof ShardingRule)
.map(each -> (ShardingRule) each)
.flatMap(each -> each.getBroadcastTables().stream())
.filter(each -> each.equalsIgnoreCase(logicTableName))
.findFirst()
.map(each -> dataSourceMap.keySet().stream()
.map(dataSource -> each)
.collect(Collectors.toList()))
.orElse(Collections.emptyList());
// 检查每个数据源中的表结构和数据一致性
// 实际实现中会比较表结构、数据行数等
return isSchemaConsistent(dataSourceMap, actualTables) && isDataConsistent(dataSourceMap, actualTables);
}
private boolean isSchemaConsistent(final Map<String, DataSource> dataSourceMap, final Collection<String> actualTables) {
// 检查表结构一致性
// 实际实现中会比较表字段、索引等
return true;
}
private boolean isDataConsistent(final Map<String, DataSource> dataSourceMap, final Collection<String> actualTables) {
// 检查数据一致性
// 实际实现中会比较数据行数、数据摘要等
return true;
}
}
3. 广播表的优势与注意事项
优势:
- 显著减少跨库连接查询的开销
- 简化SQL编写,无需考虑分片键关联
- 提高查询性能,尤其是多表关联场景
注意事项:
- 广播表数据变更需要同步到所有数据源
- 不适合大数据量表(会导致存储成本增加)
- 需要定期检查各数据源中广播表的一致性
五、跨分片查询的优化机制
1. 广播表优化
// 广播表与分片表关联查询优化
public final class ShardingMergeEngine {
public QueryResult merge(final SelectStatementContext selectStatementContext, final List<QueryResult> queryResults) {
// 如果是广播表与分片表的关联查询
if (isBroadcastJoin(selectStatementContext)) {
// 优化处理:直接在各分片内完成关联
return new BroadcastJoinMergedResult(queryResults, selectStatementContext);
}
// 其他合并逻辑...
}
}
2. 分页优化
// 内存分页优化
public int getMemoryPageOffset() {
if (isSameLogicSQLWithDifferentParameters()) {
// 当SQL相同但参数不同时,需要计算总的offset
return calculateTotalOffset();
}
return limit.getOffsetValue();
}
3. 并行执行优化
// 并行执行框架
public final class ExecuteEngine {
private final ExecutorService executorService;
public <T> List<T> execute(final Collection<? extends Callable<T>> callables) throws SQLException {
try {
// 使用线程池并行执行
return executorService.invokeAll(callables).stream().map(this::getResult).collect(Collectors.toList());
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
throw new SQLException("Interrupted", ex);
}
}
}
六、总结与最佳实践
1. 核心机制总结
- 分片键驱动路由:ShardingSphere通过分片键精确计算目标数据源和表
- 广播表优化:将小表复制到所有数据源,消除跨库连接开销
- 并行执行框架:基于线程池实现多数据源并行查询
- 流式结果归并:通过装饰器模式实现结果集的流式处理和归并
- 查询重写:自动调整SQL以适应分片环境
2. 最佳实践建议
- 合理设计分片键:选择查询频率高、分布均匀的字段作为分片键
- 利用广播表:对于字典表等小表,配置为广播表减少跨库查询
- 避免全表扫描:尽量在SQL中包含分片键条件
- 优化分页:避免大偏移量分页,考虑使用游标分页
- 监控与调优:利用ShardingSphere的监控功能,分析慢查询并优化
- 定期检查广播表一致性:确保各数据源中广播表数据一致
通过深入理解ShardingSphere的源码实现,特别是广播表这一关键特性,我们可以更好地利用其分片能力,避免常见的性能陷阱,构建高效、稳定的分布式数据库系统。