From aca106a2ee564520caeb004a67ff18e13fbd8d3f Mon Sep 17 00:00:00 2001
From: Eduard S
Date: Wed, 10 Feb 2021 16:15:05 +0100
Subject: [PATCH] WIP5

---
 batchbuilder/batchbuilder.go   |   2 +-
 common/batch.go                |   1 +
 common/ethauction.go           |   5 +-
 config/config.go               |   4 +-
 coordinator/coordinator.go     |  55 +++++++++-------
 coordinator/pipeline.go        | 115 +++++++++++++++++++++++++++++----
 coordinator/txmanager.go       |  12 +++-
 db/historydb/historydb.go      |  15 ++++-
 db/historydb/historydb_test.go |   6 ++
 db/kvdb/kvdb.go                |  29 +++++++--
 db/statedb/statedb.go          |   6 ++
 txselector/txselector.go       |   8 +--
 12 files changed, 200 insertions(+), 58 deletions(-)

diff --git a/batchbuilder/batchbuilder.go b/batchbuilder/batchbuilder.go
index 8f662fe..ed57501 100644
--- a/batchbuilder/batchbuilder.go
+++ b/batchbuilder/batchbuilder.go
@@ -54,7 +54,7 @@ func NewBatchBuilder(dbpath string, synchronizerStateDB *statedb.StateDB, batchN
 // copy of the rollup state from the Synchronizer at that `batchNum`, otherwise
 // it can just roll back the internal copy.
 func (bb *BatchBuilder) Reset(batchNum common.BatchNum, fromSynchronizer bool) error {
-	return bb.localStateDB.Reset(batchNum, fromSynchronizer)
+	return tracerr.Wrap(bb.localStateDB.Reset(batchNum, fromSynchronizer))
 }
 
 // BuildBatch takes the transactions and returns the common.ZKInputs of the next batch
diff --git a/common/batch.go b/common/batch.go
index 8c687ed..114010d 100644
--- a/common/batch.go
+++ b/common/batch.go
@@ -27,6 +27,7 @@ type Batch struct {
 	TotalFeesUSD *float64 `meddler:"total_fees_usd"`
 }
 
+// NewEmptyBatch creates a new empty batch
 func NewEmptyBatch() *Batch {
 	return &Batch{
 		BatchNum: 0,
diff --git a/common/ethauction.go b/common/ethauction.go
index edcf9a2..9425ea4 100644
--- a/common/ethauction.go
+++ b/common/ethauction.go
@@ -32,10 +32,9 @@ type AuctionConstants struct {
 func (c *AuctionConstants) SlotNum(blockNum int64) int64 {
 	if blockNum >= c.GenesisBlockNum {
 		return (blockNum - c.GenesisBlockNum) / int64(c.BlocksPerSlot)
-	} else {
-		// This result will be negative
-		return (blockNum - c.GenesisBlockNum) / int64(c.BlocksPerSlot)
 	}
+	// This result will be negative
+	return (blockNum - c.GenesisBlockNum) / int64(c.BlocksPerSlot)
 }
 
 // SlotBlocks returns the first and the last block numbers included in that slot
diff --git a/config/config.go b/config/config.go
index 5aa92ac..6d13736 100644
--- a/config/config.go
+++ b/config/config.go
@@ -56,7 +56,7 @@ type Coordinator struct {
 	// starting the pipeline when we reach a slot in which we can forge.
 	StartSlotBlocksDelay int64
 	// ScheduleBatchBlocksAheadCheck is the number of blocks ahead in which
-	// the forger address is checked to be allowed to forge (appart from
+	// the forger address is checked to be allowed to forge (apart from
 	// checking the next block), used to decide when to stop scheduling new
 	// batches (by stopping the pipeline).
 	// For example, if we are at block 10 and ScheduleBatchBlocksAheadCheck
@@ -66,7 +66,7 @@ type Coordinator struct {
 	// scheduling a batch and having it mined.
 	ScheduleBatchBlocksAheadCheck int64
 	// SendBatchBlocksMarginCheck is the number of margin blocks ahead in
-	// which the coordinator is also checked to be allowed to forge, appart
+	// which the coordinator is also checked to be allowed to forge, apart
 	// from the next block; used to decide when to stop sending batches to
 	// the smart contract.
 	// For example, if we are at block 10 and SendBatchBlocksMarginCheck is
diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go
index 85530d6..d706cb9 100644
--- a/coordinator/coordinator.go
+++ b/coordinator/coordinator.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"math/big"
 	"os"
-	"strings"
 	"sync"
 	"time"
 
@@ -47,7 +46,7 @@ type Config struct {
 	// starting the pipeline when we reach a slot in which we can forge.
 	StartSlotBlocksDelay int64
 	// ScheduleBatchBlocksAheadCheck is the number of blocks ahead in which
-	// the forger address is checked to be allowed to forge (appart from
+	// the forger address is checked to be allowed to forge (apart from
 	// checking the next block), used to decide when to stop scheduling new
 	// batches (by stopping the pipeline).
 	// For example, if we are at block 10 and ScheduleBatchBlocksAheadCheck
@@ -57,7 +56,7 @@ type Config struct {
 	// scheduling a batch and having it mined.
 	ScheduleBatchBlocksAheadCheck int64
 	// SendBatchBlocksMarginCheck is the number of margin blocks ahead in
-	// which the coordinator is also checked to be allowed to forge, appart
+	// which the coordinator is also checked to be allowed to forge, apart
 	// from the next block; used to decide when to stop sending batches to
 	// the smart contract.
 	// For example, if we are at block 10 and SendBatchBlocksMarginCheck is
@@ -139,7 +138,8 @@ type Coordinator struct {
 	wg     sync.WaitGroup
 	cancel context.CancelFunc
 
-	pipeline *Pipeline
+	pipeline              *Pipeline
+	lastNonFailedBatchNum common.BatchNum
 
 	purger    *Purger
 	txManager *TxManager
@@ -233,7 +233,7 @@ func (c *Coordinator) BatchBuilder() *batchbuilder.BatchBuilder {
 func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) {
 	c.pipelineNum++
 	return NewPipeline(ctx, c.cfg, c.pipelineNum, c.historyDB, c.l2DB, c.txSelector,
-		c.batchBuilder, c.purger, c.txManager, c.provers, &c.consts)
+		c.batchBuilder, c.purger, c, c.txManager, c.provers, &c.consts)
 }
 
 // MsgSyncBlock indicates an update to the Synchronizer stats
@@ -254,6 +254,9 @@ type MsgSyncReorg struct {
 // MsgStopPipeline indicates a signal to reset the pipeline
 type MsgStopPipeline struct {
 	Reason string
+	// FailedBatchNum indicates the first batchNum that failed in the
+	// pipeline. If FailedBatchNum is 0, it should be ignored.
+	FailedBatchNum common.BatchNum
 }
 
 // SendMsg is a thread safe method to pass a message to the Coordinator
@@ -342,6 +345,9 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
 			log.Infow("Coordinator: forging state begin", "block", stats.Eth.LastBlock.Num+1, "batch", stats.Sync.LastBatch.BatchNum)
 			batchNum := stats.Sync.LastBatch.BatchNum
+			if c.lastNonFailedBatchNum > batchNum {
+				batchNum = c.lastNonFailedBatchNum
+			}
 			var err error
 			if c.pipeline, err = c.newPipeline(ctx); err != nil {
 				return tracerr.Wrap(err)
 			}
@@ -367,19 +373,17 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
 			//	return err
 			// }
 			// }
-			if c.purger.CanInvalidate(stats.Sync.LastBlock.Num, int64(stats.Sync.LastBatch.BatchNum)) {
-				if err := c.txSelector.Reset(stats.Sync.LastBatch.BatchNum); err != nil {
-					return tracerr.Wrap(err)
-				}
-			}
-			_, err := c.purger.InvalidateMaybe(c.l2DB, c.txSelector.LocalAccountsDB(),
-				stats.Sync.LastBlock.Num, int64(stats.Sync.LastBatch.BatchNum))
-			if err != nil {
+			// if c.purger.CanInvalidate(stats.Sync.LastBlock.Num, int64(stats.Sync.LastBatch.BatchNum)) {
+			// 	if err := c.txSelector.Reset(stats.Sync.LastBatch.BatchNum); err != nil {
+			// 		return tracerr.Wrap(err)
+			// 	}
+			// }
+			if _, err := c.purger.InvalidateMaybe(c.l2DB, c.txSelector.LocalAccountsDB(),
+				stats.Sync.LastBlock.Num, int64(stats.Sync.LastBatch.BatchNum)); err != nil {
 				return tracerr.Wrap(err)
 			}
-			_, err = c.purger.PurgeMaybe(c.l2DB, stats.Sync.LastBlock.Num,
-				int64(stats.Sync.LastBatch.BatchNum))
-			if err != nil {
+			if _, err := c.purger.PurgeMaybe(c.l2DB, stats.Sync.LastBlock.Num,
+				int64(stats.Sync.LastBatch.BatchNum)); err != nil {
 				return tracerr.Wrap(err)
 			}
 		}
@@ -419,24 +423,29 @@ func (c *Coordinator) handleReorg(ctx context.Context, msg *MsgSyncReorg) error
 			"sync.LastBatch.StateRoot", c.stats.Sync.LastBatch.StateRoot,
 			"pipelineFromBatch.StateRoot", c.pipelineFromBatch.StateRoot)
 		c.txManager.DiscardPipeline(ctx, c.pipelineNum)
-		if err := c.handleStopPipeline(ctx, "reorg"); err != nil {
+		if err := c.handleStopPipeline(ctx, "reorg", 0); err != nil {
 			return tracerr.Wrap(err)
 		}
 	}
 	return nil
 }
 
-func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string) error {
-	if err := c.l2DB.Reorg(c.stats.Sync.LastBatch.BatchNum); err != nil {
+// handleStopPipeline handles stopping the pipeline. If failedBatchNum is 0,
+// the next pipeline will start from the last state of the synchronizer,
+// otherwise, it will start from failedBatchNum-1.
+func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string, failedBatchNum common.BatchNum) error {
+	batchNum := c.stats.Sync.LastBatch.BatchNum
+	if failedBatchNum != 0 {
+		batchNum = failedBatchNum - 1
+	}
+	if err := c.l2DB.Reorg(batchNum); err != nil {
 		return tracerr.Wrap(err)
 	}
 	if c.pipeline != nil {
 		c.pipeline.Stop(c.ctx)
 		c.pipeline = nil
 	}
-	if strings.Contains(reason, common.AuctionErrMsgCannotForge) { //nolint:staticcheck
-		// TODO: Check that we are in a slot in which we can't forge
-	}
+	c.lastNonFailedBatchNum = batchNum
 	return nil
 }
 
@@ -452,7 +461,7 @@ func (c *Coordinator) handleMsg(ctx context.Context, msg interface{}) error {
 		}
 	case MsgStopPipeline:
 		log.Infow("Coordinator received MsgStopPipeline", "reason", msg.Reason)
-		if err := c.handleStopPipeline(ctx, msg.Reason); err != nil {
+		if err := c.handleStopPipeline(ctx, msg.Reason, msg.FailedBatchNum); err != nil {
 			return tracerr.Wrap(fmt.Errorf("Coordinator.handleStopPipeline: %w", err))
 		}
 	default:
diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go
index 8ff4ff3..91c1aa1 100644
--- a/coordinator/pipeline.go
+++ b/coordinator/pipeline.go
@@ -2,6 +2,7 @@ package coordinator
 
 import (
 	"context"
+	"database/sql"
 	"fmt"
 	"math/big"
 	"sync"
@@ -41,10 +42,13 @@ type Pipeline struct {
 	// batchNum                     common.BatchNum
 	// lastScheduledL1BatchBlockNum int64
 	// lastForgeL1TxsNum            int64
-	started bool
+	started       bool
+	rw            sync.RWMutex
+	errAtBatchNum common.BatchNum
 
 	proversPool *ProversPool
 	provers     []prover.Client
+	coord       *Coordinator
 	txManager   *TxManager
 	historyDB   *historydb.HistoryDB
 	l2DB        *l2db.L2DB
@@ -61,6 +65,18 @@ type Pipeline struct {
 	cancel context.CancelFunc
 }
 
+func (p *Pipeline) setErrAtBatchNum(batchNum common.BatchNum) {
+	p.rw.Lock()
+	defer p.rw.Unlock()
+	p.errAtBatchNum = batchNum
+}
+
+func (p *Pipeline) getErrAtBatchNum() common.BatchNum {
+	p.rw.RLock()
+	defer p.rw.RUnlock()
+	return p.errAtBatchNum
+}
+
 // NewPipeline creates a new Pipeline
 func NewPipeline(ctx context.Context,
 	cfg Config,
@@ -70,6 +86,7 @@ func NewPipeline(ctx context.Context,
 	txSelector *txselector.TxSelector,
 	batchBuilder *batchbuilder.BatchBuilder,
 	purger *Purger,
+	coord *Coordinator,
 	txManager *TxManager,
 	provers []prover.Client,
 	scConsts *synchronizer.SCConsts,
@@ -97,6 +114,7 @@ func NewPipeline(ctx context.Context,
 		provers:     provers,
 		proversPool: proversPool,
 		purger:      purger,
+		coord:       coord,
 		txManager:   txManager,
 		consts:      *scConsts,
 		statsVarsCh: make(chan statsVars, queueLen),
@@ -122,14 +140,54 @@ func (p *Pipeline) reset(batchNum common.BatchNum,
 	p.stats = *stats
 	p.vars = *vars
 
-	err := p.txSelector.Reset(p.state.batchNum)
+	// Reset the StateDB in TxSelector and BatchBuilder from the
+	// synchronizer only if the checkpoint we reset from either:
+	// a. Doesn't exist in the TxSelector/BatchBuilder
+	// b. The batch has already been synced by the synchronizer and has a
+	// different MTRoot than the BatchBuilder
+	// Otherwise, reset from the local checkpoint.
+
+	// First attempt to reset from local checkpoint if such checkpoint exists
+	existsTxSelector, err := p.txSelector.LocalAccountsDB().CheckpointExists(p.state.batchNum)
 	if err != nil {
 		return tracerr.Wrap(err)
 	}
-	err = p.batchBuilder.Reset(p.state.batchNum, true)
+	fromSynchronizerTxSelector := !existsTxSelector
+	if err := p.txSelector.Reset(p.state.batchNum, fromSynchronizerTxSelector); err != nil {
+		return tracerr.Wrap(err)
+	}
+	existsBatchBuilder, err := p.batchBuilder.LocalStateDB().CheckpointExists(p.state.batchNum)
 	if err != nil {
 		return tracerr.Wrap(err)
 	}
+	fromSynchronizerBatchBuilder := !existsBatchBuilder
+	if err := p.batchBuilder.Reset(p.state.batchNum, fromSynchronizerBatchBuilder); err != nil {
+		return tracerr.Wrap(err)
+	}
+
+	// After reset, check that if the batch exists in the historyDB, the
+	// stateRoot matches with the local one, if not, force a reset from
+	// synchronizer
+	batch, err := p.historyDB.GetBatch(p.state.batchNum)
+	if tracerr.Unwrap(err) == sql.ErrNoRows {
+		// nothing to do
+	} else if err != nil {
+		return tracerr.Wrap(err)
+	} else {
+		localStateRoot := p.batchBuilder.LocalStateDB().MT.Root().BigInt()
+		if batch.StateRoot.Cmp(localStateRoot) != 0 {
+			log.Debugf("localStateRoot (%v) != historyDB stateRoot (%v). "+
+				"Forcing reset from Synchronizer", localStateRoot, batch.StateRoot)
+			// StateRoot from synchronizer doesn't match StateRoot
+			// from batchBuilder, force a reset from synchronizer
+			if err := p.txSelector.Reset(p.state.batchNum, true); err != nil {
+				return tracerr.Wrap(err)
+			}
+			if err := p.batchBuilder.Reset(p.state.batchNum, true); err != nil {
+				return tracerr.Wrap(err)
+			}
+		}
+	}
 	return nil
 }
 
@@ -203,14 +261,31 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
 				p.stats = statsVars.Stats
 				p.syncSCVars(statsVars.Vars)
 			case <-time.After(waitDuration):
+				// Once errAtBatchNum != 0, we stop forging
+				// batches because there's been an error and we
+				// wait for the pipeline to be stopped.
+				if p.getErrAtBatchNum() != 0 {
+					waitDuration = p.cfg.ForgeRetryInterval
+					continue
+				}
 				batchNum = p.state.batchNum + 1
 				batchInfo, err := p.handleForgeBatch(p.ctx, batchNum)
 				if p.ctx.Err() != nil {
 					continue
+				} else if tracerr.Unwrap(err) == errLastL1BatchNotSynced {
+					waitDuration = p.cfg.ForgeRetryInterval
+					continue
 				} else if err != nil {
-					waitDuration = p.cfg.SyncRetryInterval
+					p.setErrAtBatchNum(batchNum)
+					waitDuration = p.cfg.ForgeRetryInterval
+					p.coord.SendMsg(p.ctx, MsgStopPipeline{
+						Reason: fmt.Sprintf(
+							"Pipeline.handleForgeBatch: %v", err),
+						FailedBatchNum: batchNum,
+					})
 					continue
 				}
+				p.state.batchNum = batchNum
 				select {
 				case batchChSentServerProof <- batchInfo:
@@ -229,16 +304,28 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
 				p.wg.Done()
 				return
 			case batchInfo := <-batchChSentServerProof:
+				// Once errAtBatchNum != 0, we stop forging
+				// batches because there's been an error and we
+				// wait for the pipeline to be stopped.
+				if p.getErrAtBatchNum() != 0 {
+					continue
+				}
 				err := p.waitServerProof(p.ctx, batchInfo)
-				// We are done with this serverProof, add it back to the pool
-				p.proversPool.Add(p.ctx, batchInfo.ServerProof)
 				batchInfo.ServerProof = nil
 				if p.ctx.Err() != nil {
 					continue
 				} else if err != nil {
 					log.Errorw("waitServerProof", "err", err)
+					p.setErrAtBatchNum(batchInfo.BatchNum)
+					p.coord.SendMsg(p.ctx, MsgStopPipeline{
+						Reason: fmt.Sprintf(
+							"Pipeline.waitServerProof: %v", err),
+						FailedBatchNum: batchInfo.BatchNum,
+					})
 					continue
 				}
+				// We are done with this serverProof, add it back to the pool
+				p.proversPool.Add(p.ctx, batchInfo.ServerProof)
 				p.txManager.AddBatch(p.ctx, batchInfo)
 			}
 		}
@@ -304,17 +391,14 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
 	var auths [][]byte
 	var coordIdxs []common.Idx
 
+	// TODO: If there are no txs and we are behind the timeout, skip
+	// forging a batch and return a particular error that can be handled
+	// in the loop where handleForgeBatch is called to retry after an
+	// interval
+
 	// 1. Decide if we forge L2Tx or L1+L2Tx
 	if p.shouldL1L2Batch(batchInfo) {
 		batchInfo.L1Batch = true
-		defer func() {
-			// If there's no error, update the parameters related
-			// to the last L1Batch forged
-			if err == nil {
-				p.state.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
-				p.state.lastForgeL1TxsNum++
-			}
-		}()
 		if p.state.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum {
 			return nil, tracerr.Wrap(errLastL1BatchNotSynced)
 		}
@@ -328,6 +412,9 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
 		if err != nil {
 			return nil, tracerr.Wrap(err)
 		}
+
+		p.state.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
+		p.state.lastForgeL1TxsNum++
 	} else {
 		// 2b: only L2 txs
 		coordIdxs, auths, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
diff --git a/coordinator/txmanager.go b/coordinator/txmanager.go
index ea63aa0..d957779 100644
--- a/coordinator/txmanager.go
+++ b/coordinator/txmanager.go
@@ -183,7 +183,8 @@ func (t *TxManager) shouldSendRollupForgeBatch(batchInfo *BatchInfo) error {
 func addPerc(v *big.Int, p int64) *big.Int {
 	r := new(big.Int).Set(v)
 	r.Mul(r, big.NewInt(p))
-	r.Div(r, big.NewInt(100))
+	// nolint reason: to calculate percentages we divide by 100
+	r.Div(r, big.NewInt(100)) //nolint:gomnd
 	return r.Add(v, r)
 }
 
@@ -352,12 +353,14 @@ func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*i
 // TODO:
 // - After sending a message: CancelPipeline, stop all consecutive pending Batches (transactions)
 
+// Queue of BatchInfos
 type Queue struct {
 	list []*BatchInfo
 	// nonceByBatchNum map[common.BatchNum]uint64
 	next int
 }
 
+// NewQueue returns a new queue
 func NewQueue() Queue {
 	return Queue{
 		list: make([]*BatchInfo, 0),
@@ -366,10 +369,12 @@ func NewQueue() Queue {
 	}
 }
 
+// Len is the length of the queue
 func (q *Queue) Len() int {
 	return len(q.list)
 }
 
+// At returns the BatchInfo at position (or nil if position is out of bounds)
 func (q *Queue) At(position int) *BatchInfo {
 	if position >= len(q.list) {
 		return nil
@@ -377,6 +382,7 @@ func (q *Queue) At(position int) *BatchInfo {
 	return q.list[position]
 }
 
+// Next returns the next BatchInfo (or nil if queue is empty)
 func (q *Queue) Next() (int, *BatchInfo) {
 	if len(q.list) == 0 {
 		return 0, nil
@@ -385,6 +391,7 @@ func (q *Queue) Next() (int, *BatchInfo) {
 	return q.next, q.list[q.next]
 }
 
+// Remove removes the BatchInfo at position
 func (q *Queue) Remove(position int) {
 	// batchInfo := q.list[position]
 	// delete(q.nonceByBatchNum, batchInfo.BatchNum)
@@ -396,6 +403,7 @@ func (q *Queue) Remove(position int) {
 	}
 }
 
+// Push adds a new BatchInfo
 func (q *Queue) Push(batchInfo *BatchInfo) {
 	q.list = append(q.list, batchInfo)
 	// q.nonceByBatchNum[batchInfo.BatchNum] = batchInfo.EthTx.Nonce()
@@ -517,8 +525,8 @@ func (t *TxManager) Run(ctx context.Context) {
 						Reason: fmt.Sprintf("forgeBatch resend: %v", err)})
 					continue
 				}
-
 			}
+
 			if confirm != nil && *confirm >= t.cfg.ConfirmBlocks {
 				log.Debugw("TxManager: forgeBatch tx confirmed",
 					"tx", batchInfo.EthTx.Hash(), "batch", batchInfo.BatchNum)
diff --git a/db/historydb/historydb.go b/db/historydb/historydb.go
index 1484ad1..0949299 100644
--- a/db/historydb/historydb.go
+++ b/db/historydb/historydb.go
@@ -166,6 +166,19 @@ func (hdb *HistoryDB) addBatches(d meddler.DB, batches []common.Batch) error {
 	return nil
 }
 
+// GetBatch returns the batch with the given batchNum
+func (hdb *HistoryDB) GetBatch(batchNum common.BatchNum) (*common.Batch, error) {
+	var batch common.Batch
+	err := meddler.QueryRow(
+		hdb.db, &batch, `SELECT batch.batch_num, batch.eth_block_num, batch.forger_addr,
+		batch.fees_collected, batch.fee_idxs_coordinator, batch.state_root,
+		batch.num_accounts, batch.last_idx, batch.exit_root, batch.forge_l1_txs_num,
+		batch.slot_num, batch.total_fees_usd FROM batch WHERE batch_num = $1;`,
+		batchNum,
+	)
+	return &batch, err
+}
+
 // GetBatchAPI return the batch with the given batchNum
 func (hdb *HistoryDB) GetBatchAPI(batchNum common.BatchNum) (*BatchAPI, error) {
 	batch := &BatchAPI{}
@@ -320,7 +333,7 @@ func (hdb *HistoryDB) GetLastBatchNum() (common.BatchNum, error) {
 	return batchNum, tracerr.Wrap(row.Scan(&batchNum))
 }
 
-// GetLastBatchreturns the last forged batch
+// GetLastBatch returns the last forged batch
 func (hdb *HistoryDB) GetLastBatch() (*common.Batch, error) {
 	var batch common.Batch
 	err := meddler.QueryRow(
diff --git a/db/historydb/historydb_test.go b/db/historydb/historydb_test.go
index b2f6941..95e552d 100644
--- a/db/historydb/historydb_test.go
+++ b/db/historydb/historydb_test.go
@@ -212,6 +212,12 @@ func TestBatches(t *testing.T) {
 	fetchedLastL1BatchBlockNum, err := historyDB.GetLastL1BatchBlockNum()
 	assert.NoError(t, err)
 	assert.Equal(t, lastL1BatchBlockNum, fetchedLastL1BatchBlockNum)
+	// Test GetBatch
+	fetchedBatch, err := historyDB.GetBatch(1)
+	require.NoError(t, err)
+	assert.Equal(t, &batches[0], fetchedBatch)
+	_, err = historyDB.GetBatch(common.BatchNum(len(batches) + 1))
+	assert.Equal(t, sql.ErrNoRows, tracerr.Unwrap(err))
 }
 
 func TestBids(t *testing.T) {
diff --git a/db/kvdb/kvdb.go b/db/kvdb/kvdb.go
index c2f5633..a367bf3 100644
--- a/db/kvdb/kvdb.go
+++ b/db/kvdb/kvdb.go
@@ -425,12 +425,13 @@ func (k *KVDB) MakeCheckpoint() error {
 	}
 
 	// if checkpoint BatchNum already exist in disk, delete it
-	if _, err := os.Stat(checkpointPath); !os.IsNotExist(err) {
+	if _, err := os.Stat(checkpointPath); os.IsNotExist(err) {
+	} else if err != nil {
+		return tracerr.Wrap(err)
+	} else {
 		if err := os.RemoveAll(checkpointPath); err != nil {
 			return tracerr.Wrap(err)
 		}
-	} else if err != nil && !os.IsNotExist(err) {
-		return tracerr.Wrap(err)
 	}
 
 	// execute Checkpoint
@@ -451,12 +452,25 @@ func (k *KVDB) MakeCheckpoint() error {
 	return nil
 }
 
+// CheckpointExists returns true if the checkpoint exists
+func (k *KVDB) CheckpointExists(batchNum common.BatchNum) (bool, error) {
+	source := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
+	if _, err := os.Stat(source); os.IsNotExist(err) {
+		return false, nil
+	} else if err != nil {
+		return false, err
+	}
+	return true, nil
+}
+
 // DeleteCheckpoint removes if exist the checkpoint of the given batchNum
 func (k *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error {
 	checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
 
 	if _, err := os.Stat(checkpointPath); os.IsNotExist(err) {
 		return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum))
+	} else if err != nil {
+		return tracerr.Wrap(err)
 	}
 
 	return os.RemoveAll(checkpointPath)
@@ -520,6 +534,8 @@ func (k *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) e
 	if _, err := os.Stat(source); os.IsNotExist(err) {
 		// if kvdb does not have checkpoint at batchNum, return err
 		return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" does not exist", source))
+	} else if err != nil {
+		return tracerr.Wrap(err)
 	}
 	// By locking we allow calling MakeCheckpointFromTo from multiple
 	// places at the same time for the same stateDB. This allows the
@@ -533,12 +549,13 @@ func pebbleMakeCheckpoint(source, dest string) error {
 	// Remove dest folder (if it exists) before doing the checkpoint
-	if _, err := os.Stat(dest); !os.IsNotExist(err) {
+	if _, err := os.Stat(dest); os.IsNotExist(err) {
+	} else if err != nil {
+		return tracerr.Wrap(err)
+	} else {
 		if err := os.RemoveAll(dest); err != nil {
 			return tracerr.Wrap(err)
 		}
-	} else if err != nil && !os.IsNotExist(err) {
-		return tracerr.Wrap(err)
 	}
 
 	sto, err := pebble.NewPebbleStorage(source, false)
diff --git a/db/statedb/statedb.go b/db/statedb/statedb.go
index bcf44a5..12c8200 100644
--- a/db/statedb/statedb.go
+++ b/db/statedb/statedb.go
@@ -498,11 +498,17 @@ func NewLocalStateDB(cfg Config, synchronizerDB *StateDB) (*LocalStateDB, error)
 	}, nil
 }
 
+// CheckpointExists returns true if the checkpoint exists
+func (l *LocalStateDB) CheckpointExists(batchNum common.BatchNum) (bool, error) {
+	return l.db.CheckpointExists(batchNum)
+}
+
 // Reset performs a reset in the LocaStateDB. If fromSynchronizer is true, it
 // gets the state from LocalStateDB.synchronizerStateDB for the given batchNum.
 // If fromSynchronizer is false, get the state from LocalStateDB checkpoints.
 func (l *LocalStateDB) Reset(batchNum common.BatchNum, fromSynchronizer bool) error {
 	if fromSynchronizer {
+		log.Debugw("Making StateDB ResetFromSynchronizer", "batch", batchNum, "type", l.cfg.Type)
 		if err := l.db.ResetFromSynchronizer(batchNum, l.synchronizerStateDB.db); err != nil {
 			return tracerr.Wrap(err)
 		}
diff --git a/txselector/txselector.go b/txselector/txselector.go
index c3780cd..248bfe2 100644
--- a/txselector/txselector.go
+++ b/txselector/txselector.go
@@ -89,12 +89,8 @@ func (txsel *TxSelector) LocalAccountsDB() *statedb.LocalStateDB {
 
 // Reset tells the TxSelector to get it's internal AccountsDB
 // from the required `batchNum`
-func (txsel *TxSelector) Reset(batchNum common.BatchNum) error {
-	err := txsel.localAccountsDB.Reset(batchNum, true)
-	if err != nil {
-		return tracerr.Wrap(err)
-	}
-	return nil
+func (txsel *TxSelector) Reset(batchNum common.BatchNum, fromSynchronizer bool) error {
+	return tracerr.Wrap(txsel.localAccountsDB.Reset(batchNum, fromSynchronizer))
 }
 
 func (txsel *TxSelector) getCoordIdx(tokenID common.TokenID) (common.Idx, error) {
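
The SlotNum arithmetic shown as context in common/ethauction.go above is easy to check by hand. The standalone Go sketch below reproduces it with plain integers; the genesis block and blocks-per-slot values are made up for the example, and note that Go's integer division truncates toward zero, so only blocks sufficiently far before genesis yield a negative slot.

package main

import "fmt"

// slotNum reproduces the arithmetic of AuctionConstants.SlotNum: the slot is
// the number of whole blocksPerSlot periods elapsed since the genesis block.
func slotNum(blockNum, genesisBlockNum int64, blocksPerSlot int) int64 {
	return (blockNum - genesisBlockNum) / int64(blocksPerSlot)
}

func main() {
	// With genesis at block 1000 and 40 blocks per slot:
	fmt.Println(slotNum(1000, 1000, 40)) // 0, the first slot
	fmt.Println(slotNum(1080, 1000, 40)) // 2
	fmt.Println(slotNum(900, 1000, 40))  // -2, before genesis the result is negative
}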
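
The StartSlotBlocksDelay, ScheduleBatchBlocksAheadCheck and SendBatchBlocksMarginCheck comments in config/config.go describe checking, besides the next block, a block further ahead before scheduling or sending more batches. The sketch below is only one possible reading of that idea: canForgeAt and the exact window arithmetic are assumptions of the example, not taken from the repository.

package main

import "fmt"

// canKeepScheduling checks both the immediately next block and a block
// blocksAhead further out, stopping new work as soon as the later check fails.
func canKeepScheduling(currentBlock, blocksAhead int64, canForgeAt func(int64) bool) bool {
	return canForgeAt(currentBlock+1) && canForgeAt(currentBlock+1+blocksAhead)
}

func main() {
	forgeWindowEnd := int64(15) // allowed to forge up to block 15 (inclusive)
	canForgeAt := func(block int64) bool { return block <= forgeWindowEnd }

	fmt.Println(canKeepScheduling(10, 4, canForgeAt)) // true: blocks 11 and 15 are inside the window
	fmt.Println(canKeepScheduling(11, 4, canForgeAt)) // false: block 16 is outside the window
}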
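
The coordinator changes above add lastNonFailedBatchNum and a FailedBatchNum field in MsgStopPipeline so that, after a pipeline error, forging restarts from failedBatchNum-1 instead of the synchronizer's last batch. The standalone sketch below condenses that selection into a single function with simplified types; it is an illustration of the flow, not the repository API.

package main

import "fmt"

type BatchNum int64

// nextStartingBatch mirrors the selection done in handleStopPipeline and
// syncStats: a non-zero failed batch rolls the pipeline back to failed-1, and
// the coordinator never starts below the last non-failed batch it remembers.
func nextStartingBatch(lastSynced, lastNonFailed, failed BatchNum) BatchNum {
	start := lastSynced
	if failed != 0 {
		start = failed - 1
	}
	if lastNonFailed > start {
		start = lastNonFailed
	}
	return start
}

func main() {
	// Batch 12 failed while the synchronizer is still at batch 10:
	// the next pipeline starts from batch 11.
	fmt.Println(nextStartingBatch(10, 11, 12))
}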
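
setErrAtBatchNum and getErrAtBatchNum add an RWMutex-protected "first failed batch" marker that the forging loops consult before doing more work. The small program below shows the same latch pattern in isolation, with invented names.

package main

import (
	"fmt"
	"sync"
)

// errLatch is a minimal stand-in for Pipeline.errAtBatchNum: a batch number
// written once by the goroutine that hits an error and read by the forging
// loop to stop scheduling further batches until the pipeline is torn down.
type errLatch struct {
	rw  sync.RWMutex
	num int64
}

func (l *errLatch) set(n int64) {
	l.rw.Lock()
	defer l.rw.Unlock()
	l.num = n
}

func (l *errLatch) get() int64 {
	l.rw.RLock()
	defer l.rw.RUnlock()
	return l.num
}

func main() {
	var latch errLatch
	for batch := int64(1); batch <= 5; batch++ {
		if latch.get() != 0 {
			fmt.Println("skipping batch", batch, "after error at", latch.get())
			continue
		}
		if batch == 3 { // pretend forging batch 3 failed
			latch.set(batch)
			continue
		}
		fmt.Println("forged batch", batch)
	}
}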
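
Pipeline.reset now chooses between resetting the TxSelector and BatchBuilder from a local checkpoint or from the synchronizer, and forces a synchronizer reset when the state root stored in the HistoryDB disagrees with the locally computed one. The sketch below models only that decision; resetPlan and its parameters are invented for the example, while the real code goes through TxSelector, BatchBuilder and HistoryDB.

package main

import (
	"fmt"
	"math/big"
)

// resetPlan returns which source the reset would use, following the two
// conditions documented in the diff: missing local checkpoint, or a state
// root mismatch against the batch already synced.
func resetPlan(localCheckpointExists bool, syncedRoot, localRoot *big.Int) string {
	if !localCheckpointExists {
		return "reset from synchronizer (no local checkpoint)"
	}
	if syncedRoot != nil && syncedRoot.Cmp(localRoot) != 0 {
		return "reset from synchronizer (state root mismatch)"
	}
	return "reset from local checkpoint"
}

func main() {
	fmt.Println(resetPlan(false, nil, nil))
	fmt.Println(resetPlan(true, big.NewInt(7), big.NewInt(7)))
	fmt.Println(resetPlan(true, big.NewInt(7), big.NewInt(8)))
}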
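
addPerc in coordinator/txmanager.go, shown as context above, bumps a value by p percent using integer math. A quick standalone check of the arithmetic:

package main

import (
	"fmt"
	"math/big"
)

// addPerc returns v increased by p percent, using integer division by 100,
// matching the function shown in the diff context.
func addPerc(v *big.Int, p int64) *big.Int {
	r := new(big.Int).Set(v)
	r.Mul(r, big.NewInt(p))
	r.Div(r, big.NewInt(100))
	return r.Add(v, r)
}

func main() {
	gasPrice := big.NewInt(2_000_000_000)   // 2 gwei
	fmt.Println(addPerc(gasPrice, 10))      // 2200000000: a 10% bump
	fmt.Println(addPerc(big.NewInt(7), 10)) // 7: 7*10/100 truncates to 0
}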
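
The new doc comments describe Queue as a container of pending BatchInfos with Push, Next, At and Remove. The toy queue below imitates that behaviour with strings; its internals, in particular how Next advances and how Remove adjusts the cursor, are an assumption of this sketch, since the diff only shows parts of those methods.

package main

import "fmt"

// miniQueue illustrates how a tx manager could walk its pending batches:
// Push appends, Next returns the current item and advances cyclically, and
// Remove drops a confirmed item while keeping the cursor in range.
type miniQueue struct {
	list []string
	next int
}

func (q *miniQueue) Push(v string) { q.list = append(q.list, v) }

func (q *miniQueue) Next() (int, string) {
	if len(q.list) == 0 {
		return 0, ""
	}
	defer func() { q.next = (q.next + 1) % len(q.list) }()
	return q.next, q.list[q.next]
}

func (q *miniQueue) Remove(position int) {
	q.list = append(q.list[:position], q.list[position+1:]...)
	if q.next >= len(q.list) {
		q.next = 0
	}
}

func main() {
	var q miniQueue
	q.Push("batch-10")
	q.Push("batch-11")
	q.Push("batch-12")
	for i := 0; i < 4; i++ {
		pos, v := q.Next()
		fmt.Println(pos, v)
	}
	q.Remove(0) // batch-10 confirmed, drop it
	fmt.Println(q.list)
}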
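
HistoryDB.GetBatch returns the raw error from meddler, and Pipeline.reset treats sql.ErrNoRows as "this batch was never synced" rather than as a failure. The sketch below shows that error-handling shape using the standard library's errors.Is in place of the repository's tracerr.Unwrap, with fakeLookup standing in for the database query.

package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// getBatchStateRoot returns the stored root and whether it exists; only
// errors other than sql.ErrNoRows are propagated as failures.
func getBatchStateRoot(batchNum int64, fakeLookup func(int64) (string, error)) (string, bool, error) {
	root, err := fakeLookup(batchNum)
	if errors.Is(err, sql.ErrNoRows) {
		return "", false, nil // nothing to compare against yet
	} else if err != nil {
		return "", false, err
	}
	return root, true, nil
}

func main() {
	lookup := func(n int64) (string, error) {
		if n == 1 {
			return "0xabc", nil
		}
		return "", sql.ErrNoRows
	}
	fmt.Println(getBatchStateRoot(1, lookup))
	fmt.Println(getBatchStateRoot(2, lookup))
}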
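
Several kvdb functions are rewritten above into the same three-way os.Stat branch: does not exist, stat failed for another reason, exists. The helper below shows the pattern on its own, using only the standard library.

package main

import (
	"fmt"
	"os"
)

// dirExists reports whether path exists, distinguishing "not there" from a
// genuine stat failure, mirroring the shape used by CheckpointExists and the
// rewritten checks in MakeCheckpoint and pebbleMakeCheckpoint.
func dirExists(path string) (bool, error) {
	if _, err := os.Stat(path); os.IsNotExist(err) {
		return false, nil
	} else if err != nil {
		return false, err
	}
	return true, nil
}

func main() {
	ok, err := dirExists(os.TempDir())
	fmt.Println(ok, err)
}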