Background
This article is based on Paimon 0.9.
It grew out of reading the source code of Paimon's internal DeleteVector implementation and its partial-update implementation.
For an introduction to DeleteVector, see here.
Notes
For Paimon, the logic past the entry point is the same whether it is used from Spark or from Flink, so we take Spark as the example and refer to the class org.apache.paimon.spark.SparkSource.
For Flink, refer to org.apache.paimon.flink.FlinkTableFactory.
Unless otherwise stated, everything below is about primary key tables.
Partial column updates in Paimon
The typical scenario is multiple streams or batches writing to the same table, each updating only some of the columns of a row (sharing the same primary key). For the configuration details, see Partial Update.
The entry method involved is SparkTableWrite.write, which eventually reaches MergeTreeWriter.write:
```java
@Override
public void write(KeyValue kv) throws Exception {
    long sequenceNumber = newSequenceNumber();
    boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
    if (!success) {
        flushWriteBuffer(false, false);
        success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
        if (!success) {
            throw new RuntimeException("Mem table is too small to hold a single element.");
        }
    }
}
```
writeBuffer.put writes the record into the buffer. Here writeBuffer is an instance of SortBufferWriteBuffer, and records are written in the form primary key + sequenceNumber + valueKind + value.

flushWriteBuffer is where flushing to disk and the partial-update logic come in:

```java
writeBuffer.forEach(
        keyComparator,
        mergeFunction,
        changelogWriter == null ? null : changelogWriter::write,
        dataWriter::write);
```
- mergeFunction is initialized when the MergeTreeWriter is created; in this case it is a PartialUpdateMergeFunction.
- The forEach implementation builds a MergeIterator, inside which PartialUpdateMergeFunction.add is called.

This is where the partial-update logic happens: records already sorted by primary key + sequenceNumber are fed to PartialUpdateMergeFunction, so it only needs to check whether two adjacent records have the same primary key to decide whether to merge them. For the concrete update logic, see Partial Update.
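To make the merge step concrete, here is a minimal standalone sketch (the class and method names are mine, not Paimon's): records arrive sorted by (primary key, sequenceNumber), and for equal keys the non-null fields of the newer record overwrite the accumulated row, which is the essence of what PartialUpdateMergeFunction.add does.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of partial update: rows are (key, fields[]) pairs,
// pre-sorted by key and sequence number; later non-null fields win.
public class PartialUpdateSketch {
    public static Map<String, Object[]> merge(List<Map.Entry<String, Object[]>> sorted) {
        Map<String, Object[]> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object[]> entry : sorted) {
            Object[] acc = result.get(entry.getKey());
            if (acc == null) {
                // first record for this key: take it as the base row
                result.put(entry.getKey(), entry.getValue().clone());
            } else {
                // same key as the previous record: only non-null fields update
                Object[] value = entry.getValue();
                for (int i = 0; i < value.length; i++) {
                    if (value[i] != null) {
                        acc[i] = value[i];
                    }
                }
            }
        }
        return result;
    }
}
```

Because the input is sorted, one linear pass with a comparison against the previous key is enough; no random lookups are needed during the merge.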
As for new MergeIterator(rawConsumer, buffer.sortedIterator(), keyComparator, mergeFunction), the interesting part is buffer.sortedIterator. Look at the SortBufferWriteBuffer constructor to see why the records come out sorted by primary key + sequenceNumber:
```java
public SortBufferWriteBuffer(
        RowType keyType,
        RowType valueType,
        @Nullable FieldsComparator userDefinedSeqComparator,
        MemorySegmentPool memoryPool,
        boolean spillable,
        MemorySize maxDiskSize,
        int sortMaxFan,
        CompressOptions compression,
        IOManager ioManager) {
    ...
    // key fields
    IntStream sortFields = IntStream.range(0, keyType.getFieldCount());

    // user define sequence fields
    if (userDefinedSeqComparator != null) {
        IntStream udsFields =
                IntStream.of(userDefinedSeqComparator.compareFields())
                        .map(operand -> operand + keyType.getFieldCount() + 2);
        sortFields = IntStream.concat(sortFields, udsFields);
    }

    // sequence field
    sortFields = IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount()));

    int[] sortFieldArray = sortFields.toArray();

    // row type
    List<DataType> fieldTypes = new ArrayList<>(keyType.getFieldTypes());
    fieldTypes.add(new BigIntType(false));
    fieldTypes.add(new TinyIntType(false));
    fieldTypes.addAll(valueType.getFieldTypes());

    NormalizedKeyComputer normalizedKeyComputer =
            CodeGenUtils.newNormalizedKeyComputer(fieldTypes, sortFieldArray);
    RecordComparator keyComparator =
            CodeGenUtils.newRecordComparator(fieldTypes, sortFieldArray);
    ...
    InternalRowSerializer serializer =
            InternalSerializers.create(KeyValue.schema(keyType, valueType));
    BinaryInMemorySortBuffer inMemorySortBuffer =
            BinaryInMemorySortBuffer.createBuffer(
                    normalizedKeyComputer, serializer, keyComparator, memoryPool);
```
IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount())) is what brings the sequenceNumber column into the sort key; this takes effect when buffer.sortedIterator is called. If sequence.field is defined, those fields also participate in the sort, see the udsFields variable above.
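The field-index arithmetic above can be sketched in isolation (SortFieldsSketch is a hypothetical helper, not Paimon code): key fields come first, user-defined sequence fields are shifted past key + sequenceNumber + valueKind, and the sequenceNumber column, which sits at index keyFieldCount in the row layout, is appended last.

```java
import java.util.stream.IntStream;

// Hypothetical helper mirroring the sort-field assembly in the
// SortBufferWriteBuffer constructor shown above.
public class SortFieldsSketch {
    public static int[] sortFields(int keyFieldCount, int[] userSeqFields) {
        // key fields: indices [0, keyFieldCount)
        IntStream fields = IntStream.range(0, keyFieldCount);
        if (userSeqFields != null) {
            // user-defined sequence fields live after key + sequenceNumber + valueKind,
            // hence the "+ 2" shift
            IntStream uds = IntStream.of(userSeqFields).map(i -> i + keyFieldCount + 2);
            fields = IntStream.concat(fields, uds);
        }
        // sequenceNumber is the column right after the key fields
        return IntStream.concat(fields, IntStream.of(keyFieldCount)).toArray();
    }
}
```

So for a 2-field key and no sequence.field, the sort order is key field 0, key field 1, then sequenceNumber at index 2.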
The DeleteVector implementation
For the design of DeleteVector, see Introduce deletion vectors for primary key table.
The rough idea is to produce DeleteVectors through a compaction + lookup mechanism:
- If a record does not belong to level 0, no DeleteVector is produced.
- If a record only belongs to the levels being compacted, no DeleteVector is produced.
- If a record only belongs to level 0, the file data of the levels not involved in the compaction is looked up, and that lookup is what produces the DeleteVector.
Note: DeleteVector only supports primary key tables and is bucket-level: one DeleteVector index per bucket.
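Conceptually, a deletion vector is just a bitmap over the row positions of one data file. A minimal sketch of the idea (Paimon actually uses a compressed bitmap and a per-bucket index file; BitSet is used here only for illustration):

```java
import java.util.BitSet;

// Illustrative-only deletion vector: marks row positions of a data file as
// deleted, so readers can skip them without rewriting the file.
public class DeletionVectorSketch {
    private final BitSet deleted = new BitSet();

    public void delete(long position) {
        deleted.set((int) position);
    }

    public boolean isDeleted(long position) {
        return deleted.get((int) position);
    }
}
```

This is why the write path below only needs to know (fileName, rowPosition) pairs: flipping a bit logically deletes the old copy of a row.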
Writing the DeleteVector
As described above, DeleteVectors are only produced during compaction, so we go straight to MergeTreeWriter.flushWriteBuffer. The data flow that involves DeleteVector is:
compactManager.triggerCompaction(forcedFullCompaction)
  -> submitCompaction
  -> MergeTreeCompactTask.doCompact
  -> rewrite
  -> rewriteImpl
  -> LookupMergeTreeCompactRewriter.rewrite
  -> rewriteOrProduceChangelog
  -> createMergeWrapper
  -> iterator.next()
  -> RecordReaderIterator.next()
  -> advanceIfNeeded
  -> currentIterator.next()
  -> SortMergeIterator.next()
  -> LookupChangelogMergeFunctionWrapper.add(winner)
  -> LookupChangelogMergeFunctionWrapper.getResult()
After MergeTreeCompactTask.doCompact finishes writing, it calls result.setDeletionFile(compactDfSupplier.get()). This compactDfSupplier is built in the submitCompaction method:

```java
if (dvMaintainer != null) {
    compactDfSupplier =
            lazyGenDeletionFile
                    ? () -> CompactDeletionFile.lazyGeneration(dvMaintainer)
                    : () -> CompactDeletionFile.generateFiles(dvMaintainer);
}
```
The DeleteVectors themselves are produced in LookupChangelogMergeFunctionWrapper.getResult(), explained below.

The LookupMergeTreeCompactRewriter instance behind LookupMergeTreeCompactRewriter.rewrite is created when the MergeTreeWriter is created, via CompactManager compactManager = createCompactManager(partition, bucket, compactStrategy, compactExecutor, levels, dvMaintainer). This calls the createRewriter method to create the LookupMergeTreeCompactRewriter instance, choosing it according to the lookupStrategy:

```java
public LookupStrategy lookupStrategy() {
    return LookupStrategy.from(
            mergeEngine().equals(MergeEngine.FIRST_ROW),
            changelogProducer().equals(ChangelogProducer.LOOKUP),
            deletionVectorsEnabled(),
            options.get(FORCE_LOOKUP));
}
```
currentIterator.next() is obtained via currentIterator = SortMergeReaderWithLoserTree.readBatch, and SortMergeReaderWithLoserTree in turn comes from the readerForMergeTree method.

The key part is LookupChangelogMergeFunctionWrapper.getResult():

```java
@Override
public ChangelogResult getResult() {
    // 1. Compute the latest high level record and containLevel0 of candidates
    LinkedList<KeyValue> candidates = mergeFunction.candidates();
    Iterator<KeyValue> descending = candidates.descendingIterator();
    KeyValue highLevel = null;
    boolean containLevel0 = false;
    while (descending.hasNext()) {
        KeyValue kv = descending.next();
        if (kv.level() > 0) {
            descending.remove();
            if (highLevel == null) {
                highLevel = kv;
            }
        } else {
            containLevel0 = true;
        }
    }

    // 2. Lookup if latest high level record is absent
    if (highLevel == null) {
        InternalRow lookupKey = candidates.get(0).key();
        T lookupResult = lookup.apply(lookupKey);
        if (lookupResult != null) {
            if (lookupStrategy.deletionVector) {
                PositionedKeyValue positionedKeyValue = (PositionedKeyValue) lookupResult;
                highLevel = positionedKeyValue.keyValue();
                deletionVectorsMaintainer.notifyNewDeletion(
                        positionedKeyValue.fileName(), positionedKeyValue.rowPosition());
            } else {
                highLevel = (KeyValue) lookupResult;
            }
        }
    }

    // 3. Calculate result
    KeyValue result = calculateResult(candidates, highLevel);

    // 4. Set changelog when there's level-0 records
    reusedResult.reset();
    if (containLevel0 && lookupStrategy.produceChangelog) {
        setChangelog(highLevel, result);
    }
    return reusedResult.setResult(result);
}
```
The main thing to look at is the lookup.apply function. lookup is constructed in create, where the LookupChangelogMergeFunctionWrapper is instantiated:

```java
@Override
public MergeFunctionWrapper<ChangelogResult> create(
        MergeFunctionFactory<KeyValue> mfFactory,
        int outputLevel,
        LookupLevels<T> lookupLevels,
        @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
    return new LookupChangelogMergeFunctionWrapper<>(
            mfFactory,
            key -> {
                try {
                    return lookupLevels.lookup(key, outputLevel + 1);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            },
            valueEqualiser,
            changelogRowDeduplicate,
            lookupStrategy,
            deletionVectorsMaintainer,
            userDefinedSeqComparator);
}
```
lookupLevels.lookup eventually calls the createLookupFile method to build a LookupFile instance, which calls valueProcessor.persistToDisk(kv, batch.returnedPosition()) to persist each key's row position into the lookup file, so the row position can be retrieved later. Once the lookupResult is obtained, deletionVectorsMaintainer.notifyNewDeletion(positionedKeyValue.fileName(), positionedKeyValue.rowPosition()) is called to build the DeletionVector.
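The deletion-producing decision in getResult can be condensed into a small sketch (Kv and newDeletion are assumed names for illustration, not Paimon's API): when a key's candidates contain no high-level record, the key is looked up in the higher levels, and a hit yields the (fileName, rowPosition) pair that gets marked as deleted.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical condensation of the lookup step in getResult.
public class LookupSketch {
    public static class Kv {
        final String key;
        final int level;
        final long rowPosition;
        final String fileName;

        public Kv(String key, int level, long rowPosition, String fileName) {
            this.key = key;
            this.level = level;
            this.rowPosition = rowPosition;
            this.fileName = fileName;
        }
    }

    /** Returns "file#position" of the newly deleted row, or null when no deletion is produced. */
    public static String newDeletion(List<Kv> candidates, Function<String, Kv> lookup) {
        boolean hasHighLevel = candidates.stream().anyMatch(kv -> kv.level > 0);
        if (hasHighLevel) {
            return null; // latest high-level value already among candidates: no lookup needed
        }
        // only level-0 records: search the levels above the compaction output level
        Kv hit = lookup.apply(candidates.get(0).key);
        return hit == null ? null : hit.fileName + "#" + hit.rowPosition;
    }
}
```

This mirrors the three rules from the overview: high-level-only and compacting-level-only records produce nothing, and only a level-0 record whose older copy is found in a higher level produces a deletion.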
The result.setDeletionFile(compactDfSupplier.get()) mentioned above calls CompactDeletionFile.generateFiles(dvMaintainer), which in turn calls maintainer.writeDeletionVectorsIndex to write the DeleteVectors out to the DeleteVector index file.
Reading the DeleteVector
The read path is set up in PrimaryKeyFileStoreTable.newRead, which eventually calls RawFileSplitRead.createReader and then new ApplyDeletionVectorReader(fileRecordReader, deletionVector) to build an ApplyDeletionVectorReader instance:
```java
public RecordIterator<InternalRow> readBatch() throws IOException {
    RecordIterator<InternalRow> batch = reader.readBatch();

    if (batch == null) {
        return null;
    }

    checkArgument(
            batch instanceof FileRecordIterator,
            "There is a bug, RecordIterator in ApplyDeletionVectorReader must be RecordWithPositionIterator");

    return new ApplyDeletionFileRecordIterator(
            (FileRecordIterator<InternalRow>) batch, deletionVector);
}
```
This readBatch method builds an ApplyDeletionFileRecordIterator iterator, whose next() method calls deletionVector.isDeleted on every record to check whether it has been deleted:
```java
@Override
public InternalRow next() throws IOException {
    while (true) {
        InternalRow next = iterator.next();
        if (next == null) {
            return null;
        }
        if (!deletionVector.isDeleted(returnedPosition())) {
            return next;
        }
    }
}
```
FAQ
How is the mapping between row position and primary key recorded when writing files?
The row position is not recorded at write time. When createLookupFile builds the LookupFile (at initialization), the rows are read back from the parquet file, and the row position is obtained during that read.
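This works because a row position comes "for free" while scanning a file sequentially. A sketch of the idea (PositionedIterator is a hypothetical wrapper, not Paimon's API): the iterator counts rows as it returns them, so each key can be persisted together with its position.

```java
import java.util.Iterator;

// Hypothetical wrapper: tracks the position of the last returned row while
// scanning a data file, so (key -> fileName, position) can be persisted.
public class PositionedIterator<T> implements Iterator<T> {
    private final Iterator<T> inner;
    private long returnedPosition = -1; // no row returned yet

    public PositionedIterator(Iterator<T> inner) {
        this.inner = inner;
    }

    @Override
    public boolean hasNext() {
        return inner.hasNext();
    }

    @Override
    public T next() {
        returnedPosition++;
        return inner.next();
    }

    public long returnedPosition() {
        return returnedPosition;
    }
}
```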