Springboot使用Redis发布订阅自动更新缓存数据源

发布于:2025-02-20 ⋅ 阅读:(41) ⋅ 点赞:(0)

背景

当项目中有很多数据源时,通常会在启动时就把数据源连接加载到缓存中。当数据源发生变更后,如何自动、实时地更新缓存中的数据源呢?如果是单个项目,直接调用接口方法即可;但如果涉及分布式部署的多个系统呢?

解决方案:

使用 Redis 的发布/订阅(Pub/Sub)功能作为轻量级消息队列,它可以实现实时通知、实时状态更新等功能,再配合 AOP 即可自动更新各节点缓存的数据源状态。

下面结合代码写一个使用示例:

1.首先创建数据源对象

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Persistent entity describing one configured data source; mapped to the
 * {@code ed_datasource_info} table via MyBatis-Plus annotations.
 *
 * @author ws
 * @since 2022-08-12
 */
@Getter
@Setter
@ToString
@Accessors(chain = true)
@TableName("ed_datasource_info")
public class DatasourceInfo implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Primary key; generated by the database ({@code IdType.AUTO}).
     */
    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;

    /**
     * Data source code.
     */
    @TableField("datasource_code")
    private String datasourceCode;

    /**
     * Data source display name.
     */
    @TableField("datasource_name")
    private String datasourceName;

    /**
     * Data source type.
     */
    @TableField("datasource_type")
    private String datasourceType;

    /**
     * Kind of source: 0 = database, 1 = REST API.
     */
    @TableField("type")
    private Integer type;

    /**
     * Creator of the record.
     */
    @TableField("creator")
    private String creator;

    /**
     * Schema name.
     */
    @TableField("schema_name")
    private String schemaName;


    /**
     * Value of the {@code create_time} column.
     */
    @TableField("create_time")
    private Date createTime;

    /**
     * Value of the {@code update_time} column.
     */
    @TableField("update_time")
    private Date updateTime;

    /**
     * Connection settings of the data source, stored as a JSON string.
     */
    @TableField("link_json")
    private String linkJson;
 
}

2.初始化启动加载数据源

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.sztech.common.constant.DataSourceTypeEnum;
import com.sztech.entity.DatasourceInfo;
import com.sztech.service.DatasourceInfoService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class DataSourceRecovery implements InitializingBean {

    @Resource
    private DatasourceInfoService datasourceInfoService;

    @Override
    public void afterPropertiesSet() throws Exception {
        refresh();
    }

    private void refresh() throws Exception{
        this.refresh(null);
    }

    public void refresh(String sourceCode){
        QueryWrapper<DatasourceInfo> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("type", DataSourceTypeEnum.DB.getKey());
        if(StringUtils.isNotBlank(sourceCode)){
            queryWrapper.eq("datasource_code",sourceCode);
        }

        List<DatasourceInfo> list = datasourceInfoService.list(queryWrapper);
        if(CollectionUtils.isEmpty(list)){
            return;
        }

        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        for(DatasourceInfo datasourceInfo : list){
            new Thread(new ReadloadThread(datasourceInfo, countDownLatch)).start();
        }

        try {
            countDownLatch.await(1,TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            log.error("数据源加载等待超时",e);
        }
    }

    /**
     * 多线程加载数据源,提高启动速度
     */
    static class ReadloadThread implements Runnable {

        private DatasourceInfo datasourceInfo;
        private CountDownLatch countDownLatch;

        public ReadloadThread() {
        }

        public ReadloadThread(DatasourceInfo datasourceInfo,CountDownLatch countDownLatch) {
            this.datasourceInfo = datasourceInfo;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                DataSourceContext.setClientMap(datasourceInfo);
                DataSourceContext.setConfigMap(datasourceInfo.getDatasourceCode(),datasourceInfo);
            }catch (Exception e){
                log.error("datasource:{},加载失败",datasourceInfo.getDatasourceCode(),e);
            }finally {
                countDownLatch.countDown();
            }
        }
    }
}

3.创建DataSourceContext,用于缓存数据源连接(客户端)和数据源配置

import com.sztech.core.tool.DBTool;
import com.sztech.entity.DatasourceInfo;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Static cache of opened data source clients and their configurations, both keyed by
 * data source code and backed by {@link ConcurrentHashMap}.
 *
 * User: wangsheng
 * Date: 2022-02-11
 * Time: 14:05
 */
public class DataSourceContext {
    /**
     * Client cache, keyed by data source code.
     */
    private static final Map<String, IClient> clientMap = new ConcurrentHashMap<>();

    /**
     * Data source configuration cache, keyed by data source code.
     */
    private static final Map<String, DatasourceInfo> configMap = new ConcurrentHashMap<>();

    /**
     * Builds a client for the data source and caches it under its code, closing any
     * client it replaces.
     * <p>
     * The replacement is built before the old client is touched, so a failed build
     * leaves the previous, still-open client in the cache instead of a closed one.
     *
     * @param datasourceInfo data source to open; its code must not be {@code null}
     * @throws NullPointerException if the data source code is {@code null}
     */
    public static void setClientMap(DatasourceInfo datasourceInfo) {
        String code = datasourceInfo.getDatasourceCode();
        if (code == null) {
            // The map rejects null keys; fail before opening a client that could never be cached.
            throw new NullPointerException("datasourceCode must not be null");
        }
        IClient replaced = clientMap.put(code, DBTool.buildClient(datasourceInfo));
        closeQuietly(replaced);
    }

    /**
     * Caches a data source configuration.
     *
     * @param key            data source code
     * @param datasourceInfo configuration to store
     */
    public static void setConfigMap(String key, DatasourceInfo datasourceInfo) {
        configMap.put(key, datasourceInfo);
    }

    /**
     * Removes the client cached under {@code key}, if any, and closes it.
     *
     * @param key data source code
     */
    public static void removeClientMap(String key) {
        // remove() hands back the evicted client atomically, so only one caller closes it.
        closeQuietly(clientMap.remove(key));
    }

    /**
     * Removes the configuration cached under {@code key}, if any.
     *
     * @param key data source code
     */
    public static void removeConfigMap(String key) {
        configMap.remove(key);
    }

    /**
     * Returns the client cached under {@code key}.
     *
     * @param key data source code
     * @return the cached client, never {@code null}
     * @throws RuntimeException if no client is cached for {@code key}
     */
    public static IClient getClientMap(String key) {
        IClient client = clientMap.get(key);
        if (null == client) {
            throw new RuntimeException(String.format("数据源编码:[%s]不存在或被删除...", key));
        }
        return client;
    }

    /**
     * Returns the configuration cached under {@code key}.
     *
     * @param key data source code
     * @return the cached configuration, never {@code null}
     * @throws RuntimeException if no configuration is cached for {@code key}
     */
    public static DatasourceInfo getConfigMap(String key) {
        DatasourceInfo datasourceInfo = configMap.get(key);
        if (null == datasourceInfo) {
            throw new RuntimeException(String.format("数据源编码:[%s]不存在或被删除...", key));
        }

        return datasourceInfo;
    }

