Implement StateDB Checkpoints & Resets system

This commit is contained in:
arnaucube
2020-08-17 11:27:33 +02:00
parent db927ddd24
commit 205db8e4d3
4 changed files with 343 additions and 46 deletions

View File

@@ -1,13 +1,17 @@
package statedb
import (
"encoding/binary"
"errors"
"fmt"
"os"
"os/exec"
"strconv"
"github.com/hermeznetwork/hermez-node/common"
"github.com/iden3/go-merkletree"
"github.com/iden3/go-merkletree/db"
"github.com/iden3/go-merkletree/db/leveldb"
"github.com/iden3/go-merkletree/db/memory"
"github.com/iden3/go-merkletree/db/pebble"
)
// ErrStateDBWithoutMT is used when a method that requires a MerkleTree is called in a StateDB that does not have a MerkleTree defined
@@ -16,25 +20,30 @@ var ErrStateDBWithoutMT = errors.New("Can not call method to use MerkleTree in a
// ErrAccountAlreadyExists is used when CreateAccount is called and the Account already exists
var ErrAccountAlreadyExists = errors.New("Can not CreateAccount because Account already exists")
// KEYCURRENTBATCH is used as key in the db to store the current BatchNum
var KEYCURRENTBATCH = []byte("currentbatch")
// STATEDBPATH defines the subpath of the StateDB
const STATEDBPATH = "/statedb"
// StateDB represents the StateDB object
type StateDB struct {
db db.Storage
mt *merkletree.MerkleTree
path string
currentBatch uint64
db *pebble.PebbleStorage
mt *merkletree.MerkleTree
}
// NewStateDB creates a new StateDB, allowing to use an in-memory or in-disk
// storage
func NewStateDB(path string, inDisk bool, withMT bool, nLevels int) (*StateDB, error) {
var sto db.Storage
func NewStateDB(path string, withMT bool, nLevels int) (*StateDB, error) {
var sto *pebble.PebbleStorage
var err error
if inDisk {
sto, err = leveldb.NewLevelDbStorage(path, false)
if err != nil {
return nil, err
}
} else {
sto = memory.NewMemoryStorage()
sto, err = pebble.NewPebbleStorage(path+STATEDBPATH+"/current", false)
if err != nil {
return nil, err
}
var mt *merkletree.MerkleTree = nil
if withMT {
mt, err = merkletree.NewMerkleTree(sto, nLevels)
@@ -43,34 +52,126 @@ func NewStateDB(path string, inDisk bool, withMT bool, nLevels int) (*StateDB, e
}
}
return &StateDB{
db: sto,
mt: mt,
}, nil
sdb := &StateDB{
path: path + STATEDBPATH,
db: sto,
mt: mt,
}
// load currentBatch
sdb.currentBatch, err = sdb.GetCurrentBatch()
if err != nil {
return nil, err
}
return sdb, nil
}
// DB returns the *pebble.PebbleStorage from the StateDB
func (s *StateDB) DB() *pebble.PebbleStorage {
return s.db
}
// GetCurrentBatch returns the current BatchNum stored in the StateDB
func (s *StateDB) GetCurrentBatch() (uint64, error) {
cbBytes, err := s.db.Get(KEYCURRENTBATCH)
if err == db.ErrNotFound {
return 0, nil
}
if err != nil {
return 0, err
}
cb := binary.LittleEndian.Uint64(cbBytes[:8])
return cb, nil
}
// setCurrentBatch stores the current BatchNum in the StateDB
func (s *StateDB) setCurrentBatch() error {
tx, err := s.db.NewTx()
if err != nil {
return err
}
var cbBytes [8]byte
binary.LittleEndian.PutUint64(cbBytes[:], s.currentBatch)
tx.Put(KEYCURRENTBATCH, cbBytes[:])
if err := tx.Commit(); err != nil {
return err
}
return nil
}
// CheckPointAt does a checkpoint at the given batchNum in the defined path
func (s *StateDB) CheckPointAt(batchNum uint64, path string) error {
// TODO
func (s *StateDB) MakeCheckpoint() error {
// advance currentBatch
s.currentBatch++
checkpointPath := s.path + "/BatchNum" + strconv.Itoa(int(s.currentBatch))
s.setCurrentBatch()
// if checkpoint BatchNum already exist in disk, delete it
if _, err := os.Stat(checkpointPath); !os.IsNotExist(err) {
err := os.RemoveAll(checkpointPath)
if err != nil {
return err
}
}
// execute Checkpoint
err := s.db.Pebble().Checkpoint(checkpointPath)
if err != nil {
return err
}
return nil
}
// Reset resets the StateDB to the checkpoint at the given batchNum
// DeleteCheckpoint removes if exist the checkpoint of the given batchNum
func (s *StateDB) DeleteCheckpoint(batchNum uint64) error {
checkpointPath := s.path + "/BatchNum" + strconv.Itoa(int(batchNum))
if _, err := os.Stat(checkpointPath); os.IsNotExist(err) {
return fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum)
}
return os.RemoveAll(checkpointPath)
}
// Reset resets the StateDB to the checkpoint at the given batchNum. Reset
// does not delete the checkpoints between old current and the new current,
// those checkpoints will remain in the storage, and eventually will be
// deleted when MakeCheckpoint overwrites them.
func (s *StateDB) Reset(batchNum uint64) error {
// TODO
checkpointPath := s.path + "/BatchNum" + strconv.Itoa(int(batchNum))
currentPath := s.path + "/current"
// remove 'current'
err := os.RemoveAll(currentPath)
if err != nil {
return err
}
// copy 'BatchNumX' to 'current'
cmd := exec.Command("cp", "-r", checkpointPath, currentPath)
err = cmd.Run()
if err != nil {
return err
}
// open the new 'current'
sto, err := pebble.NewPebbleStorage(currentPath, false)
if err != nil {
return err
}
s.db = sto
// get currentBatch num
s.currentBatch, err = s.GetCurrentBatch()
if err != nil {
return err
}
return nil
}
// Checkpoints returns a list of the checkpoints (batchNums)
func (s *StateDB) Checkpoints() ([]uint64, error) {
// TODO
//batchnums, err
return nil, nil
}
// GetAccount returns the account for the given Idx
func (s *StateDB) GetAccount(idx common.Idx) (*common.Account, error) {
vBytes, err := s.db.Get(idx.Bytes())
@@ -195,8 +296,8 @@ type LocalStateDB struct {
// NewLocalStateDB returns a new LocalStateDB connected to the given
// synchronizerDB
func NewLocalStateDB(synchronizerDB *StateDB, withMT bool, nLevels int) (*LocalStateDB, error) {
s, err := NewStateDB("", false, withMT, nLevels)
func NewLocalStateDB(path string, synchronizerDB *StateDB, withMT bool, nLevels int) (*LocalStateDB, error) {
s, err := NewStateDB(path, withMT, nLevels)
if err != nil {
return nil, err
}
@@ -206,16 +307,53 @@ func NewLocalStateDB(synchronizerDB *StateDB, withMT bool, nLevels int) (*LocalS
}, nil
}
// Reset performs a reset, getting the state from
// LocalStateDB.synchronizerStateDB for the given batchNum
// Reset performs a reset in the LocaStateDB. If fromSynchronizer is true, it
// gets the state from LocalStateDB.synchronizerStateDB for the given batchNum. If fromSynchronizer is false, get the state from LocalStateDB checkpoints.
func (l *LocalStateDB) Reset(batchNum uint64, fromSynchronizer bool) error {
// TODO
// if fromSynchronizer==true:
// make copy from l.synchronizerStateDB at the batchNum to the localStateDB
// if synchronizerStateDB does not have batchNum, return err
// the localStateDB checkpoint is set to batchNum
// else fromSynchronizer==false:
// the localStateDB checkpoint is set to batchNum
// if localStateDB does not have batchNum, return err
return nil
synchronizerCheckpointPath := l.synchronizerStateDB.path + "/BatchNum" + strconv.Itoa(int(batchNum))
checkpointPath := l.path + "/BatchNum" + strconv.Itoa(int(batchNum))
currentPath := l.path + "/current"
if fromSynchronizer {
// use checkpoint from SynchronizerStateDB
if _, err := os.Stat(synchronizerCheckpointPath); os.IsNotExist(err) {
// if synchronizerStateDB does not have checkpoint at batchNum, return err
return fmt.Errorf("Checkpoint not exist in Synchronizer")
}
// remove 'current'
err := os.RemoveAll(currentPath)
if err != nil {
return err
}
// copy synchronizer'BatchNumX' to 'current'
cmd := exec.Command("cp", "-r", synchronizerCheckpointPath, currentPath)
err = cmd.Run()
if err != nil {
return err
}
// copy synchronizer-'BatchNumX' to 'BatchNumX'
cmd = exec.Command("cp", "-r", synchronizerCheckpointPath, checkpointPath)
err = cmd.Run()
if err != nil {
return err
}
// open the new 'current'
sto, err := pebble.NewPebbleStorage(currentPath, false)
if err != nil {
return err
}
l.db = sto
// get currentBatch num
l.currentBatch, err = l.GetCurrentBatch()
if err != nil {
return err
}
return nil
}
// use checkpoint from LocalStateDB
return l.StateDB.Reset(batchNum)
}

View File

@@ -2,6 +2,7 @@ package statedb
import (
"encoding/hex"
"fmt"
"io/ioutil"
"math/big"
"testing"
@@ -38,7 +39,7 @@ func TestStateDBWithoutMT(t *testing.T) {
dir, err := ioutil.TempDir("", "tmpdb")
require.Nil(t, err)
sdb, err := NewStateDB(dir, false, false, 0)
sdb, err := NewStateDB(dir, false, 0)
assert.Nil(t, err)
// create test accounts
@@ -96,12 +97,12 @@ func TestStateDBWithMT(t *testing.T) {
dir, err := ioutil.TempDir("", "tmpdb")
require.Nil(t, err)
sdb, err := NewStateDB(dir, false, true, 32)
sdb, err := NewStateDB(dir, true, 32)
assert.Nil(t, err)
// create test accounts
var accounts []*common.Account
for i := 0; i < 100; i++ {
for i := 0; i < 20; i++ {
accounts = append(accounts, newAccount(t, i))
}
@@ -142,3 +143,125 @@ func TestStateDBWithMT(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, accounts[1].Nonce, a.Nonce)
}
func TestCheckpoints(t *testing.T) {
dir, err := ioutil.TempDir("", "sdb")
require.Nil(t, err)
sdb, err := NewStateDB(dir, true, 32)
assert.Nil(t, err)
// create test accounts
var accounts []*common.Account
for i := 0; i < 10; i++ {
accounts = append(accounts, newAccount(t, i))
}
// add test accounts
for i := 0; i < len(accounts); i++ {
_, err = sdb.MTCreateAccount(common.Idx(i), accounts[i])
assert.Nil(t, err)
}
// do checkpoints and check that currentBatch is correct
err = sdb.MakeCheckpoint()
assert.Nil(t, err)
cb, err := sdb.GetCurrentBatch()
assert.Nil(t, err)
assert.Equal(t, uint64(1), cb)
for i := 1; i < 10; i++ {
err = sdb.MakeCheckpoint()
assert.Nil(t, err)
cb, err = sdb.GetCurrentBatch()
assert.Nil(t, err)
assert.Equal(t, uint64(i+1), cb)
}
// printCheckpoints(t, sdb.path)
// reset checkpoint
err = sdb.Reset(3)
assert.Nil(t, err)
// check that reset can be repeated (as there exist the 'current' and
// 'BatchNum3', from where the 'current' is a copy)
err = sdb.Reset(3)
require.Nil(t, err)
// check that currentBatch is as expected after Reset
cb, err = sdb.GetCurrentBatch()
assert.Nil(t, err)
assert.Equal(t, uint64(3), cb)
// advance one checkpoint and check that currentBatch is fine
err = sdb.MakeCheckpoint()
assert.Nil(t, err)
cb, err = sdb.GetCurrentBatch()
assert.Nil(t, err)
assert.Equal(t, uint64(4), cb)
err = sdb.DeleteCheckpoint(uint64(9))
assert.Nil(t, err)
err = sdb.DeleteCheckpoint(uint64(10))
assert.Nil(t, err)
err = sdb.DeleteCheckpoint(uint64(9)) // does not exist, should return err
assert.NotNil(t, err)
err = sdb.DeleteCheckpoint(uint64(11)) // does not exist, should return err
assert.NotNil(t, err)
// Create a LocalStateDB from the initial StateDB
dirLocal, err := ioutil.TempDir("", "ldb")
require.Nil(t, err)
ldb, err := NewLocalStateDB(dirLocal, sdb, true, 32)
assert.Nil(t, err)
// get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB)
err = ldb.Reset(4, true)
assert.Nil(t, err)
// check that currentBatch is 4 after the Reset
cb, err = ldb.GetCurrentBatch()
assert.Nil(t, err)
assert.Equal(t, uint64(4), cb)
// advance one checkpoint in ldb
err = ldb.MakeCheckpoint()
assert.Nil(t, err)
cb, err = ldb.GetCurrentBatch()
assert.Nil(t, err)
assert.Equal(t, uint64(5), cb)
// Create a 2nd LocalStateDB from the initial StateDB
dirLocal2, err := ioutil.TempDir("", "ldb2")
require.Nil(t, err)
ldb2, err := NewLocalStateDB(dirLocal2, sdb, true, 32)
assert.Nil(t, err)
// get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB)
err = ldb2.Reset(4, true)
assert.Nil(t, err)
// check that currentBatch is 4 after the Reset
cb, err = ldb2.GetCurrentBatch()
assert.Nil(t, err)
assert.Equal(t, uint64(4), cb)
// advance one checkpoint in ldb2
err = ldb2.MakeCheckpoint()
assert.Nil(t, err)
cb, err = ldb2.GetCurrentBatch()
assert.Nil(t, err)
assert.Equal(t, uint64(5), cb)
// printCheckpoints(t, sdb.path)
// printCheckpoints(t, ldb.path)
// printCheckpoints(t, ldb2.path)
}
func printCheckpoints(t *testing.T, path string) {
files, err := ioutil.ReadDir(path)
assert.Nil(t, err)
fmt.Println(path)
for _, f := range files {
fmt.Println(" " + f.Name())
}
}