【PmHub后端篇】PmHub整合TransmittableThreadLocal (TTL)缓存用户数据

发布于:2025-05-10 ⋅ 阅读:(18) ⋅ 点赞:(0)

1 相关理论知识

1.1 ThreadLocal简介

1.1.1 ThreadLocal是什么

ThreadLocal是Java中lang包下的一个类,用于解决多线程下共享变量并发问题。它为每个线程维护独立的变量副本,即同一个变量在不同线程下可赋予不同值,避免多个线程同时访问同一个变量时的冲突。

1.1.2 ThreadLocal与synchronized的区别

synchronized基于锁机制,控制对共享资源的访问,确保线程间数据的一致性和安全性,实现线程间的互斥访问。而ThreadLocal采用空间换时间的方式,为每个线程提供变量副本,实现线程隔离。

synchronized是时间换空间让多个线程排队访问,ThreadLocal是空间换时间为每个线程提供了一份变量的副本,从而实现线程隔离。

1.1.3 ThreadLocal使用场景

ThreadLocal 主要用于实现线程间的数据隔离,其使用场景比较多,下面列举几个比较常见的场景:

  • 用户会话信息:在web应用中,每个请求在独立线程中处理时,可使用ThreadLocal存储用户会话信息,防止不同请求线程间的数据混淆。
  • 数据库连接管理:在线程中保存数据库连接,使每个线程拥有自己的数据库连接实例,避免连接共享问题,提高性能。
  • 格式化工具:像SimpleDateFormat这类线程不安全的工具,可使用ThreadLocal为每个线程提供独立实例,避免线程安全问题。
  • 日志上下文信息传递:在日志记录中,使用ThreadLocal存储请求ID、用户ID等上下文信息,在不同日志记录中共享这些信息。

1.2 ThreadLocal原理

1.2.1 ThreadLocal的内部结构

ThreadLocal是一个泛型类,主要作用是提供一个用于存储线程局部变量的容器。每个线程都有一个 ThreadLocalMap 对象,可以用来存储该线程的所有 ThreadLocal 实例及其对应的值。

其内部主要有三个方法:

  • get():获取线程的threadlocalmap,根据key(当前threadlocal)获取值并返回,若map为空或获取不到值则返回默认值。
  • set(T value):先获取当前线程,获取线程的threadlocalmap(若获取不到则创建一个map),将当前ThreadLocal作为key,value作为值进行设置。
  • initialValue:设置默认初始值,可被继承。

1.2.2 ThreadLocalMap基本结构

ThreadLocalMap包含两个重要部分:一是threadlocal静态内部类;二是key为threadlocal对象的弱引用,目的是将threadlocal对象的生命周期和线程的生命周期解绑。

1.3 TransmittableThreadLocal (TTL)介绍

TTL是阿里巴巴开源的工具库,用于解决Java中ThreadLocal在使用线程池或其他多线程框架(如Executors、ForkJoinPool等)时无法传递父线程上下文的问题。其开源地址为https://github.com/alibaba/transmittable-thread-local 。整个TransmittableThreadLocal库核心功能代码量约1000 SLOC,较为精简。

1.3.1 TTL实现原理

在Java多线程编程中,ThreadLocal在使用线程池时,由于线程复用会导致变量在父线程和子线程之间无法正确传递。InheritableThreadLocal虽能传递父线程变量给子线程,但在线程池环境下仍无法解决线程复用问题。

TTL的工作原理主要包括以下三点:

  • 上下文拷贝:任务提交时,TTL会拷贝当前线程的上下文到任务中。
  • 任务执行前设置上下文:在任务执行前,TTL将拷贝的上下文设置到当前线程中。
  • 任务执行后清理上下文:任务执行完毕后,TTL清理线程中的上下文,防止内存泄漏。

在这里插入图片描述