    /**
     * Closes a client that has left the cache; {@code null} is accepted and ignored.
     */
    private static void closeQuietly(IClient client) {
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (Exception ignored) {
            // Best effort: the client is already out of the cache, so a failed close
            // must not prevent the cache update from completing.
        }
    }
}
package com.sztech.core.tool;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.Instance;
import com.sztech.common.constant.ResultEnum;
import com.sztech.common.exception.BizException;
import com.sztech.common.utils.ReflectionUtils;
import com.sztech.common.utils.SpringUtils;
import com.sztech.common.utils.ThreadPoolUtil;
import com.sztech.core.datasource.DataSourceContext;
import com.sztech.core.datasource.IClient;
import com.sztech.core.datasource.rdbms.RdbmsConfig;
import com.sztech.entity.*;
import com.sztech.pojo.dto.ColumnDto;
import com.sztech.pojo.dto.QueryTableDto;
import com.sztech.pojo.dto.TableDto;
import com.sztech.pojo.node.PartitionColumn;
import com.sztech.pojo.vo.*;
import com.sztech.service.CreateTableLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;

import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Description:
 * User: wangsheng
 * Date: 2022-08-12
 * Time: 16:59
 */
@Slf4j
public class DBTool {

    /**
     * Creates a client for the given data source and opens it.
     * <p>
     * The implementation is looked up through {@code ReflectionUtils.getInstanceFromCache},
     * using the data source type and the {@code "type"} method name as selector.
     *
     * @param datasourceInfo data source definition passed on to {@code open}
     * @return the result of {@code open(datasourceInfo)} on the selected implementation
     */
    public static IClient buildClient(DatasourceInfo datasourceInfo) {
        final String datasourceType = datasourceInfo.getDatasourceType();
        final IClient prototype = ReflectionUtils.getInstanceFromCache(datasourceType, "type", IClient.class);
        return prototype.open(datasourceInfo);
    }

    /**
     * Tests a data source definition using the client implementation selected by its type.
     *
     * @param datasourceInfo data source definition to test
     * @return the result of the implementation's {@code testSource(datasourceInfo)}
     */
    public static boolean testSource(DatasourceInfo datasourceInfo) {
        final String datasourceType = datasourceInfo.getDatasourceType();
        final IClient prototype = ReflectionUtils.getInstanceFromCache(datasourceType, "type", IClient.class);
        return prototype.testSource(datasourceInfo);
    }

