From 48a538faa398981d70b6d497cb7a7e96e41392f9 Mon Sep 17 00:00:00 2001 From: Eduard S Date: Fri, 5 Feb 2021 18:03:59 +0100 Subject: [PATCH] Pass StateDB constructor parameters as Config type - KVDB/StateDB - Pass config parameters in a Config type instead of using many arguments in constructor. - Add new parameter `NoLast` which disables having an opened DB with a checkpoint to the last batchNum for thread-safe reads. Last will be disabled in the StateDB used by the TxSelector and BatchBuilder. - Add new parameter `NoGapsCheck` which skips checking gaps in the list of checkpoints and returning errors if there are gaps. Gaps check will be disabled in the StateDB used by the TxSelector and BatchBuilder, because we expect to have gaps when there are multiple coordinators forging (slots not forged by our coordinator will leave gaps). --- api/api_test.go | 2 +- batchbuilder/batchbuilder.go | 11 +- batchbuilder/batchbuilder_test.go | 2 +- coordinator/coordinator_test.go | 3 +- coordinator/pipeline.go | 16 ++- coordinator/purger_test.go | 6 +- db/kvdb/kvdb.go | 220 +++++++++++++++++------------- db/kvdb/kvdb_test.go | 24 ++-- db/statedb/statedb.go | 72 ++++++---- db/statedb/statedb_test.go | 40 +++--- db/statedb/utils_test.go | 2 +- node/node.go | 8 +- synchronizer/synchronizer_test.go | 2 +- test/debugapi/debugapi_test.go | 2 +- test/zkproof/flows_test.go | 3 +- test/zkproof/zkproof_test.go | 2 +- txprocessor/txprocessor.go | 38 +++--- txprocessor/txprocessor_test.go | 27 ++-- txprocessor/zkinputsgen_test.go | 30 ++-- txselector/txselector.go | 11 +- txselector/txselector_test.go | 3 +- 21 files changed, 314 insertions(+), 210 deletions(-) diff --git a/api/api_test.go b/api/api_test.go index 97dee27..001d3c8 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -213,7 +213,7 @@ func TestMain(m *testing.M) { panic(err) } }() - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeTxSelector, 0) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, Type: statedb.TypeTxSelector, NLevels: 0}) if err != nil { panic(err) } diff --git a/batchbuilder/batchbuilder.go b/batchbuilder/batchbuilder.go index 4515efc..8f662fe 100644 --- a/batchbuilder/batchbuilder.go +++ b/batchbuilder/batchbuilder.go @@ -2,6 +2,7 @@ package batchbuilder import ( "github.com/hermeznetwork/hermez-node/common" + "github.com/hermeznetwork/hermez-node/db/kvdb" "github.com/hermeznetwork/hermez-node/db/statedb" "github.com/hermeznetwork/hermez-node/txprocessor" "github.com/hermeznetwork/tracerr" @@ -28,8 +29,14 @@ type ConfigBatch struct { // NewBatchBuilder constructs a new BatchBuilder, and executes the bb.Reset // method func NewBatchBuilder(dbpath string, synchronizerStateDB *statedb.StateDB, batchNum common.BatchNum, nLevels uint64) (*BatchBuilder, error) { - localStateDB, err := statedb.NewLocalStateDB(dbpath, 128, synchronizerStateDB, - statedb.TypeBatchBuilder, int(nLevels)) + localStateDB, err := statedb.NewLocalStateDB( + statedb.Config{ + Path: dbpath, + Keep: kvdb.DefaultKeep, + Type: statedb.TypeBatchBuilder, + NLevels: int(nLevels), + }, + synchronizerStateDB) if err != nil { return nil, tracerr.Wrap(err) } diff --git a/batchbuilder/batchbuilder_test.go b/batchbuilder/batchbuilder_test.go index 550f5e6..8188aca 100644 --- a/batchbuilder/batchbuilder_test.go +++ b/batchbuilder/batchbuilder_test.go @@ -15,7 +15,7 @@ func TestBatchBuilder(t *testing.T) { require.Nil(t, err) defer assert.Nil(t, os.RemoveAll(dir)) - synchDB, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 0) + synchDB, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, Type: statedb.TypeBatchBuilder, NLevels: 0}) assert.Nil(t, err) bbDir, err := ioutil.TempDir("", "tmpBatchBuilderDB") diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index ca12966..ebd853d 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -97,7 +97,8 @@ func newTestModules(t *testing.T) modules { syncDBPath, err = ioutil.TempDir("", "tmpSyncDB") require.NoError(t, err) deleteme = append(deleteme, syncDBPath) - syncStateDB, err := statedb.NewStateDB(syncDBPath, 128, statedb.TypeSynchronizer, 48) + syncStateDB, err := statedb.NewStateDB(statedb.Config{Path: syncDBPath, Keep: 128, + Type: statedb.TypeSynchronizer, NLevels: 48}) assert.NoError(t, err) pass := os.Getenv("POSTGRES_PASS") diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go index 400d654..81d5774 100644 --- a/coordinator/pipeline.go +++ b/coordinator/pipeline.go @@ -200,15 +200,17 @@ func (p *Pipeline) Start(batchNum common.BatchNum, p.syncSCVars(statsVars.Vars) case <-time.After(waitDuration): batchNum = p.batchNum + 1 - if batchInfo, err := p.handleForgeBatch(p.ctx, batchNum); err != nil { + batchInfo, err := p.handleForgeBatch(p.ctx, batchNum) + if p.ctx.Err() != nil { + continue + } else if err != nil { waitDuration = p.cfg.SyncRetryInterval continue - } else { - p.batchNum = batchNum - select { - case batchChSentServerProof <- batchInfo: - case <-p.ctx.Done(): - } + } + p.batchNum = batchNum + select { + case batchChSentServerProof <- batchInfo: + case <-p.ctx.Done(): } } } diff --git a/coordinator/purger_test.go b/coordinator/purger_test.go index 568fde6..d47ac68 100644 --- a/coordinator/purger_test.go +++ b/coordinator/purger_test.go @@ -28,12 +28,14 @@ func newStateDB(t *testing.T) *statedb.LocalStateDB { syncDBPath, err := ioutil.TempDir("", "tmpSyncDB") require.NoError(t, err) deleteme = append(deleteme, syncDBPath) - syncStateDB, err := statedb.NewStateDB(syncDBPath, 128, statedb.TypeSynchronizer, 48) + syncStateDB, err := statedb.NewStateDB(statedb.Config{Path: syncDBPath, Keep: 128, + Type: statedb.TypeSynchronizer, NLevels: 48}) assert.NoError(t, err) stateDBPath, err := ioutil.TempDir("", "tmpStateDB") require.NoError(t, err) deleteme = append(deleteme, stateDBPath) - stateDB, err := statedb.NewLocalStateDB(stateDBPath, 128, syncStateDB, statedb.TypeTxSelector, 0) + stateDB, err := statedb.NewLocalStateDB(statedb.Config{Path: stateDBPath, Keep: 128, + Type: statedb.TypeTxSelector, NLevels: 0}, syncStateDB) require.NoError(t, err) return stateDB } diff --git a/db/kvdb/kvdb.go b/db/kvdb/kvdb.go index 11a8939..c2f5633 100644 --- a/db/kvdb/kvdb.go +++ b/db/kvdb/kvdb.go @@ -27,6 +27,8 @@ const ( // PathLast defines the subpath of the last Batch in the subpath // of the StateDB PathLast = "last" + // DefaultKeep is the default value for the Keep parameter + DefaultKeep = 128 ) var ( @@ -34,16 +36,18 @@ var ( KeyCurrentBatch = []byte("k:currentbatch") // keyCurrentIdx is used as key in the db to store the CurrentIdx keyCurrentIdx = []byte("k:idx") + // ErrNoLast is returned when the KVDB has been configured to not have + // a Last checkpoint but a Last method is used + ErrNoLast = fmt.Errorf("no last checkpoint") ) // KVDB represents the Key-Value DB object type KVDB struct { - path string - db *pebble.Storage + cfg Config + db *pebble.Storage // CurrentIdx holds the current Idx that the BatchBuilder is using CurrentIdx common.Idx CurrentBatch common.BatchNum - keep int m sync.Mutex last *Last } @@ -103,23 +107,42 @@ func (k *Last) close() { } } +// Config of the KVDB +type Config struct { + // Path where the checkpoints will be stored + Path string + // Keep is the number of old checkpoints to keep. If 0, all + // checkpoints are kept. + Keep int + // At every checkpoint, check that there are no gaps between the + // checkpoints + NoGapsCheck bool + // NoLast skips having an opened DB with a checkpoint to the last + // batchNum for thread-safe reads. + NoLast bool +} + // NewKVDB creates a new KVDB, allowing to use an in-memory or in-disk storage. // Checkpoints older than the value defined by `keep` will be deleted. -func NewKVDB(pathDB string, keep int) (*KVDB, error) { +// func NewKVDB(pathDB string, keep int) (*KVDB, error) { +func NewKVDB(cfg Config) (*KVDB, error) { var sto *pebble.Storage var err error - sto, err = pebble.NewPebbleStorage(path.Join(pathDB, PathCurrent), false) + sto, err = pebble.NewPebbleStorage(path.Join(cfg.Path, PathCurrent), false) if err != nil { return nil, tracerr.Wrap(err) } + var last *Last + if !cfg.NoLast { + last = &Last{ + path: cfg.Path, + } + } kvdb := &KVDB{ - path: pathDB, + cfg: cfg, db: sto, - keep: keep, - last: &Last{ - path: pathDB, - }, + last: last, } // load currentBatch kvdb.CurrentBatch, err = kvdb.GetCurrentBatch() @@ -137,29 +160,32 @@ func NewKVDB(pathDB string, keep int) (*KVDB, error) { } // LastRead is a thread-safe method to query the last KVDB -func (kvdb *KVDB) LastRead(fn func(db *pebble.Storage) error) error { - kvdb.last.rw.RLock() - defer kvdb.last.rw.RUnlock() - return fn(kvdb.last.db) +func (k *KVDB) LastRead(fn func(db *pebble.Storage) error) error { + if k.last == nil { + return tracerr.Wrap(ErrNoLast) + } + k.last.rw.RLock() + defer k.last.rw.RUnlock() + return fn(k.last.db) } // DB returns the *pebble.Storage from the KVDB -func (kvdb *KVDB) DB() *pebble.Storage { - return kvdb.db +func (k *KVDB) DB() *pebble.Storage { + return k.db } // StorageWithPrefix returns the db.Storage with the given prefix from the // current KVDB -func (kvdb *KVDB) StorageWithPrefix(prefix []byte) db.Storage { - return kvdb.db.WithPrefix(prefix) +func (k *KVDB) StorageWithPrefix(prefix []byte) db.Storage { + return k.db.WithPrefix(prefix) } // Reset resets the KVDB to the checkpoint at the given batchNum. Reset does // not delete the checkpoints between old current and the new current, those // checkpoints will remain in the storage, and eventually will be deleted when // MakeCheckpoint overwrites them. -func (kvdb *KVDB) Reset(batchNum common.BatchNum) error { - return kvdb.reset(batchNum, true) +func (k *KVDB) Reset(batchNum common.BatchNum) error { + return k.reset(batchNum, true) } // reset resets the KVDB to the checkpoint at the given batchNum. Reset does @@ -167,19 +193,19 @@ func (kvdb *KVDB) Reset(batchNum common.BatchNum) error { // checkpoints will remain in the storage, and eventually will be deleted when // MakeCheckpoint overwrites them. `closeCurrent` will close the currently // opened db before doing the reset. -func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { - currentPath := path.Join(kvdb.path, PathCurrent) +func (k *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { + currentPath := path.Join(k.cfg.Path, PathCurrent) - if closeCurrent && kvdb.db != nil { - kvdb.db.Close() - kvdb.db = nil + if closeCurrent && k.db != nil { + k.db.Close() + k.db = nil } // remove 'current' if err := os.RemoveAll(currentPath); err != nil { return tracerr.Wrap(err) } // remove all checkpoints > batchNum - list, err := kvdb.ListCheckpoints() + list, err := k.ListCheckpoints() if err != nil { return tracerr.Wrap(err) } @@ -192,7 +218,7 @@ func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { } } for _, bn := range list[start:] { - if err := kvdb.DeleteCheckpoint(common.BatchNum(bn)); err != nil { + if err := k.DeleteCheckpoint(common.BatchNum(bn)); err != nil { return tracerr.Wrap(err) } } @@ -203,23 +229,27 @@ func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { if err != nil { return tracerr.Wrap(err) } - kvdb.db = sto - kvdb.CurrentIdx = common.RollupConstReservedIDx // 255 - kvdb.CurrentBatch = 0 - if err := kvdb.last.setNew(); err != nil { - return tracerr.Wrap(err) + k.db = sto + k.CurrentIdx = common.RollupConstReservedIDx // 255 + k.CurrentBatch = 0 + if k.last != nil { + if err := k.last.setNew(); err != nil { + return tracerr.Wrap(err) + } } return nil } // copy 'batchNum' to 'current' - if err := kvdb.MakeCheckpointFromTo(batchNum, currentPath); err != nil { + if err := k.MakeCheckpointFromTo(batchNum, currentPath); err != nil { return tracerr.Wrap(err) } // copy 'batchNum' to 'last' - if err := kvdb.last.set(kvdb, batchNum); err != nil { - return tracerr.Wrap(err) + if k.last != nil { + if err := k.last.set(k, batchNum); err != nil { + return tracerr.Wrap(err) + } } // open the new 'current' @@ -227,15 +257,15 @@ func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { if err != nil { return tracerr.Wrap(err) } - kvdb.db = sto + k.db = sto // get currentBatch num - kvdb.CurrentBatch, err = kvdb.GetCurrentBatch() + k.CurrentBatch, err = k.GetCurrentBatch() if err != nil { return tracerr.Wrap(err) } // idx is obtained from the statedb reset - kvdb.CurrentIdx, err = kvdb.GetCurrentIdx() + k.CurrentIdx, err = k.GetCurrentIdx() if err != nil { return tracerr.Wrap(err) } @@ -245,15 +275,15 @@ func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { // ResetFromSynchronizer performs a reset in the KVDB getting the state from // synchronizerKVDB for the given batchNum. -func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKVDB *KVDB) error { +func (k *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKVDB *KVDB) error { if synchronizerKVDB == nil { return tracerr.Wrap(fmt.Errorf("synchronizerKVDB can not be nil")) } - currentPath := path.Join(kvdb.path, PathCurrent) - if kvdb.db != nil { - kvdb.db.Close() - kvdb.db = nil + currentPath := path.Join(k.cfg.Path, PathCurrent) + if k.db != nil { + k.db.Close() + k.db = nil } // remove 'current' @@ -261,12 +291,12 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV return tracerr.Wrap(err) } // remove all checkpoints - list, err := kvdb.ListCheckpoints() + list, err := k.ListCheckpoints() if err != nil { return tracerr.Wrap(err) } for _, bn := range list { - if err := kvdb.DeleteCheckpoint(common.BatchNum(bn)); err != nil { + if err := k.DeleteCheckpoint(common.BatchNum(bn)); err != nil { return tracerr.Wrap(err) } } @@ -277,14 +307,14 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV if err != nil { return tracerr.Wrap(err) } - kvdb.db = sto - kvdb.CurrentIdx = common.RollupConstReservedIDx // 255 - kvdb.CurrentBatch = 0 + k.db = sto + k.CurrentIdx = common.RollupConstReservedIDx // 255 + k.CurrentBatch = 0 return nil } - checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) + checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) // copy synchronizer'BatchNumX' to 'BatchNumX' if err := synchronizerKVDB.MakeCheckpointFromTo(batchNum, checkpointPath); err != nil { @@ -292,7 +322,7 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV } // copy 'BatchNumX' to 'current' - err = kvdb.MakeCheckpointFromTo(batchNum, currentPath) + err = k.MakeCheckpointFromTo(batchNum, currentPath) if err != nil { return tracerr.Wrap(err) } @@ -302,15 +332,15 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV if err != nil { return tracerr.Wrap(err) } - kvdb.db = sto + k.db = sto // get currentBatch num - kvdb.CurrentBatch, err = kvdb.GetCurrentBatch() + k.CurrentBatch, err = k.GetCurrentBatch() if err != nil { return tracerr.Wrap(err) } // get currentIdx - kvdb.CurrentIdx, err = kvdb.GetCurrentIdx() + k.CurrentIdx, err = k.GetCurrentIdx() if err != nil { return tracerr.Wrap(err) } @@ -319,8 +349,8 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV } // GetCurrentBatch returns the current BatchNum stored in the KVDB -func (kvdb *KVDB) GetCurrentBatch() (common.BatchNum, error) { - cbBytes, err := kvdb.db.Get(KeyCurrentBatch) +func (k *KVDB) GetCurrentBatch() (common.BatchNum, error) { + cbBytes, err := k.db.Get(KeyCurrentBatch) if tracerr.Unwrap(err) == db.ErrNotFound { return 0, nil } @@ -331,12 +361,12 @@ func (kvdb *KVDB) GetCurrentBatch() (common.BatchNum, error) { } // setCurrentBatch stores the current BatchNum in the KVDB -func (kvdb *KVDB) setCurrentBatch() error { - tx, err := kvdb.db.NewTx() +func (k *KVDB) setCurrentBatch() error { + tx, err := k.db.NewTx() if err != nil { return tracerr.Wrap(err) } - err = tx.Put(KeyCurrentBatch, kvdb.CurrentBatch.Bytes()) + err = tx.Put(KeyCurrentBatch, k.CurrentBatch.Bytes()) if err != nil { return tracerr.Wrap(err) } @@ -347,9 +377,9 @@ func (kvdb *KVDB) setCurrentBatch() error { } // GetCurrentIdx returns the stored Idx from the KVDB, which is the last Idx -// used for an Account in the KVDB. -func (kvdb *KVDB) GetCurrentIdx() (common.Idx, error) { - idxBytes, err := kvdb.db.Get(keyCurrentIdx) +// used for an Account in the k. +func (k *KVDB) GetCurrentIdx() (common.Idx, error) { + idxBytes, err := k.db.Get(keyCurrentIdx) if tracerr.Unwrap(err) == db.ErrNotFound { return common.RollupConstReservedIDx, nil // 255, nil } @@ -360,10 +390,10 @@ func (kvdb *KVDB) GetCurrentIdx() (common.Idx, error) { } // SetCurrentIdx stores Idx in the KVDB -func (kvdb *KVDB) SetCurrentIdx(idx common.Idx) error { - kvdb.CurrentIdx = idx +func (k *KVDB) SetCurrentIdx(idx common.Idx) error { + k.CurrentIdx = idx - tx, err := kvdb.db.NewTx() + tx, err := k.db.NewTx() if err != nil { return tracerr.Wrap(err) } @@ -383,14 +413,14 @@ func (kvdb *KVDB) SetCurrentIdx(idx common.Idx) error { // MakeCheckpoint does a checkpoint at the given batchNum in the defined path. // Internally this advances & stores the current BatchNum, and then stores a -// Checkpoint of the current state of the KVDB. -func (kvdb *KVDB) MakeCheckpoint() error { +// Checkpoint of the current state of the k. +func (k *KVDB) MakeCheckpoint() error { // advance currentBatch - kvdb.CurrentBatch++ + k.CurrentBatch++ - checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, kvdb.CurrentBatch)) + checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, k.CurrentBatch)) - if err := kvdb.setCurrentBatch(); err != nil { + if err := k.setCurrentBatch(); err != nil { return tracerr.Wrap(err) } @@ -404,15 +434,17 @@ func (kvdb *KVDB) MakeCheckpoint() error { } // execute Checkpoint - if err := kvdb.db.Pebble().Checkpoint(checkpointPath); err != nil { + if err := k.db.Pebble().Checkpoint(checkpointPath); err != nil { return tracerr.Wrap(err) } // copy 'CurrentBatch' to 'last' - if err := kvdb.last.set(kvdb, kvdb.CurrentBatch); err != nil { - return tracerr.Wrap(err) + if k.last != nil { + if err := k.last.set(k, k.CurrentBatch); err != nil { + return tracerr.Wrap(err) + } } // delete old checkpoints - if err := kvdb.deleteOldCheckpoints(); err != nil { + if err := k.deleteOldCheckpoints(); err != nil { return tracerr.Wrap(err) } @@ -420,8 +452,8 @@ func (kvdb *KVDB) MakeCheckpoint() error { } // DeleteCheckpoint removes if exist the checkpoint of the given batchNum -func (kvdb *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error { - checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) +func (k *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error { + checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) if _, err := os.Stat(checkpointPath); os.IsNotExist(err) { return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum)) @@ -432,8 +464,8 @@ func (kvdb *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error { // ListCheckpoints returns the list of batchNums of the checkpoints, sorted. // If there's a gap between the list of checkpoints, an error is returned. -func (kvdb *KVDB) ListCheckpoints() ([]int, error) { - files, err := ioutil.ReadDir(kvdb.path) +func (k *KVDB) ListCheckpoints() ([]int, error) { + files, err := ioutil.ReadDir(k.cfg.Path) if err != nil { return nil, tracerr.Wrap(err) } @@ -450,12 +482,12 @@ func (kvdb *KVDB) ListCheckpoints() ([]int, error) { } } sort.Ints(checkpoints) - if len(checkpoints) > 0 { + if !k.cfg.NoGapsCheck && len(checkpoints) > 0 { first := checkpoints[0] for _, checkpoint := range checkpoints[1:] { first++ if checkpoint != first { - log.Errorw("GAP", "checkpoints", checkpoints) + log.Errorw("gap between checkpoints", "checkpoints", checkpoints) return nil, tracerr.Wrap(fmt.Errorf("checkpoint gap at %v", checkpoint)) } } @@ -465,14 +497,14 @@ func (kvdb *KVDB) ListCheckpoints() ([]int, error) { // deleteOldCheckpoints deletes old checkpoints when there are more than // `s.keep` checkpoints -func (kvdb *KVDB) deleteOldCheckpoints() error { - list, err := kvdb.ListCheckpoints() +func (k *KVDB) deleteOldCheckpoints() error { + list, err := k.ListCheckpoints() if err != nil { return tracerr.Wrap(err) } - if len(list) > kvdb.keep { - for _, checkpoint := range list[:len(list)-kvdb.keep] { - if err := kvdb.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil { + if k.cfg.Keep > 0 && len(list) > k.cfg.Keep { + for _, checkpoint := range list[:len(list)-k.cfg.Keep] { + if err := k.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil { return tracerr.Wrap(err) } } @@ -483,8 +515,8 @@ func (kvdb *KVDB) deleteOldCheckpoints() error { // MakeCheckpointFromTo makes a checkpoint from the current db at fromBatchNum // to the dest folder. This method is locking, so it can be called from // multiple places at the same time. -func (kvdb *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) error { - source := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, fromBatchNum)) +func (k *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) error { + source := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, fromBatchNum)) if _, err := os.Stat(source); os.IsNotExist(err) { // if kvdb does not have checkpoint at batchNum, return err return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" does not exist", source)) @@ -494,8 +526,8 @@ func (kvdb *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string // synchronizer to do a reset to a batchNum at the same time as the // pipeline is doing a txSelector.Reset and batchBuilder.Reset from // synchronizer to the same batchNum - kvdb.m.Lock() - defer kvdb.m.Unlock() + k.m.Lock() + defer k.m.Unlock() return pebbleMakeCheckpoint(source, dest) } @@ -525,10 +557,12 @@ func pebbleMakeCheckpoint(source, dest string) error { } // Close the DB -func (kvdb *KVDB) Close() { - if kvdb.db != nil { - kvdb.db.Close() - kvdb.db = nil +func (k *KVDB) Close() { + if k.db != nil { + k.db.Close() + k.db = nil + } + if k.last != nil { + k.last.close() } - kvdb.last.close() } diff --git a/db/kvdb/kvdb_test.go b/db/kvdb/kvdb_test.go index f50b685..5bbb485 100644 --- a/db/kvdb/kvdb_test.go +++ b/db/kvdb/kvdb_test.go @@ -37,7 +37,7 @@ func TestCheckpoints(t *testing.T) { require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dir)) - db, err := NewKVDB(dir, 128) + db, err := NewKVDB(Config{Path: dir, Keep: 128}) require.NoError(t, err) // add test key-values @@ -72,7 +72,7 @@ func TestCheckpoints(t *testing.T) { err = db.Reset(3) require.NoError(t, err) - printCheckpoints(t, db.path) + printCheckpoints(t, db.cfg.Path) // check that currentBatch is as expected after Reset cb, err = db.GetCurrentBatch() @@ -99,7 +99,7 @@ func TestCheckpoints(t *testing.T) { dirLocal, err := ioutil.TempDir("", "ldb") require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dirLocal)) - ldb, err := NewKVDB(dirLocal, 128) + ldb, err := NewKVDB(Config{Path: dirLocal, Keep: 128}) require.NoError(t, err) // get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB) @@ -120,7 +120,7 @@ func TestCheckpoints(t *testing.T) { dirLocal2, err := ioutil.TempDir("", "ldb2") require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dirLocal2)) - ldb2, err := NewKVDB(dirLocal2, 128) + ldb2, err := NewKVDB(Config{Path: dirLocal2, Keep: 128}) require.NoError(t, err) // get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB) @@ -139,9 +139,9 @@ func TestCheckpoints(t *testing.T) { debug := false if debug { - printCheckpoints(t, db.path) - printCheckpoints(t, ldb.path) - printCheckpoints(t, ldb2.path) + printCheckpoints(t, db.cfg.Path) + printCheckpoints(t, ldb.cfg.Path) + printCheckpoints(t, ldb2.cfg.Path) } } @@ -150,7 +150,7 @@ func TestListCheckpoints(t *testing.T) { require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dir)) - db, err := NewKVDB(dir, 128) + db, err := NewKVDB(Config{Path: dir, Keep: 128}) require.NoError(t, err) numCheckpoints := 16 @@ -181,7 +181,7 @@ func TestDeleteOldCheckpoints(t *testing.T) { defer require.NoError(t, os.RemoveAll(dir)) keep := 16 - db, err := NewKVDB(dir, keep) + db, err := NewKVDB(Config{Path: dir, Keep: keep}) require.NoError(t, err) numCheckpoints := 32 @@ -202,7 +202,7 @@ func TestGetCurrentIdx(t *testing.T) { defer require.NoError(t, os.RemoveAll(dir)) keep := 16 - db, err := NewKVDB(dir, keep) + db, err := NewKVDB(Config{Path: dir, Keep: keep}) require.NoError(t, err) idx, err := db.GetCurrentIdx() @@ -211,7 +211,7 @@ func TestGetCurrentIdx(t *testing.T) { db.Close() - db, err = NewKVDB(dir, keep) + db, err = NewKVDB(Config{Path: dir, Keep: keep}) require.NoError(t, err) idx, err = db.GetCurrentIdx() @@ -227,7 +227,7 @@ func TestGetCurrentIdx(t *testing.T) { db.Close() - db, err = NewKVDB(dir, keep) + db, err = NewKVDB(Config{Path: dir, Keep: keep}) require.NoError(t, err) idx, err = db.GetCurrentIdx() diff --git a/db/statedb/statedb.go b/db/statedb/statedb.go index 74abea5..bcf44a5 100644 --- a/db/statedb/statedb.go +++ b/db/statedb/statedb.go @@ -52,19 +52,40 @@ const ( // TypeBatchBuilder defines a StateDB used by the BatchBuilder, that // generates the ExitTree and the ZKInput when processing the txs TypeBatchBuilder = "batchbuilder" + // MaxNLevels is the maximum value of NLevels for the merkle tree, + // which comes from the fact that AccountIdx has 48 bits. + MaxNLevels = 48 ) // TypeStateDB determines the type of StateDB type TypeStateDB string +// Config of the StateDB +type Config struct { + // Path where the checkpoints will be stored + Path string + // Keep is the number of old checkpoints to keep. If 0, all + // checkpoints are kept. + Keep int + // NoLast skips having an opened DB with a checkpoint to the last + // batchNum for thread-safe reads. + NoLast bool + // Type of StateDB ( + Type TypeStateDB + // NLevels is the number of merkle tree levels in case the Type uses a + // merkle tree. If the Type doesn't use a merkle tree, NLevels should + // be 0. + NLevels int + // At every checkpoint, check that there are no gaps between the + // checkpoints + noGapsCheck bool +} + // StateDB represents the StateDB object type StateDB struct { - path string - Typ TypeStateDB - db *kvdb.KVDB - nLevels int - MT *merkletree.MerkleTree - keep int + cfg Config + db *kvdb.KVDB + MT *merkletree.MerkleTree } // Last offers a subset of view methods of the StateDB that can be @@ -104,36 +125,40 @@ func (s *Last) GetAccounts() ([]common.Account, error) { // NewStateDB creates a new StateDB, allowing to use an in-memory or in-disk // storage. Checkpoints older than the value defined by `keep` will be // deleted. -func NewStateDB(pathDB string, keep int, typ TypeStateDB, nLevels int) (*StateDB, error) { +// func NewStateDB(pathDB string, keep int, typ TypeStateDB, nLevels int) (*StateDB, error) { +func NewStateDB(cfg Config) (*StateDB, error) { var kv *kvdb.KVDB var err error - kv, err = kvdb.NewKVDB(pathDB, keep) + kv, err = kvdb.NewKVDB(kvdb.Config{Path: cfg.Path, Keep: cfg.Keep, + NoGapsCheck: cfg.noGapsCheck, NoLast: cfg.NoLast}) if err != nil { return nil, tracerr.Wrap(err) } var mt *merkletree.MerkleTree = nil - if typ == TypeSynchronizer || typ == TypeBatchBuilder { - mt, err = merkletree.NewMerkleTree(kv.StorageWithPrefix(PrefixKeyMT), nLevels) + if cfg.Type == TypeSynchronizer || cfg.Type == TypeBatchBuilder { + mt, err = merkletree.NewMerkleTree(kv.StorageWithPrefix(PrefixKeyMT), cfg.NLevels) if err != nil { return nil, tracerr.Wrap(err) } } - if typ == TypeTxSelector && nLevels != 0 { + if cfg.Type == TypeTxSelector && cfg.NLevels != 0 { return nil, tracerr.Wrap(fmt.Errorf("invalid StateDB parameters: StateDB type==TypeStateDB can not have nLevels!=0")) } return &StateDB{ - path: pathDB, - db: kv, - nLevels: nLevels, - MT: mt, - Typ: typ, - keep: keep, + cfg: cfg, + db: kv, + MT: mt, }, nil } +// Type returns the StateDB configured Type +func (s *StateDB) Type() TypeStateDB { + return s.cfg.Type +} + // LastRead is a thread-safe method to query the last checkpoint of the StateDB // via the Last type methods func (s *StateDB) LastRead(fn func(sdbLast *Last) error) error { @@ -179,7 +204,7 @@ func (s *StateDB) LastGetCurrentBatch() (common.BatchNum, error) { func (s *StateDB) LastMTGetRoot() (*big.Int, error) { var root *big.Int if err := s.LastRead(func(sdb *Last) error { - mt, err := merkletree.NewMerkleTree(sdb.DB().WithPrefix(PrefixKeyMT), s.nLevels) + mt, err := merkletree.NewMerkleTree(sdb.DB().WithPrefix(PrefixKeyMT), s.cfg.NLevels) if err != nil { return tracerr.Wrap(err) } @@ -195,7 +220,7 @@ func (s *StateDB) LastMTGetRoot() (*big.Int, error) { // Internally this advances & stores the current BatchNum, and then stores a // Checkpoint of the current state of the StateDB. func (s *StateDB) MakeCheckpoint() error { - log.Debugw("Making StateDB checkpoint", "batch", s.CurrentBatch()+1, "type", s.Typ) + log.Debugw("Making StateDB checkpoint", "batch", s.CurrentBatch()+1, "type", s.cfg.Type) return s.db.MakeCheckpoint() } @@ -230,7 +255,7 @@ func (s *StateDB) SetCurrentIdx(idx common.Idx) error { // those checkpoints will remain in the storage, and eventually will be // deleted when MakeCheckpoint overwrites them. func (s *StateDB) Reset(batchNum common.BatchNum) error { - log.Debugw("Making StateDB Reset", "batch", batchNum, "type", s.Typ) + log.Debugw("Making StateDB Reset", "batch", batchNum, "type", s.cfg.Type) if err := s.db.Reset(batchNum); err != nil { return tracerr.Wrap(err) } @@ -460,9 +485,10 @@ type LocalStateDB struct { // NewLocalStateDB returns a new LocalStateDB connected to the given // synchronizerDB. Checkpoints older than the value defined by `keep` will be // deleted. -func NewLocalStateDB(path string, keep int, synchronizerDB *StateDB, typ TypeStateDB, - nLevels int) (*LocalStateDB, error) { - s, err := NewStateDB(path, keep, typ, nLevels) +func NewLocalStateDB(cfg Config, synchronizerDB *StateDB) (*LocalStateDB, error) { + cfg.noGapsCheck = true + cfg.NoLast = true + s, err := NewStateDB(cfg) if err != nil { return nil, tracerr.Wrap(err) } diff --git a/db/statedb/statedb_test.go b/db/statedb/statedb_test.go index 999cf8d..571acb1 100644 --- a/db/statedb/statedb_test.go +++ b/db/statedb/statedb_test.go @@ -45,7 +45,7 @@ func TestNewStateDBIntermediateState(t *testing.T) { require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dir)) - sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0) + sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeTxSelector, NLevels: 0}) require.NoError(t, err) // test values @@ -78,7 +78,7 @@ func TestNewStateDBIntermediateState(t *testing.T) { // call NewStateDB which should get the db at the last checkpoint state // executing a Reset (discarding the last 'testkey0'&'testvalue0' data) - sdb, err = NewStateDB(dir, 128, TypeTxSelector, 0) + sdb, err = NewStateDB(Config{Path: dir, Keep: 128, Type: TypeTxSelector, NLevels: 0}) require.NoError(t, err) v, err = sdb.db.DB().Get(k0) assert.NotNil(t, err) @@ -158,7 +158,7 @@ func TestNewStateDBIntermediateState(t *testing.T) { // call NewStateDB which should get the db at the last checkpoint state // executing a Reset (discarding the last 'testkey1'&'testvalue1' data) - sdb, err = NewStateDB(dir, 128, TypeTxSelector, 0) + sdb, err = NewStateDB(Config{Path: dir, Keep: 128, Type: TypeTxSelector, NLevels: 0}) require.NoError(t, err) bn, err = sdb.getCurrentBatch() @@ -182,7 +182,7 @@ func TestStateDBWithoutMT(t *testing.T) { require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dir)) - sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0) + sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeTxSelector, NLevels: 0}) require.NoError(t, err) // create test accounts @@ -236,7 +236,7 @@ func TestStateDBWithMT(t *testing.T) { require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dir)) - sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32) + sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeSynchronizer, NLevels: 32}) require.NoError(t, err) // create test accounts @@ -290,7 +290,7 @@ func TestCheckpoints(t *testing.T) { require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dir)) - sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32) + sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeSynchronizer, NLevels: 32}) require.NoError(t, err) err = sdb.Reset(0) @@ -335,7 +335,7 @@ func TestCheckpoints(t *testing.T) { assert.Equal(t, common.BatchNum(i+1), cb) } - // printCheckpoints(t, sdb.path) + // printCheckpoints(t, sdb.cfg.Path) // reset checkpoint err = sdb.Reset(3) @@ -371,7 +371,7 @@ func TestCheckpoints(t *testing.T) { dirLocal, err := ioutil.TempDir("", "ldb") require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dirLocal)) - ldb, err := NewLocalStateDB(dirLocal, 128, sdb, TypeBatchBuilder, 32) + ldb, err := NewLocalStateDB(Config{Path: dirLocal, Keep: 128, Type: TypeBatchBuilder, NLevels: 32}, sdb) require.NoError(t, err) // get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB) @@ -392,7 +392,7 @@ func TestCheckpoints(t *testing.T) { dirLocal2, err := ioutil.TempDir("", "ldb2") require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dirLocal2)) - ldb2, err := NewLocalStateDB(dirLocal2, 128, sdb, TypeBatchBuilder, 32) + ldb2, err := NewLocalStateDB(Config{Path: dirLocal2, Keep: 128, Type: TypeBatchBuilder, NLevels: 32}, sdb) require.NoError(t, err) // get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB) @@ -409,9 +409,9 @@ func TestCheckpoints(t *testing.T) { debug := false if debug { - printCheckpoints(t, sdb.path) - printCheckpoints(t, ldb.path) - printCheckpoints(t, ldb2.path) + printCheckpoints(t, sdb.cfg.Path) + printCheckpoints(t, ldb.cfg.Path) + printCheckpoints(t, ldb2.cfg.Path) } } @@ -419,7 +419,7 @@ func TestStateDBGetAccounts(t *testing.T) { dir, err := ioutil.TempDir("", "tmpdb") require.NoError(t, err) - sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0) + sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeTxSelector, NLevels: 0}) require.NoError(t, err) // create test accounts @@ -466,7 +466,7 @@ func TestCheckAccountsTreeTestVectors(t *testing.T) { require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dir)) - sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32) + sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeSynchronizer, NLevels: 32}) require.NoError(t, err) ay0 := new(big.Int).Sub(new(big.Int).Exp(big.NewInt(2), big.NewInt(253), nil), big.NewInt(1)) @@ -540,7 +540,7 @@ func TestListCheckpoints(t *testing.T) { require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dir)) - sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32) + sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeSynchronizer, NLevels: 32}) require.NoError(t, err) numCheckpoints := 16 @@ -573,7 +573,7 @@ func TestDeleteOldCheckpoints(t *testing.T) { defer require.NoError(t, os.RemoveAll(dir)) keep := 16 - sdb, err := NewStateDB(dir, keep, TypeSynchronizer, 32) + sdb, err := NewStateDB(Config{Path: dir, Keep: keep, Type: TypeSynchronizer, NLevels: 32}) require.NoError(t, err) numCheckpoints := 32 @@ -594,7 +594,7 @@ func TestCurrentIdx(t *testing.T) { defer require.NoError(t, os.RemoveAll(dir)) keep := 16 - sdb, err := NewStateDB(dir, keep, TypeSynchronizer, 32) + sdb, err := NewStateDB(Config{Path: dir, Keep: keep, Type: TypeSynchronizer, NLevels: 32}) require.NoError(t, err) idx := sdb.CurrentIdx() @@ -602,7 +602,7 @@ func TestCurrentIdx(t *testing.T) { sdb.Close() - sdb, err = NewStateDB(dir, keep, TypeSynchronizer, 32) + sdb, err = NewStateDB(Config{Path: dir, Keep: keep, Type: TypeSynchronizer, NLevels: 32}) require.NoError(t, err) idx = sdb.CurrentIdx() @@ -616,7 +616,7 @@ func TestCurrentIdx(t *testing.T) { sdb.Close() - sdb, err = NewStateDB(dir, keep, TypeSynchronizer, 32) + sdb, err = NewStateDB(Config{Path: dir, Keep: keep, Type: TypeSynchronizer, NLevels: 32}) require.NoError(t, err) idx = sdb.CurrentIdx() @@ -629,7 +629,7 @@ func TestResetFromBadCheckpoint(t *testing.T) { defer require.NoError(t, os.RemoveAll(dir)) keep := 16 - sdb, err := NewStateDB(dir, keep, TypeSynchronizer, 32) + sdb, err := NewStateDB(Config{Path: dir, Keep: keep, Type: TypeSynchronizer, NLevels: 32}) require.NoError(t, err) err = sdb.MakeCheckpoint() diff --git a/db/statedb/utils_test.go b/db/statedb/utils_test.go index 9c33844..12d6303 100644 --- a/db/statedb/utils_test.go +++ b/db/statedb/utils_test.go @@ -18,7 +18,7 @@ func TestGetIdx(t *testing.T) { require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0) + sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeTxSelector, NLevels: 0}) assert.NoError(t, err) var sk babyjub.PrivateKey diff --git a/node/node.go b/node/node.go index edd684c..1978d49 100644 --- a/node/node.go +++ b/node/node.go @@ -164,8 +164,12 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { return nil, tracerr.Wrap(fmt.Errorf("cfg.StateDB.Keep = %v < %v, which is unsafe", cfg.StateDB.Keep, safeStateDBKeep)) } - stateDB, err := statedb.NewStateDB(cfg.StateDB.Path, cfg.StateDB.Keep, - statedb.TypeSynchronizer, 32) + stateDB, err := statedb.NewStateDB(statedb.Config{ + Path: cfg.StateDB.Path, + Keep: cfg.StateDB.Keep, + Type: statedb.TypeSynchronizer, + NLevels: statedb.MaxNLevels, + }) if err != nil { return nil, tracerr.Wrap(err) } diff --git a/synchronizer/synchronizer_test.go b/synchronizer/synchronizer_test.go index a356f8c..a95d3e0 100644 --- a/synchronizer/synchronizer_test.go +++ b/synchronizer/synchronizer_test.go @@ -307,7 +307,7 @@ func newTestModules(t *testing.T) (*statedb.StateDB, *historydb.HistoryDB) { require.NoError(t, err) deleteme = append(deleteme, dir) - stateDB, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32) + stateDB, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, Type: statedb.TypeSynchronizer, NLevels: 32}) require.NoError(t, err) // Init History DB diff --git a/test/debugapi/debugapi_test.go b/test/debugapi/debugapi_test.go index 3746860..8cd900a 100644 --- a/test/debugapi/debugapi_test.go +++ b/test/debugapi/debugapi_test.go @@ -44,7 +44,7 @@ func TestDebugAPI(t *testing.T) { dir, err := ioutil.TempDir("", "tmpdb") require.Nil(t, err) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, Type: statedb.TypeSynchronizer, NLevels: 32}) require.Nil(t, err) err = sdb.MakeCheckpoint() // Make a checkpoint to increment the batchNum require.Nil(t, err) diff --git a/test/zkproof/flows_test.go b/test/zkproof/flows_test.go index 255407a..fcfb017 100644 --- a/test/zkproof/flows_test.go +++ b/test/zkproof/flows_test.go @@ -80,7 +80,8 @@ func initTxSelector(t *testing.T, chainID uint16, hermezContractAddr ethCommon.A dir, err := ioutil.TempDir("", "tmpSyncDB") require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - syncStateDB, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 0) + syncStateDB, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeSynchronizer, NLevels: 0}) require.NoError(t, err) txselDir, err := ioutil.TempDir("", "tmpTxSelDB") diff --git a/test/zkproof/zkproof_test.go b/test/zkproof/zkproof_test.go index 4eea363..514a67e 100644 --- a/test/zkproof/zkproof_test.go +++ b/test/zkproof/zkproof_test.go @@ -50,7 +50,7 @@ func initStateDB(t *testing.T, typ statedb.TypeStateDB) *statedb.StateDB { require.NoError(t, err) defer assert.Nil(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, typ, NLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, Type: typ, NLevels: NLevels}) require.NoError(t, err) return sdb } diff --git a/txprocessor/txprocessor.go b/txprocessor/txprocessor.go index beaa474..9ba1ae6 100644 --- a/txprocessor/txprocessor.go +++ b/txprocessor/txprocessor.go @@ -127,7 +127,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat exits := make([]processedExit, nTx) - if tp.s.Typ == statedb.TypeBatchBuilder { + if tp.s.Type() == statedb.TypeBatchBuilder { tp.zki = common.NewZKInputs(tp.config.ChainID, tp.config.MaxTx, tp.config.MaxL1Tx, tp.config.MaxFeeTx, tp.config.NLevels, (tp.s.CurrentBatch() + 1).BigInt()) tp.zki.OldLastIdx = tp.s.CurrentIdx().BigInt() @@ -137,7 +137,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat // TBD if ExitTree is only in memory or stored in disk, for the moment // is only needed in memory - if tp.s.Typ == statedb.TypeSynchronizer || tp.s.Typ == statedb.TypeBatchBuilder { + if tp.s.Type() == statedb.TypeSynchronizer || tp.s.Type() == statedb.TypeBatchBuilder { tmpDir, err := ioutil.TempDir("", "hermez-statedb-exittree") if err != nil { return nil, tracerr.Wrap(err) @@ -166,7 +166,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat if err != nil { return nil, tracerr.Wrap(err) } - if tp.s.Typ == statedb.TypeSynchronizer { + if tp.s.Type() == statedb.TypeSynchronizer { if createdAccount != nil { createdAccounts = append(createdAccounts, *createdAccount) l1usertxs[i].EffectiveFromIdx = createdAccount.Idx @@ -195,7 +195,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat tp.zki.ISExitRoot[tp.i] = exitTree.Root().BigInt() } } - if tp.s.Typ == statedb.TypeSynchronizer || tp.s.Typ == statedb.TypeBatchBuilder { + if tp.s.Type() == statedb.TypeSynchronizer || tp.s.Type() == statedb.TypeBatchBuilder { if exitIdx != nil && exitTree != nil { exits[tp.i] = processedExit{ exit: true, @@ -217,7 +217,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat if exitIdx != nil { log.Error("Unexpected Exit in L1CoordinatorTx") } - if tp.s.Typ == statedb.TypeSynchronizer { + if tp.s.Type() == statedb.TypeSynchronizer { if createdAccount != nil { createdAccounts = append(createdAccounts, *createdAccount) l1coordinatortxs[i].EffectiveFromIdx = createdAccount.Idx @@ -276,7 +276,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat // collectedFees will contain the amount of fee collected for each // TokenID var collectedFees map[common.TokenID]*big.Int - if tp.s.Typ == statedb.TypeSynchronizer || tp.s.Typ == statedb.TypeBatchBuilder { + if tp.s.Type() == statedb.TypeSynchronizer || tp.s.Type() == statedb.TypeBatchBuilder { collectedFees = make(map[common.TokenID]*big.Int) for tokenID := range coordIdxsMap { collectedFees[tokenID] = big.NewInt(0) @@ -317,7 +317,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat } } } - if tp.s.Typ == statedb.TypeSynchronizer || tp.s.Typ == statedb.TypeBatchBuilder { + if tp.s.Type() == statedb.TypeSynchronizer || tp.s.Type() == statedb.TypeBatchBuilder { if exitIdx != nil && exitTree != nil { exits[tp.i] = processedExit{ exit: true, @@ -401,7 +401,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat } } - if tp.s.Typ == statedb.TypeTxSelector { + if tp.s.Type() == statedb.TypeTxSelector { return nil, nil } @@ -436,8 +436,8 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat } } - if tp.s.Typ == statedb.TypeSynchronizer { - // return exitInfos, createdAccounts and collectedFees, so Synchronizer will + if tp.s.Type() == statedb.TypeSynchronizer { + // retuTypeexitInfos, createdAccounts and collectedFees, so Synchronizer will // be able to store it into HistoryDB for the concrete BatchNum return &ProcessTxOutput{ ZKInputs: nil, @@ -588,7 +588,7 @@ func (tp *TxProcessor) ProcessL1Tx(exitTree *merkletree.MerkleTree, tx *common.L } var createdAccount *common.Account - if tp.s.Typ == statedb.TypeSynchronizer && + if tp.s.Type() == statedb.TypeSynchronizer && (tx.Type == common.TxTypeCreateAccountDeposit || tx.Type == common.TxTypeCreateAccountDepositTransfer) { var err error @@ -612,8 +612,8 @@ func (tp *TxProcessor) ProcessL2Tx(coordIdxsMap map[common.TokenID]common.Idx, var err error // if tx.ToIdx==0, get toIdx by ToEthAddr or ToBJJ if tx.ToIdx == common.Idx(0) && tx.AuxToIdx == common.Idx(0) { - if tp.s.Typ == statedb.TypeSynchronizer { - // this should never be reached + if tp.s.Type() == statedb.TypeSynchronizer { + // thisTypeould never be reached log.Error("WARNING: In StateDB with Synchronizer mode L2.ToIdx can't be 0") return nil, nil, false, tracerr.Wrap(fmt.Errorf("In StateDB with Synchronizer mode L2.ToIdx can't be 0")) } @@ -676,8 +676,8 @@ func (tp *TxProcessor) ProcessL2Tx(coordIdxsMap map[common.TokenID]common.Idx, } // if StateDB type==TypeSynchronizer, will need to add Nonce - if tp.s.Typ == statedb.TypeSynchronizer { - // as type==TypeSynchronizer, always tx.ToIdx!=0 + if tp.s.Type() == statedb.TypeSynchronizer { + // as tType==TypeSynchronizer, always tx.ToIdx!=0 acc, err := tp.s.GetAccount(tx.FromIdx) if err != nil { log.Errorw("GetAccount", "fromIdx", tx.FromIdx, "err", err) @@ -889,8 +889,8 @@ func (tp *TxProcessor) applyTransfer(coordIdxsMap map[common.TokenID]common.Idx, accumulated := tp.AccumulatedFees[accCoord.Idx] accumulated.Add(accumulated, fee) - if tp.s.Typ == statedb.TypeSynchronizer || - tp.s.Typ == statedb.TypeBatchBuilder { + if tp.s.Type() == statedb.TypeSynchronizer || + tp.s.Type() == statedb.TypeBatchBuilder { collected := collectedFees[accCoord.TokenID] collected.Add(collected, fee) } @@ -1094,8 +1094,8 @@ func (tp *TxProcessor) applyExit(coordIdxsMap map[common.TokenID]common.Idx, accumulated := tp.AccumulatedFees[accCoord.Idx] accumulated.Add(accumulated, fee) - if tp.s.Typ == statedb.TypeSynchronizer || - tp.s.Typ == statedb.TypeBatchBuilder { + if tp.s.Type() == statedb.TypeSynchronizer || + tp.s.Type() == statedb.TypeBatchBuilder { collected := collectedFees[accCoord.TokenID] collected.Add(collected, fee) } diff --git a/txprocessor/txprocessor_test.go b/txprocessor/txprocessor_test.go index eb319a1..8148ea5 100644 --- a/txprocessor/txprocessor_test.go +++ b/txprocessor/txprocessor_test.go @@ -36,7 +36,8 @@ func TestComputeEffectiveAmounts(t *testing.T) { require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeSynchronizer, NLevels: 32}) assert.NoError(t, err) set := ` @@ -212,7 +213,8 @@ func TestProcessTxsBalances(t *testing.T) { require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeSynchronizer, NLevels: 32}) assert.NoError(t, err) chainID := uint16(0) @@ -358,7 +360,8 @@ func TestProcessTxsSynchronizer(t *testing.T) { require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeSynchronizer, NLevels: 32}) assert.NoError(t, err) chainID := uint16(0) @@ -489,7 +492,8 @@ func TestProcessTxsBatchBuilder(t *testing.T) { require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: 32}) assert.NoError(t, err) chainID := uint16(0) @@ -580,7 +584,8 @@ func TestProcessTxsRootTestVectors(t *testing.T) { require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: 32}) assert.NoError(t, err) // same values than in the js test @@ -631,7 +636,8 @@ func TestCreateAccountDepositMaxValue(t *testing.T) { defer assert.NoError(t, os.RemoveAll(dir)) nLevels := 16 - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: nLevels}) assert.NoError(t, err) users := txsets.GenerateJsUsers(t) @@ -700,7 +706,8 @@ func initTestMultipleCoordIdxForTokenID(t *testing.T) (*TxProcessor, *til.Contex require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: 32}) assert.NoError(t, err) chainID := uint16(1) @@ -798,7 +805,8 @@ func TestTwoExits(t *testing.T) { require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeSynchronizer, NLevels: 32}) assert.NoError(t, err) chainID := uint16(1) @@ -865,7 +873,8 @@ func TestTwoExits(t *testing.T) { require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir2)) - sdb2, err := statedb.NewStateDB(dir2, 128, statedb.TypeSynchronizer, 32) + sdb2, err := statedb.NewStateDB(statedb.Config{Path: dir2, Keep: 128, + Type: statedb.TypeSynchronizer, NLevels: 32}) assert.NoError(t, err) tc = til.NewContext(chainID, common.RollupConstMaxL1UserTx) diff --git a/txprocessor/zkinputsgen_test.go b/txprocessor/zkinputsgen_test.go index d6a66aa..9d50d59 100644 --- a/txprocessor/zkinputsgen_test.go +++ b/txprocessor/zkinputsgen_test.go @@ -41,7 +41,8 @@ func TestZKInputsHashTestVector0(t *testing.T) { require.NoError(t, err) defer assert.Nil(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: 32}) require.NoError(t, err) chainID := uint16(0) @@ -90,7 +91,8 @@ func TestZKInputsHashTestVector1(t *testing.T) { require.NoError(t, err) defer assert.Nil(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: 32}) require.NoError(t, err) chainID := uint16(0) @@ -149,7 +151,8 @@ func TestZKInputsEmpty(t *testing.T) { nLevels := 16 - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: nLevels}) require.NoError(t, err) chainID := uint16(0) @@ -266,7 +269,8 @@ func TestZKInputs0(t *testing.T) { defer assert.Nil(t, os.RemoveAll(dir)) nLevels := 16 - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: nLevels}) require.NoError(t, err) chainID := uint16(0) @@ -322,7 +326,8 @@ func TestZKInputs1(t *testing.T) { defer assert.Nil(t, os.RemoveAll(dir)) nLevels := 16 - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: nLevels}) require.NoError(t, err) chainID := uint16(0) @@ -385,7 +390,8 @@ func TestZKInputs2(t *testing.T) { defer assert.Nil(t, os.RemoveAll(dir)) nLevels := 16 - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: nLevels}) require.NoError(t, err) chainID := uint16(0) @@ -456,7 +462,8 @@ func TestZKInputs3(t *testing.T) { defer assert.Nil(t, os.RemoveAll(dir)) nLevels := 16 - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: nLevels}) require.NoError(t, err) chainID := uint16(0) @@ -527,7 +534,8 @@ func TestZKInputs4(t *testing.T) { defer assert.Nil(t, os.RemoveAll(dir)) nLevels := 16 - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: nLevels}) require.NoError(t, err) chainID := uint16(0) @@ -598,7 +606,8 @@ func TestZKInputs5(t *testing.T) { defer assert.Nil(t, os.RemoveAll(dir)) nLevels := 16 - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: nLevels}) require.NoError(t, err) chainID := uint16(0) @@ -661,7 +670,8 @@ func TestZKInputs6(t *testing.T) { defer assert.Nil(t, os.RemoveAll(dir)) nLevels := 16 - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: nLevels}) require.NoError(t, err) chainID := uint16(0) diff --git a/txselector/txselector.go b/txselector/txselector.go index fb7092c..c3780cd 100644 --- a/txselector/txselector.go +++ b/txselector/txselector.go @@ -10,6 +10,7 @@ import ( ethCommon "github.com/ethereum/go-ethereum/common" "github.com/hermeznetwork/hermez-node/common" + "github.com/hermeznetwork/hermez-node/db/kvdb" "github.com/hermeznetwork/hermez-node/db/l2db" "github.com/hermeznetwork/hermez-node/db/statedb" "github.com/hermeznetwork/hermez-node/log" @@ -62,8 +63,14 @@ type TxSelector struct { // NewTxSelector returns a *TxSelector func NewTxSelector(coordAccount *CoordAccount, dbpath string, synchronizerStateDB *statedb.StateDB, l2 *l2db.L2DB) (*TxSelector, error) { - localAccountsDB, err := statedb.NewLocalStateDB(dbpath, 128, - synchronizerStateDB, statedb.TypeTxSelector, 0) // without merkletree + localAccountsDB, err := statedb.NewLocalStateDB( + statedb.Config{ + Path: dbpath, + Keep: kvdb.DefaultKeep, + Type: statedb.TypeTxSelector, + NLevels: 0, + }, + synchronizerStateDB) // without merkletree if err != nil { return nil, tracerr.Wrap(err) } diff --git a/txselector/txselector_test.go b/txselector/txselector_test.go index 516ea35..6219325 100644 --- a/txselector/txselector_test.go +++ b/txselector/txselector_test.go @@ -34,7 +34,8 @@ func initTest(t *testing.T, chainID uint16, hermezContractAddr ethCommon.Address dir, err := ioutil.TempDir("", "tmpdb") require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - syncStateDB, err := statedb.NewStateDB(dir, 128, statedb.TypeTxSelector, 0) + syncStateDB, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeTxSelector, NLevels: 0}) require.NoError(t, err) txselDir, err := ioutil.TempDir("", "tmpTxSelDB")