背景
本文基于Spark 4.0。
总结
对于半结构化的数据来说,我们一般会有两种方式进行存储:
第一种是存储为JSON String,这种可以保证Schema free,但是在使用的时候得解析为JSON,从而进行运算操作。
第二种是存储为Struct类型,这种虽然性能好,但是Schema是不可变的
所以引入了Variant
类型:
- Schema free以及相对于json String来说会有更好的查询性能,且使半结构化数据处理快速简单。
- Variant数据类型以灵活的方式存储半结构化数据
- 无需预先定义模式。
- Variant二进制编码还允许比解析字符串更快地处理数据。
分析
直接到 Variant 对应的getFieldByKey
方法(这个方法相对于JSON String来说就是获取某个JSON所对应key的值):
public Variant getFieldByKey(String key) {
return handleObject(value, pos, (size, idSize, offsetSize, idStart, offsetStart, dataStart) -> {
// Use linear search for a short list. Switch to binary search when the length reaches
// `BINARY_SEARCH_THRESHOLD`.
final int BINARY_SEARCH_THRESHOLD = 32;
if (size < BINARY_SEARCH_THRESHOLD) {
for (int i = 0; i < size; ++i) {
int id = readUnsigned(value, idStart + idSize * i, idSize);
if (key.equals(getMetadataKey(metadata, id))) {
int offset = readUnsigned(value, offsetStart + offsetSize * i, offsetSize);
return new Variant(value, metadata, dataStart + offset);
}
}
} else {
int low = 0;
int high = size - 1;
while (low <= high) {
// Use unsigned right shift to compute the middle of `low` and `high`. This is not only a
// performance optimization, because it can properly handle the case where `low + high`
// overflows int.
int mid = (low + high) >>> 1;
int id = readUnsigned(value, idStart + idSize * mid, idSize);
int cmp = getMetadataKey(metadata, id).compareTo(key);
if (cmp < 0) {
low = mid + 1;
} else if (cmp > 0) {
high = mid - 1;
} else {
int offset = readUnsigned(value, offsetStart + offsetSize * mid, offsetSize);
return new Variant(value, metadata, dataStart + offset);
}
}
}
return null;
});
}
其中 handleObject 方法用来获取 Variant
对象的元数据信息,
public static <T> T handleObject(byte[] value, int pos, ObjectHandler<T> handler) {
checkIndex(pos, value.length);
int basicType = value[pos] & BASIC_TYPE_MASK;
int typeInfo = (value[pos] >> BASIC_TYPE_BITS) & TYPE_INFO_MASK;
if (basicType != OBJECT) throw unexpectedType(Type.OBJECT);
// Refer to the comment of the `OBJECT` constant for the details of the object header encoding.
// Suppose `typeInfo` has a bit representation of 0_b4_b3b2_b1b0, the following line extracts
// b4 to determine whether the object uses a 1/4-byte size.
boolean largeSize = ((typeInfo >> 4) & 0x1) != 0;
int sizeBytes = (largeSize ? U32_SIZE : 1);
int size = readUnsigned(value, pos + 1, sizeBytes);
// Extracts b3b2 to determine the integer size of the field id list.
int idSize = ((typeInfo >> 2) & 0x3) + 1;
// Extracts b1b0 to determine the integer size of the offset list.
int offsetSize = (typeInfo & 0x3) + 1;
int idStart = pos + 1 + sizeBytes;
int offsetStart = idStart + size * idSize;
int dataStart = offsetStart + (size + 1) * offsetSize;
return handler.apply(size, idSize, offsetSize, idStart, offsetStart, dataStart);
}
按照以上的布局来进行获取 该 object大小,field id list大小, offset list大小,id list的起始位,offset的起始位置。
接下来就是循环调用 getMetadataKey 方法获取每个key(通过offset[i+1]- offset[i])的具体值,与当前的key进行比对,如果相等,则返回,之后再返回new Variant(value, metadata, dataStart + offset)
对象,其中会带有该key对应的起始offset
。
如果想要得到具体的类型值,直接通过对应的方法获取即可,比如说getString
等
注意:如果该object的字节长度大于32字节,则用二分查找来查找,否则用顺序查找。