SkyWalking + Logstash全链路追踪系统详细实施方案

发布于:2025-07-08 ⋅ 阅读:(25) ⋅ 点赞:(0)

SkyWalking + Logstash全链路追踪系统详细实施方案

一、系统架构与数据流向

核心流程

  1. 数据采集:SkyWalking Agent埋点收集调用链路数据
  2. 日志增强:应用程序通过MDC注入TraceID
  3. 日志收集:Logstash采集应用日志并发送至Elasticsearch
  4. 数据存储:SkyWalking指标数据与日志数据分别存储
  5. 可视化分析:SkyWalking UI展示链路追踪,Kibana分析日志
二、环境准备与部署
1. SkyWalking部署(Docker方式)
version: '3.8'
services:
  elasticsearch:
    image: elasticsearch:7.14.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms512m -Xmx512m
      - xpack.security.enabled=false
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - es-data:/usr/share/elasticsearch/data
    networks:
      - skywalking-network

  skywalking-oap:
    image: apache/skywalking-oap-server:9.7.0
    container_name: skywalking-oap
    depends_on:
      - elasticsearch
    environment:
      - SW_STORAGE=elasticsearch
      - SW_STORAGE_ES_CLUSTER_NODES=elasticsearch:9200
      - SW_CORE_REST_PORT=12800
      - SW_CORE_GRPC_PORT=11800
    ports:
      - "12800:12800"
      - "11800:11800"
    networks:
      - skywalking-network

  skywalking-ui:
    image: apache/skywalking-ui:9.7.0
    container_name: skywalking-ui
    depends_on:
      - skywalking-oap
    environment:
      - SW_OAP_ADDRESS=skywalking-oap:12800
    ports:
      - "8080:8080"
    networks:
      - skywalking-network

  logstash:
    image: logstash:7.14.0
    container_name: logstash
    volumes:
      - ./logstash/pipeline:/usr/share/logstash/pipeline
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml
      - ./logs:/usr/share/logstash/logs
    ports:
      - "5044:5044"
    depends_on:
      - elasticsearch
    networks:
      - skywalking-network

networks:
  skywalking-network:
    driver: bridge

volumes:
  es-data:
2. Logstash配置

logstash.yml

http.host: "0.0.0.0"
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["http://elasticsearch:9200"]
path.config: /usr/share/logstash/pipeline

日志处理管道配置

input {
  file {
    path => ["/usr/share/logstash/logs/*.log"]
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate => true
      what => "previous"
    }
  }
}

filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:log_time} \(%{DATA:thread}\)%{LOGLEVEL:level} %{DATA:traceId} \[%{DATA:bizTraceId}\] %{DATA:logger}-%{GREEDYDATA:msg}"
    }
  }
  date {
    match => ["log_time", "yyyy-MM-dd HH:mm:ss.SSS"]
    target => "@timestamp"
  }
  mutate {
    add_field => {
      "service_name" => "iotdata-back"
      "host_ip" => "%{host}"
    }
    remove_field => ["host", "log_time"]
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "iotdata-log-%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }
}
三、应用集成详细步骤
1. Maven依赖配置

父pom.xml

<dependencyManagement>
  <dependencies>
    <!-- SkyWalking依赖管理 -->
    <dependency>
      <groupId>org.apache.skywalking</groupId>
      <artifactId>apm-toolkit-bom</artifactId>
      <version>9.7.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
    <!-- Logback相关依赖 -->
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.4.8</version>
    </dependency>
    <dependency>
      <groupId>net.logstash.logback</groupId>
      <artifactId>logstash-logback-encoder</artifactId>
      <version>7.4.0</version>
    </dependency>
  </dependencies>
</dependencyManagement>

iotdata-common/pom.xml

<dependencies>
  <!-- SkyWalking工具包 -->
  <dependency>
    <groupId>org.apache.skywalking</groupId>
    <artifactId>apm-toolkit-logback-1.x</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.skywalking</groupId>
    <artifactId>apm-toolkit-trace</artifactId>
  </dependency>
  <!-- AOP相关 -->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
  </dependency>
