5 Star 15 Fork 1

Gitee 极速下载 / Badger

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/dgraph-io/badger
克隆/下载
db.go 56.28 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087
/*
* Copyright 2017 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package badger
import (
"bytes"
"context"
"encoding/binary"
"expvar"
"fmt"
"math"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
humanize "github.com/dustin/go-humanize"
"github.com/pkg/errors"
"github.com/dgraph-io/badger/v4/options"
"github.com/dgraph-io/badger/v4/pb"
"github.com/dgraph-io/badger/v4/skl"
"github.com/dgraph-io/badger/v4/table"
"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/ristretto"
"github.com/dgraph-io/ristretto/z"
)
// Reserved keys used for badger's own bookkeeping. Every internal key starts
// with badgerPrefix.
var (
	// Prefix for internal keys used by badger.
	badgerPrefix = []byte("!badger!")
	// For indicating end of entries in txn.
	txnKey = []byte("!badger!txn")
	// For storing the banned namespaces.
	bannedNsKey = []byte("!badger!banned")
)
// closers holds one z.Closer per background task started by Open, so that
// close and cleanup can signal (and, where needed, wait for) each of them.
// A field stays nil when its task was never started: compactors and memtable
// are only set outside read-only mode, valueGC only outside InMemory mode, and
// any field may be nil if Open failed before reaching it.
type closers struct {
	updateSize  *z.Closer // db.updateSize goroutine.
	compactors  *z.Closer // Compactions started via db.lc.startCompact.
	memtable    *z.Closer // db.flushMemtable goroutine.
	writes      *z.Closer // db.doWrites goroutine.
	valueGC     *z.Closer // db.vlog.waitOnGC goroutine.
	pub         *z.Closer // db.pub.listenForUpdates goroutine.
	cacheHealth *z.Closer // db.monitorCache goroutine.
}
// lockedKeys is a set of uint64 keys guarded by an embedded RWMutex.
// Lookups share the read lock; insertions take the write lock.
// The keys map must be initialized before use.
type lockedKeys struct {
	sync.RWMutex
	keys map[uint64]struct{}
}

// add inserts key into the set; inserting an existing key is a no-op.
func (lk *lockedKeys) add(key uint64) {
	lk.Lock()
	defer lk.Unlock()

	lk.keys[key] = struct{}{}
}

// has reports whether key is in the set.
func (lk *lockedKeys) has(key uint64) (present bool) {
	lk.RLock()
	defer lk.RUnlock()

	_, present = lk.keys[key]
	return present
}

// all returns a snapshot of every key in the set, in unspecified order.
func (lk *lockedKeys) all() []uint64 {
	lk.RLock()
	defer lk.RUnlock()

	snapshot := make([]uint64, len(lk.keys))
	idx := 0
	for k := range lk.keys {
		snapshot[idx] = k
		idx++
	}
	return snapshot
}
// DB provides the various functions required to interact with Badger.
// DB is thread-safe.
type DB struct {
	testOnlyDBExtensions

	lock sync.RWMutex // Guards list of inmemory tables, not individual reads and writes.

	// Lock on opt.Dir; nil when running InMemory or with BypassLockGuard.
	dirLockGuard *directoryLockGuard
	// nil if Dir and ValueDir are the same (and also when running InMemory or
	// with BypassLockGuard).
	valueDirGuard *directoryLockGuard

	// One closer per background task started in Open.
	closers closers

	// Our latest (actively written) in-memory table. nil in read-only mode, and
	// set to nil by close once it has been handed to the flusher.
	mt  *memTable
	imm []*memTable // Add here only AFTER pushing to flushChan.

	// Initialized via openMemTables.
	nextMemFid int

	opt       Options
	manifest  *manifestFile
	lc        *levelsController
	vlog      valueLog
	writeCh   chan *request  // Write requests consumed by doWrites.
	flushChan chan *memTable // For flushing memtables.
	closeOnce sync.Once      // For closing DB only once.

	// Set to 1 by close; sendToWriteCh then rejects writes with ErrBlockedWrites.
	blockWrites atomic.Int32
	// Set to 1 by close; reported by IsClosed.
	isClosed atomic.Uint32

	orc              *oracle
	bannedNamespaces *lockedKeys
	threshold        *vlogThreshold

	pub      *publisher
	registry *KeyRegistry
	// Caches are nil unless BlockCacheSize / IndexCacheSize are positive.
	blockCache *ristretto.Cache
	indexCache *ristretto.Cache
	allocPool  *z.AllocatorPool
}
// kvWriteChCapacity is the buffer size of db.writeCh. doWrites also uses it to
// cap how many requests it accumulates (three times this value) before it
// forces a write.
const kvWriteChCapacity = 1000
// checkAndSetOptions validates opt and fills in its derived, unexported fields
// (maxBatchSize, maxBatchCount, maxValueThreshold). It may also adjust
// user-facing fields: CompactL0OnClose is forced off in read-only mode.
//
// It returns an error for invalid option combinations. It panics when
// compression or encryption is enabled without a block cache.
func checkAndSetOptions(opt *Options) error {
	// It's okay to have zero compactors which will disable all compactions but
	// we cannot have just one compactor otherwise we will end up with all data
	// on level 2.
	if opt.NumCompactors == 1 {
		return errors.New("Cannot have 1 compactor. Need at least 2")
	}

	if opt.InMemory && (opt.Dir != "" || opt.ValueDir != "") {
		return errors.New("Cannot use badger in Disk-less mode with Dir or ValueDir set")
	}

	// A single write batch may take at most 15% of a memtable; sendToWriteCh
	// rejects larger batches with ErrTxnTooBig.
	opt.maxBatchSize = (15 * opt.MemTableSize) / 100
	opt.maxBatchCount = opt.maxBatchSize / int64(skl.MaxNodeSize)

	// This is the maximum value, vlogThreshold can have if dynamic thresholding is enabled.
	opt.maxValueThreshold = math.Min(maxValueThreshold, float64(opt.maxBatchSize))

	if opt.VLogPercentile < 0.0 || opt.VLogPercentile > 1.0 {
		return errors.New("vlogPercentile must be within range of 0.0-1.0")
	}

	// We are limiting opt.ValueThreshold to maxValueThreshold for now.
	if opt.ValueThreshold > maxValueThreshold {
		return errors.Errorf("Invalid ValueThreshold, must be less or equal to %d",
			maxValueThreshold)
	}

	// If ValueThreshold is greater than opt.maxBatchSize, we won't be able to push any data using
	// the transaction APIs. Transaction batches entries into batches of size opt.maxBatchSize.
	// maxBatchSize is derived from MemTableSize above, so that is the knob the
	// user has to raise (BaseTableSize has no effect on it).
	if opt.ValueThreshold > opt.maxBatchSize {
		return errors.Errorf("Valuethreshold %d greater than max batch size of %d. Either "+
			"reduce opt.ValueThreshold or increase opt.MemTableSize.",
			opt.ValueThreshold, opt.maxBatchSize)
	}

	// ValueLogFileSize should be strictly LESS than 2<<30 otherwise we will
	// overflow the uint32 when we mmap it in OpenMemtable.
	if !(opt.ValueLogFileSize < 2<<30 && opt.ValueLogFileSize >= 1<<20) {
		return ErrValueLogSize
	}

	if opt.ReadOnly {
		// Do not perform compaction in read only mode.
		opt.CompactL0OnClose = false
	}

	// NOTE(review): this panics rather than returning an error; kept as-is
	// because callers may rely on the current behavior.
	needCache := (opt.Compression != options.None) || (len(opt.EncryptionKey) > 0)
	if needCache && opt.BlockCacheSize == 0 {
		panic("BlockCacheSize should be set since compression/encryption are enabled")
	}
	return nil
}
// Open returns a new DB object.
//
// It validates opt (see checkAndSetOptions), creates the directories and
// acquires their lock files unless running InMemory (or with BypassLockGuard),
// opens the manifest, key registry, memtables, levels controller and value
// log, and starts the background goroutines tracked in db.closers.
//
// On failure Open always returns a nil *DB. Once the DB struct exists, any
// failure also runs db.cleanup() to stop the goroutines started so far, and
// deferred handlers release the directory locks and close the manifest file.
// The error result is named so that the deferred cleanup sees every returned
// error, including ones produced by if-scoped statements.
func Open(opt Options) (_ *DB, err error) {
	if err = checkAndSetOptions(&opt); err != nil {
		return nil, err
	}

	var dirLockGuard, valueDirLockGuard *directoryLockGuard

	// Create directories and acquire lock on it only if badger is not running in InMemory mode.
	// We don't have any directories/files in InMemory mode so we don't need to acquire
	// any locks on them.
	if !opt.InMemory {
		if err = createDirs(opt); err != nil {
			return nil, err
		}
		if !opt.BypassLockGuard {
			dirLockGuard, err = acquireDirectoryLock(opt.Dir, lockFile, opt.ReadOnly)
			if err != nil {
				return nil, err
			}
			// Released on every error path; set to nil on success so that
			// ownership passes to the DB.
			defer func() {
				if dirLockGuard != nil {
					_ = dirLockGuard.release()
				}
			}()

			var absDir, absValueDir string
			if absDir, err = filepath.Abs(opt.Dir); err != nil {
				return nil, err
			}
			if absValueDir, err = filepath.Abs(opt.ValueDir); err != nil {
				return nil, err
			}
			// Lock the value directory separately only when it differs from Dir.
			if absValueDir != absDir {
				valueDirLockGuard, err = acquireDirectoryLock(opt.ValueDir, lockFile, opt.ReadOnly)
				if err != nil {
					return nil, err
				}
				defer func() {
					if valueDirLockGuard != nil {
						_ = valueDirLockGuard.release()
					}
				}()
			}
		}
	}

	manifestFile, manifest, err := openOrCreateManifestFile(opt)
	if err != nil {
		return nil, err
	}
	defer func() {
		if manifestFile != nil {
			_ = manifestFile.close()
		}
	}()

	db := &DB{
		imm:              make([]*memTable, 0, opt.NumMemtables),
		flushChan:        make(chan *memTable, opt.NumMemtables),
		writeCh:          make(chan *request, kvWriteChCapacity),
		opt:              opt,
		manifest:         manifestFile,
		dirLockGuard:     dirLockGuard,
		valueDirGuard:    valueDirLockGuard,
		orc:              newOracle(opt),
		pub:              newPublisher(),
		allocPool:        z.NewAllocatorPool(8),
		bannedNamespaces: &lockedKeys{keys: make(map[uint64]struct{})},
		threshold:        initVlogThreshold(&opt),
	}

	db.syncChan = opt.syncChan

	// Cleanup all the goroutines started by badger in case of an error. Every
	// error path below returns a nil *DB explicitly, so the caller never
	// receives the torn-down instance.
	defer func() {
		if err != nil {
			opt.Errorf("Received err: %v. Cleaning up...", err)
			db.cleanup()
		}
	}()

	if opt.BlockCacheSize > 0 {
		numInCache := opt.BlockCacheSize / int64(opt.BlockSize)
		if numInCache == 0 {
			// Make the value of this variable at least one since the cache requires
			// the number of counters to be greater than zero.
			numInCache = 1
		}

		config := ristretto.Config{
			NumCounters: numInCache * 8,
			MaxCost:     opt.BlockCacheSize,
			BufferItems: 64,
			Metrics:     true,
			OnExit:      table.BlockEvictHandler,
		}
		db.blockCache, err = ristretto.NewCache(&config)
		if err != nil {
			return nil, y.Wrap(err, "failed to create data cache")
		}
	}

	if opt.IndexCacheSize > 0 {
		// Index size is around 5% of the table size.
		indexSz := int64(float64(opt.MemTableSize) * 0.05)
		if indexSz == 0 {
			// A tiny MemTableSize truncates to zero; avoid dividing by zero.
			indexSz = 1
		}
		numInCache := opt.IndexCacheSize / indexSz
		if numInCache == 0 {
			// Make the value of this variable at least one since the cache requires
			// the number of counters to be greater than zero.
			numInCache = 1
		}

		config := ristretto.Config{
			NumCounters: numInCache * 8,
			MaxCost:     opt.IndexCacheSize,
			BufferItems: 64,
			Metrics:     true,
		}
		db.indexCache, err = ristretto.NewCache(&config)
		if err != nil {
			return nil, y.Wrap(err, "failed to create bf cache")
		}
	}

	db.closers.cacheHealth = z.NewCloser(1)
	go db.monitorCache(db.closers.cacheHealth)

	if db.opt.InMemory {
		db.opt.SyncWrites = false
		// If badger is running in memory mode, push everything into the LSM Tree.
		db.opt.ValueThreshold = math.MaxInt32
	}
	krOpt := KeyRegistryOptions{
		ReadOnly:                      opt.ReadOnly,
		Dir:                           opt.Dir,
		EncryptionKey:                 opt.EncryptionKey,
		EncryptionKeyRotationDuration: opt.EncryptionKeyRotationDuration,
		InMemory:                      opt.InMemory,
	}

	if db.registry, err = OpenKeyRegistry(krOpt); err != nil {
		return nil, err
	}
	db.calculateSize()
	db.closers.updateSize = z.NewCloser(1)
	go db.updateSize(db.closers.updateSize)

	if err = db.openMemTables(db.opt); err != nil {
		return nil, y.Wrapf(err, "while opening memtables")
	}

	if !db.opt.ReadOnly {
		if db.mt, err = db.newMemTable(); err != nil {
			return nil, y.Wrapf(err, "cannot create memtable")
		}
	}

	// newLevelsController potentially loads files in directory.
	if db.lc, err = newLevelsController(db, &manifest); err != nil {
		return nil, err
	}

	// Initialize vlog struct.
	db.vlog.init(db)

	if !opt.ReadOnly {
		db.closers.compactors = z.NewCloser(1)
		db.lc.startCompact(db.closers.compactors)

		db.closers.memtable = z.NewCloser(1)
		go func() {
			db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
		}()
		// Flush them to disk asap.
		for _, mt := range db.imm {
			db.flushChan <- mt
		}
	}
	// We do increment nextTxnTs below. So, no need to do it here.
	db.orc.nextTxnTs = db.MaxVersion()
	db.opt.Infof("Set nextTxnTs to %d", db.orc.nextTxnTs)

	if err = db.vlog.open(db); err != nil {
		return nil, y.Wrapf(err, "During db.vlog.open")
	}

	// Let's advance nextTxnTs to one more than whatever we observed via
	// replaying the logs.
	db.orc.txnMark.Done(db.orc.nextTxnTs)
	// In normal mode, we must update readMark so older versions of keys can be removed during
	// compaction when run in offline mode via the flatten tool.
	db.orc.readMark.Done(db.orc.nextTxnTs)
	db.orc.incrementNextTs()

	go db.threshold.listenForValueThresholdUpdate()

	if err = db.initBannedNamespaces(); err != nil {
		return nil, errors.Wrapf(err, "While setting banned keys")
	}

	db.closers.writes = z.NewCloser(1)
	go db.doWrites(db.closers.writes)

	if !db.opt.InMemory {
		db.closers.valueGC = z.NewCloser(1)
		go db.vlog.waitOnGC(db.closers.valueGC)
	}

	db.closers.pub = z.NewCloser(1)
	go db.pub.listenForUpdates(db.closers.pub)

	// Success: the DB now owns the lock guards and the manifest file, so the
	// deferred release/close handlers above must become no-ops.
	valueDirLockGuard = nil
	dirLockGuard = nil
	manifestFile = nil
	return db, nil
}
// initBannedNamespaces loads every banned namespace persisted under
// bannedNsKey into db.bannedNamespaces. It does nothing when
// opt.NamespaceOffset is negative. The namespace is decoded from the key bytes
// that follow the bannedNsKey prefix.
func (db *DB) initBannedNamespaces() error {
	if db.opt.NamespaceOffset < 0 {
		return nil
	}
	loadBanned := func(txn *Txn) error {
		opts := DefaultIteratorOptions
		opts.Prefix = bannedNsKey
		opts.PrefetchValues = false // Only keys are needed.
		opts.InternalAccess = true  // bannedNsKey carries the internal !badger! prefix.
		it := txn.NewIterator(opts)
		defer it.Close()

		prefixLen := len(bannedNsKey)
		for it.Rewind(); it.Valid(); it.Next() {
			ns := y.BytesToU64(it.Item().Key()[prefixLen:])
			db.bannedNamespaces.add(ns)
		}
		return nil
	}
	return db.View(loadBanned)
}
// MaxVersion returns the highest key version found in the in-memory tables
// (the mutable one, when not read-only, and all immutable ones) and in the
// tables of the LSM tree. Open uses it to seed the oracle's nextTxnTs.
func (db *DB) MaxVersion() uint64 {
	var highest uint64
	bump := func(v uint64) {
		if v > highest {
			highest = v
		}
	}

	db.lock.Lock()
	if !db.opt.ReadOnly {
		// In read only mode, no mutable memtable is created.
		bump(db.mt.maxVersion)
	}
	for _, imm := range db.imm {
		bump(imm.maxVersion)
	}
	db.lock.Unlock()

	for _, info := range db.Tables() {
		bump(info.MaxVersion)
	}
	return highest
}
// monitorCache inspects the block and index cache metrics once a minute until
// c is signalled. It warns when a cache looks undersized: mean entry life
// expectancy under 10 seconds together with a hit ratio in (0, 0.4). Otherwise,
// on every fifth round (starting with the first), it logs the metrics of caches
// that have recorded more than 1000 life-expectancy samples.
func (db *DB) monitorCache(c *z.Closer) {
	defer c.Done()

	report := func(round int, name string, metrics *ristretto.Metrics) {
		le := metrics.LifeExpectancySeconds()
		if le == nil {
			return
		}
		// If the mean life expectancy is less than 10 seconds, the cache
		// might be too small.
		shortLived := le.Count > 0 && float64(le.Sum)/float64(le.Count) < 10
		poorHits := metrics.Ratio() > 0 && metrics.Ratio() < 0.4

		switch {
		case shortLived && poorHits:
			db.opt.Warningf("%s might be too small. Metrics: %s\n", name, metrics)
			db.opt.Warningf("Cache life expectancy (in seconds): %+v\n", le)
		case le.Count > 1000 && round%5 == 0:
			db.opt.Infof("%s metrics: %s\n", name, metrics)
		}
	}

	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()

	for round := 0; ; round++ {
		select {
		case <-c.HasBeenClosed():
			return
		case <-ticker.C:
		}
		report(round, "Block cache", db.BlockCacheMetrics())
		report(round, "Index cache", db.IndexCacheMetrics())
	}
}
// cleanup stops all the goroutines started by badger. Open uses it to clean up
// when it fails part-way. Every closer is nil-checked because Open may fail
// before the corresponding goroutine was started.
func (db *DB) cleanup() {
	db.stopMemoryFlush()
	db.stopCompactions()

	// Stop the cache monitor (started by Open before any later failure point)
	// before closing the caches it reads metrics from; previously it was never
	// signalled here and leaked on every failed Open.
	if db.closers.cacheHealth != nil {
		db.closers.cacheHealth.Signal()
	}

	db.blockCache.Close()
	db.indexCache.Close()
	if db.closers.updateSize != nil {
		db.closers.updateSize.Signal()
	}
	if db.closers.valueGC != nil {
		db.closers.valueGC.Signal()
	}
	if db.closers.writes != nil {
		db.closers.writes.Signal()
	}
	if db.closers.pub != nil {
		db.closers.pub.Signal()
	}

	db.orc.Stop()

	// Do not use vlog.Close() here. vlog.Close truncates the files. We don't
	// want to truncate files unless the user has specified the truncate flag.
}
// BlockCacheMetrics returns the metrics for the underlying block cache, or nil
// when no block cache was created (BlockCacheSize not positive).
func (db *DB) BlockCacheMetrics() *ristretto.Metrics {
	if db.blockCache == nil {
		return nil
	}
	return db.blockCache.Metrics
}
// IndexCacheMetrics returns the metrics for the underlying index cache, or nil
// when no index cache was created (IndexCacheSize not positive).
func (db *DB) IndexCacheMetrics() *ristretto.Metrics {
	if db.indexCache == nil {
		return nil
	}
	return db.indexCache.Metrics
}
// Close closes a DB. It's crucial to call it to ensure all the pending updates
// make their way to disk. Only the first call performs the shutdown and returns
// its error; any later call is a no-op that returns nil.
func (db *DB) Close() error {
	var closeErr error
	db.closeOnce.Do(func() { closeErr = db.close() })
	return closeErr
}
// IsClosed reports whether Close has started shutting the DB down. A DB
// instance should not be used after closing it.
func (db *DB) IsClosed() bool {
	const closed = 1
	return db.isClosed.Load() == closed
}
// close performs the actual shutdown; Close wraps it in closeOnce.
//
// The shutdown sequence is:
//  1. Block new writes and stop value GC, the write loop and the publisher.
//  2. Hand the active memtable to the flusher, or drop it if empty.
//  3. Stop flushing and compactions, optionally forcing an L0 compaction.
//  4. Close the value log, levels controller, oracle, caches and threshold.
//  5. On disk only: release the directory locks, close the manifest and key
//     registry, and fsync the directories.
//
// Errors are recorded with an "only if err is still nil" pattern, so a later
// failure never overwrites an earlier one.
func (db *DB) close() (err error) {
	defer db.allocPool.Release()

	db.opt.Debugf("Closing database")
	// NOTE(review): l0stallsMs is converted with time.Duration directly, i.e.
	// read as nanoseconds despite its "Ms" suffix. Confirm the unit it is
	// accumulated in before changing this.
	db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(db.lc.l0stallsMs.Load()))

	db.blockWrites.Store(1)
	db.isClosed.Store(1)

	if !db.opt.InMemory {
		// Stop value GC first.
		db.closers.valueGC.SignalAndWait()
	}

	// Stop writes next.
	db.closers.writes.SignalAndWait()

	// Don't accept any more write.
	close(db.writeCh)

	db.closers.pub.SignalAndWait()
	db.closers.cacheHealth.Signal()

	// Make sure that block writer is done pushing stuff into memtable!
	// Otherwise, you will have a race condition: we are trying to flush memtables
	// and remove them completely, while the block / memtable writer is still
	// trying to push stuff into the memtable. This will also resolve the value
	// offset problem: as we push into memtable, we update value offsets there.
	if db.mt != nil {
		if db.mt.sl.Empty() {
			// Remove the memtable if empty.
			db.mt.DecrRef()
		} else {
			db.opt.Debugf("Flushing memtable")
			for {
				pushedMemTable := func() bool {
					db.lock.Lock()
					defer db.lock.Unlock()
					y.AssertTrue(db.mt != nil)
					select {
					case db.flushChan <- db.mt:
						db.imm = append(db.imm, db.mt) // Flusher will attempt to remove this from s.imm.
						db.mt = nil                    // Will segfault if we try writing!
						db.opt.Debugf("pushed to flush chan\n")
						return true
					default:
						// If we fail to push, we need to unlock and wait for a short while.
						// The flushing operation needs to update s.imm. Otherwise, we have a
						// deadlock.
						// TODO: Think about how to do this more cleanly, maybe without any locks.
					}
					return false
				}()
				if pushedMemTable {
					break
				}
				time.Sleep(10 * time.Millisecond)
			}
		}
	}
	db.stopMemoryFlush()
	db.stopCompactions()

	// Force Compact L0
	// We don't need to care about cstatus since no parallel compaction is running.
	if db.opt.CompactL0OnClose {
		// This err deliberately shadows the named result: a failed forced
		// compaction is logged, not returned.
		err := db.lc.doCompact(173, compactionPriority{level: 0, score: 1.73})
		switch err {
		case errFillTables:
			// This error only means that there might be enough tables to do a compaction. So, we
			// should not report it to the end user to avoid confusing them.
		case nil:
			db.opt.Debugf("Force compaction on level 0 done")
		default:
			db.opt.Warningf("While forcing compaction on level 0: %v", err)
		}
	}

	// Now close the value log.
	if vlogErr := db.vlog.Close(); vlogErr != nil {
		err = y.Wrap(vlogErr, "DB.Close")
	}

	// Pass the level summary as an argument rather than as the format string,
	// so any '%' in it is printed literally (go vet printf check).
	db.opt.Infof("%s", db.LevelsToString())
	if lcErr := db.lc.close(); err == nil {
		err = y.Wrap(lcErr, "DB.Close")
	}
	db.opt.Debugf("Waiting for closer")
	db.closers.updateSize.SignalAndWait()
	db.orc.Stop()
	db.blockCache.Close()
	db.indexCache.Close()

	db.threshold.close()

	if db.opt.InMemory {
		return
	}

	if db.dirLockGuard != nil {
		if guardErr := db.dirLockGuard.release(); err == nil {
			err = y.Wrap(guardErr, "DB.Close")
		}
	}
	if db.valueDirGuard != nil {
		if guardErr := db.valueDirGuard.release(); err == nil {
			err = y.Wrap(guardErr, "DB.Close")
		}
	}
	if manifestErr := db.manifest.close(); err == nil {
		err = y.Wrap(manifestErr, "DB.Close")
	}
	if registryErr := db.registry.Close(); err == nil {
		err = y.Wrap(registryErr, "DB.Close")
	}

	// Fsync directories to ensure that lock file, and any other removed files whose directory
	// we haven't specifically fsynced, are guaranteed to have their directory entry removal
	// persisted to disk.
	if syncErr := db.syncDir(db.opt.Dir); err == nil {
		err = y.Wrap(syncErr, "DB.Close")
	}
	if syncErr := db.syncDir(db.opt.ValueDir); err == nil {
		err = y.Wrap(syncErr, "DB.Close")
	}

	return err
}
// VerifyChecksum verifies the checksum of every table on every level by
// delegating to the levels controller. Use it to check tables on demand when
// opt.ChecksumVerificationMode is NoVerification.
func (db *DB) VerifyChecksum() error {
	levels := db.lc
	return levels.verifyChecksum()
}
// lockFile is the file name passed to acquireDirectoryLock for Dir (and for
// ValueDir when it differs from Dir).
const lockFile = "LOCK"
// Sync syncs database content to disk. This function provides
// more control to user to sync data whenever required.
//
// It syncs the active memtable's WAL (when a mutable memtable exists) and then
// the value log, and returns both errors combined.
func (db *DB) Sync() error {
	// Make an attempt to sync both logs: the active memtable's WAL and the vLog (1847).
	// Cases:
	//  - All_ok         :: Both logs sync successfully.
	//  - Entry_Lost     :: An entry with a value pointer was present in the active memtable's WAL,
	//                   :: and the WAL was synced but there was an error in syncing the vLog.
	//                   :: The entry will be considered lost and this case will need to be handled
	//                   :: during recovery.
	//  - Entries_Lost   :: If there were errors in syncing both logs, multiple entries would be lost.
	//  - Entries_Lost   :: If the active memtable's WAL is not synced but the vLog is synced, entries
	//                   :: are lost because recovery of the active memtable is done from its WAL.
	//                   :: Check `UpdateSkipList` in memtable.go.
	//  - Nothing_lost   :: If an entry with its value was present in the active memtable's WAL, and
	//                   :: the WAL was synced but there was an error in syncing the vLog, nothing is
	//                   :: lost for that entry because it is completely present in the WAL.
	//  - Partially_lost :: If entries were written partially in either of the logs, the logs will be
	//                   :: truncated during recovery, and some entries might be lost as a result.
	//                   :: E.g. if 4KB of data is to be synced and `Sync` only persists 3KB before the
	//                   :: machine shuts down or the disk fails, this results in partial writes.
	//                   :: [[This case needs verification]]
	var memtableSyncError error
	db.lock.RLock()
	// db.mt is nil in read-only mode (no mutable memtable is created) and after
	// close has handed it to the flusher; skip the WAL sync instead of
	// dereferencing nil.
	if db.mt != nil {
		memtableSyncError = db.mt.SyncWAL()
	}
	db.lock.RUnlock()

	vLogSyncError := db.vlog.sync()
	return y.CombineErrors(memtableSyncError, vLogSyncError)
}
// getMemTables returns the current memtables, taking an extra reference on
// each one while holding the read lock. The mutable memtable comes first (it
// is absent in read-only mode), followed by the immutable memtables from the
// last one appended to db.imm back to the first. The returned function drops
// those references and must be called once the caller is done with the tables.
func (db *DB) getMemTables() ([]*memTable, func()) {
	db.lock.RLock()
	defer db.lock.RUnlock()

	var tables []*memTable
	if !db.opt.ReadOnly {
		// Mutable memtable does not exist in read-only mode.
		db.mt.IncrRef()
		tables = append(tables, db.mt)
	}
	for idx := len(db.imm) - 1; idx >= 0; idx-- {
		imm := db.imm[idx]
		imm.IncrRef()
		tables = append(tables, imm)
	}

	release := func() {
		for _, mt := range tables {
			mt.DecrRef()
		}
	}
	return tables, release
}
// get returns the value in memtable or disk for given key.
// Note that value will include meta byte.
//
// The memtables (as ordered by getMemTables) are checked first. An exact
// version match is returned immediately. Otherwise the highest version seen
// in the memtables is passed on to the levels controller, which searches the
// LSM tree.
//
// IMPORTANT: We should never write an entry with an older timestamp for the same key, We need to
// maintain this invariant to search for the latest value of a key, or else we need to search in all
// tables and find the max version among them. To maintain this invariant, we also need to ensure
// that all versions of a key are always present in the same table from level 1, because compaction
// can push any table down.
//
// Update(23/09/2020) - We have dropped the move key implementation. Earlier we
// were inserting move keys to fix the invalid value pointers but we no longer
// do that. For every get("fooX") call where X is the version, we will search
// for "fooX" in all the levels of the LSM tree. This is expensive but it
// removes the overhead of handling move keys completely.
func (db *DB) get(key []byte) (y.ValueStruct, error) {
	if db.IsClosed() {
		return y.ValueStruct{}, ErrDBClosed
	}
	memTables, release := db.getMemTables() // Lock should be released.
	defer release()

	var best y.ValueStruct
	wantVersion := y.ParseTs(key)
	y.NumGetsAdd(db.opt.MetricsEnabled, 1)
	for _, mt := range memTables {
		vs := mt.sl.Get(key)
		y.NumMemtableGetsAdd(db.opt.MetricsEnabled, 1)
		if vs.Meta == 0 && vs.Value == nil {
			// Treated as a miss in this memtable.
			continue
		}
		if vs.Version == wantVersion {
			// Found the required version of the key, return immediately.
			y.NumGetsWithResultsAdd(db.opt.MetricsEnabled, 1)
			return vs, nil
		}
		if vs.Version > best.Version {
			best = vs
		}
	}
	return db.lc.get(key, best, 0)
}
// requestPool recycles *request values across writes; sendToWriteCh takes one
// from the pool and resets it before filling it in.
var requestPool = sync.Pool{
	New: func() any { return new(request) },
}
// writeToLSM inserts every entry of b into the active memtable. Entries that
// stay inline (per skipVlogAndSetThreshold) are stored with their value; the
// others are stored as the encoded value-log pointer from b.Ptrs with
// bitValuePointer set. The WAL is synced afterwards when SyncWrites is on.
func (db *DB) writeToLSM(b *request) error {
	// In InMemory mode nothing is written to the value log, so b.Ptrs is always
	// empty there; the pointer/entry count check only applies otherwise.
	if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) {
		return errors.Errorf("Ptrs and Entries don't match: %+v", b)
	}

	for i, entry := range b.Entries {
		var vs y.ValueStruct
		if entry.skipVlogAndSetThreshold(db.valueThreshold()) {
			// Will include deletion / tombstone case.
			// Strip bitValuePointer: it can only be set here when the entry was
			// loaded from a backup taken with a lower ValueThreshold, and leaving
			// it would break value retrieval during iterator prefetch.
			vs = y.ValueStruct{
				Value:     entry.Value,
				Meta:      entry.meta &^ bitValuePointer,
				UserMeta:  entry.UserMeta,
				ExpiresAt: entry.ExpiresAt,
			}
		} else {
			// Store a pointer into the value log instead of the value itself.
			vs = y.ValueStruct{
				Value:     b.Ptrs[i].Encode(),
				Meta:      entry.meta | bitValuePointer,
				UserMeta:  entry.UserMeta,
				ExpiresAt: entry.ExpiresAt,
			}
		}
		if err := db.mt.Put(entry.Key, vs); err != nil {
			return y.Wrapf(err, "while writing to memTable")
		}
	}

	if !db.opt.SyncWrites {
		return nil
	}
	return db.mt.SyncWAL()
}
// writeRequests persists a batch of write requests. It writes the whole batch
// to the value log, then writes each non-empty request into the memtable
// (waiting for room when necessary), and finally notifies subscribers. Every
// request gets its Err set and its WaitGroup marked done exactly once, with
// the error that stopped the batch or nil. It is called serially by only one
// goroutine.
func (db *DB) writeRequests(reqs []*request) error {
	if len(reqs) == 0 {
		return nil
	}

	finish := func(err error) {
		for _, req := range reqs {
			req.Err = err
			req.Wg.Done()
		}
	}

	// waitForRoom polls ensureRoomForWrite until it stops returning errNoRoom.
	// We need to poll a bit because both ensureRoomForWrite and the flusher need
	// access to db.imm: blocking while flushChan is full and the flusher is
	// trying to update db.imm would deadlock.
	waitForRoom := func() error {
		for attempt := uint64(1); ; attempt++ {
			err := db.ensureRoomForWrite()
			if err != errNoRoom {
				return err
			}
			if attempt%100 == 0 {
				db.opt.Debugf("Making room for writes")
			}
			time.Sleep(10 * time.Millisecond)
		}
	}

	db.opt.Debugf("writeRequests called. Writing to value log")
	if err := db.vlog.write(reqs); err != nil {
		finish(err)
		return err
	}

	db.opt.Debugf("Writing to memtable")
	total := 0
	for _, req := range reqs {
		if len(req.Entries) == 0 {
			continue
		}
		total += len(req.Entries)
		if err := waitForRoom(); err != nil {
			finish(err)
			return y.Wrap(err, "writeRequests")
		}
		if err := db.writeToLSM(req); err != nil {
			finish(err)
			return y.Wrap(err, "writeRequests")
		}
	}

	db.opt.Debugf("Sending updates to subscribers")
	db.pub.sendUpdates(reqs)
	finish(nil)
	db.opt.Debugf("%d entries written", total)
	return nil
}
// sendToWriteCh wraps entries in a pooled request and queues it on db.writeCh
// for doWrites. It fails with ErrBlockedWrites once writes are blocked and
// with ErrTxnTooBig when the batch reaches maxBatchCount entries or
// maxBatchSize estimated bytes. The caller waits on the returned request.
func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
	if db.blockWrites.Load() == 1 {
		return nil, ErrBlockedWrites
	}

	var totalSize int64
	for _, e := range entries {
		totalSize += e.estimateSizeAndSetThreshold(db.valueThreshold())
	}
	numEntries := int64(len(entries))
	y.NumBytesWrittenUserAdd(db.opt.MetricsEnabled, totalSize)
	if numEntries >= db.opt.maxBatchCount || totalSize >= db.opt.maxBatchSize {
		return nil, ErrTxnTooBig
	}

	// A single request carries the whole txn so that its entries are stored in
	// one contiguous section; txns must not interleave with other txns or
	// rewrites.
	req := requestPool.Get().(*request)
	req.reset()
	req.Entries = entries
	req.Wg.Add(1)
	req.IncrRef()     // for db write
	db.writeCh <- req // Handled in doWrites.
	y.NumPutsAdd(db.opt.MetricsEnabled, numEntries)

	return req, nil
}
// doWrites is the single writer goroutine. It collects requests from
// db.writeCh into batches and hands each batch to writeRequests.
//
// pendingCh has capacity 1 and acts as a token. The token is taken before a
// batch write starts and released when writeRequests returns, so at most one
// batch is written at a time. While a write is in flight, further requests keep
// accumulating into the next batch, until it reaches 3*kvWriteChCapacity
// requests. When lc is closed, every request still queued on db.writeCh is
// written synchronously before returning.
func (db *DB) doWrites(lc *z.Closer) {
	defer lc.Done()
	// Token channel: full while a batch is being written.
	pendingCh := make(chan struct{}, 1)

	writeRequests := func(reqs []*request) {
		if err := db.writeRequests(reqs); err != nil {
			db.opt.Errorf("writeRequests: %v", err)
		}
		// Release the token so the next batch may be written.
		<-pendingCh
	}

	// This variable tracks the number of pending writes.
	reqLen := new(expvar.Int)
	y.PendingWritesSet(db.opt.MetricsEnabled, db.opt.Dir, reqLen)

	reqs := make([]*request, 0, 10)
	for {
		var r *request
		// Wait for the first request of a new batch, or for shutdown.
		select {
		case r = <-db.writeCh:
		case <-lc.HasBeenClosed():
			goto closedCase
		}

		for {
			reqs = append(reqs, r)
			reqLen.Set(int64(len(reqs)))

			// The batch is large enough: wait for any in-flight write to
			// finish, then write this batch.
			if len(reqs) >= 3*kvWriteChCapacity {
				pendingCh <- struct{}{} // blocking.
				goto writeCase
			}

			select {
			// Either push to pending, or continue to pick from writeCh.
			case r = <-db.writeCh:
			case pendingCh <- struct{}{}:
				goto writeCase
			case <-lc.HasBeenClosed():
				goto closedCase
			}
		}

	closedCase:
		// Drain every request still queued on writeCh, then write them
		// synchronously after any in-flight write has finished.
		// Don't close the writeCh, because it is used in several places.
		for {
			select {
			case r = <-db.writeCh:
				reqs = append(reqs, r)
			default:
				pendingCh <- struct{}{} // Push to pending before doing a write.
				writeRequests(reqs)
				return
			}
		}

	writeCase:
		// Write asynchronously; the token taken above is released by writeRequests.
		go writeRequests(reqs)
		reqs = make([]*request, 0, 10)
		reqLen.Set(0)
	}
}
// batchSet sends entries to the write goroutine as a single request and blocks
// until that request has been processed. It returns the error from
// sendToWriteCh, or otherwise the request-level error reported by the write.
//
//	err := db.batchSet(entries)
func (db *DB) batchSet(entries []*Entry) error {
	req, sendErr := db.sendToWriteCh(entries)
	if sendErr == nil {
		return req.Wait()
	}
	return sendErr
}
// batchSetAsync is the non-blocking form of batchSet. If the request cannot be
// queued, the error is returned immediately and f is never called. Otherwise a
// goroutine waits for the write and then calls f with its request-level error
// (nil on success).
//
//	err := db.batchSetAsync(entries, func(err error) {
//		Check(err)
//	})
func (db *DB) batchSetAsync(entries []*Entry, f func(error)) error {
	req, sendErr := db.sendToWriteCh(entries)
	if sendErr != nil {
		return sendErr
	}
	// Wait off the caller's goroutine, then report the outcome via the callback.
	go func(r *request) {
		f(r.Wait())
	}(req)
	return nil
}
// errNoRoom is returned by ensureRoomForWrite when the active memtable is full
// and flushChan cannot accept it yet; writeRequests keeps retrying while it
// sees this error. The message is lowercase per Go error-string convention.
var errNoRoom = errors.New("no room for write")
// ensureRoomForWrite makes sure the active memtable (db.mt) can take more
// writes. It is always called serially.
//
// If db.mt is full, it tries a non-blocking send of db.mt on db.flushChan. On
// success the memtable is appended to db.imm and a new one is installed. If
// flushChan is full, it returns errNoRoom so the caller can retry after
// db.lock has been released.
func (db *DB) ensureRoomForWrite() error {
	db.lock.Lock()
	defer db.lock.Unlock()

	y.AssertTrue(db.mt != nil) // A nil mt indicates that DB is being closed.
	if !db.mt.isFull() {
		return nil
	}

	select {
	case db.flushChan <- db.mt:
		// Handed to the flusher; record it as immutable below.
	default:
		// Return so the deferred unlock lets the flusher modify imm.
		return errNoRoom
	}

	db.opt.Debugf("Flushing memtable, mt.size=%d size of flushChan: %d\n",
		db.mt.sl.MemSize(), len(db.flushChan))
	db.imm = append(db.imm, db.mt)

	var err error
	if db.mt, err = db.newMemTable(); err != nil {
		return y.Wrapf(err, "cannot create new mem table")
	}
	// New memtable is empty. We certainly have room.
	return nil
}
// arenaSize returns MemTableSize plus headroom for one maximum-sized batch:
// maxBatchSize bytes of data plus maxBatchCount nodes of skl.MaxNodeSize each.
// Presumably used to size a memtable's skiplist arena — confirm at call sites.
func arenaSize(opt Options) int64 {
	nodeOverhead := opt.maxBatchCount * int64(skl.MaxNodeSize)
	return opt.MemTableSize + opt.maxBatchSize + nodeOverhead
}
// buildL0Table feeds every entry of iter into a new table builder and returns
// the builder. Keys that start with any of dropPrefixes are skipped.
//
// For values stored as value pointers, the pointer's length is passed to
// Add; otherwise 0 is passed. iter is closed before returning. The builder is
// never nil, but it may be empty.
func buildL0Table(iter y.Iterator, dropPrefixes [][]byte, bopts table.Options) *table.Builder {
	defer iter.Close()

	builder := table.NewTableBuilder(bopts)
	for iter.Rewind(); iter.Valid(); iter.Next() {
		key := iter.Key()
		if len(dropPrefixes) > 0 && hasAnyPrefixes(key, dropPrefixes) {
			continue
		}
		val := iter.Value()
		var vptr valuePointer
		if val.Meta&bitValuePointer > 0 {
			vptr.Decode(val.Value)
		}
		builder.Add(key, val, vptr.Len)
	}
	return builder
}
// handleMemTableFlush writes the contents of mt out as a new level-0 table and
// registers it with the level controller. Keys that start with any of
// dropPrefixes are not written (DropPrefix passes the prefixes being dropped;
// the background flusher passes nil). It must be run serially.
func (db *DB) handleMemTableFlush(mt *memTable, dropPrefixes [][]byte) error {
	bopts := buildTableOptions(db)
	itr := mt.sl.NewUniIterator(false)
	// Forward dropPrefixes so keys being dropped never reach level 0.
	builder := buildL0Table(itr, dropPrefixes, bopts)
	defer builder.Close()

	// The builder is empty when none of the items in the skiplist were added,
	// e.g. when every key matched one of dropPrefixes. Nothing to persist then.
	if builder.Empty() {
		builder.Finish()
		return nil
	}

	fileID := db.lc.reserveFileID()
	var tbl *table.Table
	var err error
	if db.opt.InMemory {
		data := builder.Finish()
		tbl, err = table.OpenInMemoryTable(data, fileID, &bopts)
	} else {
		tbl, err = table.CreateTable(table.NewFilename(fileID, db.opt.Dir), builder)
	}
	if err != nil {
		return y.Wrap(err, "error while creating table")
	}
	// We own a ref on tbl.
	err = db.lc.addLevel0Table(tbl) // This will incrRef
	_ = tbl.DecrRef()               // Releases our ref.
	return err
}
// flushMemtable is the background memtable flusher. It receives memtables on
// db.flushChan and writes each one out as a level-0 table. A failing flush is
// retried every second, indefinitely.
//
// After a successful flush, the memtable is removed from the front of db.imm
// and its reference is released. Nil memtables are skipped, and the loop ends
// when db.flushChan is closed.
func (db *DB) flushMemtable(lc *z.Closer) {
	defer lc.Done()
	for mt := range db.flushChan {
		if mt == nil {
			continue
		}

		// Keep retrying: the memtable has to reach disk before it can be dropped.
		for {
			err := db.handleMemTableFlush(mt, nil)
			if err == nil {
				break
			}
			db.opt.Errorf("error flushing memtable to disk: %v, retrying", err)
			time.Sleep(time.Second)
		}

		// Memtables are sent on flushChan and appended to db.imm together while
		// db.lock is held (see ensureRoomForWrite), and this goroutine handles
		// them one at a time, so mt must be the oldest entry of db.imm.
		// TODO: This logic is fragile; any change here could easily break it.
		db.lock.Lock()
		y.AssertTrue(mt == db.imm[0])
		db.imm = db.imm[1:]
		mt.DecrRef() // Return memory.
		db.lock.Unlock()
	}
}
// exists reports whether path can be stat'ed.
//
// It returns (false, nil) only when the path definitely does not exist. Any
// other stat failure yields (true, err), so callers must check err before
// trusting the boolean.
func exists(path string) (bool, error) {
	_, statErr := os.Stat(path)
	switch {
	case statErr == nil:
		return true, nil
	case os.IsNotExist(statErr):
		return false, nil
	default:
		return true, statErr
	}
}
// calculateSize walks the data directories and publishes the total size of
// .sst files (LSM size, under db.opt.Dir) and of .vlog files (value log size,
// under db.opt.ValueDir) through y.LSMSizeSet and y.VlogSizeSet.
//
// A walk error is only logged at debug level; whatever was summed before the
// error is still published. It does nothing for an in-memory DB.
func (db *DB) calculateSize() {
	if db.opt.InMemory {
		return
	}

	toExpvar := func(v int64) *expvar.Int {
		out := new(expvar.Int)
		out.Add(v)
		return out
	}

	// sizesUnder sums the sizes of .sst and .vlog files found under dir.
	sizesUnder := func(dir string) (sst, vlog int64) {
		walkErr := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			switch filepath.Ext(path) {
			case ".sst":
				sst += info.Size()
			case ".vlog":
				vlog += info.Size()
			}
			return nil
		})
		if walkErr != nil {
			db.opt.Debugf("Got error while calculating total size of directory: %s", dir)
		}
		return sst, vlog
	}

	lsmSize, vlogSize := sizesUnder(db.opt.Dir)
	y.LSMSizeSet(db.opt.MetricsEnabled, db.opt.Dir, toExpvar(lsmSize))
	// Value logs may live in a separate directory, which needs its own walk.
	if db.opt.ValueDir != db.opt.Dir {
		_, vlogSize = sizesUnder(db.opt.ValueDir)
	}
	y.VlogSizeSet(db.opt.MetricsEnabled, db.opt.ValueDir, toExpvar(vlogSize))
}
// updateSize recomputes the size metrics (via calculateSize) once a minute
// until lc is closed. It returns immediately for an in-memory DB.
func (db *DB) updateSize(lc *z.Closer) {
	defer lc.Done()
	if db.opt.InMemory {
		return
	}

	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		select {
		case <-lc.HasBeenClosed():
			return
		case <-ticker.C:
			db.calculateSize()
		}
	}
}
// RunValueLogGC triggers a value log garbage collection.
//
// Log files are chosen for GC using statistics gathered during compactions;
// without such statistics they are picked in random order. The process stops
// at the first log file that does not get garbage collected.
//
// A picked log file is sampled first. It is rewritten only if the sample shows
// that at least discardRatio of its space can be discarded.
//
// If the call ends without rewriting any file, ErrNoRewrite is returned.
//
// A discardRatio of 0.5 is recommended: a file is rewritten once half of it can
// be discarded. This gives a lifetime value log write amplification of 2
// (1 from the original write + 0.5 rewrite + 0.25 + 0.125 + ... = 2). Higher
// values reclaim space less often; lower values reclaim more space at the cost
// of more activity on the LSM tree.
//
// discardRatio must lie strictly between 0.0 and 1.0; otherwise
// ErrInvalidRequest is returned.
//
// Only one GC may run at a time. If another value log GC is running, or the DB
// has been closed, ErrRejected is returned.
//
// Note: every GC run produces a spike of activity on the LSM tree. In-memory
// DBs do not support GC and get ErrGCInMemoryMode.
func (db *DB) RunValueLogGC(discardRatio float64) error {
	switch {
	case db.opt.InMemory:
		return ErrGCInMemoryMode
	case discardRatio <= 0.0 || discardRatio >= 1.0:
		return ErrInvalidRequest
	}
	// Pick a log file and run GC.
	return db.vlog.runGC(discardRatio)
}
// Size returns the size of lsm and value log files in bytes. It can be used to
// decide how often to call RunValueLogGC.
//
// Both values are 0 while the LSM size metric is unavailable, e.g. for an
// in-memory DB or before the sizes have been computed. If only the value log
// metric is missing, vlog is reported as 0.
func (db *DB) Size() (lsm, vlog int64) {
	lsmVar, ok := y.LSMSizeGet(db.opt.MetricsEnabled, db.opt.Dir).(*expvar.Int)
	if !ok || lsmVar == nil {
		return 0, 0
	}
	lsm = lsmVar.Value()

	// calculateSize publishes the vlog size after the LSM size, so a concurrent
	// call can see the LSM metric without the vlog one. Use a checked assertion
	// instead of panicking on a missing value.
	if vlogVar, ok := y.VlogSizeGet(db.opt.MetricsEnabled, db.opt.ValueDir).(*expvar.Int); ok && vlogVar != nil {
		vlog = vlogVar.Value()
	}
	return lsm, vlog
}
// Sequence represents a Badger sequence.
//
// Values are handed out from an in-memory lease. next is the value the next
// call to Next returns. leased is the exclusive end of the current lease, which
// is also the value written under key. When next reaches leased, updateLease
// reserves another bandwidth values.
type Sequence struct {
	lock      sync.Mutex // Serializes Next and Release.
	db        *DB
	key       []byte // Key under which the lease end is stored.
	next      uint64 // Next value to return.
	leased    uint64 // Exclusive end of the current lease.
	bandwidth uint64 // Number of values reserved per lease.
}
// Next returns the next integer in the sequence. When the current lease is
// used up, a new one is taken through a transaction first, and its error, if
// any, is returned. Calls are serialized by seq.lock.
func (seq *Sequence) Next() (uint64, error) {
	seq.lock.Lock()
	defer seq.lock.Unlock()

	needLease := seq.next >= seq.leased
	if needLease {
		if err := seq.updateLease(); err != nil {
			return 0, err
		}
	}
	current := seq.next
	seq.next = current + 1
	return current, nil
}
// Release gives back the unused part of the current lease so those integers
// are not wasted. If the value stored under the sequence key still equals the
// end of this lease, it is rewritten to the next unused value.
//
// Call it right before closing the associated DB. The sequence stays usable
// afterwards: the next call to Next takes a new lease with full bandwidth.
func (seq *Sequence) Release() error {
	seq.lock.Lock()
	defer seq.lock.Unlock()

	updateErr := seq.db.Update(func(txn *Txn) error {
		item, err := txn.Get(seq.key)
		if err != nil {
			return err
		}
		var stored uint64
		readErr := item.Value(func(v []byte) error {
			stored = binary.BigEndian.Uint64(v)
			return nil
		})
		if readErr != nil {
			return readErr
		}
		// Only rewind when the stored value is still our lease end (presumably so
		// a later lease taken on the same key is not clobbered).
		if stored != seq.leased {
			return nil
		}
		var buf [8]byte
		binary.BigEndian.PutUint64(buf[:], seq.next)
		return txn.SetEntry(NewEntry(seq.key, buf[:]))
	})
	if updateErr != nil {
		return updateErr
	}
	seq.leased = seq.next
	return nil
}
// updateLease reserves the next bandwidth values in one update transaction.
//
// It reads the stored lease end for seq.key (a missing key counts as 0),
// restarts seq.next from there, and writes seq.next+bandwidth as the new lease
// end.
// NOTE(review): seq.next and seq.leased are changed inside the closure,
// presumably before the transaction commits; a failed commit does not roll them
// back — confirm.
func (seq *Sequence) updateLease() error {
	return seq.db.Update(func(txn *Txn) error {
		start := uint64(0)
		item, getErr := txn.Get(seq.key)
		switch {
		case getErr == ErrKeyNotFound:
			// Nothing stored yet: count from zero.
		case getErr != nil:
			return getErr
		default:
			if err := item.Value(func(v []byte) error {
				start = binary.BigEndian.Uint64(v)
				return nil
			}); err != nil {
				return err
			}
		}
		seq.next = start

		lease := seq.next + seq.bandwidth
		var buf [8]byte
		binary.BigEndian.PutUint64(buf[:], lease)
		if err := txn.SetEntry(NewEntry(seq.key, buf[:])); err != nil {
			return err
		}
		seq.leased = lease
		return nil
	})
}
// GetSequence creates a Sequence for key, starting from the lease stored in the
// database if one exists. A Sequence yields monotonically increasing integers;
// different keys give independent sequences. bandwidth is the lease size, i.e.
// how many Next calls are served from memory before another lease is taken.
//
// It returns ErrEmptyKey for an empty key and ErrZeroBandwidth for a zero
// bandwidth. If the initial lease fails, the sequence is still returned
// together with the error.
//
// GetSequence is not supported on ManagedDB. Calling this would result in a panic.
func (db *DB) GetSequence(key []byte, bandwidth uint64) (*Sequence, error) {
	if db.opt.managedTxns {
		panic("Cannot use GetSequence with managedDB=true.")
	}
	if len(key) == 0 {
		return nil, ErrEmptyKey
	}
	if bandwidth == 0 {
		return nil, ErrZeroBandwidth
	}

	seq := &Sequence{db: db, key: key, bandwidth: bandwidth}
	return seq, seq.updateLease()
}
// Tables gets the TableInfo objects for every table currently tracked by the
// level controller.
func (db *DB) Tables() []TableInfo {
	return db.lc.getTableInfo()
}
// Levels returns a LevelInfo for each level, as reported by the level
// controller.
func (db *DB) Levels() []LevelInfo {
	info := db.lc.getLevelInfo()
	return info
}
// EstimateSize gives a rough estimate of how much data has the given prefix.
// It returns the summed on-disk size and the summed uncompressed size of every
// table whose left and right keys both start with prefix. Tables that only
// partly overlap the prefix, and data not yet in tables, are not counted.
func (db *DB) EstimateSize(prefix []byte) (uint64, uint64) {
	var onDisk, uncompressed uint64
	for _, ti := range db.Tables() {
		if !bytes.HasPrefix(ti.Left, prefix) || !bytes.HasPrefix(ti.Right, prefix) {
			continue
		}
		onDisk += uint64(ti.OnDiskSize)
		uncompressed += uint64(ti.UncompressedSize)
	}
	return onDisk, uncompressed
}
// Ranges can be used to get rough key ranges to divide up iteration over the DB. The ranges here
// would consider the prefix, but would not necessarily start or end with the prefix. In fact, the
// first range would have nil as left key, and the last range would have nil as the right key.
//
// numRanges is the desired number of ranges after binning by estimated size; values below 1 are
// treated as 1.
func (db *DB) Ranges(prefix []byte, numRanges int) []*keyRange {
	// Guard the size-based binning at the end against an integer division by zero.
	if numRanges < 1 {
		numRanges = 1
	}

	var splits []string
	tables := db.Tables()

	// We just want table ranges here and not keys count.
	for _, ti := range tables {
		// We don't use ti.Left, because that has a tendency to store !badger keys. Skip over tables
		// at upper levels. Only choose tables from the last level.
		if ti.Level != db.opt.MaxLevels-1 {
			continue
		}
		if bytes.HasPrefix(ti.Right, prefix) {
			splits = append(splits, string(ti.Right))
		}
	}

	// If the number of splits is low, look at the offsets inside the
	// tables to generate more splits.
	if len(splits) < 32 {
		numTables := len(tables)
		if numTables == 0 {
			numTables = 1
		}
		numPerTable := 32 / numTables
		if numPerTable == 0 {
			numPerTable = 1
		}
		splits = db.lc.keySplits(numPerTable, prefix)
	}

	// If the number of splits is still < 32, then look at the memtables.
	if len(splits) < 32 {
		maxPerSplit := 10000
		mtSplits := func(mt *memTable) {
			if mt == nil {
				return
			}
			count := 0
			iter := mt.sl.NewIterator()
			for iter.SeekToFirst(); iter.Valid(); iter.Next() {
				if count%maxPerSplit == 0 {
					// Add a split every maxPerSplit keys.
					if bytes.HasPrefix(iter.Key(), prefix) {
						splits = append(splits, string(iter.Key()))
					}
				}
				count++
			}
			_ = iter.Close()
		}

		// Hold db.lock only while the memtables are scanned, so writers that need it
		// (e.g. ensureRoomForWrite) are not blocked through the sorting and binning below.
		func() {
			db.lock.Lock()
			defer db.lock.Unlock()
			for _, mt := range db.imm {
				mtSplits(mt)
			}
			mtSplits(db.mt)
		}()
	}

	// We have our splits now. Let's convert them to ranges.
	sort.Strings(splits)
	var ranges []*keyRange
	var start []byte
	for _, key := range splits {
		ranges = append(ranges, &keyRange{left: start, right: y.SafeCopy(nil, []byte(key))})
		start = y.SafeCopy(nil, []byte(key))
	}
	ranges = append(ranges, &keyRange{left: start})

	// Figure out the approximate table size this range has to deal with.
	for _, t := range tables {
		tr := keyRange{left: t.Left, right: t.Right}
		for _, r := range ranges {
			if len(r.left) == 0 || len(r.right) == 0 {
				continue
			}
			if r.overlapsWith(tr) {
				r.size += int64(t.UncompressedSize)
			}
		}
	}

	var total int64
	for _, r := range ranges {
		total += r.size
	}
	if total == 0 {
		return ranges
	}

	// Figure out the average size, so we know how to bin the ranges together.
	avg := total / int64(numRanges)
	var out []*keyRange
	var i int
	for i < len(ranges) {
		r := ranges[i]
		cur := &keyRange{left: r.left, size: r.size, right: r.right}
		i++
		for ; i < len(ranges); i++ {
			next := ranges[i]
			if cur.size+next.size > avg {
				break
			}
			cur.right = next.right
			cur.size += next.size
		}
		out = append(out, cur)
	}
	return out
}
// MaxBatchCount returns the configured batch entry-count limit. A batch whose
// entry count reaches this value is rejected with ErrTxnTooBig.
func (db *DB) MaxBatchCount() int64 {
	limit := db.opt.maxBatchCount
	return limit
}
// MaxBatchSize returns the configured batch size limit. A batch whose
// estimated size reaches this value is rejected with ErrTxnTooBig.
func (db *DB) MaxBatchSize() int64 {
	limit := db.opt.maxBatchSize
	return limit
}
// stopMemoryFlush closes db.flushChan and waits for the memtable flusher to
// finish. It is a no-op when db.closers.memtable is nil.
func (db *DB) stopMemoryFlush() {
	if db.closers.memtable == nil {
		return
	}
	close(db.flushChan)
	db.closers.memtable.SignalAndWait()
}
// stopCompactions signals the compactors to stop and waits for them. It is a
// no-op when db.closers.compactors is nil.
func (db *DB) stopCompactions() {
	if c := db.closers.compactors; c != nil {
		c.SignalAndWait()
	}
}
// startCompactions resumes compactions with a fresh closer. It is a no-op when
// db.closers.compactors is nil.
func (db *DB) startCompactions() {
	if db.closers.compactors == nil {
		return
	}
	db.closers.compactors = z.NewCloser(1)
	db.lc.startCompact(db.closers.compactors)
}
// startMemoryFlush recreates db.flushChan and starts a new memtable flusher
// goroutine with a fresh closer. It is a no-op when db.closers.memtable is nil.
func (db *DB) startMemoryFlush() {
	// Start memory flusher.
	if db.closers.memtable == nil {
		return
	}
	db.flushChan = make(chan *memTable, db.opt.NumMemtables)
	closer := z.NewCloser(1)
	db.closers.memtable = closer
	// Hand the closer to the goroutine directly. Re-reading db.closers.memtable
	// from the new goroutine would race with any later reassignment of that field.
	go db.flushMemtable(closer)
}
// Flatten can be used to force compactions on the LSM tree so all the tables fall on the same
// level. This ensures that all the versions of keys are colocated and not split across multiple
// levels, which is necessary after a restore from backup. During Flatten, live compactions are
// stopped. Ideally, no writes are going on during Flatten. Otherwise, it would create competition
// between flattening the tree and new tables being created at level zero.
//
// workers is the number of concurrent compaction attempts per round; values below 1 are treated
// as 1.
func (db *DB) Flatten(workers int) error {
	// With zero workers no compaction would ever run, compactAway would report success, and the
	// loop below would spin forever.
	if workers < 1 {
		workers = 1
	}

	db.stopCompactions()
	defer db.startCompactions()

	compactAway := func(cp compactionPriority) error {
		db.opt.Infof("Attempting to compact with %+v\n", cp)
		errCh := make(chan error, workers)
		for i := 0; i < workers; i++ {
			go func() {
				errCh <- db.lc.doCompact(175, cp)
			}()
		}
		var success int
		var rerr error
		for i := 0; i < workers; i++ {
			err := <-errCh
			if err != nil {
				rerr = err
				db.opt.Warningf("While running doCompact with %+v. Error: %v\n", cp, err)
			} else {
				success++
			}
		}
		if success == 0 {
			return rerr
		}
		// We could do at least one successful compaction. So, we'll consider this a success.
		db.opt.Infof("%d compactor(s) succeeded. One or more tables from level %d compacted.\n",
			success, cp.level)
		return nil
	}

	hbytes := func(sz int64) string {
		return humanize.IBytes(uint64(sz))
	}

	t := db.lc.levelTargets()
	for {
		db.opt.Infof("\n")
		var levels []int
		for i, l := range db.lc.levels {
			sz := l.getTotalSize()
			// Log the same size that drives the decision below.
			db.opt.Infof("Level: %d. %8s Size. %8s Max.\n",
				i, hbytes(sz), hbytes(t.targetSz[i]))
			if sz > 0 {
				levels = append(levels, i)
			}
		}
		if len(levels) <= 1 {
			prios := db.lc.pickCompactLevels(nil)
			if len(prios) == 0 || prios[0].score <= 1.0 {
				db.opt.Infof("All tables consolidated into one level. Flattening done.\n")
				return nil
			}
			if err := compactAway(prios[0]); err != nil {
				return err
			}
			continue
		}
		// Create an artificial compaction priority, to ensure that we compact the level.
		cp := compactionPriority{level: levels[0], score: 1.71}
		if err := compactAway(cp); err != nil {
			return err
		}
	}
}
// blockWrite stops the DB from accepting new writes; from now on sendToWriteCh
// returns ErrBlockedWrites. It then signals the write goroutine (doWrites) and
// waits for it to finish. On shutdown, doWrites writes every request still
// queued on db.writeCh; writeCh itself is left open.
//
// A sendToWriteCh call that passed the blockWrites check just before the swap
// may still enqueue a request afterwards, so callers that need a fully drained
// channel must drain writeCh themselves. Returns ErrBlockedWrites if writes
// were already blocked.
func (db *DB) blockWrite() error {
	// Stop accepting new writes.
	if !db.blockWrites.CompareAndSwap(0, 1) {
		return ErrBlockedWrites
	}

	// Make all pending writes finish. Stopping doWrites makes it write whatever is
	// queued on writeCh; writeCh is not closed.
	db.closers.writes.SignalAndWait()
	db.opt.Infof("Writes flushed. Stopping compactions now...")
	return nil
}
// unblockWrite restarts the write goroutine with a fresh closer and then lets
// sendToWriteCh accept writes again.
func (db *DB) unblockWrite() {
	closer := z.NewCloser(1)
	db.closers.writes = closer
	go db.doWrites(closer)

	// Resume writes.
	db.blockWrites.Store(0)
}
// prepareToDrop quiesces the DB before data is dropped. It blocks new writes,
// writes out every request still queued on db.writeCh, and stops the memtable
// flusher, so no entries are missed. It panics in read-only mode.
//
// On success the returned function restarts the memtable flusher and resumes
// writes. If writes could not be blocked, a no-op function is returned along
// with the error.
func (db *DB) prepareToDrop() (func(), error) {
	if db.opt.ReadOnly {
		panic("Attempting to drop data in read-only mode.")
	}
	if err := db.blockWrite(); err != nil {
		return func() {}, err
	}

	// Collect whatever is still sitting in writeCh, without blocking.
	pending := make([]*request, 0, 10)
	for drained := false; !drained; {
		select {
		case r := <-db.writeCh:
			pending = append(pending, r)
		default:
			drained = true
		}
	}
	if err := db.writeRequests(pending); err != nil {
		db.opt.Errorf("writeRequests: %v", err)
	}
	db.stopMemoryFlush()

	return func() {
		db.opt.Infof("Resuming writes")
		db.startMemoryFlush()
		db.unblockWrite()
	}, nil
}
// DropAll would drop all the data stored in Badger. It does this in the following way.
//   - Stop accepting new writes.
//   - Pause memtable flushes and compactions.
//   - Pick all tables from all levels, create a changeset to delete all these
//     tables and apply it to manifest.
//   - Pick all log files from value log, and delete all of them. Restart value log files from zero.
//   - Resume memtable flushes and compactions.
//
// NOTE: DropAll is resilient to concurrent writes, but not to reads. It is up to the user to not do
// any reads while DropAll is going on, otherwise they may result in panics. Ideally, both reads and
// writes are paused before running DropAll, and resumed after it is finished.
func (db *DB) DropAll() error {
	resume, err := db.dropAll()
	// dropAll may return a resume function even when it fails; always run it so
	// that the paused work is restarted.
	if resume != nil {
		resume()
	}
	return err
}
// dropAll implements DropAll. It quiesces writes and the memtable flusher via
// prepareToDrop and stops compactions. Then, holding db.lock, it discards the
// memtables, drops every table from the LSM tree, deletes all value log files,
// resets the file ID counter and clears the caches.
//
// The returned function restarts compactions and then runs prepareToDrop's
// resume function; the caller must invoke it when non-nil, including on error.
// If prepareToDrop itself fails, its function and error are returned as-is.
func (db *DB) dropAll() (func(), error) {
	db.opt.Infof("DropAll called. Blocking writes...")
	f, err := db.prepareToDrop()
	if err != nil {
		return f, err
	}
	// prepareToDrop stops incoming writes, writes out queued write requests and
	// stops the memtable flusher. Before we drop, we also stop compactions,
	// because all the data is going to be deleted anyway.
	db.stopCompactions()
	resume := func() {
		db.startCompactions()
		f()
	}
	// Block all foreign interactions with memory tables.
	db.lock.Lock()
	defer db.lock.Unlock()

	// Remove inmemory tables. Calling DecrRef for safety. Not sure if they're absolutely needed.
	db.mt.DecrRef()
	for _, mt := range db.imm {
		mt.DecrRef()
	}
	db.imm = db.imm[:0]
	db.mt, err = db.newMemTable() // Set it up for future writes.
	if err != nil {
		return resume, y.Wrapf(err, "cannot open new memtable")
	}

	num, err := db.lc.dropTree()
	if err != nil {
		return resume, err
	}
	db.opt.Infof("Deleted %d SSTables. Now deleting value logs...\n", num)

	num, err = db.vlog.dropAll()
	if err != nil {
		return resume, err
	}
	// Reset the level controller's file ID counter to 1 now that every table is gone.
	db.lc.nextFileID.Store(1)
	db.opt.Infof("Deleted %d value log files. DropAll done.\n", num)
	db.blockCache.Clear()
	db.indexCache.Clear()
	db.threshold.Clear(db.opt)
	return resume, nil
}
// DropPrefix would drop all the keys with the provided prefix. It does this in the following way:
//   - Stop accepting new writes.
//   - Stop memtable flushes before acquiring lock. Because we're acquiring lock here
//     and memtable flush stalls for lock, which leads to deadlock
//   - Flush out all memtables, skipping over keys with the given prefix, Kp.
//   - Write out the value log header to memtables when flushing, so we don't accidentally bring Kp
//     back after a restart.
//   - Stop compaction.
//   - Compact L0->L1, skipping over Kp.
//   - Compact rest of the levels, Li->Li, picking tables which have Kp.
//   - Resume memtable flushes, compactions and writes.
//
// Only prefixes that currently have at least one key (as determined by
// filterPrefixesToDrop) are processed; if none do, DropPrefix returns nil
// without flushing anything.
func (db *DB) DropPrefix(prefixes ...[]byte) error {
	if len(prefixes) == 0 {
		return nil
	}
	db.opt.Infof("DropPrefix called for %s", prefixes)
	// prepareToDrop blocks writes, writes out queued requests and stops the
	// memtable flusher; f undoes that when DropPrefix returns.
	f, err := db.prepareToDrop()
	if err != nil {
		return err
	}
	defer f()

	var filtered [][]byte
	if filtered, err = db.filterPrefixesToDrop(prefixes); err != nil {
		return err
	}
	// If there is no prefix for which the data already exist, do not do anything.
	if len(filtered) == 0 {
		db.opt.Infof("No prefixes to drop")
		return nil
	}

	// Block all foreign interactions with memory tables.
	db.lock.Lock()
	defer db.lock.Unlock()

	// Treat the active memtable like the immutable ones so it is flushed too.
	db.imm = append(db.imm, db.mt)
	for _, memtable := range db.imm {
		if memtable.sl.Empty() {
			memtable.DecrRef()
			continue
		}
		db.opt.Debugf("Flushing memtable")
		if err := db.handleMemTableFlush(memtable, filtered); err != nil {
			// NOTE(review): returning here leaves db.imm holding memtables whose
			// references were already released by DecrRef earlier in this loop,
			// with db.mt also still appended to db.imm — confirm this cannot lead
			// to use after release.
			db.opt.Errorf("While trying to flush memtable: %v", err)
			return err
		}
		memtable.DecrRef()
	}

	db.stopCompactions()
	defer db.startCompactions()
	// Every memtable was handled above; start over with an empty set.
	db.imm = db.imm[:0]
	db.mt, err = db.newMemTable()
	if err != nil {
		return y.Wrapf(err, "cannot create new mem table")
	}

	// Drop prefixes from the levels.
	if err := db.lc.dropPrefixes(filtered); err != nil {
		return err
	}
	db.opt.Infof("DropPrefix done")
	return nil
}
// filterPrefixesToDrop returns, in input order, the prefixes for which a
// read-only iterator (values not prefetched) finds at least one key. If a view
// fails, the prefixes collected so far are returned along with the error.
func (db *DB) filterPrefixesToDrop(prefixes [][]byte) ([][]byte, error) {
	var withData [][]byte
	for _, p := range prefixes {
		viewErr := db.View(func(txn *Txn) error {
			opts := DefaultIteratorOptions
			opts.Prefix = p
			opts.PrefetchValues = false

			it := txn.NewIterator(opts)
			defer it.Close()
			if it.Rewind(); it.ValidForPrefix(p) {
				withData = append(withData, p)
			}
			return nil
		})
		if viewErr != nil {
			return withData, viewErr
		}
	}
	return withData, nil
}
// isBanned returns ErrBannedKey if key belongs to a banned namespace, and nil
// otherwise. The namespace is the 8 bytes starting at db.opt.NamespaceOffset,
// decoded with y.BytesToU64. The check is skipped when NamespaceOffset is
// negative or when key is not longer than NamespaceOffset+8.
// NOTE(review): a key of exactly NamespaceOffset+8 bytes holds a full
// namespace but is not checked — confirm this boundary is intentional (e.g.
// whether callers may pass keys carrying an 8-byte timestamp suffix).
func (db *DB) isBanned(key []byte) error {
	offset := db.opt.NamespaceOffset
	if offset < 0 || len(key) <= offset+8 {
		return nil
	}
	ns := y.BytesToU64(key[offset:])
	if !db.bannedNamespaces.has(ns) {
		return nil
	}
	return ErrBannedKey
}
// BanNamespace bans a namespace; reads and writes of keys in a banned namespace
// are denied. The ban is first written to the DB, and only after that write
// succeeds is it added to the in-memory set. It returns ErrNamespaceMode when
// namespaces are not enabled (NamespaceOffset < 0).
func (db *DB) BanNamespace(ns uint64) error {
	if db.opt.NamespaceOffset < 0 {
		return ErrNamespaceMode
	}
	db.opt.Infof("Banning namespace: %d", ns)

	banKey := y.KeyWithTs(append(bannedNsKey, y.U64ToBytes(ns)...), 1)
	entries := []*Entry{{Key: banKey, Value: nil}}
	req, err := db.sendToWriteCh(entries)
	if err != nil {
		return err
	}
	if err = req.Wait(); err != nil {
		return err
	}
	db.bannedNamespaces.add(ns)
	return nil
}
// BannedNamespaces returns the namespaces currently banned for the DB.
func (db *DB) BannedNamespaces() []uint64 {
	banned := db.bannedNamespaces.all()
	return banned
}
// KVList contains a list of key-value pairs. It is an alias of pb.KVList and is
// the batch type passed to Subscribe callbacks.
type KVList = pb.KVList
// Subscribe can be used to watch key changes for the given key prefixes and the ignore string.
// At least one prefix should be passed, or an error will be returned.
// You can use an empty prefix to monitor all changes to the DB.
// Ignore string is the byte ranges for which prefix matching will be ignored.
// For example: ignore = "2-3", and prefix = "abc" will match for keys "abxxc", "abdfc" etc.
// This function blocks until the given context is done or an error occurs.
// The given function will be called with a new KVList containing the modified keys and the
// corresponding values.
//
// It returns ErrNilCallback if cb is nil, ctx.Err() when ctx is done, and any error returned by
// cb (after which the subscriber is removed).
func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches []pb.Match) error {
	if cb == nil {
		return ErrNilCallback
	}
	c := z.NewCloser(1)
	s, err := db.pub.newSubscriber(c, matches)
	if err != nil {
		return y.Wrapf(err, "while creating a new subscriber")
	}
	// slurp merges every update already queued on s.sendCh into batch without
	// blocking, then calls cb once if the merged batch is non-empty.
	// NOTE(review): if s.sendCh can be closed while slurp runs, kvs would be nil
	// and kvs.Kv would panic — confirm the publisher never closes it here.
	slurp := func(batch *pb.KVList) error {
		for {
			select {
			case kvs := <-s.sendCh:
				batch.Kv = append(batch.Kv, kvs.Kv...)
			default:
				if len(batch.GetKv()) > 0 {
					return cb(batch)
				}
				return nil
			}
		}
	}
	// drain discards any updates still queued on s.sendCh without blocking,
	// stopping as soon as the channel is empty or closed.
	drain := func() {
		for {
			select {
			case _, ok := <-s.sendCh:
				if !ok {
					// Channel is closed.
					return
				}
			default:
				return
			}
		}
	}
	for {
		select {
		case <-c.HasBeenClosed():
			// No need to delete here. Closer will be called only while
			// closing DB. Subscriber will be deleted by cleanSubscribers.
			// Deliver any updates that are still pending before exiting.
			err := slurp(new(pb.KVList))
			c.Done()
			return err
		case <-ctx.Done():
			c.Done()
			// Clear s.active (presumably stops further sends), discard queued
			// updates, and delete the subscriber to avoid further updates.
			s.active.Store(0)
			drain()
			db.pub.deleteSubscriber(s.id)
			return ctx.Err()
		case batch := <-s.sendCh:
			err := slurp(batch)
			if err != nil {
				c.Done()
				s.active.Store(0)
				drain()
				// Delete the subscriber if there is an error by the callback.
				db.pub.deleteSubscriber(s.id)
				return err
			}
		}
	}
}
// syncDir syncs dir via the package-level syncDir helper. It is a no-op for an
// in-memory DB.
func (db *DB) syncDir(dir string) error {
	if !db.opt.InMemory {
		return syncDir(dir)
	}
	return nil
}
// createDirs makes sure opt.Dir and opt.ValueDir exist, creating them with
// mode 0700 when missing. In read-only mode a missing directory is an error
// instead. Stat and creation failures are wrapped with the offending path.
func createDirs(opt Options) error {
	for _, dir := range []string{opt.Dir, opt.ValueDir} {
		found, err := exists(dir)
		if err != nil {
			return y.Wrapf(err, "Invalid Dir: %q", dir)
		}
		if found {
			continue
		}
		if opt.ReadOnly {
			return errors.Errorf("Cannot find directory %q for read-only open", dir)
		}
		// Try to create the directory
		if err := os.MkdirAll(dir, 0700); err != nil {
			return y.Wrapf(err, "Error Creating Dir: %q", dir)
		}
	}
	return nil
}
// StreamDB streams the contents of this DB into a new managed DB opened with
// outOptions (created in outOptions.Dir). Data visible at read timestamp
// math.MaxUint64 is sent through a StreamWriter, which is flushed at the end.
// The output DB is always closed before returning; a close error is reported
// when no earlier error occurred.
func (db *DB) StreamDB(outOptions Options) (err error) {
	outDir := outOptions.Dir

	// Open output DB.
	outDB, err := OpenManaged(outOptions)
	if err != nil {
		return y.Wrapf(err, "cannot open out DB at %s", outDir)
	}
	defer func() {
		// Close can fail (it presumably performs final I/O on the output DB —
		// confirm). Don't let such a failure look like a successful stream.
		if cerr := outDB.Close(); cerr != nil && err == nil {
			err = y.Wrapf(cerr, "cannot close out DB at %s", outDir)
		}
	}()

	writer := outDB.NewStreamWriter()
	if err := writer.Prepare(); err != nil {
		return y.Wrapf(err, "cannot create stream writer in out DB at %s", outDir)
	}

	// Stream contents of DB to the output DB.
	stream := db.NewStreamAt(math.MaxUint64)
	stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)
	stream.Send = func(buf *z.Buffer) error {
		return writer.Write(buf)
	}
	if err := stream.Orchestrate(context.Background()); err != nil {
		return y.Wrapf(err, "cannot stream DB to out DB at %s", outDir)
	}
	if err := writer.Flush(); err != nil {
		return y.Wrapf(err, "cannot flush writer")
	}
	return nil
}
// Opts returns the options the DB was opened with. The Options struct is
// returned by value, so assigning to fields of the result does not change the
// DB's own options.
func (db *DB) Opts() Options {
	opts := db.opt
	return opts
}
// CacheType identifies one of the DB's caches, e.g. for CacheMaxCost.
type CacheType int

const (
	// BlockCache selects the block cache.
	BlockCache CacheType = 0
	// IndexCache selects the index cache.
	IndexCache CacheType = 1
)
// CacheMaxCost sets the max cost of the selected cache (block or index) and
// returns the new value. The call has an effect only if the DB was created with
// that cache; otherwise it is a no-op. A negative maxCost leaves the cache
// untouched and returns its current max cost instead. An unknown cache type
// yields an error, and a nil DB returns 0 with no error.
func (db *DB) CacheMaxCost(cache CacheType, maxCost int64) (int64, error) {
	if db == nil {
		return 0, nil
	}

	switch cache {
	case BlockCache:
		if maxCost < 0 {
			return db.blockCache.MaxCost(), nil
		}
		db.blockCache.UpdateMaxCost(maxCost)
		return maxCost, nil
	case IndexCache:
		if maxCost < 0 {
			return db.indexCache.MaxCost(), nil
		}
		db.indexCache.UpdateMaxCost(maxCost)
		return maxCost, nil
	default:
		return 0, errors.Errorf("invalid cache type")
	}
}
// LevelsToString returns a human-readable, multi-line summary of the levels
// reported by Levels. Each line shows the table count, current and target
// size, score, stale data and target file size; the base level is marked "B".
func (db *DB) LevelsToString() string {
	levels := db.Levels()
	h := func(sz int64) string {
		return humanize.IBytes(uint64(sz))
	}
	base := func(b bool) string {
		if b {
			return "B"
		}
		return " "
	}

	var b strings.Builder
	b.WriteRune('\n')
	for _, li := range levels {
		// Format directly into the builder rather than through an intermediate
		// fmt.Sprintf string.
		fmt.Fprintf(&b,
			"Level %d [%s]: NumTables: %02d. Size: %s of %s. Score: %.2f->%.2f"+
				" StaleData: %s Target FileSize: %s\n",
			li.Level, base(li.IsBaseLevel), li.NumTables,
			h(li.Size), h(li.TargetSize), li.Score, li.Adjusted, h(li.StaleDatSize),
			h(li.TargetFileSize))
	}
	b.WriteString("Level Done\n")
	return b.String()
}
Go
1
https://gitee.com/mirrors/Badger.git
git@gitee.com:mirrors/Badger.git
mirrors
Badger
Badger
main

搜索帮助