分布式专题——10.3 ShardingSphere实现原理以及内核解析

发布于:2025-09-13 ⋅ 阅读:(18) ⋅ 点赞:(0)

1 ShardingSphere-JDBC 内核工作原理

  • 当往 ShardingSphere 提交一个逻辑SQL后,ShardingSphere 到底做了哪些事情呢?首先要从 ShardingSphere 官方提供的这张整体架构图说起:

    在这里插入图片描述

1.1 配置管控

  • 在 SQL 进入 ShardingSphere 内核处理(如解析、路由、重写等)之前,ShardingSphere 会先对应用的配置信息进行处理。这些配置可能涉及:

    • 数据库分片规则(哪些表分片、分片键是什么、分片算法如何);

    • 读写分离规则(读请求和写请求路由到哪些库);

    • 数据加密规则(哪些字段需要加密、加密算法是什么)等;

  • ShardingSphere 不仅能解析应用本地的配置,还支持将配置信息存储到 第三方注册中心(如 Nacos、ZooKeeper 等)。这样做的价值是:

    • 实现应用层的水平扩展:多个应用实例可共享注册中心的配置,无需每个实例单独维护配置,集群扩容时更高效;

    • 配置集中管理:运维人员能在注册中心统一修改、下发配置,无需逐个修改应用配置,降低维护成本;

  • ShardingSphere-JDBC vs ShardingProxy

    • ShardingSphere-JDBC:作为客户端侧的数据库中间件(以 Jar 包形式集成到应用中),应用本身可以自己管理配置(比如在应用配置文件里写规则),或者自行接入 Nacos 等配置中心。因此,配置管控对 ShardingSphere-JDBC 来说,不是特别亮眼的功能(因为应用有其他替代方案);

    • ShardingProxy:作为服务端侧的数据库中间件(对外提供数据库服务,应用像连普通数据库一样连 Proxy),运维人员通过 Proxy 管理多应用的数据库访问规则。此时,配置管控(尤其是对接注册中心实现集中配置)的价值就非常突出——能更高效地管理多应用、多节点的配置。

1.2 SQL Parser:SQL解析引擎

  • SQL 解析分为两步:

    • 词法解析:把 SQL 拆成不可再分的原子符号(Token),并根据不同数据库方言锁提供的字典,将其归类为关键字(如 SELECT FROM WHERE)、表达式、字面量(如 'ACTIVE' 18)、操作符(如 = > AND)等;

    • 语法解析:基于词法解析的结果,将 SQL 转换为抽象语法树(AST,Abstract Syntax Tree)——用“树结构”表达 SQL 的逻辑结构,例:

      SELECT id, name FROM t_user WHERE status = 'ACTIVE' AND age > 18
      

      在这里插入图片描述

  • ShardingSphere 对 SQL 解析引擎的选择,经历了多个阶段:

    • 1.4.x 及之前:用 Druid(性能较快的开源解析引擎);

    • 1.5.x 版本:自研解析引擎。针对分库分表场景,采用对 SQL 半理解的方式,提升解析性能和兼容性(更适配中间件的业务需求);

    • 3.0.x 及之后:改用 ANTLR(开源的 SQL 解析引擎)。ANTLR 被很多开源产品采用(如 Druid、Flink、Hive 等),通用性和扩展性更强。

1.3 SQL Router:SQL 路由引擎

  • 路由引擎的关键是分片键(决定数据分片的字段):

    • 携带分片键的SQL:走分片路由——根据分片键的操作符(如=单片路由、IN多片路由、BETWEEN范围路由),匹配数据库/表的分片策略,生成精准的路由路径(明确该SQL该访问哪些分片);

    • 不携带分片键的SQL:走广播路由——因为没分片键,无法精准定位分片,需向所有相关数据库/表广播执行(但广播路由影响大,不利于集群管理,所以实际应尽量用携带分片键的SQL);

  • 分片路由又因SQL场景不同,分为多种子路由:

    • 直接路由:通过hint(手动指定路由规则),强制SQL路由到特定分片;

    • 标准路由:单表或绑定表(关联时可视为单表的表)的SQL,按分片规则精准路由;

    • 笛卡尔路由:多表且无绑定表关系的关联查询,需对多表分片做笛卡尔积组合路由(性能较差,应尽量避免);

  • 不携带分片键时,不同类型的SQL(如DQL查询、DML增删改、DDL建表、DCL权限操作等),广播路由也细分了多种子路由:

    • 全库表路由:DQL/DML/DDL类语句(如select * from course),遍历所有库的所有表执行;

    • 全库路由:设置类DAL/TCL语句(如set autocommit=0),遍历所有库执行;

    • 全实例路由:DCL语句(如CREATE USER),每个数据库实例执行一次;

    • 单播路由:查询类DAL语句(如DESCRIBE course),仅从任意一个库表获取元数据;

    • 阻断路由:像USE database这类对虚拟库的操作,直接阻断(因为中间件的库是虚拟的,无需切换真实库)。

