topic分着存储在broker的分区中,分区进一步分为segment。
日志目录中的每一组文件都代表一个段。
段文件名中的后缀表示该段的基本偏移量。
log.segment.bytes表示分段的最大大小。
消息写入分区时,kafka会将这些消息写入段,写满了再创建一个新的段开始写消息。
log.segment.bytes
log.segment.ms 指定单个日志段的最大存活时间,如果超过了时间,kafka会滚动到一个新段
log.cleanup.policy 日志段的保留策略。可以是删除delete或者压缩compact。
kafka可以指定消费某个offset的数据。kafka的消息是有索引的,便于快速定位。
文件类型:
.log:存储消息内容(Segment文件)。.log文件追加写入。
.index:偏移量索引(快速定位消息物理位置)。
.timeindex:时间戳索引(按时间范围查询)。
log file
一级索引是偏移量,二级索引是位置。
位置:批次在文件中的位置。index索引是建在这个列上的。
index file
消费者在分区中查找偏移量为k的消息:找到偏移量为k的那个segment。从这个segment定位取出消息。
查找偏移量为123456的消息。
步骤:
根据baseOffset确定所属Segment文件(如00000000000000012345.log)。
在.index文件中通过二分查找定位到物理位置(position)。
从.log文件中读取具体消息内容。
步骤1:确定目标Segment 二分查找Segment 每个Segment的.index文件记录其起始偏移量(baseOffset)。 例如:
00000000000000000000.index:baseOffset=0
00000000000000000170.index:baseOffset=170410
00000000000000000239.index:baseOffset=239430
通过二分查找确定k=170417属于baseOffset=170410的Segment(文件名00000000000000000170.index)。
步骤2:定位消息在Segment内的位置 读取Segment的.index文件 在.index文件中,找到最后一个小于等于k的索引项。
例如: text 170410 0 170420 1024 k=170417介于170410和170420之间,取前一个索引项(170410, 0)。
计算消息在.log文件中的偏移量 起始偏移量baseOffset=170410,目标偏移量k=170417,两者差值delta=7。
从.index文件中获取基地址position=0(物理便宜量,读取的时候是按字节数),加上delta * 平均消息大小(假设平均每条消息占100字节): text 实际物理位置 = 0 + (170417 - 170410) * 100 = 700字节
步骤3:读取消息内容 从.log文件读取数据 从物理位置700字节开始读取完整消息(含Key、Value、时间戳等元数据)。
timeIndex 时间戳索引
计算时间范围T_start = now - 1h。
遍历Segment的.timeindex,找到第一个timestamp >= T_start的Segment。
通过.timeindex和.offsetindex逐条读取消息。