    /**
     * Lists the schema names reported by the database behind a data source.
     * <p>
     * A dedicated JDBC connection is opened from the {@code linkJson} settings (the cached
     * client is not used) and is closed before returning. A failure while closing is
     * logged and does not affect the result.
     *
     * @param datasourceInfo data source whose type selects the JDBC driver and whose
     *                       {@code linkJson} is parsed into an {@code RdbmsConfig}
     * @return the {@code TABLE_SCHEM} values from {@link DatabaseMetaData#getSchemas()}
     * @throws RuntimeException wrapping the {@link SQLException} or
     *                          {@link ClassNotFoundException} raised while loading the
     *                          driver, connecting or reading the metadata
     */
    public static List<String> getSchemas(DatasourceInfo datasourceInfo) {
        List<String> schemas = new ArrayList<>();
        Connection conn = null;
        try {
            IClient client = ReflectionUtils.getInstanceFromCache(datasourceInfo.getDatasourceType(), "type", IClient.class);
            Class.forName(client.driverName());
            String linkJson = datasourceInfo.getLinkJson();
            RdbmsConfig rdbmsConfig = JSONObject.parseObject(linkJson).toJavaObject(RdbmsConfig.class);
            conn = DriverManager.getConnection(rdbmsConfig.getJdbcUrl(), rdbmsConfig.getUsername(), rdbmsConfig.getDecodePassword());
            DatabaseMetaData metadata = conn.getMetaData();
            try (ResultSet resultSet = metadata.getSchemas()) {
                while (resultSet.next()) {
                    schemas.add(resultSet.getString("TABLE_SCHEM"));
                }
            }
        } catch (SQLException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException ex) {
                    // Closing stays best-effort, but goes through the logger instead of stderr.
                    log.warn("获取schema后关闭数据库连接失败", ex);
                }
            }
        }
        return schemas;
    }

    /**
     * Returns the driver name declared by the client implementation for a data source type.
     *
     * @param datasourceType data source type used to select the implementation
     * @return the implementation's {@code driverName()}
     */
    public static String getDriverName(String datasourceType) {
        final IClient prototype = ReflectionUtils.getInstanceFromCache(datasourceType, "type", IClient.class);
        return prototype.driverName();
    }

    /**
     * Returns the column information of a table, as reported by the cached client.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param tableName      table to inspect
     * @return the client's {@code getColumns(tableName)} result
     */
    public static List<ColumnDto> getColumns(String datasourceCode, String tableName) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.getColumns(tableName);
    }

    /**
     * Returns the partition columns of a table, as reported by the cached client.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param tableName      table to inspect
     * @return the client's {@code getPartitionColumns(tableName)} result
     */
    public static List<String> getPartitionColumns(String datasourceCode, String tableName) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.getPartitionColumns(tableName);
    }

    /**
     * Returns table names from the data source, filtered by the given pattern.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param tableNameLike  filter passed unchanged to the client
     * @return the client's {@code getTableNames(tableNameLike)} result
     */
    public static List<String> getTableNames(String datasourceCode, String tableNameLike) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.getTableNames(tableNameLike);
    }

    /**
     * Returns the tables of a data source, as reported by the cached client.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @return the client's {@code getTables()} result
     */
    public static List<TableDto> getTables(String datasourceCode) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.getTables();
    }

    /**
     * Returns the information of a single table.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param tableName      table to look up
     * @return the client's {@code getTableByName(tableName)} result
     */
    public static TableDto getTableByName(String datasourceCode, String tableName) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.getTableByName(tableName);
    }

    /**
     * Returns the information of a single table (per the original note: creation time and
     * number of fields).
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param tableName      table to look up
     * @return the client's {@code getTableField(tableName)} result
     */
    public static TableDto getTableField(String datasourceCode, String tableName) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.getTableField(tableName);
    }


    /**
     * Returns table information (per the original note: including the creation time).
     *
     * @param dto carries the data source code and the table name
     * @return the client's {@code getTableInfo} result for {@code dto.getTableName()}
     */
    public static TableInfoVo getTableData(QueryTableDto dto) {
        return DataSourceContext.getClientMap(dto.getDataSourceCode()).getTableInfo(dto.getTableName());
    }


    /**
     * Creates a table from column definitions.
     * <p>
     * The statements come from {@code buildTableSql(columnDtos, tableName, true)} and are
     * executed one by one, each with an empty parameter map.
     *
     * @param columnDtos     column definitions
     * @param tableName      name of the table to create
     * @param datasourceCode code of the data source whose cached client is used
     */
    public static void createTableByColumns(List<ColumnDto> columnDtos, String tableName, String datasourceCode) {
        final IClient dbClient = DataSourceContext.getClientMap(datasourceCode);
        final List<String> statements = dbClient.buildTableSql(columnDtos, tableName, true);
        log.info("执行建表语句为:" + JSON.toJSONString(statements));
        for (String statement : statements) {
            dbClient.executeCommandSyn(statement, new HashMap<>());
        }
    }

    /**
     * Creates a table from column definitions.
     * <p>
     * Same flow as {@code createTableByColumns}, except that the statements come from
     * {@code buildTableSql(columnDtos, tableName, false)}.
     *
     * @param columnDtos     column definitions
     * @param tableName      name of the table to create
     * @param datasourceCode code of the data source whose cached client is used
     */
    public static void createTableByNotTransformedColumns(List<ColumnDto> columnDtos, String tableName, String datasourceCode) {
        final IClient dbClient = DataSourceContext.getClientMap(datasourceCode);
        final List<String> statements = dbClient.buildTableSql(columnDtos, tableName, false);
        log.info("执行建表语句为:" + JSON.toJSONString(statements));
        for (String statement : statements) {
            dbClient.executeCommandSyn(statement, new HashMap<>());
        }
    }

    /**
     * Creates an index on a table.
     * <p>
     * Original author's note: on Oracle the index name must be unique across the whole
     * database, otherwise creation fails.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param tableName      table to index
     * @param filedNames     comma-separated field names, e.g. {@code filed1,filed2}
     * @param unique         whether the index is unique
     */
    public static void createIndex(String datasourceCode, String tableName, String filedNames, Boolean unique) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        client.createIndex(tableName, filedNames, unique);
    }

    /**
     * Validates a SQL statement through the cached client.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param sql            statement to check
     * @param sourceType     passed unchanged to the client
     * @return the client's {@code checkSql(sql, sourceType)} result
     */
    public static Map<String, Object> checkSql(String datasourceCode, String sql, String sourceType) {
        return DataSourceContext.getClientMap(datasourceCode).checkSql(sql, sourceType);
    }

    /**
     * Creates a table by executing the given SQL synchronously with an empty parameter map.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param sql            CREATE statement to execute
     */
    public static void createTableWithSql(String datasourceCode, String sql) {
        final IClient dbClient = DataSourceContext.getClientMap(datasourceCode);
        log.info("执行建表语句为:" + JSON.toJSONString(sql));
        dbClient.executeCommandSyn(sql, new HashMap<>());
    }

    /**
     * Drops a table.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param tableName      table to drop
     */
    public static void dropTable(String datasourceCode, String tableName) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        client.dropTable(tableName);
    }

    /**
     * Queries rows from a single table.
     * <p>
     * The SELECT statement is built by the client's {@code getSelectSql}, logged, then run
     * without bind parameters.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param columns        columns passed to the statement builder
     * @param tableName      table to query
     * @param search         search input passed to the statement builder
     * @param limit          limit passed to the statement builder
     * @return the rows returned by the client
     */
    public static List<Map<String, Object>> selectDataFromTable(String datasourceCode, List<DataTableColumn> columns, String tableName, String search, Integer limit) {
        final IClient dbClient = DataSourceContext.getClientMap(datasourceCode);
        // Build the query statement.
        final String query = dbClient.getSelectSql(columns, tableName, search, limit);
        log.info("执行语句:" + query);
        return dbClient.selectDataFromTable(query, null);
    }

    /**
     * Queries one page of rows from a single table.
     * <p>
     * The statement is built by the client's {@code getFormSelectSql}, logged, then run with
     * {@code params}.
     *
     * @return the rows returned by the client
     */
    public static List<Map<String, Object>> selectFromTable(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, MapSqlParameterSource params) {
        final IClient dbClient = DataSourceContext.getClientMap(datasourceCode);
        // Build the query statement.
        final String query = dbClient.getFormSelectSql(columns, searchColumns, tableName, search, pageNum, pageSize, params);
        log.info("执行语句:" + query);
        return dbClient.selectDataFromTable(query, params);
    }

    /**
     * Queries one page of rows from a single table, using the statement built by the
     * client's {@code selectFromForBackUp}.
     *
     * @return the rows returned by the client
     */
    public static List<Map<String, Object>> selectFromForBackUp(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, MapSqlParameterSource params) {
        final IClient dbClient = DataSourceContext.getClientMap(datasourceCode);
        // Build the query statement.
        final String query = dbClient.selectFromForBackUp(columns, searchColumns, tableName, search, pageNum, pageSize, params);
        log.info("执行语句:" + query);
        return dbClient.selectDataFromTable(query, params);
    }

    /**
     * Queries one page of rows from a single table, using the statement built by the
     * client's {@code getFormSelectSqlForFile}.
     *
     * @return the rows returned by the client
     */
    public static List<Map<String, Object>> selectFromFile(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, Integer pageNum, Integer pageSize, MapSqlParameterSource params) {
        final IClient dbClient = DataSourceContext.getClientMap(datasourceCode);
        // Build the query statement.
        final String query = dbClient.getFormSelectSqlForFile(columns, searchColumns, tableName, search, pageNum, pageSize, params);
        log.info("执行语句:" + query);
        return dbClient.selectDataFromTable(query, params);
    }
    /**
     * Checks a single table for an existing file name.
     * <p>
     * The statement is built by the client's {@code getExistOldName}, logged, then run
     * without bind parameters.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param tableName      table to query
     * @param search         search input passed to the statement builder
     * @return the rows returned by the client
     */
    public static List<Map<String, Object>> getExistOldName(String datasourceCode, String tableName, String search) {
        final IClient dbClient = DataSourceContext.getClientMap(datasourceCode);
        // Build the query statement.
        final String query = dbClient.getExistOldName(tableName, search);
        log.info("执行语句:" + query);
        return dbClient.selectDataFromTable(query, null);
    }
    /**
     * Queries rows from a single table (per the original note: dedicated to collect tables).
     *
     * @param vo carries the data source code, the query conditions and the bind parameters
     * @return the rows returned by the client
     */
    public static List<Map<String, Object>> selectCollectTable(CollectConditionVo vo) {
        final IClient dbClient = DataSourceContext.getClientMap(vo.getDatasourceCode());
        // Build the query statement.
        final String query = dbClient.getCollectTable(vo);
        log.info("执行语句:" + query);
        return dbClient.selectDataFromTable(query, vo.getParams());
    }

    /**
     * Counts the rows of a single table that match the search.
     * <p>
     * The statement is built by the client's {@code getCountSql}, logged, then run through
     * {@code getCount} with {@code params}.
     *
     * @return the client's {@code getCount} result
     */
    public static Map<String, Object> getFormCount(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, MapSqlParameterSource params) {
        final IClient dbClient = DataSourceContext.getClientMap(datasourceCode);
        // Build the count statement.
        final String countQuery = dbClient.getCountSql(columns, searchColumns, tableName, search, params);
        log.info("执行语句:" + countQuery);
        return dbClient.getCount(countQuery, params);
    }

    /**
     * Counts all rows of a table (per the original note: used for district/county tables).
     * <p>
     * NOTE(review): {@code tableName} is concatenated into the SQL text as-is; callers must
     * only pass trusted identifiers. Confirm that no user input reaches this parameter.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param tableName      table to count; inserted verbatim into the statement
     * @param params         bind parameters passed to the client
     * @return the client's {@code getCount} result; the count column is aliased {@code count}
     */
    public static Map<String, Object> getCountryCount(String datasourceCode, String tableName,  MapSqlParameterSource params) {
        final IClient dbClient = DataSourceContext.getClientMap(datasourceCode);
        // Build the count statement.
        final String countQuery = "select count(1) as count from " + tableName;
        log.info("执行语句:" + countQuery);
        return dbClient.getCount(countQuery, params);
    }

    /**
     * Counts matching rows using the statement built by the client's
     * {@code getCountSqlForFile}.
     *
     * @return the client's {@code getCount} result
     */
    public static Map<String, Object> getFormCountForFile(String datasourceCode, List<FormColumn> columns, List<FormColumn> searchColumns, String tableName, String search, MapSqlParameterSource params) {
        final IClient dbClient = DataSourceContext.getClientMap(datasourceCode);
        // Build the count statement.
        final String countQuery = dbClient.getCountSqlForFile(columns, searchColumns, tableName, search, params);
        log.info("执行语句:" + countQuery);
        return dbClient.getCount(countQuery, params);
    }

    /**
     * Returns the number of rows in a table.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param tableName      table to count
     * @return the client's {@code getTableRows(tableName)} result
     */
    public static Long getTableRows(String datasourceCode, String tableName) {
        return DataSourceContext.getClientMap(datasourceCode).getTableRows(tableName);
    }

    /**
     * Returns the number of rows in the given partitions of a table.
     *
     * @param datasourceCode   code of the data source whose cached client is used
     * @param tableName        table to count
     * @param partitionColumns partition selection passed unchanged to the client
     * @return the client's {@code getTablePartitionRows} result
     */
    public static Long getTablePartitionRows(String datasourceCode, String tableName, List<PartitionColumn> partitionColumns) {
        return DataSourceContext.getClientMap(datasourceCode).getTablePartitionRows(tableName, partitionColumns);
    }

    /**
     * Returns the client's {@code getPhysicalSize(tableName)} result.
     * <p>
     * NOTE(review): the original comment read "query table row count", which looks copied
     * from {@code getTableRows}; the unit of the returned size is not visible here.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param tableName      table to measure
     * @return the size reported by the client
     */
    public static Integer getTablePhysicalSize(String datasourceCode, String tableName) {
        return DataSourceContext.getClientMap(datasourceCode).getPhysicalSize(tableName);
    }

    /**
     * Returns the maximum value of a column in a table.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param tableName      table name
     * @param incColumnName  name of the incrementing column
     * @param condition      extra condition passed unchanged to the client
     * @return the client's {@code getMaxValue} result
     */
    public static Object getMaxValue(String datasourceCode, String tableName, String incColumnName, String condition) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.getMaxValue(tableName, incColumnName, condition);
    }

    /**
     * Schema-aware variant: returns the maximum value of a column in a table of the given
     * schema.
     *
     * @return the client's {@code getMaxValue(schema, tableName, incColumnName, condition)} result
     */
    public static Object getMaxValue(String datasourceCode, String schema, String tableName, String incColumnName, String condition) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.getMaxValue(schema, tableName, incColumnName, condition);
    }


    /**
     * Delegates to the cached client's
     * {@code getMaxTime(schema, tableName, incColumnName, tongId, condition)}.
     *
     * @return the client's result
     */
    public static Object getMaxTime(String datasourceCode, String schema, String tableName, String incColumnName, String tongId,String condition) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.getMaxTime(schema, tableName, incColumnName, tongId, condition);
    }


    /**
     * Tells whether a table has a column with the given name (case-insensitive).
     * <p>
     * A {@code null} field name, a {@code null} column list or columns without a name
     * simply do not match; they no longer raise a {@link NullPointerException}.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param tableName      table name
     * @param fieldName      field name to look for
     * @return {@code true} if some column name equals {@code fieldName} ignoring case
     */
    public static Boolean fieldExist(String datasourceCode, String tableName, String fieldName) {
        List<ColumnDto> columns = getColumns(datasourceCode, tableName);
        if (fieldName == null || columns == null) {
            return false;
        }
        // Call equalsIgnoreCase on the known non-null side; it returns false for a null argument.
        return columns.stream().anyMatch(s -> s != null && fieldName.equalsIgnoreCase(s.getName()));
    }

    /**
     * Previews table data (per the original note: the first ten rows).
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param tableName      table to preview
     * @param condition      condition passed unchanged to the client
     * @return the client's {@code dataView(tableName, condition)} result
     */
    public static String dataView(String datasourceCode, String tableName, String condition) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.dataView(tableName, condition);
    }

    /**
     * Creates a partitioned temporary table (per the original note: applies to ODPS).
     *
     * @param columnDtos       column definitions
     * @param tableName        name of the table to create
     * @param tableComment     table comment
     * @param partitionedField partition field
     * @param datasourceCode   code of the data source whose cached client is used
     */
    public static void createPartitionedTableByColumns(List<ColumnDto> columnDtos, String tableName, String tableComment, String partitionedField, String datasourceCode) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        client.createPartitionedTableByColumns(columnDtos, tableName, tableComment, partitionedField);
    }

    /**
     * Executes a command synchronously on the cached client.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param command        command text
     * @param params         parameters passed unchanged to the client
     */
    public static void executeCommandSyn(String datasourceCode, String command, Map<String, Object> params) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        client.executeCommandSyn(command, params);
    }

    /**
     * Executes a command asynchronously (per the original note: applies to ODPS).
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param command        command text
     * @param params         parameters passed unchanged to the client
     * @return the {@link Instance} returned by the client
     */
    public static Instance executeCommandASyn(String datasourceCode, String command, Map<String, Object> params) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.executeCommandASyn(command, params);
    }

    /**
     * Tells whether export is permitted for a table (per the original note: applies to ODPS).
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param tableName      table name
     * @return the client's {@code exportEnable(tableName)} result
     */
    public static Boolean exportEnable(String datasourceCode, String tableName) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.exportEnable(tableName);
    }

    /**
     * Inserts a single row.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param vo             row data passed unchanged to the client
     * @return the client's {@code insert(vo)} result
     */
    public static Integer insert(String datasourceCode, FormTableVo vo) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.insert(vo);
    }

    /**
     * Inserts rows in batch.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param vo             batch data passed unchanged to the client
     * @return the client's {@code betchInsert(vo)} result
     */
    public static Integer[] betchInsert(String datasourceCode, FormTableVo vo) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.betchInsert(vo);
    }

    /**
     * Inserts rows in batch through the client's {@code betchInsertByConnection}.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param vo             batch data passed unchanged to the client
     * @return the client's {@code betchInsertByConnection(vo)} result
     */
    public static Integer[] betchInsertByConnection(String datasourceCode, FormTableVo vo) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.betchInsertByConnection(vo);
    }

    /**
     * Inserts rows in batch. Original author's note: this variant needs no parameter
     * wrapping; passing the list of field names is enough.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param vo             batch data passed unchanged to the client
     * @return the client's {@code betchInsertForCommom(vo)} result
     */
    public static Integer[] betchInsertForCommom(String datasourceCode, FormTableVo vo) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.betchInsertForCommom(vo);
    }

    /**
     * Deletes data.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param vo             deletion request passed unchanged to the client
     * @return the client's {@code delete(vo)} result
     */
    public static Integer delete(String datasourceCode, FormTableVo vo) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.delete(vo);
    }

    /**
     * Deletes data. Original author's note: this variant allows custom condition operators.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param vo             deletion request passed unchanged to the client
     * @return the client's {@code deleteForCommon(vo)} result
     */
    public static Integer deleteForCommon(String datasourceCode, FormTableVo vo) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.deleteForCommon(vo);
    }


    /**
     * Delegates to the cached client's {@code deleteForFile(vo)}.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param vo             deletion request passed unchanged to the client
     * @return the client's result
     */
    public static Integer deleteForFile(String datasourceCode, FormTableVo vo) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.deleteForFile(vo);
    }

    /**
     * Delegates to the cached client's {@code deleteForPre(vo)}.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param vo             deletion request passed unchanged to the client
     * @return the client's result
     */
    public static String deleteForPre(String datasourceCode, FormTableVo vo) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.deleteForPre(vo);
    }

    /**
     * Updates data.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param vo             update request passed unchanged to the client
     * @return the client's {@code update(vo)} result
     */
    public static Integer update(String datasourceCode, FormTableVo vo) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.update(vo);
    }



    /**
     * Updates data through the client's {@code updateForFile}.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param vo             update request passed unchanged to the client
     * @return the client's {@code updateForFile(vo)} result
     */
    public static Integer updateForFile(String datasourceCode, FormTableVo vo) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.updateForFile(vo);
    }

    /**
     * Returns the basic information of a form table.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param vo             request passed unchanged to the client
     * @return the client's {@code getTableBasicInfo(vo)} result
     */
    public static TableMetaDataVo getTableBasicInfo(String datasourceCode, FormTableVo vo) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.getTableBasicInfo(vo);
    }

    /**
     * Creates a collect table from catalog column definitions.
     * <p>
     * The statements come from the client's {@code buildTableSqlForCollect} and are executed
     * one by one. On failure the error is logged with its stack trace, a
     * {@code CreateTableLog} record is saved asynchronously on the shared thread pool, and
     * a generic {@link BizException} is thrown to the caller.
     *
     * @param columnDtos     column definitions
     * @param tableName      name of the table to create; also stored as the log record code
     * @param datasourceCode code of the data source whose cached client is used
     * @param tableComment   table comment
     * @param ifPartition    passed unchanged to the statement builder
     * @throws BizException if any statement fails
     */
    public static void createCollectTable(List<CatalogColumnInfo> columnDtos, String tableName, String datasourceCode, String tableComment, Boolean ifPartition) {
        IClient client = DataSourceContext.getClientMap(datasourceCode);
        List<String> sqls = client.buildTableSqlForCollect(columnDtos, tableName, tableComment, ifPartition);
        log.info("执行建表语句为:" + JSON.toJSONString(sqls));

        try {
            sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>()));
        } catch (Exception e) {
            String message = e.getMessage();
            if (e instanceof BizException) {
                message = ((BizException) e).getMsg();
            }

            // Pass the exception to the logger (instead of printStackTrace) so the stack
            // trace ends up in the configured log output.
            log.error("建表错误=======================>{}:", message, e);
            ThreadPoolExecutor instance = ThreadPoolUtil.instance();
            String finalMessage = message;
            // Persist the failure off the calling thread.
            instance.submit(() -> {
                CreateTableLog createTableLog = new CreateTableLog();
                createTableLog.setErrorLog(finalMessage);
                createTableLog.setParams(JSON.toJSONString(sqls));
                createTableLog.setCode(tableName);
                CreateTableLogService createTableLogService = SpringUtils.getBean(CreateTableLogService.class);
                createTableLogService.save(createTableLog);
            });
            throw new BizException(ResultEnum.ERROR.getCode(), "建表失败请联系管理员");
        }

    }

    /**
     * Alters a collect table.
     * <p>
     * The statements come from the client's {@code buildTableSqlForUpdate} and are executed
     * one by one. On failure the error is logged with its stack trace, a
     * {@code CreateTableLog} record is saved asynchronously on the shared thread pool, and
     * a generic {@link BizException} is thrown to the caller.
     *
     * @param vo carries the data source code, the table name and the change description
     * @throws BizException if any statement fails
     */
    public static void updateCollectTable(CreateCollectVo vo) {
        IClient client = DataSourceContext.getClientMap(vo.getDatasourceCode());
        List<String> sqls = client.buildTableSqlForUpdate(vo);
        log.info("执行更新表语句为:" + JSON.toJSONString(sqls));

        try {
            sqls.forEach(s -> client.executeCommandSyn(s, new HashMap<>()));
        } catch (Exception e) {
            String message = e.getMessage();
            if (e instanceof BizException) {
                message = ((BizException) e).getMsg();
            }
            // Pass the exception to the logger (instead of printStackTrace) so the stack
            // trace ends up in the configured log output.
            log.error("建表错误=======================>{}:", message, e);
            ThreadPoolExecutor instance = ThreadPoolUtil.instance();
            String finalMessage = message;
            // Persist the failure off the calling thread.
            instance.submit(() -> {
                CreateTableLog createTableLog = new CreateTableLog();
                createTableLog.setErrorLog(finalMessage);
                createTableLog.setParams(JSON.toJSONString(sqls));
                createTableLog.setCode(vo.getTableName());
                CreateTableLogService createTableLogService = SpringUtils.getBean(CreateTableLogService.class);
                createTableLogService.save(createTableLog);
            });
            log.info("建表失败了开始准备抛出了-------------------------------------->");
            throw new BizException(ResultEnum.ERROR.getCode(), "建表失败请联系管理员");
        }
    }

    /**
     * Returns details of the tables in a data source (per the original note: table name,
     * field count and creation time).
     *
     * @param datasourceCode   code of the data source whose cached client is used
     * @param tableNameLike    name filter passed unchanged to the client
     * @param start            paging offset passed unchanged to the client
     * @param pageSize         page size passed unchanged to the client
     * @param specifyTableName passed unchanged to the client
     * @return the client's {@code getTablesDetail} result
     */
    public static List<TableDto> getTablesDetail(String datasourceCode, String tableNameLike, Integer start, Integer pageSize, String specifyTableName) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.getTablesDetail(tableNameLike, start, pageSize, specifyTableName);
    }

    /**
     * Returns the number of tables, as reported by the client's {@code getTableCountSchema}.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @param tableName      passed unchanged to the client
     * @return the client's result
     */
    public static Long getTableCountSchema(String datasourceCode, String tableName) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.getTableCountSchema(tableName);
    }

    /**
     * Delegates to the cached client's {@code getTableColumnCount(tableName)}.
     *
     * @param dataSourceCode code of the data source whose cached client is used
     * @param tableName      table name
     * @return the client's result
     */
    public static Integer getTableColumnCount(String dataSourceCode, String tableName) {
        final IClient client = DataSourceContext.getClientMap(dataSourceCode);
        return client.getTableColumnCount(tableName);
    }

    /**
     * Delegates to the cached client's {@code getPreTableColumnCount(tableName)}.
     *
     * @param dataSourceCode code of the data source whose cached client is used
     * @param tableName      table name
     * @return the client's result
     */
    public static Integer getPreTableColumnCount(String dataSourceCode, String tableName) {
        final IClient client = DataSourceContext.getClientMap(dataSourceCode);
        return client.getPreTableColumnCount(tableName);
    }

    /**
     * Returns the symbol reported by the cached client's {@code getSymbol()}.
     *
     * @param datasourceCode code of the data source whose cached client is used
     * @return the client's symbol
     */
    public static String getSymbol(String datasourceCode) {
        final IClient client = DataSourceContext.getClientMap(datasourceCode);
        return client.getSymbol();
    }

}


