Spring Boot应用程序接入ELK-003

发布于:2025-04-10 ⋅ 阅读:(30) ⋅ 点赞:(0)

Spring Boot应用程序接入ELK

一、项目依赖集成

在将Spring Boot应用程序接入ELK日志搜索引擎时,首先要在项目中集成相关依赖:

(一)Logstash依赖

<!-- Supplies the net.logstash.logback.encoder classes; the KafkaAppender in the logback
     configuration uses LoggingEventCompositeJsonEncoder from this artifact as its encoder. -->
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>5.3</version>
</dependency>

logstash-logback-encoder依赖用于将日志格式化为适合Logstash处理的格式,方便后续日志的收集与解析。

(二)Kafka相关依赖

<!-- Supplies com.github.danielwegener.logback.kafka.KafkaAppender together with the keying and
     delivery strategy classes referenced by the logback configuration. -->
<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.2.0-RC1</version>
</dependency>
<!-- Spring for Apache Kafka. NOTE(review): nothing in the logback configuration shown in this
     article references this artifact directly; confirm it is needed by the application. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.12.RELEASE</version>
</dependency>

logback-kafka-appender用于在Logback中配置Kafka相关的日志追加器,实现将日志发送到Kafka消息队列。spring-kafka则是Spring Boot与Kafka集成的核心依赖,提供了对Kafka操作的支持。

二、Logback配置Kafka

在logback.xml文件中进行Kafka相关配置:

<!-- Registers the custom conversion word %nanotime, rendered by NanoTimeConverter.
     NOTE(review): no pattern in this fragment uses %nanotime; confirm it is used elsewhere. -->
<conversionRule conversionWord="nanotime" converterClass="com.test.sunchd.log.NanoTimeConverter"/>
<!-- Defines the ${localIp} property; its value is whatever LogIpProperty.getPropertyValue() returns. -->
<define name="localIp" class="com.test.sunchd.log.LogIpProperty"/>
<!-- Ships every log event to Kafka as a single JSON document built from the pattern below.
     ${localIp} comes from the <define> above; ${applicationName} and ${PID} must be provided
     elsewhere in the configuration or environment (not visible in this fragment). -->
<appender name="KafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
    <encoder charset="UTF-8" class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
        <!-- NOTE(review): "name" and "class" both render %logger; confirm whether "class"
             was intended to be %class. -->
        <providers>
            <pattern>
                <pattern>
                    {
                        "timestamp": "%d{yyyy-MM-dd HH:mm:ss.SSS}",
                        "name": "%logger",
                        "level": "%level",
                        "ip": "${localIp}",
                        "service": "${applicationName}",
                        "pid": "${PID:-}",
                        "class": "%logger",
                        "method": "%method",
                        "thread": "%thread",
                        "traceId": "%X{traceId:-}",
                        "spanId": "%X{spanId:-}",
                        "parent": "%X{X-B3-ParentSpanId:-}",
                        "message": "[%level] [${localIp}] [${applicationName}] [%thread] [%logger{36}] [%X{traceId:-}] [%X{spanId:-}] [%X{X-B3-ParentSpanId:-}] %message"
                    }
                </pattern>
            </pattern>
        </providers>
    </encoder>
    <!-- Use the host name as the Kafka record key. -->
    <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.HostnameKeyingStrategy"/>
    <!-- Hand records to the producer without waiting for the send to complete. -->
    <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
    <!-- Broker address is hard-coded; adjust per environment. -->
    <producerConfig>bootstrap.servers=10.190.107.136:9092</producerConfig>
    <topic>logs</topic>
    <!-- acks=0: do not wait for any broker acknowledgement. -->
    <producerConfig>acks=0</producerConfig>
    <!-- linger.ms=1000: let the producer batch records for up to one second. -->
    <producerConfig>linger.ms=1000</producerConfig>
    <!-- max.block.ms=0: never block the logging thread when the producer cannot accept a record. -->
    <producerConfig>max.block.ms=0</producerConfig>
    <producerConfig>client.id=0</producerConfig>
</appender>
<!-- Asynchronous wrapper around the FILE-INFO appender (FILE-INFO itself is defined outside
     this fragment). discardingThreshold=0 keeps all events instead of dropping lower-level
     ones when the queue fills up; the queue holds up to 512 events. -->
<appender name="FILE-INFO-ASYNC" class="ch.qos.logback.classic.AsyncAppender">
    <discardingThreshold>0</discardingThreshold>
    <queueSize>512</queueSize>
    <appender-ref ref="FILE-INFO"/>
</appender>
<!-- Asynchronous wrapper around the FILE-ERROR appender (FILE-ERROR itself is defined outside
     this fragment), configured the same way as FILE-INFO-ASYNC: no discarding, queue of 512. -->