1.4 SQL Rewriter: SQL 优化引擎

  • ShardingSphere 能实现不同数据库方言之间的自动转换

    • 例如:用 MySQL 客户端发送 MySQL 方言的 SQL 给 ShardingSphere,ShardingSphere 会识别目标存储节点类型(比如要访问 PostgreSQL、MariaDB),自动把 SQL 转成对应数据库的方言,再下发执行;
    • 这让用户可以面向逻辑库/逻辑表写 SQL,无需关心底层不同数据库的语法差异,ShardingSphere 会负责“翻译”;

    在这里插入图片描述

  • SQL 改写分为正确性改写优化改写,目的是让 SQL 能在真实数据库中正确执行或更高效执行;

  • 正确性改写:让 SQL 能正确执行。解决逻辑库表到真实库表的匹配问题,确保 SQL 语义准确。包含以下能力:

    • 标识符改写:修改表名、索引名、Schema(数据库名)等标识符。比如分表场景下,把逻辑表名(如 t_order)改写成真实的分片表名(如 t_order_0 t_order_1);

    • 补列:为 SQL 补充必要的列,保证执行逻辑正确。比如:

      • 排序补列:若排序依赖分片键,补充分片键列确保排序逻辑对;
      • 分组补列:分组查询时补充分片键列,保证分组正确;
      • 聚合补列:聚合(如 COUNT SUM)时补充相关列;
      • 自增主键补列:处理分布式场景下自增主键的生成与填充;
    • 分页修正:分库分表后,分页逻辑可能跨多个分片,需修正分页参数,保证结果正确;

    • 批量拆分:把批量操作(如批量插入、IN 条件包含大量值)拆分成小批量,避免单条 SQL 过大或触发数据库限制;

  • 优化改写:让 SQL 执行更高效。在不影响 SQL 正确性的前提下,提升执行性能。包含:

    • 单节点优化:针对单个数据库节点的 SQL 执行逻辑优化;

    • 流式归并优化:对多分片返回的结果,用流式归并的方式聚合,减少内存占用、提升响应速度(比如多分片的查询结果,边查边合并,而非等所有分片都查完再合并);

在这里插入图片描述

1.5 SQL Executor: SQL执行引擎

  • ShardingSphere 采用一套自动化的执行引擎,负责将路由和改写完成之后的真实 SQL 安全且高效发送到底层数据源执行;

    • ShardingSphere 的执行引擎,不是简单用 JDBC 直连数据库发 SQL,也不是直接把请求丢进线程池并发执行。它更关注平衡资源消耗

      • 控制数据库连接创建的开销;

      • 控制内存占用的消耗;

      • 最大化利用并发能力

    • 最终实现自动化平衡资源控制与执行效率

  • 执行流程

    在这里插入图片描述

    • 准备阶段

      • 结果集分组:把要执行的 SQL 按目标数据源分组(确定哪些 SQL 要发给哪个数据库);
      • 获取连接 & 创建执行单元:为每组 SQL 获取数据库连接,并封装成执行单元(包含 SQL、连接等信息);
      • 锁数据源(可选):若满足“结果集数量≠1 且 内存限制模式”,会锁定数据源(避免并发冲突);
    • 执行阶段

      • 分组执行:按分组,执行每个执行单元里的 SQL;
      • 事件发送:执行过程中,触发分布式事务订阅(保证分布式场景下事务一致性)和性能跟踪订阅(监控执行性能);
      • 查询结果集:以流式内存方式获取结果(流式适合大数据量,减少内存爆仓;内存适合小数据量,提升读取速度);
  • 执行模式由每个数据库连接需执行的 SQL 数量决定,而这个数量的计算公式是:

    每个数据库连接需执行的SQL数量=所有需在该数据库上执行的SQL数量maxConnectionSizePerQuery \text{每个数据库连接需执行的SQL数量} = \frac{\text{所有需在该数据库上执行的SQL数量}}{\text{maxConnectionSizePerQuery}} 每个数据库连接需执行的SQL数量=maxConnectionSizePerQuery所有需在该数据库上执行的SQL数量

    • 所有需在该数据库上执行的SQL数量是路由至该数据源的路由结果;
    • maxConnectionSizePerQuery是用户配置项;
  • 基于这个数量,分为两种模式:

    • 内存限制模式

      • 条件:每个数据库连接需执行的 SQL 数量 ≤ 1(即 = 0 或 1);

      • 逻辑:一个 JDBC 连接只执行 1 条 SQL;ShardingSphere 不限制一次操作消耗的数据库连接总数(比如要执行 10 条 SQL,可能开 10 个连接,每个连执行 1 条);

      • 适合场景:SQL 执行耗时短、并发不高,优先减少单连接压力,用多连接提升并行度;

    • 连接限制模式

      • 条件:每个数据库连接需执行的 SQL 数量 > 1

      • 逻辑:一个 JDBC 连接要执行多条 SQL;ShardingSphere 严格控制一次操作消耗的数据库连接总数(比如要执行 10 条 SQL,可能只开 2 个连接,每个连接执行 5 条);

      • 适合场景:数据库连接资源宝贵(比如连接池大小有限),优先节省连接数,用单连接执行多 SQL 减少连接开销。

