|
|
@ -27,6 +27,8 @@ const ( |
|
|
|
// PathLast defines the subpath of the last Batch in the subpath
|
|
|
|
// of the StateDB
|
|
|
|
PathLast = "last" |
|
|
|
// DefaultKeep is the default value for the Keep parameter
|
|
|
|
DefaultKeep = 128 |
|
|
|
) |
|
|
|
|
|
|
|
var ( |
|
|
@ -34,16 +36,18 @@ var ( |
|
|
|
KeyCurrentBatch = []byte("k:currentbatch") |
|
|
|
// keyCurrentIdx is used as key in the db to store the CurrentIdx
|
|
|
|
keyCurrentIdx = []byte("k:idx") |
|
|
|
// ErrNoLast is returned when the KVDB has been configured to not have
|
|
|
|
// a Last checkpoint but a Last method is used
|
|
|
|
ErrNoLast = fmt.Errorf("no last checkpoint") |
|
|
|
) |
|
|
|
|
|
|
|
// KVDB represents the Key-Value DB object
|
|
|
|
type KVDB struct { |
|
|
|
path string |
|
|
|
db *pebble.Storage |
|
|
|
cfg Config |
|
|
|
db *pebble.Storage |
|
|
|
// CurrentIdx holds the current Idx that the BatchBuilder is using
|
|
|
|
CurrentIdx common.Idx |
|
|
|
CurrentBatch common.BatchNum |
|
|
|
keep int |
|
|
|
m sync.Mutex |
|
|
|
last *Last |
|
|
|
} |
|
|
@ -103,23 +107,42 @@ func (k *Last) close() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Config of the KVDB
|
|
|
|
type Config struct { |
|
|
|
// Path where the checkpoints will be stored
|
|
|
|
Path string |
|
|
|
// Keep is the number of old checkpoints to keep. If 0, all
|
|
|
|
// checkpoints are kept.
|
|
|
|
Keep int |
|
|
|
// At every checkpoint, check that there are no gaps between the
|
|
|
|
// checkpoints
|
|
|
|
NoGapsCheck bool |
|
|
|
// NoLast skips having an opened DB with a checkpoint to the last
|
|
|
|
// batchNum for thread-safe reads.
|
|
|
|
NoLast bool |
|
|
|
} |
|
|
|
|
|
|
|
// NewKVDB creates a new KVDB, allowing to use an in-memory or in-disk storage.
|
|
|
|
// Checkpoints older than the value defined by `keep` will be deleted.
|
|
|
|
func NewKVDB(pathDB string, keep int) (*KVDB, error) { |
|
|
|
// func NewKVDB(pathDB string, keep int) (*KVDB, error) {
|
|
|
|
func NewKVDB(cfg Config) (*KVDB, error) { |
|
|
|
var sto *pebble.Storage |
|
|
|
var err error |
|
|
|
sto, err = pebble.NewPebbleStorage(path.Join(pathDB, PathCurrent), false) |
|
|
|
sto, err = pebble.NewPebbleStorage(path.Join(cfg.Path, PathCurrent), false) |
|
|
|
if err != nil { |
|
|
|
return nil, tracerr.Wrap(err) |
|
|
|
} |
|
|
|
|
|
|
|
var last *Last |
|
|
|
if !cfg.NoLast { |
|
|
|
last = &Last{ |
|
|
|
path: cfg.Path, |
|
|
|
} |
|
|
|
} |
|
|
|
kvdb := &KVDB{ |
|
|
|
path: pathDB, |
|
|
|
cfg: cfg, |
|
|
|
db: sto, |
|
|
|
keep: keep, |
|
|
|
last: &Last{ |
|
|
|
path: pathDB, |
|
|
|
}, |
|
|
|
last: last, |
|
|
|
} |
|
|
|
// load currentBatch
|
|
|
|
kvdb.CurrentBatch, err = kvdb.GetCurrentBatch() |
|
|
@ -137,29 +160,32 @@ func NewKVDB(pathDB string, keep int) (*KVDB, error) { |
|
|
|
} |
|
|
|
|
|
|
|
// LastRead is a thread-safe method to query the last KVDB
|
|
|
|
func (kvdb *KVDB) LastRead(fn func(db *pebble.Storage) error) error { |
|
|
|
kvdb.last.rw.RLock() |
|
|
|
defer kvdb.last.rw.RUnlock() |
|
|
|
return fn(kvdb.last.db) |
|
|
|
func (k *KVDB) LastRead(fn func(db *pebble.Storage) error) error { |
|
|
|
if k.last == nil { |
|
|
|
return tracerr.Wrap(ErrNoLast) |
|
|
|
} |
|
|
|
k.last.rw.RLock() |
|
|
|
defer k.last.rw.RUnlock() |
|
|
|
return fn(k.last.db) |
|
|
|
} |
|
|
|
|
|
|
|
// DB returns the *pebble.Storage from the KVDB
|
|
|
|
func (kvdb *KVDB) DB() *pebble.Storage { |
|
|
|
return kvdb.db |
|
|
|
func (k *KVDB) DB() *pebble.Storage { |
|
|
|
return k.db |
|
|
|
} |
|
|
|
|
|
|
|
// StorageWithPrefix returns the db.Storage with the given prefix from the
|
|
|
|
// current KVDB
|
|
|
|
func (kvdb *KVDB) StorageWithPrefix(prefix []byte) db.Storage { |
|
|
|
return kvdb.db.WithPrefix(prefix) |
|
|
|
func (k *KVDB) StorageWithPrefix(prefix []byte) db.Storage { |
|
|
|
return k.db.WithPrefix(prefix) |
|
|
|
} |
|
|
|
|
|
|
|
// Reset resets the KVDB to the checkpoint at the given batchNum. Reset does
|
|
|
|
// not delete the checkpoints between old current and the new current, those
|
|
|
|
// checkpoints will remain in the storage, and eventually will be deleted when
|
|
|
|
// MakeCheckpoint overwrites them.
|
|
|
|
func (kvdb *KVDB) Reset(batchNum common.BatchNum) error { |
|
|
|
return kvdb.reset(batchNum, true) |
|
|
|
func (k *KVDB) Reset(batchNum common.BatchNum) error { |
|
|
|
return k.reset(batchNum, true) |
|
|
|
} |
|
|
|
|
|
|
|
// reset resets the KVDB to the checkpoint at the given batchNum. Reset does
|
|
|
@ -167,19 +193,19 @@ func (kvdb *KVDB) Reset(batchNum common.BatchNum) error { |
|
|
|
// checkpoints will remain in the storage, and eventually will be deleted when
|
|
|
|
// MakeCheckpoint overwrites them. `closeCurrent` will close the currently
|
|
|
|
// opened db before doing the reset.
|
|
|
|
func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { |
|
|
|
currentPath := path.Join(kvdb.path, PathCurrent) |
|
|
|
func (k *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { |
|
|
|
currentPath := path.Join(k.cfg.Path, PathCurrent) |
|
|
|
|
|
|
|
if closeCurrent && kvdb.db != nil { |
|
|
|
kvdb.db.Close() |
|
|
|
kvdb.db = nil |
|
|
|
if closeCurrent && k.db != nil { |
|
|
|
k.db.Close() |
|
|
|
k.db = nil |
|
|
|
} |
|
|
|
// remove 'current'
|
|
|
|
if err := os.RemoveAll(currentPath); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
// remove all checkpoints > batchNum
|
|
|
|
list, err := kvdb.ListCheckpoints() |
|
|
|
list, err := k.ListCheckpoints() |
|
|
|
if err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
@ -192,7 +218,7 @@ func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { |
|
|
|
} |
|
|
|
} |
|
|
|
for _, bn := range list[start:] { |
|
|
|
if err := kvdb.DeleteCheckpoint(common.BatchNum(bn)); err != nil { |
|
|
|
if err := k.DeleteCheckpoint(common.BatchNum(bn)); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
} |
|
|
@ -203,23 +229,27 @@ func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { |
|
|
|
if err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
kvdb.db = sto |
|
|
|
kvdb.CurrentIdx = common.RollupConstReservedIDx // 255
|
|
|
|
kvdb.CurrentBatch = 0 |
|
|
|
if err := kvdb.last.setNew(); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
k.db = sto |
|
|
|
k.CurrentIdx = common.RollupConstReservedIDx // 255
|
|
|
|
k.CurrentBatch = 0 |
|
|
|
if k.last != nil { |
|
|
|
if err := k.last.setNew(); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// copy 'batchNum' to 'current'
|
|
|
|
if err := kvdb.MakeCheckpointFromTo(batchNum, currentPath); err != nil { |
|
|
|
if err := k.MakeCheckpointFromTo(batchNum, currentPath); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
// copy 'batchNum' to 'last'
|
|
|
|
if err := kvdb.last.set(kvdb, batchNum); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
if k.last != nil { |
|
|
|
if err := k.last.set(k, batchNum); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// open the new 'current'
|
|
|
@ -227,15 +257,15 @@ func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { |
|
|
|
if err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
kvdb.db = sto |
|
|
|
k.db = sto |
|
|
|
|
|
|
|
// get currentBatch num
|
|
|
|
kvdb.CurrentBatch, err = kvdb.GetCurrentBatch() |
|
|
|
k.CurrentBatch, err = k.GetCurrentBatch() |
|
|
|
if err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
// idx is obtained from the statedb reset
|
|
|
|
kvdb.CurrentIdx, err = kvdb.GetCurrentIdx() |
|
|
|
k.CurrentIdx, err = k.GetCurrentIdx() |
|
|
|
if err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
@ -245,15 +275,15 @@ func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { |
|
|
|
|
|
|
|
// ResetFromSynchronizer performs a reset in the KVDB getting the state from
|
|
|
|
// synchronizerKVDB for the given batchNum.
|
|
|
|
func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKVDB *KVDB) error { |
|
|
|
func (k *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKVDB *KVDB) error { |
|
|
|
if synchronizerKVDB == nil { |
|
|
|
return tracerr.Wrap(fmt.Errorf("synchronizerKVDB can not be nil")) |
|
|
|
} |
|
|
|
|
|
|
|
currentPath := path.Join(kvdb.path, PathCurrent) |
|
|
|
if kvdb.db != nil { |
|
|
|
kvdb.db.Close() |
|
|
|
kvdb.db = nil |
|
|
|
currentPath := path.Join(k.cfg.Path, PathCurrent) |
|
|
|
if k.db != nil { |
|
|
|
k.db.Close() |
|
|
|
k.db = nil |
|
|
|
} |
|
|
|
|
|
|
|
// remove 'current'
|
|
|
@ -261,12 +291,12 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
// remove all checkpoints
|
|
|
|
list, err := kvdb.ListCheckpoints() |
|
|
|
list, err := k.ListCheckpoints() |
|
|
|
if err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
for _, bn := range list { |
|
|
|
if err := kvdb.DeleteCheckpoint(common.BatchNum(bn)); err != nil { |
|
|
|
if err := k.DeleteCheckpoint(common.BatchNum(bn)); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
} |
|
|
@ -277,14 +307,14 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV |
|
|
|
if err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
kvdb.db = sto |
|
|
|
kvdb.CurrentIdx = common.RollupConstReservedIDx // 255
|
|
|
|
kvdb.CurrentBatch = 0 |
|
|
|
k.db = sto |
|
|
|
k.CurrentIdx = common.RollupConstReservedIDx // 255
|
|
|
|
k.CurrentBatch = 0 |
|
|
|
|
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) |
|
|
|
checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) |
|
|
|
|
|
|
|
// copy synchronizer'BatchNumX' to 'BatchNumX'
|
|
|
|
if err := synchronizerKVDB.MakeCheckpointFromTo(batchNum, checkpointPath); err != nil { |
|
|
@ -292,7 +322,7 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV |
|
|
|
} |
|
|
|
|
|
|
|
// copy 'BatchNumX' to 'current'
|
|
|
|
err = kvdb.MakeCheckpointFromTo(batchNum, currentPath) |
|
|
|
err = k.MakeCheckpointFromTo(batchNum, currentPath) |
|
|
|
if err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
@ -302,15 +332,15 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV |
|
|
|
if err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
kvdb.db = sto |
|
|
|
k.db = sto |
|
|
|
|
|
|
|
// get currentBatch num
|
|
|
|
kvdb.CurrentBatch, err = kvdb.GetCurrentBatch() |
|
|
|
k.CurrentBatch, err = k.GetCurrentBatch() |
|
|
|
if err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
// get currentIdx
|
|
|
|
kvdb.CurrentIdx, err = kvdb.GetCurrentIdx() |
|
|
|
k.CurrentIdx, err = k.GetCurrentIdx() |
|
|
|
if err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
@ -319,8 +349,8 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV |
|
|
|
} |
|
|
|
|
|
|
|
// GetCurrentBatch returns the current BatchNum stored in the KVDB
|
|
|
|
func (kvdb *KVDB) GetCurrentBatch() (common.BatchNum, error) { |
|
|
|
cbBytes, err := kvdb.db.Get(KeyCurrentBatch) |
|
|
|
func (k *KVDB) GetCurrentBatch() (common.BatchNum, error) { |
|
|
|
cbBytes, err := k.db.Get(KeyCurrentBatch) |
|
|
|
if tracerr.Unwrap(err) == db.ErrNotFound { |
|
|
|
return 0, nil |
|
|
|
} |
|
|
@ -331,12 +361,12 @@ func (kvdb *KVDB) GetCurrentBatch() (common.BatchNum, error) { |
|
|
|
} |
|
|
|
|
|
|
|
// setCurrentBatch stores the current BatchNum in the KVDB
|
|
|
|
func (kvdb *KVDB) setCurrentBatch() error { |
|
|
|
tx, err := kvdb.db.NewTx() |
|
|
|
func (k *KVDB) setCurrentBatch() error { |
|
|
|
tx, err := k.db.NewTx() |
|
|
|
if err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
err = tx.Put(KeyCurrentBatch, kvdb.CurrentBatch.Bytes()) |
|
|
|
err = tx.Put(KeyCurrentBatch, k.CurrentBatch.Bytes()) |
|
|
|
if err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
@ -347,9 +377,9 @@ func (kvdb *KVDB) setCurrentBatch() error { |
|
|
|
} |
|
|
|
|
|
|
|
// GetCurrentIdx returns the stored Idx from the KVDB, which is the last Idx
|
|
|
|
// used for an Account in the KVDB.
|
|
|
|
func (kvdb *KVDB) GetCurrentIdx() (common.Idx, error) { |
|
|
|
idxBytes, err := kvdb.db.Get(keyCurrentIdx) |
|
|
|
// used for an Account in the k.
|
|
|
|
func (k *KVDB) GetCurrentIdx() (common.Idx, error) { |
|
|
|
idxBytes, err := k.db.Get(keyCurrentIdx) |
|
|
|
if tracerr.Unwrap(err) == db.ErrNotFound { |
|
|
|
return common.RollupConstReservedIDx, nil // 255, nil
|
|
|
|
} |
|
|
@ -360,10 +390,10 @@ func (kvdb *KVDB) GetCurrentIdx() (common.Idx, error) { |
|
|
|
} |
|
|
|
|
|
|
|
// SetCurrentIdx stores Idx in the KVDB
|
|
|
|
func (kvdb *KVDB) SetCurrentIdx(idx common.Idx) error { |
|
|
|
kvdb.CurrentIdx = idx |
|
|
|
func (k *KVDB) SetCurrentIdx(idx common.Idx) error { |
|
|
|
k.CurrentIdx = idx |
|
|
|
|
|
|
|
tx, err := kvdb.db.NewTx() |
|
|
|
tx, err := k.db.NewTx() |
|
|
|
if err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
@ -383,14 +413,14 @@ func (kvdb *KVDB) SetCurrentIdx(idx common.Idx) error { |
|
|
|
|
|
|
|
// MakeCheckpoint does a checkpoint at the given batchNum in the defined path.
|
|
|
|
// Internally this advances & stores the current BatchNum, and then stores a
|
|
|
|
// Checkpoint of the current state of the KVDB.
|
|
|
|
func (kvdb *KVDB) MakeCheckpoint() error { |
|
|
|
// Checkpoint of the current state of the k.
|
|
|
|
func (k *KVDB) MakeCheckpoint() error { |
|
|
|
// advance currentBatch
|
|
|
|
kvdb.CurrentBatch++ |
|
|
|
k.CurrentBatch++ |
|
|
|
|
|
|
|
checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, kvdb.CurrentBatch)) |
|
|
|
checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, k.CurrentBatch)) |
|
|
|
|
|
|
|
if err := kvdb.setCurrentBatch(); err != nil { |
|
|
|
if err := k.setCurrentBatch(); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
|
|
|
@ -404,15 +434,17 @@ func (kvdb *KVDB) MakeCheckpoint() error { |
|
|
|
} |
|
|
|
|
|
|
|
// execute Checkpoint
|
|
|
|
if err := kvdb.db.Pebble().Checkpoint(checkpointPath); err != nil { |
|
|
|
if err := k.db.Pebble().Checkpoint(checkpointPath); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
// copy 'CurrentBatch' to 'last'
|
|
|
|
if err := kvdb.last.set(kvdb, kvdb.CurrentBatch); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
if k.last != nil { |
|
|
|
if err := k.last.set(k, k.CurrentBatch); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
} |
|
|
|
// delete old checkpoints
|
|
|
|
if err := kvdb.deleteOldCheckpoints(); err != nil { |
|
|
|
if err := k.deleteOldCheckpoints(); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
|
|
|
@ -420,8 +452,8 @@ func (kvdb *KVDB) MakeCheckpoint() error { |
|
|
|
} |
|
|
|
|
|
|
|
// DeleteCheckpoint removes if exist the checkpoint of the given batchNum
|
|
|
|
func (kvdb *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error { |
|
|
|
checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) |
|
|
|
func (k *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error { |
|
|
|
checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) |
|
|
|
|
|
|
|
if _, err := os.Stat(checkpointPath); os.IsNotExist(err) { |
|
|
|
return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum)) |
|
|
@ -432,8 +464,8 @@ func (kvdb *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error { |
|
|
|
|
|
|
|
// ListCheckpoints returns the list of batchNums of the checkpoints, sorted.
|
|
|
|
// If there's a gap between the list of checkpoints, an error is returned.
|
|
|
|
func (kvdb *KVDB) ListCheckpoints() ([]int, error) { |
|
|
|
files, err := ioutil.ReadDir(kvdb.path) |
|
|
|
func (k *KVDB) ListCheckpoints() ([]int, error) { |
|
|
|
files, err := ioutil.ReadDir(k.cfg.Path) |
|
|
|
if err != nil { |
|
|
|
return nil, tracerr.Wrap(err) |
|
|
|
} |
|
|
@ -450,12 +482,12 @@ func (kvdb *KVDB) ListCheckpoints() ([]int, error) { |
|
|
|
} |
|
|
|
} |
|
|
|
sort.Ints(checkpoints) |
|
|
|
if len(checkpoints) > 0 { |
|
|
|
if !k.cfg.NoGapsCheck && len(checkpoints) > 0 { |
|
|
|
first := checkpoints[0] |
|
|
|
for _, checkpoint := range checkpoints[1:] { |
|
|
|
first++ |
|
|
|
if checkpoint != first { |
|
|
|
log.Errorw("GAP", "checkpoints", checkpoints) |
|
|
|
log.Errorw("gap between checkpoints", "checkpoints", checkpoints) |
|
|
|
return nil, tracerr.Wrap(fmt.Errorf("checkpoint gap at %v", checkpoint)) |
|
|
|
} |
|
|
|
} |
|
|
@ -465,14 +497,14 @@ func (kvdb *KVDB) ListCheckpoints() ([]int, error) { |
|
|
|
|
|
|
|
// deleteOldCheckpoints deletes old checkpoints when there are more than
|
|
|
|
// `s.keep` checkpoints
|
|
|
|
func (kvdb *KVDB) deleteOldCheckpoints() error { |
|
|
|
list, err := kvdb.ListCheckpoints() |
|
|
|
func (k *KVDB) deleteOldCheckpoints() error { |
|
|
|
list, err := k.ListCheckpoints() |
|
|
|
if err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
if len(list) > kvdb.keep { |
|
|
|
for _, checkpoint := range list[:len(list)-kvdb.keep] { |
|
|
|
if err := kvdb.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil { |
|
|
|
if k.cfg.Keep > 0 && len(list) > k.cfg.Keep { |
|
|
|
for _, checkpoint := range list[:len(list)-k.cfg.Keep] { |
|
|
|
if err := k.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
} |
|
|
@ -483,8 +515,8 @@ func (kvdb *KVDB) deleteOldCheckpoints() error { |
|
|
|
// MakeCheckpointFromTo makes a checkpoint from the current db at fromBatchNum
|
|
|
|
// to the dest folder. This method is locking, so it can be called from
|
|
|
|
// multiple places at the same time.
|
|
|
|
func (kvdb *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) error { |
|
|
|
source := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, fromBatchNum)) |
|
|
|
func (k *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) error { |
|
|
|
source := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, fromBatchNum)) |
|
|
|
if _, err := os.Stat(source); os.IsNotExist(err) { |
|
|
|
// if kvdb does not have checkpoint at batchNum, return err
|
|
|
|
return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" does not exist", source)) |
|
|
@ -494,8 +526,8 @@ func (kvdb *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string |
|
|
|
// synchronizer to do a reset to a batchNum at the same time as the
|
|
|
|
// pipeline is doing a txSelector.Reset and batchBuilder.Reset from
|
|
|
|
// synchronizer to the same batchNum
|
|
|
|
kvdb.m.Lock() |
|
|
|
defer kvdb.m.Unlock() |
|
|
|
k.m.Lock() |
|
|
|
defer k.m.Unlock() |
|
|
|
return pebbleMakeCheckpoint(source, dest) |
|
|
|
} |
|
|
|
|
|
|
@ -525,10 +557,12 @@ func pebbleMakeCheckpoint(source, dest string) error { |
|
|
|
} |
|
|
|
|
|
|
|
// Close the DB
|
|
|
|
func (kvdb *KVDB) Close() { |
|
|
|
if kvdb.db != nil { |
|
|
|
kvdb.db.Close() |
|
|
|
kvdb.db = nil |
|
|
|
func (k *KVDB) Close() { |
|
|
|
if k.db != nil { |
|
|
|
k.db.Close() |
|
|
|
k.db = nil |
|
|
|
} |
|
|
|
if k.last != nil { |
|
|
|
k.last.close() |
|
|
|
} |
|
|
|
kvdb.last.close() |
|
|
|
} |