<appender name="FILE-ERROR-ASYNC" class="ch.qos.logback.classic.AsyncAppender">
    <discardingThreshold>0</discardingThreshold>
    <queueSize>512</queueSize>
    <appender-ref ref="FILE-ERROR"/>
</appender>
<!-- Loggers under com.test.cop log at DEBUG and above to the console, both async file
     appenders and Kafka. additivity="false" stops these events from also reaching the
     appenders of ancestor loggers. STDOUT is defined outside this fragment. -->
<logger name="com.test.cop" level="DEBUG" additivity="false">
    <appender-ref ref="STDOUT"/>
    <appender-ref ref="FILE-INFO-ASYNC"/>
    <appender-ref ref="FILE-ERROR-ASYNC"/>
    <appender-ref ref="KafkaAppender"/>
</logger>

这里配置了KafkaAppender,使用LoggingEventCompositeJsonEncoder对日志进行JSON格式编码,设置了日志包含的各种字段。keyingStrategy指定按主机名进行键控,deliveryStrategy采用异步发送策略。producerConfig指定了Kafka的服务器地址,topic设置为logs。

三、关键类介绍

(一)NanoClockUtils类

package com.test.sunchd.log.utils;

import java.io.Serializable;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import static java.util.Objects.requireNonNull;

/**
 * A {@link Clock} whose {@link #instant()} carries nanosecond resolution.
 *
 * <p>When the class is initialized it samples {@link System#currentTimeMillis()} and
 * {@link System#nanoTime()} once and stores their difference. {@link #nanos()} then adds that
 * fixed offset to the current {@code System.nanoTime()} reading to produce nanoseconds since
 * the epoch.
 *
 * <p>Note that {@link #millis()} reads {@code System.currentTimeMillis()} directly and is not
 * derived from {@link #nanos()}.
 */
public final class NanoClockUtils extends Clock implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Number of nanoseconds in one second. */
    public static final long NANOS_PER_SECOND = 1_000_000_000L;

    // Wall-clock time (as nanoseconds) and monotonic reading, both taken once at class load.
    private static final long EPOCH_NANOS = System.currentTimeMillis() * 1000000L;
    private static final long NANO_START = System.nanoTime();
    // Added to System.nanoTime() to translate it into nanoseconds since the epoch.
    private static final long OFFSET_NANOS = EPOCH_NANOS - NANO_START;

    // Shared instances handed out by systemUTC() and systemDefaultZone().
    private static final NanoClockUtils UTC_INSTANCE = new NanoClockUtils(ZoneOffset.UTC);
    private static final NanoClockUtils DEFAULT_INSTANCE = new NanoClockUtils(ZoneId.systemDefault());

    private final ZoneId _zone;

    private NanoClockUtils(final ZoneId zone) {
        _zone = requireNonNull(zone, "zone");
    }

    /**
     * Creates a new clock for the given zone.
     *
     * @param zone the time zone of the clock, not null
     * @return a new clock instance
     */
    public static NanoClockUtils system(final ZoneId zone) {
        return new NanoClockUtils(zone);
    }

    /** @return the shared clock in the UTC zone */
    public static NanoClockUtils systemUTC() {
        return UTC_INSTANCE;
    }

    /** @return the shared clock in the zone that was the system default at class initialization */
    public static NanoClockUtils systemDefaultZone() {
        return DEFAULT_INSTANCE;
    }

    @Override
    public ZoneId getZone() {
        return _zone;
    }

    /**
     * @param zone the zone to switch to, not null
     * @return this clock if it already uses {@code zone}, otherwise a new clock for it
     */
    @Override
    public NanoClockUtils withZone(final ZoneId zone) {
        if (zone.equals(_zone)) {
            return this;
        }
        return new NanoClockUtils(zone);
    }

    /** @return the current value of {@link System#currentTimeMillis()} */
    @Override
    public long millis() {
        return System.currentTimeMillis();
    }

    /** @return nanoseconds since the epoch: the current {@code System.nanoTime()} plus the fixed offset */
    public long nanos() {
        return OFFSET_NANOS + System.nanoTime();
    }

    /** @return {@link #nanos()} rendered as a decimal string */
    public String nanosStr() {
        return Long.toString(nanos());
    }

    /** @return the current instant built from {@link #nanos()} */
    @Override
    public Instant instant() {
        final long epochNanos = nanos();
        final long seconds = epochNanos / NANOS_PER_SECOND;
        final long nanoAdjustment = epochNanos % NANOS_PER_SECOND;
        return Instant.ofEpochSecond(seconds, nanoAdjustment);
    }

    /** Two clocks are equal when they use equal zones. */
    @Override
    public boolean equals(final Object obj) {
        if (obj == this) {
            return true;
        }
        if (!(obj instanceof NanoClockUtils)) {
            return false;
        }
        final NanoClockUtils other = (NanoClockUtils) obj;
        return other._zone.equals(_zone);
    }

    @Override
    public int hashCode() {
        return 11 + _zone.hashCode();
    }

    @Override
    public String toString() {
        return new StringBuilder("NanoClock[").append(_zone).append(']').toString();
    }
}