</dependencies>
2. 日志配置

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="30 seconds">
  <!-- 上下文名称 -->
  <contextName>iotdata-back</contextName>
  
  <!-- 日志输出格式 -->
  <property name="LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS}(%thread)%-5level %tid [%X{X-BIZ-TRACE-ID}] %logger{50}-%msg%n"/>
  <!-- 日志存储路径 -->
  <property name="LOG_PATH" value="d:/shuiwu/iotdata-back/logs"/>
  
  <!-- 控制台输出 -->
  <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
    <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
      <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
        <pattern>${LOG_PATTERN}</pattern>
      </layout>
    </encoder>
  </appender>
  
  <!-- 文件输出(滚动) -->
  <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>${LOG_PATH}/app.log</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
      <fileNamePattern>${LOG_PATH}/app-%d{yyyy-MM-dd}-%i.log</fileNamePattern>
      <MaxHistory>30</MaxHistory>
      <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
        <maxFileSize>200MB</maxFileSize>
      </timeBasedFileNamingAndTriggeringPolicy>
    </rollingPolicy>
    <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
      <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
        <pattern>${LOG_PATTERN}</pattern>
      </layout>
    </encoder>
  </appender>
  
  <!-- 异步输出 -->
  <appender name="ASYNC_FILE" class="ch.qos.logback.classic.AsyncAppender">
    <discardingThreshold>0</discardingThreshold>
    <queueSize>512</queueSize>
    <appender-ref ref="FILE"/>
  </appender>
  
  <!-- 根日志级别 -->
  <root level="INFO">
    <appender-ref ref="CONSOLE"/>
    <appender-ref ref="ASYNC_FILE"/>
  </root>
  
  <!-- 特定包日志级别 -->
  <logger name="com.iotdata" level="DEBUG" additivity="false">
    <appender-ref ref="CONSOLE"/>
    <appender-ref ref="ASYNC_FILE"/>
  </logger>
</configuration>
3. 核心工具类实现

TraceContextUtil.java - 上下文管理工具

package com.iotdata.common.util;

import org.apache.skywalking.apm.toolkit.trace.TraceContext;
import org.slf4j.MDC;

/**
 * 链路追踪上下文工具类
 * 负责管理SkyWalking TraceID和业务自定义TraceID
 */
public class TraceContextUtil {
    // 业务自定义TraceID的MDC键
    public static final String BIZ_TRACE_ID_KEY = "X-BIZ-TRACE-ID";
    
    /**
     * 设置业务追踪上下文
     * @param requestId 请求ID
     * @param userId 用户ID
     * @param activityId 活动ID
     */
    public static void setTraceContext(String requestId, String userId, String activityId) {
        // 生成业务复合TraceID
        String bizTraceId = String.format("%s_%s_%s", requestId, userId, activityId);
        MDC.put(BIZ_TRACE_ID_KEY, bizTraceId);
        
        // 可选:将SkyWalking原生TraceID存入MDC,便于日志分析
        String swTraceId = TraceContext.traceId();
        MDC.put("X-SW-TRACE-ID", swTraceId);
    }
    
    /**
     * 获取当前业务TraceID
     * @return 业务TraceID
     */
    public static String getBizTraceId() {
        return MDC.get(BIZ_TRACE_ID_KEY);
    }
    
    /**
     * 清除追踪上下文
     * 必须在finally块中调用,防止ThreadLocal内存泄漏
     */
    public static void clearTraceContext() {
        MDC.remove(BIZ_TRACE_ID_KEY);
        MDC.remove("X-SW-TRACE-ID");
    }
    
    /**
     * 复制当前上下文到新线程
     * @return 上下文快照
     */
    public static ContextSnapshot captureContext() {
        return new ContextSnapshot(
            MDC.getCopyOfContextMap()
        );
    }
    
    /**
     * 在新线程中恢复上下文
     * @param snapshot 上下文快照
     */
    public static void restoreContext(ContextSnapshot snapshot) {
        if (snapshot != null && snapshot.getContextMap() != null) {
            MDC.setContextMap(snapshot.getContextMap());
        }
    }
    
    /**
     * 上下文快照类
     * 用于在线程间传递MDC上下文
     */
    public static class ContextSnapshot {
        private final java.util.Map<String, String> contextMap;
        