1.6 Result Merger:结果归并

  • 当 SQL 涉及多分片(多个数据节点)时,每个分片会返回部分结果。结果归并就是把这些分散的结果集组合成一个完整的结果集,再返回给客户端;

  • 归并引擎会根据 SQL 的分页、分组、排序、聚合等需求,选择不同的归并策略:

    在这里插入图片描述

  • 带分页的场景 → 分页归并:处理需要分页的查询(如 LIMIT 语句),确保最终结果的分页逻辑正确(比如从多分片结果中,筛选出符合页码的记录);

  • 分组/排序/无分组排序的场景

    • 分组归并:若 SQL 有分组GROUP BY)需求,对多分片结果按分组规则合并(又分流式分组归并内存分组归并,取决于排序分组列是否相同);

    • 排序归并:若 SQL 有排序ORDER BY)需求,对多分片结果按排序规则合并;

    • 迭代归并:若 SQL 既无分组也无排序,直接迭代合并多分片结果;

  • 带聚合的场景 → 聚合归并:若 SQL 有聚合函数(如 COUNT SUM AVG MAX MIN),对多分片的聚合结果再做一次聚合:

    • COUNT/SUM → 累加归并(把各分片的计数/求和结果相加);

    • AVG → 平均值归并(基于各分片的计数,计算整体平均值);

    • MAX/MIN → 比较归并(从各分片的最大/最小值中,再选最大/最小);

  • 归并模式的选择,决定了结果如何存储、如何返回,适配不同业务场景:

    • 流式归并

      • 每次从结果集中取一条数据,逐条返回(和数据库原生返回结果集的方式一致);

      • 无需把所有结果都加载到内存,内存消耗小,适合数据量大、需快速返回首条结果的场景(如 OLTP 在线交易,强调低延迟、高并发);

      • 典型场景:遍历、排序、流式分组归并等。通常内存限制模式会用流式归并;

    • 内存归并

      • 把所有分片的结果全部加载到内存,再统一做分组、排序、聚合,最后封装成可逐次访问的结果集返回;

      • 需要更多内存,但能支持更复杂的全局分组、排序、聚合逻辑。适合分析型查询(OLAP)(如报表统计,需处理大量数据做全局计算);

      • 典型场景:通常连接限制模式会用内存归并。

2 ShardingSphere-JDBC 扩展机制

