Graylog 日志审计使用教程-kafka收取消息

发布于:2024-08-20 ⋅ 阅读:(57) ⋅ 点赞:(0)

Graylog 是一个开源的日志管理和分析平台。它主要用于收集、存储、处理和分析大量的日志数据。Graylog 的核心功能包括:

  1. 日志收集:Graylog 可以从各种来源(如服务器、网络设备、应用程序等)收集日志数据。它支持多种输入方式,包括 Syslog、GELF、Beats、Kafka 等。
  2. 日志存储:Graylog 使用 Elasticsearch 作为其后端存储系统,因此它可以高效地存储和检索大量的日志数据。
  3. 日志处理和分析:Graylog 提供了强大的搜索和分析功能,用户可以通过查询语言对日志进行过滤、排序、聚合等操作,帮助发现问题或监控系统状态。
  4. 警报和通知:Graylog 可以根据预设条件触发警报,并通过电子邮件、Slack 等渠道发送通知。
  5. 可视化和仪表板:用户可以在 Graylog 中创建可视化的仪表板,用于实时监控和分析日志数据。

Graylog 常用于 IT 运维、安全监控、故障排查等场景,通过对日志数据的集中管理和分析,帮助企业提高系统的可观测性和问题解决能力。

一、日志收集

1.1、配置日志服务器

选择input

这里给了很多日志接收方法

试验使用kafka获取日志,

1.2、配置映射表

他作为动态数据获取,比如IP 地理位置等,根据日志某个字段的信息从XX库找到对应的数据

选择数据源

创建链接源数据实例

有很多不同格式的数据源

选择csv文件

如果我们的内容为

username ,department,email

jdoe,IT,jdoe@example.com

asmith,HR,asmith@example.com

bwhite,Finance,bwhite@example.com

johndoe,jiushi,a.com

将内容编写为

不区分大小写,更新

创建索引表实例

选择刚刚创建的数据源实例并创建

1.3、创建字段提取器

创建,内容填写自己构建的kafka服务器地址和topic

保存成功,他会自己运行的,如果没有问题就会变成RUNNING的状态

先生产一个消息,因为要配置字段提取器

生产内容

{  "version": "1.1",  "host": "webserver.local",  "short_message": "User login failed",  "full_message": "User johndoe failed to login to the web application",  "level": 3,  "_username": "johndoe",  "_src_ip": "192.168.1.2"}

查看消息

新产生的这个就是

构造字段提取

新增字段提取器

加载最近的消息

这里内容是kafka生产的消息,只作为日志,可以对每个字段配置提取。

将username 用于 Look UP配置

忘记设置映射表了。先在1,2小节创建映射表实例

选择映射表

1.4、创建告警

1.4.1、创建上报

使用简单的API上报

填充URL,并尝试发送测试数据

提示成功

查看api服务器

1.4.2、定义事件

设置上报方式,刚才创建的API方式

创建告警事件完成

1.5、测试

1.5.1、产生关于登录失败的日志

{  "version": "1.1",  "host": "webserver.local",  "short_message": "User login failed",  "full_message": "User johndoe failed to login to the web application",  "level": 3,  "_username": "johndoe",  "_src_ip": "192.168.1.2"}

其中username的值为johndoe,记得之前映射表给定文件内容

我们创建映射表时,通过username查找department内容,并添加到告警自定义字段中。

产生的告警内容,我们自定义字段“renyuan”他的value通过映射表username关联到的。日志内容jdoe对应表格IT内容

附录

Input kafka不同格式示例

1. CEF Kafka

示例消息:

CEF:0|SecurityVendor|SecurityProduct|1.0|100|User Login|5|dvc=192.168.1.1 duser=johndoe src=10.0.0.1 spt=443

解释:

  • CEF:0: CEF 版本号。
  • SecurityVendor: 设备供应商名称。
  • SecurityProduct: 设备产品名称。
  • 1.0: 设备版本。
  • 100: 事件 ID。
  • User Login: 事件名称或描述。
  • 5: 严重级别(1-10)。
  • dvc=192.168.1.1: 设备 IP 地址。
  • duser=johndoe: 目标用户名。
  • src=10.0.0.1: 源 IP 地址。
  • spt=443: 源端口。

