Integrating a Spring Boot Application with ELK
1. Project Dependencies
To connect a Spring Boot application to the ELK logging stack, first add the relevant dependencies to the project:
(1) Logstash dependency
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>5.3</version>
</dependency>
The logstash-logback-encoder dependency formats log events into JSON that Logstash can process, which simplifies downstream collection and parsing.
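For reference, with the encoder configured as in section 2 below, each log event is serialized as one JSON object per line. The exact fields depend on the pattern you define; the values here are purely illustrative:

{"timestamp":"2024-01-15 10:23:45.678","name":"com.test.cop.OrderService","level":"INFO","ip":"10.190.107.20","service":"order-service","thread":"http-nio-8080-exec-1","message":"order accepted"}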
(2) Kafka-related dependencies
<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.2.0-RC1</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.12.RELEASE</version>
</dependency>
logback-kafka-appender provides the Logback appender that ships log events to a Kafka message queue. spring-kafka is the core dependency for integrating Spring Boot with Kafka and provides general support for Kafka operations.
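Note that the appender itself talks to Kafka through the plain kafka-clients producer, while spring-kafka is what application code uses to produce and consume messages. As a minimal, hedged sketch of the latter (the class, bean wiring, and topic name are assumptions, not part of the original setup):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class EventPublisher {

    private final KafkaTemplate<String, String> kafkaTemplate;

    // Spring Boot auto-configures a KafkaTemplate from the spring.kafka.* properties
    public EventPublisher(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publish(String payload) {
        // "app-events" is a hypothetical topic, distinct from the "logs" topic used by the appender
        kafkaTemplate.send("app-events", payload);
    }
}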
2. Configuring Kafka in Logback
Add the Kafka-related configuration to the logback.xml file:
<conversionRule conversionWord="nanotime" converterClass="com.test.sunchd.log.NanoTimeConverter"/>
<define name="localIp" class="com.test.sunchd.log.LogIpProperty"/>

<appender name="KafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
    <encoder charset="UTF-8" class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
        <providers>
            <pattern>
                <pattern>
                    {
                    "timestamp": "%d{yyyy-MM-dd HH:mm:ss.SSS}",
                    "name": "%logger",
                    "level": "%level",
                    "ip": "${localIp}",
                    "service": "${applicationName}",
                    "pid": "${PID:-}",
                    "class": "%logger",
                    "method": "%method",
                    "thread": "%thread",
                    "traceId": "%X{traceId:-}",
                    "spanId": "%X{spanId:-}",
                    "parent": "%X{X-B3-ParentSpanId:-}",
                    "message": "[%level] [${localIp}] [${applicationName}] [%thread] [%logger{36}] [%X{traceId:-}] [%X{spanId:-}] [%X{X-B3-ParentSpanId:-}] %message"
                    }
                </pattern>
            </pattern>
        </providers>
    </encoder>
    <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.HostnameKeyingStrategy"/>
    <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
    <producerConfig>bootstrap.servers=10.190.107.136:9092</producerConfig>
    <topic>logs</topic>
    <producerConfig>acks=0</producerConfig>
    <producerConfig>linger.ms=1000</producerConfig>
    <producerConfig>max.block.ms=0</producerConfig>
    <producerConfig>client.id=0</producerConfig>
</appender>

<appender name="FILE-INFO-ASYNC" class="ch.qos.logback.classic.AsyncAppender">
    <discardingThreshold>0</discardingThreshold>
    <queueSize>512</queueSize>
    <appender-ref ref="FILE-INFO"/>
</appender>
<appender name="FILE-ERROR-ASYNC" class="ch.qos.logback.classic.AsyncAppender">
    <discardingThreshold>0</discardingThreshold>
    <queueSize>512</queueSize>
    <appender-ref ref="FILE-ERROR"/>
</appender>

<logger name="com.test.cop" level="DEBUG" additivity="false">
    <appender-ref ref="STDOUT"/>
    <appender-ref ref="FILE-INFO-ASYNC"/>
    <appender-ref ref="FILE-ERROR-ASYNC"/>
    <appender-ref ref="KafkaAppender"/>
</logger>
This configuration defines a KafkaAppender that encodes each log event as JSON via LoggingEventCompositeJsonEncoder, with the fields listed in the pattern above (note that both "name" and "class" map to %logger here). The keyingStrategy partitions messages by hostname, and the deliveryStrategy sends them asynchronously so logging does not block the application. The producerConfig entries point the producer at the Kafka broker, with acks=0 and max.block.ms=0 favoring low overhead over delivery guarantees, and the topic is set to logs.
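The traceId, spanId, and X-B3-ParentSpanId fields are read from the MDC, so they only appear when a tracing context has been populated (typically by a tracing library such as Spring Cloud Sleuth). Purely as an illustration, here is a hedged sketch of populating them by hand through SLF4J's MDC (the class and the generated values are assumptions):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.util.UUID;

public class MdcDemo {

    private static final Logger log = LoggerFactory.getLogger(MdcDemo.class);

    public static void main(String[] args) {
        // Values placed in the MDC are picked up by the %X{traceId:-} and %X{spanId:-} conversions
        MDC.put("traceId", UUID.randomUUID().toString());
        MDC.put("spanId", UUID.randomUUID().toString());
        try {
            log.info("order accepted"); // serialized to JSON and sent to the Kafka topic "logs"
        } finally {
            MDC.clear(); // always clean up: the MDC is thread-local
        }
    }
}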
3. Key Classes
(1) The NanoClockUtils class
package com.test.sunchd.log.utils;

import java.io.Serializable;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;

import static java.util.Objects.requireNonNull;

/**
 * A Clock that approximates nanosecond-precision wall-clock time by anchoring
 * System.nanoTime() to System.currentTimeMillis() at class-load time.
 */
public final class NanoClockUtils extends Clock implements Serializable {

    private static final long serialVersionUID = 1L;

    // Wall-clock time at class load, expressed in nanoseconds since the epoch
    private static final long EPOCH_NANOS = System.currentTimeMillis() * 1000000L;
    // Monotonic reading taken at (approximately) the same moment
    private static final long NANO_START = System.nanoTime();
    // Offset that maps subsequent System.nanoTime() readings onto epoch nanoseconds
    private static final long OFFSET_NANOS = EPOCH_NANOS - NANO_START;

    private static final NanoClockUtils UTC_INSTANCE = new NanoClockUtils(ZoneOffset.UTC);
    private static final NanoClockUtils DEFAULT_INSTANCE = new NanoClockUtils(ZoneId.systemDefault());

    public static final long NANOS_PER_SECOND = 1_000_000_000L;

    private final ZoneId _zone;

    private NanoClockUtils(final ZoneId zone) {
        _zone = requireNonNull(zone, "zone");
    }

    @Override
    public ZoneId getZone() {
        return _zone;
    }

    @Override
    public NanoClockUtils withZone(final ZoneId zone) {
        return zone.equals(_zone) ? this : new NanoClockUtils(zone);
    }

    @Override
    public long millis() {
        return System.currentTimeMillis();
    }

    /** Current time as nanoseconds since the epoch. */
    public long nanos() {
        return System.nanoTime() + OFFSET_NANOS;
    }

    public String nanosStr() {
        return String.valueOf(nanos());
    }

    @Override
    public Instant instant() {
        final long now = nanos();
        return Instant.ofEpochSecond(now / NANOS_PER_SECOND, now % NANOS_PER_SECOND);
    }

    @Override
    public int hashCode() {
        return _zone.hashCode() + 11;
    }

    @Override
    public boolean equals(final Object obj) {
        return obj == this || (obj instanceof NanoClockUtils && ((NanoClockUtils) obj)._zone.equals(_zone));
    }

    @Override
    public String toString() {
        return "NanoClock[" + _zone + "]";
    }

    public static NanoClockUtils system(final ZoneId zone) {
        return new NanoClockUtils(zone);
    }

    public static NanoClockUtils systemUTC() {
        return UTC_INSTANCE;
    }

    public static NanoClockUtils systemDefaultZone() {
        return DEFAULT_INSTANCE;
    }
}
This class extends Clock and implements Serializable, providing nanosecond-level time handling. Its static constants capture the offset between the monotonic nano clock and wall-clock time at class load, and it exposes methods to read the current epoch nanoseconds, convert them to an Instant, and create instances for different time zones.
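A quick, hedged usage sketch (the printed values are obviously machine- and time-dependent):

import com.test.sunchd.log.utils.NanoClockUtils;

public class NanoClockDemo {
    public static void main(String[] args) {
        NanoClockUtils clock = NanoClockUtils.systemDefaultZone();
        long epochNanos = clock.nanos();      // nanoseconds since the epoch, e.g. 1705312345678901234
        System.out.println(epochNanos);
        System.out.println(clock.nanosStr()); // the same value as a string, used by NanoTimeConverter
        System.out.println(clock.instant());  // the same instant with nanosecond precision
    }
}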
(2) The NanoTimeConverter class
package com.test.sunchd.log;

import ch.qos.logback.classic.pattern.ClassicConverter;
import ch.qos.logback.classic.spi.ILoggingEvent;
import com.test.sunchd.log.utils.NanoClockUtils;
import org.apache.commons.lang3.time.DateFormatUtils;

import java.util.Date;

public class NanoTimeConverter extends ClassicConverter {

    @Override
    public String convert(ILoggingEvent event) {
        return NanoClockUtils.systemDefaultZone().nanosStr();
    }

    public static void main(String[] args) {
        // The first 13 digits of the nano timestamp are the epoch milliseconds
        String s = NanoClockUtils.systemDefaultZone().nanosStr().substring(0, 13);
        final long l = Long.valueOf(s);
        System.out.println(DateFormatUtils.format(new Date(l), "yyyyMMdd HHmmss"));
    }
}
This class extends ClassicConverter so it can be plugged into Logback's pattern layout. The convert method returns the nanosecond timestamp string for the system default time zone, and the main method is a small sanity check that truncates the string to milliseconds and formats it as a date.
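The converter is registered through the <conversionRule> element shown in section 2, which binds it to the %nanotime conversion word. As a hedged sketch, it could then be referenced in the JSON pattern like any other conversion (the "nanotime" field name is an assumption, not part of the original config):

<conversionRule conversionWord="nanotime" converterClass="com.test.sunchd.log.NanoTimeConverter"/>
<!-- inside the JSON <pattern>, e.g.: -->
"nanotime": "%nanotime",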
(3) The LogIpProperty class
package com.test.sunchd.log;

import ch.qos.logback.core.PropertyDefinerBase;
import com.test.sunchd.log.utils.NetWorkUtil;

public class LogIpProperty extends PropertyDefinerBase {

    // Resolved once at class load and reused for every log event
    private static final String ip = NetWorkUtil.getHostAddress();

    @Override
    public String getPropertyValue() {
        return ip;
    }
}
This class extends PropertyDefinerBase so that Logback can use it as a property definer: the <define name="localIp" ...> element in section 2 exposes its value as ${localIp}. It resolves the local IP address once via NetWorkUtil.getHostAddress() and returns it from getPropertyValue.
(4) The NetWorkUtil class
package com.test.sunchd.log.utils;

import org.apache.commons.lang3.StringUtils;

import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NetWorkUtil {

    public static final String NETWORK_DOCKER = "docker";
    public static final String NETWORK_BRIDGE = "br";
    public static final String NETWORK_LINUX = "ens";

    public static String getHostAddress() {
        return getLocalHostExactAddress().getHostAddress();
    }

    public static InetAddress getLocalHostExactAddress() {
        try {
            InetAddress candidateAddress = null;
            Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
            while (networkInterfaces.hasMoreElements()) {
                NetworkInterface networkInterface = networkInterfaces.nextElement();
                // Skip virtual, loopback, and point-to-point interfaces
                if (networkInterface.isVirtual() || networkInterface.isLoopback() || networkInterface.isPointToPoint()) {
                    continue;
                }
                Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
                while (inetAddresses.hasMoreElements()) {
                    InetAddress inetAddress = inetAddresses.nextElement();
                    if (inetAddress instanceof Inet4Address) {
                        if (candidateAddress == null) {
                            candidateAddress = inetAddress;
                        } else {
                            // An interface can carry several IPs; walk them to find the one we want
                            for (InetAddress ipAddress : Collections.list(networkInterface.getInetAddresses())) {
                                if (ipAddress.isLoopbackAddress()) {
                                    continue;
                                }
                                if (ipAddress.isAnyLocalAddress()) {
                                    continue;
                                }
                                // The first qualifying IPv4 address is the one we are after
                                if (ipAddress instanceof Inet4Address) {
                                    candidateAddress = ipAddress;
                                    break;
                                }
                            }
                        }
                    }
                }
            }
            // If nothing besides loopback is available, fall back to the loopback address
            if (candidateAddress == null) {
                candidateAddress = InetAddress.getLocalHost();
            }
            return candidateAddress;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static Set<InetAddress> getLocalIpAddress() {
        Set<InetAddress> result = new HashSet<>();
        try {
            Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
            while (networkInterfaces.hasMoreElements()) {
                NetworkInterface networkInterface = networkInterfaces.nextElement();
                Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
                while (inetAddresses.hasMoreElements()) {
                    InetAddress inetAddress = inetAddresses.nextElement();
                    if (inetAddress instanceof Inet4Address) {
                        result.add(inetAddress);
                    }
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return result;
    }

    public static List<InetAddress> getLocalIpAddressFromNetworkInterfaces() throws SocketException {
        List<InetAddress> addresses = new ArrayList<>();
        Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
        if (networkInterfaces == null) {
            return addresses;
        }
        while (networkInterfaces.hasMoreElements()) {
            NetworkInterface networkInterface = networkInterfaces.nextElement();
            Enumeration<InetAddress> inetAddresses = networkInterface.getInetAddresses();
            while (inetAddresses.hasMoreElements()) {
                InetAddress inetAddress = inetAddresses.nextElement();
                if (inetAddress instanceof Inet4Address) {
                    addresses.add(inetAddress);
                }
            }
        }
        return addresses;
    }

    public static boolean isBridgeNetwork(NetworkInterface ni) {
        if (ni == null) {
            return false;
        }
        return StringUtils.startsWithIgnoreCase(ni.getName(), NETWORK_BRIDGE);
    }

    public static boolean isDockerNetwork(NetworkInterface ni) {
        if (ni == null) {
            return false;
        }
        return StringUtils.startsWithIgnoreCase(ni.getName(), NETWORK_DOCKER);
    }
}
This class is a collection of network utilities. It defines constants identifying interface types such as docker, br, and ens. getHostAddress and getLocalHostExactAddress return the machine's "real" IP address by walking the network interfaces and skipping virtual, loopback, and point-to-point ones. getLocalIpAddress and getLocalIpAddressFromNetworkInterfaces collect all local IPv4 addresses, and isBridgeNetwork and isDockerNetwork test whether an interface belongs to a bridge or Docker network.
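A short, hedged usage sketch:

import com.test.sunchd.log.utils.NetWorkUtil;

import java.net.InetAddress;

public class NetWorkUtilDemo {
    public static void main(String[] args) {
        // The address that LogIpProperty exposes to logback.xml as ${localIp}
        System.out.println(NetWorkUtil.getHostAddress());
        // All IPv4 addresses bound to this machine
        for (InetAddress address : NetWorkUtil.getLocalIpAddress()) {
            System.out.println(address.getHostAddress());
        }
    }
}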
4. The Rest of the Pipeline
With the configuration above in place, the application publishes its logs to the Kafka topic logs. Logstash consumes from that topic and pushes the events into Elasticsearch (ES). Finally, Kibana can query and analyze the logs through a matching index pattern, giving us efficient, centralized management and analysis of the Spring Boot application's logs.
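For completeness, a hedged sketch of what the Logstash side could look like; the consumer group, Elasticsearch host, and index name are assumptions to adapt to your environment:

input {
  kafka {
    bootstrap_servers => "10.190.107.136:9092"
    topics => ["logs"]
    group_id => "logstash-logs"
    codec => "json"   # the appender already emits JSON, so no grok parsing is needed
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "app-logs-%{+YYYY.MM.dd}"   # daily indices; create a matching index pattern in Kibana
  }
}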