Add Last db view in kvdb and statedb

Last db view is an opened pebble db which always contains a checkpoint from the
last batch.  Methods to access this last batch are thread safe so that views of
the last checkpoint can be made anywhere and with a consistent view of the
state.
This commit is contained in:
Eduard S
2021-02-02 15:42:52 +01:00
parent ca7fa8ae2e
commit 6590c47a9a
12 changed files with 422 additions and 160 deletions

View File

@@ -24,6 +24,9 @@ const (
// PathCurrent defines the subpath of the current Batch in the subpath
// of the KVDB
PathCurrent = "current"
// PathLast defines the subpath of the last Batch in the subpath
// of the StateDB
PathLast = "last"
)
var (
@@ -42,6 +45,58 @@ type KVDB struct {
CurrentBatch common.BatchNum
keep int
m sync.Mutex
last *Last
}
// Last is a consistent view to the last batch of the stateDB that can
// be queried concurrently.
type Last struct {
db *pebble.Storage
path string
rw sync.RWMutex
}
func (k *Last) setNew() error {
k.rw.Lock()
defer k.rw.Unlock()
if k.db != nil {
k.db.Close()
}
lastPath := path.Join(k.path, PathLast)
err := os.RemoveAll(lastPath)
if err != nil {
return tracerr.Wrap(err)
}
db, err := pebble.NewPebbleStorage(path.Join(k.path, lastPath), false)
if err != nil {
return tracerr.Wrap(err)
}
k.db = db
return nil
}
func (k *Last) set(kvdb *KVDB, batchNum common.BatchNum) error {
k.rw.Lock()
defer k.rw.Unlock()
if k.db != nil {
k.db.Close()
}
lastPath := path.Join(k.path, PathLast)
if err := kvdb.MakeCheckpointFromTo(batchNum, lastPath); err != nil {
return tracerr.Wrap(err)
}
db, err := pebble.NewPebbleStorage(lastPath, false)
if err != nil {
return tracerr.Wrap(err)
}
k.db = db
return nil
}
func (k *Last) close() {
k.rw.Lock()
defer k.rw.Unlock()
k.db.Close()
}
// NewKVDB creates a new KVDB, allowing to use an in-memory or in-disk storage.
@@ -58,6 +113,9 @@ func NewKVDB(pathDB string, keep int) (*KVDB, error) {
path: pathDB,
db: sto,
keep: keep,
last: &Last{
path: pathDB,
},
}
// load currentBatch
kvdb.CurrentBatch, err = kvdb.GetCurrentBatch()
@@ -74,6 +132,13 @@ func NewKVDB(pathDB string, keep int) (*KVDB, error) {
return kvdb, nil
}
// LastRead is a thread-safe method to query the last KVDB
func (kvdb *KVDB) LastRead(fn func(db *pebble.Storage) error) error {
kvdb.last.rw.RLock()
defer kvdb.last.rw.RUnlock()
return fn(kvdb.last.db)
}
// DB returns the *pebble.Storage from the KVDB
func (kvdb *KVDB) DB() *pebble.Storage {
return kvdb.db
@@ -139,14 +204,21 @@ func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
kvdb.db = sto
kvdb.CurrentIdx = common.RollupConstReservedIDx // 255
kvdb.CurrentBatch = 0
if err := kvdb.last.setNew(); err != nil {
return tracerr.Wrap(err)
}
return nil
}
// copy 'BatchNumX' to 'current'
// copy 'batchNum' to 'current'
if err := kvdb.MakeCheckpointFromTo(batchNum, currentPath); err != nil {
return tracerr.Wrap(err)
}
// copy 'batchNum' to 'last'
if err := kvdb.last.set(kvdb, batchNum); err != nil {
return tracerr.Wrap(err)
}
// open the new 'current'
sto, err := pebble.NewPebbleStorage(currentPath, false)
@@ -334,6 +406,10 @@ func (kvdb *KVDB) MakeCheckpoint() error {
if err := kvdb.db.Pebble().Checkpoint(checkpointPath); err != nil {
return tracerr.Wrap(err)
}
// copy 'CurrentBatch' to 'last'
if err := kvdb.last.set(kvdb, kvdb.CurrentBatch); err != nil {
return tracerr.Wrap(err)
}
// delete old checkpoints
if err := kvdb.deleteOldCheckpoints(); err != nil {
return tracerr.Wrap(err)
@@ -456,4 +532,5 @@ func pebbleMakeCheckpoint(source, dest string) error {
// Close the DB
func (kvdb *KVDB) Close() {
kvdb.db.Close()
kvdb.last.close()
}