From 35d598f564c6f258aaa26a9ec0c9123aa6dc4dee Mon Sep 17 00:00:00 2001 From: Eduard S Date: Wed, 23 Dec 2020 17:33:41 +0100 Subject: [PATCH 1/3] Don't log errors when context done --- node/node.go | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/node/node.go b/node/node.go index 9e38083..d75757b 100644 --- a/node/node.go +++ b/node/node.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "net/http" - "strings" "sync" "time" @@ -409,22 +408,17 @@ func (n *Node) handleReorg(stats *synchronizer.Stats) { // TODO(Edu): Consider keeping the `lastBlock` inside synchronizer so that we // don't have to pass it around. -func (n *Node) syncLoopFn(lastBlock *common.Block) (*common.Block, time.Duration) { - blockData, discarded, err := n.sync.Sync2(n.ctx, lastBlock) +func (n *Node) syncLoopFn(ctx context.Context, lastBlock *common.Block) (*common.Block, time.Duration, error) { + blockData, discarded, err := n.sync.Sync2(ctx, lastBlock) stats := n.sync.Stats() if err != nil { // case: error - if strings.Contains(err.Error(), "context canceled") { - log.Warnw("Synchronizer.Sync", "err", err) - } else { - log.Errorw("Synchronizer.Sync", "err", err) - } - return nil, n.cfg.Synchronizer.SyncLoopInterval.Duration + return nil, n.cfg.Synchronizer.SyncLoopInterval.Duration, err } else if discarded != nil { // case: reorg log.Infow("Synchronizer.Sync reorg", "discarded", *discarded) n.handleReorg(stats) - return nil, time.Duration(0) + return nil, time.Duration(0), nil } else if blockData != nil { // case: new block n.handleNewBlock(stats, synchronizer.SCVariablesPtr{ @@ -432,10 +426,10 @@ func (n *Node) syncLoopFn(lastBlock *common.Block) (*common.Block, time.Duration Auction: blockData.Auction.Vars, WDelayer: blockData.WDelayer.Vars, }, blockData.Rollup.Batches) - return &blockData.Block, time.Duration(0) + return &blockData.Block, time.Duration(0), nil } else { // case: no block - return lastBlock, n.cfg.Synchronizer.SyncLoopInterval.Duration + return lastBlock, n.cfg.Synchronizer.SyncLoopInterval.Duration, nil } } @@ -454,6 +448,7 @@ func (n *Node) StartSynchronizer() { n.wg.Add(1) go func() { + var err error var lastBlock *common.Block waitDuration := time.Duration(0) for { @@ -463,7 +458,13 @@ func (n *Node) StartSynchronizer() { n.wg.Done() return case <-time.After(waitDuration): - lastBlock, waitDuration = n.syncLoopFn(lastBlock) + if lastBlock, waitDuration, err = n.syncLoopFn(n.ctx, + lastBlock); err != nil { + if n.ctx.Err() != nil { + continue + } + log.Errorw("Synchronizer.Sync", "err", err) + } } } }() @@ -496,6 +497,9 @@ func (n *Node) StartDebugAPI() { n.wg.Done() }() if err := n.debugAPI.Run(n.ctx); err != nil { + if n.ctx.Err() != nil { + return + } log.Fatalw("DebugAPI.Run", "err", err) } }() @@ -511,6 +515,9 @@ func (n *Node) StartNodeAPI() { n.wg.Done() }() if err := n.nodeAPI.Run(n.ctx); err != nil { + if n.ctx.Err() != nil { + return + } log.Fatalw("NodeAPI.Run", "err", err) } }() From 2205fcadbc946b4bdd1d5555cdbe87bcc6d079b7 Mon Sep 17 00:00:00 2001 From: Eduard S Date: Wed, 23 Dec 2020 18:06:36 +0100 Subject: [PATCH 2/3] Delete old checkpoints in stateDB automatically Introduce a constructor parameter for the StateDB called `keep`, which tells how many checkpoints to keep. When doing a new checkpoint, if the number of existing checkpoints exeeds `keep`, the oldest ones will be deleted. --- api/api_test.go | 2 +- batchbuilder/batchbuilder.go | 3 +- batchbuilder/batchbuilder_test.go | 2 +- cli/node/cfg.buidler.toml | 1 + config/config.go | 3 +- coordinator/coordinator_test.go | 2 +- coordinator/purger_test.go | 4 +- db/statedb/statedb.go | 108 ++++++++++++++++++++++++------ db/statedb/statedb_test.go | 81 ++++++++++++++++++---- db/statedb/txprocessors_test.go | 12 ++-- db/statedb/utils_test.go | 2 +- db/statedb/zkinputsgen_test.go | 20 +++--- node/node.go | 8 ++- synchronizer/synchronizer_test.go | 2 +- test/debugapi/debugapi_test.go | 2 +- txselector/txselector.go | 2 +- txselector/txselector_test.go | 2 +- 17 files changed, 192 insertions(+), 64 deletions(-) diff --git a/api/api_test.go b/api/api_test.go index 2651518..d1c6b26 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -212,7 +212,7 @@ func TestMain(m *testing.M) { } }() chainID := uint16(0) - sdb, err := statedb.NewStateDB(dir, statedb.TypeTxSelector, 0, chainID) + sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeTxSelector, 0, chainID) if err != nil { panic(err) } diff --git a/batchbuilder/batchbuilder.go b/batchbuilder/batchbuilder.go index 3d917a6..d8c9670 100644 --- a/batchbuilder/batchbuilder.go +++ b/batchbuilder/batchbuilder.go @@ -29,7 +29,8 @@ type ConfigBatch struct { // NewBatchBuilder constructs a new BatchBuilder, and executes the bb.Reset // method func NewBatchBuilder(dbpath string, synchronizerStateDB *statedb.StateDB, configCircuits []ConfigCircuit, batchNum common.BatchNum, nLevels uint64) (*BatchBuilder, error) { - localStateDB, err := statedb.NewLocalStateDB(dbpath, synchronizerStateDB, statedb.TypeBatchBuilder, int(nLevels)) + localStateDB, err := statedb.NewLocalStateDB(dbpath, 128, synchronizerStateDB, + statedb.TypeBatchBuilder, int(nLevels)) if err != nil { return nil, tracerr.Wrap(err) } diff --git a/batchbuilder/batchbuilder_test.go b/batchbuilder/batchbuilder_test.go index 987b12c..899eac2 100644 --- a/batchbuilder/batchbuilder_test.go +++ b/batchbuilder/batchbuilder_test.go @@ -16,7 +16,7 @@ func TestBatchBuilder(t *testing.T) { defer assert.Nil(t, os.RemoveAll(dir)) chainID := uint16(0) - synchDB, err := statedb.NewStateDB(dir, statedb.TypeBatchBuilder, 0, chainID) + synchDB, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 0, chainID) assert.Nil(t, err) bbDir, err := ioutil.TempDir("", "tmpBatchBuilderDB") diff --git a/cli/node/cfg.buidler.toml b/cli/node/cfg.buidler.toml index 564e5fd..e45a14d 100644 --- a/cli/node/cfg.buidler.toml +++ b/cli/node/cfg.buidler.toml @@ -14,6 +14,7 @@ APIAddress = "localhost:12345" [StateDB] Path = "/tmp/iden3-test/hermez/statedb" +Keep = 256 [PostgreSQL] Port = 5432 diff --git a/config/config.go b/config/config.go index b3dc03c..42e79da 100644 --- a/config/config.go +++ b/config/config.go @@ -102,7 +102,8 @@ type Node struct { Type string `valudate:"required"` } `validate:"required"` StateDB struct { - Path string + Path string `validate:"required"` + Keep int `validate:"required"` } `validate:"required"` PostgreSQL struct { Port int `validate:"required"` diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index c961686..261f199 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -98,7 +98,7 @@ func newTestModules(t *testing.T) modules { syncDBPath, err = ioutil.TempDir("", "tmpSyncDB") require.NoError(t, err) deleteme = append(deleteme, syncDBPath) - syncStateDB, err := statedb.NewStateDB(syncDBPath, statedb.TypeSynchronizer, 48, chainID) + syncStateDB, err := statedb.NewStateDB(syncDBPath, 128, statedb.TypeSynchronizer, 48, chainID) assert.NoError(t, err) pass := os.Getenv("POSTGRES_PASS") diff --git a/coordinator/purger_test.go b/coordinator/purger_test.go index 578e545..66e23c2 100644 --- a/coordinator/purger_test.go +++ b/coordinator/purger_test.go @@ -28,12 +28,12 @@ func newStateDB(t *testing.T) *statedb.LocalStateDB { syncDBPath, err := ioutil.TempDir("", "tmpSyncDB") require.NoError(t, err) deleteme = append(deleteme, syncDBPath) - syncStateDB, err := statedb.NewStateDB(syncDBPath, statedb.TypeSynchronizer, 48, chainID) + syncStateDB, err := statedb.NewStateDB(syncDBPath, 128, statedb.TypeSynchronizer, 48, chainID) assert.NoError(t, err) stateDBPath, err := ioutil.TempDir("", "tmpStateDB") require.NoError(t, err) deleteme = append(deleteme, stateDBPath) - stateDB, err := statedb.NewLocalStateDB(stateDBPath, syncStateDB, statedb.TypeTxSelector, 0) + stateDB, err := statedb.NewLocalStateDB(stateDBPath, 128, syncStateDB, statedb.TypeTxSelector, 0) require.NoError(t, err) return stateDB } diff --git a/db/statedb/statedb.go b/db/statedb/statedb.go index eea045a..050f756 100644 --- a/db/statedb/statedb.go +++ b/db/statedb/statedb.go @@ -3,9 +3,12 @@ package statedb import ( "errors" "fmt" + "io/ioutil" "math/big" "os" - "strconv" + "path" + "sort" + "strings" "github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/log" @@ -51,10 +54,10 @@ var ( const ( // PathBatchNum defines the subpath of the Batch Checkpoint in the // subpath of the StateDB - PathBatchNum = "/BatchNum" + PathBatchNum = "BatchNum" // PathCurrent defines the subpath of the current Batch in the subpath // of the StateDB - PathCurrent = "/current" + PathCurrent = "current" // TypeSynchronizer defines a StateDB used by the Synchronizer, that // generates the ExitTree when processing the txs TypeSynchronizer = "synchronizer" @@ -85,14 +88,16 @@ type StateDB struct { // AccumulatedFees contains the accumulated fees for each token (Coord // Idx) in the processed batch AccumulatedFees map[common.Idx]*big.Int + keep int } // NewStateDB creates a new StateDB, allowing to use an in-memory or in-disk -// storage -func NewStateDB(path string, typ TypeStateDB, nLevels int, chainID uint16) (*StateDB, error) { +// storage. Checkpoints older than the value defined by `keep` will be +// deleted. +func NewStateDB(pathDB string, keep int, typ TypeStateDB, nLevels int, chainID uint16) (*StateDB, error) { var sto *pebble.PebbleStorage var err error - sto, err = pebble.NewPebbleStorage(path+PathCurrent, false) + sto, err = pebble.NewPebbleStorage(path.Join(pathDB, PathCurrent), false) if err != nil { return nil, tracerr.Wrap(err) } @@ -109,11 +114,12 @@ func NewStateDB(path string, typ TypeStateDB, nLevels int, chainID uint16) (*Sta } sdb := &StateDB{ - path: path, + path: pathDB, db: sto, mt: mt, typ: typ, chainID: chainID, + keep: keep, } // load currentBatch @@ -170,10 +176,9 @@ func (s *StateDB) MakeCheckpoint() error { s.currentBatch++ log.Debugw("Making StateDB checkpoint", "batch", s.currentBatch, "type", s.typ) - checkpointPath := s.path + PathBatchNum + strconv.Itoa(int(s.currentBatch)) + checkpointPath := path.Join(s.path, fmt.Sprintf("%s%d", PathBatchNum, s.currentBatch)) - err := s.setCurrentBatch() - if err != nil { + if err := s.setCurrentBatch(); err != nil { return tracerr.Wrap(err) } @@ -188,8 +193,11 @@ func (s *StateDB) MakeCheckpoint() error { } // execute Checkpoint - err = s.db.Pebble().Checkpoint(checkpointPath) - if err != nil { + if err := s.db.Pebble().Checkpoint(checkpointPath); err != nil { + return tracerr.Wrap(err) + } + // delete old checkpoints + if err := s.deleteOldCheckpoints(); err != nil { return tracerr.Wrap(err) } @@ -198,7 +206,7 @@ func (s *StateDB) MakeCheckpoint() error { // DeleteCheckpoint removes if exist the checkpoint of the given batchNum func (s *StateDB) DeleteCheckpoint(batchNum common.BatchNum) error { - checkpointPath := s.path + PathBatchNum + strconv.Itoa(int(batchNum)) + checkpointPath := path.Join(s.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) if _, err := os.Stat(checkpointPath); os.IsNotExist(err) { return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum)) @@ -207,6 +215,55 @@ func (s *StateDB) DeleteCheckpoint(batchNum common.BatchNum) error { return os.RemoveAll(checkpointPath) } +// listCheckpoints returns the list of batchNums of the checkpoints, sorted. +// If there's a gap between the list of checkpoints, an error is returned. +func (s *StateDB) listCheckpoints() ([]int, error) { + files, err := ioutil.ReadDir(s.path) + if err != nil { + return nil, err + } + checkpoints := []int{} + var checkpoint int + pattern := fmt.Sprintf("%s%%d", PathBatchNum) + for _, file := range files { + fileName := file.Name() + if file.IsDir() && strings.HasPrefix(fileName, PathBatchNum) { + if _, err := fmt.Sscanf(fileName, pattern, &checkpoint); err != nil { + return nil, err + } + checkpoints = append(checkpoints, checkpoint) + } + } + sort.Ints(checkpoints) + if len(checkpoints) > 0 { + first := checkpoints[0] + for _, checkpoint := range checkpoints[1:] { + first++ + if checkpoint != first { + return nil, fmt.Errorf("checkpoint gap at %v", checkpoint) + } + } + } + return checkpoints, nil +} + +// deleteOldCheckpoints deletes old checkpoints when there are more than +// `s.keep` checkpoints +func (s *StateDB) deleteOldCheckpoints() error { + list, err := s.listCheckpoints() + if err != nil { + return err + } + if len(list) > s.keep { + for _, checkpoint := range list[:len(list)-s.keep] { + if err := s.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil { + return err + } + } + } + return nil +} + func pebbleMakeCheckpoint(source, dest string) error { // Remove dest folder (if it exists) before doing the checkpoint if _, err := os.Stat(dest); !os.IsNotExist(err) { @@ -252,7 +309,7 @@ func (s *StateDB) Reset(batchNum common.BatchNum) error { // deleted when MakeCheckpoint overwrites them. `closeCurrent` will close the // currently opened db before doing the reset. func (s *StateDB) reset(batchNum common.BatchNum, closeCurrent bool) error { - currentPath := s.path + PathCurrent + currentPath := path.Join(s.path, PathCurrent) if closeCurrent { if err := s.db.Pebble().Close(); err != nil { @@ -264,6 +321,12 @@ func (s *StateDB) reset(batchNum common.BatchNum, closeCurrent bool) error { if err != nil { return tracerr.Wrap(err) } + // remove all checkpoints > batchNum + for i := batchNum + 1; i <= s.currentBatch; i++ { + if err := s.DeleteCheckpoint(i); err != nil { + return err + } + } if batchNum == 0 { // if batchNum == 0, open the new fresh 'current' sto, err := pebble.NewPebbleStorage(currentPath, false) @@ -285,7 +348,7 @@ func (s *StateDB) reset(batchNum common.BatchNum, closeCurrent bool) error { return nil } - checkpointPath := s.path + PathBatchNum + strconv.Itoa(int(batchNum)) + checkpointPath := path.Join(s.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) // copy 'BatchNumX' to 'current' err = pebbleMakeCheckpoint(checkpointPath, currentPath) if err != nil { @@ -521,9 +584,11 @@ type LocalStateDB struct { } // NewLocalStateDB returns a new LocalStateDB connected to the given -// synchronizerDB -func NewLocalStateDB(path string, synchronizerDB *StateDB, typ TypeStateDB, nLevels int) (*LocalStateDB, error) { - s, err := NewStateDB(path, typ, nLevels, synchronizerDB.chainID) +// synchronizerDB. Checkpoints older than the value defined by `keep` will be +// deleted. +func NewLocalStateDB(path string, keep int, synchronizerDB *StateDB, typ TypeStateDB, + nLevels int) (*LocalStateDB, error) { + s, err := NewStateDB(path, keep, typ, nLevels, synchronizerDB.chainID) if err != nil { return nil, tracerr.Wrap(err) } @@ -541,9 +606,10 @@ func (l *LocalStateDB) Reset(batchNum common.BatchNum, fromSynchronizer bool) er return nil } - synchronizerCheckpointPath := l.synchronizerStateDB.path + PathBatchNum + strconv.Itoa(int(batchNum)) - checkpointPath := l.path + PathBatchNum + strconv.Itoa(int(batchNum)) - currentPath := l.path + PathCurrent + synchronizerCheckpointPath := path.Join(l.synchronizerStateDB.path, + fmt.Sprintf("%s%d", PathBatchNum, batchNum)) + checkpointPath := path.Join(l.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) + currentPath := path.Join(l.path, PathCurrent) if fromSynchronizer { // use checkpoint from SynchronizerStateDB diff --git a/db/statedb/statedb_test.go b/db/statedb/statedb_test.go index 007201b..6c1cea3 100644 --- a/db/statedb/statedb_test.go +++ b/db/statedb/statedb_test.go @@ -46,7 +46,7 @@ func TestNewStateDBIntermediateState(t *testing.T) { defer assert.NoError(t, os.RemoveAll(dir)) chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeTxSelector, 0, chainID) + sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0, chainID) assert.NoError(t, err) // test values @@ -68,7 +68,7 @@ func TestNewStateDBIntermediateState(t *testing.T) { // call NewStateDB which should get the db at the last checkpoint state // executing a Reset (discarding the last 'testkey0'&'testvalue0' data) - sdb, err = NewStateDB(dir, TypeTxSelector, 0, chainID) + sdb, err = NewStateDB(dir, 128, TypeTxSelector, 0, chainID) assert.NoError(t, err) v, err = sdb.db.Get(k0) assert.NotNil(t, err) @@ -110,7 +110,7 @@ func TestNewStateDBIntermediateState(t *testing.T) { // call NewStateDB which should get the db at the last checkpoint state // executing a Reset (discarding the last 'testkey1'&'testvalue1' data) - sdb, err = NewStateDB(dir, TypeTxSelector, 0, chainID) + sdb, err = NewStateDB(dir, 128, TypeTxSelector, 0, chainID) assert.NoError(t, err) v, err = sdb.db.Get(k0) @@ -129,7 +129,7 @@ func TestStateDBWithoutMT(t *testing.T) { defer assert.NoError(t, os.RemoveAll(dir)) chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeTxSelector, 0, chainID) + sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0, chainID) assert.NoError(t, err) // create test accounts @@ -184,7 +184,7 @@ func TestStateDBWithMT(t *testing.T) { defer assert.NoError(t, os.RemoveAll(dir)) chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeSynchronizer, 32, chainID) + sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID) assert.NoError(t, err) // create test accounts @@ -237,7 +237,7 @@ func TestCheckpoints(t *testing.T) { defer assert.NoError(t, os.RemoveAll(dir)) chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeSynchronizer, 32, chainID) + sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID) assert.NoError(t, err) // create test accounts @@ -291,20 +291,20 @@ func TestCheckpoints(t *testing.T) { assert.NoError(t, err) assert.Equal(t, common.BatchNum(4), cb) - err = sdb.DeleteCheckpoint(common.BatchNum(9)) + err = sdb.DeleteCheckpoint(common.BatchNum(1)) assert.NoError(t, err) - err = sdb.DeleteCheckpoint(common.BatchNum(10)) + err = sdb.DeleteCheckpoint(common.BatchNum(2)) assert.NoError(t, err) - err = sdb.DeleteCheckpoint(common.BatchNum(9)) // does not exist, should return err + err = sdb.DeleteCheckpoint(common.BatchNum(1)) // does not exist, should return err assert.NotNil(t, err) - err = sdb.DeleteCheckpoint(common.BatchNum(11)) // does not exist, should return err + err = sdb.DeleteCheckpoint(common.BatchNum(2)) // does not exist, should return err assert.NotNil(t, err) // Create a LocalStateDB from the initial StateDB dirLocal, err := ioutil.TempDir("", "ldb") require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dirLocal)) - ldb, err := NewLocalStateDB(dirLocal, sdb, TypeBatchBuilder, 32) + ldb, err := NewLocalStateDB(dirLocal, 128, sdb, TypeBatchBuilder, 32) assert.NoError(t, err) // get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB) @@ -325,7 +325,7 @@ func TestCheckpoints(t *testing.T) { dirLocal2, err := ioutil.TempDir("", "ldb2") require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dirLocal2)) - ldb2, err := NewLocalStateDB(dirLocal2, sdb, TypeBatchBuilder, 32) + ldb2, err := NewLocalStateDB(dirLocal2, 128, sdb, TypeBatchBuilder, 32) assert.NoError(t, err) // get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB) @@ -355,7 +355,7 @@ func TestStateDBGetAccounts(t *testing.T) { require.NoError(t, err) chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeTxSelector, 0, chainID) + sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0, chainID) assert.NoError(t, err) // create test accounts @@ -403,7 +403,7 @@ func TestCheckAccountsTreeTestVectors(t *testing.T) { defer assert.NoError(t, os.RemoveAll(dir)) chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeSynchronizer, 32, chainID) + sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID) require.NoError(t, err) ay0 := new(big.Int).Sub(new(big.Int).Exp(big.NewInt(2), big.NewInt(253), nil), big.NewInt(1)) @@ -469,3 +469,56 @@ func TestCheckAccountsTreeTestVectors(t *testing.T) { // root value generated by js version: assert.Equal(t, "17298264051379321456969039521810887093935433569451713402227686942080129181291", sdb.mt.Root().BigInt().String()) } + +func TestListCheckpoints(t *testing.T) { + dir, err := ioutil.TempDir("", "tmpdb") + require.NoError(t, err) + defer assert.NoError(t, os.RemoveAll(dir)) + + chainID := uint16(0) + sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID) + require.NoError(t, err) + + numCheckpoints := 16 + // do checkpoints + for i := 0; i < numCheckpoints; i++ { + err = sdb.MakeCheckpoint() + require.NoError(t, err) + } + list, err := sdb.listCheckpoints() + require.NoError(t, err) + assert.Equal(t, numCheckpoints, len(list)) + assert.Equal(t, 1, list[0]) + assert.Equal(t, numCheckpoints, list[len(list)-1]) + + numReset := 10 + err = sdb.Reset(common.BatchNum(numReset)) + require.NoError(t, err) + list, err = sdb.listCheckpoints() + require.NoError(t, err) + assert.Equal(t, numReset, len(list)) + assert.Equal(t, 1, list[0]) + assert.Equal(t, numReset, list[len(list)-1]) +} + +func TestDeleteOldCheckpoints(t *testing.T) { + dir, err := ioutil.TempDir("", "tmpdb") + require.NoError(t, err) + defer assert.NoError(t, os.RemoveAll(dir)) + + chainID := uint16(0) + keep := 16 + sdb, err := NewStateDB(dir, keep, TypeSynchronizer, 32, chainID) + require.NoError(t, err) + + numCheckpoints := 32 + // do checkpoints and check that we never have more than `keep` + // checkpoints + for i := 0; i < numCheckpoints; i++ { + err = sdb.MakeCheckpoint() + require.NoError(t, err) + checkpoints, err := sdb.listCheckpoints() + require.NoError(t, err) + assert.LessOrEqual(t, len(checkpoints), keep) + } +} diff --git a/db/statedb/txprocessors_test.go b/db/statedb/txprocessors_test.go index 96722e7..bb3b57a 100644 --- a/db/statedb/txprocessors_test.go +++ b/db/statedb/txprocessors_test.go @@ -29,7 +29,7 @@ func TestComputeEffectiveAmounts(t *testing.T) { defer assert.NoError(t, os.RemoveAll(dir)) chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeSynchronizer, 32, chainID) + sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID) assert.NoError(t, err) set := ` @@ -203,7 +203,7 @@ func TestProcessTxsBalances(t *testing.T) { defer assert.NoError(t, os.RemoveAll(dir)) chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeSynchronizer, 32, chainID) + sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID) assert.NoError(t, err) // generate test transactions from test.SetBlockchain0 code @@ -336,7 +336,7 @@ func TestProcessTxsSynchronizer(t *testing.T) { defer assert.NoError(t, os.RemoveAll(dir)) chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeSynchronizer, 32, chainID) + sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32, chainID) assert.NoError(t, err) // generate test transactions from test.SetBlockchain0 code @@ -465,7 +465,7 @@ func TestProcessTxsBatchBuilder(t *testing.T) { defer assert.NoError(t, os.RemoveAll(dir)) chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeBatchBuilder, 32, chainID) + sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, 32, chainID) assert.NoError(t, err) // generate test transactions from test.SetBlockchain0 code @@ -554,7 +554,7 @@ func TestProcessTxsRootTestVectors(t *testing.T) { defer assert.NoError(t, os.RemoveAll(dir)) chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeBatchBuilder, 32, chainID) + sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, 32, chainID) assert.NoError(t, err) // same values than in the js test @@ -603,7 +603,7 @@ func TestCreateAccountDepositMaxValue(t *testing.T) { nLevels := 16 chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID) + sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID) assert.NoError(t, err) users := generateJsUsers(t) diff --git a/db/statedb/utils_test.go b/db/statedb/utils_test.go index 2691229..54fb13c 100644 --- a/db/statedb/utils_test.go +++ b/db/statedb/utils_test.go @@ -21,7 +21,7 @@ func TestGetIdx(t *testing.T) { defer assert.NoError(t, os.RemoveAll(dir)) chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeTxSelector, 0, chainID) + sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0, chainID) assert.NoError(t, err) var sk babyjub.PrivateKey diff --git a/db/statedb/zkinputsgen_test.go b/db/statedb/zkinputsgen_test.go index 16c2eb1..e9d98f6 100644 --- a/db/statedb/zkinputsgen_test.go +++ b/db/statedb/zkinputsgen_test.go @@ -79,7 +79,7 @@ func TestZKInputsHashTestVector0(t *testing.T) { defer assert.Nil(t, os.RemoveAll(dir)) chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeBatchBuilder, 32, chainID) + sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, 32, chainID) assert.Nil(t, err) // same values than in the js test @@ -154,7 +154,7 @@ func TestZKInputsHashTestVector1(t *testing.T) { defer assert.Nil(t, os.RemoveAll(dir)) chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeBatchBuilder, 32, chainID) + sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, 32, chainID) assert.Nil(t, err) // same values than in the js test @@ -254,7 +254,7 @@ func TestZKInputsEmpty(t *testing.T) { nLevels := 16 chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID) + sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID) assert.Nil(t, err) ptc := ProcessTxsConfig{ @@ -403,7 +403,7 @@ func TestZKInputs0(t *testing.T) { nLevels := 16 chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID) + sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID) assert.Nil(t, err) // same values than in the js test @@ -491,7 +491,7 @@ func TestZKInputs1(t *testing.T) { nLevels := 16 chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID) + sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID) assert.Nil(t, err) // same values than in the js test @@ -598,7 +598,7 @@ func TestZKInputs2(t *testing.T) { nLevels := 16 chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID) + sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID) assert.Nil(t, err) // same values than in the js test @@ -742,7 +742,7 @@ func TestZKInputs3(t *testing.T) { nLevels := 16 chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID) + sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID) assert.Nil(t, err) // same values than in the js test @@ -886,7 +886,7 @@ func TestZKInputs4(t *testing.T) { nLevels := 16 chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID) + sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID) assert.Nil(t, err) // same values than in the js test @@ -1040,7 +1040,7 @@ func TestZKInputs5(t *testing.T) { nLevels := 16 chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID) + sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID) assert.Nil(t, err) // same values than in the js test @@ -1164,7 +1164,7 @@ func TestZKInputs6(t *testing.T) { nLevels := 16 chainID := uint16(0) - sdb, err := NewStateDB(dir, TypeBatchBuilder, nLevels, chainID) + sdb, err := NewStateDB(dir, 128, TypeBatchBuilder, nLevels, chainID) assert.Nil(t, err) // Coordinator Idx where to send the fees diff --git a/node/node.go b/node/node.go index d75757b..5363828 100644 --- a/node/node.go +++ b/node/node.go @@ -130,7 +130,13 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { } chainIDU16 := uint16(chainIDU64) - stateDB, err := statedb.NewStateDB(cfg.StateDB.Path, statedb.TypeSynchronizer, 32, chainIDU16) + const safeStateDBKeep = 128 + if cfg.StateDB.Keep < safeStateDBKeep { + return nil, tracerr.Wrap(fmt.Errorf("cfg.StateDB.Keep = %v < %v, which is unsafe", + cfg.StateDB.Keep, safeStateDBKeep)) + } + stateDB, err := statedb.NewStateDB(cfg.StateDB.Path, cfg.StateDB.Keep, + statedb.TypeSynchronizer, 32, chainIDU16) if err != nil { return nil, tracerr.Wrap(err) } diff --git a/synchronizer/synchronizer_test.go b/synchronizer/synchronizer_test.go index a0210e8..58fa952 100644 --- a/synchronizer/synchronizer_test.go +++ b/synchronizer/synchronizer_test.go @@ -290,7 +290,7 @@ func newTestModules(t *testing.T) (*statedb.StateDB, *historydb.HistoryDB) { require.NoError(t, err) deleteme = append(deleteme, dir) - stateDB, err := statedb.NewStateDB(dir, statedb.TypeSynchronizer, 32, chainID) + stateDB, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32, chainID) require.NoError(t, err) // Init History DB diff --git a/test/debugapi/debugapi_test.go b/test/debugapi/debugapi_test.go index dada8d3..eff755e 100644 --- a/test/debugapi/debugapi_test.go +++ b/test/debugapi/debugapi_test.go @@ -45,7 +45,7 @@ func TestDebugAPI(t *testing.T) { require.Nil(t, err) chainID := uint16(0) - sdb, err := statedb.NewStateDB(dir, statedb.TypeSynchronizer, 32, chainID) + sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32, chainID) require.Nil(t, err) err = sdb.MakeCheckpoint() // Make a checkpoint to increment the batchNum require.Nil(t, err) diff --git a/txselector/txselector.go b/txselector/txselector.go index df3ac3c..833eaf8 100644 --- a/txselector/txselector.go +++ b/txselector/txselector.go @@ -72,7 +72,7 @@ type TxSelector struct { // NewTxSelector returns a *TxSelector func NewTxSelector(coordAccount *CoordAccount, dbpath string, synchronizerStateDB *statedb.StateDB, l2 *l2db.L2DB) (*TxSelector, error) { - localAccountsDB, err := statedb.NewLocalStateDB(dbpath, + localAccountsDB, err := statedb.NewLocalStateDB(dbpath, 128, synchronizerStateDB, statedb.TypeTxSelector, 0) // without merkletree if err != nil { return nil, tracerr.Wrap(err) diff --git a/txselector/txselector_test.go b/txselector/txselector_test.go index ce77922..d7d15b6 100644 --- a/txselector/txselector_test.go +++ b/txselector/txselector_test.go @@ -29,7 +29,7 @@ func initTest(t *testing.T, chainID uint16, testSet string) *TxSelector { dir, err := ioutil.TempDir("", "tmpdb") require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, statedb.TypeTxSelector, 0, chainID) + sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeTxSelector, 0, chainID) require.NoError(t, err) txselDir, err := ioutil.TempDir("", "tmpTxSelDB") From a0c8ace38d4707ec10567a153f5c16f3902ae294 Mon Sep 17 00:00:00 2001 From: Eduard S Date: Thu, 24 Dec 2020 13:39:27 +0100 Subject: [PATCH 3/3] Add missing tracerr.Wrap --- db/statedb/statedb.go | 12 ++++++------ node/node.go | 2 +- priceupdater/priceupdater.go | 2 +- prover/prover.go | 6 +++--- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/db/statedb/statedb.go b/db/statedb/statedb.go index 050f756..5b3a4d7 100644 --- a/db/statedb/statedb.go +++ b/db/statedb/statedb.go @@ -220,7 +220,7 @@ func (s *StateDB) DeleteCheckpoint(batchNum common.BatchNum) error { func (s *StateDB) listCheckpoints() ([]int, error) { files, err := ioutil.ReadDir(s.path) if err != nil { - return nil, err + return nil, tracerr.Wrap(err) } checkpoints := []int{} var checkpoint int @@ -229,7 +229,7 @@ func (s *StateDB) listCheckpoints() ([]int, error) { fileName := file.Name() if file.IsDir() && strings.HasPrefix(fileName, PathBatchNum) { if _, err := fmt.Sscanf(fileName, pattern, &checkpoint); err != nil { - return nil, err + return nil, tracerr.Wrap(err) } checkpoints = append(checkpoints, checkpoint) } @@ -240,7 +240,7 @@ func (s *StateDB) listCheckpoints() ([]int, error) { for _, checkpoint := range checkpoints[1:] { first++ if checkpoint != first { - return nil, fmt.Errorf("checkpoint gap at %v", checkpoint) + return nil, tracerr.Wrap(fmt.Errorf("checkpoint gap at %v", checkpoint)) } } } @@ -252,12 +252,12 @@ func (s *StateDB) listCheckpoints() ([]int, error) { func (s *StateDB) deleteOldCheckpoints() error { list, err := s.listCheckpoints() if err != nil { - return err + return tracerr.Wrap(err) } if len(list) > s.keep { for _, checkpoint := range list[:len(list)-s.keep] { if err := s.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil { - return err + return tracerr.Wrap(err) } } } @@ -324,7 +324,7 @@ func (s *StateDB) reset(batchNum common.BatchNum, closeCurrent bool) error { // remove all checkpoints > batchNum for i := batchNum + 1; i <= s.currentBatch; i++ { if err := s.DeleteCheckpoint(i); err != nil { - return err + return tracerr.Wrap(err) } } if batchNum == 0 { diff --git a/node/node.go b/node/node.go index 5363828..13c065a 100644 --- a/node/node.go +++ b/node/node.go @@ -419,7 +419,7 @@ func (n *Node) syncLoopFn(ctx context.Context, lastBlock *common.Block) (*common stats := n.sync.Stats() if err != nil { // case: error - return nil, n.cfg.Synchronizer.SyncLoopInterval.Duration, err + return nil, n.cfg.Synchronizer.SyncLoopInterval.Duration, tracerr.Wrap(err) } else if discarded != nil { // case: reorg log.Infow("Synchronizer.Sync reorg", "discarded", *discarded) diff --git a/priceupdater/priceupdater.go b/priceupdater/priceupdater.go index f8a87c8..75ae273 100644 --- a/priceupdater/priceupdater.go +++ b/priceupdater/priceupdater.go @@ -68,7 +68,7 @@ func getTokenPriceBitfinex(ctx context.Context, client *sling.Sling, return 0, tracerr.Wrap(err) } if res.StatusCode != http.StatusOK { - return 0, fmt.Errorf("http response is not is %v", res.StatusCode) + return 0, tracerr.Wrap(fmt.Errorf("http response is not is %v", res.StatusCode)) } return state[6], nil } diff --git a/prover/prover.go b/prover/prover.go index 3d8c6ba..53ddca4 100644 --- a/prover/prover.go +++ b/prover/prover.go @@ -48,7 +48,7 @@ func (p *Proof) UnmarshalJSON(data []byte) error { p.PiA[1] = (*big.Int)(proof.PiA[1]) p.PiA[2] = (*big.Int)(proof.PiA[2]) if p.PiA[2].Int64() != 1 { - return fmt.Errorf("Expected PiA[2] == 1, but got %v", p.PiA[2]) + return tracerr.Wrap(fmt.Errorf("Expected PiA[2] == 1, but got %v", p.PiA[2])) } p.PiB[0][0] = (*big.Int)(proof.PiB[0][0]) p.PiB[0][1] = (*big.Int)(proof.PiB[0][1]) @@ -57,13 +57,13 @@ func (p *Proof) UnmarshalJSON(data []byte) error { p.PiB[2][0] = (*big.Int)(proof.PiB[2][0]) p.PiB[2][1] = (*big.Int)(proof.PiB[2][1]) if p.PiB[2][0].Int64() != 1 || p.PiB[2][1].Int64() != 0 { - return fmt.Errorf("Expected PiB[2] == [1, 0], but got %v", p.PiB[2]) + return tracerr.Wrap(fmt.Errorf("Expected PiB[2] == [1, 0], but got %v", p.PiB[2])) } p.PiC[0] = (*big.Int)(proof.PiC[0]) p.PiC[1] = (*big.Int)(proof.PiC[1]) p.PiC[2] = (*big.Int)(proof.PiC[2]) if p.PiC[2].Int64() != 1 { - return fmt.Errorf("Expected PiC[2] == 1, but got %v", p.PiC[2]) + return tracerr.Wrap(fmt.Errorf("Expected PiC[2] == 1, but got %v", p.PiC[2])) } // TODO: Assert ones and zeroes p.Protocol = proof.Protocol