diff --git a/api/api_test.go b/api/api_test.go index 97dee27..001d3c8 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -213,7 +213,7 @@ func TestMain(m *testing.M) { panic(err) } }() - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeTxSelector, 0) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, Type: statedb.TypeTxSelector, NLevels: 0}) if err != nil { panic(err) } diff --git a/batchbuilder/batchbuilder.go b/batchbuilder/batchbuilder.go index 4515efc..8f662fe 100644 --- a/batchbuilder/batchbuilder.go +++ b/batchbuilder/batchbuilder.go @@ -2,6 +2,7 @@ package batchbuilder import ( "github.com/hermeznetwork/hermez-node/common" + "github.com/hermeznetwork/hermez-node/db/kvdb" "github.com/hermeznetwork/hermez-node/db/statedb" "github.com/hermeznetwork/hermez-node/txprocessor" "github.com/hermeznetwork/tracerr" @@ -28,8 +29,14 @@ type ConfigBatch struct { // NewBatchBuilder constructs a new BatchBuilder, and executes the bb.Reset // method func NewBatchBuilder(dbpath string, synchronizerStateDB *statedb.StateDB, batchNum common.BatchNum, nLevels uint64) (*BatchBuilder, error) { - localStateDB, err := statedb.NewLocalStateDB(dbpath, 128, synchronizerStateDB, - statedb.TypeBatchBuilder, int(nLevels)) + localStateDB, err := statedb.NewLocalStateDB( + statedb.Config{ + Path: dbpath, + Keep: kvdb.DefaultKeep, + Type: statedb.TypeBatchBuilder, + NLevels: int(nLevels), + }, + synchronizerStateDB) if err != nil { return nil, tracerr.Wrap(err) } diff --git a/batchbuilder/batchbuilder_test.go b/batchbuilder/batchbuilder_test.go index 550f5e6..8188aca 100644 --- a/batchbuilder/batchbuilder_test.go +++ b/batchbuilder/batchbuilder_test.go @@ -15,7 +15,7 @@ func TestBatchBuilder(t *testing.T) { require.Nil(t, err) defer assert.Nil(t, os.RemoveAll(dir)) - synchDB, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 0) + synchDB, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, Type: statedb.TypeBatchBuilder, NLevels: 0}) assert.Nil(t, err) bbDir, err := ioutil.TempDir("", "tmpBatchBuilderDB") diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index ca12966..ebd853d 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -97,7 +97,8 @@ func newTestModules(t *testing.T) modules { syncDBPath, err = ioutil.TempDir("", "tmpSyncDB") require.NoError(t, err) deleteme = append(deleteme, syncDBPath) - syncStateDB, err := statedb.NewStateDB(syncDBPath, 128, statedb.TypeSynchronizer, 48) + syncStateDB, err := statedb.NewStateDB(statedb.Config{Path: syncDBPath, Keep: 128, + Type: statedb.TypeSynchronizer, NLevels: 48}) assert.NoError(t, err) pass := os.Getenv("POSTGRES_PASS") diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go index 400d654..81d5774 100644 --- a/coordinator/pipeline.go +++ b/coordinator/pipeline.go @@ -200,15 +200,17 @@ func (p *Pipeline) Start(batchNum common.BatchNum, p.syncSCVars(statsVars.Vars) case <-time.After(waitDuration): batchNum = p.batchNum + 1 - if batchInfo, err := p.handleForgeBatch(p.ctx, batchNum); err != nil { + batchInfo, err := p.handleForgeBatch(p.ctx, batchNum) + if p.ctx.Err() != nil { + continue + } else if err != nil { waitDuration = p.cfg.SyncRetryInterval continue - } else { - p.batchNum = batchNum - select { - case batchChSentServerProof <- batchInfo: - case <-p.ctx.Done(): - } + } + p.batchNum = batchNum + select { + case batchChSentServerProof <- batchInfo: + case <-p.ctx.Done(): } } } diff --git a/coordinator/purger_test.go b/coordinator/purger_test.go index 568fde6..d47ac68 100644 --- a/coordinator/purger_test.go +++ b/coordinator/purger_test.go @@ -28,12 +28,14 @@ func newStateDB(t *testing.T) *statedb.LocalStateDB { syncDBPath, err := ioutil.TempDir("", "tmpSyncDB") require.NoError(t, err) deleteme = append(deleteme, syncDBPath) - syncStateDB, err := statedb.NewStateDB(syncDBPath, 128, statedb.TypeSynchronizer, 48) + syncStateDB, err := statedb.NewStateDB(statedb.Config{Path: syncDBPath, Keep: 128, + Type: statedb.TypeSynchronizer, NLevels: 48}) assert.NoError(t, err) stateDBPath, err := ioutil.TempDir("", "tmpStateDB") require.NoError(t, err) deleteme = append(deleteme, stateDBPath) - stateDB, err := statedb.NewLocalStateDB(stateDBPath, 128, syncStateDB, statedb.TypeTxSelector, 0) + stateDB, err := statedb.NewLocalStateDB(statedb.Config{Path: stateDBPath, Keep: 128, + Type: statedb.TypeTxSelector, NLevels: 0}, syncStateDB) require.NoError(t, err) return stateDB } diff --git a/db/kvdb/kvdb.go b/db/kvdb/kvdb.go index 11a8939..c2f5633 100644 --- a/db/kvdb/kvdb.go +++ b/db/kvdb/kvdb.go @@ -27,6 +27,8 @@ const ( // PathLast defines the subpath of the last Batch in the subpath // of the StateDB PathLast = "last" + // DefaultKeep is the default value for the Keep parameter + DefaultKeep = 128 ) var ( @@ -34,16 +36,18 @@ var ( KeyCurrentBatch = []byte("k:currentbatch") // keyCurrentIdx is used as key in the db to store the CurrentIdx keyCurrentIdx = []byte("k:idx") + // ErrNoLast is returned when the KVDB has been configured to not have + // a Last checkpoint but a Last method is used + ErrNoLast = fmt.Errorf("no last checkpoint") ) // KVDB represents the Key-Value DB object type KVDB struct { - path string - db *pebble.Storage + cfg Config + db *pebble.Storage // CurrentIdx holds the current Idx that the BatchBuilder is using CurrentIdx common.Idx CurrentBatch common.BatchNum - keep int m sync.Mutex last *Last } @@ -103,23 +107,42 @@ func (k *Last) close() { } } +// Config of the KVDB +type Config struct { + // Path where the checkpoints will be stored + Path string + // Keep is the number of old checkpoints to keep. If 0, all + // checkpoints are kept. + Keep int + // At every checkpoint, check that there are no gaps between the + // checkpoints + NoGapsCheck bool + // NoLast skips having an opened DB with a checkpoint to the last + // batchNum for thread-safe reads. + NoLast bool +} + // NewKVDB creates a new KVDB, allowing to use an in-memory or in-disk storage. // Checkpoints older than the value defined by `keep` will be deleted. -func NewKVDB(pathDB string, keep int) (*KVDB, error) { +// func NewKVDB(pathDB string, keep int) (*KVDB, error) { +func NewKVDB(cfg Config) (*KVDB, error) { var sto *pebble.Storage var err error - sto, err = pebble.NewPebbleStorage(path.Join(pathDB, PathCurrent), false) + sto, err = pebble.NewPebbleStorage(path.Join(cfg.Path, PathCurrent), false) if err != nil { return nil, tracerr.Wrap(err) } + var last *Last + if !cfg.NoLast { + last = &Last{ + path: cfg.Path, + } + } kvdb := &KVDB{ - path: pathDB, + cfg: cfg, db: sto, - keep: keep, - last: &Last{ - path: pathDB, - }, + last: last, } // load currentBatch kvdb.CurrentBatch, err = kvdb.GetCurrentBatch() @@ -137,29 +160,32 @@ func NewKVDB(pathDB string, keep int) (*KVDB, error) { } // LastRead is a thread-safe method to query the last KVDB -func (kvdb *KVDB) LastRead(fn func(db *pebble.Storage) error) error { - kvdb.last.rw.RLock() - defer kvdb.last.rw.RUnlock() - return fn(kvdb.last.db) +func (k *KVDB) LastRead(fn func(db *pebble.Storage) error) error { + if k.last == nil { + return tracerr.Wrap(ErrNoLast) + } + k.last.rw.RLock() + defer k.last.rw.RUnlock() + return fn(k.last.db) } // DB returns the *pebble.Storage from the KVDB -func (kvdb *KVDB) DB() *pebble.Storage { - return kvdb.db +func (k *KVDB) DB() *pebble.Storage { + return k.db } // StorageWithPrefix returns the db.Storage with the given prefix from the // current KVDB -func (kvdb *KVDB) StorageWithPrefix(prefix []byte) db.Storage { - return kvdb.db.WithPrefix(prefix) +func (k *KVDB) StorageWithPrefix(prefix []byte) db.Storage { + return k.db.WithPrefix(prefix) } // Reset resets the KVDB to the checkpoint at the given batchNum. Reset does // not delete the checkpoints between old current and the new current, those // checkpoints will remain in the storage, and eventually will be deleted when // MakeCheckpoint overwrites them. -func (kvdb *KVDB) Reset(batchNum common.BatchNum) error { - return kvdb.reset(batchNum, true) +func (k *KVDB) Reset(batchNum common.BatchNum) error { + return k.reset(batchNum, true) } // reset resets the KVDB to the checkpoint at the given batchNum. Reset does @@ -167,19 +193,19 @@ func (kvdb *KVDB) Reset(batchNum common.BatchNum) error { // checkpoints will remain in the storage, and eventually will be deleted when // MakeCheckpoint overwrites them. `closeCurrent` will close the currently // opened db before doing the reset. -func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { - currentPath := path.Join(kvdb.path, PathCurrent) +func (k *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { + currentPath := path.Join(k.cfg.Path, PathCurrent) - if closeCurrent && kvdb.db != nil { - kvdb.db.Close() - kvdb.db = nil + if closeCurrent && k.db != nil { + k.db.Close() + k.db = nil } // remove 'current' if err := os.RemoveAll(currentPath); err != nil { return tracerr.Wrap(err) } // remove all checkpoints > batchNum - list, err := kvdb.ListCheckpoints() + list, err := k.ListCheckpoints() if err != nil { return tracerr.Wrap(err) } @@ -192,7 +218,7 @@ func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { } } for _, bn := range list[start:] { - if err := kvdb.DeleteCheckpoint(common.BatchNum(bn)); err != nil { + if err := k.DeleteCheckpoint(common.BatchNum(bn)); err != nil { return tracerr.Wrap(err) } } @@ -203,23 +229,27 @@ func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { if err != nil { return tracerr.Wrap(err) } - kvdb.db = sto - kvdb.CurrentIdx = common.RollupConstReservedIDx // 255 - kvdb.CurrentBatch = 0 - if err := kvdb.last.setNew(); err != nil { - return tracerr.Wrap(err) + k.db = sto + k.CurrentIdx = common.RollupConstReservedIDx // 255 + k.CurrentBatch = 0 + if k.last != nil { + if err := k.last.setNew(); err != nil { + return tracerr.Wrap(err) + } } return nil } // copy 'batchNum' to 'current' - if err := kvdb.MakeCheckpointFromTo(batchNum, currentPath); err != nil { + if err := k.MakeCheckpointFromTo(batchNum, currentPath); err != nil { return tracerr.Wrap(err) } // copy 'batchNum' to 'last' - if err := kvdb.last.set(kvdb, batchNum); err != nil { - return tracerr.Wrap(err) + if k.last != nil { + if err := k.last.set(k, batchNum); err != nil { + return tracerr.Wrap(err) + } } // open the new 'current' @@ -227,15 +257,15 @@ func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { if err != nil { return tracerr.Wrap(err) } - kvdb.db = sto + k.db = sto // get currentBatch num - kvdb.CurrentBatch, err = kvdb.GetCurrentBatch() + k.CurrentBatch, err = k.GetCurrentBatch() if err != nil { return tracerr.Wrap(err) } // idx is obtained from the statedb reset - kvdb.CurrentIdx, err = kvdb.GetCurrentIdx() + k.CurrentIdx, err = k.GetCurrentIdx() if err != nil { return tracerr.Wrap(err) } @@ -245,15 +275,15 @@ func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { // ResetFromSynchronizer performs a reset in the KVDB getting the state from // synchronizerKVDB for the given batchNum. -func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKVDB *KVDB) error { +func (k *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKVDB *KVDB) error { if synchronizerKVDB == nil { return tracerr.Wrap(fmt.Errorf("synchronizerKVDB can not be nil")) } - currentPath := path.Join(kvdb.path, PathCurrent) - if kvdb.db != nil { - kvdb.db.Close() - kvdb.db = nil + currentPath := path.Join(k.cfg.Path, PathCurrent) + if k.db != nil { + k.db.Close() + k.db = nil } // remove 'current' @@ -261,12 +291,12 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV return tracerr.Wrap(err) } // remove all checkpoints - list, err := kvdb.ListCheckpoints() + list, err := k.ListCheckpoints() if err != nil { return tracerr.Wrap(err) } for _, bn := range list { - if err := kvdb.DeleteCheckpoint(common.BatchNum(bn)); err != nil { + if err := k.DeleteCheckpoint(common.BatchNum(bn)); err != nil { return tracerr.Wrap(err) } } @@ -277,14 +307,14 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV if err != nil { return tracerr.Wrap(err) } - kvdb.db = sto - kvdb.CurrentIdx = common.RollupConstReservedIDx // 255 - kvdb.CurrentBatch = 0 + k.db = sto + k.CurrentIdx = common.RollupConstReservedIDx // 255 + k.CurrentBatch = 0 return nil } - checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) + checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) // copy synchronizer'BatchNumX' to 'BatchNumX' if err := synchronizerKVDB.MakeCheckpointFromTo(batchNum, checkpointPath); err != nil { @@ -292,7 +322,7 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV } // copy 'BatchNumX' to 'current' - err = kvdb.MakeCheckpointFromTo(batchNum, currentPath) + err = k.MakeCheckpointFromTo(batchNum, currentPath) if err != nil { return tracerr.Wrap(err) } @@ -302,15 +332,15 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV if err != nil { return tracerr.Wrap(err) } - kvdb.db = sto + k.db = sto // get currentBatch num - kvdb.CurrentBatch, err = kvdb.GetCurrentBatch() + k.CurrentBatch, err = k.GetCurrentBatch() if err != nil { return tracerr.Wrap(err) } // get currentIdx - kvdb.CurrentIdx, err = kvdb.GetCurrentIdx() + k.CurrentIdx, err = k.GetCurrentIdx() if err != nil { return tracerr.Wrap(err) } @@ -319,8 +349,8 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV } // GetCurrentBatch returns the current BatchNum stored in the KVDB -func (kvdb *KVDB) GetCurrentBatch() (common.BatchNum, error) { - cbBytes, err := kvdb.db.Get(KeyCurrentBatch) +func (k *KVDB) GetCurrentBatch() (common.BatchNum, error) { + cbBytes, err := k.db.Get(KeyCurrentBatch) if tracerr.Unwrap(err) == db.ErrNotFound { return 0, nil } @@ -331,12 +361,12 @@ func (kvdb *KVDB) GetCurrentBatch() (common.BatchNum, error) { } // setCurrentBatch stores the current BatchNum in the KVDB -func (kvdb *KVDB) setCurrentBatch() error { - tx, err := kvdb.db.NewTx() +func (k *KVDB) setCurrentBatch() error { + tx, err := k.db.NewTx() if err != nil { return tracerr.Wrap(err) } - err = tx.Put(KeyCurrentBatch, kvdb.CurrentBatch.Bytes()) + err = tx.Put(KeyCurrentBatch, k.CurrentBatch.Bytes()) if err != nil { return tracerr.Wrap(err) } @@ -347,9 +377,9 @@ func (kvdb *KVDB) setCurrentBatch() error { } // GetCurrentIdx returns the stored Idx from the KVDB, which is the last Idx -// used for an Account in the KVDB. -func (kvdb *KVDB) GetCurrentIdx() (common.Idx, error) { - idxBytes, err := kvdb.db.Get(keyCurrentIdx) +// used for an Account in the k. +func (k *KVDB) GetCurrentIdx() (common.Idx, error) { + idxBytes, err := k.db.Get(keyCurrentIdx) if tracerr.Unwrap(err) == db.ErrNotFound { return common.RollupConstReservedIDx, nil // 255, nil } @@ -360,10 +390,10 @@ func (kvdb *KVDB) GetCurrentIdx() (common.Idx, error) { } // SetCurrentIdx stores Idx in the KVDB -func (kvdb *KVDB) SetCurrentIdx(idx common.Idx) error { - kvdb.CurrentIdx = idx +func (k *KVDB) SetCurrentIdx(idx common.Idx) error { + k.CurrentIdx = idx - tx, err := kvdb.db.NewTx() + tx, err := k.db.NewTx() if err != nil { return tracerr.Wrap(err) } @@ -383,14 +413,14 @@ func (kvdb *KVDB) SetCurrentIdx(idx common.Idx) error { // MakeCheckpoint does a checkpoint at the given batchNum in the defined path. // Internally this advances & stores the current BatchNum, and then stores a -// Checkpoint of the current state of the KVDB. -func (kvdb *KVDB) MakeCheckpoint() error { +// Checkpoint of the current state of the k. +func (k *KVDB) MakeCheckpoint() error { // advance currentBatch - kvdb.CurrentBatch++ + k.CurrentBatch++ - checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, kvdb.CurrentBatch)) + checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, k.CurrentBatch)) - if err := kvdb.setCurrentBatch(); err != nil { + if err := k.setCurrentBatch(); err != nil { return tracerr.Wrap(err) } @@ -404,15 +434,17 @@ func (kvdb *KVDB) MakeCheckpoint() error { } // execute Checkpoint - if err := kvdb.db.Pebble().Checkpoint(checkpointPath); err != nil { + if err := k.db.Pebble().Checkpoint(checkpointPath); err != nil { return tracerr.Wrap(err) } // copy 'CurrentBatch' to 'last' - if err := kvdb.last.set(kvdb, kvdb.CurrentBatch); err != nil { - return tracerr.Wrap(err) + if k.last != nil { + if err := k.last.set(k, k.CurrentBatch); err != nil { + return tracerr.Wrap(err) + } } // delete old checkpoints - if err := kvdb.deleteOldCheckpoints(); err != nil { + if err := k.deleteOldCheckpoints(); err != nil { return tracerr.Wrap(err) } @@ -420,8 +452,8 @@ func (kvdb *KVDB) MakeCheckpoint() error { } // DeleteCheckpoint removes if exist the checkpoint of the given batchNum -func (kvdb *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error { - checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) +func (k *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error { + checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) if _, err := os.Stat(checkpointPath); os.IsNotExist(err) { return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum)) @@ -432,8 +464,8 @@ func (kvdb *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error { // ListCheckpoints returns the list of batchNums of the checkpoints, sorted. // If there's a gap between the list of checkpoints, an error is returned. -func (kvdb *KVDB) ListCheckpoints() ([]int, error) { - files, err := ioutil.ReadDir(kvdb.path) +func (k *KVDB) ListCheckpoints() ([]int, error) { + files, err := ioutil.ReadDir(k.cfg.Path) if err != nil { return nil, tracerr.Wrap(err) } @@ -450,12 +482,12 @@ func (kvdb *KVDB) ListCheckpoints() ([]int, error) { } } sort.Ints(checkpoints) - if len(checkpoints) > 0 { + if !k.cfg.NoGapsCheck && len(checkpoints) > 0 { first := checkpoints[0] for _, checkpoint := range checkpoints[1:] { first++ if checkpoint != first { - log.Errorw("GAP", "checkpoints", checkpoints) + log.Errorw("gap between checkpoints", "checkpoints", checkpoints) return nil, tracerr.Wrap(fmt.Errorf("checkpoint gap at %v", checkpoint)) } } @@ -465,14 +497,14 @@ func (kvdb *KVDB) ListCheckpoints() ([]int, error) { // deleteOldCheckpoints deletes old checkpoints when there are more than // `s.keep` checkpoints -func (kvdb *KVDB) deleteOldCheckpoints() error { - list, err := kvdb.ListCheckpoints() +func (k *KVDB) deleteOldCheckpoints() error { + list, err := k.ListCheckpoints() if err != nil { return tracerr.Wrap(err) } - if len(list) > kvdb.keep { - for _, checkpoint := range list[:len(list)-kvdb.keep] { - if err := kvdb.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil { + if k.cfg.Keep > 0 && len(list) > k.cfg.Keep { + for _, checkpoint := range list[:len(list)-k.cfg.Keep] { + if err := k.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil { return tracerr.Wrap(err) } } @@ -483,8 +515,8 @@ func (kvdb *KVDB) deleteOldCheckpoints() error { // MakeCheckpointFromTo makes a checkpoint from the current db at fromBatchNum // to the dest folder. This method is locking, so it can be called from // multiple places at the same time. -func (kvdb *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) error { - source := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, fromBatchNum)) +func (k *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) error { + source := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, fromBatchNum)) if _, err := os.Stat(source); os.IsNotExist(err) { // if kvdb does not have checkpoint at batchNum, return err return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" does not exist", source)) @@ -494,8 +526,8 @@ func (kvdb *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string // synchronizer to do a reset to a batchNum at the same time as the // pipeline is doing a txSelector.Reset and batchBuilder.Reset from // synchronizer to the same batchNum - kvdb.m.Lock() - defer kvdb.m.Unlock() + k.m.Lock() + defer k.m.Unlock() return pebbleMakeCheckpoint(source, dest) } @@ -525,10 +557,12 @@ func pebbleMakeCheckpoint(source, dest string) error { } // Close the DB -func (kvdb *KVDB) Close() { - if kvdb.db != nil { - kvdb.db.Close() - kvdb.db = nil +func (k *KVDB) Close() { + if k.db != nil { + k.db.Close() + k.db = nil + } + if k.last != nil { + k.last.close() } - kvdb.last.close() } diff --git a/db/kvdb/kvdb_test.go b/db/kvdb/kvdb_test.go index f50b685..5bbb485 100644 --- a/db/kvdb/kvdb_test.go +++ b/db/kvdb/kvdb_test.go @@ -37,7 +37,7 @@ func TestCheckpoints(t *testing.T) { require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dir)) - db, err := NewKVDB(dir, 128) + db, err := NewKVDB(Config{Path: dir, Keep: 128}) require.NoError(t, err) // add test key-values @@ -72,7 +72,7 @@ func TestCheckpoints(t *testing.T) { err = db.Reset(3) require.NoError(t, err) - printCheckpoints(t, db.path) + printCheckpoints(t, db.cfg.Path) // check that currentBatch is as expected after Reset cb, err = db.GetCurrentBatch() @@ -99,7 +99,7 @@ func TestCheckpoints(t *testing.T) { dirLocal, err := ioutil.TempDir("", "ldb") require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dirLocal)) - ldb, err := NewKVDB(dirLocal, 128) + ldb, err := NewKVDB(Config{Path: dirLocal, Keep: 128}) require.NoError(t, err) // get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB) @@ -120,7 +120,7 @@ func TestCheckpoints(t *testing.T) { dirLocal2, err := ioutil.TempDir("", "ldb2") require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dirLocal2)) - ldb2, err := NewKVDB(dirLocal2, 128) + ldb2, err := NewKVDB(Config{Path: dirLocal2, Keep: 128}) require.NoError(t, err) // get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB) @@ -139,9 +139,9 @@ func TestCheckpoints(t *testing.T) { debug := false if debug { - printCheckpoints(t, db.path) - printCheckpoints(t, ldb.path) - printCheckpoints(t, ldb2.path) + printCheckpoints(t, db.cfg.Path) + printCheckpoints(t, ldb.cfg.Path) + printCheckpoints(t, ldb2.cfg.Path) } } @@ -150,7 +150,7 @@ func TestListCheckpoints(t *testing.T) { require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dir)) - db, err := NewKVDB(dir, 128) + db, err := NewKVDB(Config{Path: dir, Keep: 128}) require.NoError(t, err) numCheckpoints := 16 @@ -181,7 +181,7 @@ func TestDeleteOldCheckpoints(t *testing.T) { defer require.NoError(t, os.RemoveAll(dir)) keep := 16 - db, err := NewKVDB(dir, keep) + db, err := NewKVDB(Config{Path: dir, Keep: keep}) require.NoError(t, err) numCheckpoints := 32 @@ -202,7 +202,7 @@ func TestGetCurrentIdx(t *testing.T) { defer require.NoError(t, os.RemoveAll(dir)) keep := 16 - db, err := NewKVDB(dir, keep) + db, err := NewKVDB(Config{Path: dir, Keep: keep}) require.NoError(t, err) idx, err := db.GetCurrentIdx() @@ -211,7 +211,7 @@ func TestGetCurrentIdx(t *testing.T) { db.Close() - db, err = NewKVDB(dir, keep) + db, err = NewKVDB(Config{Path: dir, Keep: keep}) require.NoError(t, err) idx, err = db.GetCurrentIdx() @@ -227,7 +227,7 @@ func TestGetCurrentIdx(t *testing.T) { db.Close() - db, err = NewKVDB(dir, keep) + db, err = NewKVDB(Config{Path: dir, Keep: keep}) require.NoError(t, err) idx, err = db.GetCurrentIdx() diff --git a/db/statedb/statedb.go b/db/statedb/statedb.go index 74abea5..bcf44a5 100644 --- a/db/statedb/statedb.go +++ b/db/statedb/statedb.go @@ -52,19 +52,40 @@ const ( // TypeBatchBuilder defines a StateDB used by the BatchBuilder, that // generates the ExitTree and the ZKInput when processing the txs TypeBatchBuilder = "batchbuilder" + // MaxNLevels is the maximum value of NLevels for the merkle tree, + // which comes from the fact that AccountIdx has 48 bits. + MaxNLevels = 48 ) // TypeStateDB determines the type of StateDB type TypeStateDB string +// Config of the StateDB +type Config struct { + // Path where the checkpoints will be stored + Path string + // Keep is the number of old checkpoints to keep. If 0, all + // checkpoints are kept. + Keep int + // NoLast skips having an opened DB with a checkpoint to the last + // batchNum for thread-safe reads. + NoLast bool + // Type of StateDB ( + Type TypeStateDB + // NLevels is the number of merkle tree levels in case the Type uses a + // merkle tree. If the Type doesn't use a merkle tree, NLevels should + // be 0. + NLevels int + // At every checkpoint, check that there are no gaps between the + // checkpoints + noGapsCheck bool +} + // StateDB represents the StateDB object type StateDB struct { - path string - Typ TypeStateDB - db *kvdb.KVDB - nLevels int - MT *merkletree.MerkleTree - keep int + cfg Config + db *kvdb.KVDB + MT *merkletree.MerkleTree } // Last offers a subset of view methods of the StateDB that can be @@ -104,36 +125,40 @@ func (s *Last) GetAccounts() ([]common.Account, error) { // NewStateDB creates a new StateDB, allowing to use an in-memory or in-disk // storage. Checkpoints older than the value defined by `keep` will be // deleted. -func NewStateDB(pathDB string, keep int, typ TypeStateDB, nLevels int) (*StateDB, error) { +// func NewStateDB(pathDB string, keep int, typ TypeStateDB, nLevels int) (*StateDB, error) { +func NewStateDB(cfg Config) (*StateDB, error) { var kv *kvdb.KVDB var err error - kv, err = kvdb.NewKVDB(pathDB, keep) + kv, err = kvdb.NewKVDB(kvdb.Config{Path: cfg.Path, Keep: cfg.Keep, + NoGapsCheck: cfg.noGapsCheck, NoLast: cfg.NoLast}) if err != nil { return nil, tracerr.Wrap(err) } var mt *merkletree.MerkleTree = nil - if typ == TypeSynchronizer || typ == TypeBatchBuilder { - mt, err = merkletree.NewMerkleTree(kv.StorageWithPrefix(PrefixKeyMT), nLevels) + if cfg.Type == TypeSynchronizer || cfg.Type == TypeBatchBuilder { + mt, err = merkletree.NewMerkleTree(kv.StorageWithPrefix(PrefixKeyMT), cfg.NLevels) if err != nil { return nil, tracerr.Wrap(err) } } - if typ == TypeTxSelector && nLevels != 0 { + if cfg.Type == TypeTxSelector && cfg.NLevels != 0 { return nil, tracerr.Wrap(fmt.Errorf("invalid StateDB parameters: StateDB type==TypeStateDB can not have nLevels!=0")) } return &StateDB{ - path: pathDB, - db: kv, - nLevels: nLevels, - MT: mt, - Typ: typ, - keep: keep, + cfg: cfg, + db: kv, + MT: mt, }, nil } +// Type returns the StateDB configured Type +func (s *StateDB) Type() TypeStateDB { + return s.cfg.Type +} + // LastRead is a thread-safe method to query the last checkpoint of the StateDB // via the Last type methods func (s *StateDB) LastRead(fn func(sdbLast *Last) error) error { @@ -179,7 +204,7 @@ func (s *StateDB) LastGetCurrentBatch() (common.BatchNum, error) { func (s *StateDB) LastMTGetRoot() (*big.Int, error) { var root *big.Int if err := s.LastRead(func(sdb *Last) error { - mt, err := merkletree.NewMerkleTree(sdb.DB().WithPrefix(PrefixKeyMT), s.nLevels) + mt, err := merkletree.NewMerkleTree(sdb.DB().WithPrefix(PrefixKeyMT), s.cfg.NLevels) if err != nil { return tracerr.Wrap(err) } @@ -195,7 +220,7 @@ func (s *StateDB) LastMTGetRoot() (*big.Int, error) { // Internally this advances & stores the current BatchNum, and then stores a // Checkpoint of the current state of the StateDB. func (s *StateDB) MakeCheckpoint() error { - log.Debugw("Making StateDB checkpoint", "batch", s.CurrentBatch()+1, "type", s.Typ) + log.Debugw("Making StateDB checkpoint", "batch", s.CurrentBatch()+1, "type", s.cfg.Type) return s.db.MakeCheckpoint() } @@ -230,7 +255,7 @@ func (s *StateDB) SetCurrentIdx(idx common.Idx) error { // those checkpoints will remain in the storage, and eventually will be // deleted when MakeCheckpoint overwrites them. func (s *StateDB) Reset(batchNum common.BatchNum) error { - log.Debugw("Making StateDB Reset", "batch", batchNum, "type", s.Typ) + log.Debugw("Making StateDB Reset", "batch", batchNum, "type", s.cfg.Type) if err := s.db.Reset(batchNum); err != nil { return tracerr.Wrap(err) } @@ -460,9 +485,10 @@ type LocalStateDB struct { // NewLocalStateDB returns a new LocalStateDB connected to the given // synchronizerDB. Checkpoints older than the value defined by `keep` will be // deleted. -func NewLocalStateDB(path string, keep int, synchronizerDB *StateDB, typ TypeStateDB, - nLevels int) (*LocalStateDB, error) { - s, err := NewStateDB(path, keep, typ, nLevels) +func NewLocalStateDB(cfg Config, synchronizerDB *StateDB) (*LocalStateDB, error) { + cfg.noGapsCheck = true + cfg.NoLast = true + s, err := NewStateDB(cfg) if err != nil { return nil, tracerr.Wrap(err) } diff --git a/db/statedb/statedb_test.go b/db/statedb/statedb_test.go index 999cf8d..571acb1 100644 --- a/db/statedb/statedb_test.go +++ b/db/statedb/statedb_test.go @@ -45,7 +45,7 @@ func TestNewStateDBIntermediateState(t *testing.T) { require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dir)) - sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0) + sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeTxSelector, NLevels: 0}) require.NoError(t, err) // test values @@ -78,7 +78,7 @@ func TestNewStateDBIntermediateState(t *testing.T) { // call NewStateDB which should get the db at the last checkpoint state // executing a Reset (discarding the last 'testkey0'&'testvalue0' data) - sdb, err = NewStateDB(dir, 128, TypeTxSelector, 0) + sdb, err = NewStateDB(Config{Path: dir, Keep: 128, Type: TypeTxSelector, NLevels: 0}) require.NoError(t, err) v, err = sdb.db.DB().Get(k0) assert.NotNil(t, err) @@ -158,7 +158,7 @@ func TestNewStateDBIntermediateState(t *testing.T) { // call NewStateDB which should get the db at the last checkpoint state // executing a Reset (discarding the last 'testkey1'&'testvalue1' data) - sdb, err = NewStateDB(dir, 128, TypeTxSelector, 0) + sdb, err = NewStateDB(Config{Path: dir, Keep: 128, Type: TypeTxSelector, NLevels: 0}) require.NoError(t, err) bn, err = sdb.getCurrentBatch() @@ -182,7 +182,7 @@ func TestStateDBWithoutMT(t *testing.T) { require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dir)) - sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0) + sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeTxSelector, NLevels: 0}) require.NoError(t, err) // create test accounts @@ -236,7 +236,7 @@ func TestStateDBWithMT(t *testing.T) { require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dir)) - sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32) + sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeSynchronizer, NLevels: 32}) require.NoError(t, err) // create test accounts @@ -290,7 +290,7 @@ func TestCheckpoints(t *testing.T) { require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dir)) - sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32) + sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeSynchronizer, NLevels: 32}) require.NoError(t, err) err = sdb.Reset(0) @@ -335,7 +335,7 @@ func TestCheckpoints(t *testing.T) { assert.Equal(t, common.BatchNum(i+1), cb) } - // printCheckpoints(t, sdb.path) + // printCheckpoints(t, sdb.cfg.Path) // reset checkpoint err = sdb.Reset(3) @@ -371,7 +371,7 @@ func TestCheckpoints(t *testing.T) { dirLocal, err := ioutil.TempDir("", "ldb") require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dirLocal)) - ldb, err := NewLocalStateDB(dirLocal, 128, sdb, TypeBatchBuilder, 32) + ldb, err := NewLocalStateDB(Config{Path: dirLocal, Keep: 128, Type: TypeBatchBuilder, NLevels: 32}, sdb) require.NoError(t, err) // get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB) @@ -392,7 +392,7 @@ func TestCheckpoints(t *testing.T) { dirLocal2, err := ioutil.TempDir("", "ldb2") require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dirLocal2)) - ldb2, err := NewLocalStateDB(dirLocal2, 128, sdb, TypeBatchBuilder, 32) + ldb2, err := NewLocalStateDB(Config{Path: dirLocal2, Keep: 128, Type: TypeBatchBuilder, NLevels: 32}, sdb) require.NoError(t, err) // get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB) @@ -409,9 +409,9 @@ func TestCheckpoints(t *testing.T) { debug := false if debug { - printCheckpoints(t, sdb.path) - printCheckpoints(t, ldb.path) - printCheckpoints(t, ldb2.path) + printCheckpoints(t, sdb.cfg.Path) + printCheckpoints(t, ldb.cfg.Path) + printCheckpoints(t, ldb2.cfg.Path) } } @@ -419,7 +419,7 @@ func TestStateDBGetAccounts(t *testing.T) { dir, err := ioutil.TempDir("", "tmpdb") require.NoError(t, err) - sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0) + sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeTxSelector, NLevels: 0}) require.NoError(t, err) // create test accounts @@ -466,7 +466,7 @@ func TestCheckAccountsTreeTestVectors(t *testing.T) { require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dir)) - sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32) + sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeSynchronizer, NLevels: 32}) require.NoError(t, err) ay0 := new(big.Int).Sub(new(big.Int).Exp(big.NewInt(2), big.NewInt(253), nil), big.NewInt(1)) @@ -540,7 +540,7 @@ func TestListCheckpoints(t *testing.T) { require.NoError(t, err) defer require.NoError(t, os.RemoveAll(dir)) - sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32) + sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeSynchronizer, NLevels: 32}) require.NoError(t, err) numCheckpoints := 16 @@ -573,7 +573,7 @@ func TestDeleteOldCheckpoints(t *testing.T) { defer require.NoError(t, os.RemoveAll(dir)) keep := 16 - sdb, err := NewStateDB(dir, keep, TypeSynchronizer, 32) + sdb, err := NewStateDB(Config{Path: dir, Keep: keep, Type: TypeSynchronizer, NLevels: 32}) require.NoError(t, err) numCheckpoints := 32 @@ -594,7 +594,7 @@ func TestCurrentIdx(t *testing.T) { defer require.NoError(t, os.RemoveAll(dir)) keep := 16 - sdb, err := NewStateDB(dir, keep, TypeSynchronizer, 32) + sdb, err := NewStateDB(Config{Path: dir, Keep: keep, Type: TypeSynchronizer, NLevels: 32}) require.NoError(t, err) idx := sdb.CurrentIdx() @@ -602,7 +602,7 @@ func TestCurrentIdx(t *testing.T) { sdb.Close() - sdb, err = NewStateDB(dir, keep, TypeSynchronizer, 32) + sdb, err = NewStateDB(Config{Path: dir, Keep: keep, Type: TypeSynchronizer, NLevels: 32}) require.NoError(t, err) idx = sdb.CurrentIdx() @@ -616,7 +616,7 @@ func TestCurrentIdx(t *testing.T) { sdb.Close() - sdb, err = NewStateDB(dir, keep, TypeSynchronizer, 32) + sdb, err = NewStateDB(Config{Path: dir, Keep: keep, Type: TypeSynchronizer, NLevels: 32}) require.NoError(t, err) idx = sdb.CurrentIdx() @@ -629,7 +629,7 @@ func TestResetFromBadCheckpoint(t *testing.T) { defer require.NoError(t, os.RemoveAll(dir)) keep := 16 - sdb, err := NewStateDB(dir, keep, TypeSynchronizer, 32) + sdb, err := NewStateDB(Config{Path: dir, Keep: keep, Type: TypeSynchronizer, NLevels: 32}) require.NoError(t, err) err = sdb.MakeCheckpoint() diff --git a/db/statedb/utils_test.go b/db/statedb/utils_test.go index 9c33844..12d6303 100644 --- a/db/statedb/utils_test.go +++ b/db/statedb/utils_test.go @@ -18,7 +18,7 @@ func TestGetIdx(t *testing.T) { require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0) + sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeTxSelector, NLevels: 0}) assert.NoError(t, err) var sk babyjub.PrivateKey diff --git a/node/node.go b/node/node.go index edd684c..1978d49 100644 --- a/node/node.go +++ b/node/node.go @@ -164,8 +164,12 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { return nil, tracerr.Wrap(fmt.Errorf("cfg.StateDB.Keep = %v < %v, which is unsafe", cfg.StateDB.Keep, safeStateDBKeep)) } - stateDB, err := statedb.NewStateDB(cfg.StateDB.Path, cfg.StateDB.Keep, - statedb.TypeSynchronizer, 32) + stateDB, err := statedb.NewStateDB(statedb.Config{ + Path: cfg.StateDB.Path, + Keep: cfg.StateDB.Keep, + Type: statedb.TypeSynchronizer, + NLevels: statedb.MaxNLevels, + }) if err != nil { return nil, tracerr.Wrap(err) } diff --git a/synchronizer/synchronizer_test.go b/synchronizer/synchronizer_test.go index a356f8c..a95d3e0 100644 --- a/synchronizer/synchronizer_test.go +++ b/synchronizer/synchronizer_test.go @@ -307,7 +307,7 @@ func newTestModules(t *testing.T) (*statedb.StateDB, *historydb.HistoryDB) { require.NoError(t, err) deleteme = append(deleteme, dir) - stateDB, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32) + stateDB, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, Type: statedb.TypeSynchronizer, NLevels: 32}) require.NoError(t, err) // Init History DB diff --git a/test/debugapi/debugapi_test.go b/test/debugapi/debugapi_test.go index 3746860..8cd900a 100644 --- a/test/debugapi/debugapi_test.go +++ b/test/debugapi/debugapi_test.go @@ -44,7 +44,7 @@ func TestDebugAPI(t *testing.T) { dir, err := ioutil.TempDir("", "tmpdb") require.Nil(t, err) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, Type: statedb.TypeSynchronizer, NLevels: 32}) require.Nil(t, err) err = sdb.MakeCheckpoint() // Make a checkpoint to increment the batchNum require.Nil(t, err) diff --git a/test/zkproof/flows_test.go b/test/zkproof/flows_test.go index 255407a..fcfb017 100644 --- a/test/zkproof/flows_test.go +++ b/test/zkproof/flows_test.go @@ -80,7 +80,8 @@ func initTxSelector(t *testing.T, chainID uint16, hermezContractAddr ethCommon.A dir, err := ioutil.TempDir("", "tmpSyncDB") require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - syncStateDB, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 0) + syncStateDB, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeSynchronizer, NLevels: 0}) require.NoError(t, err) txselDir, err := ioutil.TempDir("", "tmpTxSelDB") diff --git a/test/zkproof/zkproof_test.go b/test/zkproof/zkproof_test.go index 4eea363..514a67e 100644 --- a/test/zkproof/zkproof_test.go +++ b/test/zkproof/zkproof_test.go @@ -50,7 +50,7 @@ func initStateDB(t *testing.T, typ statedb.TypeStateDB) *statedb.StateDB { require.NoError(t, err) defer assert.Nil(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, typ, NLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, Type: typ, NLevels: NLevels}) require.NoError(t, err) return sdb } diff --git a/txprocessor/txprocessor.go b/txprocessor/txprocessor.go index beaa474..9ba1ae6 100644 --- a/txprocessor/txprocessor.go +++ b/txprocessor/txprocessor.go @@ -127,7 +127,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat exits := make([]processedExit, nTx) - if tp.s.Typ == statedb.TypeBatchBuilder { + if tp.s.Type() == statedb.TypeBatchBuilder { tp.zki = common.NewZKInputs(tp.config.ChainID, tp.config.MaxTx, tp.config.MaxL1Tx, tp.config.MaxFeeTx, tp.config.NLevels, (tp.s.CurrentBatch() + 1).BigInt()) tp.zki.OldLastIdx = tp.s.CurrentIdx().BigInt() @@ -137,7 +137,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat // TBD if ExitTree is only in memory or stored in disk, for the moment // is only needed in memory - if tp.s.Typ == statedb.TypeSynchronizer || tp.s.Typ == statedb.TypeBatchBuilder { + if tp.s.Type() == statedb.TypeSynchronizer || tp.s.Type() == statedb.TypeBatchBuilder { tmpDir, err := ioutil.TempDir("", "hermez-statedb-exittree") if err != nil { return nil, tracerr.Wrap(err) @@ -166,7 +166,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat if err != nil { return nil, tracerr.Wrap(err) } - if tp.s.Typ == statedb.TypeSynchronizer { + if tp.s.Type() == statedb.TypeSynchronizer { if createdAccount != nil { createdAccounts = append(createdAccounts, *createdAccount) l1usertxs[i].EffectiveFromIdx = createdAccount.Idx @@ -195,7 +195,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat tp.zki.ISExitRoot[tp.i] = exitTree.Root().BigInt() } } - if tp.s.Typ == statedb.TypeSynchronizer || tp.s.Typ == statedb.TypeBatchBuilder { + if tp.s.Type() == statedb.TypeSynchronizer || tp.s.Type() == statedb.TypeBatchBuilder { if exitIdx != nil && exitTree != nil { exits[tp.i] = processedExit{ exit: true, @@ -217,7 +217,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat if exitIdx != nil { log.Error("Unexpected Exit in L1CoordinatorTx") } - if tp.s.Typ == statedb.TypeSynchronizer { + if tp.s.Type() == statedb.TypeSynchronizer { if createdAccount != nil { createdAccounts = append(createdAccounts, *createdAccount) l1coordinatortxs[i].EffectiveFromIdx = createdAccount.Idx @@ -276,7 +276,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat // collectedFees will contain the amount of fee collected for each // TokenID var collectedFees map[common.TokenID]*big.Int - if tp.s.Typ == statedb.TypeSynchronizer || tp.s.Typ == statedb.TypeBatchBuilder { + if tp.s.Type() == statedb.TypeSynchronizer || tp.s.Type() == statedb.TypeBatchBuilder { collectedFees = make(map[common.TokenID]*big.Int) for tokenID := range coordIdxsMap { collectedFees[tokenID] = big.NewInt(0) @@ -317,7 +317,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat } } } - if tp.s.Typ == statedb.TypeSynchronizer || tp.s.Typ == statedb.TypeBatchBuilder { + if tp.s.Type() == statedb.TypeSynchronizer || tp.s.Type() == statedb.TypeBatchBuilder { if exitIdx != nil && exitTree != nil { exits[tp.i] = processedExit{ exit: true, @@ -401,7 +401,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat } } - if tp.s.Typ == statedb.TypeTxSelector { + if tp.s.Type() == statedb.TypeTxSelector { return nil, nil } @@ -436,8 +436,8 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat } } - if tp.s.Typ == statedb.TypeSynchronizer { - // return exitInfos, createdAccounts and collectedFees, so Synchronizer will + if tp.s.Type() == statedb.TypeSynchronizer { + // retuTypeexitInfos, createdAccounts and collectedFees, so Synchronizer will // be able to store it into HistoryDB for the concrete BatchNum return &ProcessTxOutput{ ZKInputs: nil, @@ -588,7 +588,7 @@ func (tp *TxProcessor) ProcessL1Tx(exitTree *merkletree.MerkleTree, tx *common.L } var createdAccount *common.Account - if tp.s.Typ == statedb.TypeSynchronizer && + if tp.s.Type() == statedb.TypeSynchronizer && (tx.Type == common.TxTypeCreateAccountDeposit || tx.Type == common.TxTypeCreateAccountDepositTransfer) { var err error @@ -612,8 +612,8 @@ func (tp *TxProcessor) ProcessL2Tx(coordIdxsMap map[common.TokenID]common.Idx, var err error // if tx.ToIdx==0, get toIdx by ToEthAddr or ToBJJ if tx.ToIdx == common.Idx(0) && tx.AuxToIdx == common.Idx(0) { - if tp.s.Typ == statedb.TypeSynchronizer { - // this should never be reached + if tp.s.Type() == statedb.TypeSynchronizer { + // thisTypeould never be reached log.Error("WARNING: In StateDB with Synchronizer mode L2.ToIdx can't be 0") return nil, nil, false, tracerr.Wrap(fmt.Errorf("In StateDB with Synchronizer mode L2.ToIdx can't be 0")) } @@ -676,8 +676,8 @@ func (tp *TxProcessor) ProcessL2Tx(coordIdxsMap map[common.TokenID]common.Idx, } // if StateDB type==TypeSynchronizer, will need to add Nonce - if tp.s.Typ == statedb.TypeSynchronizer { - // as type==TypeSynchronizer, always tx.ToIdx!=0 + if tp.s.Type() == statedb.TypeSynchronizer { + // as tType==TypeSynchronizer, always tx.ToIdx!=0 acc, err := tp.s.GetAccount(tx.FromIdx) if err != nil { log.Errorw("GetAccount", "fromIdx", tx.FromIdx, "err", err) @@ -889,8 +889,8 @@ func (tp *TxProcessor) applyTransfer(coordIdxsMap map[common.TokenID]common.Idx, accumulated := tp.AccumulatedFees[accCoord.Idx] accumulated.Add(accumulated, fee) - if tp.s.Typ == statedb.TypeSynchronizer || - tp.s.Typ == statedb.TypeBatchBuilder { + if tp.s.Type() == statedb.TypeSynchronizer || + tp.s.Type() == statedb.TypeBatchBuilder { collected := collectedFees[accCoord.TokenID] collected.Add(collected, fee) } @@ -1094,8 +1094,8 @@ func (tp *TxProcessor) applyExit(coordIdxsMap map[common.TokenID]common.Idx, accumulated := tp.AccumulatedFees[accCoord.Idx] accumulated.Add(accumulated, fee) - if tp.s.Typ == statedb.TypeSynchronizer || - tp.s.Typ == statedb.TypeBatchBuilder { + if tp.s.Type() == statedb.TypeSynchronizer || + tp.s.Type() == statedb.TypeBatchBuilder { collected := collectedFees[accCoord.TokenID] collected.Add(collected, fee) } diff --git a/txprocessor/txprocessor_test.go b/txprocessor/txprocessor_test.go index eb319a1..8148ea5 100644 --- a/txprocessor/txprocessor_test.go +++ b/txprocessor/txprocessor_test.go @@ -36,7 +36,8 @@ func TestComputeEffectiveAmounts(t *testing.T) { require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeSynchronizer, NLevels: 32}) assert.NoError(t, err) set := ` @@ -212,7 +213,8 @@ func TestProcessTxsBalances(t *testing.T) { require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeSynchronizer, NLevels: 32}) assert.NoError(t, err) chainID := uint16(0) @@ -358,7 +360,8 @@ func TestProcessTxsSynchronizer(t *testing.T) { require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeSynchronizer, NLevels: 32}) assert.NoError(t, err) chainID := uint16(0) @@ -489,7 +492,8 @@ func TestProcessTxsBatchBuilder(t *testing.T) { require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: 32}) assert.NoError(t, err) chainID := uint16(0) @@ -580,7 +584,8 @@ func TestProcessTxsRootTestVectors(t *testing.T) { require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: 32}) assert.NoError(t, err) // same values than in the js test @@ -631,7 +636,8 @@ func TestCreateAccountDepositMaxValue(t *testing.T) { defer assert.NoError(t, os.RemoveAll(dir)) nLevels := 16 - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: nLevels}) assert.NoError(t, err) users := txsets.GenerateJsUsers(t) @@ -700,7 +706,8 @@ func initTestMultipleCoordIdxForTokenID(t *testing.T) (*TxProcessor, *til.Contex require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: 32}) assert.NoError(t, err) chainID := uint16(1) @@ -798,7 +805,8 @@ func TestTwoExits(t *testing.T) { require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeSynchronizer, NLevels: 32}) assert.NoError(t, err) chainID := uint16(1) @@ -865,7 +873,8 @@ func TestTwoExits(t *testing.T) { require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir2)) - sdb2, err := statedb.NewStateDB(dir2, 128, statedb.TypeSynchronizer, 32) + sdb2, err := statedb.NewStateDB(statedb.Config{Path: dir2, Keep: 128, + Type: statedb.TypeSynchronizer, NLevels: 32}) assert.NoError(t, err) tc = til.NewContext(chainID, common.RollupConstMaxL1UserTx) diff --git a/txprocessor/zkinputsgen_test.go b/txprocessor/zkinputsgen_test.go index d6a66aa..9d50d59 100644 --- a/txprocessor/zkinputsgen_test.go +++ b/txprocessor/zkinputsgen_test.go @@ -41,7 +41,8 @@ func TestZKInputsHashTestVector0(t *testing.T) { require.NoError(t, err) defer assert.Nil(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: 32}) require.NoError(t, err) chainID := uint16(0) @@ -90,7 +91,8 @@ func TestZKInputsHashTestVector1(t *testing.T) { require.NoError(t, err) defer assert.Nil(t, os.RemoveAll(dir)) - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 32) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: 32}) require.NoError(t, err) chainID := uint16(0) @@ -149,7 +151,8 @@ func TestZKInputsEmpty(t *testing.T) { nLevels := 16 - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: nLevels}) require.NoError(t, err) chainID := uint16(0) @@ -266,7 +269,8 @@ func TestZKInputs0(t *testing.T) { defer assert.Nil(t, os.RemoveAll(dir)) nLevels := 16 - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: nLevels}) require.NoError(t, err) chainID := uint16(0) @@ -322,7 +326,8 @@ func TestZKInputs1(t *testing.T) { defer assert.Nil(t, os.RemoveAll(dir)) nLevels := 16 - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: nLevels}) require.NoError(t, err) chainID := uint16(0) @@ -385,7 +390,8 @@ func TestZKInputs2(t *testing.T) { defer assert.Nil(t, os.RemoveAll(dir)) nLevels := 16 - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: nLevels}) require.NoError(t, err) chainID := uint16(0) @@ -456,7 +462,8 @@ func TestZKInputs3(t *testing.T) { defer assert.Nil(t, os.RemoveAll(dir)) nLevels := 16 - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: nLevels}) require.NoError(t, err) chainID := uint16(0) @@ -527,7 +534,8 @@ func TestZKInputs4(t *testing.T) { defer assert.Nil(t, os.RemoveAll(dir)) nLevels := 16 - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: nLevels}) require.NoError(t, err) chainID := uint16(0) @@ -598,7 +606,8 @@ func TestZKInputs5(t *testing.T) { defer assert.Nil(t, os.RemoveAll(dir)) nLevels := 16 - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: nLevels}) require.NoError(t, err) chainID := uint16(0) @@ -661,7 +670,8 @@ func TestZKInputs6(t *testing.T) { defer assert.Nil(t, os.RemoveAll(dir)) nLevels := 16 - sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels) + sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeBatchBuilder, NLevels: nLevels}) require.NoError(t, err) chainID := uint16(0) diff --git a/txselector/txselector.go b/txselector/txselector.go index fb7092c..c3780cd 100644 --- a/txselector/txselector.go +++ b/txselector/txselector.go @@ -10,6 +10,7 @@ import ( ethCommon "github.com/ethereum/go-ethereum/common" "github.com/hermeznetwork/hermez-node/common" + "github.com/hermeznetwork/hermez-node/db/kvdb" "github.com/hermeznetwork/hermez-node/db/l2db" "github.com/hermeznetwork/hermez-node/db/statedb" "github.com/hermeznetwork/hermez-node/log" @@ -62,8 +63,14 @@ type TxSelector struct { // NewTxSelector returns a *TxSelector func NewTxSelector(coordAccount *CoordAccount, dbpath string, synchronizerStateDB *statedb.StateDB, l2 *l2db.L2DB) (*TxSelector, error) { - localAccountsDB, err := statedb.NewLocalStateDB(dbpath, 128, - synchronizerStateDB, statedb.TypeTxSelector, 0) // without merkletree + localAccountsDB, err := statedb.NewLocalStateDB( + statedb.Config{ + Path: dbpath, + Keep: kvdb.DefaultKeep, + Type: statedb.TypeTxSelector, + NLevels: 0, + }, + synchronizerStateDB) // without merkletree if err != nil { return nil, tracerr.Wrap(err) } diff --git a/txselector/txselector_test.go b/txselector/txselector_test.go index 516ea35..6219325 100644 --- a/txselector/txselector_test.go +++ b/txselector/txselector_test.go @@ -34,7 +34,8 @@ func initTest(t *testing.T, chainID uint16, hermezContractAddr ethCommon.Address dir, err := ioutil.TempDir("", "tmpdb") require.NoError(t, err) defer assert.NoError(t, os.RemoveAll(dir)) - syncStateDB, err := statedb.NewStateDB(dir, 128, statedb.TypeTxSelector, 0) + syncStateDB, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, + Type: statedb.TypeTxSelector, NLevels: 0}) require.NoError(t, err) txselDir, err := ioutil.TempDir("", "tmpTxSelDB")