前言
logstash作为elastic公司elkb体系中的一环,在日志处理中被广泛应用。它可以实现数据传输,数据过滤,格式化输出,还有强大的官方以及自定义插件功能,因而在很久之前就已经闻其大名了。也进行过简单的功能测试,对其有了基本的认知。
但是真正的生产环境的产品化应用却不是很多,这次终于找到了一个合适的场景来真正使用下这个产品。本来一个简单的需求应该不会掀起什么波澜,但是结果却令我很意外,logstash给我留下了深刻的印象,并为我解锁了很多的玩法。下面我们就来看看全过程,顺便梳理下logstash在使用过程中的一些常规用法以及需要注意的地方。
方案
需求
其实这次的需求很简单,就是把流数据中的部分种类的数据按照一定的规则重新组织,并推送到远端的硬盘按天存储。
这个需求看起来其实很简单,最直接的方案就是在数据处理的过程中进行分支业务处理,当符合条件时,将数据进行解析并重新组织,通过文件流的方式输出到远端的硬盘即可。
虽然逻辑很简单,但是考虑到效率,容错以及对主程序性能的影响,直接使用上面的处理方式其实并不是最佳的选择。于是,使用logstash来搞定这件事情的方案就被优先选择了。
方案
在数据处理主程序中,进行分支处理,将数据拼装好以后直接吐到kafka的topic中,主程序的任务就结束了,剩下的事情就交给logstash来处理
logstash读kafka,然后简单的处理数据后,直接输出到file中,问题解决
使用logstash不仅将主程序数据处理与数据落盘解耦合,不会影响到主程序的效率,更重要的是logstash能将性能,容错等这些不太好处理的脏活累活都大包大揽了
实践
根据上述的方案,logstash的配置就很显然了。主要的选型如下:
input:kafka,即从kafka的topic中读取数据
filter:mutate,由于kafka input输出的数据格式有很多多余的字段,所以使用mutate filter来删掉这些字段,用来减少网络IO以及磁盘存储的消耗
output:file,即输出到硬盘存储成文件
准备
首先就是安装logstash了,安装步骤很简单,把tar包下载到服务器本地,tar命令解压即可。
这里需要注意下,安装完毕后,需要使用命令查看当前版本的logstash自带插件的情况:
.\bin\logstash-plugin list
果不其然,并没有发现我需要的kafka相关的plugin,所以第一步是安装kafka相关的插件。
进入logstash的安装目录,执行下面的命令:
命令如下:
//在线
.\bin\logstash-plugin install logstash-output-logservice
//离线
.\bin\logstash-plugin install /root/logstash-offline-plugins.zip
此处需要注意两点:
必须在logstash的安装目录下执行命令,而不能在bin目录下执行,因为该插件托管于RubyGems,需要相关的依赖,而这些依赖都在logstash的安装目录下。
离线安装必须下载logstash-offline-plugins.zip,安装插件时需要依赖此压缩包
安装完毕后,再次使用上面插件查看的命令,发现kafka的插件安装完毕。
至此kafka的plugin安装完毕,准备任务完成。
first config
环境准备完毕后,接下来就开始编写conf文件了,毕竟所有的业务和处理都依赖于这个配置文件。根据上面的业务需求,第一个配置文件内容如下:
input{
kafka{
bootstrap_servers => ["172.16.45.157:9092"]
client_id => "test"
group_id => "test"
auto_offset_reset => "latest"
consumer_threads => 5
topics => ["bb_risk_control"]
}
}
output {
file {
ceate_if_deleted=>true
file_mode=>777
filename_failure=>"log_error"
flush_interval=>0
path=>"/data1/data/%{+YYYYMMdd}.log"
gzip=>true
}
}
第一个配置文件本着先调通流程,看看数据长啥样的目的,所以仅仅加了些常规配置,也没有加filter,然后得到的结果如下:
{
"event":{
"original":"{\"country\":\"中国\",\"is_xxxxxx_xxxx\":\"1\",\"city\":\"北京\",\"user_id\":\"admin\",\"ip\":\"172.16.43.157\",\"is_xxxx\":\"0\",\"deviceID\":\"1234567890\",\"xxxxxx_num\":\"1660111513693\",\"dev_xx\":\"7423208c-25fd-3482-98b9-951dfe5af148\",\"is_xxxxxxxx\":\"0\"}"
},
"@timestamp":"2022-08-10T06:00:23.695733Z",
"message":"{\"country\":\"中国\",\"is_xxxxxx_xxxx\":\"1\",\"city\":\"北京\",\"user_id\":\"admin\",\"ip\":\"172.16.43.157\",\"is_xxxx\":\"0\",\"deviceID\":\"1234567890\",\"xxxxxx_num\":\"1660111513693\",\"dev_xx\":\"7423208c-25fd-3482-98b9-951dfe5af148\",\"is_xxxxxxxx\":\"0\"}",
"@version":"1"
}
虽然内容看起来是没问题的,但是默认的kafka input输出的默认结果字段冗余,结构混乱,这显然不是我们期望的结果,下一步就是对这些字段进行格式化以及过滤了。
input改成json格式
首先,通过学习发现想输出json格式的数据,最简单的就是增加input的一个属性即codec=”json“,这样数据就会按照json输出,配置文件如下:
input{
kafka{
bootstrap_servers => ["172.16.45.157:9092"]
client_id => "test"
group_id => "test"
auto_offset_reset => "latest"
consumer_threads => 5
topics => ["bb_risk_control"]
codec => "json"
}
}
output {
file {
ceate_if_deleted=>true
file_mode=>777
filename_failure=>"log_error"
flush_interval=>0
path=>"/data1/data/%{+YYYYMMdd}.log"
gzip=>true
}
}
看看输出结果,果然好多了。
{"xxxxxx_num":"1660114709040",
"deviceID":"1234567890",
"@timestamp":"2022-08-10T06:53:38.960117Z",
"is_xxxxxx_xxxx":"1",
"user_id":"admin",
"is_xxxxxxxx":"0",
"event":{"original":"{\"country\":\"中国\",\"is_xxxxxx_xxxx\":\"1\",\"city\":\"北京\",\"user_id\":\"admin\",\"ip\":\"172.16.43.157\",\"is_xxxx\":\"0\",\"deviceID\":\"1234567890\",\"xxxxxx_num\":\"1660114709040\",\"dev_xx\":\"7423208c-25fd-3482-98b9-951dfe5af148\",\"is_xxxxxxxx\":\"0\"}"},
"is_xxxx":"0",
"city":"北京",
"dev_xx":"7423208c-25fd-3482-98b9-951dfe5af148",
"ip":"172.16.43.157",
"country":"中国",
"@version":"1"}
结果是在一级json中,但是还多了event,@timestamp以及@version字段,下一步的目标就是去掉这三个字段。
进入误区
过滤字段,就轮到filter模块上场了,由于需要使用的字段remove_field是filter的公共字段,所以理论上所有的filter都可以实现此功能。
经过筛选,分别尝试了json filter以及mutate filter,但是奇怪的是不管是哪个filter都无文件输出,且日志无报错。 脑袋上出现了无数个问号,到底哪里出问题了?
后来经过了无数的尝试,终于发现了罪魁祸首,原来是output中的生成输出文件名中的配置导致的:
path=>"/data1/data/%{+YYYYMMdd}.log"
这个YYYYMMdd是通过@timestamp字段算出来的,如果把@timestamp字段干掉,这里就无法生成数据,输出的文件名就会变成.log文件,非法的文件名,自然就不会输出数据了。
所以当前配置下,直接干掉这几个字段是不可能了,既然如此,那就先用着吧,但是另外一个大问题又慢慢的浮出水面了。
时区问题
在走出误区后,又发现了logstash的另一个大问题,即时区问题。logstash默认时区为UTC,与我们东八区相差八小时,所以@timestamp字段的时间其实比我们的正常时间少八个小时,这对output中elasticsearch按天创建索引或者file按天创建文件带来了很大的影响。
那么当前版本下如何解决时区问题呢?常规的做法就是手动处理@timestamp字段,在这个值的基础上加8小时,手动改成北京时间(这个做法仅适用于file以日期或者时间格式生成文件名的场景,elasticsearch按天创建索引可能不是很适用,因为kibana是自动帮我们解决时区的问题,结果就是又多了八小时)。
具体的解决方式参见下面的配置:
filter {
date {
match => ["message","UNIX_MS"]
target => "@timestamp"
}
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
remove_field => ["timestamp"]
}
此处仅粘贴了filter相关的代码,逻辑也很简单:
使用date filter将message中的时间字段转化为绝对秒数,并赋值给@timestamp
创建一个新字段timestamp,并将@timestamp的时间加8小时后赋值给timestamp
再将timestamp的值赋值给@timestamp
删除timestamp字段
是不是很眼熟?没错,和代码中两个值互换的操作异曲同工:
c=a;
a=b;
b=c;
操作完后,a和b的值就互换了。
最终版本
时区的问题解决了,理论上就可以解决按日期生成文件名后数据不准确的问题了。但是这个方案并没有从根本上解决我的问题,主要还有以下两个问题:
@timestamp是操作时间,而非业务时间,如果kafka数据积压或者来了部分历史数据,那么@timestamp生成的文件名称就会有问题
字段冗余的问题也未能解决
考虑到上面两个问题,我经过尝试放弃了原本的根据YYYYMMdd生成文件名的方式,而是在input的kafka消息中增加了一个字段date_str来标识业务日期,格式是YYYYMMdd格式,文件名的生成根据该字段进行拼接。然后在filter中remove掉event,@timestamp以及@version三个字段。
最终配置如下:
input{
kafka{
bootstrap_servers => ["172.16.45.157:9092"]
client_id => "test"
group_id => "test"
auto_offset_reset => "latest"
consumer_threads => 5
decorate_events => false
topics => ["bb_risk_control"]
codec => "json"
}
}
filter {
mutate {
remove_field => [ "event", "@version", "@timestamp" ]
}
}
output {
file {
create_if_deleted=>true
file_mode=>777
filename_failure=>"log_error"
flush_interval=>0
path=>"/data1/data/%{date_str}.gz"
gzip=>true
}
}
至此问题解决,输出的字段完全正常,达到预期。
{"dev_xx":"7423208c-25fd-3482-98b9-951dfe5af148",
"city":"北京",
"user_id":"admin",
"deviceID":"1234567890",
"is_xxxxxx_xxxx":"1",
"is_xxxx":"0",
"is_xxxxxxxx":"0",
"ip":"172.16.43.157",
"xxxxxx_num":"1660136246082",
"country":"中国",
"date_str":"20220810"}
节外生枝
上面的问题解决后,本来该志得意满的去解决下一个问题,但是中间出了个小的插曲,也让我对上述的配置做了一个优化。
由于之前的file output的数据启用了gzip压缩,且文件名是yyyyMMdd.log。一切都很正常除了在服务器上解压缩文件,浪费了点脑筋,使用gunzip,unzip以及tar命令都无法解压缩。而使用file命令来查看文件的属性,发现确实是压缩的gzip类型的数据,一时间进入了死胡同。
后来经过尝试后发现将20220811.log重命名成20220811.log.gz,然后再使用gunzip 20220811.log.gz对文件进行解压缩,虽然得到的文件仍然叫20220811.log,但是已经可以使用vim查看了,解压缩成功。
由于需要使用脚本对file output的文件进行解压缩,所以重命名这个命令其实是可以省略的,于是修改logstash的file output的path属性,直接改成:
path=>"/data1/data/%{date_str}.gz"
直接输出20220811.gz文件,直接进行解压缩,但是此处解压缩必须使用-f参数即gunzip -f 20220811.gz,否则会报错:
总结
上文就是使用logstash实现整个需求的全过程,可以看到,需求虽小,但是涉及到的问题和内容还是很丰富的,logstash常规的问题基本上都遇到了。
但这也是件好事,毕竟这样后续再次使用logstash的时候试错成本就会低很多。只要能正常的用起来,logstash的效率以及稳定性还是比我们手写的软件强很多,更重要的是节省了我们很多的开发以及测试工作量。
虽然如此,但是logstash也有自己的缺点。其中最让人诟病的就是logstash耗资源较大,运行占用CPU和内存高。但是这个也是和整个硬件投入以及数据量有关的,需要具体问题具体分析。
另外,如果是轻量级的数据采集,业务简单的话,可以直接使用filebeat,filebeat更轻量,占用资源更少。也可以将filebeat作为 Logstash Forwarder,底层通过filebeat更快速稳定轻量低耗地进行收集工作,然后根据需求可以很方便地与 Logstash还有直接与Elasticsearch进行对接。而如此高的灵活性和选择扩展性代表着不同细分场景下不同的选择,完全由业务决定,无关乎优劣,这也正是开源软件的魅力所在!
文章到这里就结束了,最后路漫漫其修远兮,大数据之路还很漫长。如果想一起大数据的小伙伴,欢迎点赞转发加关注,下次学习不迷路,我们在大数据的路上共同前进!