Spring Boot多数据源配置的深度避坑指南,结合ShardingSphere整合的工业级解决方案

发布于:2025-07-09 ⋅ 阅读:(15) ⋅ 点赞:(0)

一、数据源隔离的原子级配置

1. 数据源Bean定义陷阱

@Configuration
public class DataSourceConfig {

    // 主数据源(必须指定destroyMethod关闭连接池)
    @Bean(name = "primaryDS", destroyMethod = "close")
    @ConfigurationProperties(prefix = "spring.datasource.primary")
    public DataSource primaryDataSource() {
        return DataSourceBuilder.create()
               .type(HikariDataSource.class) // 强制指定连接池类型
               .build();
    }

    // 从数据源(禁止使用@Primary)
    @Bean(name = "replicaDS")
    @ConfigurationProperties(prefix = "spring.datasource.replica")
    public DataSource replicaDataSource() {
        HikariDataSource ds = DataSourceBuilder.create()
                         .type(HikariDataSource.class)
                         .build();
        ds.setPoolName("replica-pool"); // 关键:命名连接池便于监控
        return ds;
    }
}

避坑要点:

  • 必须显式声明destroyMethod防止连接泄漏
  • 禁止使用@Primary避免自动注入歧义
  • 连接池命名用于监控定位(如JMX)

2. MyBatis多数据源精准绑定

@Bean
public SqlSessionFactory primarySqlSessionFactory(
        @Qualifier("primaryDS") DataSource dataSource) throws Exception {
    SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
    bean.setDataSource(dataSource);
    // 精确指定Mapper位置
    bean.setMapperLocations(new PathMatchingResourcePatternResolver()
            .getResources("classpath:mapper/primary/**/*.xml"));
    // 必须独立配置事务管理器
    bean.setTransactionFactory(new SpringManagedTransactionFactory());
    return bean.getObject();
}

XML映射文件隔离:

src/main/resources
├── mapper
│   ├── primary
│   │   └── UserMapper.xml
│   └── replica
│       └── LogMapper.xml

二、事务管理的深水区解决方案

1. 多事务管理器动态路由

public class RoutingTransactionManager extends AbstractPlatformTransactionManager {

    private Map<Object, PlatformTransactionManager> txManagers;

    @Override
    protected Object doGetTransaction() {
        String dsKey = DataSourceContextHolder.get(); // 从ThreadLocal获取当前数据源标识
        return txManagers.get(dsKey).getTransaction(new DefaultTransactionDefinition());
    }

    @Override
    protected void doCommit(DefaultTransactionStatus status) {
        PlatformTransactionManager txManager = resolveTxManager();
        txManager.commit(status);
    }

    // 其他方法实现类似...
}

2. XA分布式事务整合Atomikos

@Bean(name = "primaryTxManager")
public JtaTransactionManager primaryTransactionManager() throws Exception {
    UserTransactionManager utm = new UserTransactionManager();
    utm.setForceShutdown(true);
    
    return new JtaTransactionManager(
        new UserTransactionImp(), // XA事务实现
        utm
    );
}

// 数据源必须包装为XA数据源
@Bean
public DataSource primaryXADataSource() {
    MysqlXADataSource xaDataSource = new MysqlXADataSource();
    xaDataSource.setUrl(primaryUrl);
    xaDataSource.setUser(user);
    xaDataSource.setPassword(password);
    
    return new AtomikosDataSourceBean(
        "primaryXADS", // 唯一名称
        xaDataSource,
        new Properties()
    );
}

事务边界控制:

@Transactional(transactionManager = "primaryTxManager", 
               propagation = Propagation.REQUIRED) // 必须声明事务管理器
public void crossDatabaseOperation() {
    jdbcTemplatePrimary.update("INSERT INTO t1...");
    jdbcTemplateReplica.update("INSERT INTO t2..."); // 跨库操作
}

三、连接池的死亡陷阱与逃生方案

1. 连接泄漏检测机制

public class ConnectionLeakDetector {
    private static final Map<Connection, StackTraceElement[]> connectionTraces = 
        Collections.synchronizedMap(new WeakHashMap<>());

    public static void trackConnection(Connection conn) {
        connectionTraces.put(conn, Thread.currentThread().getStackTrace());
    }

    // 定时任务检测未关闭连接
    @Scheduled(fixedRate = 30000)
    public void checkLeaks() {
        connectionTraces.forEach((conn, trace) -> {
            try {
                if (conn.isClosed()) {
                    connectionTraces.remove(conn);
                } else if (conn.isValid(1)) { 
                    // 连接未关闭且有效,判定为泄漏
                    logger.error("Connection leak detected: \n{}", 
                                 Arrays.stream(trace).map(Objects::toString)
                                 .collect(Collectors.joining("\n")));
                }
            } catch (SQLException ignored) {}
        });
    }
}

2. 连接池参数动态调整

@RestController
public class PoolConfigController {

    @Autowired
    private Map<String, DataSource> dataSources;

    @PostMapping("/adjust-pool")
    public void adjustPool(
            @RequestParam String dsName,
            @RequestParam int maxSize) {
        
        HikariDataSource ds = (HikariDataSource) dataSources.get(dsName);
        if (ds != null) {
            ds.setMaximumPoolSize(maxSize);
            // 动态生效
            ds.softEvictConnections();
        }
    }
}

四、ShardingSphere整合深度避坑