该类继承自Clock并实现Serializable接口,用于处理纳秒级时间相关操作。定义了一些静态常量用于时间计算的偏移等,提供了获取纳秒时间、转换为Instant等方法,还可以根据不同时区创建实例 。

(二)NanoTimeConverter类

package com.test.sunchd.log;

import ch.qos.logback.classic.pattern.ClassicConverter;
import ch.qos.logback.classic.spi.ILoggingEvent;
import com.test.sunchd.log.utils.NanoClockUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import java.util.Date;

/**
 * Logback pattern converter that renders the current time as nanoseconds since the epoch.
 *
 * <p>It is registered in the logback configuration under the conversion word {@code nanotime}.
 * The value is read from {@link NanoClockUtils} at the moment of conversion; the logging
 * event passed in is not consulted.
 */
public class NanoTimeConverter extends ClassicConverter {
    private static final long NANOS_PER_MILLI = 1_000_000L;

    /**
     * @param event the event being formatted (unused)
     * @return the current epoch time in nanoseconds as a decimal string
     */
    @Override
    public String convert(ILoggingEvent event) {
        return NanoClockUtils.systemDefaultZone().nanosStr();
    }

    /**
     * Manual smoke test: prints the current clock reading formatted as {@code yyyyMMdd HHmmss}.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Convert nanoseconds to milliseconds arithmetically instead of truncating the decimal
        // string to 13 characters, which only works while the value has exactly 19 digits.
        final long epochMillis = NanoClockUtils.systemDefaultZone().nanos() / NANOS_PER_MILLI;
        System.out.println(DateFormatUtils.format(new Date(epochMillis), "yyyyMMdd HHmmss"));
    }
}

该类继承自ClassicConverter,在日志格式化时用于获取纳秒时间字符串。convert方法获取系统默认时区下的纳秒时间字符串,main方法用于测试时间字符串的截取和转换 。

(三)LogIpProperty类

package com.test.sunchd.log;

import ch.qos.logback.core.PropertyDefinerBase;
import com.test.sunchd.log.utils.NetWorkUtil;

/**
 * Logback property definer that exposes the local host address to the logging configuration,
 * where it is bound to the {@code localIp} property.
 */
public class LogIpProperty extends PropertyDefinerBase {
    /** Looked up once, when this class is initialized, and reused for every request. */
    private static final String LOCAL_IP = NetWorkUtil.getHostAddress();

    /**
     * @return the address string obtained from {@link NetWorkUtil#getHostAddress()}
     */
    @Override
    public String getPropertyValue() {
        return LOCAL_IP;
    }
}

该类继承自PropertyDefinerBase,用于获取本机IP地址并在日志配置中作为属性值使用。通过调用NetWorkUtil.getHostAddress()获取IP地址,并在getPropertyValue方法中返回 。

(四)NetWorkUtil类

package com.test.sunchd.log.utils;

import org.apache.commons.lang3.StringUtils;
import java.net.InetAddress;
import java.net.Inet4Address;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Static helpers for discovering the IPv4 addresses of the local machine.
 */
public class NetWorkUtil {
    /** Name prefix of Docker-created interfaces (for example {@code docker0}). */
    public static final String NETWORK_DOCKER = "docker";
    /** Name prefix of bridge interfaces (for example {@code br-...}). */
    public static final String NETWORK_BRIDGE = "br";
    /** Name prefix of predictable Linux Ethernet interface names (for example {@code ens33}). */
    public static final String NETWORK_LINUX = "ens";

    /**
     * Returns the textual form of {@link #getLocalHostExactAddress()}.
     *
     * @return the local IP address string
     * @throws IllegalStateException if the network interfaces cannot be enumerated
     */
    public static String getHostAddress() {
        return getLocalHostExactAddress().getHostAddress();
    }

