springboot在feign和线程池中使用TraceId日志链路追踪(最终版)-2

发布于:2025-03-28 ⋅ 阅读:(37) ⋅ 点赞:(0)

简述

书接前文:SpringBoot使用TraceId日志链路追踪-1

在上文中我们使用了springboot+MDC+Log4j2支持链路ID打印,但是如果我们使用feigin调用其他服务,线程池如何获取链路id呢;这里我们要用到阿里的TTL,在多线程面试中也会常用问到一个问题那就是线程传递问题。刚好这里就实践以下具体用法,并且这也是一个很好的案例。

问题

Spring 默认的日志框架 Logback 中提供的 LogbackMDCAdapter 内部使用的是ThreadLocal,只有本线程才有效,子线程和下游的服务 MDC 里的值会丢失。

主要的难点是解决值传递问题,主要包括以下几个部分:
异步情况下(线程池)如何传递 MDC 中的 TraceId 到子线程
API Gateway 网关中如何传递 MDC 中的 TraceId
微服务之间互相远程调用时如何传递 MDC 中的 TraceId

阿里的TTL组件解决了线程池场景下的上下文传递问题。通过装饰线程池,TTL在任务提交时自动拷贝父线程的上下文到子线程,并在任务结束后清理副本,确保多级线程池调用链路完整。

feign调用时给head加入traceId

1.需要拦截feign的requst请求,并加入自定义的报文头信息
2.需要把这个拦截器配置到feign的配置项中

在application.yml配置文件里面,可以添加feign相关的配置信息,常见的配置信息有如下这些:

loggerLevel:日志级别,四个取值:

  • NONE 不打印日志;
  • BASIC:只打印请求方法、URL、响应状态码、执行时间。
  • HEADERS:打印请求头、响应头的日志信息。
  • FULL:打印所有日志。

连接配置参数

  • connectTimeout:连接超时时间,单位毫秒:ms。
  • readTimeout:读取超时时间,单位毫秒:ms。
  • retryer:重试策略。
  • requestInterceptors:自定义的拦截器,可以多个,是一个List集合。
  • defaultRequestHeaders:默认的请求头信息。
  • defaultQueryParameters:默认的查询参数信息。
  • followRedirects: 是否允许重定向。

这里我们开启HEADERS级别的日志,方便本地服务调用时打印报文头我们可以查看是否TraceId信息;


import feign.RequestInterceptor;
import feign.RequestTemplate;
import gyqx.spd.common.constants.GlobalConstants;
import jodd.util.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.jboss.logging.MDC;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import java.util.Optional;

/**
 * @version 1.0.0
 * @Date: 2025/3/15 8:49
 * @Description: 自定义feigin请求拦截器加入traceId,甚至我们可以把前端携带的token信息和用户id等报文头也放进来,方便转发给下一个被调用的服务
 */
@Slf4j
public class CustomFeignInterceptor implements RequestInterceptor {


    @Override
    public void apply(RequestTemplate template) {
        // TODO 在这里可以实现一些自定义的逻辑,例如:用户认证
        log.info("Feign执行拦截器....");
        Optional.ofNullable(RequestContextHolder.getRequestAttributes()).map(it -> ((ServletRequestAttributes) it).getRequest())
                .ifPresent(it -> {
                            String traceId = it.getHeader(GlobalConstants.GLOBAL_X_TRACE_ID);

                            if (StringUtil.isEmpty(traceId)) {
                                traceId = MDC.get(GlobalConstants.GLOBAL_X_TRACE_ID) == null ? null : MDC.get(GlobalConstants.GLOBAL_X_TRACE_ID).toString();
                            }

                            if (StringUtil.isNotBlank(traceId)) {
                                MDC.put(GlobalConstants.GLOBAL_X_TRACE_ID, traceId);
                                template.header(GlobalConstants.GLOBAL_X_TRACE_ID, traceId);
                            }
                            log.info("Feign执行拦截器traceId={}", traceId);
                        }
                );

    }
}

FeignConfig配置

一般都是全局配置,每个服务引入依赖自动配置生效,当然如果只测试可以在某个feigin上自己加上


import feign.RequestInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @version 1.0.0
 * @Date: 2024/3/15 8:53
 * @Description: OpenFeign 配置类全局生效(直接放入核心配置,其他服务引入该依赖项即可)
 */