以下是结合图片对TransmittableThreadLocal(TTL)工作流程的说明:

  1. 创建和设置TTL对象

    • 步骤1:createTtl():业务代码(Biz Code)首先创建一个TransmittableThreadLocal对象。这是使用TTL来传递上下文信息的起始点,通过创建该对象,后续可以利用它来存储和传递特定于线程的上下文数据。
    • 步骤2:setTtlValue():在创建好TransmittableThreadLocal对象后,业务代码向该对象中设置具体的值。这个值通常是需要在多线程环境中跨线程传递的上下文信息,比如用户会话信息、请求标识等。
  2. 包装业务任务

    • 步骤3:createBizTaskRunnable():业务代码创建实际的业务任务,该任务实现了Runnable接口,包含了具体的业务逻辑操作。
    • 步骤4:createTtlRunnableWrapper(Runnable):将上述创建的业务任务(Runnable)包装成TtlRunnable。在这个过程中,会执行captureAllTtlValues()操作 :
      • 步骤4.1:captureAllTtlValues():通过Transmitter工具类来捕获当前线程中所有TransmittableThreadLocal对象的值。
        • 步骤4.1.1:get():从TransmittableThreadLocal对象中获取其存储的值。
        • 步骤4.1.2:copy(value:T):对获取到的值进行拷贝操作。这一步很关键,通过拷贝,确保在后续线程执行时,使用的是独立的、与当前线程上下文相关联的值副本,而不会受到其他线程的干扰 。
  3. 提交任务到线程池

    • 步骤5:submitTtlRunnableToThreadPool():将包装好的TtlRunnable任务提交到线程池(ThreadPool)中。线程池会根据自身的调度策略来安排任务的执行。
  4. 任务执行过程中的上下文处理

    • 步骤6:run():当线程池中的线程调度到该任务时,TtlRunnablerun方法被调用开始执行任务。
      • 步骤6.1:beforeExecute():在实际业务任务执行前,执行一些前置操作,这些操作可能包括对线程上下文环境的进一步准备工作等。
      • 步骤6.2:replayCapturedTtlValues():通过Transmitter工具类,将之前捕获并拷贝的TransmittableThreadLocal的值重新应用到当前执行线程的上下文中。这样,在任务执行过程中,就可以使用到从父线程传递过来的上下文信息。
      • 步骤6.3:run():执行实际包装的业务任务(Runnable)中的业务逻辑,在这个过程中可以使用到已经恢复的TransmittableThreadLocal中的值。
        • 步骤6.3.1:useValueInTTL():在业务逻辑执行过程中,使用TransmittableThreadLocal中存储的上下文值来完成具体的业务操作,例如根据用户会话信息进行权限判断等。
      • 步骤6.4:restoreTtlValuesBeforeReplay():在任务执行完成后,恢复TransmittableThreadLocal的值到之前捕获并应用值之前的状态,以避免对后续任务的上下文产生干扰。
      • 步骤6.5:afterExecute():执行一些后置操作,清理相关资源或者进行一些任务执行完成后的记录等工作 。

整个TTL的工作流程围绕着在多线程环境下,尤其是在线程池场景中,如何准确地捕获、传递和恢复线程的上下文信息,从而保证业务逻辑在不同线程中能够正确使用到所需的上下文数据 。

1.3.2 TTL主要使用场景

  • 分布式追踪:在分布式系统中传递追踪ID,便于日志的关联和问题排查。
  • 事务管理:在分布式事务中传递事务上下文,确保事务的一致性。
  • 上下文信息传递:在多线程环境中传递用户会话、请求上下文等信息。

1.3.3 TTL对比ThreadLocal优势

  • 上下文传递:ThreadLocal仅在当前线程内存储,无法跨线程传递;TTL能够在线程池和多线程框架中传递上下文信息。
  • 线程复用支持:ThreadLocal在线程池复用线程时无法保证变量一致性;TTL支持线程池复用,确保变量在任务间传递和保持一致。
  • 无侵入性:ThreadLocal需手动管理变量设置和清除,容易出错;TTL替换ThreadLocal即可自动管理上下文传递和清除。
  • 集成方便:ThreadLocal适用于简单线程环境;TTL可与各种线程池和多线程框架无缝集成。