import lombok.extern.slf4j.Slf4j;
import org.reflections.Reflections;
import java.lang.reflect.Modifier;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Reflection helpers for discovering the implementations of an interface or abstract
 * class and instantiating the one that reports a given type.
 */
@Slf4j
public class ReflectionUtils {

    private static final Map<String, Set<?>> clazzMap = new ConcurrentHashMap<>();
    private static final ReentrantLock clazzLock = new ReentrantLock();

    /**
     * Finds all subtypes of an interface or abstract class by scanning its package.
     * <p>
     * Results are cached per class name so the scan runs once per type. Because the scan
     * starts at {@code clazz}'s own package, the interface or abstract class must live in
     * the same package as its implementations or in a parent package.
     *
     * @param clazz interface or abstract class whose subtypes are wanted
     * @return the cached or freshly scanned subtypes; an empty, uncached set if the scan fails
     */
    @SuppressWarnings("unchecked")
    public static <T> Set<Class<? extends T>> getReflections(Class<T> clazz) {
        final String cacheKey = clazz.getName();
        Set<?> cached = clazzMap.get(cacheKey);
        if (cached != null) {
            return (Set<Class<? extends T>>) cached;
        }

        // Acquire the lock before entering try, so unlock() only runs when the lock is held.
        clazzLock.lock();
        try {
            // Re-check under the lock: another thread may have finished the scan meanwhile.
            cached = clazzMap.get(cacheKey);
            if (cached != null) {
                return (Set<Class<? extends T>>) cached;
            }

            Reflections reflections = new Reflections(clazz.getPackage().getName());
            Set<Class<? extends T>> subTypesOf = reflections.getSubTypesOf(clazz);
            clazzMap.put(cacheKey, subTypesOf);
            return subTypesOf;
        } catch (Exception e) {
            log.error("getReflections error", e);
        } finally {
            clazzLock.unlock();
        }

        return new HashSet<>();
    }