        public ContextSnapshot(java.util.Map<String, String> contextMap) {
            this.contextMap = contextMap;
        }
        
        public java.util.Map<String, String> getContextMap() {
            return contextMap;
        }
    }
}
4. AOP自动埋点实现

1. 自定义注解

package com.iotdata.common.annotation;

import java.lang.annotation.*;

/**
 * 业务链路追踪注解
 * 用于自动注入追踪上下文
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface BizTrace {
    /**
     * 请求ID参数名
     */
    String requestIdParam() default "requestId";
    
    /**
     * 用户ID参数名
     */
    String userIdParam() default "userId";
    
    /**
     * 活动ID参数名
     */
    String activityIdParam() default "activityId";
    
    /**
     * 是否在SkyWalking中创建新的Span
     */
    boolean createSpan() default true;
    
    /**
     * Span名称
     */
    String spanName() default "";
}

2. AOP切面实现

package com.iotdata.common.aspect;

import com.iotdata.common.annotation.BizTrace;
import com.iotdata.common.util.TraceContextUtil;
import org.apache.skywalking.apm.toolkit.trace.Trace;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.LocalVariableTableParameterNameDiscoverer;
import org.springframework.core.ParameterNameDiscoverer;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

/**
 * 业务链路追踪切面
 * 自动为标记@BizTrace的方法注入追踪上下文
 */
@Aspect
@Component
public class BizTraceAspect {
    private static final Logger logger = LoggerFactory.getLogger(BizTraceAspect.class);
    private final ParameterNameDiscoverer parameterNameDiscoverer = new LocalVariableTableParameterNameDiscoverer();
    
    /**
     * 环绕通知,处理业务追踪上下文
     */
    @Around("@annotation(bizTrace)")
    public Object around(ProceedingJoinPoint joinPoint, BizTrace bizTrace) throws Throwable {
        // 1. 提取方法参数
        Map<String, Object> paramMap = getParamMap(joinPoint);
        
        // 2. 获取业务ID
        String requestId = getParamValue(paramMap, bizTrace.requestIdParam());
        String userId = getParamValue(paramMap, bizTrace.userIdParam());
        String activityId = getParamValue(paramMap, bizTrace.activityIdParam());
        
        // 3. 生成默认requestId(如果未提供)
        if (requestId == null || requestId.trim().isEmpty()) {
            requestId = generateRequestId();
            logger.warn("未提供requestId,自动生成: {}", requestId);
        }
        
        try {
            // 4. 设置追踪上下文
            TraceContextUtil.setTraceContext(requestId, userId, activityId);
            logger.info("业务追踪上下文已设置,bizTraceId: {}", TraceContextUtil.getBizTraceId());
            
            // 5. 是否创建SkyWalking Span
            if (bizTrace.createSpan()) {
                String spanName = bizTrace.spanName();
                if (spanName.isEmpty()) {
                    spanName = joinPoint.getSignature().getName();
                }
                return traceWithSpan(joinPoint, spanName);
            } else {
                // 6. 不创建Span,直接执行
                return joinPoint.proceed();
            }
        } finally {
            // 7. 清理上下文(必须在finally中执行)
            TraceContextUtil.clearTraceContext();
            logger.info("业务追踪上下文已清理");
        }
    }
    
    /**
     * 使用SkyWalking Trace注解创建Span
     */
    private Object traceWithSpan(ProceedingJoinPoint joinPoint, String spanName) throws Throwable {
        // 使用SkyWalking的@Trace注解创建Span
        @Trace
        class TraceHelper {
            Object proceed(ProceedingJoinPoint pjp) throws Throwable {
                return pjp.proceed();
            }
        }
        return new TraceHelper().proceed(joinPoint);
    }
    
    /**
     * 提取方法参数名和值的映射
     */
    private Map<String, Object> getParamMap(ProceedingJoinPoint joinPoint) {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        String[] paramNames = parameterNameDiscoverer.getParameterNames(method);
        Object[] paramValues = joinPoint.getArgs();
        
        Map<String, Object> paramMap = new HashMap<>();
        if (paramNames != null && paramValues != null) {
            for (int i = 0; i < paramNames.length; i++) {
                paramMap.put(paramNames[i], paramValues[i]);
            }
        }
        return paramMap;
    }
    