@Configuration
public class FeignConfig {
    /**
     * 注入自定义的拦截器
     */
    @Bean
    public RequestInterceptor requestInterceptor() {
        return new CustomFeignInterceptor();
    }

}

FeignConfig 局部生效

以下方式皆可

  • 把上面的配置文件FeignConfig只放入某个服务中生效(本文就是这种)
  • yam中配置只对某个服务生效时配置
# feign 配置
feign:
  client:
    config:
      # 这里写微服务的服务名称,例如:我这里写的是 service-provider 服务名称
      # 针对 service-provider 微服务的请求,都将执行这些配置信息
      service-provider:
        loggerlevel: full
        # 配置请求拦截器,可以多个
        requestInterceptors:
          - com.gitee.code.interceptor.CustomFeignInterceptor

feign拦截器和配置合并为一个文件(最终版)

上面的写法是拦截器和配置文件分开了,当然我们可以缩写直接合并为一个文件,全局生效即可;


import feign.RequestInterceptor;
import feign.RequestTemplate;
import gyqx.spd.common.constants.GlobalConstants;
import jodd.util.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.jboss.logging.MDC;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import java.util.Optional;

/**
 * @version 1.0.0
 * @Date: 2024/3/15 8:53
 * @Description: OpenFeign 配置类+拦截器
 */
@Slf4j
@Configuration
public class FeignConfig {
    /**
     * 注入自定义的拦截器
     */
    @Bean
    public RequestInterceptor requestInterceptor() {
        return new RequestInterceptor(){

            @Override
            public void apply(RequestTemplate template) {
                // TODO 在这里可以实现一些自定义的逻辑,例如:用户认证
                log.info("Feign执行拦截器....");
                Optional.ofNullable(RequestContextHolder.getRequestAttributes()).map(it -> ((ServletRequestAttributes) it).getRequest())
                        .ifPresent(it -> {
                                    String traceId = it.getHeader(GlobalConstants.GLOBAL_X_TRACE_ID);

                                    if (StringUtil.isEmpty(traceId)) {
                                        traceId = MDC.get(GlobalConstants.GLOBAL_X_TRACE_ID) == null ? null : MDC.get(GlobalConstants.GLOBAL_X_TRACE_ID).toString();
                                    }
                                    log.info("Feign执行拦截器token:traceId={}", traceId);
                                    if (StringUtil.isNotBlank(traceId)) {
                                        MDC.put(GlobalConstants.GLOBAL_X_TRACE_ID, traceId);
                                        traceId= traceId.substring(traceId.indexOf(":") + 1);//我这里去掉了用户的token信息,不影响使用
                                        template.header(GlobalConstants.GLOBAL_X_TRACE_ID, traceId);
                                    }
                                    log.info("Feign执行拦截器traceId={}", traceId);
                                }
                        );

            }
        };
    }

    @Bean
    Logger.Level feignLoggerLevel() {
        return Logger.Level.FULL;//日志级别全打印-方便调试,后续可以修改级别
    }
}

开启报文打印后,可以看到报文头里面已经有了traceId
在这里插入图片描述

在服务提供方的日志中也可以看到拦截到了
在这里插入图片描述

feign异步调用拦截器配置[不常用]

对于上面的配置对异步阻塞调用,异步非阻塞调用的情况就不再适用,需要单独把请求头获取后放入本地ThreadLocal变量中,也就是有个备份,然后放入异步线程的请求头里面。

详细请参考下面的blog,这种异步的异步项目中很少用到,但是放入本地ThreadLocal变量中的方式倒是很常用,比如我们把当前登录用户的session信息放入当前线程的上下文环境中,方便在业务中获取当前用户的ID,部门,权限等数据。

而这里把请求头的数据用拦截器拿到后放入本地ThreadLocal变量中只是为了方便调用其他feign异步接口时获取到当前线程携带过来的数据防止被覆盖和清空,为给另一个异步操作参数做准备工作。
https://blog.csdn.net/LatiaoCanCode/article/details/144358227

原理比较简单,上文中的TraceId我们也是在拦截器中获取并存入了MDC的ThreadLocal变量,这里其实也是自定义了一个ThreadLocal变量用户存放请求头信息,甚至可以拓展这个ThreadLocal变量为当前线程上下文环境变量CurrentThreadContex专门存放前端调用携带过来的上文数据作为缓存在当前线程中,方便后续操作使用。