    /**
     * Creates a new instance of the implementation whose {@code methodName} result matches
     * {@code type}, ignoring case.
     * <p>
     * Each concrete subtype is instantiated through its no-argument constructor and asked
     * for its type; the first matching instance is returned as-is, so every candidate is
     * constructed only once. Candidates that cannot be instantiated or queried are logged
     * and skipped.
     *
     * @param type       type value to look for
     * @param methodName name of the public no-argument method that reports the type
     * @param clazz      interface or abstract class whose implementations are searched
     * @return a new instance of the matching implementation
     * @throws RuntimeException if {@code type} is {@code null} or no implementation matches
     */
    public static <T> T getInstance(String type, String methodName, Class<T> clazz) {
        if (type == null) {
            // Nothing can match a null type; skip the per-candidate probing and error logs.
            throw new RuntimeException("implement class not exist");
        }

        for (Class<? extends T> candidate : getReflections(clazz)) {
            // Skip abstract classes (and interfaces): they cannot be instantiated.
            if (Modifier.isAbstract(candidate.getModifiers())) {
                continue;
            }
            try {
                T instance = candidate.getDeclaredConstructor().newInstance();
                Object reported = candidate.getMethod(methodName).invoke(instance);
                if (reported != null && type.equalsIgnoreCase(reported.toString())) {
                    return instance;
                }
            } catch (Exception e) {
                log.error("getInstance error", e);
            }
        }

        throw new RuntimeException("implement class not exist");
    }

