StateDB intermediate state reset when opening DB

StateDB intermediate state reset when opening DB to force getting always last
Checkpoint at last BatchNum, avoiding inconsistent intermediate state.
This commit is contained in:
arnaucube
2020-09-04 10:12:08 +02:00
parent 4d02308057
commit b1454d441c
5 changed files with 133 additions and 17 deletions

View File

@@ -75,6 +75,12 @@ func NewStateDB(path string, withMT bool, nLevels int) (*StateDB, error) {
return nil, err
}
// make reset (get checkpoint) at currentBatch
err = sdb.Reset(sdb.currentBatch)
if err != nil {
return nil, err
}
return sdb, nil
}
@@ -101,7 +107,10 @@ func (s *StateDB) setCurrentBatch() error {
if err != nil {
return err
}
tx.Put(KeyCurrentBatch, s.currentBatch.Bytes())
err = tx.Put(KeyCurrentBatch, s.currentBatch.Bytes())
if err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
}
@@ -153,11 +162,6 @@ func (s *StateDB) DeleteCheckpoint(batchNum common.BatchNum) error {
// those checkpoints will remain in the storage, and eventually will be
// deleted when MakeCheckpoint overwrites them.
func (s *StateDB) Reset(batchNum common.BatchNum) error {
if batchNum == 0 {
s.idx = 0
return nil
}
checkpointPath := s.path + PathBatchNum + strconv.Itoa(int(batchNum))
currentPath := s.path + PathCurrent
@@ -166,6 +170,18 @@ func (s *StateDB) Reset(batchNum common.BatchNum) error {
if err != nil {
return err
}
if batchNum == 0 {
// if batchNum == 0, open the new fresh 'current'
sto, err := pebble.NewPebbleStorage(currentPath, false)
if err != nil {
return err
}
s.db = sto
s.idx = 0
s.currentBatch = batchNum
return nil
}
// copy 'BatchNumX' to 'current'
cmd := exec.Command("cp", "-r", checkpointPath, currentPath) //nolint:gosec
err = cmd.Run()
@@ -191,12 +207,14 @@ func (s *StateDB) Reset(batchNum common.BatchNum) error {
return err
}
// open the MT for the current s.db
mt, err := merkletree.NewMerkleTree(s.db, s.mt.MaxLevels())
if err != nil {
return err
if s.mt != nil {
// open the MT for the current s.db
mt, err := merkletree.NewMerkleTree(s.db, s.mt.MaxLevels())
if err != nil {
return err
}
s.mt = mt
}
s.mt = mt
return nil
}
@@ -255,8 +273,14 @@ func createAccountInTreeDB(sto db.Storage, mt *merkletree.MerkleTree, idx common
return nil, ErrAccountAlreadyExists
}
tx.Put(v.Bytes(), accountBytes[:])
tx.Put(idx.Bytes(), v.Bytes())
err = tx.Put(v.Bytes(), accountBytes[:])
if err != nil {
return nil, err
}
err = tx.Put(idx.Bytes(), v.Bytes())
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
@@ -295,8 +319,14 @@ func updateAccountInTreeDB(sto db.Storage, mt *merkletree.MerkleTree, idx common
if err != nil {
return nil, err
}
tx.Put(v.Bytes(), accountBytes[:])
tx.Put(idx.Bytes(), v.Bytes())
err = tx.Put(v.Bytes(), accountBytes[:])
if err != nil {
return nil, err
}
err = tx.Put(idx.Bytes(), v.Bytes())
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err

View File

@@ -34,6 +34,87 @@ func newAccount(t *testing.T, i int) *common.Account {
}
}
func TestNewStateDBIntermediateState(t *testing.T) {
dir, err := ioutil.TempDir("", "tmpdb")
require.Nil(t, err)
sdb, err := NewStateDB(dir, false, 0)
assert.Nil(t, err)
// test values
k0 := []byte("testkey0")
k1 := []byte("testkey1")
v0 := []byte("testvalue0")
v1 := []byte("testvalue1")
// store some data
tx, err := sdb.db.NewTx()
assert.Nil(t, err)
err = tx.Put(k0, v0)
assert.Nil(t, err)
err = tx.Commit()
assert.Nil(t, err)
v, err := sdb.db.Get(k0)
assert.Nil(t, err)
assert.Equal(t, v0, v)
// call NewStateDB which should get the db at the last checkpoint state
// executing a Reset (discarding the last 'testkey0'&'testvalue0' data)
sdb, err = NewStateDB(dir, false, 0)
assert.Nil(t, err)
v, err = sdb.db.Get(k0)
assert.NotNil(t, err)
assert.Equal(t, db.ErrNotFound, err)
assert.Nil(t, v)
// store the same data from the beginning that has ben lost since last NewStateDB
tx, err = sdb.db.NewTx()
assert.Nil(t, err)
err = tx.Put(k0, v0)
assert.Nil(t, err)
err = tx.Commit()
assert.Nil(t, err)
v, err = sdb.db.Get(k0)
assert.Nil(t, err)
assert.Equal(t, v0, v)
// make checkpoints with the current state
bn, err := sdb.GetCurrentBatch()
assert.Nil(t, err)
assert.Equal(t, common.BatchNum(0), bn)
err = sdb.MakeCheckpoint()
assert.Nil(t, err)
bn, err = sdb.GetCurrentBatch()
assert.Nil(t, err)
assert.Equal(t, common.BatchNum(1), bn)
// write more data
tx, err = sdb.db.NewTx()
assert.Nil(t, err)
err = tx.Put(k1, v1)
assert.Nil(t, err)
err = tx.Commit()
assert.Nil(t, err)
v, err = sdb.db.Get(k1)
assert.Nil(t, err)
assert.Equal(t, v1, v)
// call NewStateDB which should get the db at the last checkpoint state
// executing a Reset (discarding the last 'testkey1'&'testvalue1' data)
sdb, err = NewStateDB(dir, false, 0)
assert.Nil(t, err)
v, err = sdb.db.Get(k0)
assert.Nil(t, err)
assert.Equal(t, v0, v)
v, err = sdb.db.Get(k1)
assert.NotNil(t, err)
assert.Equal(t, db.ErrNotFound, err)
assert.Nil(t, v)
}
func TestStateDBWithoutMT(t *testing.T) {
dir, err := ioutil.TempDir("", "tmpdb")
require.Nil(t, err)

View File

@@ -322,7 +322,10 @@ func (s *StateDB) setIdx(idx common.Idx) error {
if err != nil {
return err
}
tx.Put(keyidx, idx.Bytes())
err = tx.Put(keyidx, idx.Bytes())
if err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
}