对于线程部分的配置则无需参考,没有什么实际意义直接参考下面的自定义线程池使用TraceId

使用TTL自定义线程池

异步请求丢失上文的问题,这些问题追根究底都是ThreadLocal惹得祸。
由于ThreadLocal只能保存当前线程的信息,不能实现父子线程的继承。
说到这,很多人想到了InheritableThreadLocal,确实InheritableThreadLocal能够实现父子线程间传递本地变量,但是你的程序如果采用线程池,则存在着线程复用的情况,这时就不一定能够实现父子线程间传递了,因为在线程在线程池中的存在不是每次使用都会进行创建,InheritableThreadlocal是在线程初始化时intertableThreadLocals=true才会进行拷贝传递

失败样例

@Test
public void test() throws Exception {
    //单一线程池
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    //InheritableThreadLocal存储
    InheritableThreadLocal<String> username = new InheritableThreadLocal<>();
    for (int i = 0; i < 10; i++) {
    username.set("公众号:牧竹子—"+i);
    Thread.sleep(3000);
    CompletableFuture.runAsync(()-> System.out.println(username.get()),executorService);
   }
}

//打印如下
-----------------------
公众号:牧竹子—0
公众号:牧竹子—0
公众号:牧竹子—0
公众号:牧竹子—0

所以若使用的子线程是已经被池化的线程,从线程池中取出线下进行使用,是没有经过初始化的过程,也就不会进行父子线程的本地变量拷贝。
由于在日常应用场景中,绝大多数都是会采用线程池的方式进行资源的有效管理。

为什么需要TransmittableThreadLocal?

在Spring框架中,默认情况下,线程池的任务执行是通过java.util.concurrent包中的ThreadPoolExecutor实现的。然而,如果你想要在使用Spring框架的同时,利用阿里巴巴的TransmittableThreadLocal(TTL)来传递线程局部变量(ThreadLocal)的值到异步任务中,你可以通过自定义线程池配置来实现这一点。

线程中JUC默认父子线程传递的InheritableThreadlocal是在线程初始化时intertableThreadLocals=true才会进行拷贝传递。但是在线程中已经是初始化之后的线程无法获取新的拷贝所以总是初始值。

TransmittableThreadLocal是阿里巴巴开源的库,主要用于解决在使用线程池时,线程局部变量(ThreadLocal)不能正确地传递到异步任务中的问题。这在微服务架构或者使用Spring Boot进行异步处理时尤其重要

样例

@Test
public void test() throws Exception {
    //单一线程池
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    //需要使用TtlExecutors对线程池包装一下
    executorService=TtlExecutors.getTtlExecutorService(executorService);
    //TransmittableThreadLocal创建
    TransmittableThreadLocal<String> username = new TransmittableThreadLocal<>();
    for (int i = 0; i < 10; i++) {
    username.set("公众号:牧竹子—"+i);
    Thread.sleep(3000);
    CompletableFuture.runAsync(()-> System.out.println(username.get()),executorService);
  }
}

//打印如下
-----------------------
公众号:牧竹子—0
公众号:牧竹子—1
公众号:牧竹子—2

可以看到已经能够实现了线程池中的父子线程的数据传递。
在每次调用任务的时,都会将当前的主线程的TTL数据copy到子线程里面,执行完成后,再清除掉。同时子线程里面的修改回到主线程时其实并没有生效。这样可以保证每次任务执行的时候都是互不干涉。

替换 Spring 默认线程池 使用 alibaba 的 TtlRunnable进行替换。

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>transmittable-thread-local</artifactId>
            <version>2.14.4</version>
        </dependency>

自定义线程池


import com.alibaba.ttl.TtlRunnable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class TaskThreadPoolConfig {


    @Autowired
    private TaskThreadPoolProperties config;

    @Bean(name = "taskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
        executor.setCorePoolSize(config.getCorePoolSize());
        //最大线程数
        executor.setMaxPoolSize(config.getMaxPoolSize());
        //队列容量
        executor.setQueueCapacity(config.getQueueCapacity());
        //活跃时间
        executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
        //线程名字前缀
        executor.setThreadNamePrefix("MyExecutor-");

        // setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
        // CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(120); //等待任务执行时间,如果超过这个时间还没有销毁就 强制销


        executor.setTaskDecorator(getTraceContextDecorator());//使用TTL包装
        executor.initialize();
        return executor;
    }

private TaskDecorator getTraceContextDecorator() {
        return runnable -> TtlRunnable.get(() -> {
            try {
                //把父级值放入当前线程的MDC本地ThreadLocal中,log4j打印时使用
                MDC.put("traceId", GlobTraceContext.getTraceId());
                runnable.run();
            } finally {
                MDC.clear();
            }
        });
    }

}