    /**
     * Creates a new instance of the implementation matching {@code type}.
     * <p>
     * Despite its name this only benefits from the subtype cache of
     * {@link #getReflections(Class)}; instances themselves are not cached and every call
     * returns a new object.
     *
     * @param type       type value to look for
     * @param methodName name of the public no-argument method that reports the type
     * @param clazz      interface or abstract class whose implementations are searched
     * @return a new instance of the matching implementation
     * @throws RuntimeException if no implementation matches
     */
    public static <T> T getInstanceFromCache(String type, String methodName, Class<T> clazz) {
        return getInstance(type, methodName, clazz);
    }

}

 IClient 客户端接口,用于适配多种数据源


import com.ws.websocket.entity.DatasourceInfo;


/**
 * Description: client abstraction implemented once per supported data source type.
 * User: wangsheng
 * Date: 2022-12-30
 * Time: 10:31
 */
public interface IClient {
    /**
     * Connects to the data source.
     *
     * @param dataSourceInfo data source information
     * @return {@link IClient}
     */
    IClient open(DatasourceInfo dataSourceInfo);
    /**
     * Closes the data source.
     */
    void close();
    /**
     * Driver name of this client type.
     *
     * @return the driver name
     */
    String driverName();
    /**
     * Data source type handled by this client.
     *
     * @return {@link String}
     */
    String type();
    /**
     * Tests the data source.
     *
     * @param datasourceInfo data source information to test
     * @return the test result
     */
    boolean testSource(DatasourceInfo datasourceInfo);

}