2.1 ShardingSphereDataSource

  • 如何调试 ShardingSphere-JDBC 的源码呢?这就需要一个比较简单明了的测试案例来作为调试代码的入口:

    public class ShardingJDBCDemo {
        public static void main(String[] args) throws SQLException {
    
            // 一、配置数据库连接池:创建两个物理数据库的数据源
            Map<String, DataSource> dataSourceMap = new HashMap<>(2);
            // 配置第一个数据源,对应数据库 shardingdb1
            HikariDataSource dataSource0 = new HikariDataSource();
            dataSource0.setDriverClassName("com.mysql.cj.jdbc.Driver");
            dataSource0.setJdbcUrl("jdbc:mysql://192.168.65.212:3306/shardingdb1?serverTimezone=GMT%2B8&useSSL=false");
            dataSource0.setUsername("root");
            dataSource0.setPassword("root");
            dataSourceMap.put("m0", dataSource0); // 数据源标识为 m0
            
            // 配置第二个数据源,对应数据库 shardingdb2
            HikariDataSource dataSource1 = new HikariDataSource();
            dataSource1.setDriverClassName("com.mysql.cj.jdbc.Driver");
            dataSource1.setJdbcUrl("jdbc:mysql://192.168.65.212:3306/shardingdb2?serverTimezone=GMT%2B8&useSSL=false");
            dataSource1.setUsername("root");
            dataSource1.setPassword("root");
            dataSourceMap.put("m1", dataSource1); // 数据源标识为 m1
            
            // 二、配置分库分表规则:定义数据如何分布到不同的库和表中
            ShardingRuleConfiguration shardingRuleConfig = createRuleConfig();
            
            // 三、配置ShardingSphere属性:开启SQL执行日志显示
            Properties properties = new Properties();
            properties.setProperty("sql-show", "true"); // 显示分片后的真实SQL语句
            
            // TEST:创建ShardingSphere数据源,整合所有配置,创建具有分片功能的数据源
            DataSource dataSource = ShardingSphereDataSourceFactory.createDataSource(dataSourceMap,Collections.singleton(shardingRuleConfig), properties);
    
            // 测试部分
            ShardingJDBCDemo test = new ShardingJDBCDemo();
            // 建表操作(需要时取消注释执行)
            //test.droptable(dataSource);
    		//test.createtable(dataSource);
    
            // 插入测试数据(需要时取消注释执行)
    		//test.addcourse(dataSource);
            
            // TEST(调试的起点):查询数据,验证分片查询功能
            test.querycourse(dataSource);
        }
    
        /**
         * 创建分片规则配置
         * 配置逻辑表course如何映射到物理表(分布在m0和m1两个库,每个库有course_1和course_2两个表)
         */
        private static ShardingRuleConfiguration createRuleConfig(){
            ShardingRuleConfiguration result = new ShardingRuleConfiguration();
            
            // 配置逻辑表course对应的实际数据节点:m0和m1两个库,每个库有course_1和course_2表
            ShardingTableRuleConfiguration courseTableRuleConfig = new ShardingTableRuleConfiguration("course",
                    "m$->{0..1}.course_$->{1..2}");
            
            // 配置分布式ID生成算法(雪花算法)
            Properties snowflakeprop = new Properties();
            snowflakeprop.setProperty("worker.id", "123"); // 设置工作节点ID
            result.getKeyGenerators().put("alg_snowflake", new AlgorithmConfiguration("SNOWFLAKE", snowflakeprop));
            
            // 配置课程表的主键生成策略:使用雪花算法为cid字段生成ID
            courseTableRuleConfig.setKeyGenerateStrategy(new KeyGenerateStrategyConfiguration("cid","alg_snowflake"));
            
            // 配置分库策略:按照cid字段进行分库,使用MOD算法(取模)
            courseTableRuleConfig.setDatabaseShardingStrategy(new StandardShardingStrategyConfiguration("cid","course_db_alg"));
            Properties modProp = new Properties();
            modProp.put("sharding-count",2); // 设置分片数量为2(两个库)
            result.getShardingAlgorithms().put("course_db_alg",new AlgorithmConfiguration("MOD",modProp));
            
            // 配置分表策略:按照cid字段进行分表,使用INLINE表达式算法
            courseTableRuleConfig.setTableShardingStrategy(new StandardShardingStrategyConfiguration("cid","course_tbl_alg"));
            // 分表算法表达式:根据cid计算表名后缀(结果为1或2)
            Properties inlineProp = new Properties();
            inlineProp.setProperty("algorithm-expression", "course_$->{((cid+1)%4).intdiv(2)+1}");
            result.getShardingAlgorithms().put("course_tbl_alg",new AlgorithmConfiguration("INLINE",inlineProp));
    
            result.getTables().add(courseTableRuleConfig);
            return result;
        }
    
        // 添加测试课程数据:插入9条记录,观察ID生成和分片效果
        public void addcourse(DataSource dataSource) throws SQLException {
            for (int i = 1; i < 10; i++) {
                long orderId = executeAndGetGeneratedKey(dataSource, "INSERT INTO course (cname, user_id, cstatus) VALUES ('java'," + i + ", '1')");
                System.out.println("添加课程成功,课程ID:" + orderId);
            }
        }
    
        // 查询课程数据:根据特定cid查询,测试分片查询功能
        public void querycourse(DataSource dataSource) throws SQLException {
            Connection conn = null;
            try {
                // 获取ShardingSphere连接(特殊化的Connection实现)
                conn = dataSource.getConnection();
                // 创建ShardingSphere语句对象
                Statement statement = conn.createStatement();
                String sql = "SELECT cid,cname,user_id,cstatus from course where cid=851198093910081536";
                // 执行查询,获取分片结果集
                ResultSet result = statement.executeQuery(sql);
                while (result.next()) {
                    System.out.println("result:" + result.getLong("cid"));
                }
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                if (null != conn) {
                    conn.close();
                }
            }
        }
    
        // 通用SQL执行方法
        private void execute(final DataSource dataSource, final String sql) throws SQLException {
            try (
                    Connection conn = dataSource.getConnection();
                    Statement statement = conn.createStatement()) {
                statement.execute(sql);
            }
        }
    
        // 执行SQL并返回生成的主键(用于获取雪花算法生成的ID)
        private long executeAndGetGeneratedKey(final DataSource dataSource, final String sql) throws SQLException {
            long result = -1;
            try (
                    Connection conn = dataSource.getConnection();
                    Statement statement = conn.createStatement()) {
                statement.executeUpdate(sql, Statement.RETURN_GENERATED_KEYS);
                ResultSet resultSet = statement.getGeneratedKeys();
                if (resultSet.next()) {
                    result = resultSet.getLong(1); // 获取生成的主键值
                }
            }
            return result;
        }
    
        /**
         * 表初始化操作
         */
        public void droptable(DataSource dataSource) throws SQLException {
            execute(dataSource, "DROP TABLE IF EXISTS course_1");
            execute(dataSource, "DROP TABLE IF EXISTS course_2");
        }
    
        public void createtable(DataSource dataSource) throws SQLException {
            execute(dataSource, "CREATE TABLE course_1 (cid BIGINT(20) PRIMARY KEY,cname VARCHAR(50) NOT NULL,user_id BIGINT(20) NOT NULL,cstatus varchar(10) NOT NULL);");
            execute(dataSource, "CREATE TABLE course_2 (cid BIGINT(20) PRIMARY KEY,cname VARCHAR(50) NOT NULL,user_id BIGINT(20) NOT NULL,cstatus varchar(10) NOT NULL);");
        }
    }
    
  • 代码中最关键的是 ShardingSphereDataSource(标记为 TEST 处),它是整个分库分表功能的中枢

    • 它实现了 JDBC 的 DataSource 接口,因此可以像普通数据源(如 Druid、Hikari)一样,与 Spring Data、MyBatis 等框架无缝集成(符合 JDBC 规范,无需修改上层代码);
    • 当通过它获取连接(getConnection())、执行 SQL 时,ShardingSphere 会在底层自动完成:
      1. SQL 解析(理解 SQL 要操作什么);
      2. 路由(根据分库分表规则,确定该访问哪些真实库表);
      3. SQL 改写(将逻辑表名改为真实表名等);
      4. 执行(在目标库表上执行 SQL);
      5. 结果归并(将多库表的结果合并为一个)。

