最近基于lua脚本做了一套数据采集服务,日采集数据量 :1.1亿条/天,CPU和内存都可以做到极低的水平。当然上线以来也经历了一波全链路的崩溃到修复过程。其中涉及到nginx优化、lua脚本写入kafka性能优化、kafka性能优化;
1、数据压缩策略
进行对应的数据压缩是大数据量传输的前提条件,业界用得比较多的压缩算法有gzip、lz4、snappy、zstd。已经实现了snappy、gzip,后续计划扩展zstd和lz4;假如考虑速度更快可以考虑snappy、lz4;考虑压缩比更高可以考虑zstd;(ps:微信桌面端通信采用snappy进行数据压缩)
2、nginx网关层
worker_processes auto; # 根据CPU自动选择worker进程数据
client_body_buffer_size 64k; # 增大可以防止生成大量小文件,全在内存进行处理,提升效率
3、lua采集脚本
lua-resty-kafka配置优化(生产者优化)
kafka_producer = producer:new(broker_list, {
producer_type = "async", -- 异步生产者类型
required_acks = 1,
request_timeout = 20000,
max_retry = 15,
retry_backoff = 1000,
max_buffering = 100000, -- 生产者缓冲消息队列最大值,默认5w条(queue.buffering.max.messages)
--batch_size = 400,
batch_size = 2097152, -- 生产者缓冲区大小,每次发送的数据大于这个值,有可能发生堆积(send.buffer.bytes)应小于kafka的socket.request.max.bytes / 2 - 10k
batch_num = 600, -- 生产者每次批量发送的数量(batch.num.messages)
linger_ms = 800, --
compression_codec = "gzip" -- 当前插件是不支持压缩,可惜了
})
(1)生产者内存队列缓存区配置
(2)每个批次发送大小
(2)网络缓冲区,批次大小不能超过网络缓冲区,网络缓冲区最大依赖于操作系统的tcp和kafka broker配置。
(3)发送前的数据压缩
(4)多分区topic提升写入并行度
(5)异步写入
(6)ack=1,单副本成功即可
异步批量写盘优化
local function async_flush_logs()
if #log_buffer == 0 then return end
ngx.thread.spawn(function()
local retries = 3 -- 最大重试次数
local delay = 0.1 -- 每次重试的延迟时间(秒)
while retries > 0 do
-- 确保日志文件已初始化
init_log_file()
-- 检查日志文件是否已成功打开
if current_log_file then
local ok, err = pcall(function()
for _, log_entry in ipairs(log_buffer) do
current_log_file:write(log_entry .. "\n")
end
current_log_file:flush() -- 确保数据立即写入磁盘
end)
if not ok then
ngx.log(ngx.ERR, "Failed to write log: ", err)
else
-- 写入成功,清空缓冲区
log_buffer = {}
break -- 退出重试循环
end
else
ngx.log(ngx.WARN, "Log file is not initialized, retrying...")
retries = retries - 1
ngx.sleep(delay) -- 等待一段时间后重试
end
end
if retries == 0 then
ngx.log(ngx.ERR, "Failed to write log after multiple retries")
end
end)
end
4、kafka端
Kafka Broker 的 socket.request.max.bytes=204857600(200M),配合写入端进行性能优化
堆内存配置大小
中型正式环境 4~8G
大型生产环境 8~16G
5、安全性
采用非对称加密+对称加密的混合方式进行数据加密传输;完全的非对称加密在数据量稍大的情况下会消耗大量的cpu和压缩解压时间;
对于上报的数据压缩提需要校验上传文件的最大值,以及解压后数据大小,防止非法请求,循环解压浪费大量cpu和存储资源