2 项目实战

2.1 具体实现流程

在微服务架构中,需求是将用户登录后的信息保存在上下文变量中,并进行跨线程之间传递。

在这里插入图片描述

  • 具体流程如下
    用户登录后会获得token,后续请求携带该token,且token中携带有用户信息。所有请求首先经过网关的过滤器AuthFilter,在AuthFilter中,将用户信息放到请求头中。请求经过网关后,会到达自定义请求头拦截器HeaderInterceptor,在HeaderInterceptor中,取出请求头中的用户信息并放到TTL中,这样链路上的服务就可以直接从TTL中获取用户信息。

TTL(ThreadLocal)的应用主要分为两个层面,以下是具体实现流程:

  • 网关层认证(AuthFilter)
// 关键代码流程:
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    // 1. 验证token有效性(依赖Redis TTL)
    boolean islogin = redisService.hasKey(getTokenKey(userkey)); // 第66行
    if (!islogin) {
        return unauthorizedResponse(exchange, "登录状态已过期");
    }
    
    // 2. 将认证信息写入请求头
    addHeader(mutate, SecurityConstants.USER_KEY, userkey); // 第77行
    addHeader(mutate, SecurityConstants.DETAILS_USER_ID, userid);
    addHeader(mutate, SecurityConstants.DETAILS_USERNAME, username);
}
  • 业务层上下文传递(HeaderInterceptor)
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
    // 1. 从请求头提取网关写入的信息
    SecurityContextHolder.setUserId(ServletUtils.getHeader(request, SecurityConstants.DETAILS_USER_ID));
    SecurityContextHolder.setUserName(ServletUtils.getHeader(request, SecurityConstants.DETAILS_USERNAME));
    SecurityContextHolder.setUserKey(ServletUtils.getHeader(request, SecurityConstants.USER_KEY));

    // 2. 将用户信息存储到ThreadLocal(TTL)
    if (StringUtils.isNotNull(loginUser)) {
        SecurityContextHolder.set(SecurityConstants.LOGIN_USER, loginUser); // 第49行
    }
}
  • 架构流程图解:
客户端请求
  │
  ↓ 
[网关层 AuthFilter]
  │ 1. 校验Redis TTL
  │ 2. 添加认证头信息
  ↓
[业务服务 HeaderInterceptor]
  │ 1. 从请求头提取信息
  │ 2. 存入ThreadLocal(TTL)
  ↓
[Controller/Service]
  │ 使用SecurityContextHolder.getXXX()获取上下文
  • 这种设计的优势:

    • 双重验证机制:网关层做基础认证,业务层做上下文管理
    • 无状态传输:通过请求头传递认证信息,避免会话状态维护
    • 线程级隔离:TTL保证每个请求的用户信息独立存储
    • 自动清理:afterCompletion方法确保TTL内容及时清除(防止内存泄漏)

2.2 完整代码实现

  • com.laigeoffer.pmhub.gateway.filter.AuthFilter
/**
 * 网关鉴权
 *
 * @author canghe
 */
@Component
public class AuthFilter implements GlobalFilter, Ordered {
    private static final Logger log = LoggerFactory.getLogger(AuthFilter.class);

    private static final String BEGIN_VISIT_TIME = "begin_visit_time";//开始访问时间

    // 排除过滤的 uri 地址,nacos自行添加
    @Autowired
    private IgnoreWhiteProperties ignoreWhite;

    @Autowired
    private RedisService redisService;


    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpRequest.Builder mutate = request.mutate();