2.2 基于 ShardingSphereDataSource 的工作方式

  • 实际上,ShardingSphereDataSource 除了拥有分库分表的功能外,还实现了很多自己的扩展功能。其中最常用的,是他能自己解析配置文件。因此, ShardingSphere-JDBC 其实完全可以脱离 SpringBoot 等框架,以通过标准 JDBC 方式独立运行。例:

    在上一章节10.2 ShardingSphere-JDBC分库分表实战与讲解的案例,实际上是通过基于 SpringBoot 的第三方拓展,来实现解析配置文件、创建数据源等功能;

    public class ShardingJDBCDriverTest {
        @Test
        public void test() throws ClassNotFoundException, SQLException {
            String jdbcDriver = "org.apache.shardingsphere.driver.ShardingSphereDriver";
            String jdbcUrl = "jdbc:shardingsphere:classpath:config.yaml";
            String sql = "select * from sharding_db.course";
    
            // 用 Class.forName 加载 ShardingSphereDriver
            Class.forName(jdbcDriver);
            // 通过 DriverManager.getConnection 连接,URL 指向配置文件 config.yaml
            try(Connection connection = DriverManager.getConnection(jdbcUrl);) {
                // 后续执行 SQL 的流程(createStatement、executeQuery 等)和标准 JDBC 完全一致
                Statement statement = connection.createStatement();
                ResultSet resultSet = statement.executeQuery(sql);
                while (resultSet.next()){
                    System.out.println("course cid= "+resultSet.getLong("cid"));
                }
            }
        }
    }
    
    • 这说明 ShardingSphere 本身是 JDBC 规范的实现,只要提供正确的驱动和配置,就能独立完成分库分表等工作;
  • config.yaml 是 ShardingSphere 的核心配置载体,用 YAML 格式定义分库分表、数据源、全局属性等规则,和之前用 Java 代码配置的逻辑是等价的,只是形式不同。配置文件主要包含以下模块:

    • rules:定义各类规则(如 SHARDING 分库分表规则、TRANSACTION 事务规则、SQL_PARSER SQL 解析规则等)。以 SHARDING 为例,复刻上一章节对course表进行分库分表的功能:
      • 真实表映射(actualDataNodes: m${0..1}.course_${1..2},对应 m0/m1 库的 course_1/course_2 表);
      • 分库策略(按 cid 取模,路由到 m0m1);
      • 分表策略(按 cid 计算,路由到 course_1course_2);
      • 主键生成(雪花算法生成 cid);
    • props:全局属性(如连接池大小、SQL 日志开关、执行器线程数等);
    • databaseName:逻辑数据库名;
    • dataSources:配置真实数据源(如 m0m1 对应的数据库连接信息);
    # 权限规则配置:定义用户权限
    rules:
      - !AUTHORITY  # 权限规则标识
        users:
          - root@%:root  # 用户root,可从任何主机访问,密码root
          - sharding@:sharding  # 用户sharding,无主机限制,密码sharding
        provider:
          type: ALL_PERMITTED  # 权限提供者类型:所有用户拥有所有权限
    
      - !TRANSACTION  # 事务规则配置
        defaultType: XA  # 默认事务类型:XA分布式事务
        providerType: Atomikos  # 事务管理器提供者:Atomikos
    
      - !SQL_PARSER  # SQL解析器配置
        sqlCommentParseEnabled: true  # 启用SQL注释解析
        sqlStatementCache:  # SQL语句缓存配置
          initialCapacity: 2000  # 初始容量2000条
          maximumSize: 65535  # 最大容量65535条
        parseTreeCache:  # 解析树缓存配置
          initialCapacity: 128  # 初始容量128个
          maximumSize: 1024  # 最大容量1024个
    
      - !SHARDING  # 分片规则配置(核心配置)
        tables:
          course:  # 逻辑表course的配置
            actualDataNodes: m${0..1}.course_${1..2}  # 实际数据节点:两个数据库(m0,m1),每个库两个表(course_1,course_2)
            databaseStrategy:  # 分库策略
              standard:
                shardingColumn: cid  # 分库字段:cid
                shardingAlgorithmName: course_db_alg  # 分库算法名称
            tableStrategy:  # 分表策略
              standard:
                shardingColumn: cid  # 分表字段:cid
                shardingAlgorithmName: course_tbl_alg  # 分表算法名称
            keyGenerateStrategy:  # 主键生成策略
              column: cid  # 主键列名
              keyGeneratorName: alg_snowflake  # 主键生成器名称
    
        # 分片算法定义
        shardingAlgorithms:
          course_db_alg:  # 分库算法
            type: MOD  # 取模算法
            props:
              sharding-count: 2  # 分片数量:2个库
          course_tbl_alg:  # 分表算法
            type: INLINE  # 内联表达式算法
            props:
              algorithm-expression: course_$->{cid%2+1}  # 表名计算表达式:cid对2取模后加1
    
        # 主键生成器定义
        keyGenerators:
          alg_snowflake:
            type: SNOWFLAKE  # 使用雪花算法生成分布式ID
    
    # 系统属性配置
    props:
      max-connections-size-per-query: 1  # 每个查询的最大连接数
      kernel-executor-size: 16  # 内核线程池大小,默认无限
      proxy-frontend-flush-threshold: 128  # 代理前端刷新阈值,默认128
      proxy-hint-enabled: false  # 是否启用hint强制路由
      sql-show: false  # 是否显示实际执行的SQL语句
      check-table-metadata-enabled: false  # 是否检查表元数据一致性
      proxy-backend-query-fetch-size: -1  # 代理后端查询获取大小,-1表示使用JDBC驱动最小值
      proxy-frontend-executor-size: 0  # 代理前端执行器大小,0由Netty决定
      proxy-backend-executor-suitable: OLAP  # 代理后端执行器适用类型:OLAP(联机分析处理)
      proxy-frontend-max-connections: 0  # 代理前端最大连接数,0表示无限制
      sql-federation-type: NONE  # SQL联邦查询类型:不启用
      proxy-backend-driver-type: JDBC  # 代理后端驱动类型:JDBC
      proxy-mysql-default-version: 8.0.20  # MySQL默认版本
      proxy-default-port: 3307  # 代理服务器默认端口
      proxy-netty-backlog: 1024  # Netty backlog参数
    
    # 逻辑数据库名称(客户端连接时使用的数据库名)
    databaseName: sharding_db
    
    # 数据源配置:定义物理数据库连接
    dataSources:
      m0:  # 第一个数据源标识
        dataSourceClassName: com.zaxxer.hikari.HikariDataSource  # 使用HikariCP连接池
        url: jdbc:mysql://192.168.65.212:3306/shardingdb1?serverTimezone=UTC&useSSL=false  # 数据库连接URL
        username: root  # 数据库用户名
        password: root  # 数据库密码
        connectionTimeoutMilliseconds: 30000  # 连接超时时间30秒
        idleTimeoutMilliseconds: 60000  # 空闲连接超时时间60秒
        maxLifetimeMilliseconds: 1800000  # 连接最大生命周期30分钟
        maxPoolSize: 50  # 最大连接池大小50
        minPoolSize: 1  # 最小连接池大小1
      
      m1:  # 第二个数据源标识
        dataSourceClassName: com.zaxxer.hikari.HikariDataSource
        url: jdbc:mysql://192.168.65.212:3306/shardingdb2?serverTimezone=UTC&useSSL=false
        username: root
        password: root
        connectionTimeoutMilliseconds: 30000
        idleTimeoutMilliseconds: 60000
        maxLifetimeMilliseconds: 1800000
        maxPoolSize: 50
        minPoolSize: 1
    
  • 那么为什么上一章不直接用这个 YAML 配置文件呢?因为 IDEA 没有提示;

  • 最后,这个配置文件,其实是和 ShardingSphere-Proxy 通用的;