import com.ws.websocket.entity.DatasourceInfo;
// Common query base: abstract parent shared by the IClient implementations.
public abstract class AbsClient implements IClient  {
    // Data source definition associated with this client; left for subclasses to assign.
    protected DatasourceInfo datasourceInfo;
}
package com.ws.websocket.util;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

@Slf4j
public abstract class AbsRdbmsClient extends AbsClient {

    protected DruidDataSource druidDataSource;
    @Override
    public IClient open(DatasourceInfo datasourceInfo) {
        RdbmsConfig rdbmsConfig = JSONObject.parseObject(datasourceInfo.getLinkJson()).toJavaObject(RdbmsConfig.class);
        DruidDataSource druidDataSource = new DruidDataSource();
        druidDataSource.setInitialSize(5);
        druidDataSource.setMinIdle(30);
        druidDataSource.setMaxActive(300);
        druidDataSource.setMaxWait(10000);

        druidDataSource.setBreakAfterAcquireFailure(true);// 跳出重试循环
        druidDataSource.setConnectionErrorRetryAttempts(3);// 重试三次
        druidDataSource.setTimeBetweenConnectErrorMillis(3000);
        druidDataSource.setLoginTimeout(3);

        druidDataSource.setUrl(rdbmsConfig.getJdbcUrl());
        druidDataSource.setDriverClassName(driverName());
        druidDataSource.setUsername(rdbmsConfig.getUsername());
        //解密
        //  druidDataSource.setPassword(RsaUtils.decode(rdbmsConfig.getPassword()));
        druidDataSource.setPassword(rdbmsConfig.getPassword());
        // 设置 MetaUtil 工具类所需参数
        Properties properties = new Properties();

        properties.put("remarks", "true");
        properties.put("useInformationSchema", "true");
        druidDataSource.setConnectProperties(properties);

        this.druidDataSource = druidDataSource;
        this.datasourceInfo = datasourceInfo;
        return this;
    }

    @Override
    public void close() {
        druidDataSource.close();
    }


