Badger Value Log值日志详解
概述
Value Log是Badger实现键值分离(Key-Value Separation)的核心组件。它将大值存储在独立的日志文件中,LSM树只存储键和值指针,这种设计显著减少了写放大和提高了性能。
核心设计思想
1. 键值分离机制
Small Values (< ValueThreshold):
Key + Value → LSM Tree
Large Values (>= ValueThreshold):
Key + ValuePointer → LSM Tree
Value → Value Log
ValuePointer格式:
[Fid(4B)] [Len(4B)] [Offset(4B)]
2. Value Log结构
type valueLog struct {
dirPath string // 目录路径
filesLock sync.RWMutex // 文件锁
filesMap map[uint32]*logFile // 文件映射
maxFid uint32 // 最大文件ID
filesToBeDeleted []uint32 // 待删除文件
numActiveIterators atomic.Int32 // 活跃迭代器数量
writableLogOffset atomic.Uint32 // 可写偏移
numEntriesWritten uint32 // 已写入条目数
discardStats *discardStats // 垃圾回收统计
garbageCh chan struct{} // 垃圾回收通道
}
Value Log文件格式
1. 文件头部
+----------------+------------------+
| keyID(8 bytes) | baseIV(12 bytes)|
+----------------+------------------+
2. 记录格式
+--------+--------+--------+--------+--------+--------+
| Header | KeyLen | Key | ValLen | Value | CRC32 |
+--------+--------+--------+--------+--------+--------+
| ?B | 4B | Var | 4B | Var | 4B |
3. Header详细格式
type header struct {
klen uint32 // 键长度
vlen uint32 // 值长度
expiresAt uint64 // 过期时间
meta byte // 元数据标志
userMeta byte // 用户元数据
}
写入机制
1. 值阈值判断
func (db *DB) valueThreshold() int64 {
if db.threshold != nil {
return db.threshold.valueThreshold.Load()
}
return db.opt.ValueThreshold
}
2. 写入流程
func (vlog *valueLog) write(reqs []*request) error {
vlog.filesLock.RLock()
maxFid := vlog.maxFid
curlf := vlog.filesMap[maxFid]
vlog.filesLock.RUnlock()
toDisk := func() error {
if err := curlf.doneWriting(vlog.writableLogOffset.Load()); err != nil {
return err
}
y.NumBytesWrittenToL0Add(vlog.opt.MetricsEnabled,
int64(vlog.writableLogOffset.Load()))
return nil
}
for i := range reqs {
b := reqs[i]
b.Ptrs = b.Ptrs[:0]
for j := range b.Entries {
e := b.Entries[j]
var p valuePointer
if shouldWriteValueToLSM(e, vlog.db.valueThreshold()) {
// 小值直接存储在LSM中
p.Len = uint32(len(e.Value))
b.Ptrs = append(b.Ptrs, p)
continue
}
// 大值存储在Value Log中
p.Fid = curlf.fid
p.Offset = vlog.woffset()
p.Len = uint32(len(e.Value))
b.Ptrs = append(b.Ptrs, p)
// 写入Value Log
buf := vlog.encodeEntry(e, p.Offset)
copy(curlf.Data[p.Offset:], buf)
vlog.writableLogOffset.Add(uint32(len(buf)))
}
}
return toDisk()
}
3. 条目编码
func (vlog *valueLog) encodeEntry(e *Entry, offset uint32) []byte {
h := header{
klen: uint32(len(e.Key)),
vlen: uint32(len(e.Value)),
expiresAt: e.ExpiresAt,
meta: e.meta,
userMeta: e.UserMeta,
}
// 计算总大小
size := h.Size() + len(e.Key) + len(e.Value) + crc32.Size
buf := make([]byte, size)
written := h.EncodeTo(buf)
copy(buf[written:], e.Key)
written += len(e.Key)
copy(buf[written:], e.Value)
written += len(e.Value)
// 计算并写入CRC32
hash := crc32.New(y.CastagnoliCrcTable)
hash.Write(buf[:written])
checksum := hash.Sum32()
y.U32ToBytes(buf[written:], checksum)
return buf
}
读取机制
1. 值指针解码
type valuePointer struct {
Fid uint32 // 文件ID
Len uint32 // 值长度
Offset uint32 // 文件内偏移
}
func (p *valuePointer) Decode(b []byte) {
p.Fid = binary.BigEndian.Uint32(b[:4])
p.Len = binary.BigEndian.Uint32(b[4:8])
p.Offset = binary.BigEndian.Uint32(b[8:12])
}
2. 读取流程
func (vlog *valueLog) Read(vp valuePointer, _ *y.Slice) ([]byte, func(), error) {
// 获取目标文件
lf, err := vlog.getFileRLocked(vp)
if err != nil {
return nil, nil, err
}
// 读取数据
buf, err := lf.read(vp)
if err != nil {
return nil, nil, err
}
// 验证和解密
reader := &safeRead{
recordOffset: vp.Offset,
lf: lf,
}
entry, err := reader.Entry(bytes.NewReader(buf))
if err != nil {
return nil, nil, err
}
return entry.Value, func() { /* unlock callback */ }, nil
}
3. 缓存友好读取
func (vlog *valueLog) readValueBytes(vp valuePointer) ([]byte, *logFile, error) {
lf, err := vlog.getFileRLocked(vp)
if err != nil {
return nil, nil, err
}
// 使用mmap直接读取
offset := vp.Offset
if offset >= uint32(len(lf.Data)) {
return nil, lf, fmt.Errorf("Invalid value pointer offset: %d", offset)
}
return lf.Data[offset:], lf, nil
}
垃圾回收机制
1. 丢弃统计
type discardStats struct {
sync.Mutex
valuesLock sync.RWMutex
values map[uint32]int64 // 每个文件的丢弃字节数
bytesRead atomic.Int64 // 读取字节数
fileSize int64 // 文件大小
}
func (ds *discardStats) Update(fid uint32, discard int64) {
ds.valuesLock.Lock()
ds.values[fid] += discard
ds.valuesLock.Unlock()
}
2. GC触发条件
func (vlog *valueLog) pickLog(discardRatio float64) *logFile {
vlog.filesLock.RLock()
defer vlog.filesLock.RUnlock()
candidate := struct {
fid uint32
discardRatio float64
staleDataSize int64
}{}
for fid, lf := range vlog.filesMap {
if fid >= vlog.maxFid {
continue // 跳过当前写入文件
}
staleDataSize := vlog.discardStats.StaleDataSize(fid)
totalSize := lf.size.Load()
if totalSize == 0 {
continue
}
ratio := float64(staleDataSize) / float64(totalSize)
if ratio > discardRatio && ratio > candidate.discardRatio {
candidate.fid = fid
candidate.discardRatio = ratio
candidate.staleDataSize = staleDataSize
}
}
if candidate.fid != 0 {
return vlog.filesMap[candidate.fid]
}
return nil
}
3. GC执行流程
func (vlog *valueLog) doRunGC(lf *logFile) error {
// 统计有效数据
var validEntries []*Entry
var totalValidSize int64
err := lf.iterate(true, 0, func(e Entry, vp valuePointer) error {
// 检查entry是否仍然有效
vs, err := vlog.db.get(e.Key)
if err != nil {
return err
}
if !discardEntry(e, vs, vlog.db) {
validEntries = append(validEntries, &e)
totalValidSize += int64(vp.Len)
}
return nil
})
if err != nil {
return err
}
vlog.opt.Infof("GC rewriting fid: %d, valid entries: %d, valid size: %d",
lf.fid, len(validEntries), totalValidSize)
// 重写有效数据到新的Value Log文件
if len(validEntries) > 0 {
return vlog.rewrite(validEntries)
}
// 标记文件为删除
vlog.filesLock.Lock()
vlog.filesToBeDeleted = append(vlog.filesToBeDeleted, lf.fid)
vlog.filesLock.Unlock()
return nil
}
4. 数据重写
func (vlog *valueLog) rewrite(entries []*Entry) error {
// 创建重写请求
req := &request{
Entries: entries,
Ptrs: make([]valuePointer, len(entries)),
}
req.Wg.Add(1)
// 写入新的Value Log
err := vlog.write([]*request{req})
if err != nil {
return err
}
// 更新LSM中的值指针
txn := vlog.db.NewTransaction(true)
defer txn.Discard()
for i, e := range entries {
vp := req.Ptrs[i]
if err := txn.SetEntry(&Entry{
Key: e.Key,
Value: vp.Encode(),
Meta: bitValuePointer,
}); err != nil {
return err
}
}
return txn.Commit()
}
文件管理
1. 文件创建
func (vlog *valueLog) createVlogFile() (*logFile, error) {
fid := atomic.AddUint32(&vlog.maxFid, 1)
fname := vlog.fpath(fid)
mf, err := z.OpenMmapFile(fname, os.O_CREATE|os.O_RDWR, vlog.opt.ValueLogFileSize)
if err != nil {
return nil, err
}
lf := &logFile{
MmapFile: mf,
fid: fid,
path: fname,
opt: vlog.opt,
}
// 写入文件头
if err := lf.bootstrap(); err != nil {
return nil, err
}
vlog.filesLock.Lock()
vlog.filesMap[fid] = lf
vlog.filesLock.Unlock()
return lf, nil
}
2. 文件轮转
func (vlog *valueLog) sync() error {
vlog.filesLock.RLock()
curlf := vlog.filesMap[vlog.maxFid]
vlog.filesLock.RUnlock()
// 检查当前文件是否需要轮转
if vlog.woffset() > vlog.opt.ValueLogFileSize-uint32(maxHeaderSize) {
if err := curlf.doneWriting(vlog.woffset()); err != nil {
return err
}
// 创建新文件
newlf, err := vlog.createVlogFile()
if err != nil {
return err
}
vlog.writableLogOffset.Store(vlogHeaderSize)
vlog.numEntriesWritten = 0
}
return curlf.Sync()
}
3. 文件删除
func (vlog *valueLog) deleteLogFile(lf *logFile) error {
// 检查是否还有活跃迭代器
if vlog.numActiveIterators.Load() > 0 {
return nil // 延迟删除
}
vlog.filesLock.Lock()
delete(vlog.filesMap, lf.fid)
vlog.filesLock.Unlock()
// 删除文件
if err := lf.Delete(); err != nil {
return err
}
vlog.opt.Infof("Deleted value log file: %s", lf.path)
return nil
}
动态阈值调整
1. 阈值监控
type vlogThreshold struct {
percentile float64 // 百分位数
valueThreshold atomic.Int64 // 当前阈值
valueCh chan []int64 // 值大小通道
vlMetrics *z.HistogramData // 直方图数据
}
func (v *vlogThreshold) update(sizes []int64) {
v.vlMetrics.Update(sizes)
// 根据百分位数计算新阈值
newThreshold := v.vlMetrics.Percentile(v.percentile)
v.valueThreshold.Store(int64(newThreshold))
}
2. 自适应调整
func (v *vlogThreshold) listenForValueThresholdUpdate() {
defer v.closer.Done()
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case sizes := <-v.valueCh:
v.update(sizes)
case <-v.clearCh:
v.vlMetrics.Clear()
case <-ticker.C:
// 定期调整阈值
if v.vlMetrics.Count() > 1000 {
v.update(nil)
}
case <-v.closer.HasBeenClosed():
return
}
}
}
性能优化
1. 批量写入
func (vlog *valueLog) write(reqs []*request) error {
// 合并多个请求减少系统调用
totalSize := uint32(0)
for _, req := range reqs {
totalSize += estimateRequestSize(req)
}
// 预分配缓冲区
buf := make([]byte, totalSize)
written := 0
for _, req := range reqs {
for _, e := range req.Entries {
entryBuf := vlog.encodeEntry(e, offset)
copy(buf[written:], entryBuf)
written += len(entryBuf)
}
}
// 一次性写入
return vlog.writeBuffer(buf)
}
2. 内存映射优化
func (lf *logFile) read(vp valuePointer) ([]byte, error) {
// 直接从mmap内存读取,无需系统调用
if vp.Offset >= uint32(len(lf.Data)) {
return nil, fmt.Errorf("Invalid offset")
}
return lf.Data[vp.Offset:vp.Offset+vp.Len], nil
}
3. 并发控制优化
func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) {
vlog.filesLock.RLock()
defer vlog.filesLock.RUnlock()
RES (
lf, ok := vlog.filesMap[vp.Fid]
if !ok {
return nil, fmt.Errorf("File not found: %d", vp.Fid)
}
return lf, nil
}
配置建议
1. 值阈值设置
// 小文件多读场景
ValueThreshold: 32 // 32字节
// 大文件少读场景
ValueThreshold: 2048 // 2KB
// 自适应模式
VLogPercentile: 0.95 // 95%分位数
2. 文件大小配置
// SSD环境
ValueLogFileSize: 1 << 30 // 1GB
// HDD环境
ValueLogFileSize: 2 << 30 // 2GB
// 内存有限环境
ValueLogFileSize: 128 << 20 // 128MB
3. GC配置
// 激进GC
GCDiscardRatio: 0.3 // 30%垃圾时触发
// 保守GC
GCDiscardRatio: 0.7 // 70%垃圾时触发
Value Log的设计是Badger性能优异的关键因素,通过键值分离有效减少了写放大,同时通过智能的垃圾回收机制保证了空间利用率。理解其工作原理对于优化Badger在不同场景下的性能表现非常重要。
Badger事务与并发控制详解
概述
Badger采用MVCC(Multi-Version Concurrency Control)实现事务并发控制,支持快照隔离(Snapshot Isolation)级别的事务。通过Oracle组件管理时间戳分配和冲突检测,实现了高并发的读写操作。
核心组件
1. Oracle时间戳管理器
type oracle struct {
isManaged bool // 是否托管模式
detectConflicts bool // 是否开启冲突检测
nextTxnTs uint64 // 下一个事务时间戳
writeChLock sync.Mutex // 写入通道锁
txnMark *y.WaterMark // 事务水位标记
readMark *y.WaterMark // 读取水位标记
discardTs uint64 // 丢弃时间戳
committedTxns []committedTxn // 已提交事务列表
lastCleanupTs uint64 // 最后清理时间戳
closer *z.Closer // 关闭器
}
2. Transaction事务结构
type Txn struct {
readTs uint64 // 读取时间戳
commitTs uint64 // 提交时间戳
size int64 // 事务大小
count int64 // 条目数量
db *DB // 数据库引用
reads []uint64 // 读取键指纹
conflictKeys map[uint64]struct{} // 冲突键指纹
readsLock sync.Mutex // 读取锁
pendingWrites map[string]*Entry // 待写入条目
duplicateWrites []*Entry // 重复写入条目
numIterators atomic.Int32 // 迭代器数量
discarded bool // 是否已丢弃
doneRead bool // 是否完成读取
update bool // 是否更新事务
}
MVCC机制详解
1. 时间戳分配
func (o *oracle) readTs() uint64 {
if o.isManaged {
panic("ReadTs should not be retrieved for managed DB")
}
var readTs uint64
o.Lock()
readTs = o.nextTxnTs - 1
o.readMark.Begin(readTs) // 开始读取标记
o.Unlock()
// 等待所有无冲突事务完成写入
y.Check(o.txnMark.WaitForMark(context.Background(), readTs))
return readTs
}
func (o *oracle) newCommitTs(txn *Txn) (uint64, bool) {
o.Lock()
defer o.Unlock()
// 检查冲突
if o.hasConflict(txn) {
return 0, true // 返回冲突标志
}
var ts uint64
if !o.isManaged {
o.doneRead(txn)
o.cleanupCommittedTransactions()
// 分配新的提交时间戳
ts = o.nextTxnTs
o.nextTxnTs++
o.txnMark.Begin(ts)
} else {
ts = txn.commitTs // 托管模式使用预设时间戳
}
// 记录已提交事务用于冲突检测
if o.detectConflicts {
o.committedTxns = append(o.committedTxns, committedTxn{
ts: ts,
conflictKeys: txn.conflictKeys,
})
}
return ts, false
}
2. 版本可见性规则
// 版本可见性判断
func isVisible(itemVersion, readTs uint64) bool {
return itemVersion <= readTs
}
// 在Get操作中的应用
func (txn *Txn) Get(key []byte) (item *Item, rerr error) {
if len(key) == 0 {
return nil, ErrEmptyKey
}
if txn.discarded {
return nil, ErrDiscardedTxn
}
// 1. 首先检查事务内的写入缓存
if txn.update {
if e, has := txn.pendingWrites[string(key)]; has {
if isDeletedOrExpired(e.meta, e.ExpiresAt) {
return nil, ErrKeyNotFound
}
return &Item{e: e, txn: txn}, nil
}
}
// 2. 添加到读取集合(用于冲突检测)
txn.addReadKey(key)
// 3. 从数据库读取,只读取版本 <= readTs 的数据
seek := y.KeyWithTs(key, txn.readTs)
vs, err := txn.db.get(seek)
if err != nil {
return nil, y.Wrapf(err, "DB::Get key: %q", key)
}
if vs.Meta&bitDelete > 0 {
return nil, ErrKeyNotFound
}
item = &Item{
key: key,
vptr: vs.Value,
meta: vs.Meta,
userMeta: vs.UserMeta,
expiresAt: vs.ExpiresAt,
version: y.ParseTs(vs.Value),
txn: txn,
db: txn.db,
}
return item, nil
}
冲突检测机制
1. 冲突键跟踪
func (txn *Txn) addReadKey(key []byte) {
if !txn.update || !txn.db.opt.DetectConflicts {
return
}
txn.readsLock.Lock()
defer txn.readsLock.Unlock()
fp := z.MemHash(key)
// 添加到读取集合
txn.reads = append(txn.reads, fp)
}
func (txn *Txn) modify(e *Entry) error {
if txn.discarded {
return ErrDiscardedTxn
}
if !txn.update {
return ErrReadOnlyTxn
}
// 记录写入键用于冲突检测
if txn.db.opt.DetectConflicts {
fp := z.MemHash(e.Key)
if txn.conflictKeys == nil {
txn.conflictKeys = make(map[uint64]struct{})
}
txn.conflictKeys[fp] = struct{}{}
}
txn.pendingWrites[string(e.Key)] = e
return nil
}
2. 冲突检测算法
func (o *oracle) hasConflict(txn *Txn) bool {
if len(txn.reads) == 0 {
return false
}
for _, committedTxn := range o.committedTxns {
// 如果已提交事务的时间戳 <= 当前事务的读时间戳
// 说明已提交事务在当前事务开始前完成,无需检查冲突
if committedTxn.ts <= txn.readTs {
continue
}
// 检查读写冲突:当前事务读取的键是否被后续事务写入
for _, readKey := range txn.reads {
if _, has := committedTxn.conflictKeys[readKey]; has {
return true // 发现冲突
}
}
}
return false
}
3. 冲突处理策略
func (txn *Txn) Commit() error {
// 预检查
if err := txn.commitPrecheck(); err != nil {
return err
}
// 尝试获取提交时间戳
commitTs, conflict := txn.db.orc.newCommitTs(txn)
if conflict {
// 发生冲突,回滚事务
return ErrConflict
}
txn.commitTs = commitTs
// 执行提交
callback, err := txn.commitAndSend()
if err != nil {
return err
}
// 等待写入完成
return callback()
}
事务生命周期
1. 事务创建
func (db *DB) NewTransaction(update bool) *Txn {
return db.newTransaction(update, false)
}
func (db *DB) newTransaction(update, isManaged bool) *Txn {
if db.IsClosed() {
panic(ErrDBClosed)
}
txn := &Txn{
update: update,
db: db,
count: 1, // 自身占用一个计数
size: int64(len(txnKey)), // 事务标记键的大小
discarded: false,
pendingWrites: make(map[string]*Entry),
}
if !isManaged {
txn.readTs = db.orc.readTs() // 分配读时间戳
}
return txn
}
2. 写入操作
func (txn *Txn) Set(key, val []byte) error {
e := NewEntry(key, val)
return txn.SetEntry(e)
}
func (txn *Txn) SetEntry(e *Entry) error {
return txn.modify(e)
}
func (txn *Txn) Delete(key []byte) error {
e := NewEntry(key, nil).WithMeta(bitDelete)
return txn.modify(e)
}
3. 提交过程
func (txn *Txn) commitAndSend() (func() error, error) {
// 1. 构建写入条目
var entries []*Entry
for _, e := range txn.pendingWrites {
// 设置版本信息
e.Key = y.KeyWithTs(e.Key, txn.commitTs)
e.meta |= bitTxn // 标记为事务条目
entries = append(entries, e)
}
// 2. 添加事务结束标记
e := &Entry{
Key: y.KeyWithTs(txnKey, txn.commitTs),
meta: bitFinTxn,
}
entries = append(entries, e)
// 3. 发送到写入通道
req, err := txn.db.sendToWriteCh(entries)
if err != nil {
return nil, err
}
// 4. 返回等待函数
ret := func() error {
err := req.Wait()
// 标记事务完成
txn.db.orc.doneCommit(txn.commitTs)
return err
}
return ret, nil
}
4. 事务清理
func (txn *Txn) Discard() {
if txn.discarded {
return
}
// 等待所有迭代器关闭
if atomic.LoadInt32(&txn.numIterators) > 0 {
panic("Unclosed iterator at time of Txn.Discard.")
}
txn.discarded = true
if !txn.db.orc.isManaged {
txn.db.orc.doneRead(txn) // 标记读取完成
}
}
WaterMark水位标记
1. WaterMark机制
// WaterMark用于跟踪进行中的操作
type WaterMark struct {
Name string
markIdx uint64
doneUntil uint64
waiters map[uint64][]chan struct{}
elog trace.EventLog
lock sync.Mutex
}
func (w *WaterMark) Begin(index uint64) {
w.lock.Lock()
defer w.lock.Unlock()
w.markIdx = index
w.waiters[index] = make([]chan struct{}, 0)
}
func (w *WaterMark) Done(index uint64) {
w.lock.Lock()
defer w.lock.Unlock()
// 通知等待者
if chs, ok := w.waiters[index]; ok {
for _, ch := range chs {
close(ch)
}
delete(w.waiters, index)
}
// 更新doneUntil
if index == w.doneUntil+1 {
w.doneUntil = index
// 连续更新doneUntil
for {
if _, ok := w.waiters[w.doneUntil+1]; !ok {
w.doneUntil++
} else {
break
}
}
}
}
2. 垃圾回收支持
func (o *oracle) discardAtOrBelow() uint64 {
if o.isManaged {
o.Lock()
defer o.Unlock()
return o.discardTs
}
// 返回所有读取操作都已完成的最大时间戳
return o.readMark.DoneUntil()
}
func (o *oracle) cleanupCommittedTransactions() {
if !o.detectConflicts {
return
}
// 清理过期的已提交事务记录
discardBelow := o.discardAtOrBelow()
if discardBelow > o.lastCleanupTs {
// 移除时间戳 <= discardBelow 的事务
var newCommittedTxns []committedTxn
for _, txn := range o.committedTxns {
if txn.ts > discardBelow {
newCommittedTxns = append(newCommittedTxns, txn)
}
}
o.committedTxns = newCommittedTxns
o.lastCleanupTs = discardBelow
}
}
托管事务模式
1. 托管vs非托管
// 非托管模式:Badger自动分配时间戳
func (db *DB) Update(fn func(txn *Txn) error) error {
txn := db.NewTransaction(true)
defer txn.Discard()
if err := fn(txn); err != nil {
return err
}
return txn.Commit() // 自动分配commitTs
}
// 托管模式:用户控制时间戳
func (db *DB) NewWriteBatchAt(commitTs uint64) *WriteBatch {
wb := db.NewWriteBatch()
wb.commitTs = commitTs
return wb
}
2. 托管模式优势
- 确定性重放:用户控制时间戳,便于复制和恢复
- 批量操作:可以批量提交多个事务
- 外部一致性:与外部系统保持时间戳一致性
读写隔离级别
1. 快照隔离保证
// 事务看到的是readTs时刻的快照
func (txn *Txn) Get(key []byte) (*Item, error) {
// 只能看到版本 <= readTs 的数据
seek := y.KeyWithTs(key, txn.readTs)
vs, err := txn.db.get(seek)
// ...
}
// 写入时使用commitTs作为版本
func (txn *Txn) commitAndSend() {
for _, e := range txn.pendingWrites {
e.Key = y.KeyWithTs(e.Key, txn.commitTs)
// ...
}
}
2. 读一致性
- 单调读:同一事务内多次读取同一键得到相同结果
- 读已提交:只能读取到已提交的数据
- 快照隔离:事务开始时获得数据库快照
性能优化
1. 批量操作
func (db *DB) Update(fn func(txn *Txn) error) error {
// 自动管理事务生命周期
txn := db.NewTransaction(true)
defer txn.Discard()
return fn(txn)
}
// 批量写入
type WriteBatch struct {
txn *Txn
commitTs uint64
entries []*Entry
}
func (wb *WriteBatch) Flush() error {
// 批量提交所有操作
return wb.txn.Commit()
}
2. 冲突检测优化
// 可选择性启用冲突检测
opt := DefaultOptions(path)
opt.DetectConflicts = false // 禁用以提高性能
// 使用指纹而非完整键进行冲突检测
fp := z.MemHash(key) // 64位指纹,降低内存使用
3. 内存管理
// 限制事务大小
func (txn *Txn) checkSize(e *Entry) error {
count := int64(len(e.Key) + len(e.Value) + 1)
if txn.count+count >= txn.db.opt.maxBatchCount {
return ErrTxnTooBig
}
if txn.size+count >= txn.db.opt.maxBatchSize {
return ErrTxnTooBig
}
txn.count += count
txn.size += int64(e.EstimateSize(txn.db.opt.ValueThreshold))
return nil
}
配置参数
1. 冲突检测配置
opt.DetectConflicts = true // 启用冲突检测
opt.NumCompactors = 2 // 压缩线程数影响清理速度
2. 事务大小限制
opt.MaxBatchCount = 100000 // 最大批次条目数
opt.MaxBatchSize = 15MB // 最大批次大小
3. 托管模式配置
opt.ManagedTxns = true // 启用托管事务模式
Badger的MVCC实现提供了高性能的并发控制,通过时间戳排序和冲突检测保证了事务的ACID特性,同时支持高并发的读写操作。理解其工作原理有助于正确使用事务功能并优化应用性能。
Badger SSTable文件格式详解
概述
SSTable(Sorted String Table)是Badger中的不可变数据文件格式,存储经过排序的键值对。每个SSTable文件都包含数据块、索引、布隆过滤器等组件,通过精心设计的格式实现高效的查找和压缩。
文件整体结构
+================+
| Data Blocks | ← 数据块区域(压缩)
| ... |
+================+
| Index Block | ← 索引块(FlatBuffer格式)
+================+
| Bloom Filter | ← 布隆过滤器(可选)
+================+
| Checksum | ← 文件校验和(8字节)
+================+
| Index Offset | ← 索引偏移量(8字节)
+================+
| Index Length | ← 索引长度(4字节)
+================+
| Footer Magic | ← 文件魔数(4字节)
+================+
数据块(Data Blocks)
1. 块内结构
Block内部格式:
+--------+--------+-----+--------+--------+-----+
| Entry1 | Entry2 | ... | EntryN | Restart| CRC |
+--------+--------+-----+--------+--------+-----+
Entry格式:
+----------+----------+----------+----------+----------+
| SharedLen| UnsharedLen| ValueLen | Key | Value |
+----------+----------+----------+----------+----------+
| 4B | 4B | 4B | Var | Var |
2. 键压缩机制
// 前缀压缩减少存储空间
type blockBuilder struct {
data []byte
restarts []uint32 // 重启点偏移
counter int // 当前条目计数
prevKey []byte // 前一个键
restartInterval int // 重启间隔
}
func (b *blockBuilder) Add(key, value []byte, valuePointer bool) {
// 计算与前一个键的共同前缀长度
shared := 0
if b.counter < b.restartInterval {
shared = b.sharedPrefixLen(b.prevKey, key)
}
unshared := len(key) - shared
// 编码条目
b.append4Byte(uint32(shared)) // 共享前缀长度
b.append4Byte(uint32(unshared)) // 非共享部分长度
b.append4Byte(uint32(len(value))) // 值长度
b.data = append(b.data, key[shared:]...) // 非共享键部分
b.data = append(b.data, value...) // 值
if b.counter == 0 || b.counter >= b.restartInterval {
// 设置重启点
b.restarts = append(b.restarts, uint32(len(b.data)))
b.counter = 0
}
b.prevKey = append(b.prevKey[:0], key...)
b.counter++
}
3. 块尾部信息
// 块结束时添加重启点信息
func (b *blockBuilder) Finish() []byte {
// 1. 添加重启点数组
for _, restart := range b.restarts {
b.append4Byte(restart)
}
// 2. 添加重启点数量
b.append4Byte(uint32(len(b.restarts)))
// 3. 计算并添加CRC32校验和
checksum := crc32.Checksum(b.data, y.CastagnoliCrcTable)
b.append4Byte(checksum)
return b.data
}
索引结构(FlatBuffer)
1. TableIndex定义
// table.fbs
namespace fb;
table BlockOffset {
key: [ubyte]; // 块的第一个键
offset: uint64; // 块在文件中的偏移
len: uint32; // 块的长度
}
table TableIndex {
offsets: [BlockOffset]; // 块偏移数组
bloom_filter: [ubyte]; // 布隆过滤器数据
max_version: uint64; // 最大版本号
key_count: uint32; // 键数量
uncompressed_size: uint32; // 未压缩大小
on_disk_size: uint32; // 磁盘大小
stale_data_size: uint32; // 过期数据大小
}
2. 索引构建
func (b *Builder) buildIndex() *fb.TableIndexT {
index := &fb.TableIndexT{
Offsets: make([]*fb.BlockOffsetT, 0, len(b.blockList)),
BloomFilter: b.bloom.JSONMarshal(),
MaxVersion: b.maxVersion,
KeyCount: uint32(b.keyCount),
UncompressedSize: uint32(b.uncompressedSize),
OnDiskSize: uint32(len(b.buf)),
}
// 构建块偏移数组
for _, block := range b.blockList {
offset := &fb.BlockOffsetT{
Key: block.firstKey,
Offset: uint64(block.offset),
Len: uint32(block.len),
}
index.Offsets = append(index.Offsets, offset)
}
return index
}
3. 索引序列化
func (b *Builder) finishIndex() []byte {
// 使用FlatBuffer序列化索引
builder := flatbuffers.NewBuilder(1024)
index := b.buildIndex()
indexOffset := index.Pack(builder)
builder.Finish(indexOffset)
return builder.FinishedBytes()
}
布隆过滤器
1. 布隆过滤器构建
type Bloom struct {
bitmap []byte // 位图
k uint8 // 哈希函数数量
numBits uint32 // 位数
numKeys uint32 // 键数量
}
func NewBloomFilter(numEntries int, fp float64) *Bloom {
// 计算最优参数
bitsPerKey := -1.44 * math.Log2(fp) // 每个键需要的位数
numBits := uint32(float64(numEntries) * bitsPerKey)
numHashFuncs := uint8(bitsPerKey * 0.693) // ln(2)
return &Bloom{
bitmap: make([]byte, (numBits+7)/8),
k: numHashFuncs,
numBits: numBits,
}
}
func (bloom *Bloom) Add(key []byte) {
h := z.MemHash(key)
delta := h>>17 | h<<15 // 第二个哈希函数
for i := uint8(0); i < bloom.k; i++ {
bitPos := h % bloom.numBits
bloom.bitmap[bitPos/8] |= 1 << (bitPos % 8)
h += delta
}
}
func (bloom *Bloom) MayContain(key []byte) bool {
h := z.MemHash(key)
delta := h>>17 | h<<15
for i := uint8(0); i < bloom.k; i++ {
bitPos := h % bloom.numBits
if bloom.bitmap[bitPos/8]&(1<<(bitPos%8)) == 0 {
return false // 肯定不存在
}
h += delta
}
return true // 可能存在
}
2. 布隆过滤器优化
// 分块布隆过滤器,减少缓存缺失
type BlockedBloom struct {
data []byte
numProbes int
numBlocks uint32
blockMask uint32
}
func (bf *BlockedBloom) MayContain(key []byte) bool {
h := z.MemHash(key)
// 选择块
blockIdx := (h >> 11 | h << 21) & bf.blockMask
block := bf.data[blockIdx*32 : (blockIdx+1)*32] // 32字节块
// 在块内进行多次探测
for i := 0; i < bf.numProbes; i++ {
bitPos := h & 255 // 块内位置(0-255)
if block[bitPos/8]&(1<<(bitPos%8)) == 0 {
return false
}
h += h>>17 | h<<15
}
return true
}
压缩机制
1. 块级压缩
func (b *Builder) finishBlock() error {
if b.curBlock.IsEmpty() {
return nil
}
// 获取原始数据
data := b.curBlock.Finish()
var compressed []byte
var compressionType uint32
switch b.opts.Compression {
case options.None:
compressed = data
compressionType = 0
case options.Snappy:
compressed = snappy.Encode(nil, data)
compressionType = 1
case options.ZSTD:
compressed = zstd.Compress(nil, data)
compressionType = 2
}
// 选择压缩效果更好的版本
if len(compressed) < len(data) {
data = compressed
} else {
compressionType = 0 // 使用未压缩版本
}
// 写入压缩类型标记
data = append(data, byte(compressionType))
b.writeBlock(data)
return nil
}
2. 压缩策略
// 自适应压缩:根据数据特征选择压缩算法
func (b *Builder) chooseCompression(data []byte) options.CompressionType {
if len(data) < 1024 {
return options.None // 小块不压缩
}
// 计算数据熵
entropy := calculateEntropy(data)
if entropy > 7.5 {
return options.None // 高熵数据压缩效果差
}
if entropy > 6.0 {
return options.Snappy // 中等熵使用Snappy
}
return options.ZSTD // 低熵使用ZSTD
}
读取机制
1. 表打开流程
func OpenTable(mf *z.MmapFile, opts table.Options) (*Table, error) {
t := &Table{
mf: mf,
opts: opts,
}
// 1. 读取并验证Footer
if err := t.readFooter(); err != nil {
return nil, err
}
// 2. 读取索引
if err := t.readIndex(); err != nil {
return nil, err
}
// 3. 验证校验和
if err := t.verifyChecksum(); err != nil {
return nil, err
}
return t, nil
}
2. 索引查找
func (t *Table) search(key []byte, maxVs *y.ValueStruct) (y.ValueStruct, error) {
// 1. 布隆过滤器预检查
if t.index.BloomFilter != nil {
if !t.bloomFilter.MayContain(key) {
return y.ValueStruct{}, ErrKeyNotFound
}
}
// 2. 二分查找确定块
blockIdx := t.findBlock(key)
if blockIdx < 0 {
return y.ValueStruct{}, ErrKeyNotFound
}
// 3. 从缓存或磁盘读取块
block, err := t.getBlock(blockIdx)
if err != nil {
return y.ValueStruct{}, err
}
// 4. 在块内查找
return block.search(key, maxVs)
}
3. 块级缓存
type Table struct {
mf *z.MmapFile
index *fb.TableIndex
bloomFilter *Bloom
blockCache *ristretto.Cache // 块缓存
}
func (t *Table) getBlock(idx int) (*Block, error) {
// 1. 尝试从缓存获取
if cached, found := t.blockCache.Get(t.blockCacheKey(idx)); found {
return cached.(*Block), nil
}
// 2. 从磁盘读取
offset := t.index.Offsets[idx]
data := t.mf.Data[offset.Offset:offset.Offset+uint64(offset.Len)]
// 3. 解压缩
block, err := t.decompressBlock(data)
if err != nil {
return nil, err
}
// 4. 缓存块
t.blockCache.Set(t.blockCacheKey(idx), block, int64(len(data)))
return block, nil
}
迭代器实现
1. 表迭代器
type Iterator struct {
t *Table
blockIdx int
blockIter *blockIterator
err error
reversed bool
}
func (it *Iterator) seekToFirst() {
it.blockIdx = 0
it.loadBlock()
if it.blockIter != nil {
it.blockIter.seekToFirst()
}
}
func (it *Iterator) Next() {
if it.blockIter != nil {
it.blockIter.Next()
if !it.blockIter.Valid() {
// 当前块已结束,移动到下一块
it.blockIdx++
it.loadBlock()
if it.blockIter != nil {
it.blockIter.seekToFirst()
}
}
}
}
2. 块内迭代器
type blockIterator struct {
data []byte // 块数据
restarts []uint32 // 重启点
offset uint32 // 当前偏移
key []byte // 当前键
value []byte // 当前值
entryOffset uint32 // 条目偏移
}
func (it *blockIterator) parseNext() {
if it.offset >= uint32(len(it.data)) {
return
}
// 读取条目头部
shared := binary.LittleEndian.Uint32(it.data[it.offset:])
it.offset += 4
unshared := binary.LittleEndian.Uint32(it.data[it.offset:])
it.offset += 4
valueLen := binary.LittleEndian.Uint32(it.data[it.offset:])
it.offset += 4
// 重构完整键
it.key = it.key[:shared]
it.key = append(it.key, it.data[it.offset:it.offset+unshared]...)
it.offset += unshared
// 读取值
it.value = it.data[it.offset:it.offset+valueLen]
it.offset += valueLen
}
性能优化
1. 预取优化
func (t *Table) prefetchBlocks(startIdx, count int) {
for i := 0; i < count && startIdx+i < len(t.index.Offsets); i++ {
idx := startIdx + i
go func(blockIdx int) {
t.getBlock(blockIdx) // 异步预取
}(idx)
}
}
2. 内存对齐
// 确保关键数据结构内存对齐
type alignedBlock struct {
_ [0]uint64 // 确保8字节对齐
data []byte
checksum uint32
_ [4]byte // 填充到8字节边界
}
3. SIMD优化(布隆过滤器)
// 使用SIMD指令优化布隆过滤器
func (bf *Bloom) mayContainSIMD(key []byte) bool {
// 在支持的平台上使用SIMD指令
return bf.mayContainAVX2(key)
}
文件管理
1. 原子创建
func CreateTable(fname string, builder *Builder) (*Table, error) {
// 1. 写入临时文件
tmpName := fname + ".tmp"
bd := builder.Done()
mf, err := z.OpenMmapFile(tmpName, os.O_CREATE|os.O_RDWR|os.O_EXCL, bd.Size)
if err != nil {
return nil, err
}
// 2. 写入数据
written := bd.Copy(mf.Data)
if written != len(mf.Data) {
return nil, fmt.Errorf("written %d != expected %d", written, len(mf.Data))
}
// 3. 同步到磁盘
if err := mf.Sync(); err != nil {
return nil, err
}
// 4. 原子重命名
if err := os.Rename(tmpName, fname); err != nil {
return nil, err
}
return OpenTable(mf, builder.opts)
}
2. 错误恢复
func (t *Table) verifyIntegrity() error {
// 1. 检查魔数
if t.footerMagic != tableFooterMagic {
return ErrInvalidMagic
}
// 2. 验证索引校验和
if err := t.verifyIndexChecksum(); err != nil {
return err
}
// 3. 抽样验证数据块
return t.verifyDataBlocks()
}
配置参数
1. 块大小配置
BlockSize: 4096 // 4KB,适合SSD
BlockSize: 16384 // 16KB,适合HDD
2. 压缩配置
Compression: options.ZSTD // 高压缩比
Compression: options.Snappy // 平衡性能和压缩比
Compression: options.None // 无压缩,最快
3. 布隆过滤器配置
BloomFalsePositive: 0.01 // 1%假阳性率
BloomFalsePositive: 0.001 // 0.1%假阳性率(更多内存)
SSTable的设计在存储效率、查询性能和内存使用之间取得了很好的平衡,通过块级组织、前缀压缩、布隆过滤器等技术实现了高效的数据存储和访问。