构建Doris日志系统

发布于:2024-06-04 ⋅ 阅读:(153) ⋅ 点赞:(0)

基于es的日志系统,存在数据压缩率低,写入吞吐量低,聚合性能差。
Doris2.0以上版本支持倒排索引,基于Doris的日志系统,主要是看重了doris压缩率是es的两倍多,聚合性能比es高。
以上可以自测,以上不是本文重点,本文重点是如何快速对接Doris,以下是实例

建表

CREATE TABLE log_table
(
  `ts` DATETIMEV2,
  `ip` VARCHAR(255),
  `msg` TEXT
  INDEX idx_clientip (`ip`) USING INVERTED,
  INDEX idx_request (`msg`) USING INVERTED PROPERTIES("parser" = "english")
)
ENGINE = OLAP
DUPLICATE KEY(`ts`)
PARTITION BY RANGE(`ts`) ()
DISTRIBUTED BY RANDOM BUCKETS AUTO
PROPERTIES (
"replication_num" = "1",
"storage_policy" = "log_policy_1day",
"deprecated_dynamic_schema" = "true",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-3",
"dynamic_partition.end" = "7",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "AUTO",
"dynamic_partition.replication_num" = "1"
);

1.使用 DATETIMEV2 类型的时间字段作为 Key,在查询最新 N 条日志时有明显加速
2.对经常查询的字段创建索引,对需要全文检索的字段指定分词器 Parser 参数
3.分区使用时间字段上的 RANGE 分区,开启动态 Partiiton 可按天自动管理分区
4.分桶使用 RANDOM 随机分桶,使用 AUTO 可让系统根据集群规模和数据量自动计算分桶数量

倒排索引参考官方链接

https://doris.apache.org/zh-CN/docs/dev/table-design/index/inverted-index/

分区分桶参考官方链接

https://doris.apache.org/zh-CN/docs/table-design/data-partition/?_highlight=dynamic_partition&_highlight

日志导入

Kafka 导入

将 JSON 格式的日志写入 Kafka 消息队列,创建 Kafka Routine Load,让 Doris 从 Kafka 中主动拉取数据。示例如下,其中 property.* 配置是非必须的:

-- 创建routine load,从kafka log _topic将数据导入log_table表
CREATE ROUTINE LOAD load_log_kafka ON log_db.log_table
COLUMNS(ts, clientip, request, status, size)
PROPERTIES (
    "max_batch_interval" = "10",
    "max_batch_rows" = "1000000",
    "max_batch_size" = "109715200",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA (
    "kafka_broker_list" = "host:port",
    "kafka_topic" = "log _topic",
    "property.group.id" = "your_group_id",
    "property.security.protocol"="SASL_PLAINTEXT",     
    "property.sasl.mechanism"="GSSAPI",     
    "property.sasl.kerberos.service.name"="kafka",     
    "property.sasl.kerberos.keytab"="/path/to/xxx.keytab",     
    "property.sasl.kerberos.principal"="xxx@yyy.com"
);

kakfa导入参考官方文档:

https://doris.apache.org/zh-CN/docs/data-operate/import/routine-load-manual

Logstash 导入

Logstash 导入
配置 Logstash 的 HTTP Output,将数据通过 HTTP Stream Load 发送到 Doris。

配置 Batch 攒批条数和时间,用于提升数据写入性能

pipeline.batch.size: 100000
pipeline.batch.delay: 10000

output 配置

output {
    http {
       follow_redirects => true
       keepalive => false
       http_method => "put"
       url => "http://172.21.0.5:8640/api/logdb/logtable/_stream_load"
       headers => [
           "format", "json",
           "strip_outer_array", "true",
           "load_to_single_tablet", "true",
           "Authorization", "Basic cm9vdDo=",
           "Expect", "100-continue"
       ]
       format => "json_batch"
    }
}
flume导入

flume支持两种导入一种是kafka,参考上文,一种是自定义sink,写程序导入。
自定义sink导入,参考下面的方式通过 Http Stream Load 接口导入数据到 Doris,关键点如下:
1.使用 basic auth 进行 HTTP 鉴权,用命令echo -n ‘username:password’ | base64 来计算
2.设置http header “format:json”,指定数据格式为 JSON
3.设置 http header “read_json_by_line:true”,指定每行一个 JSON
4.设置 http header “load_to_single_tablet:true”,指定一次写入一个分桶
5.目前建议写入客户端一个 Batch 100MB ~ 1GB,后续版本会通过服务端 Group Commit 降低客户端 Batch 大小

curl \
--location-trusted \
-u username:password \
-H "format:json" \
-H "read_json_by_line:true" \
-H "load_to_single_tablet:true" \
-T logfile.json \
http://fe_host:fe_http_port/api/log_db/log_table/_stream_load

Logstash 导入和flume导入实质都是通过http方式导入,官网文档如下:

https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual

还有其它的导入比如mysql客户端,s3导入等,具体文档参考如下:

https://doris.apache.org/zh-CN/docs/data-operate/import/load-manual

后续会讲下Doris与es的性能比较。