        String url = request.getURI().getPath();
        // 跳过不需要验证的路径
        if (StringUtils.matches(url, ignoreWhite.getWhites())) {
            return chain.filter(exchange);
        }
        String token = getToken(request);
        if (StringUtils.isEmpty(token)) {
            return unauthorizedResponse(exchange, "令牌不能为空");
        }
        Claims claims = JwtUtils.parseToken(token);
        if (claims == null) {
            return unauthorizedResponse(exchange, "令牌已过期或验证不正确!");
        }
        String userkey = JwtUtils.getUserKey(claims);
        boolean islogin = redisService.hasKey(getTokenKey(userkey));
        if (!islogin) {
            return unauthorizedResponse(exchange, "登录状态已过期");
        }
        String userid = JwtUtils.getUserId(claims);
        String username = JwtUtils.getUserName(claims);
        if (StringUtils.isEmpty(userid) || StringUtils.isEmpty(username)) {
            return unauthorizedResponse(exchange, "令牌验证失败");
        }

        // 设置用户信息到请求
        addHeader(mutate, SecurityConstants.USER_KEY, userkey);
        addHeader(mutate, SecurityConstants.DETAILS_USER_ID, userid);
        addHeader(mutate, SecurityConstants.DETAILS_USERNAME, username);
        // 内部请求来源参数清除(防止网关携带内部请求标识,造成系统安全风险)
        removeHeader(mutate, SecurityConstants.FROM_SOURCE);

        //先记录下访问接口的开始时间
        exchange.getAttributes().put(BEGIN_VISIT_TIME, System.currentTimeMillis());
        
        // Mono.fromRunnable 是非阻塞的,适合在 then 中处理后续的日志逻辑。
        return chain.filter(exchange).then(Mono.fromRunnable(() -> {
            try {
                // 记录接口访问日志
                Long beginVisitTime = exchange.getAttribute(BEGIN_VISIT_TIME);
                if (beginVisitTime != null) {
                    URI uri = exchange.getRequest().getURI();
                    Map<String, Object> logData = new HashMap<>();
                    logData.put("host", uri.getHost());
                    logData.put("port", uri.getPort());
                    logData.put("path", uri.getPath());
                    logData.put("query", uri.getRawQuery());
                    logData.put("duration", (System.currentTimeMillis() - beginVisitTime) + "ms");

                    log.info("访问接口信息: {}", logData);
                    log.info("我是美丽分割线: ###################################################");
                }
            } catch (Exception e) {
                log.error("记录日志时发生异常: ", e);
            }
        }));
    }

    private void addHeader(ServerHttpRequest.Builder mutate, String name, Object value) {
        if (value == null) {
            return;
        }
        String valueStr = value.toString();
        String valueEncode = ServletUtils.urlEncode(valueStr);
        mutate.header(name, valueEncode);
    }

    private void removeHeader(ServerHttpRequest.Builder mutate, String name) {
        mutate.headers(httpHeaders -> httpHeaders.remove(name)).build();
    }

    private Mono<Void> unauthorizedResponse(ServerWebExchange exchange, String msg) {
        log.error("[鉴权异常处理]请求路径:{}", exchange.getRequest().getPath());
        return ServletUtils.webFluxResponseWriter(exchange.getResponse(), msg, HttpStatus.UNAUTHORIZED);
    }

    /**
     * 获取缓存key
     */
    private String getTokenKey(String token) {
        return CacheConstants.LOGIN_TOKEN_KEY + token;
    }

    /**
     * 获取请求token
     */
    private String getToken(ServerHttpRequest request) {
        String token = request.getHeaders().getFirst(TokenConstants.AUTHENTICATION);
        // 如果前端设置了令牌前缀,则裁剪掉前缀
        if (StringUtils.isNotEmpty(token) && token.startsWith(TokenConstants.PREFIX)) {
            token = token.replaceFirst(TokenConstants.PREFIX, StringUtils.EMPTY);
        }
        return token;
    }

    @Override
    public int getOrder() {
        return -200;
    }
}
  • com.laigeoffer.pmhub.base.security.interceptor.HeaderInterceptor