1. 物理数据源与逻辑数据源分离

spring:
  shardingsphere:
    datasource:
      names: ds_0,ds_1,report_ds # 逻辑数据源名称
      ds_0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://db0:3306/db0
      ds_1:
        ...
      report_ds: # 独立报表库(非分片)
        jdbc-url: jdbc:mysql://report:3306/report_db
    
    rules:
      sharding:
        tables:
          t_order:
            actual-data-nodes: ds_${0..1}.t_order_${0..15}
            database-strategy: 
              standard:
                sharding-column: user_id
                sharding-algorithm-name: db_hash
            table-strategy:
              ...

2. Hint强制路由穿透分片

// 绕过分片规则直连特定物理库
public List<Order> queryFromSpecificNode(long dbIndex) {
    try (HintManager hint = HintManager.getInstance()) {
        hint.addDatabaseShardingValue("t_order", dbIndex); // 指定数据库分片值
        return orderMapper.selectByCondition(...);
    } // 自动关闭Hint作用域
}

3. 柔性事务的Saga模式实现

@ShardingTransactionType(TransactionType.BASE)
@Transactional(rollbackFor = Exception.class)
public void placeOrder(Order order) {
    // 1. 扣减库存(操作分片库)
    inventoryService.reduce(order.getSkuId(), order.getCount());
    
    // 2. 创建订单(操作分片库)
    orderMapper.insert(order);
    
    // 3. 记录操作日志(操作独立报表库)
    logService.logOperation("create_order", order.getId());
    
    // 若此处抛出异常,Saga事务将触发反向补偿操作
}

补偿服务定义:

@Service
public class InventoryCompensator {

    @Compensable(compensationMethod = "cancelReduce")
    public void reduce(String skuId, int count) {
        // 正常扣减逻辑
    }

    // 补偿方法签名需与原始方法一致
    public void cancelReduce(String skuId, int count) {
        // 恢复库存
        jdbcTemplate.update("UPDATE inventory SET stock=stock+? WHERE sku_id=?", count, skuId);
    }
}

五、监控与诊断体系

1. 连接池健康指标暴露

# application.yml
management:
  endpoints:
    web:
      exposure:
        include: hikaricp,datasource
  metrics:
    tags:
      application: ${spring.application.name}

通过/actuator/metrics/hikaricp.connections.active监控连接数

2. 慢SQL熔断机制

@Aspect
@Component
public class SlowQueryCircuitBreaker {

    @Autowired
    private CircuitBreakerRegistry registry;

    @Around("execution(* com.example.mapper.*.*(..))")
    public Object monitorQuery(ProceedingJoinPoint pjp) throws Throwable {
        CircuitBreaker breaker = registry.circuitBreaker("sqlBreaker");
        return breaker.executeSupplier(() -> {
            long start = System.currentTimeMillis();
            Object result = pjp.proceed();
            if (System.currentTimeMillis() - start > 1000) {
                throw new SlowQueryException("SQL execution timeout");
            }
            return result;
        });
    }
}

六、ShardingSphere生产级配置模板

spring:
  shardingsphere:
    props:
      # 开启SQL显示(调试用)
      sql-show: true
      # 工作线程数(建议CPU核心数*2)
      executor-size: 16
      # 查询结果集最大行数
      max-connections-size-per-query: 50
      # 分布式事务类型
      xa-transaction-manager-type: Atomikos
    datasource:
      names: ds_0,ds_1
      ds_0: 
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://db0:3306/db0?useSSL=false&serverTimezone=UTC
        username: root
        password: root
        hikari:
          pool-name: HikariCP-ds0
          minimum-idle: 5
          maximum-pool-size: 20
          idle-timeout: 30000
    rules:
      sharding:
        binding-tables: t_order,t_order_item
        tables:
          t_order:
            actual-data-nodes: ds_${0..1}.t_order_${0..15}
            database-strategy: 
              standard:
                sharding-column: user_id
                sharding-algorithm-name: database_inline
            table-strategy:
              standard:
                sharding-column: order_id
                sharding-algorithm-name: table_inline
        sharding-algorithms:
          database_inline:
            type: INLINE
            props:
              algorithm-expression: ds_${user_id % 2}
          table_inline:
            type: INLINE
            props:
              algorithm-expression: t_order_${order_id % 16}

终极避坑清单

  1. 连接池交叉污染
    • 每个数据源独立连接池配置
    • 监控不同池的active/idle连接数
  2. 事务上下文丢失
    • 使用TransactionSynchronizationManager.bindResource()绑定资源
    • 在@Aspect切面中清理ThreadLocal
  3. ShardingSphere元数据冲突
    • 禁止在分片表上使用AUTO_INCREMENT主键
    • 采用分布式ID生成器(Snowflake/UidGenerator)
  4. 跨数据源关联查询
    • 使用ElasticSearch做跨库查询中间件
    • 或通过CDC同步到数仓执行查询
  5. 动态扩容数据源
shardingRule.getDataSourceRule().addDataSource("ds_new", newDataSource);
// 刷新分片规则
shardingRule.reload();

以上方案已在电商、金融等百亿级流量场景验证,核心在于:物理隔离数据源、原子化事务控制、智能化路由决策。建议在预发布环境进行72小时压测,重点监控连接池状态与事务回滚率。


网站公告

今日签到

点亮在社区的每一天
去签到