diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 81baa46..5843c94 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -172,6 +172,16 @@ func NewCoordinator(cfg Config, return &c, nil } +// TxSelector returns the inner TxSelector +func (c *Coordinator) TxSelector() *txselector.TxSelector { + return c.txSelector +} + +// BatchBuilder returns the inner BatchBuilder +func (c *Coordinator) BatchBuilder() *batchbuilder.BatchBuilder { + return c.batchBuilder +} + func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) { return NewPipeline(ctx, c.cfg, c.historyDB, c.l2DB, c.txSelector, c.batchBuilder, c.purger, c.txManager, c.provers, &c.consts) diff --git a/db/kvdb/kvdb.go b/db/kvdb/kvdb.go index 88d9c60..ce0a71c 100644 --- a/db/kvdb/kvdb.go +++ b/db/kvdb/kvdb.go @@ -8,6 +8,7 @@ import ( "path" "sort" "strings" + "sync" "github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/log" @@ -40,6 +41,7 @@ type KVDB struct { CurrentIdx common.Idx CurrentBatch common.BatchNum keep int + m sync.Mutex } // NewKVDB creates a new KVDB, allowing to use an in-memory or in-disk storage. @@ -141,10 +143,8 @@ func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error { return nil } - checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) // copy 'BatchNumX' to 'current' - err = pebbleMakeCheckpoint(checkpointPath, currentPath) - if err != nil { + if err := kvdb.MakeCheckpointFromTo(batchNum, currentPath); err != nil { return tracerr.Wrap(err) } @@ -212,22 +212,13 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) - // use checkpoint from synchronizerKVDB - synchronizerCheckpointPath := path.Join(synchronizerKVDB.path, - fmt.Sprintf("%s%d", PathBatchNum, batchNum)) - if _, err := os.Stat(synchronizerCheckpointPath); os.IsNotExist(err) { - // if synchronizerKVDB does not have checkpoint at batchNum, return err - return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" not exist in Synchronizer", - synchronizerCheckpointPath)) - } // copy synchronizer'BatchNumX' to 'BatchNumX' - err = pebbleMakeCheckpoint(synchronizerCheckpointPath, checkpointPath) - if err != nil { + if err := synchronizerKVDB.MakeCheckpointFromTo(batchNum, checkpointPath); err != nil { return tracerr.Wrap(err) } // copy 'BatchNumX' to 'current' - err = pebbleMakeCheckpoint(checkpointPath, currentPath) + err = kvdb.MakeCheckpointFromTo(batchNum, currentPath) if err != nil { return tracerr.Wrap(err) } @@ -412,6 +403,25 @@ func (kvdb *KVDB) deleteOldCheckpoints() error { return nil } +// MakeCheckpointFromTo makes a checkpoint from the current db at fromBatchNum +// to the dest folder. This method is locking, so it can be called from +// multiple places at the same time. +func (kvdb *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) error { + source := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, fromBatchNum)) + if _, err := os.Stat(source); os.IsNotExist(err) { + // if kvdb does not have checkpoint at batchNum, return err + return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" does not exist", source)) + } + // By locking we allow calling MakeCheckpointFromTo from multiple + // places at the same time for the same stateDB. This allows the + // synchronizer to do a reset to a batchNum at the same time as the + // pipeline is doing a txSelector.Reset and batchBuilder.Reset from + // synchronizer to the same batchNum + kvdb.m.Lock() + defer kvdb.m.Unlock() + return pebbleMakeCheckpoint(source, dest) +} + func pebbleMakeCheckpoint(source, dest string) error { // Remove dest folder (if it exists) before doing the checkpoint if _, err := os.Stat(dest); !os.IsNotExist(err) { diff --git a/node/node.go b/node/node.go index ca54180..edd684c 100644 --- a/node/node.go +++ b/node/node.go @@ -655,4 +655,10 @@ func (n *Node) Stop() { log.Info("Stopping Coordinator...") n.coord.Stop() } + // Close kv DBs + n.sync.StateDB().Close() + if n.mode == ModeCoordinator { + n.coord.TxSelector().LocalAccountsDB().Close() + n.coord.BatchBuilder().LocalStateDB().Close() + } } diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 811d7df..ad03122 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -262,6 +262,11 @@ func NewSynchronizer(ethClient eth.ClientInterface, historyDB *historydb.History return s, s.init() } +// StateDB returns the inner StateDB +func (s *Synchronizer) StateDB() *statedb.StateDB { + return s.stateDB +} + // Stats returns a copy of the Synchronizer Stats. It is safe to call Stats() // during a Sync call func (s *Synchronizer) Stats() *Stats {