2. GELF Kafka

示例消息:

{
  "version": "1.1",
  "host": "webserver.local",
  "short_message": "User login failed",
  "full_message": "User johndoe failed to login to the web application",
  "level": 3,
  "_username": "johndoe",
  "_src_ip": "192.168.1.2"
}

解释:

  • version: GELF 版本号。
  • host: 主机名或 IP 地址。
  • short_message: 简短的事件描述。
  • full_message: 事件的详细描述。
  • level: 日志级别(例如:3 表示错误)。
  • _username 和 _src_ip: 自定义字段。

3. RAW Kafka

示例消息:

User johndoe failed to login from IP 192.168.1.2 via web application.

解释:

  • RAW 格式是原始日志消息的简单文本,没有结构化的字段或元数据。消息内容完全取决于发送方。

4. Syslog Kafka

示例消息:

<134>webserver.local User login failed: johndoe from 192.168.1.2

解释:

  • <134>: Syslog 优先级,计算方式为 (Facility * 8) + Severity。
  • webserver.local: 发送日志的主机名。
  • User login failed: johndoe from 192.168.1.2: 日志消息内容,描述发生的事件。

这些示例展示了不同格式在 Kafka 中传输的方式,供 Graylog 等日志管理系统解析和处理。

问题

1、接收到消息,但是没有输出

存在下行数据变动

查看系统日志tail -f /var/log/graylog-server/server.log &

root@a:/home/install/kafka_2.12-2.8.2# 2024-08-19T03:49:22.949Z ERROR [DecodingProcessor] Unable to decode raw message RawMessage{id=0593cd30-5dde-11ef-b23f-ac1f6b7e6aea, journalOffset=70, codec=CEF, payloadSize=9, timestamp=2024-08-19T03:49:22.947Z} on input <66c2a27b4c33336f079032fc>.

2024-08-19T03:49:22.949Z ERROR [DecodingProcessor] Error processing message RawMessage{id=0593cd30-5dde-11ef-b23f-ac1f6b7e6aea, journalOffset=70, codec=CEF, payloadSize=9, timestamp=2024-08-19T03:49:22.947Z}

java.lang.NullPointerException: null

        at org.graylog.plugins.cef.parser.MappedMessage.<init>(MappedMessage.java:37) ~[graylog.jar:?]

        at org.graylog.plugins.cef.codec.CEFCodec.decodeCEF(CEFCodec.java:128) ~[graylog.jar:?]

        at org.graylog.plugins.cef.codec.CEFCodec.decode(CEFCodec.java:117) ~[graylog.jar:?]

        at org.graylog2.shared.buffers.processors.DecodingProcessor.processMessage(DecodingProcessor.java:149) ~[graylog.jar:?]

        at org.graylog2.shared.buffers.processors.DecodingProcessor.onEvent(DecodingProcessor.java:90) [graylog.jar:?]

        at org.graylog2.shared.buffers.processors.ProcessBufferProcessor.onEvent(ProcessBufferProcessor.java:90) [graylog.jar:?]

        at org.graylog2.shared.buffers.processors.ProcessBufferProcessor.onEvent(ProcessBufferProcessor.java:47) [graylog.jar:?]

        at com.lmax.disruptor.WorkProcessor.run(WorkProcessor.java:143) [graylog.jar:?]

        at com.codahale.metrics.InstrumentedThreadFactory$InstrumentedRunnable.run(InstrumentedThreadFactory.java:66) [graylog.jar:?]

        at java.lang.Thread.run(Thread.java:830) [?:?]

创建了CEF格式解码器,原因为kafka生产的消息不符合格式

示例CEF格式内容

CEF:0|Security|Graylog|1.0|100|Test event|5|src=10.0.0.1 dst=10.0.0.2 spt=1232

可以通过重新创建一个syslog