小铃铛's graduation project uses the ELK log management stack. While researching it, I found no article in Chinese that introduces the ELK stack particularly well, so this post aims to walk through the implementation in some detail.
Background
What is ELK
ELK is a popular open-source log management stack made up of three core components:
| Component | Characteristics |
|---|---|
| Elasticsearch | Distributed search and analytics engine; real-time search and analytics; built on Lucene, highly scalable |
| Logstash | Server-side data processing pipeline; collects, parses, and transforms log data; supports many input sources and output destinations |
| Kibana | Data visualization platform; rich charts and dashboards; interactive data exploration |
As the ecosystem evolved, several ELK variants appeared:
- EFK: replaces Logstash with Fluentd
- ELK + Beats: adds the lightweight Beats family of data shippers
- Elastic Stack: the official new name for the ELK stack
Key features
- Real-time analysis: data is processed and analyzed in near real time
- Scalability: scales horizontally to handle large volumes of data
- Flexibility: supports many data sources and formats
- Powerful search: full-text as well as structured search
Common use cases
- Centralized log management and analysis
- Application performance monitoring
- Security information and event management (SIEM)
- Business intelligence analysis
Practical application
小铃铛's blog system uses the ELK Stack. The data flows roughly as follows: the application writes structured JSON logs via Logback, Filebeat picks up the log files and ships them to Logstash, Logstash parses and enriches the events and writes them into Elasticsearch, and Kibana is used to explore the result.
The rest of this post walks through collecting the blog-post log data as a concrete example.
Step 1. Designing the log template
First, decide what data the logs should carry. You can define a custom index template and place it in Logstash's template directory (the /templates folder mounted into the container), for example the following file:
post-metrics-template.json
{
"index_patterns": ["blog-post-metrics-*"],
"version": 3,
"priority": 400,
"template": {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1,
"index.lifecycle.name": "blog-policy",
"index.lifecycle.rollover_alias": "blog-post-metrics"
},
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"log_type": {
"type": "keyword"
},
"service_name": {
"type": "keyword"
},
"level": {
"type": "keyword"
},
"operation": {
"type": "keyword"
},
"post_id": {
"type": "long"
},
"post_title": {
"type": "keyword"
},
"user_id": {
"type": "long"
},
"success": {
"type": "boolean"
},
"duration": {
"type": "long"
},
"error_message": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"hostname": {
"type": "keyword"
},
"os_name": {
"type": "keyword"
},
"os_version": {
"type": "keyword"
},
"environment": {
"type": "keyword"
},
"service": {
"type": "keyword"
},
"view_count": {
"type": "long"
}
}
}
}
}
Step 2. Deciding when to collect logs
In my experience the backend emits logs in two situations: statically, whenever any method under a given package is invoked (located by package/class path), and dynamically, whenever a method that the developer has explicitly annotated is invoked. This summary may not be perfectly precise, so the two examples below should make it concrete.
When a method under a given package is invoked
The current AOP setup logs whenever a method under the com.kitty.blog.application or com.kitty.blog.interfaces package is called, except for methods under com.kitty.blog.infrastructure.security.filter.
(The example below is not specific to collecting post logs; it is only meant to illustrate the idea.)
@Aspect
@Component
@Slf4j
public class BackendLoggingAspect {
@Before("execution(* com.kitty.blog.application..*.*(..))" +
" || execution(* com.kitty.blog.interfaces..*.*(..))" +
" && !execution(* com.kitty.blog.infrastructure.security.filter..*.*(..))")
public void before(JoinPoint joinPoint) {
Map<String, Object> logData = new HashMap<>();
logData.put("@timestamp", LocalDateTime.now().toInstant(ZoneOffset.UTC).toString());
logData.put("log_type", LogConstants.LogType.API_METRICS);
logData.put("application", LogConstants.APPLICATION_NAME);
logData.put("phase", "method_start");
logData.put("class", joinPoint.getTarget().getClass().getName());
logData.put("method", joinPoint.getSignature().getName());
logData.put("args", Arrays.toString(joinPoint.getArgs()));
// Attach tags (host / service / environment)
try {
logData.put("host", java.net.InetAddress.getLocalHost().getHostName());
logData.put("service", LogConstants.APPLICATION_NAME);
logData.put("environment", System.getProperty("spring.profiles.active", "dev"));
} catch (Exception e) {
log.warn("Failed to add tags to log data", e);
}
log.info("Method Execution: {}", net.logstash.logback.argument.StructuredArguments.entries(logData));
}
@After("execution(* com.kitty.blog.application..*.*(..))" +
" || execution(* com.kitty.blog.interfaces..*.*(..))" +
" && !execution(* com.kitty.blog.infrastructure.security.filter..*.*(..))")
public void after(JoinPoint joinPoint) {
Map<String, Object> logData = new HashMap<>();
logData.put("@timestamp", LocalDateTime.now().toInstant(ZoneOffset.UTC).toString());
logData.put("log_type", LogConstants.LogType.API_METRICS);
logData.put("application", LogConstants.APPLICATION_NAME);
logData.put("phase", "method_end");
logData.put("class", joinPoint.getTarget().getClass().getName());
logData.put("method", joinPoint.getSignature().getName());
// Attach tags (host / service / environment)
try {
logData.put("host", java.net.InetAddress.getLocalHost().getHostName());
logData.put("service", LogConstants.APPLICATION_NAME);
logData.put("environment", System.getProperty("spring.profiles.active", "dev"));
} catch (Exception e) {
log.warn("Failed to add tags to log data", e);
}
log.info("Method Execution: {}", net.logstash.logback.argument.StructuredArguments.entries(logData));
}
@AfterThrowing(pointcut = "(execution(* com.kitty.blog.application..*.*(..))" +
" || execution(* com.kitty.blog.interfaces..*.*(..)))" +
" && !execution(* com.kitty.blog.infrastructure.security.filter..*.*(..))", throwing = "ex")
public void afterThrowing(JoinPoint joinPoint, Throwable ex) {
Map<String, Object> logData = new HashMap<>();
logData.put("@timestamp", LocalDateTime.now().toInstant(ZoneOffset.UTC).toString());
logData.put("log_type", LogConstants.LogType.ERROR);
logData.put("application", LogConstants.APPLICATION_NAME);
logData.put("phase", "method_error");
logData.put("class", joinPoint.getTarget().getClass().getName());
logData.put("method", joinPoint.getSignature().getName());
logData.put("error_message", ex.getMessage());
logData.put("stack_trace", Arrays.toString(ex.getStackTrace()));
// Attach tags (host / service / environment)
try {
logData.put("host", java.net.InetAddress.getLocalHost().getHostName());
logData.put("service", LogConstants.APPLICATION_NAME);
logData.put("environment", System.getProperty("spring.profiles.active", "dev"));
} catch (Exception e) {
log.warn("Failed to add tags to log data", e);
}
log.error("Method Execution Error: {}", net.logstash.logback.argument.StructuredArguments.entries(logData));
}
}
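The aspects in this step reference a LogConstants helper class that is not shown in the post. As a rough sketch only, based on how its fields are used here and on the log_type values that appear later in the Filebeat and Logstash configuration, it might look like the following (the exact names and values are assumptions):
public final class LogConstants {

    // Service name attached to every structured log event (assumed value,
    // matching the "application" custom field in logback-spring.xml).
    public static final String APPLICATION_NAME = "blog-system";

    // Log type tags; they end up in the log_type field and drive index routing in Logstash.
    public static final class LogType {
        public static final String API_METRICS = "api-metrics";
        public static final String ERROR = "error";
        public static final String POST_METRICS = "post-metrics";

        private LogType() {}
    }

    private LogConstants() {}
}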
When an annotated method is invoked
Here the advice fires whenever a method annotated with @LogPostMetrics is called. The annotation can be one you define yourself, as below, or a predefined one.
@Aspect
@Component
@Slf4j
public class PostMetricsAspect {
@Autowired
private PostRepository postRepository;
@Around("@annotation(com.kitty.blog.common.annotation.LogPostMetrics)")
public Object logPostMetrics(ProceedingJoinPoint joinPoint) throws Throwable {
long startTime = System.currentTimeMillis();
String operation = getOperationType(joinPoint);
Long postId = getPostId(joinPoint);
try {
Object result = joinPoint.proceed();
long duration = System.currentTimeMillis() - startTime;
recordPostMetrics(operation, postId, true, null, duration);
return result;
} catch (Exception e) {
long duration = System.currentTimeMillis() - startTime;
recordPostMetrics(operation, postId, false, e.getMessage(), duration);
throw e;
}
}
private String getOperationType(ProceedingJoinPoint joinPoint) {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
LogPostMetrics annotation = signature.getMethod().getAnnotation(LogPostMetrics.class);
return annotation.value();
}
private Long getPostId(ProceedingJoinPoint joinPoint) {
Object[] args = joinPoint.getArgs();
for (Object arg : args) {
if (arg instanceof Long) {
return (Long) arg;
} else if (arg instanceof Integer) {
return ((Integer) arg).longValue();
} else if (arg instanceof String) {
try {
return Long.parseLong((String) arg);
} catch (NumberFormatException e) {
log.warn("Failed to parse post id from string: {}", arg);
}
}
}
return null;
}
private void recordPostMetrics(String operation, Long postId, boolean success, String errorMessage, long duration) {
try {
Map<String, Object> metrics = new HashMap<>();
// Base fields
metrics.put("@timestamp", LocalDateTime.now().toInstant(ZoneOffset.UTC).toString());
metrics.put("log_type", LogConstants.LogType.POST_METRICS);
metrics.put("service", LogConstants.APPLICATION_NAME);
metrics.put("level", "INFO");
// Operation info
metrics.put("operation", operation);
metrics.put("post_id", postId);
if (postId != null) {
try {
Post post = postRepository.findById(postId.intValue())
.orElse(null);
if (post != null) {
metrics.put("post_title", post.getTitle());
metrics.put("user_id", post.getUserId());
metrics.put("view_count", post.getViews());
} else {
metrics.put("post_title", "未找到文章");
metrics.put("user_id", "unknown");
log.error("文章不存在: {}", postId);
}
} catch (Exception e) {
log.error("获取文章详情失败: {}", e.getMessage(), e);
metrics.put("post_title", "获取文章信息失败");
metrics.put("user_id", "unknown");
}
} else {
metrics.put("post_title", "无文章ID");
metrics.put("user_id", "unknown");
log.error("文章ID为空");
}
metrics.put("success", success);
metrics.put("duration", duration);
if (errorMessage != null) {
metrics.put("error_message", errorMessage);
log.error("操作失败: {}", errorMessage);
}
// Attach tags (host / environment)
metrics.put("host", java.net.InetAddress.getLocalHost().getHostName());
metrics.put("environment", System.getProperty("spring.profiles.active", "dev"));
log.info("Post Metrics: {}",
net.logstash.logback.argument.StructuredArguments.entries(metrics));
} catch (Exception e) {
log.error("记录文章指标失败", e);
}
}
}
The custom annotation is used exactly like a predefined one: just put @LogPostMetrics on the target method (a usage sketch follows the annotation definition below).
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface LogPostMetrics {
String value() default "";
}
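As an illustration only, a minimal sketch of applying the annotation in a service method might look like this; PostService, incrementViews, setViews, and save are hypothetical names, not code taken from the actual project:
@Service
public class PostService {

    @Autowired
    private PostRepository postRepository;

    // PostMetricsAspect intercepts this call, reads the operation type "view"
    // from the annotation, extracts the post id from the Long argument,
    // and emits a structured post-metrics log event.
    @LogPostMetrics("view")
    public Post incrementViews(Long postId) {
        Post post = postRepository.findById(postId.intValue())
                .orElseThrow(() -> new IllegalArgumentException("Post not found: " + postId));
        post.setViews(post.getViews() + 1);
        return postRepository.save(post);
    }
}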
Step 3. Shaping the log output with Logback
logback-spring.xml mainly configures the logs to roll over into a new file each day, which keeps them easy to manage. (Only the parts relevant to post metrics are shown; the ERROR_FILE appender referenced in the root logger is not included here.)
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<timestamp key="CURRENT_DATE" datePattern="yyyy-MM-dd"/>
<property name="LOG_PATH" value="./logs"/>
<property name="LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"/>
<!-- 控制台输出 -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${LOG_PATTERN}</pattern>
</encoder>
</appender>
<!-- 博客文章指标日志 -->
<appender name="POST_METRICS_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/blog-post-metrics-${CURRENT_DATE}.json</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_PATH}/blog-post-metrics-%d{yyyy.MM.dd}.json</fileNamePattern>
<maxHistory>7</maxHistory>
</rollingPolicy>
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<customFields>{"application":"blog-system","log_type":"post-metrics"}</customFields>
<timestampPattern>yyyy-MM-dd'T'HH:mm:ss.SSSZZ</timestampPattern>
<includeMdcData>true</includeMdcData>
<includeContext>true</includeContext>
<fieldNames>
<tags>[ignore]</tags>
<hostname>host</hostname>
</fieldNames>
<provider class="net.logstash.logback.composite.loggingevent.LogstashMarkersJsonProvider"/>
<provider class="net.logstash.logback.composite.loggingevent.MdcJsonProvider"/>
</encoder>
</appender>
<!-- 日志配置 -->
<root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="ERROR_FILE"/>
</root>
<logger name="com.kitty.blog.common.aspect.PostMetricsAspect" level="INFO" additivity="false">
<appender-ref ref="POST_METRICS_FILE"/>
<appender-ref ref="CONSOLE"/>
</logger>
</configuration>
Step 4. Setting up the ELK environment
At this point the application writes its log files to a known location; next, Filebeat has to be able to read them. Below is the Filebeat configuration from my Docker environment.
docker-compose.yml
filebeat:
image: elastic/filebeat:8.11.1
user: root
volumes:
- ./config/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml
- ../../../logs:/var/log/blog:ro
- /var/lib/docker/containers:/var/lib/docker/containers:ro
- /var/run/docker.sock:/var/run/docker.sock:ro
depends_on:
logstash:
condition: service_healthy
environment:
- ENVIRONMENT=dev
- ELASTIC_USERNAME=elastic
- ELASTIC_PASSWORD=123456
networks:
- elk-network
command: >
bash -c "
chmod go-w /usr/share/filebeat/filebeat.yml &&
chown root:root /usr/share/filebeat/filebeat.yml &&
chmod 0644 /usr/share/filebeat/filebeat.yml &&
filebeat -e -strict.perms=false
"
filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/blog/blog-api-metrics-*.json
json.keys_under_root: true
json.add_error_key: true
json.ignore_decoding_error: true
fields:
app: blog-system
environment: ${ENVIRONMENT:-dev}
log_type: api-metrics
fields_under_root: true
- type: log
enabled: true
paths:
- /var/log/blog/blog-error-*.json
json.keys_under_root: true
json.add_error_key: true
json.ignore_decoding_error: true
fields:
app: blog-system
environment: ${ENVIRONMENT:-dev}
log_type: error
fields_under_root: true
- type: log
enabled: true
paths:
- /var/log/blog/blog-post-metrics-*.json
json.keys_under_root: true
json.add_error_key: true
json.ignore_decoding_error: true
fields:
app: blog-system
environment: ${ENVIRONMENT:-dev}
log_type: post-metrics
fields_under_root: true
- type: log
enabled: true
paths:
- /var/log/blog/blog-system-metrics-*.json
json.keys_under_root: true
json.add_error_key: true
json.ignore_decoding_error: true
fields:
app: blog-system
environment: ${ENVIRONMENT:-dev}
log_type: system-metrics
fields_under_root: true
- type: log
enabled: true
paths:
- /var/log/blog/blog-user-activity-*.json
json.keys_under_root: true
json.add_error_key: true
json.ignore_decoding_error: true
fields:
app: blog-system
environment: ${ENVIRONMENT:-dev}
log_type: user-activity
fields_under_root: true
processors:
- add_host_metadata: ~
- add_docker_metadata: ~
output.logstash:
hosts: ["logstash:5000"]
loadbalance: true
bulk_max_size: 2048
logging.level: info
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat.log
rotateeverybytes: 10485760 # 10MB
keepfiles: 7
permissions: 0644
setup.ilm.enabled: false
setup.template.enabled: false
Filebeat now picks up the logs and ships them on; the Logstash configuration is as follows.
docker-compose.yml
logstash:
image: logstash:8.11.1
depends_on:
elasticsearch:
condition: service_healthy # wait for the Elasticsearch health check
ports:
- "5000:5000"
volumes:
- ./config/logstash/logstash.yml:/usr/share/logstash/config/logstash.yml:ro # mounted read-only
- ./config/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
- ./config/logstash/templates:/usr/share/logstash/templates:ro
environment:
- "LS_JAVA_OPTS=-Xms256m -Xmx256m"
- ELASTICSEARCH_USERNAME=elastic
- ELASTICSEARCH_PASSWORD=123456
networks:
- elk-network
healthcheck:
test: ["CMD-SHELL", "curl -s -f http://localhost:9600 || exit 1"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
logstash.conf
input {
beats {
port => 5000
host => "0.0.0.0"
}
}
filter {
json {
source => "message"
skip_on_invalid_json => true
target => "parsed_json"
}
# Flatten the host metadata added by Filebeat
mutate {
add_field => {
"hostname" => "%{[host][name]}"
"os_name" => "%{[host][os][name]}"
"os_version" => "%{[host][os][version]}"
}
remove_field => ["host"]
}
# Promote environment and service from the parsed JSON to top-level fields
mutate {
add_field => {
"environment" => "%{[parsed_json][environment]}"
"service" => "%{[parsed_json][service]}"
}
remove_field => ["tags"]
}
# Debug: print the current event
ruby {
code => "
require 'logger'
logger = Logger.new(STDOUT)
logger.info('Current event:')
logger.info(event.to_hash.inspect)
"
}
# Choose the target index based on log_type
if [log_type] == "user-activity" {
mutate {
add_field => { "[@metadata][index]" => "blog-user-activity-%{+YYYY.MM.dd}" }
add_field => { "[@metadata][template_name]" => "user-activity-template" }
}
} else if [log_type] == "post-metrics" {
mutate {
add_field => { "[@metadata][index]" => "blog-post-metrics-%{+YYYY.MM.dd}" }
add_field => { "[@metadata][template_name]" => "post-metrics-template" }
}
# Extract view_count from parsed JSON
if [parsed_json][view_count] {
mutate {
add_field => { "view_count" => "%{[parsed_json][view_count]}" }
}
mutate {
convert => { "view_count" => "integer" }
}
}
}
else if [log_type] == "api-metrics" {
mutate {
add_field => { "[@metadata][index]" => "blog-api-metrics-%{+YYYY.MM.dd}" }
add_field => { "[@metadata][template_name]" => "blog-template" }
}
} else if [log_type] == "error" {
mutate {
add_field => { "[@metadata][index]" => "blog-error-logs-%{+YYYY.MM.dd}" }
add_field => { "[@metadata][template_name]" => "blog-template" }
}
} else if [log_type] == "system-metrics" {
mutate {
add_field => { "[@metadata][index]" => "blog-system-metrics-%{+YYYY.MM.dd}" }
add_field => { "[@metadata][template_name]" => "blog-template" }
}
}
# If no log_type matched, fall back to a default index
if ![@metadata][index] {
mutate {
add_field => { "[@metadata][index]" => "blog-unknown-1" }
add_field => { "[@metadata][template_name]" => "blog" }
add_field => { "error_message" => "Unknown log_type: %{[log_type]}" }
}
}
# Make sure the timestamp is parsed correctly
date {
match => [ "@timestamp", "ISO8601" ]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
# Drop fields we don't need
mutate {
remove_field => ["message", "tags", "beat", "input", "prospector", "agent"]
}
}
output {
if [@metadata][index] {
if [@metadata][template_name] == "post-metrics-template" {
elasticsearch {
hosts => ["elasticsearch:9200"]
user => "${ELASTICSEARCH_USERNAME}"
password => "${ELASTICSEARCH_PASSWORD}"
index => "%{[@metadata][index]}"
template => "/usr/share/logstash/templates/post-metrics-template.json"
template_name => "post-metrics-template"
template_overwrite => true
data_stream => false
}
} else if [@metadata][template_name] == "user-activity-template" {
elasticsearch {
hosts => ["elasticsearch:9200"]
user => "${ELASTICSEARCH_USERNAME}"
password => "${ELASTICSEARCH_PASSWORD}"
index => "%{[@metadata][index]}"
template => "/usr/share/logstash/templates/user-activity-template.json"
template_name => "user-activity-template"
template_overwrite => true
data_stream => false
}
} else if [@metadata][template_name] == "blog-template" {
elasticsearch {
hosts => ["elasticsearch:9200"]
user => "${ELASTICSEARCH_USERNAME}"
password => "${ELASTICSEARCH_PASSWORD}"
index => "%{[@metadata][index]}"
template => "/usr/share/logstash/templates/blog-template.json"
template_name => "blog-template"
template_overwrite => true
data_stream => false
}
}
}
}
logstash.yml
http.host: "0.0.0.0"
xpack.monitoring.enabled: false
path.config: /usr/share/logstash/pipeline
path.logs: /var/log/logstash
pipeline.workers: 2
pipeline.batch.size: 125
pipeline.batch.delay: 50
queue.type: memory
queue.max_bytes: 1024mb
log.level: info
The logs are then stored in Elasticsearch. Any extra functionality has to be installed as a plugin, for example a Chinese analyzer such as IK.
docker-compose.yml
Make sure to expose a username and password that Logstash and Kibana can use. They do not have to be the same account, but the Logstash account needs at least write access and the Kibana account at least read access; a sketch of creating a dedicated Logstash role and user is shown after the compose file below.
elasticsearch:
image: elasticsearch:8.11.1
environment:
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- xpack.security.enabled=true
- ELASTIC_PASSWORD=123456
- bootstrap.memory_lock=true
- cluster.name=docker-cluster
- network.host=0.0.0.0
- xpack.security.transport.ssl.enabled=false
- xpack.security.http.ssl.enabled=false
- "ELASTIC_USERNAME=elastic"
- KIBANA_SYSTEM_PASSWORD=kibana123
ports:
- "9201:9200"
- "9300:9300"
volumes:
- elasticsearch_data:/usr/share/elasticsearch/data
- ./config/elasticsearch/plugins/analysis-ik:/usr/share/elasticsearch/plugins/analysis-ik
- ./config/elasticsearch/setup:/usr/share/elasticsearch/setup
networks:
- elk-network
healthcheck:
test: ["CMD-SHELL", "curl -s -u elastic:123456 http://localhost:9200/_cluster/health || exit 1"]
interval: 30s
timeout: 10s
retries: 5
start_period: 60s
ulimits: # system resource limits
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
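For reference, here is a minimal sketch of creating such a dedicated Logstash role and user through the Elasticsearch security API, using the JDK HTTP client. The role and user names (logstash_writer, logstash_internal), the password, the privilege list, and the host port 9201 (from the compose mapping above) are all assumptions; the configuration in this post simply reuses the elastic superuser instead.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;

public class CreateLogstashUser {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String auth = "Basic " + Base64.getEncoder()
                .encodeToString("elastic:123456".getBytes());

        // Role that can write to the blog-* indices and manage their index templates.
        String roleBody = """
                {"cluster": ["manage_index_templates", "monitor"],
                 "indices": [{"names": ["blog-*"],
                              "privileges": ["write", "create_index", "manage"]}]}""";
        put(client, auth, "http://localhost:9201/_security/role/logstash_writer", roleBody);

        // User that Logstash would authenticate as instead of the elastic superuser.
        String userBody = """
                {"password": "logstash123", "roles": ["logstash_writer"]}""";
        put(client, auth, "http://localhost:9201/_security/user/logstash_internal", userBody);
    }

    private static void put(HttpClient client, String auth, String url, String body) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Authorization", auth)
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(url + " -> " + response.statusCode() + " " + response.body());
    }
}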
Next, configure Kibana:
docker-compose.yml
kibana:
image: kibana:8.11.1
depends_on:
elasticsearch:
condition: service_healthy
ports:
- "5601:5601"
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
- ELASTICSEARCH_USERNAME=kibana_system
- ELASTICSEARCH_PASSWORD=kibana123
- SERVER_NAME=kibana
- SERVER_HOST=0.0.0.0
- XPACK_REPORTING_ENABLED=false
healthcheck:
test: ["CMD-SHELL", "curl -s -I http://localhost:5601 | grep -q 'HTTP/1.1 302 Found'"]
interval: 10s
timeout: 10s
retries: 120
networks:
- elk-network
Kibana is available at http://localhost:5601 by default, and the admin username and password are elastic and 123456. From there you can explore and visualize the data and manage the cluster state.
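Besides the Kibana UI, you can sanity-check that events really arrive end to end by querying Elasticsearch directly. Below is a small sketch with the JDK HTTP client, assuming the host port mapping 9201 from the compose file and the elastic superuser credentials used above:
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;

public class QueryPostMetrics {
    public static void main(String[] args) throws Exception {
        String auth = "Basic " + Base64.getEncoder()
                .encodeToString("elastic:123456".getBytes());

        // Count post-metrics documents per operation type (operation is a keyword field
        // in post-metrics-template.json, so a terms aggregation works on it directly).
        String query = """
                {"size": 0,
                 "aggs": {"by_operation": {"terms": {"field": "operation"}}}}""";

        HttpRequest request = HttpRequest.newBuilder(
                        URI.create("http://localhost:9201/blog-post-metrics-*/_search"))
                .header("Authorization", auth)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(query))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}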