3 ShardingSphere 的 SPI 扩展机制

3.1 从主键生成策略入手

  • SPI(Service Provider Interface)是 Java 提供的服务发现机制:允许第三方(或用户)为接口提供实现类,框架能自动加载这些实现类,从而扩展功能;

  • 一个完整的分库分表方案,要配置的信息还是挺多的。我们要理解配置的各种策略是如何从 ShardingSphere 中扩展出来的,就要先找一个比较简单的目标入手。这里,以主键生成策略为例,抽取 ShardingSphere 中重点源码进行解读:

    package org.apache.shardingsphere.sharding.factory;
    
    @NoArgsConstructor(access = AccessLevel.PRIVATE)
    public final class KeyGenerateAlgorithmFactory {
        // 加载所有主键生成策略
        static {
            ShardingSphereServiceLoader.register(KeyGenerateAlgorithm.class);
            // 内部通过 Java 原生的 ServiceLoader.load() 方法,扫描并加载所有实现了 KeyGenerateAlgorithm 接口的类
        }
        
        // 获取主键生成算法实例
        // newInstance 方法根据配置(如“类型 = SNOWFLAKE”),从加载的实现类中,创建对应的 KeyGenerateAlgorithm 实例
        public static KeyGenerateAlgorithm newInstance(final AlgorithmConfiguration keyGenerateAlgorithmConfig) {
            return ShardingSphereAlgorithmFactory.createAlgorithm(keyGenerateAlgorithmConfig, KeyGenerateAlgorithm.class);
        }
        
        // 判断算法是否存在:contains 方法检查“配置的算法类型(如 SNOWFLAKE)”是否有对应的实现类
        public static boolean contains(final String keyGenerateAlgorithmType) {
            return TypedSPIRegistry.findRegisteredService(KeyGenerateAlgorithm.class, keyGenerateAlgorithmType).isPresent();
        }
    }
    
  • 先来看主键生成策略是如何加载的:ShardingSphereServiceLoader.register(KeyGenerateAlgorithm.class);

    /**
     * ShardingSphere服务加载器 - 基于Java SPI机制的服务发现和注册类
     * 用于动态加载和缓存ShardingSphere的各种扩展实现
     */
    public final class ShardingSphereServiceLoader {
        
        // 使用线程安全的ConcurrentHashMap缓存所有已注册的服务
        // Key: 服务接口的Class对象,Value: 该接口的所有实现类实例集合
        private static final Map<Class<?>, Collection<Object>> SERVICES = new ConcurrentHashMap<>();
        
        /**
         * 注册指定服务接口的所有实现类
         * 使用Java SPI机制自动发现并加载META-INF/services目录下的实现类
         * @param serviceInterface 要注册的服务接口Class对象
         */
        public static void register(final Class<?> serviceInterface) {
            // 双重检查锁定模式:避免重复加载同一接口的实现类
            if (!SERVICES.containsKey(serviceInterface)) {
                // 如果该接口尚未注册,则加载并缓存其所有实现类
                SERVICES.put(serviceInterface, load(serviceInterface));
            }
        }
        
        /**
         * 使用Java SPI机制加载指定接口的所有实现类实例
         * @param <T> 服务接口泛型类型
         * @param serviceInterface 要加载的服务接口Class对象
         * @return 包含所有实现类实例的集合
         */
        private static <T> Collection<Object> load(final Class<T> serviceInterface) {
            Collection<Object> result = new LinkedList<>();
            
            // 使用ServiceLoader加载指定接口的所有实现类
            // ServiceLoader会自动扫描classpath中META-INF/services/目录下的配置文件
            for (T each : ServiceLoader.load(serviceInterface)) {
                result.add(each); // 将每个实现类实例添加到结果集合中
            }
            
            return result;
        }
    }
    
  • 如果想自己写一个主键生成算法(即KeyGenerateAlgorithm的实现类),只需要通过 SPI 的方式让 ShardingSphere 加载进去就行;

    • 先看看KeyGenerateAlgorithm有哪些实现类,比如NanoIdKeyGenerateAlgorithm,它的源码比较简单:

      /**
       * NanoId分布式主键生成算法实现类
       * 实现ShardingSphere的KeyGenerateAlgorithm接口,提供基于NanoId的主键生成功能
       */
      public final class NanoIdKeyGenerateAlgorithm implements KeyGenerateAlgorithm {
          
          // 配置属性对象,用于接收初始化参数
          private Properties props;
      
          /**
           * 默认无参构造函数
           */
          public NanoIdKeyGenerateAlgorithm() {
          }
      
          /**
           * 初始化方法,在算法实例创建后调用
           * @param props 配置属性,可以包含自定义参数(如:字母表、长度等)
           */
          public void init(Properties props) {
              this.props = props; // 保存配置属性供后续使用
          }
      
          /**
           * 生成分布式主键的核心方法
           * @return 返回生成的NanoId字符串作为主键
           */
          public String generateKey() {
              // 使用NanoId工具类生成ID:
              // 1. ThreadLocalRandom.current(): 获取当前线程的随机数生成器(线程安全)
              // 2. NanoIdUtils.DEFAULT_ALPHABET: 使用默认字母表(大小写字母+数字)
              // 3. 21: 生成ID的长度为21个字符
              return NanoIdUtils.randomNanoId(ThreadLocalRandom.current(), NanoIdUtils.DEFAULT_ALPHABET, 21);
          }
      
          /**
           * 获取算法类型标识
           * @return 返回算法类型名称"NANOID",用于配置文件中引用
           */
          public String getType() {
              return "NANOID";
          }
      
          /**
           * 获取配置属性(自动生成的方法)
           * @return 返回当前算法的配置属性
           */
          @Generated
          public Properties getProps() {
              return this.props;
          }
      }
      
    • 接下来仿照着自己实现一下:

      /**
       * 自定义分布式主键生成算法实现类
       * 实现ShardingSphere的KeyGenerateAlgorithm接口,提供基于时间戳+序列号的主键生成方案
       */
      public class MyKeyGeneratorAlgorithm implements KeyGenerateAlgorithm {
      
          // 原子长整型计数器,用于生成序列号,保证线程安全
          private AtomicLong atom = new AtomicLong(0);
      
          // 配置属性对象,用于接收初始化参数
          private Properties props;
      
          /**
           * 生成分布式主键的核心方法
           * @return 返回生成的Long类型主键,格式为:时间戳 + 序列号
           */
          @Override
          public Comparable<?> generateKey() {
              // 获取当前时间
              LocalDateTime ldt = LocalDateTime.now();
              // 格式化时间为时分秒毫秒(HHmmssSSS格式,共8位数字)
              String timestampS = DateTimeFormatter.ofPattern("HHmmssSSS").format(ldt);
              // 组合时间戳和原子递增的序列号,生成最终主键
              return Long.parseLong(""+timestampS+atom.incrementAndGet());
          }
      
          /**
           * 获取配置属性
           * @return 返回当前算法的配置属性
           */
          @Override
          public Properties getProps() {
              return this.props;
          }
        
          /**
           * 获取算法类型标识
           * @return 返回算法类型名称"MYKEY",用于配置文件中引用
           */
          public String getType() {
              return "MYKEY";
          }
          
          /**
           * 初始化方法,在算法实例创建后调用
           * @param props 配置属性,可以包含自定义参数
           */
          @Override
          public void init(Properties props) {
              this.props = props;
          }
      }
      
    • 配置 SPI 扩展文件。在项目的classpath/META-INF/services/(这个目录是 Java 的 SPI 机制加载的固定目录)目录下,创建文件:

      • 文件名:接口的全限定名 → org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm

      • 文件内容:自定义实现类的全限定名 → com.nosy.shardingDemo.algorithm.MyKeyGenerateAlgorithm

    • 在 ShardingSphere 的配置中,指定主键生成策略的 type 为自定义的标识:

      spring.shardingsphere.rules.sharding.key-generators.course_cid_alg.type=MYKEY
      
    • 这样,ShardingSphere 就会通过 SPI 机制,加载并使用 MyKeyGenerateAlgorithm 生成主键;

  • 通过 SPI 机制,用户可以不修改 ShardingSphere 源码,就能扩展其功能(比如自定义分片算法、主键生成算法、加密算法等),让框架更贴合业务需求。