    /**
     * 获取参数值
     */
    private String getParamValue(Map<String, Object> paramMap, String paramName) {
        if (paramMap.containsKey(paramName)) {
            Object value = paramMap.get(paramName);
            return value != null ? value.toString() : null;
        }
        return null;
    }
    
    /**
     * 生成默认requestId(UUID)
     */
    private String generateRequestId() {
        return java.util.UUID.randomUUID().toString().replace("-", "");
    }
}
5. Web请求拦截器

TraceWebInterceptor.java - 统一处理HTTP请求上下文

package com.iotdata.admin.interceptor;

import com.iotdata.common.util.TraceContextUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.servlet.HandlerInterceptor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.UUID;

/**
 * Web请求追踪拦截器
 * 为所有HTTP请求统一设置追踪上下文
 */
public class TraceWebInterceptor implements HandlerInterceptor {
    private static final Logger logger = LoggerFactory.getLogger(TraceWebInterceptor.class);
    
    // 请求头中的TraceID字段
    private static final String TRACE_ID_HEADER = "X-TRACE-ID";
    private static final String USER_ID_HEADER = "X-USER-ID";
    private static final String ACTIVITY_ID_PARAM = "activityId";
    
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        try {
            // 1. 从请求头获取ID(优先)或参数
            String requestId = request.getHeader(TRACE_ID_HEADER);
            String userId = request.getHeader(USER_ID_HEADER);
            String activityId = request.getParameter(ACTIVITY_ID_PARAM);
            
            // 2. 生成默认requestId(如果未提供)
            if (requestId == null || requestId.trim().isEmpty()) {
                requestId = UUID.randomUUID().toString().replace("-", "");
            }
            
            // 3. 设置响应头,便于前端获取TraceID
            response.setHeader(TRACE_ID_HEADER, requestId);
            
            // 4. 设置追踪上下文
            TraceContextUtil.setTraceContext(requestId, userId, activityId);
            logger.info("Web请求追踪上下文已设置,URL: {}", request.getRequestURI());
            
            return true;
        } catch (Exception e) {
            logger.error("设置Web请求追踪上下文失败", e);
            // 即使失败也继续处理请求
            return true;
        }
    }
    
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) {
        // 清理上下文
        TraceContextUtil.clearTraceContext();
        logger.info("Web请求追踪上下文已清理,URL: {}", request.getRequestURI());
    }
}

注册拦截器

package com.iotdata.admin.config;

import com.iotdata.admin.interceptor.TraceWebInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class WebConfig implements WebMvcConfigurer {
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        // 注册追踪拦截器,对所有请求生效
        registry.addInterceptor(new TraceWebInterceptor())
                .addPathPatterns("/**")
                .excludePathPatterns("/static/**", "/error");
    }
}
四、特殊场景处理方案
1. 异步方法处理

1. 异步方法注解

package com.iotdata.common.annotation;

import java.lang.annotation.*;

/**
 * 异步业务追踪注解
 * 用于标记需要追踪的异步方法
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface AsyncBizTrace {
    /**
     * 异步任务名称
     */
    String taskName() default "asyncTask";
}

2. 异步方法切面

package com.iotdata.common.aspect;

import com.iotdata.common.annotation.AsyncBizTrace;
import com.iotdata.common.util.TraceContextUtil;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * 异步方法追踪切面
 * 确保异步方法中也能正确传递追踪上下文
 */
@Aspect
@Component
public class AsyncBizTraceAspect {
    private static final Logger logger = LoggerFactory.getLogger(AsyncBizTraceAspect.class);
    
    @Around("@annotation(asyncBizTrace)")
    public Object aroundAsyncMethod(ProceedingJoinPoint joinPoint, AsyncBizTrace asyncBizTrace) throws Throwable {
        // 1. 捕获当前线程的上下文
        TraceContextUtil.ContextSnapshot snapshot = TraceContextUtil.captureContext();
        
        // 2. 创建新的任务,在新线程中恢复上下文
        return ((java.util.concurrent.Callable<?>) () -> {
            try {
                // 恢复上下文
                TraceContextUtil.restoreContext(snapshot);
                logger.info("异步任务[{}]追踪上下文已恢复,bizTraceId: {}", 
                           asyncBizTrace.taskName(), TraceContextUtil.getBizTraceId());
                
                // 执行异步任务
                return joinPoint.proceed();
            } finally {
                // 清理上下文
                TraceContextUtil.clearTraceContext();
                logger.info("异步任务[{}]追踪上下文已清理", asyncBizTrace.taskName());
            }
        }).call();
    }
}