/**
 * 自定义请求头拦截器,将Header数据封装到线程变量中方便获取
 * 注意:此拦截器会同时验证当前用户有效期自动刷新有效期
 *
 * @author canghe
 */
public class HeaderInterceptor implements AsyncHandlerInterceptor {

    // 需要免登录的路径集合
    private static final Set<String> EXEMPTED_PATHS = new HashSet<>();

    static {
        // 在这里添加所有需要免登录默认展示首页的的路径
        EXEMPTED_PATHS.add("/system/user/getInfo");
        EXEMPTED_PATHS.add("/project/statistics");
        EXEMPTED_PATHS.add("/project/doing");
        EXEMPTED_PATHS.add("/project/queryMyTaskList");
        EXEMPTED_PATHS.add("/project/select");
        EXEMPTED_PATHS.add("/system/menu/getRouters");

    }

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        if (!(handler instanceof HandlerMethod)) {
            return true;
        }

        SecurityContextHolder.setUserId(ServletUtils.getHeader(request, SecurityConstants.DETAILS_USER_ID));
        SecurityContextHolder.setUserName(ServletUtils.getHeader(request, SecurityConstants.DETAILS_USERNAME));
        SecurityContextHolder.setUserKey(ServletUtils.getHeader(request, SecurityConstants.USER_KEY));

        String token = SecurityUtils.getToken();
        if (StringUtils.isNotEmpty(token)) {
            LoginUser loginUser = AuthUtil.getLoginUser(token);
            if (StringUtils.isNotNull(loginUser)) {
                AuthUtil.verifyLoginUserExpire(loginUser);
                SecurityContextHolder.set(SecurityConstants.LOGIN_USER, loginUser);
            }
        } else {
            // 首页免登场景展示
            // 检查请求路径是否匹配特定路径
            String requestURI = request.getRequestURI();
            if (isExemptedPath(requestURI)) {
                // 创建一个默认的 LoginUser 对象
                LoginUser defaultLoginUser = createDefaultLoginUser();
                SecurityContextHolder.set(SecurityConstants.LOGIN_USER, defaultLoginUser);
            }
        }
        return true;
    }

    // 判断请求路径是否匹配特定路径
    private boolean isExemptedPath(String requestURI) {
        // 你可以根据需要调整特定路径的匹配逻辑
        return EXEMPTED_PATHS.stream().anyMatch(requestURI::startsWith);
    }

    // 创建一个默认的 LoginUser 对象
    private LoginUser createDefaultLoginUser() {
        LoginUser defaultLoginUser = new LoginUser();
        defaultLoginUser.setUserId(173L);  // 设置默认的用户ID
        defaultLoginUser.setUsername(Constants.DEMO_ACCOUNT);  // 设置默认的用户名

        SysUser demoSysUser = new SysUser();
        demoSysUser.setUserId(173L);
        demoSysUser.setUserName(Constants.DEMO_ACCOUNT);
        demoSysUser.setDeptId(100L);
        demoSysUser.setStatus("0");

        defaultLoginUser.setUser(demoSysUser);
        // 设置其他必要的默认属性
        return defaultLoginUser;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex)
            throws Exception {
        SecurityContextHolder.remove();
    }
}

3 总结

文章介绍了ThreadLocal,其可解决多线程共享变量并发问题,与synchronized不同。还阐述了TransmittableThreadLocal(TTL),能解决ThreadLocal在多线程框架中上下文传递问题,有分布式追踪等使用场景。项目实战展示了用户信息在微服务中跨线程传递的实现代码。

4 参考链接

  1. PmHub整合TransmittableThreadLocal (TTL)缓存用户数据
  2. 项目仓库(GitHub)
  3. 项目仓库(码云)

网站公告

今日签到

点亮在社区的每一天
去签到