    /**
     * Picks the address that best represents this host.
     *
     * <p>Interfaces that are down, virtual, loopback or point-to-point are skipped. Among the
     * rest, the first usable IPv4 address on an interface that is neither a Docker nor a bridge
     * interface wins. If only Docker/bridge interfaces carry an IPv4 address, the first of those
     * is used. If no interface qualifies, {@link InetAddress#getLocalHost()} is returned, and
     * if even that cannot be resolved, the loopback address.
     *
     * @return the selected address, never {@code null}
     * @throws IllegalStateException if the network interfaces cannot be enumerated
     */
    public static InetAddress getLocalHostExactAddress() {
        InetAddress containerOrBridgeAddress = null;
        try {
            Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
            while (networkInterfaces != null && networkInterfaces.hasMoreElements()) {
                NetworkInterface networkInterface = networkInterfaces.nextElement();
                if (!networkInterface.isUp() || networkInterface.isVirtual()
                        || networkInterface.isLoopback() || networkInterface.isPointToPoint()) {
                    continue;
                }
                InetAddress address = firstUsableIpv4(networkInterface);
                if (address == null) {
                    continue;
                }
                if (!isDockerNetwork(networkInterface) && !isBridgeNetwork(networkInterface)) {
                    return address;
                }
                // Remember it, but keep looking for an address on a regular interface.
                if (containerOrBridgeAddress == null) {
                    containerOrBridgeAddress = address;
                }
            }
        } catch (SocketException e) {
            throw new IllegalStateException("Failed to inspect network interfaces", e);
        }
        if (containerOrBridgeAddress != null) {
            return containerOrBridgeAddress;
        }
        // Nothing usable besides loopback: fall back to whatever the host name resolves to.
        try {
            return InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            return InetAddress.getLoopbackAddress();
        }
    }

    /**
     * Collects every IPv4 address bound to any local interface, without duplicates.
     *
     * @return a mutable set of IPv4 addresses, empty if there are none
     * @throws IllegalStateException if the network interfaces cannot be enumerated
     */
    public static Set<InetAddress> getLocalIpAddress() {
        try {
            return new HashSet<>(getLocalIpAddressFromNetworkInterfaces());
        } catch (SocketException e) {
            throw new IllegalStateException("Failed to inspect network interfaces", e);
        }
    }

    /**
     * Collects every IPv4 address bound to any local interface, in enumeration order.
     *
     * @return a mutable list of IPv4 addresses, empty if there are none
     * @throws SocketException if the network interfaces cannot be enumerated
     */
    public static List<InetAddress> getLocalIpAddressFromNetworkInterfaces() throws SocketException {
        List<InetAddress> addresses = new ArrayList<>();
        Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
        if (networkInterfaces == null) {
            return addresses;
        }
        while (networkInterfaces.hasMoreElements()) {
            Enumeration<InetAddress> inetAddresses = networkInterfaces.nextElement().getInetAddresses();
            while (inetAddresses.hasMoreElements()) {
                InetAddress inetAddress = inetAddresses.nextElement();
                if (inetAddress instanceof Inet4Address) {
                    addresses.add(inetAddress);
                }
            }
        }
        return addresses;
    }

    /**
     * @param ni the interface to test, may be {@code null}
     * @return {@code true} if the interface name starts with {@link #NETWORK_BRIDGE}, ignoring case
     */
    public static boolean isBridgeNetwork(NetworkInterface ni) {
        return nameStartsWith(ni, NETWORK_BRIDGE);
    }

    /**
     * @param ni the interface to test, may be {@code null}
     * @return {@code true} if the interface name starts with {@link #NETWORK_DOCKER}, ignoring case
     */
    public static boolean isDockerNetwork(NetworkInterface ni) {
        return nameStartsWith(ni, NETWORK_DOCKER);
    }

    /** Returns the first IPv4 address of the interface that is neither loopback nor wildcard, or null. */
    private static InetAddress firstUsableIpv4(NetworkInterface networkInterface) {
        Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
        while (inetAddresses.hasMoreElements()) {
            InetAddress inetAddress = inetAddresses.nextElement();
            if (inetAddress instanceof Inet4Address
                    && !inetAddress.isLoopbackAddress()
                    && !inetAddress.isAnyLocalAddress()) {
                return inetAddress;
            }
        }
        return null;
    }

    /** Null-safe, case-insensitive check that the interface name begins with the given prefix. */
    private static boolean nameStartsWith(NetworkInterface ni, String prefix) {
        if (ni == null) {
            return false;
        }
        String name = ni.getName();
        return name != null && name.regionMatches(true, 0, prefix, 0, prefix.length());
    }
}

该类提供了一系列网络相关工具方法。定义了一些网络类型的标识常量,如docker、br、ens。getHostAddress和getLocalHostExactAddress方法用于获取本机精确的IP地址,会遍历网络接口排除虚拟、环回等地址。getLocalIpAddress和getLocalIpAddressFromNetworkInterfaces用于获取本机所有IPv4地址,isBridgeNetwork和isDockerNetwork用于判断网络接口是否属于特定网络类型。

四、后续流程

在完成上述配置后,Kafka的主题设置为logs。logstash作为消费者从kafka的logs主题获取日志数据,并将其推送至elasticsearch(es)。最终,我们可以在kibana中根据配置的索引模式查询和分析这些日志,实现对Spring Boot应用程序日志的高效管理与分析。
在这里插入图片描述