3. 使用示例

@Service
public class OrderServiceImpl implements OrderService {
    @Autowired
    private AsyncTaskService asyncTaskService;
    
    @BizTrace
    public void createOrder(String requestId, String userId, String activityId) {
        // 同步业务逻辑
        // ...
        
        // 调用异步方法
        asyncTaskService.notifyUser(requestId, userId);
    }
}

@Service
public class AsyncTaskService {
    
    @Async
    @AsyncBizTrace(taskName = "notifyUser")
    public CompletableFuture<Void> notifyUser(String requestId, String userId) {
        // 异步业务逻辑
        logger.info("发送订单通知给用户: {}", userId);
        // ...
        return CompletableFuture.runAsync(() -> {
            // 具体异步操作
        });
    }
}
2. MQ消息追踪

1. RabbitMQ生产者拦截器

package com.iotdata.infrastructure.mq;

import com.iotdata.common.util.TraceContextUtil;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.stereotype.Component;

/**
 * RabbitMQ消息生产者拦截器
 * 用于在发送消息时注入追踪上下文
 */
@Component
public class RabbitMqProducerInterceptor implements MessagePostProcessor {
    @Override
    public Message postProcessMessage(Message message) {
        // 获取当前追踪上下文
        String bizTraceId = TraceContextUtil.getBizTraceId();
        String swTraceId = org.apache.skywalking.apm.toolkit.trace.TraceContext.traceId();
        
        // 将上下文信息存入消息头
        if (bizTraceId != null) {
            message.getMessageProperties().setHeader("X-BIZ-TRACE-ID", bizTraceId);
        }
        if (swTraceId != null && !"N/A".equals(swTraceId)) {
            message.getMessageProperties().setHeader("X-SW-TRACE-ID", swTraceId);
        }
        
        return message;
    }
}

2. RabbitMQ消费者拦截器

package com.iotdata.infrastructure.mq;

import com.iotdata.common.util.TraceContextUtil;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.stereotype.Component;

/**
 * RabbitMQ消息消费者拦截器
 * 用于在消费消息时恢复追踪上下文
 */
@Component
public class RabbitMqConsumerInterceptor implements MethodInterceptor {
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // 获取消息参数
        Message message = null;
        for (Object arg : invocation.getArguments()) {
            if (arg instanceof Message) {
                message = (Message) arg;
                break;
            }
        }
        
        // 从消息头恢复上下文
        if (message != null) {
            MessageProperties properties = message.getMessageProperties();
            String bizTraceId = properties.getHeader("X-BIZ-TRACE-ID");
            String swTraceId = properties.getHeader("X-SW-TRACE-ID");
            
            if (bizTraceId != null) {
                // 解析业务TraceID(requestId_userId_activityId)
                String[] parts = bizTraceId.split("_");
                if (parts.length >= 3) {
                    TraceContextUtil.setTraceContext(parts[0], parts[1], parts[2]);
                } else {
                    TraceContextUtil.setTraceContext(bizTraceId, "unknown", "unknown");
                }
            }
        }
        
        try {
            // 执行消费方法
            return invocation.proceed();
        } finally {
            // 清理上下文
            TraceContextUtil.clearTraceContext();
        }
    }
}

3. MQ配置类

package com.iotdata.infrastructure.config;

import com.iotdata.infrastructure.mq.RabbitMqConsumerInterceptor;
import com.iotdata.infrastructure.mq.RabbitMqProducerInterceptor;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.aop.framework.ProxyFactoryBean;
import org.springframework.aop.target.SingletonTargetSource;

@Configuration
public class RabbitMqConfig {
    @Autowired
    private RabbitMqProducerInterceptor producerInterceptor;
    
    @Autowired
    private RabbitMqConsumerInterceptor consumerInterceptor;
    