3.2 尝试扩展分片算法

  • 在分库分表场景中,ShardingSphere 提供了内置分片算法(如 MODINLINE 等),但业务可能需要自定义分片逻辑(比如更复杂的路由规则)。此时,可通过 SPI 机制扩展分片算法;

  • 扩展分片算法的步骤

    • 在上一章节10.2 ShardingSphere-JDBC分库分表实战与讲解3.4 CLASS_BASED 自定义分片中实现了自定义分片,即MyComplexAlgorithm类。我们是通过 ShardingSphere 提供的 CLASS_BASED 类型的分片算法配置进去的。实际上,我们也可以使用 ShardingSphere 提供的 SPI 机制配置进去;

    • 在项目的 classpath/META-INF/services/ 目录下,创建文件:

      • 文件名:接口的全限定名 → org.apache.shardingsphere.sharding.spi.ShardingAlgorithm
      • 文件内容:自定义分片算法类的全限定名 → com.roy.shardingDemo.algorithm.MyComplexAlgorithm
      • 这样,ShardingSphere 启动时会通过 SPI 机制,自动加载 MyComplexAlgorithm
    • 如果想要能够被配置文件识别,在MyComplexAlgorithm类中,增加实现getType()方法:

      public class MyComplexAlgorithm implements ComplexKeysShardingAlgorithm<Long> {
          //……
          @Override
          public String getType(){
              return "MYCOMPLEX";
          }
      }
      
    • 在 ShardingSphere 配置分库分表策略时,指定这个我们自己的实现类:

      spring.shardingsphere.rules.sharding.sharding-algorithms.course_tbl_alg.type=MYCOMPLEX
      
  • 通过 SPI 机制,用户可以不修改 ShardingSphere 源码,就能自定义分片逻辑,让框架适配更复杂的业务场景(比如按“多字段组合、特定业务规则”分片)。