因为MDC默认使用的ThreadLocal缓存当前线程的值,因此这里不能在用MDC默认的方式,得使用TTL改写为TransmittableThreadLocal类型放线程变量;

GlobTraceContext


import cn.hutool.core.util.IdUtil;
import com.alibaba.ttl.TransmittableThreadLocal;

/**
 * 基于TransmittableThreadLocal实现线程池安全的TraceID传递
 *
 * @author wnhyang
 * @date 2025/3/3
 **/
public class GlobTraceContext {


    private static final TransmittableThreadLocal<String> TRACE_ID = new TransmittableThreadLocal<>();

    /**
     * 设置TraceID,并同步到Log4j2的MDC
     */
    public static void setTraceId(String traceId) {
        TRACE_ID.set(traceId);
    }

    public static String getTraceId() {
        return TRACE_ID.get();
    }

    public static void clear() {
        TRACE_ID.remove();
    }

    public static String generateTraceId() {
        return IdUtil.simpleUUID();
    }
}

AsyncService


import gyqx.spd.outside.conf.GlobTraceContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * 异步调用service
 * @author summer
 */
@Service
@Slf4j
public class AsyncService {
    /**
     * 使用 @Async 注解 实现异步调用
     * taskExecutor为自定义线程池,指定自定义线程池
     * @return
     * @throws InterruptedException
     */

    @Async("taskExecutor")
    public void async(){
        log.info("async异步任务开始: " + Thread.currentThread().getName());

        try {
            log.info("子线程traceId:: " + GlobTraceContext.getTraceId());
            // 模拟耗时操作(实际工作中,此处写业务逻辑处理)
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("async异步任务完成");
    }
}

AsyncService

@RequestMapping("test")
public class TestController {
    @Autowired
    private AsyncService asyncService;
    @Resource
    private ThreadPoolTaskExecutor taskExecutor;//使用我们自定义的线程池

    @GetMapping("/async")
    public AjaxResult<String> async() {
        String traceId = UUID.randomUUID().toString();
        System.out.println("父亲线程traceId:" +traceId);
        GlobTraceContext.setTraceId(traceId);
        //异步执行
        CompletableFuture.runAsync(() -> asyncService.async(), taskExecutor);

//        asyncService.async();
        log.info("async异步任务调用成功");
        return AjaxResult.ok("async异步任务调用成功");
    }

}

打印结果如下

父亲线程traceId:db9a27e4-549a-4a41-ba7d-25297d875dca
controller.HrpStockController  : async异步任务调用成功
controller.AsyncService     : async异步任务开始: MyExecutor-1
controller.AsyncService     : 子线程traceId:: db9a27e4-549a-4a41-ba7d-25297d875dca
controller.AsyncService     : async异步任务完成

总结

要实现线程池支持的全局链路追踪需要上一章节和本章节feign拦截器中都要使用TransmittableThreadLocal作为替换ThreadLocal类型MDC作为存放链路ID,同样在父子线程中想存放其他值也需要使用TransmittableThreadLocal类型。

这里我们需要把上一章,和本章的LogInterceptor,FeignConfig中的MDC.put前加上GlobTraceContext.setTraceId即可,在线程池配置中我们在包装TTL的时候执行任务前把获取到的,getTraceId()放入当前线程的MDC变量中,这样log4j打印日志即可拿到当前线程的traceId值

在这里插入图片描述

参考和拓展阅读

SpringBoot使用TraceId日志链路追踪-1
https://blog.csdn.net/zjcjava/article/details/146237248

微服务中使用阿里开源的TTL,优雅的实现身份信息的线程间复用
https://developer.aliyun.com/article/12012001056715.html

阿里开源支持缓存线程池的ThreadLocal Transmittable ThreadLocal(TTL)
https://www.cnblogs.com/xiaopotian/p/11056715.html

配置spring/springboot默认的异步线程池
https://www.cnblogs.com/duanxz/p/6084494.html?ivk_sa=1024320u

ThreadLocal父子线程数据传递
https://blog.csdn.net/zjcjava/article/details/125601123