    @Override
    public boolean testSource(DatasourceInfo datasourceInfo) {
        Connection connection = null;
        try {
            Class.forName(driverName());
            String linkJson = datasourceInfo.getLinkJson();
            RdbmsConfig rdbmsConfig = JSONObject.parseObject(linkJson).toJavaObject(RdbmsConfig.class);
            connection = DriverManager.getConnection(rdbmsConfig.getJdbcUrl(), rdbmsConfig.getUsername(), rdbmsConfig.getPassword());
            // 有效
            if (connection.isValid(3)) {
                return true;
            } else {
                return false;
            }
        } catch (SQLException e) {
            log.error("数据源测试失败", e);
            return false;
        } catch (ClassNotFoundException e) {
            log.error("未找到驱动信息:{}", driverName());
            return false;
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
    @Data
    class RdbmsConfig  {

        private String jdbcUrl;

        private String username;

        private String password;



        public void setSSL() {
            String lowerCase = this.jdbcUrl.toLowerCase();
            if (!lowerCase.contains("usessl")) {
                if (this.jdbcUrl.contains("?")) {
                    this.jdbcUrl = this.jdbcUrl + "&useSSL=false";
                } else {
                    this.jdbcUrl = this.jdbcUrl + "?useSSL=false";
                }
            }
        }
    }
}
import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.util.Locale;

/**
 * Client for data sources of type {@code DMDB}, using the
 * {@code dm.jdbc.driver.DmDriver} JDBC driver.
 *
 * <p>{@code close()} and {@code testSource(..)} are deliberately inherited from
 * {@link AbsRdbmsClient}: an empty {@code close()} would leave the connection
 * pool created by {@code open} unreleased, and a {@code testSource} that always
 * returns {@code false} would make every health check report the source as down.
 */
@Slf4j
public class DmClient extends AbsRdbmsClient {
    /** Schema resolved by the most recent {@link #open(DatasourceInfo)} call. */
    private String schema;

    /**
     * @return the data source type handled by this client, {@code "DMDB"}
     */
    @Override
    public String type() {
        return "DMDB";
    }

    /**
     * @return the JDBC driver class name, {@code "dm.jdbc.driver.DmDriver"}
     */
    @Override
    public String driverName() {
        return "dm.jdbc.driver.DmDriver";
    }

    /**
     * Resolves the schema and then opens the pool through the parent class.
     *
     * <p>The schema is the configured schema name when it is not blank;
     * otherwise it is the upper-cased username from the link JSON. The resolved
     * value is written back to {@code datasourceInfo}.
     *
     * @param datasourceInfo data source definition; its schema name is updated
     * @return this client
     */
    @Override
    public IClient open(DatasourceInfo datasourceInfo) {
        if (StringUtils.isNotBlank(datasourceInfo.getSchemaName())) {
            this.schema = datasourceInfo.getSchemaName();
        } else {
            RdbmsConfig linkParams =
                    JSONObject.parseObject(datasourceInfo.getLinkJson()).toJavaObject(RdbmsConfig.class);
            // Locale.ROOT keeps the result independent of the JVM default locale
            // (e.g. Turkish would map "i" to a dotted capital I).
            this.schema = linkParams.getUsername().toUpperCase(Locale.ROOT);
        }
        datasourceInfo.setSchemaName(schema);
        return super.open(datasourceInfo);
    }
}

4.创建redis订阅数据源操作频道配置

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

/**
 * Redis pub/sub listener configuration.
 *
 * Author: wangsheng
 * Date: 2022/8/16 16:40
 */
@Slf4j
@Configuration
public class RedisListenerConfig {

    /** Name of the channel carrying data source operations. */
    private static final String DATASOURCE_CHANNEL = "DATASOURCE_CHANNEL";

    /**
     * Builds the listener container and subscribes the data source monitor to
     * the data source operation channel.
     *
     * @param connectionFactory Redis connection factory used by the container
     * @param dataSourceMonitor listener receiving the data source messages
     * @return the configured {@link RedisMessageListenerContainer}
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            DataSourceMonitor dataSourceMonitor) {
        PatternTopic topic = new PatternTopic(DATASOURCE_CHANNEL);

        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.addMessageListener(dataSourceMonitor, topic);

        String listenerName = dataSourceMonitor.getClass().getName();
        log.info(listenerName + " 订阅频道 {}", DATASOURCE_CHANNEL);
        return listenerContainer;
    }
}

5.redis监听数据源操作


import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;

/**
 * Redis listener that applies published data source operations to
 * {@code DataSourceContext}.
 *
 * <p>A message is a JSON object with a {@code key} (the operation) and a
 * {@code value}: the serialized {@code DatasourceInfo} for
 * {@code SAVE_OR_UPDATE}, or the data source code for {@code DELETE}.
 *
 * User: wangsheng
 * Date: 2022-08-12
 * Time: 17:07
 */
@Slf4j
@Component
public class DataSourceMonitor implements MessageListener {

    /**
     * Dispatches one message by its operation. Messages that are empty or carry
     * an unknown operation are logged and ignored; in particular they are never
     * interpreted as a delete.
     *
     * @param message the received Redis message; its body is decoded as UTF-8 JSON
     * @param bytes   the pattern that matched the channel (unused)
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        JSONObject box = JSONObject.parseObject(new String(message.getBody(), StandardCharsets.UTF_8));
        if (box == null) {
            log.warn("redis 监听到空的数据源消息,已忽略");
            return;
        }

        String operation = box.getString("key");
        if ("SAVE_OR_UPDATE".equals(operation)) {
            handleSaveOrUpdate(box);
        } else if ("DELETE".equals(operation)) {
            handleDelete(box);
        } else {
            log.warn("redis 监听到未知的数据源操作 {},已忽略", operation);
        }
    }

    /** Refreshes the cached configuration and client of a database-type data source. */
    private void handleSaveOrUpdate(JSONObject box) {
        DatasourceInfo datasourceInfo = box.getObject("value", DatasourceInfo.class);
        if (datasourceInfo == null) {
            log.warn("redis 监听到的数据源新增或更新消息缺少数据源信息,已忽略");
            return;
        }
        // Only type 0 is handled here; a missing type is treated as "not type 0".
        if (!Integer.valueOf(0).equals(datasourceInfo.getType())) {
            return;
        }
        String datasourceCode = datasourceInfo.getDatasourceCode();
        DataSourceContext.setConfigMap(datasourceCode, datasourceInfo);
        DataSourceContext.setClientMap(datasourceInfo);
        log.info("redis 监听到数据源 {} 新增或更新,更新 DataSourceContext 完成", datasourceCode);
    }

    /** Removes the cached configuration and client of a deleted data source. */
    private void handleDelete(JSONObject box) {
        String datasourceCode = box.getString("value");
        if (datasourceCode == null || datasourceCode.isEmpty()) {
            log.warn("redis 监听到的数据源删除消息缺少数据源编码,已忽略");
            return;
        }
        DataSourceContext.removeConfigMap(datasourceCode);
        DataSourceContext.removeClientMap(datasourceCode);
        log.info("redis 监听到数据源 {} 删除,更新 DataSourceContext 完成", datasourceCode);
    }

}

6.创建AOP自动监听数据源变化


import com.alibaba.fastjson.JSONObject;
import com.ws.websocket.entity.DatasourceInfo;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;


/**
 * Aspect that publishes a Redis message after a data source has been saved,
 * updated or deleted through {@code DatasourceInfoService}.
 *
 * <p>The pointcuts target {@code com.ws.websocket.service.DatasourceInfoService},
 * the package from which the service is imported elsewhere in this project; a
 * pointcut naming a different package would silently never match.
 *
 * Author: wangsheng
 * Date: 2022/8/15 16:37
 */
@Slf4j
@Aspect
@Component
public class DatasourceAspect {

    /** Channel on which data source operations are published. */
    private static final String DATASOURCE_CHANNEL = "DATASOURCE_CHANNEL";

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Publishes a {@code SAVE_OR_UPDATE} message after a data source has been
     * created or edited.
     *
     * @param joinPoint      the intercepted call (unused)
     * @param datasourceInfo value returned by the service method; nothing is
     *                       published when it is {@code null}
     */
    @AfterReturning(value = "execution(* com.ws.websocket.service.DatasourceInfoService.saveOrUpdateDatasourceInfo(..))", returning = "datasourceInfo")
    public void saveOrUpdate(JoinPoint joinPoint, DatasourceInfo datasourceInfo) {
        // The advice also runs when the service method returns null.
        if (datasourceInfo == null) {
            log.warn("新增或更新数据源方法未返回数据源信息,跳过发布 Redis 消息");
            return;
        }
        publish("SAVE_OR_UPDATE", datasourceInfo);
        log.info("新增或更新数据源 {} 方法切面发布 Redis 消息完成", datasourceInfo.getDatasourceCode());
    }

    /**
     * Publishes a {@code DELETE} message after a data source has been deleted.
     *
     * @param joinPoint      the intercepted call (unused)
     * @param datasourceCode code returned by the service method; nothing is
     *                       published when it is {@code null}
     */
    @AfterReturning(value = "execution(* com.ws.websocket.service.DatasourceInfoService.deleteDatasourceInfo(..))", returning = "datasourceCode")
    public void delete(JoinPoint joinPoint, String datasourceCode) {
        if (datasourceCode == null) {
            log.warn("删除数据源方法未返回数据源编码,跳过发布 Redis 消息");
            return;
        }
        publish("DELETE", datasourceCode);
        log.info("删除数据源 {} 方法切面发布Redis消息完成", datasourceCode);
    }

    /** Serializes {@code {key: operation, value: value}} as JSON and sends it to the channel. */
    private void publish(String operation, Object value) {
        Map<String, Object> box = new HashMap<>(4);
        box.put("key", operation);
        box.put("value", value);
        stringRedisTemplate.convertAndSend(DATASOURCE_CHANNEL, JSONObject.toJSONString(box));
    }
}

这样就解决了数据源连接信息自动加载、更新和同步的问题。但还有一个问题:当数据源(数据库)重启后,缓存中的连接会失效,而 AOP 无法感知数据源的重启。因此还需要一个定时任务定期对数据源进行连接测试,如果连接已失效,则重新建立连接并更新缓存。

7.创建定时任务

import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ws.websocket.entity.DatasourceInfo;
import com.ws.websocket.service.DatasourceInfoService;
import com.ws.websocket.util.DBTool;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;

@Component
@RequiredArgsConstructor
@Slf4j
public class DataSourceRetryConnectSchedule {
    @Resource
    private DatasourceInfoService datasourceInfoService;
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    //每2小时执行一次
    @Scheduled(cron = "0 0 */2 * * ?")
    public void RetryConnect() {
        log.info("开始监测数据源连接");
        QueryWrapper<DatasourceInfo> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("type", 0);

        List<DatasourceInfo> list = datasourceInfoService.list(queryWrapper);
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        for (DatasourceInfo datasourceInfo : list) {
            Boolean bb = DBTool.testSource(datasourceInfo);
            if (!bb) {
                log.info("数据源重连{}"+datasourceInfo.getDatasourceName());
                HashMap<String, Object> box = new HashMap<>(4);
                box.put("key", "SAVE_OR_UPDATE");
                box.put("value", datasourceInfo);
                // 发布 Redis 消息
                stringRedisTemplate.convertAndSend("DATASOURCE_CHANNEL", JSONObject.toJSONString(box));
            }
        }
    }
}