    /**
     * 配置RabbitTemplate,添加生产者拦截器
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        // 添加消息后置处理器,注入追踪上下文
        rabbitTemplate.setBeforePublishPostProcessors(producerInterceptor);
        return rabbitTemplate;
    }
    
    /**
     * 配置消费者容器工厂,添加消费者拦截器
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
    
    /**
     * 为消费者方法创建代理,应用拦截器
     */
    @Bean
    public ProxyFactoryBean rabbitMqConsumerProxy(Object consumerBean) {
        ProxyFactoryBean proxyFactory = new ProxyFactoryBean();
        proxyFactory.setTargetSource(new SingletonTargetSource(consumerBean));
        proxyFactory.addAdvice(consumerInterceptor);
        return proxyFactory;
    }
}
五、应用启动配置
1. 启动脚本配置

Windows启动脚本

@echo off
setlocal enabledelayedexpansion

REM SkyWalking Agent配置
set SW_AGENT_HOME=d:/skywalking-agent
set SW_AGENT_SERVICE_NAME=iotdata-back
set SW_AGENT_COLLECTOR_BACKEND_SERVICES=localhost:11800
set SW_AGENT_SAMPLE_RATE=1
set SW_AGENT_LOG_OUTPUT=FILE
set SW_AGENT_LOG_DIR=d:/shuiwu/iotdata-back/logs/agent

REM JVM参数配置
set JAVA_OPTS=-Xms512m -Xmx1024m -XX:+UseG1GC -XX:MaxGCPauseMillis=200
set JAVA_OPTS=!JAVA_OPTS! -javaagent:!SW_AGENT_HOME!/skywalking-agent.jar
set JAVA_OPTS=!JAVA_OPTS! -Dskywalking.agent.service_name=!SW_AGENT_SERVICE_NAME!
set JAVA_OPTS=!JAVA_OPTS! -Dskywalking.collector.backend_service=!SW_AGENT_COLLECTOR_BACKEND_SERVICES!
set JAVA_OPTS=!JAVA_OPTS! -Dskywalking.agent.sample_rate=!SW_AGENT_SAMPLE_RATE!
set JAVA_OPTS=!JAVA_OPTS! -Dskywalking.logging.output=!SW_AGENT_LOG_OUTPUT!
set JAVA_OPTS=!JAVA_OPTS! -Dskywalking.logging.dir=!SW_AGENT_LOG_DIR!

REM 启动应用
java !JAVA_OPTS! -jar iotdata-admin/target/iotdata-admin.jar

endlocal
2. 验证与测试

1. 业务方法使用示例

package com.iotdata.admin.service.impl;

import com.iotdata.admin.service.GroupOrderService;
import com.iotdata.admin.service.InventoryService;
import com.iotdata.admin.service.MQProducerService;
import com.iotdata.common.annotation.BizTrace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class GroupOrderServiceImpl implements GroupOrderService {
    private static final Logger logger = LoggerFactory.getLogger(GroupOrderServiceImpl.class);
    
    @Autowired
    private InventoryService inventoryService;
    
    @Autowired
    private MQProducerService mqProducerService;
    
    /**
     * 创建拼团订单(核心业务方法)
     * 使用@BizTrace注解自动注入追踪上下文
     */
    @Override
    @BizTrace(requestIdParam = "orderId", userIdParam = "userId", activityIdParam = "activityId")
    public void createGroupOrder(String orderId, String userId, String activityId, int productId, int quantity) {
        logger.info("开始处理拼团订单: orderId={}, productId={}, quantity={}", orderId, productId, quantity);
        
        try {
            // 1. 扣减库存
            boolean deductSuccess = inventoryService.deductStock(productId, quantity);
            if (!deductSuccess) {
                logger.error("库存不足,创建订单失败: productId={}, quantity={}", productId, quantity);
                throw new RuntimeException("库存不足");
            }
            logger.info("库存扣减成功: productId={}, quantity={}", productId, quantity);
            
            // 2. 更新拼团记录
            updateGroupRecord(orderId, userId, activityId);
            logger.info("拼团记录更新成功: orderId={}", orderId);
            
            // 3. 发送MQ通知
            mqProducerService.sendGroupSuccessMessage(orderId, userId, activityId);
            logger.info("拼团成功通知已发送: orderId={}", orderId);
            
        } catch (Exception e) {
            logger.error("创建拼团订单失败: orderId={}", orderId, e);
            // 异常处理...
        }
    }
    
    private void updateGroupRecord(String orderId, String userId, String activityId) {
        // 更新拼团记录逻辑
        logger.info("执行拼团记录更新: orderId={}, activityId={}", orderId, activityId);
        // ...
    }
}

