Delete old checkpoints in stateDB automatically

Introduce a constructor parameter for the StateDB called `keep`, which tells
how many checkpoints to keep.  When doing a new checkpoint, if the number of
existing checkpoints exeeds `keep`, the oldest ones will be deleted.
This commit is contained in:
Eduard S
2020-12-23 18:06:36 +01:00
parent 35d598f564
commit 2205fcadbc
17 changed files with 192 additions and 64 deletions

View File

@@ -3,9 +3,12 @@ package statedb
import (
"errors"
"fmt"
"io/ioutil"
"math/big"
"os"
"strconv"
"path"
"sort"
"strings"
"github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/log"
@@ -51,10 +54,10 @@ var (
const (
// PathBatchNum defines the subpath of the Batch Checkpoint in the
// subpath of the StateDB
PathBatchNum = "/BatchNum"
PathBatchNum = "BatchNum"
// PathCurrent defines the subpath of the current Batch in the subpath
// of the StateDB
PathCurrent = "/current"
PathCurrent = "current"
// TypeSynchronizer defines a StateDB used by the Synchronizer, that
// generates the ExitTree when processing the txs
TypeSynchronizer = "synchronizer"
@@ -85,14 +88,16 @@ type StateDB struct {
// AccumulatedFees contains the accumulated fees for each token (Coord
// Idx) in the processed batch
AccumulatedFees map[common.Idx]*big.Int
keep int
}
// NewStateDB creates a new StateDB, allowing to use an in-memory or in-disk
// storage
func NewStateDB(path string, typ TypeStateDB, nLevels int, chainID uint16) (*StateDB, error) {
// storage. Checkpoints older than the value defined by `keep` will be
// deleted.
func NewStateDB(pathDB string, keep int, typ TypeStateDB, nLevels int, chainID uint16) (*StateDB, error) {
var sto *pebble.PebbleStorage
var err error
sto, err = pebble.NewPebbleStorage(path+PathCurrent, false)
sto, err = pebble.NewPebbleStorage(path.Join(pathDB, PathCurrent), false)
if err != nil {
return nil, tracerr.Wrap(err)
}
@@ -109,11 +114,12 @@ func NewStateDB(path string, typ TypeStateDB, nLevels int, chainID uint16) (*Sta
}
sdb := &StateDB{
path: path,
path: pathDB,
db: sto,
mt: mt,
typ: typ,
chainID: chainID,
keep: keep,
}
// load currentBatch
@@ -170,10 +176,9 @@ func (s *StateDB) MakeCheckpoint() error {
s.currentBatch++
log.Debugw("Making StateDB checkpoint", "batch", s.currentBatch, "type", s.typ)
checkpointPath := s.path + PathBatchNum + strconv.Itoa(int(s.currentBatch))
checkpointPath := path.Join(s.path, fmt.Sprintf("%s%d", PathBatchNum, s.currentBatch))
err := s.setCurrentBatch()
if err != nil {
if err := s.setCurrentBatch(); err != nil {
return tracerr.Wrap(err)
}
@@ -188,8 +193,11 @@ func (s *StateDB) MakeCheckpoint() error {
}
// execute Checkpoint
err = s.db.Pebble().Checkpoint(checkpointPath)
if err != nil {
if err := s.db.Pebble().Checkpoint(checkpointPath); err != nil {
return tracerr.Wrap(err)
}
// delete old checkpoints
if err := s.deleteOldCheckpoints(); err != nil {
return tracerr.Wrap(err)
}
@@ -198,7 +206,7 @@ func (s *StateDB) MakeCheckpoint() error {
// DeleteCheckpoint removes if exist the checkpoint of the given batchNum
func (s *StateDB) DeleteCheckpoint(batchNum common.BatchNum) error {
checkpointPath := s.path + PathBatchNum + strconv.Itoa(int(batchNum))
checkpointPath := path.Join(s.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
if _, err := os.Stat(checkpointPath); os.IsNotExist(err) {
return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum))
@@ -207,6 +215,55 @@ func (s *StateDB) DeleteCheckpoint(batchNum common.BatchNum) error {
return os.RemoveAll(checkpointPath)
}
// listCheckpoints returns the list of batchNums of the checkpoints, sorted.
// If there's a gap between the list of checkpoints, an error is returned.
func (s *StateDB) listCheckpoints() ([]int, error) {
files, err := ioutil.ReadDir(s.path)
if err != nil {
return nil, err
}
checkpoints := []int{}
var checkpoint int
pattern := fmt.Sprintf("%s%%d", PathBatchNum)
for _, file := range files {
fileName := file.Name()
if file.IsDir() && strings.HasPrefix(fileName, PathBatchNum) {
if _, err := fmt.Sscanf(fileName, pattern, &checkpoint); err != nil {
return nil, err
}
checkpoints = append(checkpoints, checkpoint)
}
}
sort.Ints(checkpoints)
if len(checkpoints) > 0 {
first := checkpoints[0]
for _, checkpoint := range checkpoints[1:] {
first++
if checkpoint != first {
return nil, fmt.Errorf("checkpoint gap at %v", checkpoint)
}
}
}
return checkpoints, nil
}
// deleteOldCheckpoints deletes old checkpoints when there are more than
// `s.keep` checkpoints
func (s *StateDB) deleteOldCheckpoints() error {
list, err := s.listCheckpoints()
if err != nil {
return err
}
if len(list) > s.keep {
for _, checkpoint := range list[:len(list)-s.keep] {
if err := s.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil {
return err
}
}
}
return nil
}
func pebbleMakeCheckpoint(source, dest string) error {
// Remove dest folder (if it exists) before doing the checkpoint
if _, err := os.Stat(dest); !os.IsNotExist(err) {
@@ -252,7 +309,7 @@ func (s *StateDB) Reset(batchNum common.BatchNum) error {
// deleted when MakeCheckpoint overwrites them. `closeCurrent` will close the
// currently opened db before doing the reset.
func (s *StateDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
currentPath := s.path + PathCurrent
currentPath := path.Join(s.path, PathCurrent)
if closeCurrent {
if err := s.db.Pebble().Close(); err != nil {
@@ -264,6 +321,12 @@ func (s *StateDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
if err != nil {
return tracerr.Wrap(err)
}
// remove all checkpoints > batchNum
for i := batchNum + 1; i <= s.currentBatch; i++ {
if err := s.DeleteCheckpoint(i); err != nil {
return err
}
}
if batchNum == 0 {
// if batchNum == 0, open the new fresh 'current'
sto, err := pebble.NewPebbleStorage(currentPath, false)
@@ -285,7 +348,7 @@ func (s *StateDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
return nil
}
checkpointPath := s.path + PathBatchNum + strconv.Itoa(int(batchNum))
checkpointPath := path.Join(s.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
// copy 'BatchNumX' to 'current'
err = pebbleMakeCheckpoint(checkpointPath, currentPath)
if err != nil {
@@ -521,9 +584,11 @@ type LocalStateDB struct {
}
// NewLocalStateDB returns a new LocalStateDB connected to the given
// synchronizerDB
func NewLocalStateDB(path string, synchronizerDB *StateDB, typ TypeStateDB, nLevels int) (*LocalStateDB, error) {
s, err := NewStateDB(path, typ, nLevels, synchronizerDB.chainID)
// synchronizerDB. Checkpoints older than the value defined by `keep` will be
// deleted.
func NewLocalStateDB(path string, keep int, synchronizerDB *StateDB, typ TypeStateDB,
nLevels int) (*LocalStateDB, error) {
s, err := NewStateDB(path, keep, typ, nLevels, synchronizerDB.chainID)
if err != nil {
return nil, tracerr.Wrap(err)
}
@@ -541,9 +606,10 @@ func (l *LocalStateDB) Reset(batchNum common.BatchNum, fromSynchronizer bool) er
return nil
}
synchronizerCheckpointPath := l.synchronizerStateDB.path + PathBatchNum + strconv.Itoa(int(batchNum))
checkpointPath := l.path + PathBatchNum + strconv.Itoa(int(batchNum))
currentPath := l.path + PathCurrent
synchronizerCheckpointPath := path.Join(l.synchronizerStateDB.path,
fmt.Sprintf("%s%d", PathBatchNum, batchNum))
checkpointPath := path.Join(l.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
currentPath := path.Join(l.path, PathCurrent)
if fromSynchronizer {
// use checkpoint from SynchronizerStateDB

View File

@@ -46,7 +46,7 @@ func TestNewStateDBIntermediateState(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeTxSelector, 0, chainID)
sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0, chainID)
assert.NoError(t, err)
// test values
@@ -68,7 +68,7 @@ func TestNewStateDBIntermediateState(t *testing.T) {
// call NewStateDB which should get the db at the last checkpoint state
// executing a Reset (discarding the last 'testkey0'&'testvalue0' data)
sdb, err = NewStateDB(dir, TypeTxSelector, 0, chainID)
sdb, err = NewStateDB(dir, 128, TypeTxSelector, 0, chainID)
assert.NoError(t, err)
v, err = sdb.db.Get(k0)
assert.NotNil(t, err)
@@ -110,7 +110,7 @@ func TestNewStateDBIntermediateState(t *testing.T) {
// call NewStateDB which should get the db at the last checkpoint state
// executing a Reset (discarding the last 'testkey1'&'testvalue1' data)
sdb, err = NewStateDB(dir, TypeTxSelector, 0, chainID)
sdb, err = NewStateDB(dir, 128, TypeTxSelector, 0, chainID)
assert.NoError(t, err)
v, err = sdb.db.Get(k0)
@@ -129,7 +129,7 @@ func TestStateDBWithoutMT(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeTxSelector, 0, chainID)
sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0, chainID)
assert.NoError(t, err)
// create test accounts
@@ -184,7 +184,7 @@ func TestStateDBWithMT(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeSynchronizer, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID)
assert.NoError(t, err)
// create test accounts
@@ -237,7 +237,7 @@ func TestCheckpoints(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeSynchronizer, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID)
assert.NoError(t, err)
// create test accounts
@@ -291,20 +291,20 @@ func TestCheckpoints(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, common.BatchNum(4), cb)
err = sdb.DeleteCheckpoint(common.BatchNum(9))
err = sdb.DeleteCheckpoint(common.BatchNum(1))
assert.NoError(t, err)
err = sdb.DeleteCheckpoint(common.BatchNum(10))
err = sdb.DeleteCheckpoint(common.BatchNum(2))
assert.NoError(t, err)
err = sdb.DeleteCheckpoint(common.BatchNum(9)) // does not exist, should return err
err = sdb.DeleteCheckpoint(common.BatchNum(1)) // does not exist, should return err
assert.NotNil(t, err)
err = sdb.DeleteCheckpoint(common.BatchNum(11)) // does not exist, should return err
err = sdb.DeleteCheckpoint(common.BatchNum(2)) // does not exist, should return err
assert.NotNil(t, err)
// Create a LocalStateDB from the initial StateDB
dirLocal, err := ioutil.TempDir("", "ldb")
require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dirLocal))
ldb, err := NewLocalStateDB(dirLocal, sdb, TypeBatchBuilder, 32)
ldb, err := NewLocalStateDB(dirLocal, 128, sdb, TypeBatchBuilder, 32)
assert.NoError(t, err)
// get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB)
@@ -325,7 +325,7 @@ func TestCheckpoints(t *testing.T) {
dirLocal2, err := ioutil.TempDir("", "ldb2")
require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dirLocal2))
ldb2, err := NewLocalStateDB(dirLocal2, sdb, TypeBatchBuilder, 32)
ldb2, err := NewLocalStateDB(dirLocal2, 128, sdb, TypeBatchBuilder, 32)
assert.NoError(t, err)
// get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB)
@@ -355,7 +355,7 @@ func TestStateDBGetAccounts(t *testing.T) {
require.NoError(t, err)
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeTxSelector, 0, chainID)
sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0, chainID)
assert.NoError(t, err)
// create test accounts
@@ -403,7 +403,7 @@ func TestCheckAccountsTreeTestVectors(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeSynchronizer, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID)
require.NoError(t, err)
ay0 := new(big.Int).Sub(new(big.Int).Exp(big.NewInt(2), big.NewInt(253), nil), big.NewInt(1))
@@ -469,3 +469,56 @@ func TestCheckAccountsTreeTestVectors(t *testing.T) {
// root value generated by js version:
assert.Equal(t, "17298264051379321456969039521810887093935433569451713402227686942080129181291", sdb.mt.Root().BigInt().String())
}
func TestListCheckpoints(t *testing.T) {
dir, err := ioutil.TempDir("", "tmpdb")
require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0)
sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID)
require.NoError(t, err)
numCheckpoints := 16
// do checkpoints
for i := 0; i < numCheckpoints; i++ {
err = sdb.MakeCheckpoint()
require.NoError(t, err)
}
list, err := sdb.listCheckpoints()
require.NoError(t, err)
assert.Equal(t, numCheckpoints, len(list))
assert.Equal(t, 1, list[0])
assert.Equal(t, numCheckpoints, list[len(list)-1])
numReset := 10
err = sdb.Reset(common.BatchNum(numReset))
require.NoError(t, err)
list, err = sdb.listCheckpoints()
require.NoError(t, err)
assert.Equal(t, numReset, len(list))
assert.Equal(t, 1, list[0])
assert.Equal(t, numReset, list[len(list)-1])
}
func TestDeleteOldCheckpoints(t *testing.T) {
dir, err := ioutil.TempDir("", "tmpdb")
require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0)
keep := 16
sdb, err := NewStateDB(dir, keep, TypeSynchronizer, 32, chainID)
require.NoError(t, err)
numCheckpoints := 32
// do checkpoints and check that we never have more than `keep`
// checkpoints
for i := 0; i < numCheckpoints; i++ {
err = sdb.MakeCheckpoint()
require.NoError(t, err)
checkpoints, err := sdb.listCheckpoints()
require.NoError(t, err)
assert.LessOrEqual(t, len(checkpoints), keep)
}
}

View File

@@ -29,7 +29,7 @@ func TestComputeEffectiveAmounts(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeSynchronizer, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID)
assert.NoError(t, err)
set := `
@@ -203,7 +203,7 @@ func TestProcessTxsBalances(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeSynchronizer, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID)
assert.NoError(t, err)
// generate test transactions from test.SetBlockchain0 code
@@ -336,7 +336,7 @@ func TestProcessTxsSynchronizer(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeSynchronizer, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID)
assert.NoError(t, err)
// generate test transactions from test.SetBlockchain0 code
@@ -465,7 +465,7 @@ func TestProcessTxsBatchBuilder(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, 32, chainID)
assert.NoError(t, err)
// generate test transactions from test.SetBlockchain0 code
@@ -554,7 +554,7 @@ func TestProcessTxsRootTestVectors(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, 32, chainID)
assert.NoError(t, err)
// same values than in the js test
@@ -603,7 +603,7 @@ func TestCreateAccountDepositMaxValue(t *testing.T) {
nLevels := 16
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID)
assert.NoError(t, err)
users := generateJsUsers(t)

View File

@@ -21,7 +21,7 @@ func TestGetIdx(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir))
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeTxSelector, 0, chainID)
sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0, chainID)
assert.NoError(t, err)
var sk babyjub.PrivateKey

View File

@@ -79,7 +79,7 @@ func TestZKInputsHashTestVector0(t *testing.T) {
defer assert.Nil(t, os.RemoveAll(dir))
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, 32, chainID)
assert.Nil(t, err)
// same values than in the js test
@@ -154,7 +154,7 @@ func TestZKInputsHashTestVector1(t *testing.T) {
defer assert.Nil(t, os.RemoveAll(dir))
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, 32, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, 32, chainID)
assert.Nil(t, err)
// same values than in the js test
@@ -254,7 +254,7 @@ func TestZKInputsEmpty(t *testing.T) {
nLevels := 16
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID)
assert.Nil(t, err)
ptc := ProcessTxsConfig{
@@ -403,7 +403,7 @@ func TestZKInputs0(t *testing.T) {
nLevels := 16
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID)
assert.Nil(t, err)
// same values than in the js test
@@ -491,7 +491,7 @@ func TestZKInputs1(t *testing.T) {
nLevels := 16
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID)
assert.Nil(t, err)
// same values than in the js test
@@ -598,7 +598,7 @@ func TestZKInputs2(t *testing.T) {
nLevels := 16
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID)
assert.Nil(t, err)
// same values than in the js test
@@ -742,7 +742,7 @@ func TestZKInputs3(t *testing.T) {
nLevels := 16
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID)
assert.Nil(t, err)
// same values than in the js test
@@ -886,7 +886,7 @@ func TestZKInputs4(t *testing.T) {
nLevels := 16
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID)
assert.Nil(t, err)
// same values than in the js test
@@ -1040,7 +1040,7 @@ func TestZKInputs5(t *testing.T) {
nLevels := 16
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID)
assert.Nil(t, err)
// same values than in the js test
@@ -1164,7 +1164,7 @@ func TestZKInputs6(t *testing.T) {
nLevels := 16
chainID := uint16(0)
sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID)
sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID)
assert.Nil(t, err)
// Coordinator Idx where to send the fees