LookupLevels
LookupLevels
在 Paimon 中扮演着**“带缓存的、基于 Key 的数据查找引擎”**的角色。它的核心使命是:当需要根据主键(Key)查找某条数据时,能够高效地在 LSM-Tree 的多层(Levels)数据文件中定位到这条数据,并尽可能地利用本地缓存来避免昂贵的远程 I/O。
LookupLevels
封装了对一个 Levels
对象(代表了 LSM-Tree 的一个分区的所有数据文件分层信息)的查找操作。它本身不存储数据文件的元数据,而是依赖传入的 Levels
对象。它的主要职责可以概括为:
- 提供统一的查找入口:通过
lookup(InternalRow key, int startLevel)
方法,屏蔽底层多层文件和缓存的复杂性。 - 实现惰性本地缓存:当第一次需要在一个远程数据文件(SST 文件)中查找时,它会下载该文件,并在本地构建一个优化的、基于 Key 的查找索引文件(Lookup File)。
- 管理缓存生命周期:通过
Caffeine
缓存管理这些本地查找文件,并能在远程文件被删除时(如 Compaction 后),自动清理对应的本地缓存。 - 提供灵活的查找策略:通过
ValueProcessor
接口,支持不同场景的查找需求,例如:只需判断 Key 是否存在、需要获取完整的 Value、或需要获取 Value 在文件中的物理位置等。
关键成员变量(构造函数解析)
LookupLevels
的所有核心依赖都通过构造函数注入,这体现了良好的设计模式。
// ... existing code ...
private final Levels levels;
private final Comparator<InternalRow> keyComparator;
private final RowCompactedSerializer keySerializer;
private final ValueProcessor<T> valueProcessor;
private final IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory;
private final Function<String, File> localFileFactory;
private final LookupStoreFactory lookupStoreFactory;
private final Function<Long, BloomFilter.Builder> bfGenerator;
private final Cache<String, LookupFile> lookupFileCache;
private final Set<String> ownCachedFiles;
public LookupLevels(
Levels levels,
Comparator<InternalRow> keyComparator,
RowType keyType,
ValueProcessor<T> valueProcessor,
IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
Function<String, File> localFileFactory,
LookupStoreFactory lookupStoreFactory,
Function<Long, BloomFilter.Builder> bfGenerator,
Cache<String, LookupFile> lookupFileCache) {
// ... existing code ...
levels
: 持有 LSM-Tree 的文件分层元数据,是查找的基础。keyComparator
&keySerializer
: 用于主键的比较和序列化。valueProcessor
: 策略模式的核心。它决定了从远程文件读取数据后,在本地查找文件中存储什么,以及最终返回给调用者什么。fileReaderFactory
: 一个工厂,用于根据DataFileMeta
创建能读取远程数据文件(如 Parquet)的RecordReader
。localFileFactory
: 一个工厂,用于在本地磁盘上创建临时文件,作为查找文件的载体。lookupStoreFactory
: 一个工厂,用于创建真正的本地 KV 存储。Paimon 默认使用基于哈希的实现 (HashLookupStoreFactory
)。bfGenerator
: 布隆过滤器(Bloom Filter)生成器,用于在创建本地查找文件时一并生成布隆过滤器,可以快速过滤掉不存在的 Key,避免磁盘 I/O。lookupFileCache
: 核心缓存,一个Caffeine
缓存实例,Key 是远程数据文件的文件名,Value 是封装了本地查找文件的LookupFile
对象。这个缓存可以在多个LookupLevels
实例间共享。ownCachedFiles
: 一个HashSet
,用于追踪由当前LookupLevels
实例创建并放入缓存的文件。这主要用于在当前实例关闭时,能准确地清理自己创建的缓存。
lookup(InternalRow key, int startLevel)
// ... existing code ...
@Nullable
public T lookup(InternalRow key, int startLevel) throws IOException {
return LookupUtils.lookup(levels, key, startLevel, this::lookup, this::lookupLevel0);
}
// ... existing code ...
这个方法将实际的查找逻辑委托给了 LookupUtils
工具类。LookupUtils.lookup
的逻辑遵循 LSM-Tree 的基本原则:
- 从
startLevel
开始,逐层向上查找(Level 0, Level 1, Level 2...)。 - 因为低层(Level 号码小)的数据比高层的数据更新,所以一旦在某一层找到了数据,就立刻返回,不再继续查找更高层。
- 对于 Level 0,由于文件之间 key range 可能重叠,需要遍历所有文件。
- 对于 Level 1 及以上,文件间的 key range 不重叠,因此可以通过二分查找快速定位到可能包含目标 key 的那个文件(这里的二分是根据DataFileMeta记录的最大和最小key)。
最终,无论是哪一层,定位到具体的 DataFileMeta
后,都会调用 LookupLevels
自己的 lookup(InternalRow key, DataFileMeta file)
方法。
lookup(InternalRow key, DataFileMeta file)
// ... existing code ...
@Nullable
private T lookup(InternalRow key, DataFileMeta file) throws IOException {
// 1. 尝试从缓存获取 LookupFile
LookupFile lookupFile = lookupFileCache.getIfPresent(file.fileName());
boolean newCreatedLookupFile = false;
if (lookupFile == null) {
// 2. 缓存未命中,调用 createLookupFile 创建一个新的
lookupFile = createLookupFile(file);
newCreatedLookupFile = true;
}
byte[] valueBytes;
try {
byte[] keyBytes = keySerializer.serializeToBytes(key);
// 3. 使用 LookupFile 在本地进行查找
valueBytes = lookupFile.get(keyBytes);
} finally {
if (newCreatedLookupFile) {
// 4. 如果是新创建的,放入缓存供后续使用
lookupFileCache.put(file.fileName(), lookupFile);
}
}
if (valueBytes == null) {
return null;
}
// 5. 使用 ValueProcessor 将从本地文件读出的字节数组,转换成最终结果
return valueProcessor.readFromDisk(
key, lookupFile.remoteFile().level(), valueBytes, file.fileName());
}
// ... existing code ...
这个过程清晰地展示了 “按需加载、惰性创建” 的缓存模式。而 createLookupFile
方法则是将远程数据转化为本地索引的核心:
- 创建本地空文件:
localFileFactory.apply(...)
。 - 创建本地写入器:
lookupStoreFactory.createWriter(...)
。 - 创建远程读取器:
fileReaderFactory.apply(file)
。 - 数据转换与写入:循环读取远程文件中的每条
KeyValue
,通过valueProcessor.persistToDisk(...)
处理后,写入本地文件。 - 封装返回:将创建好的本地文件及其读取器封装成
LookupFile
对象返回。
createLookupFile(DataFileMeta file)
这个函数是 LookupLevels
实现惰性本地缓存机制的关键。它的主要作用是:当需要在一个远程数据文件(DataFileMeta
)中进行查找,但本地缓存又不存在时,由该函数负责将远程数据文件转换成本地优化的、可快速查找的索引文件(LookupFile
)。
下面我们分步骤、层层递进地解析它的实现逻辑。
// ... existing code ...
private LookupFile createLookupFile(DataFileMeta file) throws IOException {
// 1. 创建本地临时文件
File localFile = localFileFactory.apply(file.fileName());
if (!localFile.createNewFile()) {
throw new IOException("Can not create new file: " + localFile);
}
// 2. 创建本地 KV 存储的写入器
LookupStoreWriter kvWriter =
lookupStoreFactory.createWriter(localFile, bfGenerator.apply(file.rowCount()));
LookupStoreFactory.Context context;
try (RecordReader<KeyValue> reader = fileReaderFactory.apply(file)) {
// 3. 数据从远程读取并写入本地
KeyValue kv;
if (valueProcessor.withPosition()) {
// ... (处理带位置信息的场景)
} else {
// ... (处理不带位置信息的场景)
}
} catch (IOException e) {
// 4. 异常处理:删除不完整的本地文件
FileIOUtils.deleteFileOrDirectory(localFile);
throw e;
} finally {
// 5. 关闭写入器并获取上下文
context = kvWriter.close();
}
// 6. 注册到 ownCachedFiles
ownCachedFiles.add(file.fileName());
// 7. 创建并返回 LookupFile 实例
return new LookupFile(
localFile,
file,
lookupStoreFactory.createReader(localFile, context),
() -> ownCachedFiles.remove(file.fileName()));
}
// ... existing code ...
创建本地临时文件 (localFileFactory.apply
)
File localFile = localFileFactory.apply(file.fileName());
- 作用: 为即将创建的本地查找文件在磁盘上预留一个位置。
- 递归分析:
localFileFactory
是一个Function<String, File>
类型的函数,在LookupLevels
实例化时由外部传入。- 在典型的
KeyValueFileStoreWrite
中,它的实现是调用ioManager.createChannel(prefix).getPathFile()
。 ioManager
是 Paimon 的 I/O 管理器,它负责在配置的临时目录(java.io.tmpdir
或用户指定的目录)下创建文件,并保证文件名唯一,通常会添加随机后缀。- 文件名的前缀由
LookupFile.localFilePrefix(...)
生成,格式为分区信息_bucket号_远程文件名
,这保证了不同来源的文件生成的本地文件不会冲突。
创建本地 KV 存储的写入器 (lookupStoreFactory.createWriter
)
LookupStoreWriter kvWriter =
lookupStoreFactory.createWriter(localFile, bfGenerator.apply(file.rowCount()));
- 作用: 获取一个能向第一步创建的
localFile
中写入键值对的LookupStoreWriter
对象。 - 递归分析:
lookupStoreFactory
: 这是本地查找文件格式的工厂。Paimon 支持多种格式,由CoreOptions.LOOKUP_LOCAL_FILE_TYPE
配置决定。HASH
: 返回HashLookupStoreFactory
。它创建的HashLookupStoreWriter
会构建一个基于哈希表的本地文件,提供 O(1) 的平均查找复杂度。SORT
: 返回SortLookupStoreFactory
。它创建的SortLookupStoreWriter
会构建一个基于排序键的本地文件,通过二分查找进行定位。
bfGenerator.apply(file.rowCount())
: 这是一个布隆过滤器(Bloom Filter)生成器。createWriter
会接收这个BloomFilter.Builder
,并在写入数据的同时,将所有的 Key 添加到布隆过滤器中。这个布隆过滤器最终也会被序列化到本地文件中,用于快速过滤掉不存在的 Key。
数据从远程读取并写入本地
try (RecordReader<KeyValue> reader = fileReaderFactory.apply(file)) {
// ... 循环读取和写入 ...
}
这是数据转换的核心步骤。
- 作用: 从远程的 SST 文件(如 Parquet)中逐条读取
KeyValue
,然后通过kvWriter
写入本地查找文件。 - 递归分析:
fileReaderFactory.apply(file)
: 创建一个RecordReader
来读取远程的DataFileMeta
。这个读取器知道如何解析 Parquet/ORC/Avro 文件格式。reader.readBatch()
: 为了效率,数据是按批次读取的。valueProcessor.withPosition()
: 这是一个判断,询问当前的ValueProcessor
策略是否需要原始数据在文件中的物理位置(Position)。- 如果为
true
(如PositionedKeyValueProcessor
),则会调用valueProcessor.persistToDisk(kv, batch.returnedPosition())
。 - 如果为
false
(如KeyValueProcessor
或ContainsValueProcessor
),则调用valueProcessor.persistToDisk(kv)
。
- 如果为
valueProcessor.persistToDisk(...)
: 这是策略模式的应用。ValueProcessor
决定了最终写入本地查找文件的value
是什么。KeyValueProcessor
: 写入完整的value
、sequenceNumber
和RowKind
。ContainsValueProcessor
: 写入一个空字节数组,极大地节省空间。PositionedKeyValueProcessor
: 写入rowPosition
以及可选的value
等信息。
kvWriter.put(keyBytes, valueBytes)
: 将序列化后的key
和value
写入本地查找文件。kvWriter
内部会处理哈希冲突(HASH 模式)或排序(SORT 模式),并同步更新布隆过滤器。
异常处理
catch (IOException e) {
FileIOUtils.deleteFileOrDirectory(localFile);
throw e;
}
- 作用: 这是一个健壮性保证。如果在数据转换过程中发生任何 I/O 异常(例如网络中断、磁盘写满),这个
catch
块会确保被创建出来但不完整的本地临时文件被删除,避免留下垃圾文件。
关闭写入器并获取上下文
- 作用:
kvWriter.close()
是一个至关重要的步骤。它会完成所有收尾工作,例如:- 将内存中的缓冲区(buffer)刷写到磁盘。
- 写入文件元数据(metadata),比如布隆过滤器的序列化数据、哈希表的元信息、索引块等。
- 返回一个
LookupStoreFactory.Context
对象,这个对象包含了读取该文件所必需的元信息(比如文件总大小、布隆过滤器在文件中的偏移量等)。这个context
会在后续创建LookupStoreReader
时被传入。
注册到 ownCachedFiles
ownCachedFiles.add(file.fileName());
- 作用: 将这个新创建的本地文件的远程文件名记录在
ownCachedFiles
这个Set
中。这用于追踪当前LookupLevels
实例创建了哪些缓存。当这个实例被close()
时,它只会清理自己创建的缓存,而不会影响其他实例创建的缓存。
创建并返回 LookupFile
实例
return new LookupFile(
localFile,
file,
lookupStoreFactory.createReader(localFile, context),
() -> ownCachedFiles.remove(file.fileName()));
- 作用: 将所有资源封装成一个
LookupFile
对象返回。 - 递归分析:
new LookupFile(...)
:LookupFile
是对一个本地查找文件的完整封装。lookupStoreFactory.createReader(localFile, context)
: 使用与写入时相同的工厂,并传入之前获取的context
,来创建一个LookupStoreReader
。这个reader
知道如何解析这个本地文件并执行快速查找。() -> ownCachedFiles.remove(file.fileName())
: 传入一个Runnable
回调。这个回调会在LookupFile
从Caffeine
缓存中被移除时调用,从而将文件名从ownCachedFiles
中也移除,保持状态同步。
ValueProcessor
策略接口
LookupLevels
提供了三种 ValueProcessor
实现,以应对不同场景:
KeyValueProcessor
: 用于需要获取完整KeyValue
的场景。它会将value
、sequenceNumber
和valueKind
都序列化后存入本地文件。ContainsValueProcessor
: 用于仅需判断 Key 是否存在的场景(例如 Lookup Join)。它的persistToDisk
直接返回一个空字节数组,极大地减小了本地索引文件的大小。readFromDisk
则直接返回true
。PositionedKeyValueProcessor
: 用于需要知道数据在原文件中物理位置的场景,这对于实现 Deletion Vector(删除向量)至关重要。它可以选择是否将value
也一并存入本地文件。
生命周期与缓存清理
LookupLevels
实现了两个接口来管理生命周期:
Closeable
: 当LookupLevels
对象关闭时(例如一个 Flink Task 结束),它的close()
方法会被调用,从而将它自己创建的那些缓存项(记录在ownCachedFiles
中)从共享的lookupFileCache
中移除。Levels.DropFileCallback
: 它把自己注册为Levels
的一个回调。当 Compaction 等操作导致某个远程数据文件被删除时,Levels
会通过notifyDropFile
方法通知LookupLevels
,后者会立即将该文件对应的本地缓存项从lookupFileCache
中移除,保证了缓存与实际数据的一致性。
总结
LookupLevels
是 Paimon 高性能查找能力的关键。它通过惰性本地缓存、可插拔的 KV 存储和灵活的 ValueProcessor
策略,巧妙地解决了在存算分离架构下,如何高效地对远程大规模数据进行点查的问题。其设计兼顾了性能、灵活性和健壮的缓存管理,是理解 Paimon 读取链路的一个绝佳范例。
lookup
方法的调用链
LookupLevels.lookup
方法本身非常简洁,它将所有复杂的逻辑都委托给了 LookupUtils.lookup
。
// ... existing code ...
@Nullable
public T lookup(InternalRow key, int startLevel) throws IOException {
return LookupUtils.lookup(levels, key, startLevel, this::lookup, this::lookupLevel0);
}
// ... existing code ...
这里的关键是理解两个方法引用:this::lookup
和 this::lookupLevel0
。它们分别指向 LookupLevels
类中其他同名但参数不同的 lookup
和 lookupLevel0
私有方法。
this::lookup
对应的是private T lookup(InternalRow key, SortedRun level)
。this::lookupLevel0
对应的是private T lookupLevel0(InternalRow key, TreeSet<DataFileMeta> level0)
。
现在,我们来看 LookupUtils.lookup
的源码,看看它是如何使用这两个方法引用的。
第一层:LookupUtils.lookup(levels, ...)
- 顶层循环
LookupUtils.java
// ... existing code ...
public static <T> T lookup(
Levels levels,
InternalRow key,
int startLevel,
BiFunctionWithIOE<InternalRow, SortedRun, T> lookup, // 对应 this::lookup
BiFunctionWithIOE<InternalRow, TreeSet<DataFileMeta>, T> level0Lookup) // 对应 this::lookupLevel0
throws IOException {
T result = null;
// 1. 从 startLevel 开始,逐层向上查找
for (int i = startLevel; i < levels.numberOfLevels(); i++) {
if (i == 0) {
// 2. 如果是 Level 0,调用传入的 level0Lookup 函数
result = level0Lookup.apply(key, levels.level0());
} else {
// 3. 如果是 Level 1+,调用传入的 lookup 函数
SortedRun level = levels.runOfLevel(i);
result = lookup.apply(key, level);
}
// 4. 一旦找到结果,立即跳出循环
if (result != null) {
break;
}
}
return result;
}
// ... existing code ...
调用过程分析:
LookupUtils.lookup
接收到LookupLevels
传来的两个方法引用,分别命名为lookup
和level0Lookup
。- 它开始一个
for
循环,从startLevel
开始遍历 LSM-Tree 的每一层。 - 当
i == 0
时:它调用level0Lookup.apply(...)
。这实际上就是调用了LookupLevels.lookupLevel0(key, levels.level0())
。 - 当
i > 0
时:它调用lookup.apply(...)
。这实际上就是调用了LookupLevels.lookup(key, levels.runOfLevel(i))
。 - 只要任何一次调用返回了非
null
的结果,循环就会终止,并返回该结果。这符合 LSM-Tree 从新数据(低 Level)向旧数据(高 Level)查找的原则。
第二层:深入 LookupLevels
的私有方法
现在我们来看被调用的那两个私有方法内部又做了什么。
LookupLevels.lookupLevel0
// ... existing code ...
@Nullable
private T lookupLevel0(InternalRow key, TreeSet<DataFileMeta> level0) throws IOException {
return LookupUtils.lookupLevel0(keyComparator, key, level0, this::lookup);
}
// ... existing code ...
这里又出现了一次委托!它调用了 LookupUtils.lookupLevel0
,并且又传递了一个方法引用 this::lookup
。这次的 this::lookup
对应的是 private T lookup(InternalRow key, DataFileMeta file)
。
LookupLevels.lookup(key, level)
// ... existing code ...
@Nullable
private T lookup(InternalRow key, SortedRun level) throws IOException {
return LookupUtils.lookup(keyComparator, key, level, this::lookup);
}
// ... existing code ...
同样,这里也委托给了 LookupUtils.lookup
的另一个重载版本,并再次传递了方法引用 this::lookup
,同样对应 private T lookup(InternalRow key, DataFileMeta file)
。
第三层:LookupUtils
的具体查找逻辑
现在我们进入 LookupUtils
的具体实现,看看它们如何使用第三层传递进来的 this::lookup
方法引用。
LookupUtils.lookupLevel0
// ... existing code ...
public static <T> T lookupLevel0(
Comparator<InternalRow> keyComparator,
InternalRow target,
TreeSet<DataFileMeta> level0,
BiFunctionWithIOE<InternalRow, DataFileMeta, T> lookup) // 对应 this::lookup(key, file)
throws IOException {
T result = null;
// 遍历 Level 0 的所有文件
for (DataFileMeta file : level0) {
// 检查 key 是否在文件的 [minKey, maxKey] 范围内
if (keyComparator.compare(file.maxKey(), target) >= 0
&& keyComparator.compare(file.minKey(), target) <= 0) {
// 如果在范围内,就调用传入的 lookup 函数
result = lookup.apply(target, file);
if (result != null) {
// 找到就返回(Level 0 内部文件按新旧排序,所以第一个找到的就是最新的)
return result;
}
}
}
return null;
}
// ... existing code ...
LookupUtils.lookup(keyComparator, ...)
LookupUtils.java
// ... existing code ...
public static <T> T lookup(
Comparator<InternalRow> keyComparator,
InternalRow target,
SortedRun level,
BiFunctionWithIOE<InternalRow, DataFileMeta, T> lookup) // 对应 this::lookup(key, file)
throws IOException {
// ...
// 对 Level 1+ 的文件列表进行二分查找,找到可能包含 key 的那个文件
// ... (binary search logic) ...
List<DataFileMeta> files = level.files();
int left = 0;
int right = files.size() - 1;
// binary search restart positions to find the restart position immediately before the
// targetKey
while (left < right) {
int mid = (left + right) / 2;
if (keyComparator.compare(files.get(mid).maxKey(), target) < 0) {
// Key at "mid.max" is < "target". Therefore all
// files at or before "mid" are uninteresting.
left = mid + 1;
} else {
// Key at "mid.max" is >= "target". Therefore all files
// after "mid" are uninteresting.
right = mid;
}
}
int index = right;
// ...
// 如果找到了对应的文件
if (index < files.size()
&& keyComparator.compare(files.get(index).minKey(), target) <= 0) {
// 调用传入的 lookup 函数
return lookup.apply(target, files.get(index));
}
return null;
}
// ... existing code ...
第四层:最终的执行体
经过层层传递,最终所有逻辑都汇聚到了 LookupLevels.lookup(InternalRow key, DataFileMeta file)
这个方法。
// ... existing code ...
@Nullable
private T lookup(InternalRow key, DataFileMeta file) throws IOException {
// 1. 检查本地缓存
LookupFile lookupFile = lookupFileCache.getIfPresent(file.fileName());
// 2. 如果缓存没有,创建本地查找文件
if (lookupFile == null) {
lookupFile = createLookupFile(file);
// ...
}
// 3. 在本地查找文件中执行 get 操作
valueBytes = lookupFile.get(keyBytes);
// ...
// 4. 处理结果并返回
return valueProcessor.readFromDisk(...);
}
// ... existing code ...
这个方法是真正干活的地方:检查和使用缓存、创建本地查找文件、从本地文件中读取数据。
总结调用链
整个过程是一个精巧的委托链条:
LookupLevels.lookup(key, startLevel)
(入口)- 委托给
LookupUtils.lookup(levels, ...)
,并告诉它:“当你需要处理 Level 0 时,请调用我的lookupLevel0
方法;当你需要处理 Level 1+ 时,请调用我的lookup(key, level)
方法。”
- 委托给
LookupUtils.lookup(levels, ...)
(顶层循环)- 根据当前层级,调用
LookupLevels
对应的私有方法。
- 根据当前层级,调用
LookupLevels.lookupLevel0
或LookupLevels.lookup(key, level)
(中间层)- 这两个方法再次委托给
LookupUtils
中更具体的实现 (lookupLevel0
或lookup(keyComparator, ...)
),并告诉它们:“当你定位到具体要查哪个DataFileMeta
文件时,请调用我的lookup(key, file)
方法。”
- 这两个方法再次委托给
LookupUtils.lookupLevel0
或LookupUtils.lookup(keyComparator, ...)
(定位层)- 它们负责遍历(Level 0)或二分查找(Level 1+)来定位到具体的文件。
- 一旦定位到
DataFileMeta
,就调用最终的方法引用,即LookupLevels.lookup(key, file)
。
LookupLevels.lookup(key, file)
(执行层)- 这是所有逻辑的终点,负责与缓存和本地文件系统交互,完成最终的查找。
这种设计将 “遍历/查找策略”(在 LookupUtils
中)和“具体执行逻辑” (在 LookupLevels
中)完美地解耦开来,使得代码结构清晰,复用性强。
LookupFile
LookupFile
是 Paimon 本地查找缓存机制的物理载体和逻辑封装。当 LookupLevels
决定将一个远程数据文件(SST 文件)缓存到本地时,最终产物就是一个 LookupFile
对象。这个对象不仅代表了本地磁盘上的一个物理文件,还封装了对该文件的所有操作和生命周期管理。
LookupFile
的职责非常清晰和集中:
- 封装本地查找文件:它持有一个本地
File
对象和一个LookupStoreReader
,提供了对这个本地优化文件的统一访问入口。 - 关联远程文件:它内部保存了原始远程文件
DataFileMeta
的引用,明确了这个本地缓存的来源。 - 提供查找功能:通过
get(byte[] key)
方法,利用内部的LookupStoreReader
在本地文件上执行快速的 Key-Value 查找。 - 管理生命周期:通过
close()
方法,负责关闭文件读取器、删除本地物理文件,并执行必要的回调。 - 作为缓存的 Value:
LookupFile
对象本身就是Caffeine
缓存中的Value
,与远程文件名(Key
)一一对应。
关键成员变量(构造函数解析)
LookupFile
的所有核心组件都在构造时传入,清晰地定义了它的构成。
// ... existing code ...
private final File localFile;
private final DataFileMeta remoteFile;
private final LookupStoreReader reader;
private final Runnable callback;
private long requestCount;
private long hitCount;
private boolean isClosed = false;
public LookupFile(
File localFile, DataFileMeta remoteFile, LookupStoreReader reader, Runnable callback) {
this.localFile = localFile;
this.remoteFile = remoteFile;
this.reader = reader;
this.callback = callback;
}
// ... existing code ...
localFile
: 指向本地磁盘上物理文件的java.io.File
对象。这是查找操作的物理基础。remoteFile
: 对应的远程数据文件的元数据(DataFileMeta
)。用于追溯来源和调试。reader
: 核心查找器。这是一个LookupStoreReader
接口的实例,它知道如何解析localFile
的二进制格式并高效地查找 Key。具体的实现可能是HashLookupStoreReader
或SortLookupStoreReader
,取决于创建时使用的LookupStoreFactory
。callback
: 一个Runnable
对象。这是一个非常重要的回调函数。当LookupFile
被关闭或从缓存中移除时,这个回调会被执行。在LookupLevels
中,这个回调的作用是ownCachedFiles.remove(file.fileName())
,用于维护LookupLevels
内部的状态一致性。requestCount
,hitCount
: 用于统计这个本地文件的访问情况,便于监控缓存效率。isClosed
: 状态标记,防止对一个已经关闭的LookupFile
进行操作。
get(byte[] key)
// ... existing code ...
@Nullable
public byte[] get(byte[] key) throws IOException {
checkArgument(!isClosed);
requestCount++;
byte[] res = reader.lookup(key);
if (res != null) {
hitCount++;
}
return res;
}
// ... existing code ...
这是 LookupFile
最主要的功能方法。它将查找请求直接委托给内部的 reader.lookup(key)
。LookupStoreReader
会利用布隆过滤器、哈希索引或二分查找等技术,在 localFile
中快速定位并返回与 key
对应的 value
字节数组。
close(RemovalCause cause)
// ... existing code ...
public void close(RemovalCause cause) throws IOException {
reader.close();
isClosed = true;
callback.run();
LOG.info(
"Delete Lookup file {} due to {}. Access stats: requestCount={}, hitCount={}, size={}KB",
// ... existing code ...
FileIOUtils.deleteFileOrDirectory(localFile);
}
// ... existing code ...
这是 LookupFile
的生命周期终点。它执行一系列清理操作:
reader.close()
: 关闭底层的LookupStoreReader
,释放文件句柄等资源。isClosed = true
: 更新状态。callback.run()
: 执行回调,通知其创建者(LookupLevels
)自己已被销毁。LOG.info(...)
: 打印一条详细的日志,说明文件被删除的原因(RemovalCause
)、访问统计和文件大小,这对于问题排查和性能调优非常有价值。FileIOUtils.deleteFileOrDirectory(localFile)
: 从本地磁盘上删除物理文件,释放磁盘空间。
静态工厂与缓存集成 (createCache
)
LookupFile
类还包含一组非常重要的静态方法,用于创建和配置 Caffeine
缓存。这使得缓存的创建逻辑与 LookupFile
本身紧密耦合,体现了其作为“可缓存对象”的设计意图。
// ... existing code ...
public static Cache<String, LookupFile> createCache(
Duration fileRetention, MemorySize maxDiskSize) {
return Caffeine.newBuilder()
.expireAfterAccess(fileRetention)
.maximumWeight(maxDiskSize.getKibiBytes())
.weigher(LookupFile::fileWeigh)
.removalListener(LookupFile::removalCallback)
.executor(Runnable::run)
.build();
}
// ... existing code ...
这里配置了 Caffeine
缓存的几个关键策略:
.expireAfterAccess(fileRetention)
: 基于访问时间的淘汰策略。如果一个LookupFile
在fileRetention
时间内没有被访问过,它就会被标记为过期,并可能被缓存淘汰。这是避免冷数据永久占用磁盘空间的关键。.maximumWeight(maxDiskSize.getKibiBytes())
: 基于容量的淘汰策略。限制了所有本地查找文件占用的总磁盘空间。.weigher(LookupFile::fileWeigh)
: 指定了如何计算每个缓存项的“权重”。fileWeigh
方法返回本地文件的大小(以 KB 为单位)。当总权重超过maximumWeight
时,Caffeine
会开始淘汰缓存项。.removalListener(LookupFile::removalCallback)
: 核心清理机制。当一个LookupFile
因为任何原因(过期、容量超限、手动删除)被缓存移除时,removalCallback
方法会被调用。这个回调方法会调用lookupFile.close(cause)
,从而触发前面分析的清理流程,确保本地物理文件被删除。
Caffeine 内部有自己的维护线程,用于处理过期检查、淘汰计算等任务。默认情况下,为了不阻塞这些核心维护线程,removalListener
会被提交到一个默认的 Executor
(通常是 ForkJoinPool.commonPool()
) 中异步执行。
然而,Paimon 在这里通过 .executor(Runnable::run)
改变了默认行为。
Runnable::run
是一个方法引用,它等价于一个 Executor
的实现:
// 伪代码
class SameThreadExecutor implements Executor {
@Override
public void execute(Runnable command) {
command.run(); // 直接在当前线程执行,而不是提交到线程池
}
}
所以,.executor(Runnable::run)
的确切含义是:
“请不要将 removalListener
的任务异步地提交到线程池,而是在触发这个移除事件的那个线程中,立即、同步地执行它。”
为什么 Paimon 要这么做?
选择同步执行而不是异步执行,通常是出于以下考虑:
及时性和确定性:
LookupFile
的关闭操作涉及到文件 I/O(删除本地文件)。通过同步执行,可以确保一旦 Caffeine 决定移除一个文件,这个文件会立即被删除。这避免了在异步执行模型下可能出现的延迟——即缓存已经认为条目被移除了,但对应的磁盘文件由于线程池繁忙等原因,可能过了一小段时间才被真正删除。对于需要精确控制磁盘空间的场景,这种确定性很重要。简化资源管理: 同步执行可以简化并发控制。由于回调是在 Caffeine 的内部维护线程中同步执行的,可以减少多线程并发访问和修改
LookupFile
状态的复杂性。任务轻量:
lookupFile.close()
操作虽然是 I/O,但删除单个文件通常是一个非常快速的操作。Paimon 的设计者可能认为这个操作足够轻量,不会对 Caffeine 的内部线程造成明显的阻塞,因此同步执行的开销是可以接受的。
本地文件名生成 (localFilePrefix
)
// ... existing code ...
public static String localFilePrefix(
RowType partitionType, BinaryRow partition, int bucket, String remoteFileName) {
if (partition.getFieldCount() == 0) {
return String.format("%s-%s", bucket, remoteFileName);
} else {
String partitionString = partToSimpleString(partitionType, partition, "-", 20);
return String.format("%s-%s-%s", partitionString, bucket, remoteFileName);
}
}
// ... existing code ...
这个静态方法定义了本地查找文件的命名规则,格式为 分区信息-bucket号-远程文件名
。这样做的好处是:
- 唯一性:保证了不同分区、不同 Bucket、不同远程文件生成的本地缓存文件不会重名。
- 可读性:通过文件名就可以大致了解这个缓存文件对应的数据来源,方便人工排查问题。
DataFileMeta
vs. LookupFile
这两者代表了不同层面的东西,它们的存储位置也不同:
DataFileMeta
: 对象本身在内存中,它描述的数据在远程磁盘上。DataFileMeta
是一个轻量级的 Java 对象,它本身存在于 JVM 内存中。- 它并不包含实际的数据,而是数据的“元信息”,比如数据文件的路径(通常在 HDFS、S3 等远程存储上)、文件大小、行数、主键的最小/最大值等。
- 你可以把它理解为一个指向远程数据文件的“指针”或“描述符”。
LookupFile
: 对象本身在内存中,它管理的数据在本地磁盘上。LookupFile
也是一个 Java 对象,存在于 JVM 内存中(具体来说,是作为lookupFileCache
的value
)。- 它的核心职责是管理一个为了加速查询而创建在本地磁盘上的临时文件。这个本地文件是远程数据文件(由
DataFileMeta
指向)的一个经过优化的副本。 - 当需要进行 lookup 时,Paimon 会将远程文件拉取到本地,并可能转换成更适合快速随机查找的格式(如哈希索引文件),这个本地文件由
LookupFile
对象来管理。
Caffeine 自带写磁盘功能?
这是一个非常关键的问题。答案是:不,Caffeine 本身是一个纯粹的、高性能的 内存 缓存库。 它不提供原生的将缓存项写入磁盘的功能。
那么 Paimon 是如何实现用它来管理磁盘文件的呢?
Paimon 在这里使用了一种非常巧妙的设计模式,利用 Caffeine 的能力来管理本地磁盘文件的 生命周期,而不是用它来存储文件内容本身。
具体实现机制如下:
缓存的不是文件内容,而是文件句柄:Paimon 存入 Caffeine 缓存的
value
不是文件的字节内容,而是LookupFile
这个 Java 对象。这个对象内部持有了本地磁盘文件的路径。权重计算基于磁盘大小:Caffeine 的容量限制是通过
weigher
来计算每个缓存项的“权重”的。Paimon 在配置lookupFileCache
时,提供的weigher
(LookupFile::fileWeigh
) 计算的不是LookupFile
对象的内存大小,而是它所指向的本地磁盘文件的实际大小。// ... existing code ... public static Cache<String, LookupFile> createCache( Duration fileRetention, MemorySize maxDiskSize) { return Caffeine.newBuilder() .expireAfterAccess(fileRetention) // 这里的 maximumWeight 是基于磁盘大小的 .maximumWeight(maxDiskSize.getKibiBytes()) // weigher 计算的是本地文件在磁盘上的大小 .weigher(LookupFile::fileWeigh) .removalListener(LookupFile::removalCallback) .executor(Runnable::run) .build(); } private static int fileWeigh(String file, LookupFile lookupFile) { // 返回本地文件的大小(以 KB 为单位) return fileKibiBytes(lookupFile.localFile); } // ... existing code ...
这样一来,Caffeine 的容量管理就从“内存大小”变成了“磁盘空间大小”。
利用淘汰监听器删除文件:当缓存占用的总“权重”(即总磁盘空间)超过阈值时,Caffeine 会根据其淘汰策略(如 LRU)移除一个
LookupFile
对象。此时,会触发注册的removalListener
(LookupFile::removalCallback
)。在这个监听器中,Paimon 的代码会主动地去删除该LookupFile
对象所对应的本地磁盘文件,从而释放磁盘空间。// ... existing code ... private static void removalCallback(String file, LookupFile lookupFile, RemovalCause cause) { if (lookupFile != null) { try { // 当缓存项被淘汰时,关闭并删除对应的本地磁盘文件 lookupFile.close(cause); } catch (IOException e) { throw new UncheckedIOException(e); } } } public void close(RemovalCause cause) throws IOException { reader.close(); isClosed = true; callback.run(); LOG.info( "Delete Lookup file {} due to {}. Access stats: requestCount={}, hitCount={}, size={}KB", localFile.getName(), cause, requestCount, hitCount, localFile.length() >> 10); FileIOUtils.deleteFileOrDirectory(localFile); } // ... existing code ...
综上所述,Paimon 并不是在使用 Caffeine 的某个隐藏的磁盘功能,而是将 Caffeine 作为了一个非常高效的本地磁盘文件生命周期管理器。Caffeine 负责决定“何时”淘汰哪个文件,而 Paimon 的代码则负责执行实际的“文件删除”操作。这是一个非常优雅的组合设计。