Improve stateDB reliability

- Close StateDB when stopping the node
- Lock the StateDB when doing checkpoints to avoid multiple instances of
  oppening the pebble DB at the same time.
This commit is contained in:
Eduard S
2021-02-01 16:00:16 +01:00
parent f0886b3d2a
commit 8517e6afa0
4 changed files with 45 additions and 14 deletions

View File

@@ -172,6 +172,16 @@ func NewCoordinator(cfg Config,
return &c, nil return &c, nil
} }
// TxSelector returns the inner TxSelector
func (c *Coordinator) TxSelector() *txselector.TxSelector {
return c.txSelector
}
// BatchBuilder returns the inner BatchBuilder
func (c *Coordinator) BatchBuilder() *batchbuilder.BatchBuilder {
return c.batchBuilder
}
func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) { func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) {
return NewPipeline(ctx, c.cfg, c.historyDB, c.l2DB, c.txSelector, return NewPipeline(ctx, c.cfg, c.historyDB, c.l2DB, c.txSelector,
c.batchBuilder, c.purger, c.txManager, c.provers, &c.consts) c.batchBuilder, c.purger, c.txManager, c.provers, &c.consts)

View File

@@ -8,6 +8,7 @@ import (
"path" "path"
"sort" "sort"
"strings" "strings"
"sync"
"github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/log" "github.com/hermeznetwork/hermez-node/log"
@@ -40,6 +41,7 @@ type KVDB struct {
CurrentIdx common.Idx CurrentIdx common.Idx
CurrentBatch common.BatchNum CurrentBatch common.BatchNum
keep int keep int
m sync.Mutex
} }
// NewKVDB creates a new KVDB, allowing to use an in-memory or in-disk storage. // NewKVDB creates a new KVDB, allowing to use an in-memory or in-disk storage.
@@ -141,10 +143,8 @@ func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
return nil return nil
} }
checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
// copy 'BatchNumX' to 'current' // copy 'BatchNumX' to 'current'
err = pebbleMakeCheckpoint(checkpointPath, currentPath) if err := kvdb.MakeCheckpointFromTo(batchNum, currentPath); err != nil {
if err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
@@ -212,22 +212,13 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV
checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
// use checkpoint from synchronizerKVDB
synchronizerCheckpointPath := path.Join(synchronizerKVDB.path,
fmt.Sprintf("%s%d", PathBatchNum, batchNum))
if _, err := os.Stat(synchronizerCheckpointPath); os.IsNotExist(err) {
// if synchronizerKVDB does not have checkpoint at batchNum, return err
return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" not exist in Synchronizer",
synchronizerCheckpointPath))
}
// copy synchronizer'BatchNumX' to 'BatchNumX' // copy synchronizer'BatchNumX' to 'BatchNumX'
err = pebbleMakeCheckpoint(synchronizerCheckpointPath, checkpointPath) if err := synchronizerKVDB.MakeCheckpointFromTo(batchNum, checkpointPath); err != nil {
if err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
// copy 'BatchNumX' to 'current' // copy 'BatchNumX' to 'current'
err = pebbleMakeCheckpoint(checkpointPath, currentPath) err = kvdb.MakeCheckpointFromTo(batchNum, currentPath)
if err != nil { if err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
@@ -412,6 +403,25 @@ func (kvdb *KVDB) deleteOldCheckpoints() error {
return nil return nil
} }
// MakeCheckpointFromTo makes a checkpoint from the current db at fromBatchNum
// to the dest folder. This method is locking, so it can be called from
// multiple places at the same time.
func (kvdb *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) error {
source := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, fromBatchNum))
if _, err := os.Stat(source); os.IsNotExist(err) {
// if kvdb does not have checkpoint at batchNum, return err
return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" does not exist", source))
}
// By locking we allow calling MakeCheckpointFromTo from multiple
// places at the same time for the same stateDB. This allows the
// synchronizer to do a reset to a batchNum at the same time as the
// pipeline is doing a txSelector.Reset and batchBuilder.Reset from
// synchronizer to the same batchNum
kvdb.m.Lock()
defer kvdb.m.Unlock()
return pebbleMakeCheckpoint(source, dest)
}
func pebbleMakeCheckpoint(source, dest string) error { func pebbleMakeCheckpoint(source, dest string) error {
// Remove dest folder (if it exists) before doing the checkpoint // Remove dest folder (if it exists) before doing the checkpoint
if _, err := os.Stat(dest); !os.IsNotExist(err) { if _, err := os.Stat(dest); !os.IsNotExist(err) {

View File

@@ -655,4 +655,10 @@ func (n *Node) Stop() {
log.Info("Stopping Coordinator...") log.Info("Stopping Coordinator...")
n.coord.Stop() n.coord.Stop()
} }
// Close kv DBs
n.sync.StateDB().Close()
if n.mode == ModeCoordinator {
n.coord.TxSelector().LocalAccountsDB().Close()
n.coord.BatchBuilder().LocalStateDB().Close()
}
} }

View File

@@ -262,6 +262,11 @@ func NewSynchronizer(ethClient eth.ClientInterface, historyDB *historydb.History
return s, s.init() return s, s.init()
} }
// StateDB returns the inner StateDB
func (s *Synchronizer) StateDB() *statedb.StateDB {
return s.stateDB
}
// Stats returns a copy of the Synchronizer Stats. It is safe to call Stats() // Stats returns a copy of the Synchronizer Stats. It is safe to call Stats()
// during a Sync call // during a Sync call
func (s *Synchronizer) Stats() *Stats { func (s *Synchronizer) Stats() *Stats {