2. 测试日志输出

2023-11-15 14:30:45.123(main)INFO TID:8f7d6c5b4a3s2d1f [req12345_user67890_act5678] com.iotdata.admin.service.impl.GroupOrderServiceImpl-开始处理拼团订单: orderId=req12345, productId=1001, quantity=2
2023-11-15 14:30:45.234(main)INFO TID:8f7d6c5b4a3s2d1f [req12345_user67890_act5678] com.iotdata.admin.service.impl.InventoryServiceImpl-库存扣减成功: productId=1001, quantity=2
2023-11-15 14:30:45.345(main)INFO TID:8f7d6c5b4a3s2d1f [req12345_user67890_act5678] com.iotdata.admin.service.impl.GroupOrderServiceImpl-库存扣减成功: productId=1001, quantity=2
2023-11-15 14:30:45.456(main)INFO TID:8f7d6c5b4a3s2d1f [req12345_user67890_act5678] com.iotdata.admin.service.impl.GroupOrderServiceImpl-执行拼团记录更新: orderId=req12345, activityId=act5678
2023-11-15 14:30:45.567(main)INFO TID:8f7d6c5b4a3s2d1f [req12345_user67890_act5678] com.iotdata.admin.service.impl.GroupOrderServiceImpl-拼团记录更新成功: orderId=req12345
2023-11-15 14:30:45.678(main)INFO TID:8f7d6c5b4a3s2d1f [req12345_user67890_act5678] com.iotdata.admin.service.impl.MQProducerServiceImpl-发送拼团成功消息: orderId=req12345
2023-11-15 14:30:45.789(main)INFO TID:8f7d6c5b4a3s2d1f [req12345_user67890_act5678] com.iotdata.admin.service.impl.GroupOrderServiceImpl-拼团成功通知已发送: orderId=req12345

3. SkyWalking UI验证

  1. 访问SkyWalking UI: http://localhost:8080
  2. 在"追踪"菜单中搜索业务TraceID或SkyWalking TraceID
  3. 查看完整调用链路和各节点耗时

4. Kibana日志查询

  1. 访问Kibana: http://localhost:5601
  2. 创建索引模式: iotdata-log-*
  3. 在Discover页面搜索特定bizTraceId: req12345_user67890_act5678
  4. 查看完整业务链路日志
六、常见问题与解决方案
问题场景 解决方案
异步线程上下文丢失 使用TraceContextUtil.captureContext()和restoreContext()传递上下文
MQ消息追踪断裂 通过消息头传递TraceID,消费端恢复上下文
日志中没有TraceID 检查logback配置是否使用TraceIdPatternLogbackLayout
SkyWalking无数据 检查agent路径是否正确,collector地址是否可达
高并发下性能问题 调整SkyWalking采样率,使用异步日志输出
七、总结

本方案通过SkyWalking实现分布式链路追踪,结合Logstash+Elasticsearch实现日志集中管理,通过AOP和拦截器实现追踪上下文的自动注入与传递,最终实现了从拼团下单→扣库存→更新拼团记录→MQ通知→支付回调的全链路追踪。方案特点:

  1. 自动埋点:通过AOP注解实现业务方法自动埋点
  2. 全链路覆盖:支持同步、异步、MQ等多种场景
  3. 业务关联:自定义复合TraceID实现业务属性与技术链路的关联
  4. 性能优化:异步日志、可配置采样率等性能保障措施
  5. 易于扩展:模块化设计,支持新增中间件的追踪适配

通过该方案,可以快速定位分布式系统中的问题,分析业务链路性能瓶颈,提升系统可观测性和可维护性。


网站公告

今日签到

点亮在社区的每一天
去签到