diff --git a/batchbuilder/batchbuilder.go b/batchbuilder/batchbuilder.go index 8f662fe..ed57501 100644 --- a/batchbuilder/batchbuilder.go +++ b/batchbuilder/batchbuilder.go @@ -54,7 +54,7 @@ func NewBatchBuilder(dbpath string, synchronizerStateDB *statedb.StateDB, batchN // copy of the rollup state from the Synchronizer at that `batchNum`, otherwise // it can just roll back the internal copy. func (bb *BatchBuilder) Reset(batchNum common.BatchNum, fromSynchronizer bool) error { - return bb.localStateDB.Reset(batchNum, fromSynchronizer) + return tracerr.Wrap(bb.localStateDB.Reset(batchNum, fromSynchronizer)) } // BuildBatch takes the transactions and returns the common.ZKInputs of the next batch diff --git a/cli/node/cfg.buidler.toml b/cli/node/cfg.buidler.toml index 8e33845..dd63f9d 100644 --- a/cli/node/cfg.buidler.toml +++ b/cli/node/cfg.buidler.toml @@ -41,12 +41,15 @@ TokenHEZ = "0x5D94e3e7aeC542aB0F9129B9a7BAdeb5B3Ca0f77" TokenHEZName = "Hermez Network Token" [Coordinator] -# ForgerAddress = "0x05c23b938a85ab26A36E6314a0D02080E9ca6BeD" # Non-Boot Coordinator +ForgerAddress = "0x05c23b938a85ab26A36E6314a0D02080E9ca6BeD" # Non-Boot Coordinator # ForgerAddressPrivateKey = "0x30f5fddb34cd4166adb2c6003fa6b18f380fd2341376be42cf1c7937004ac7a3" -ForgerAddress = "0xb4124ceb3451635dacedd11767f004d8a28c6ee7" # Boot Coordinator +# ForgerAddress = "0xb4124ceb3451635dacedd11767f004d8a28c6ee7" # Boot Coordinator # ForgerAddressPrivateKey = "0xa8a54b2d8197bc0b19bb8a084031be71835580a01e70a45a13babd16c9bc1563" ConfirmBlocks = 10 -L1BatchTimeoutPerc = 0.999 +L1BatchTimeoutPerc = 0.6 +StartSlotBlocksDelay = 2 +ScheduleBatchBlocksAheadCheck = 3 +SendBatchBlocksMarginCheck = 1 ProofServerPollInterval = "1s" ForgeRetryInterval = "500ms" SyncRetryInterval = "1s" @@ -85,8 +88,11 @@ ReceiptLoopInterval = "500ms" CheckLoopInterval = "500ms" Attempts = 4 AttemptsDelay = "500ms" +TxResendTimeout = "2m" +NoReuseNonce = false CallGasLimit = 300000 GasPriceDiv = 100 +MaxGasPrice = "5000000000" [Coordinator.EthClient.Keystore] Path = "/tmp/iden3-test/hermez/ethkeystore" diff --git a/common/batch.go b/common/batch.go index a6e11ce..972b53a 100644 --- a/common/batch.go +++ b/common/batch.go @@ -27,6 +27,24 @@ type Batch struct { TotalFeesUSD *float64 `meddler:"total_fees_usd"` } +// NewEmptyBatch creates a new empty batch +func NewEmptyBatch() *Batch { + return &Batch{ + BatchNum: 0, + EthBlockNum: 0, + ForgerAddr: ethCommon.Address{}, + CollectedFees: make(map[TokenID]*big.Int), + FeeIdxsCoordinator: make([]Idx, 0), + StateRoot: big.NewInt(0), + NumAccounts: 0, + LastIdx: 0, + ExitRoot: big.NewInt(0), + ForgeL1TxsNum: nil, + SlotNum: 0, + TotalFeesUSD: nil, + } +} + // BatchNum identifies a batch type BatchNum int64 diff --git a/common/ethauction.go b/common/ethauction.go index 0292407..9425ea4 100644 --- a/common/ethauction.go +++ b/common/ethauction.go @@ -33,7 +33,8 @@ func (c *AuctionConstants) SlotNum(blockNum int64) int64 { if blockNum >= c.GenesisBlockNum { return (blockNum - c.GenesisBlockNum) / int64(c.BlocksPerSlot) } - return -1 + // This result will be negative + return (blockNum - c.GenesisBlockNum) / int64(c.BlocksPerSlot) } // SlotBlocks returns the first and the last block numbers included in that slot diff --git a/config/config.go b/config/config.go index 92330c8..b9aa213 100644 --- a/config/config.go +++ b/config/config.go @@ -3,6 +3,7 @@ package config import ( "fmt" "io/ioutil" + "math/big" "time" "github.com/BurntSushi/toml" @@ -51,6 +52,27 @@ type Coordinator struct { // L1BatchTimeoutPerc is the 
portion of the range before the L1Batch // timeout that will trigger a schedule to forge an L1Batch L1BatchTimeoutPerc float64 `validate:"required"` + // StartSlotBlocksDelay is the number of blocks of delay to wait before + // starting the pipeline when we reach a slot in which we can forge. + StartSlotBlocksDelay int64 + // ScheduleBatchBlocksAheadCheck is the number of blocks ahead in which + // the forger address is checked to be allowed to forge (apart from + // checking the next block), used to decide when to stop scheduling new + // batches (by stopping the pipeline). + // For example, if we are at block 10 and ScheduleBatchBlocksAheadCheck + // is 5, eventhough at block 11 we canForge, the pipeline will be + // stopped if we can't forge at block 15. + // This value should be the expected number of blocks it takes between + // scheduling a batch and having it mined. + ScheduleBatchBlocksAheadCheck int64 + // SendBatchBlocksMarginCheck is the number of margin blocks ahead in + // which the coordinator is also checked to be allowed to forge, apart + // from the next block; used to decide when to stop sending batches to + // the smart contract. + // For example, if we are at block 10 and SendBatchBlocksMarginCheck is + // 5, eventhough at block 11 we canForge, the batch will be discarded + // if we can't forge at block 15. + SendBatchBlocksMarginCheck int64 // ProofServerPollInterval is the waiting interval between polling the // ProofServer while waiting for a particular status ProofServerPollInterval Duration `validate:"required"` @@ -65,19 +87,33 @@ type Coordinator struct { // SafetyPeriod is the number of batches after which // non-pending L2Txs are deleted from the pool SafetyPeriod common.BatchNum `validate:"required"` - // MaxTxs is the number of L2Txs that once reached triggers - // deletion of old L2Txs + // MaxTxs is the maximum number of pending L2Txs that can be + // stored in the pool. Once this number of pending L2Txs is + // reached, inserts to the pool will be denied until some of + // the pending txs are forged. MaxTxs uint32 `validate:"required"` // TTL is the Time To Live for L2Txs in the pool. Once MaxTxs // L2Txs is reached, L2Txs older than TTL will be deleted. TTL Duration `validate:"required"` - // PurgeBatchDelay is the delay between batches to purge outdated transactions + // PurgeBatchDelay is the delay between batches to purge + // outdated transactions. Oudated L2Txs are those that have + // been forged or marked as invalid for longer than the + // SafetyPeriod and pending L2Txs that have been in the pool + // for longer than TTL once there are MaxTxs. PurgeBatchDelay int64 `validate:"required"` - // InvalidateBatchDelay is the delay between batches to mark invalid transactions + // InvalidateBatchDelay is the delay between batches to mark + // invalid transactions due to nonce lower than the account + // nonce. InvalidateBatchDelay int64 `validate:"required"` - // PurgeBlockDelay is the delay between blocks to purge outdated transactions + // PurgeBlockDelay is the delay between blocks to purge + // outdated transactions. Oudated L2Txs are those that have + // been forged or marked as invalid for longer than the + // SafetyPeriod and pending L2Txs that have been in the pool + // for longer than TTL once there are MaxTxs. 
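Illustrative sketch (not part of the patch): how the two new look-ahead settings documented above are meant to combine, following Coordinator.syncStats and TxManager.shouldSendRollupForgeBatch later in this diff. canForgeAt stands in for the helpers added in this change and is stubbed so the example runs on its own; the 3 and 1 values come from cfg.buidler.toml above.

package main

import "fmt"

// canForgeAt is a stub for Coordinator.canForgeAt / TxManager.canForgeAt:
// pretend permission to forge ends after block 13.
func canForgeAt(blockNum int64) bool { return blockNum <= 13 }

func main() {
	const (
		nextBlock                     int64 = 11
		scheduleBatchBlocksAheadCheck int64 = 3 // ScheduleBatchBlocksAheadCheck from cfg.buidler.toml
		sendBatchBlocksMarginCheck    int64 = 1 // SendBatchBlocksMarginCheck from cfg.buidler.toml
	)
	// Scheduling: the pipeline keeps producing batches only while we can
	// forge at the next block and also ScheduleBatchBlocksAheadCheck blocks
	// ahead.
	schedule := canForgeAt(nextBlock) &&
		canForgeAt(nextBlock+scheduleBatchBlocksAheadCheck)
	// Sending: a built batch is only sent if we can still forge
	// SendBatchBlocksMarginCheck blocks ahead of the next block.
	send := canForgeAt(nextBlock) &&
		canForgeAt(nextBlock+sendBatchBlocksMarginCheck)
	fmt.Println(schedule, send) // false true: stop scheduling, keep sending
}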
PurgeBlockDelay int64 `validate:"required"` - // InvalidateBlockDelay is the delay between blocks to mark invalid transactions + // InvalidateBlockDelay is the delay between blocks to mark + // invalid transactions due to nonce lower than the account + // nonce. InvalidateBlockDelay int64 `validate:"required"` } `validate:"required"` TxSelector struct { @@ -101,6 +137,9 @@ type Coordinator struct { // calls, except for methods where a particular gas limit is // harcoded because it's known to be a big value CallGasLimit uint64 `validate:"required"` + // MaxGasPrice is the maximum gas price allowed for ethereum + // transactions + MaxGasPrice *big.Int `validate:"required"` // GasPriceDiv is the gas price division GasPriceDiv uint64 `validate:"required"` // CheckLoopInterval is the waiting interval between receipt @@ -112,6 +151,13 @@ type Coordinator struct { // AttemptsDelay is delay between attempts do do an eth client // RPC call AttemptsDelay Duration `validate:"required"` + // TxResendTimeout is the timeout after which a non-mined + // ethereum transaction will be resent (reusing the nonce) with + // a newly calculated gas price + TxResendTimeout Duration `validate:"required"` + // NoReuseNonce disables reusing nonces of pending transactions for + // new replacement transactions + NoReuseNonce bool // Keystore is the ethereum keystore where private keys are kept Keystore struct { // Path to the keystore diff --git a/coordinator/batch.go b/coordinator/batch.go index b2acf67..56a4088 100644 --- a/coordinator/batch.go +++ b/coordinator/batch.go @@ -47,6 +47,8 @@ type Debug struct { MineBlockNum int64 // SendBlockNum is the blockNum when the batch was sent to ethereum SendBlockNum int64 + // ResendNum is the number of times the tx has been resent + ResendNum int // LastScheduledL1BatchBlockNum is the blockNum when the last L1Batch // was scheduled LastScheduledL1BatchBlockNum int64 @@ -64,10 +66,17 @@ type Debug struct { // StartToSendDelay is the delay between starting a batch and sending // it to ethereum, in seconds StartToSendDelay float64 + // StartToMineDelay is the delay between starting a batch and having + // it mined in seconds + StartToMineDelay float64 + // SendToMineDelay is the delay between sending a batch tx and having + // it mined in seconds + SendToMineDelay float64 } // BatchInfo contans the Batch information type BatchInfo struct { + PipelineNum int BatchNum common.BatchNum ServerProof prover.Client ZKInputs *common.ZKInputs @@ -82,9 +91,16 @@ type BatchInfo struct { CoordIdxs []common.Idx ForgeBatchArgs *eth.RollupForgeBatchArgs // FeesInfo - EthTx *types.Transaction - Receipt *types.Receipt - Debug Debug + EthTx *types.Transaction + EthTxErr error + // SendTimestamp the time of batch sent to ethereum + SendTimestamp time.Time + Receipt *types.Receipt + // Fail is true if: + // - The receipt status is failed + // - A previous parent batch is failed + Fail bool + Debug Debug } // DebugStore is a debug function to store the BatchInfo as a json text file in diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index ee6caf9..0c2705d 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -3,8 +3,8 @@ package coordinator import ( "context" "fmt" + "math/big" "os" - "strings" "sync" "time" @@ -42,6 +42,29 @@ type Config struct { // L1BatchTimeoutPerc is the portion of the range before the L1Batch // timeout that will trigger a schedule to forge an L1Batch L1BatchTimeoutPerc float64 + // StartSlotBlocksDelay is the number of blocks of delay 
to wait before + // starting the pipeline when we reach a slot in which we can forge. + StartSlotBlocksDelay int64 + // ScheduleBatchBlocksAheadCheck is the number of blocks ahead in which + // the forger address is checked to be allowed to forge (apart from + // checking the next block), used to decide when to stop scheduling new + // batches (by stopping the pipeline). + // For example, if we are at block 10 and ScheduleBatchBlocksAheadCheck + // is 5, eventhough at block 11 we canForge, the pipeline will be + // stopped if we can't forge at block 15. + // This value should be the expected number of blocks it takes between + // scheduling a batch and having it mined. + ScheduleBatchBlocksAheadCheck int64 + // SendBatchBlocksMarginCheck is the number of margin blocks ahead in + // which the coordinator is also checked to be allowed to forge, apart + // from the next block; used to decide when to stop sending batches to + // the smart contract. + // For example, if we are at block 10 and SendBatchBlocksMarginCheck is + // 5, eventhough at block 11 we canForge, the batch will be discarded + // if we can't forge at block 15. + // This value should be the expected number of blocks it takes between + // sending a batch and having it mined. + SendBatchBlocksMarginCheck int64 // EthClientAttempts is the number of attempts to do an eth client RPC // call before giving up EthClientAttempts int @@ -54,13 +77,25 @@ type Config struct { // EthClientAttemptsDelay is delay between attempts do do an eth client // RPC call EthClientAttemptsDelay time.Duration + // EthTxResendTimeout is the timeout after which a non-mined ethereum + // transaction will be resent (reusing the nonce) with a newly + // calculated gas price + EthTxResendTimeout time.Duration + // EthNoReuseNonce disables reusing nonces of pending transactions for + // new replacement transactions + EthNoReuseNonce bool + // MaxGasPrice is the maximum gas price allowed for ethereum + // transactions + MaxGasPrice *big.Int // TxManagerCheckInterval is the waiting interval between receipt // checks of ethereum transactions in the TxManager TxManagerCheckInterval time.Duration // DebugBatchPath if set, specifies the path where batchInfo is stored // in JSON in every step/update of the pipeline - DebugBatchPath string - Purger PurgerCfg + DebugBatchPath string + Purger PurgerCfg + // VerifierIdx is the index of the verifier contract registered in the + // smart contract VerifierIdx uint8 TxProcessorConfig txprocessor.Config } @@ -74,15 +109,22 @@ func (c *Config) debugBatchStore(batchInfo *BatchInfo) { } } +type fromBatch struct { + BatchNum common.BatchNum + ForgerAddr ethCommon.Address + StateRoot *big.Int +} + // Coordinator implements the Coordinator type type Coordinator struct { // State - pipelineBatchNum common.BatchNum // batchNum from which we started the pipeline - provers []prover.Client - consts synchronizer.SCConsts - vars synchronizer.SCVariables - stats synchronizer.Stats - started bool + pipelineNum int // Pipeline sequential number. 
The first pipeline is 1 + pipelineFromBatch fromBatch // batch from which we started the pipeline + provers []prover.Client + consts synchronizer.SCConsts + vars synchronizer.SCVariables + stats synchronizer.Stats + started bool cfg Config @@ -96,7 +138,8 @@ type Coordinator struct { wg sync.WaitGroup cancel context.CancelFunc - pipeline *Pipeline + pipeline *Pipeline + lastNonFailedBatchNum common.BatchNum purger *Purger txManager *TxManager @@ -139,10 +182,15 @@ func NewCoordinator(cfg Config, ctx, cancel := context.WithCancel(context.Background()) c := Coordinator{ - pipelineBatchNum: -1, - provers: serverProofs, - consts: *scConsts, - vars: *initSCVars, + pipelineNum: 0, + pipelineFromBatch: fromBatch{ + BatchNum: 0, + ForgerAddr: ethCommon.Address{}, + StateRoot: big.NewInt(0), + }, + provers: serverProofs, + consts: *scConsts, + vars: *initSCVars, cfg: cfg, @@ -183,8 +231,9 @@ func (c *Coordinator) BatchBuilder() *batchbuilder.BatchBuilder { } func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) { - return NewPipeline(ctx, c.cfg, c.historyDB, c.l2DB, c.txSelector, - c.batchBuilder, c.purger, c.txManager, c.provers, &c.consts) + c.pipelineNum++ + return NewPipeline(ctx, c.cfg, c.pipelineNum, c.historyDB, c.l2DB, c.txSelector, + c.batchBuilder, c.purger, c, c.txManager, c.provers, &c.consts) } // MsgSyncBlock indicates an update to the Synchronizer stats @@ -205,6 +254,9 @@ type MsgSyncReorg struct { // MsgStopPipeline indicates a signal to reset the pipeline type MsgStopPipeline struct { Reason string + // FailedBatchNum indicates the first batchNum that failed in the + // pipeline. If FailedBatchNum is 0, it should be ignored. + FailedBatchNum common.BatchNum } // SendMsg is a thread safe method to pass a message to the Coordinator @@ -215,27 +267,36 @@ func (c *Coordinator) SendMsg(ctx context.Context, msg interface{}) { } } -func (c *Coordinator) syncSCVars(vars synchronizer.SCVariablesPtr) { - if vars.Rollup != nil { - c.vars.Rollup = *vars.Rollup +func updateSCVars(vars *synchronizer.SCVariables, update synchronizer.SCVariablesPtr) { + if update.Rollup != nil { + vars.Rollup = *update.Rollup } - if vars.Auction != nil { - c.vars.Auction = *vars.Auction + if update.Auction != nil { + vars.Auction = *update.Auction } - if vars.WDelayer != nil { - c.vars.WDelayer = *vars.WDelayer + if update.WDelayer != nil { + vars.WDelayer = *update.WDelayer } } +func (c *Coordinator) syncSCVars(vars synchronizer.SCVariablesPtr) { + updateSCVars(&c.vars, vars) +} + func canForge(auctionConstants *common.AuctionConstants, auctionVars *common.AuctionVariables, currentSlot *common.Slot, nextSlot *common.Slot, addr ethCommon.Address, blockNum int64) bool { + if blockNum < auctionConstants.GenesisBlockNum { + log.Infow("canForge: requested blockNum is < genesis", "blockNum", blockNum, + "genesis", auctionConstants.GenesisBlockNum) + return false + } var slot *common.Slot if currentSlot.StartBlock <= blockNum && blockNum <= currentSlot.EndBlock { slot = currentSlot } else if nextSlot.StartBlock <= blockNum && blockNum <= nextSlot.EndBlock { slot = nextSlot } else { - log.Warnw("Coordinator: requested blockNum for canForge is outside slot", + log.Warnw("canForge: requested blockNum is outside current and next slot", "blockNum", blockNum, "currentSlot", currentSlot, "nextSlot", nextSlot, ) @@ -244,16 +305,23 @@ func canForge(auctionConstants *common.AuctionConstants, auctionVars *common.Auc anyoneForge := false if !slot.ForgerCommitment && auctionConstants.RelativeBlock(blockNum) >= 
int64(auctionVars.SlotDeadline) { - log.Debugw("Coordinator: anyone can forge in the current slot (slotDeadline passed)", + log.Debugw("canForge: anyone can forge in the current slot (slotDeadline passed)", "block", blockNum) anyoneForge = true } if slot.Forger == addr || anyoneForge { return true } + log.Debugw("canForge: can't forge", "slot.Forger", slot.Forger) return false } +func (c *Coordinator) canForgeAt(blockNum int64) bool { + return canForge(&c.consts.Auction, &c.vars.Auction, + &c.stats.Sync.Auction.CurrentSlot, &c.stats.Sync.Auction.NextSlot, + c.cfg.ForgerAddress, blockNum) +} + func (c *Coordinator) canForge() bool { blockNum := c.stats.Eth.LastBlock.Num + 1 return canForge(&c.consts.Auction, &c.vars.Auction, @@ -262,12 +330,24 @@ func (c *Coordinator) canForge() bool { } func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) error { - canForge := c.canForge() + nextBlock := c.stats.Eth.LastBlock.Num + 1 + canForge := c.canForgeAt(nextBlock) + if c.cfg.ScheduleBatchBlocksAheadCheck != 0 && canForge { + canForge = c.canForgeAt(nextBlock + c.cfg.ScheduleBatchBlocksAheadCheck) + } if c.pipeline == nil { - if canForge { + relativeBlock := c.consts.Auction.RelativeBlock(nextBlock) + if canForge && relativeBlock < c.cfg.StartSlotBlocksDelay { + log.Debugf("Coordinator: delaying pipeline start due to "+ + "relativeBlock (%v) < cfg.StartSlotBlocksDelay (%v)", + relativeBlock, c.cfg.StartSlotBlocksDelay) + } else if canForge { log.Infow("Coordinator: forging state begin", "block", - stats.Eth.LastBlock.Num+1, "batch", stats.Sync.LastBatch) - batchNum := common.BatchNum(stats.Sync.LastBatch) + stats.Eth.LastBlock.Num+1, "batch", stats.Sync.LastBatch.BatchNum) + batchNum := stats.Sync.LastBatch.BatchNum + if c.lastNonFailedBatchNum > batchNum { + batchNum = c.lastNonFailedBatchNum + } var err error if c.pipeline, err = c.newPipeline(ctx); err != nil { return tracerr.Wrap(err) @@ -276,7 +356,6 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) c.pipeline = nil return tracerr.Wrap(err) } - c.pipelineBatchNum = batchNum } } else { if !canForge { @@ -286,25 +365,12 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) } } if c.pipeline == nil { - // Mark invalid in Pool due to forged L2Txs - // for _, batch := range batches { - // if err := c.l2DB.InvalidateOldNonces( - // idxsNonceFromL2Txs(batch.L2Txs), batch.Batch.BatchNum); err != nil { - // return err - // } - // } - if c.purger.CanInvalidate(stats.Sync.LastBlock.Num, stats.Sync.LastBatch) { - if err := c.txSelector.Reset(common.BatchNum(stats.Sync.LastBatch)); err != nil { - return tracerr.Wrap(err) - } - } - _, err := c.purger.InvalidateMaybe(c.l2DB, c.txSelector.LocalAccountsDB(), - stats.Sync.LastBlock.Num, stats.Sync.LastBatch) - if err != nil { + if _, err := c.purger.InvalidateMaybe(c.l2DB, c.txSelector.LocalAccountsDB(), + stats.Sync.LastBlock.Num, int64(stats.Sync.LastBatch.BatchNum)); err != nil { return tracerr.Wrap(err) } - _, err = c.purger.PurgeMaybe(c.l2DB, stats.Sync.LastBlock.Num, stats.Sync.LastBatch) - if err != nil { + if _, err := c.purger.PurgeMaybe(c.l2DB, stats.Sync.LastBlock.Num, + int64(stats.Sync.LastBatch.BatchNum)); err != nil { return tracerr.Wrap(err) } } @@ -331,33 +397,42 @@ func (c *Coordinator) handleReorg(ctx context.Context, msg *MsgSyncReorg) error if c.pipeline != nil { c.pipeline.SetSyncStatsVars(ctx, &msg.Stats, &msg.Vars) } - if common.BatchNum(c.stats.Sync.LastBatch) < c.pipelineBatchNum { - // There's been 
a reorg and the batch from which the pipeline - // was started was in a block that was discarded. The batch - // may not be in the main chain, so we stop the pipeline as a - // precaution (it will be started again once the node is in - // sync). - log.Infow("Coordinator.handleReorg StopPipeline sync.LastBatch < c.pipelineBatchNum", - "sync.LastBatch", c.stats.Sync.LastBatch, - "c.pipelineBatchNum", c.pipelineBatchNum) - if err := c.handleStopPipeline(ctx, "reorg"); err != nil { + if c.stats.Sync.LastBatch.ForgerAddr != c.cfg.ForgerAddress && + c.stats.Sync.LastBatch.StateRoot.Cmp(c.pipelineFromBatch.StateRoot) != 0 { + // There's been a reorg and the batch state root from which the + // pipeline was started has changed (probably because it was in + // a block that was discarded), and it was sent by a different + // coordinator than us. That batch may never be in the main + // chain, so we stop the pipeline (it will be started again + // once the node is in sync). + log.Infow("Coordinator.handleReorg StopPipeline sync.LastBatch.ForgerAddr != cfg.ForgerAddr "+ + "& sync.LastBatch.StateRoot != pipelineFromBatch.StateRoot", + "sync.LastBatch.StateRoot", c.stats.Sync.LastBatch.StateRoot, + "pipelineFromBatch.StateRoot", c.pipelineFromBatch.StateRoot) + c.txManager.DiscardPipeline(ctx, c.pipelineNum) + if err := c.handleStopPipeline(ctx, "reorg", 0); err != nil { return tracerr.Wrap(err) } } return nil } -func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string) error { +// handleStopPipeline handles stopping the pipeline. If failedBatchNum is 0, +// the next pipeline will start from the last state of the synchronizer, +// otherwise, it will state from failedBatchNum-1. +func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string, failedBatchNum common.BatchNum) error { + batchNum := c.stats.Sync.LastBatch.BatchNum + if failedBatchNum != 0 { + batchNum = failedBatchNum - 1 + } if c.pipeline != nil { c.pipeline.Stop(c.ctx) c.pipeline = nil } - if err := c.l2DB.Reorg(common.BatchNum(c.stats.Sync.LastBatch)); err != nil { + if err := c.l2DB.Reorg(batchNum); err != nil { return tracerr.Wrap(err) } - if strings.Contains(reason, common.AuctionErrMsgCannotForge) { //nolint:staticcheck - // TODO: Check that we are in a slot in which we can't forge - } + c.lastNonFailedBatchNum = batchNum return nil } @@ -373,7 +448,7 @@ func (c *Coordinator) handleMsg(ctx context.Context, msg interface{}) error { } case MsgStopPipeline: log.Infow("Coordinator received MsgStopPipeline", "reason", msg.Reason) - if err := c.handleStopPipeline(ctx, msg.Reason); err != nil { + if err := c.handleStopPipeline(ctx, msg.Reason, msg.FailedBatchNum); err != nil { return tracerr.Wrap(fmt.Errorf("Coordinator.handleStopPipeline: %w", err)) } default: diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index a9189e8..3bb6902 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -261,8 +261,8 @@ func TestCoordinatorFlow(t *testing.T) { var stats synchronizer.Stats stats.Eth.LastBlock = *ethClient.CtlLastBlock() stats.Sync.LastBlock = stats.Eth.LastBlock - stats.Eth.LastBatch = ethClient.CtlLastForgedBatch() - stats.Sync.LastBatch = stats.Eth.LastBatch + stats.Eth.LastBatchNum = ethClient.CtlLastForgedBatch() + stats.Sync.LastBatch.BatchNum = common.BatchNum(stats.Eth.LastBatchNum) canForge, err := ethClient.AuctionCanForge(forger, blockNum+1) require.NoError(t, err) var slot common.Slot @@ -279,7 +279,7 @@ func TestCoordinatorFlow(t 
*testing.T) { // Copy stateDB to synchronizer if there was a new batch source := fmt.Sprintf("%v/BatchNum%v", batchBuilderDBPath, stats.Sync.LastBatch) dest := fmt.Sprintf("%v/BatchNum%v", syncDBPath, stats.Sync.LastBatch) - if stats.Sync.LastBatch != 0 { + if stats.Sync.LastBatch.BatchNum != 0 { if _, err := os.Stat(dest); os.IsNotExist(err) { log.Infow("Making pebble checkpoint for sync", "source", source, "dest", dest) diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go index 81d5774..9939079 100644 --- a/coordinator/pipeline.go +++ b/coordinator/pipeline.go @@ -2,6 +2,7 @@ package coordinator import ( "context" + "database/sql" "fmt" "math/big" "sync" @@ -24,19 +25,27 @@ type statsVars struct { Vars synchronizer.SCVariablesPtr } +type state struct { + batchNum common.BatchNum + lastScheduledL1BatchBlockNum int64 + lastForgeL1TxsNum int64 +} + // Pipeline manages the forging of batches with parallel server proofs type Pipeline struct { + num int cfg Config consts synchronizer.SCConsts // state - batchNum common.BatchNum - lastScheduledL1BatchBlockNum int64 - lastForgeL1TxsNum int64 - started bool + state state + started bool + rw sync.RWMutex + errAtBatchNum common.BatchNum proversPool *ProversPool provers []prover.Client + coord *Coordinator txManager *TxManager historyDB *historydb.HistoryDB l2DB *l2db.L2DB @@ -53,14 +62,28 @@ type Pipeline struct { cancel context.CancelFunc } +func (p *Pipeline) setErrAtBatchNum(batchNum common.BatchNum) { + p.rw.Lock() + defer p.rw.Unlock() + p.errAtBatchNum = batchNum +} + +func (p *Pipeline) getErrAtBatchNum() common.BatchNum { + p.rw.RLock() + defer p.rw.RUnlock() + return p.errAtBatchNum +} + // NewPipeline creates a new Pipeline func NewPipeline(ctx context.Context, cfg Config, + num int, // Pipeline sequential number historyDB *historydb.HistoryDB, l2DB *l2db.L2DB, txSelector *txselector.TxSelector, batchBuilder *batchbuilder.BatchBuilder, purger *Purger, + coord *Coordinator, txManager *TxManager, provers []prover.Client, scConsts *synchronizer.SCConsts, @@ -79,6 +102,7 @@ func NewPipeline(ctx context.Context, return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool")) } return &Pipeline{ + num: num, cfg: cfg, historyDB: historyDB, l2DB: l2DB, @@ -87,6 +111,7 @@ func NewPipeline(ctx context.Context, provers: provers, proversPool: proversPool, purger: purger, + coord: coord, txManager: txManager, consts: *scConsts, statsVarsCh: make(chan statsVars, queueLen), @@ -104,33 +129,67 @@ func (p *Pipeline) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Sta // reset pipeline state func (p *Pipeline) reset(batchNum common.BatchNum, stats *synchronizer.Stats, vars *synchronizer.SCVariables) error { - p.batchNum = batchNum - p.lastForgeL1TxsNum = stats.Sync.LastForgeL1TxsNum + p.state = state{ + batchNum: batchNum, + lastForgeL1TxsNum: stats.Sync.LastForgeL1TxsNum, + lastScheduledL1BatchBlockNum: 0, + } p.stats = *stats p.vars = *vars - p.lastScheduledL1BatchBlockNum = 0 - err := p.txSelector.Reset(p.batchNum) + // Reset the StateDB in TxSelector and BatchBuilder from the + // synchronizer only if the checkpoint we reset from either: + // a. Doesn't exist in the TxSelector/BatchBuilder + // b. The batch has already been synced by the synchronizer and has a + // different MTRoot than the BatchBuilder + // Otherwise, reset from the local checkpoint. 
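Illustrative sketch (not part of the patch): the reset policy described in the comment above, reduced to a single decision. It assumes a nil synced state root stands for "batch not yet in the HistoryDB"; the real implementation follows below in Pipeline.reset.

package main

import (
	"fmt"
	"math/big"
)

// resetFromSynchronizer restates the policy implemented in Pipeline.reset:
// take the checkpoint from the synchronizer only when the local checkpoint is
// missing, or when the batch already synced by the synchronizer has a state
// root different from the local one.
func resetFromSynchronizer(localCheckpointExists bool, syncedRoot, localRoot *big.Int) bool {
	if !localCheckpointExists {
		return true
	}
	if syncedRoot != nil && syncedRoot.Cmp(localRoot) != 0 {
		return true
	}
	return false
}

func main() {
	fmt.Println(resetFromSynchronizer(false, nil, big.NewInt(7)))          // true: no local checkpoint
	fmt.Println(resetFromSynchronizer(true, nil, big.NewInt(7)))           // false: batch not synced yet, keep local
	fmt.Println(resetFromSynchronizer(true, big.NewInt(7), big.NewInt(7))) // false: roots match
	fmt.Println(resetFromSynchronizer(true, big.NewInt(8), big.NewInt(7))) // true: roots differ, force sync reset
}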
+ + // First attempt to reset from local checkpoint if such checkpoint exists + existsTxSelector, err := p.txSelector.LocalAccountsDB().CheckpointExists(p.state.batchNum) if err != nil { return tracerr.Wrap(err) } - err = p.batchBuilder.Reset(p.batchNum, true) + fromSynchronizerTxSelector := !existsTxSelector + if err := p.txSelector.Reset(p.state.batchNum, fromSynchronizerTxSelector); err != nil { + return tracerr.Wrap(err) + } + existsBatchBuilder, err := p.batchBuilder.LocalStateDB().CheckpointExists(p.state.batchNum) if err != nil { return tracerr.Wrap(err) } + fromSynchronizerBatchBuilder := !existsBatchBuilder + if err := p.batchBuilder.Reset(p.state.batchNum, fromSynchronizerBatchBuilder); err != nil { + return tracerr.Wrap(err) + } + + // After reset, check that if the batch exists in the historyDB, the + // stateRoot matches with the local one, if not, force a reset from + // synchronizer + batch, err := p.historyDB.GetBatch(p.state.batchNum) + if tracerr.Unwrap(err) == sql.ErrNoRows { + // nothing to do + } else if err != nil { + return tracerr.Wrap(err) + } else { + localStateRoot := p.batchBuilder.LocalStateDB().MT.Root().BigInt() + if batch.StateRoot.Cmp(localStateRoot) != 0 { + log.Debugw("localStateRoot (%v) != historyDB stateRoot (%v). "+ + "Forcing reset from Synchronizer", localStateRoot, batch.StateRoot) + // StateRoot from synchronizer doesn't match StateRoot + // from batchBuilder, force a reset from synchronizer + if err := p.txSelector.Reset(p.state.batchNum, true); err != nil { + return tracerr.Wrap(err) + } + if err := p.batchBuilder.Reset(p.state.batchNum, true); err != nil { + return tracerr.Wrap(err) + } + } + } return nil } func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) { - if vars.Rollup != nil { - p.vars.Rollup = *vars.Rollup - } - if vars.Auction != nil { - p.vars.Auction = *vars.Auction - } - if vars.WDelayer != nil { - p.vars.WDelayer = *vars.WDelayer - } + updateSCVars(&p.vars, vars) } // handleForgeBatch calls p.forgeBatch to forge the batch and get the zkInputs, @@ -143,7 +202,7 @@ func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNu } else if err != nil { if tracerr.Unwrap(err) == errLastL1BatchNotSynced { log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err, - "lastForgeL1TxsNum", p.lastForgeL1TxsNum, + "lastForgeL1TxsNum", p.state.lastForgeL1TxsNum, "syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum) } else { log.Errorw("forgeBatch", "err", err) @@ -199,15 +258,32 @@ func (p *Pipeline) Start(batchNum common.BatchNum, p.stats = statsVars.Stats p.syncSCVars(statsVars.Vars) case <-time.After(waitDuration): - batchNum = p.batchNum + 1 + // Once errAtBatchNum != 0, we stop forging + // batches because there's been an error and we + // wait for the pipeline to be stopped. 
+ if p.getErrAtBatchNum() != 0 { + waitDuration = p.cfg.ForgeRetryInterval + continue + } + batchNum = p.state.batchNum + 1 batchInfo, err := p.handleForgeBatch(p.ctx, batchNum) if p.ctx.Err() != nil { continue + } else if tracerr.Unwrap(err) == errLastL1BatchNotSynced { + waitDuration = p.cfg.ForgeRetryInterval + continue } else if err != nil { - waitDuration = p.cfg.SyncRetryInterval + p.setErrAtBatchNum(batchNum) + waitDuration = p.cfg.ForgeRetryInterval + p.coord.SendMsg(p.ctx, MsgStopPipeline{ + Reason: fmt.Sprintf( + "Pipeline.handleForgBatch: %v", err), + FailedBatchNum: batchNum, + }) continue } - p.batchNum = batchNum + + p.state.batchNum = batchNum select { case batchChSentServerProof <- batchInfo: case <-p.ctx.Done(): @@ -225,16 +301,28 @@ func (p *Pipeline) Start(batchNum common.BatchNum, p.wg.Done() return case batchInfo := <-batchChSentServerProof: + // Once errAtBatchNum != 0, we stop forging + // batches because there's been an error and we + // wait for the pipeline to be stopped. + if p.getErrAtBatchNum() != 0 { + continue + } err := p.waitServerProof(p.ctx, batchInfo) - // We are done with this serverProof, add it back to the pool - p.proversPool.Add(p.ctx, batchInfo.ServerProof) - batchInfo.ServerProof = nil if p.ctx.Err() != nil { continue } else if err != nil { log.Errorw("waitServerProof", "err", err) + p.setErrAtBatchNum(batchInfo.BatchNum) + p.coord.SendMsg(p.ctx, MsgStopPipeline{ + Reason: fmt.Sprintf( + "Pipeline.waitServerProof: %v", err), + FailedBatchNum: batchInfo.BatchNum, + }) continue } + // We are done with this serverProof, add it back to the pool + p.proversPool.Add(p.ctx, batchInfo.ServerProof) + // batchInfo.ServerProof = nil p.txManager.AddBatch(p.ctx, batchInfo) } } @@ -284,8 +372,8 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e if err != nil { return nil, tracerr.Wrap(err) } - - batchInfo = &BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch + // Structure to accumulate data and metadata of the batch + batchInfo = &BatchInfo{PipelineNum: p.num, BatchNum: batchNum} batchInfo.Debug.StartTimestamp = time.Now() batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1 @@ -300,22 +388,19 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e var auths [][]byte var coordIdxs []common.Idx + // TODO: If there are no txs and we are behind the timeout, skip + // forging a batch and return a particular error that can be handleded + // in the loop where handleForgeBatch is called to retry after an + // interval + // 1. 
Decide if we forge L2Tx or L1+L2Tx if p.shouldL1L2Batch(batchInfo) { batchInfo.L1Batch = true - defer func() { - // If there's no error, update the parameters related - // to the last L1Batch forged - if err == nil { - p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1 - p.lastForgeL1TxsNum++ - } - }() - if p.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum { + if p.state.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum { return nil, tracerr.Wrap(errLastL1BatchNotSynced) } // 2a: L1+L2 txs - l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum + 1) + l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.state.lastForgeL1TxsNum + 1) if err != nil { return nil, tracerr.Wrap(err) } @@ -324,6 +409,9 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e if err != nil { return nil, tracerr.Wrap(err) } + + p.state.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1 + p.state.lastForgeL1TxsNum++ } else { // 2b: only L2 txs coordIdxs, auths, l1CoordTxs, poolL2Txs, discardedL2Txs, err = @@ -399,12 +487,12 @@ func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) er func (p *Pipeline) shouldL1L2Batch(batchInfo *BatchInfo) bool { // Take the lastL1BatchBlockNum as the biggest between the last // scheduled one, and the synchronized one. - lastL1BatchBlockNum := p.lastScheduledL1BatchBlockNum + lastL1BatchBlockNum := p.state.lastScheduledL1BatchBlockNum if p.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum { lastL1BatchBlockNum = p.stats.Sync.LastL1BatchBlock } // Set Debug information - batchInfo.Debug.LastScheduledL1BatchBlockNum = p.lastScheduledL1BatchBlockNum + batchInfo.Debug.LastScheduledL1BatchBlockNum = p.state.lastScheduledL1BatchBlockNum batchInfo.Debug.LastL1BatchBlock = p.stats.Sync.LastL1BatchBlock batchInfo.Debug.LastL1BatchBlockDelta = p.stats.Eth.LastBlock.Num + 1 - lastL1BatchBlockNum batchInfo.Debug.L1BatchBlockScheduleDeadline = diff --git a/coordinator/pipeline_test.go b/coordinator/pipeline_test.go index 1bd7bcf..454c446 100644 --- a/coordinator/pipeline_test.go +++ b/coordinator/pipeline_test.go @@ -25,6 +25,14 @@ import ( "github.com/stretchr/testify/require" ) +func newBigInt(s string) *big.Int { + v, ok := new(big.Int).SetString(s, 10) + if !ok { + panic(fmt.Errorf("Can't set big.Int from %s", s)) + } + return v +} + func TestPipelineShouldL1L2Batch(t *testing.T) { ethClientSetup := test.NewClientSetupExample() ethClientSetup.ChainID = big.NewInt(int64(chainID)) @@ -77,7 +85,7 @@ func TestPipelineShouldL1L2Batch(t *testing.T) { // // Scheduled L1Batch // - pipeline.lastScheduledL1BatchBlockNum = startBlock + pipeline.state.lastScheduledL1BatchBlockNum = startBlock stats.Sync.LastL1BatchBlock = startBlock - 10 // We are are one block before the timeout range * 0.5 @@ -128,6 +136,11 @@ func preloadSync(t *testing.T, ethClient *test.Client, sync *synchronizer.Synchr blocks, err := tc.GenerateBlocksFromInstructions(set) require.NoError(t, err) require.NotNil(t, blocks) + // Set StateRoots for batches manually (til doesn't set it) + blocks[0].Rollup.Batches[0].Batch.StateRoot = + newBigInt("0") + blocks[0].Rollup.Batches[1].Batch.StateRoot = + newBigInt("10941365282189107056349764238909072001483688090878331371699519307087372995595") ethAddTokens(blocks, ethClient) err = ethClient.CtlAddBlocks(blocks) @@ -172,7 +185,7 @@ func TestPipelineForgeBatchWithTxs(t *testing.T) { // users with positive balances tilCtx := preloadSync(t, ethClient, sync, modules.historyDB, modules.stateDB) 
syncStats := sync.Stats() - batchNum := common.BatchNum(syncStats.Sync.LastBatch) + batchNum := syncStats.Sync.LastBatch.BatchNum syncSCVars := sync.SCVars() pipeline, err := coord.newPipeline(ctx) diff --git a/coordinator/purger.go b/coordinator/purger.go index fa0256d..8e11b1d 100644 --- a/coordinator/purger.go +++ b/coordinator/purger.go @@ -13,13 +13,23 @@ import ( // PurgerCfg is the purger configuration type PurgerCfg struct { - // PurgeBatchDelay is the delay between batches to purge outdated transactions + // PurgeBatchDelay is the delay between batches to purge outdated + // transactions. Oudated L2Txs are those that have been forged or + // marked as invalid for longer than the SafetyPeriod and pending L2Txs + // that have been in the pool for longer than TTL once there are + // MaxTxs. PurgeBatchDelay int64 - // InvalidateBatchDelay is the delay between batches to mark invalid transactions + // InvalidateBatchDelay is the delay between batches to mark invalid + // transactions due to nonce lower than the account nonce. InvalidateBatchDelay int64 - // PurgeBlockDelay is the delay between blocks to purge outdated transactions + // PurgeBlockDelay is the delay between blocks to purge outdated + // transactions. Oudated L2Txs are those that have been forged or + // marked as invalid for longer than the SafetyPeriod and pending L2Txs + // that have been in the pool for longer than TTL once there are + // MaxTxs. PurgeBlockDelay int64 - // InvalidateBlockDelay is the delay between blocks to mark invalid transactions + // InvalidateBlockDelay is the delay between blocks to mark invalid + // transactions due to nonce lower than the account nonce. InvalidateBlockDelay int64 } diff --git a/coordinator/txmanager.go b/coordinator/txmanager.go index 5a449c7..082d0f0 100644 --- a/coordinator/txmanager.go +++ b/coordinator/txmanager.go @@ -2,6 +2,7 @@ package coordinator import ( "context" + "errors" "fmt" "math/big" "time" @@ -9,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/db/l2db" @@ -35,12 +37,20 @@ type TxManager struct { vars synchronizer.SCVariables statsVarsCh chan statsVars - queue []*BatchInfo + discardPipelineCh chan int // int refers to the pipelineNum + + minPipelineNum int + queue Queue // lastSuccessBatch stores the last BatchNum that who's forge call was confirmed lastSuccessBatch common.BatchNum - lastPendingBatch common.BatchNum - lastSuccessNonce uint64 - lastPendingNonce uint64 + // lastPendingBatch common.BatchNum + // accNonce is the account nonce in the last mined block (due to mined txs) + accNonce uint64 + // accNextNonce is the nonce that we should use to send the next tx. + // In some cases this will be a reused nonce of an already pending tx. 
+ accNextNonce uint64 + + lastSentL1BatchBlockNum int64 } // NewTxManager creates a new TxManager @@ -54,26 +64,19 @@ func NewTxManager(ctx context.Context, cfg *Config, ethClient eth.ClientInterfac if err != nil { return nil, tracerr.Wrap(err) } - lastSuccessNonce, err := ethClient.EthNonceAt(ctx, *address, nil) + accNonce, err := ethClient.EthNonceAt(ctx, *address, nil) if err != nil { return nil, err } - lastPendingNonce, err := ethClient.EthPendingNonceAt(ctx, *address) - if err != nil { - return nil, err - } - if lastSuccessNonce != lastPendingNonce { - return nil, tracerr.Wrap(fmt.Errorf("lastSuccessNonce (%v) != lastPendingNonce (%v)", - lastSuccessNonce, lastPendingNonce)) - } - log.Infow("TxManager started", "nonce", lastSuccessNonce) + log.Infow("TxManager started", "nonce", accNonce) return &TxManager{ - cfg: *cfg, - ethClient: ethClient, - l2DB: l2DB, - coord: coord, - batchCh: make(chan *BatchInfo, queueLen), - statsVarsCh: make(chan statsVars, queueLen), + cfg: *cfg, + ethClient: ethClient, + l2DB: l2DB, + coord: coord, + batchCh: make(chan *BatchInfo, queueLen), + statsVarsCh: make(chan statsVars, queueLen), + discardPipelineCh: make(chan int, queueLen), account: accounts.Account{ Address: *address, }, @@ -82,8 +85,10 @@ func NewTxManager(ctx context.Context, cfg *Config, ethClient eth.ClientInterfac vars: *initSCVars, - lastSuccessNonce: lastSuccessNonce, - lastPendingNonce: lastPendingNonce, + minPipelineNum: 0, + queue: NewQueue(), + accNonce: accNonce, + accNextNonce: accNonce, }, nil } @@ -104,18 +109,19 @@ func (t *TxManager) SetSyncStatsVars(ctx context.Context, stats *synchronizer.St } } -func (t *TxManager) syncSCVars(vars synchronizer.SCVariablesPtr) { - if vars.Rollup != nil { - t.vars.Rollup = *vars.Rollup - } - if vars.Auction != nil { - t.vars.Auction = *vars.Auction - } - if vars.WDelayer != nil { - t.vars.WDelayer = *vars.WDelayer +// DiscardPipeline is a thread safe method to notify about a discarded pipeline +// due to a reorg +func (t *TxManager) DiscardPipeline(ctx context.Context, pipelineNum int) { + select { + case t.discardPipelineCh <- pipelineNum: + case <-ctx.Done(): } } +func (t *TxManager) syncSCVars(vars synchronizer.SCVariablesPtr) { + updateSCVars(&t.vars, vars) +} + // NewAuth generates a new auth object for an ethereum transaction func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) { gasPrice, err := t.ethClient.EthSuggestGasPrice(ctx) @@ -123,6 +129,7 @@ func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) { return nil, tracerr.Wrap(err) } inc := new(big.Int).Set(gasPrice) + // TODO: Replace this by a value of percentage const gasPriceDiv = 100 inc.Div(inc, new(big.Int).SetUint64(gasPriceDiv)) gasPrice.Add(gasPrice, inc) @@ -141,29 +148,75 @@ func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) { return auth, nil } -func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) error { - // TODO: Check if we can forge in the next blockNum, abort if we can't - batchInfo.Debug.Status = StatusSent - batchInfo.Debug.SendBlockNum = t.stats.Eth.LastBlock.Num + 1 - batchInfo.Debug.SendTimestamp = time.Now() - batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub( - batchInfo.Debug.StartTimestamp).Seconds() +func (t *TxManager) shouldSendRollupForgeBatch(batchInfo *BatchInfo) error { + nextBlock := t.stats.Eth.LastBlock.Num + 1 + if !t.canForgeAt(nextBlock) { + return tracerr.Wrap(fmt.Errorf("can't forge in the next block: %v", nextBlock)) + 
} + if t.mustL1L2Batch(nextBlock) && !batchInfo.L1Batch { + return tracerr.Wrap(fmt.Errorf("can't forge non-L1Batch in the next block: %v", nextBlock)) + } + margin := t.cfg.SendBatchBlocksMarginCheck + if margin != 0 { + if !t.canForgeAt(nextBlock + margin) { + return tracerr.Wrap(fmt.Errorf("can't forge after %v blocks: %v", + margin, nextBlock)) + } + if t.mustL1L2Batch(nextBlock+margin) && !batchInfo.L1Batch { + return tracerr.Wrap(fmt.Errorf("can't forge non-L1Batch after %v blocks: %v", + margin, nextBlock)) + } + } + return nil +} + +func addPerc(v *big.Int, p int64) *big.Int { + r := new(big.Int).Set(v) + r.Mul(r, big.NewInt(p)) + // nolint reason: to calculate percentages we divide by 100 + r.Div(r, big.NewInt(100)) //nolit:gomnd + return r.Add(v, r) +} + +func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo, resend bool) error { var ethTx *types.Transaction var err error auth, err := t.NewAuth(ctx) if err != nil { return tracerr.Wrap(err) } - auth.Nonce = big.NewInt(int64(t.lastPendingNonce)) - t.lastPendingNonce++ + auth.Nonce = big.NewInt(int64(t.accNextNonce)) + if resend { + auth.Nonce = big.NewInt(int64(batchInfo.EthTx.Nonce())) + } for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ { + if auth.GasPrice.Cmp(t.cfg.MaxGasPrice) > 0 { + return tracerr.Wrap(fmt.Errorf("calculated gasPrice (%v) > maxGasPrice (%v)", + auth.GasPrice, t.cfg.MaxGasPrice)) + } + // RollupForgeBatch() calls ethclient.SendTransaction() ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs, auth) - if err != nil { - // if strings.Contains(err.Error(), common.AuctionErrMsgCannotForge) { - // log.Errorw("TxManager ethClient.RollupForgeBatch", "err", err, - // "block", t.stats.Eth.LastBlock.Num+1) - // return tracerr.Wrap(err) - // } + if errors.Is(err, core.ErrNonceTooLow) { + log.Warnw("TxManager ethClient.RollupForgeBatch incrementing nonce", + "err", err, "nonce", auth.Nonce, "batchNum", batchInfo.BatchNum) + auth.Nonce.Add(auth.Nonce, big.NewInt(1)) + attempt-- + } else if errors.Is(err, core.ErrNonceTooHigh) { + log.Warnw("TxManager ethClient.RollupForgeBatch decrementing nonce", + "err", err, "nonce", auth.Nonce, "batchNum", batchInfo.BatchNum) + auth.Nonce.Sub(auth.Nonce, big.NewInt(1)) + attempt-- + } else if errors.Is(err, core.ErrUnderpriced) { + log.Warnw("TxManager ethClient.RollupForgeBatch incrementing gasPrice", + "err", err, "gasPrice", auth.GasPrice, "batchNum", batchInfo.BatchNum) + auth.GasPrice = addPerc(auth.GasPrice, 10) + attempt-- + } else if errors.Is(err, core.ErrReplaceUnderpriced) { + log.Warnw("TxManager ethClient.RollupForgeBatch incrementing gasPrice", + "err", err, "gasPrice", auth.GasPrice, "batchNum", batchInfo.BatchNum) + auth.GasPrice = addPerc(auth.GasPrice, 10) + attempt-- + } else if err != nil { log.Errorw("TxManager ethClient.RollupForgeBatch", "attempt", attempt, "err", err, "block", t.stats.Eth.LastBlock.Num+1, "batchNum", batchInfo.BatchNum) @@ -179,10 +232,29 @@ func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchIn if err != nil { return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err)) } + if !resend { + t.accNextNonce = auth.Nonce.Uint64() + 1 + } batchInfo.EthTx = ethTx - log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex()) + log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash()) + now := time.Now() + batchInfo.SendTimestamp = now + + if resend { 
+ batchInfo.Debug.ResendNum++ + } + batchInfo.Debug.Status = StatusSent + batchInfo.Debug.SendBlockNum = t.stats.Eth.LastBlock.Num + 1 + batchInfo.Debug.SendTimestamp = batchInfo.SendTimestamp + batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub( + batchInfo.Debug.StartTimestamp).Seconds() t.cfg.debugBatchStore(batchInfo) - t.lastPendingBatch = batchInfo.BatchNum + + if !resend { + if batchInfo.L1Batch { + t.lastSentL1BatchBlockNum = t.stats.Eth.LastBlock.Num + 1 + } + } if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil { return tracerr.Wrap(err) } @@ -225,13 +297,20 @@ func (t *TxManager) checkEthTransactionReceipt(ctx context.Context, batchInfo *B func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*int64, error) { receipt := batchInfo.Receipt if receipt != nil { + if batchInfo.EthTx.Nonce()+1 > t.accNonce { + t.accNonce = batchInfo.EthTx.Nonce() + 1 + } if receipt.Status == types.ReceiptStatusFailed { batchInfo.Debug.Status = StatusFailed - t.cfg.debugBatchStore(batchInfo) _, err := t.ethClient.EthCall(ctx, batchInfo.EthTx, receipt.BlockNumber) - log.Warnw("TxManager receipt status is failed", "tx", receipt.TxHash.Hex(), + log.Warnw("TxManager receipt status is failed", "tx", receipt.TxHash, "batch", batchInfo.BatchNum, "block", receipt.BlockNumber.Int64(), "err", err) + batchInfo.EthTxErr = err + if batchInfo.BatchNum <= t.lastSuccessBatch { + t.lastSuccessBatch = batchInfo.BatchNum - 1 + } + t.cfg.debugBatchStore(batchInfo) return nil, tracerr.Wrap(fmt.Errorf( "ethereum transaction receipt status is failed: %w", err)) } else if receipt.Status == types.ReceiptStatusSuccessful { @@ -239,6 +318,17 @@ func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*i batchInfo.Debug.MineBlockNum = receipt.BlockNumber.Int64() batchInfo.Debug.StartToMineBlocksDelay = batchInfo.Debug.MineBlockNum - batchInfo.Debug.StartBlockNum + if batchInfo.Debug.StartToMineDelay == 0 { + if block, err := t.ethClient.EthBlockByNumber(ctx, + receipt.BlockNumber.Int64()); err != nil { + log.Warnw("TxManager: ethClient.EthBlockByNumber", "err", err) + } else { + batchInfo.Debug.SendToMineDelay = block.Timestamp.Sub( + batchInfo.Debug.SendTimestamp).Seconds() + batchInfo.Debug.StartToMineDelay = block.Timestamp.Sub( + batchInfo.Debug.StartTimestamp).Seconds() + } + } t.cfg.debugBatchStore(batchInfo) if batchInfo.BatchNum > t.lastSuccessBatch { t.lastSuccessBatch = batchInfo.BatchNum @@ -250,9 +340,72 @@ func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*i return nil, nil } +// TODO: +// - After sending a message: CancelPipeline, stop all consecutive pending Batches (transactions) + +// Queue of BatchInfos +type Queue struct { + list []*BatchInfo + // nonceByBatchNum map[common.BatchNum]uint64 + next int +} + +// NewQueue returns a new queue +func NewQueue() Queue { + return Queue{ + list: make([]*BatchInfo, 0), + // nonceByBatchNum: make(map[common.BatchNum]uint64), + next: 0, + } +} + +// Len is the length of the queue +func (q *Queue) Len() int { + return len(q.list) +} + +// At returns the BatchInfo at position (or nil if position is out of bounds) +func (q *Queue) At(position int) *BatchInfo { + if position >= len(q.list) { + return nil + } + return q.list[position] +} + +// Next returns the next BatchInfo (or nil if queue is empty) +func (q *Queue) Next() (int, *BatchInfo) { + if len(q.list) == 0 { + return 0, nil + } + defer func() { q.next = (q.next + 1) % len(q.list) 
}() + return q.next, q.list[q.next] +} + +// Remove removes the BatchInfo at position +func (q *Queue) Remove(position int) { + // batchInfo := q.list[position] + // delete(q.nonceByBatchNum, batchInfo.BatchNum) + q.list = append(q.list[:position], q.list[position+1:]...) + if len(q.list) == 0 { + q.next = 0 + } else { + q.next = position % len(q.list) + } +} + +// Push adds a new BatchInfo +func (q *Queue) Push(batchInfo *BatchInfo) { + q.list = append(q.list, batchInfo) + // q.nonceByBatchNum[batchInfo.BatchNum] = batchInfo.EthTx.Nonce() +} + +// func (q *Queue) NonceByBatchNum(batchNum common.BatchNum) (uint64, bool) { +// nonce, ok := q.nonceByBatchNum[batchNum] +// return nonce, ok +// } + // Run the TxManager func (t *TxManager) Run(ctx context.Context) { - next := 0 waitDuration := longWaitDuration var statsVars statsVars @@ -263,7 +416,7 @@ func (t *TxManager) Run(ctx context.Context) { t.stats = statsVars.Stats t.syncSCVars(statsVars.Vars) log.Infow("TxManager: received initial statsVars", - "block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatch) + "block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatchNum) for { select { @@ -273,8 +426,27 @@ func (t *TxManager) Run(ctx context.Context) { case statsVars := <-t.statsVarsCh: t.stats = statsVars.Stats t.syncSCVars(statsVars.Vars) + case pipelineNum := <-t.discardPipelineCh: + t.minPipelineNum = pipelineNum + 1 + if err := t.removeBadBatchInfos(ctx); ctx.Err() != nil { + continue + } else if err != nil { + log.Errorw("TxManager: removeBadBatchInfos", "err", err) + continue + } case batchInfo := <-t.batchCh: - if err := t.sendRollupForgeBatch(ctx, batchInfo); ctx.Err() != nil { + if batchInfo.PipelineNum < t.minPipelineNum { + log.Warnw("TxManager: batchInfo received pipelineNum < minPipelineNum", + "num", batchInfo.PipelineNum, "minNum", t.minPipelineNum) + } + if err := t.shouldSendRollupForgeBatch(batchInfo); err != nil { + log.Warnw("TxManager: shouldSend", "err", err, + "batch", batchInfo.BatchNum) + t.coord.SendMsg(ctx, MsgStopPipeline{ + Reason: fmt.Sprintf("forgeBatch shouldSend: %v", err)}) + continue + } + if err := t.sendRollupForgeBatch(ctx, batchInfo, false); ctx.Err() != nil { continue } else if err != nil { // If we reach here it's because our ethNode has @@ -282,19 +454,20 @@ func (t *TxManager) Run(ctx context.Context) { // ethereum. This could be due to the ethNode // failure, or an invalid transaction (that // can't be mined) - t.coord.SendMsg(ctx, MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch send: %v", err)}) + log.Warnw("TxManager: forgeBatch send failed", "err", err, + "batch", batchInfo.BatchNum) + t.coord.SendMsg(ctx, MsgStopPipeline{ + Reason: fmt.Sprintf("forgeBatch send: %v", err)}) continue } - t.queue = append(t.queue, batchInfo) + t.queue.Push(batchInfo) waitDuration = t.cfg.TxManagerCheckInterval case <-time.After(waitDuration): - if len(t.queue) == 0 { + queuePosition, batchInfo := t.queue.Next() + if batchInfo == nil { waitDuration = longWaitDuration continue } - current := next - next = (current + 1) % len(t.queue) - batchInfo := t.queue[current] if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil { continue } else if err != nil { //nolint:staticcheck @@ -304,7 +477,8 @@ func (t *TxManager) Run(ctx context.Context) { // if it was not mined, mined and succesfull or // mined and failed. This could be due to the // ethNode failure. 
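Illustrative sketch (not part of the patch): the resend condition that TxManager.Run applies to a pending forgeBatch transaction, with example values based on the TxResendTimeout = "2m" setting from cfg.buidler.toml above.

package main

import (
	"fmt"
	"time"
)

// shouldResend restates the resend check in TxManager.Run: a still-unconfirmed
// forgeBatch tx is resent, reusing its nonce, once it has stayed unmined for
// longer than EthTxResendTimeout, unless nonce reuse is disabled with
// EthNoReuseNonce.
func shouldResend(noReuseNonce, confirmed bool, sentAt time.Time,
	resendTimeout time.Duration, now time.Time) bool {
	return !noReuseNonce && !confirmed && now.Sub(sentAt) > resendTimeout
}

func main() {
	now := time.Now()
	sentAt := now.Add(-3 * time.Minute)
	// A tx sent 3 minutes ago and still unmined gets resent.
	fmt.Println(shouldResend(false, false, sentAt, 2*time.Minute, now)) // true
	fmt.Println(shouldResend(true, false, sentAt, 2*time.Minute, now))  // false: EthNoReuseNonce set
}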
- t.coord.SendMsg(ctx, MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch receipt: %v", err)}) + t.coord.SendMsg(ctx, MsgStopPipeline{ + Reason: fmt.Sprintf("forgeBatch receipt: %v", err)}) } confirm, err := t.handleReceipt(ctx, batchInfo) @@ -312,32 +486,106 @@ func (t *TxManager) Run(ctx context.Context) { continue } else if err != nil { //nolint:staticcheck // Transaction was rejected - t.queue = append(t.queue[:current], t.queue[current+1:]...) - if len(t.queue) == 0 { - next = 0 - } else { - next = current % len(t.queue) + if err := t.removeBadBatchInfos(ctx); ctx.Err() != nil { + continue + } else if err != nil { + log.Errorw("TxManager: removeBadBatchInfos", "err", err) + continue } - t.coord.SendMsg(ctx, MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch reject: %v", err)}) + t.coord.SendMsg(ctx, MsgStopPipeline{ + Reason: fmt.Sprintf("forgeBatch reject: %v", err)}) + continue } - if confirm != nil && *confirm >= t.cfg.ConfirmBlocks { - log.Debugw("TxManager tx for RollupForgeBatch confirmed", - "batch", batchInfo.BatchNum) - t.queue = append(t.queue[:current], t.queue[current+1:]...) - if len(t.queue) == 0 { - next = 0 - } else { - next = current % len(t.queue) + now := time.Now() + if !t.cfg.EthNoReuseNonce && confirm == nil && + now.Sub(batchInfo.SendTimestamp) > t.cfg.EthTxResendTimeout { + log.Infow("TxManager: forgeBatch tx not been mined timeout, resending", + "tx", batchInfo.EthTx.Hash(), "batch", batchInfo.BatchNum) + if err := t.sendRollupForgeBatch(ctx, batchInfo, true); ctx.Err() != nil { + continue + } else if err != nil { + // If we reach here it's because our ethNode has + // been unable to send the transaction to + // ethereum. This could be due to the ethNode + // failure, or an invalid transaction (that + // can't be mined) + log.Warnw("TxManager: forgeBatch resend failed", "err", err, + "batch", batchInfo.BatchNum) + t.coord.SendMsg(ctx, MsgStopPipeline{ + Reason: fmt.Sprintf("forgeBatch resend: %v", err)}) + continue } } + + if confirm != nil && *confirm >= t.cfg.ConfirmBlocks { + log.Debugw("TxManager: forgeBatch tx confirmed", + "tx", batchInfo.EthTx.Hash(), "batch", batchInfo.BatchNum) + t.queue.Remove(queuePosition) + } } } } -// nolint reason: this function will be used in the future -//nolint:unused -func (t *TxManager) canForge(stats *synchronizer.Stats, blockNum int64) bool { +func (t *TxManager) removeBadBatchInfos(ctx context.Context) error { + next := 0 + for { + batchInfo := t.queue.At(next) + if batchInfo == nil { + break + } + if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil { + return nil + } else if err != nil { + // Our ethNode is giving an error different + // than "not found" when getting the receipt + // for the transaction, so we can't figure out + // if it was not mined, mined and succesfull or + // mined and failed. This could be due to the + // ethNode failure. 
+ next++ + continue + } + confirm, err := t.handleReceipt(ctx, batchInfo) + if ctx.Err() != nil { + return nil + } else if err != nil { + // Transaction was rejected + if t.minPipelineNum <= batchInfo.PipelineNum { + t.minPipelineNum = batchInfo.PipelineNum + 1 + } + t.queue.Remove(next) + continue + } + // If tx is pending but is from a cancelled pipeline, remove it + // from the queue + if confirm == nil { + if batchInfo.PipelineNum < t.minPipelineNum { + t.queue.Remove(next) + continue + } + } + next++ + } + accNonce, err := t.ethClient.EthNonceAt(ctx, t.account.Address, nil) + if err != nil { + return err + } + if !t.cfg.EthNoReuseNonce { + t.accNextNonce = accNonce + } + return nil +} + +func (t *TxManager) canForgeAt(blockNum int64) bool { return canForge(&t.consts.Auction, &t.vars.Auction, - &stats.Sync.Auction.CurrentSlot, &stats.Sync.Auction.NextSlot, + &t.stats.Sync.Auction.CurrentSlot, &t.stats.Sync.Auction.NextSlot, t.cfg.ForgerAddress, blockNum) } + +func (t *TxManager) mustL1L2Batch(blockNum int64) bool { + lastL1BatchBlockNum := t.lastSentL1BatchBlockNum + if t.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum { + lastL1BatchBlockNum = t.stats.Sync.LastL1BatchBlock + } + return blockNum-lastL1BatchBlockNum >= t.vars.Rollup.ForgeL1L2BatchTimeout-1 +} diff --git a/coordinator/txmanager_test.go b/coordinator/txmanager_test.go new file mode 100644 index 0000000..4c624d6 --- /dev/null +++ b/coordinator/txmanager_test.go @@ -0,0 +1,15 @@ +package coordinator + +import ( + "math/big" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAddPerc(t *testing.T) { + assert.Equal(t, "110", addPerc(big.NewInt(100), 10).String()) + assert.Equal(t, "101", addPerc(big.NewInt(100), 1).String()) + assert.Equal(t, "12", addPerc(big.NewInt(10), 20).String()) + assert.Equal(t, "1500", addPerc(big.NewInt(1000), 50).String()) +} diff --git a/db/historydb/historydb.go b/db/historydb/historydb.go index 8abd994..38dafa4 100644 --- a/db/historydb/historydb.go +++ b/db/historydb/historydb.go @@ -164,6 +164,19 @@ func (hdb *HistoryDB) addBatches(d meddler.DB, batches []common.Batch) error { return nil } +// GetBatch returns the batch with the given batchNum +func (hdb *HistoryDB) GetBatch(batchNum common.BatchNum) (*common.Batch, error) { + var batch common.Batch + err := meddler.QueryRow( + hdb.db, &batch, `SELECT batch.batch_num, batch.eth_block_num, batch.forger_addr, + batch.fees_collected, batch.fee_idxs_coordinator, batch.state_root, + batch.num_accounts, batch.last_idx, batch.exit_root, batch.forge_l1_txs_num, + batch.slot_num, batch.total_fees_usd FROM batch WHERE batch_num = $1;`, + batchNum, + ) + return &batch, err +} + // GetAllBatches retrieve all batches from the DB func (hdb *HistoryDB) GetAllBatches() ([]common.Batch, error) { var batches []*common.Batch @@ -208,6 +221,18 @@ func (hdb *HistoryDB) GetLastBatchNum() (common.BatchNum, error) { return batchNum, tracerr.Wrap(row.Scan(&batchNum)) } +// GetLastBatch returns the last forged batch +func (hdb *HistoryDB) GetLastBatch() (*common.Batch, error) { + var batch common.Batch + err := meddler.QueryRow( + hdb.db, &batch, `SELECT batch.batch_num, batch.eth_block_num, batch.forger_addr, + batch.fees_collected, batch.fee_idxs_coordinator, batch.state_root, + batch.num_accounts, batch.last_idx, batch.exit_root, batch.forge_l1_txs_num, + batch.slot_num, batch.total_fees_usd FROM batch ORDER BY batch_num DESC LIMIT 1;`, + ) + return &batch, err +} + // GetLastL1BatchBlockNum returns the blockNum of the latest forged l1Batch func 
(hdb *HistoryDB) GetLastL1BatchBlockNum() (int64, error) { row := hdb.db.QueryRow(`SELECT eth_block_num FROM batch diff --git a/db/historydb/historydb_test.go b/db/historydb/historydb_test.go index 80bcc0f..e7db118 100644 --- a/db/historydb/historydb_test.go +++ b/db/historydb/historydb_test.go @@ -203,6 +203,10 @@ func TestBatches(t *testing.T) { fetchedLastBatchNum, err := historyDB.GetLastBatchNum() assert.NoError(t, err) assert.Equal(t, batches[len(batches)-1].BatchNum, fetchedLastBatchNum) + // Test GetLastBatch + fetchedLastBatch, err := historyDB.GetLastBatch() + assert.NoError(t, err) + assert.Equal(t, &batches[len(batches)-1], fetchedLastBatch) // Test GetLastL1TxsNum fetchedLastL1TxsNum, err := historyDB.GetLastL1TxsNum() assert.NoError(t, err) @@ -211,6 +215,12 @@ func TestBatches(t *testing.T) { fetchedLastL1BatchBlockNum, err := historyDB.GetLastL1BatchBlockNum() assert.NoError(t, err) assert.Equal(t, lastL1BatchBlockNum, fetchedLastL1BatchBlockNum) + // Test GetBatch + fetchedBatch, err := historyDB.GetBatch(1) + require.NoError(t, err) + assert.Equal(t, &batches[0], fetchedBatch) + _, err = historyDB.GetBatch(common.BatchNum(len(batches) + 1)) + assert.Equal(t, sql.ErrNoRows, tracerr.Unwrap(err)) } func TestBids(t *testing.T) { diff --git a/db/kvdb/kvdb.go b/db/kvdb/kvdb.go index c2f5633..a367bf3 100644 --- a/db/kvdb/kvdb.go +++ b/db/kvdb/kvdb.go @@ -425,12 +425,13 @@ func (k *KVDB) MakeCheckpoint() error { } // if checkpoint BatchNum already exist in disk, delete it - if _, err := os.Stat(checkpointPath); !os.IsNotExist(err) { + if _, err := os.Stat(checkpointPath); os.IsNotExist(err) { + } else if err != nil { + return tracerr.Wrap(err) + } else { if err := os.RemoveAll(checkpointPath); err != nil { return tracerr.Wrap(err) } - } else if err != nil && !os.IsNotExist(err) { - return tracerr.Wrap(err) } // execute Checkpoint @@ -451,12 +452,25 @@ func (k *KVDB) MakeCheckpoint() error { return nil } +// CheckpointExists returns true if the checkpoint exists +func (k *KVDB) CheckpointExists(batchNum common.BatchNum) (bool, error) { + source := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) + if _, err := os.Stat(source); os.IsNotExist(err) { + return false, nil + } else if err != nil { + return false, err + } + return true, nil +} + // DeleteCheckpoint removes if exist the checkpoint of the given batchNum func (k *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error { checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) if _, err := os.Stat(checkpointPath); os.IsNotExist(err) { return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum)) + } else if err != nil { + return tracerr.Wrap(err) } return os.RemoveAll(checkpointPath) @@ -520,6 +534,8 @@ func (k *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) e if _, err := os.Stat(source); os.IsNotExist(err) { // if kvdb does not have checkpoint at batchNum, return err return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" does not exist", source)) + } else if err != nil { + return tracerr.Wrap(err) } // By locking we allow calling MakeCheckpointFromTo from multiple // places at the same time for the same stateDB. 
This allows the @@ -533,12 +549,13 @@ func (k *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) e func pebbleMakeCheckpoint(source, dest string) error { // Remove dest folder (if it exists) before doing the checkpoint - if _, err := os.Stat(dest); !os.IsNotExist(err) { + if _, err := os.Stat(dest); os.IsNotExist(err) { + } else if err != nil { + return tracerr.Wrap(err) + } else { if err := os.RemoveAll(dest); err != nil { return tracerr.Wrap(err) } - } else if err != nil && !os.IsNotExist(err) { - return tracerr.Wrap(err) } sto, err := pebble.NewPebbleStorage(source, false) diff --git a/db/statedb/statedb.go b/db/statedb/statedb.go index bcf44a5..12c8200 100644 --- a/db/statedb/statedb.go +++ b/db/statedb/statedb.go @@ -498,11 +498,17 @@ func NewLocalStateDB(cfg Config, synchronizerDB *StateDB) (*LocalStateDB, error) }, nil } +// CheckpointExists returns true if the checkpoint exists +func (l *LocalStateDB) CheckpointExists(batchNum common.BatchNum) (bool, error) { + return l.db.CheckpointExists(batchNum) +} + // Reset performs a reset in the LocaStateDB. If fromSynchronizer is true, it // gets the state from LocalStateDB.synchronizerStateDB for the given batchNum. // If fromSynchronizer is false, get the state from LocalStateDB checkpoints. func (l *LocalStateDB) Reset(batchNum common.BatchNum, fromSynchronizer bool) error { if fromSynchronizer { + log.Debugw("Making StateDB ResetFromSynchronizer", "batch", batchNum, "type", l.cfg.Type) if err := l.db.ResetFromSynchronizer(batchNum, l.synchronizerStateDB.db); err != nil { return tracerr.Wrap(err) } diff --git a/go.sum b/go.sum index e06a659..06a6019 100644 --- a/go.sum +++ b/go.sum @@ -24,6 +24,8 @@ github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/uf github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY= github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= +github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= +github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OneOfOne/xxhash v1.2.5/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0= @@ -84,6 +86,8 @@ github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QH github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/cp v0.1.0 h1:SE+dxFebS7Iik5LK0tsi1k9ZCxEaFX4AjQmoyA+1dJk= github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s= +github.com/cespare/cp v1.1.1 h1:nCb6ZLdB7NRaqsm91JtQTAme2SKJzXVsdPIPkyJr1MU= +github.com/cespare/cp v1.1.1/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.0.1-0.20190104013014-3767db7a7e18/go.mod h1:HD5P3vAIAh+Y2GAxg0PrPN1P8WkepXGpjbUPDHJqqKM= @@ -169,6 +173,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/structs v1.1.0/go.mod 
h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/fjl/memsize v0.0.0-20180418122429-ca190fb6ffbc h1:jtW8jbpkO4YirRSyepBOH8E+2HEw6/hKkBvFPwhUN8c= github.com/fjl/memsize v0.0.0-20180418122429-ca190fb6ffbc/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0= +github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 h1:FtmdgXiUlNeRsoNMFlKLDt+S+6hbjVMEW6RGQ7aUf7c= +github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0= github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= @@ -596,6 +602,8 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/sirupsen/logrus v1.5.0 h1:1N5EYkVAPEywqZRJd7cwnRtCb6xJx7NH3T3WUTF980Q= +github.com/sirupsen/logrus v1.5.0/go.mod h1:+F7Ogzej0PZc/94MaYx/nvG9jOFMD2osvC3s+Squfpo= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= @@ -614,6 +622,8 @@ github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DM github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4 h1:Gb2Tyox57NRNuZ2d3rmvB3pcmbu7O1RS3m8WRx7ilrg= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= +github.com/status-im/keycard-go v0.0.0-20190424133014-d95853db0f48 h1:ju5UTwk5Odtm4trrY+4Ca4RMj5OyXbmVeDAVad2T0Jw= +github.com/status-im/keycard-go v0.0.0-20190424133014-d95853db0f48/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 h1:gIlAHnH1vJb5vwEjIp5kBj/eu99p/bl0Ay2goiPe5xE= github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570/go.mod h1:8OR4w3TdeIHIh1g6EMY5p0gVNOovcWC+1vpc7naMuAw= github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 h1:njlZPzLwU639dk2kqnCPPv+wNjq7Xb6EfUxe/oX0/NM= diff --git a/node/node.go b/node/node.go index 44eded3..a09736b 100644 --- a/node/node.go +++ b/node/node.go @@ -2,6 +2,7 @@ package node import ( "context" + "errors" "fmt" "net/http" "sync" @@ -301,6 +302,9 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { SyncRetryInterval: cfg.Coordinator.SyncRetryInterval.Duration, EthClientAttempts: cfg.Coordinator.EthClient.Attempts, EthClientAttemptsDelay: cfg.Coordinator.EthClient.AttemptsDelay.Duration, + EthNoReuseNonce: cfg.Coordinator.EthClient.NoReuseNonce, + EthTxResendTimeout: cfg.Coordinator.EthClient.TxResendTimeout.Duration, + MaxGasPrice: cfg.Coordinator.EthClient.MaxGasPrice, TxManagerCheckInterval: cfg.Coordinator.EthClient.CheckLoopInterval.Duration, DebugBatchPath: cfg.Coordinator.Debug.BatchPath, Purger: coordinator.PurgerCfg{ @@ -487,11 +491,15 @@ func (n *Node) 
handleNewBlock(ctx context.Context, stats *synchronizer.Stats, va if stats.Synced() { if err := n.nodeAPI.api.UpdateNetworkInfo( stats.Eth.LastBlock, stats.Sync.LastBlock, - common.BatchNum(stats.Eth.LastBatch), + common.BatchNum(stats.Eth.LastBatchNum), stats.Sync.Auction.CurrentSlot.SlotNum, ); err != nil { log.Errorw("API.UpdateNetworkInfo", "err", err) } + } else { + n.nodeAPI.api.UpdateNetworkInfoBlock( + stats.Eth.LastBlock, stats.Sync.LastBlock, + ) } } } @@ -573,7 +581,11 @@ func (n *Node) StartSynchronizer() { if n.ctx.Err() != nil { continue } - log.Errorw("Synchronizer.Sync", "err", err) + if errors.Is(err, eth.ErrBlockHashMismatchEvent) { + log.Warnw("Synchronizer.Sync", "err", err) + } else { + log.Errorw("Synchronizer.Sync", "err", err) + } } } } diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 984b66f..361bdc8 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -25,12 +25,12 @@ type Stats struct { Updated time.Time FirstBlockNum int64 LastBlock common.Block - LastBatch int64 + LastBatchNum int64 } Sync struct { Updated time.Time LastBlock common.Block - LastBatch int64 + LastBatch common.Batch // LastL1BatchBlock is the last ethereum block in which an // l1Batch was forged LastL1BatchBlock int64 @@ -77,13 +77,13 @@ func (s *StatsHolder) UpdateCurrentNextSlot(current *common.Slot, next *common.S } // UpdateSync updates the synchronizer stats -func (s *StatsHolder) UpdateSync(lastBlock *common.Block, lastBatch *common.BatchNum, +func (s *StatsHolder) UpdateSync(lastBlock *common.Block, lastBatch *common.Batch, lastL1BatchBlock *int64, lastForgeL1TxsNum *int64) { now := time.Now() s.rw.Lock() s.Sync.LastBlock = *lastBlock if lastBatch != nil { - s.Sync.LastBatch = int64(*lastBatch) + s.Sync.LastBatch = *lastBatch } if lastL1BatchBlock != nil { s.Sync.LastL1BatchBlock = *lastL1BatchBlock @@ -105,16 +105,16 @@ func (s *StatsHolder) UpdateEth(ethClient eth.ClientInterface) error { lastBlock, err := ethClient.EthBlockByNumber(context.TODO(), -1) if err != nil { - return tracerr.Wrap(err) + return tracerr.Wrap(fmt.Errorf("EthBlockByNumber: %w", err)) } - lastBatch, err := ethClient.RollupLastForgedBatch() + lastBatchNum, err := ethClient.RollupLastForgedBatch() if err != nil { - return tracerr.Wrap(err) + return tracerr.Wrap(fmt.Errorf("RollupLastForgedBatch: %w", err)) } s.rw.Lock() s.Eth.Updated = now s.Eth.LastBlock = *lastBlock - s.Eth.LastBatch = lastBatch + s.Eth.LastBatchNum = lastBatchNum s.rw.Unlock() return nil } @@ -139,6 +139,10 @@ func (s *StatsHolder) CopyStats() *Stats { sCopy.Sync.Auction.NextSlot.DefaultSlotBid = common.CopyBigInt(s.Sync.Auction.NextSlot.DefaultSlotBid) } + if s.Sync.LastBatch.StateRoot != nil { + sCopy.Sync.LastBatch.StateRoot = + common.CopyBigInt(s.Sync.LastBatch.StateRoot) + } s.rw.RUnlock() return &sCopy } @@ -152,9 +156,9 @@ func (s *StatsHolder) blocksPerc() float64 { float64(s.Eth.LastBlock.Num-(s.Eth.FirstBlockNum-1)) } -func (s *StatsHolder) batchesPerc(batchNum int64) float64 { +func (s *StatsHolder) batchesPerc(batchNum common.BatchNum) float64 { return float64(batchNum) * 100.0 / - float64(s.Eth.LastBatch) + float64(s.Eth.LastBatchNum) } // StartBlockNums sets the first block used to start tracking the smart @@ -329,23 +333,25 @@ func (s *Synchronizer) setSlotCoordinator(slot *common.Slot) error { return nil } -// firstBatchBlockNum is the blockNum of first batch in that block, if any -func (s *Synchronizer) getCurrentSlot(reset bool, firstBatchBlockNum *int64) (*common.Slot, 
error) { - slot := common.Slot{ - SlotNum: s.stats.Sync.Auction.CurrentSlot.SlotNum, - ForgerCommitment: s.stats.Sync.Auction.CurrentSlot.ForgerCommitment, - } +// updateCurrentSlot updates the slot with information of the current slot. +// The information about which coordinator is allowed to forge is only updated +// when we are Synced. +// hasBatch is true when the last synced block contained at least one batch. +func (s *Synchronizer) updateCurrentSlot(slot *common.Slot, reset bool, hasBatch bool) error { // We want the next block because the current one is already mined blockNum := s.stats.Sync.LastBlock.Num + 1 slotNum := s.consts.Auction.SlotNum(blockNum) + firstBatchBlockNum := s.stats.Sync.LastBlock.Num if reset { + // Using this query only to know if there is a batch within this slot dbFirstBatchBlockNum, err := s.historyDB.GetFirstBatchBlockNumBySlot(slotNum) if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows { - return nil, tracerr.Wrap(fmt.Errorf("historyDB.GetFirstBatchBySlot: %w", err)) + return tracerr.Wrap(fmt.Errorf("historyDB.GetFirstBatchBlockNumBySlot: %w", err)) } else if tracerr.Unwrap(err) == sql.ErrNoRows { - firstBatchBlockNum = nil + hasBatch = false } else { - firstBatchBlockNum = &dbFirstBatchBlockNum + hasBatch = true + firstBatchBlockNum = dbFirstBatchBlockNum } slot.ForgerCommitment = false } else if slotNum > slot.SlotNum { @@ -356,11 +362,11 @@ func (s *Synchronizer) getCurrentSlot(reset bool, firstBatchBlockNum *int64) (*c slot.StartBlock, slot.EndBlock = s.consts.Auction.SlotBlocks(slot.SlotNum) // If Synced, update the current coordinator if s.stats.Synced() && blockNum >= s.consts.Auction.GenesisBlockNum { - if err := s.setSlotCoordinator(&slot); err != nil { - return nil, tracerr.Wrap(err) + if err := s.setSlotCoordinator(slot); err != nil { + return tracerr.Wrap(err) } - if firstBatchBlockNum != nil && - s.consts.Auction.RelativeBlock(*firstBatchBlockNum) < + if hasBatch && + s.consts.Auction.RelativeBlock(firstBatchBlockNum) < int64(s.vars.Auction.SlotDeadline) { slot.ForgerCommitment = true } @@ -369,57 +375,61 @@ func (s *Synchronizer) getCurrentSlot(reset bool, firstBatchBlockNum *int64) (*c // BEGIN SANITY CHECK canForge, err := s.ethClient.AuctionCanForge(slot.Forger, blockNum) if err != nil { - return nil, tracerr.Wrap(err) + return tracerr.Wrap(fmt.Errorf("AuctionCanForge: %w", err)) } if !canForge { - return nil, tracerr.Wrap(fmt.Errorf("Synchronized value of forger address for closed slot "+ + return tracerr.Wrap(fmt.Errorf("Synchronized value of forger address for closed slot "+ "differs from smart contract: %+v", slot)) } // END SANITY CHECK } - return &slot, nil + return nil } -func (s *Synchronizer) getNextSlot() (*common.Slot, error) { +// updateNextSlot updates the slot with information of the next slot. +// The information about which coordinator is allowed to forge is only updated +// when we are Synced.
+func (s *Synchronizer) updateNextSlot(slot *common.Slot) error { // We want the next block because the current one is already mined blockNum := s.stats.Sync.LastBlock.Num + 1 slotNum := s.consts.Auction.SlotNum(blockNum) + 1 - slot := common.Slot{ - SlotNum: slotNum, - ForgerCommitment: false, - } + slot.SlotNum = slotNum + slot.ForgerCommitment = false slot.StartBlock, slot.EndBlock = s.consts.Auction.SlotBlocks(slot.SlotNum) // If Synced, update the current coordinator if s.stats.Synced() && blockNum >= s.consts.Auction.GenesisBlockNum { - if err := s.setSlotCoordinator(&slot); err != nil { - return nil, tracerr.Wrap(err) + if err := s.setSlotCoordinator(slot); err != nil { + return tracerr.Wrap(err) } // TODO: Remove this SANITY CHECK once this code is tested enough // BEGIN SANITY CHECK canForge, err := s.ethClient.AuctionCanForge(slot.Forger, slot.StartBlock) if err != nil { - return nil, tracerr.Wrap(err) + return tracerr.Wrap(fmt.Errorf("AuctionCanForge: %w", err)) } if !canForge { - return nil, tracerr.Wrap(fmt.Errorf("Synchronized value of forger address for closed slot "+ + return tracerr.Wrap(fmt.Errorf("Synchronized value of forger address for closed slot "+ "differs from smart contract: %+v", slot)) } // END SANITY CHECK } - return &slot, nil + return nil } -func (s *Synchronizer) updateCurrentNextSlotIfSync(reset bool, firstBatchBlockNum *int64) error { - current, err := s.getCurrentSlot(reset, firstBatchBlockNum) - if err != nil { +// updateCurrentNextSlotIfSync updates the current and next slot. Information +// about forger address that is allowed to forge is only updated if we are +// Synced. +func (s *Synchronizer) updateCurrentNextSlotIfSync(reset bool, hasBatch bool) error { + current := s.stats.Sync.Auction.CurrentSlot + next := s.stats.Sync.Auction.NextSlot + if err := s.updateCurrentSlot(¤t, reset, hasBatch); err != nil { return tracerr.Wrap(err) } - next, err := s.getNextSlot() - if err != nil { + if err := s.updateNextSlot(&next); err != nil { return tracerr.Wrap(err) } - s.stats.UpdateCurrentNextSlot(current, next) + s.stats.UpdateCurrentNextSlot(¤t, &next) return nil } @@ -458,9 +468,9 @@ func (s *Synchronizer) init() error { "ethLastBlock", s.stats.Eth.LastBlock, ) log.Infow("Sync init batch", - "syncLastBatch", s.stats.Sync.LastBatch, - "syncBatchesPerc", s.stats.batchesPerc(s.stats.Sync.LastBatch), - "ethLastBatch", s.stats.Eth.LastBatch, + "syncLastBatch", s.stats.Sync.LastBatch.BatchNum, + "syncBatchesPerc", s.stats.batchesPerc(s.stats.Sync.LastBatch.BatchNum), + "ethLastBatch", s.stats.Eth.LastBatchNum, ) return nil } @@ -521,7 +531,7 @@ func (s *Synchronizer) Sync2(ctx context.Context, if tracerr.Unwrap(err) == ethereum.NotFound { return nil, nil, nil } else if err != nil { - return nil, nil, tracerr.Wrap(err) + return nil, nil, tracerr.Wrap(fmt.Errorf("EthBlockByNumber: %w", err)) } log.Debugf("ethBlock: num: %v, parent: %v, hash: %v", ethBlock.Num, ethBlock.ParentHash.String(), ethBlock.Hash.String()) @@ -627,14 +637,14 @@ func (s *Synchronizer) Sync2(ctx context.Context, } } s.stats.UpdateSync(ethBlock, - &rollupData.Batches[batchesLen-1].Batch.BatchNum, + &rollupData.Batches[batchesLen-1].Batch, lastL1BatchBlock, lastForgeL1TxsNum) } - var firstBatchBlockNum *int64 + hasBatch := false if len(rollupData.Batches) > 0 { - firstBatchBlockNum = &rollupData.Batches[0].Batch.EthBlockNum + hasBatch = true } - if err := s.updateCurrentNextSlotIfSync(false, firstBatchBlockNum); err != nil { + if err := s.updateCurrentNextSlotIfSync(false, hasBatch); err != nil 
{ return nil, nil, tracerr.Wrap(err) } @@ -646,8 +656,8 @@ func (s *Synchronizer) Sync2(ctx context.Context, for _, batchData := range rollupData.Batches { log.Debugw("Synced batch", "syncLastBatch", batchData.Batch.BatchNum, - "syncBatchesPerc", s.stats.batchesPerc(int64(batchData.Batch.BatchNum)), - "ethLastBatch", s.stats.Eth.LastBatch, + "syncBatchesPerc", s.stats.batchesPerc(batchData.Batch.BatchNum), + "ethLastBatch", s.stats.Eth.LastBatchNum, ) } @@ -700,15 +710,15 @@ func getInitialVariables(ethClient eth.ClientInterface, consts *SCConsts) (*SCVariables, *StartBlockNums, error) { rollupInit, rollupInitBlock, err := ethClient.RollupEventInit() if err != nil { - return nil, nil, tracerr.Wrap(err) + return nil, nil, tracerr.Wrap(fmt.Errorf("RollupEventInit: %w", err)) } auctionInit, auctionInitBlock, err := ethClient.AuctionEventInit() if err != nil { - return nil, nil, tracerr.Wrap(err) + return nil, nil, tracerr.Wrap(fmt.Errorf("AuctionEventInit: %w", err)) } wDelayerInit, wDelayerInitBlock, err := ethClient.WDelayerEventInit() if err != nil { - return nil, nil, tracerr.Wrap(err) + return nil, nil, tracerr.Wrap(fmt.Errorf("WDelayerEventInit: %w", err)) } rollupVars := rollupInit.RollupVariables() auctionVars := auctionInit.AuctionVariables(consts.Auction.InitialMinimalBidding) @@ -753,15 +763,15 @@ func (s *Synchronizer) resetState(block *common.Block) error { s.vars.WDelayer = *wDelayer } - batchNum, err := s.historyDB.GetLastBatchNum() + batch, err := s.historyDB.GetLastBatch() if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows { return tracerr.Wrap(fmt.Errorf("historyDB.GetLastBatchNum: %w", err)) } if tracerr.Unwrap(err) == sql.ErrNoRows { - batchNum = 0 + batch = &common.Batch{} } - err = s.stateDB.Reset(batchNum) + err = s.stateDB.Reset(batch.BatchNum) if err != nil { return tracerr.Wrap(fmt.Errorf("stateDB.Reset: %w", err)) } @@ -783,9 +793,9 @@ func (s *Synchronizer) resetState(block *common.Block) error { lastForgeL1TxsNum = &n } - s.stats.UpdateSync(block, &batchNum, &lastL1BatchBlockNum, lastForgeL1TxsNum) + s.stats.UpdateSync(block, batch, &lastL1BatchBlockNum, lastForgeL1TxsNum) - if err := s.updateCurrentNextSlotIfSync(true, nil); err != nil { + if err := s.updateCurrentNextSlotIfSync(true, false); err != nil { return tracerr.Wrap(err) } return nil @@ -802,7 +812,7 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e // the expected one. 
rollupEvents, err := s.ethClient.RollupEventsByBlock(blockNum, ðBlock.Hash) if err != nil { - return nil, tracerr.Wrap(err) + return nil, tracerr.Wrap(fmt.Errorf("RollupEventsByBlock: %w", err)) } // No events in this block if rollupEvents == nil { @@ -919,9 +929,15 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e return nil, tracerr.Wrap(err) } if s.stateDB.CurrentBatch() != batchNum { - return nil, tracerr.Wrap(fmt.Errorf("stateDB.BatchNum (%v) != evtForgeBatch.BatchNum = (%v)", + return nil, tracerr.Wrap(fmt.Errorf("stateDB.BatchNum (%v) != "+ + "evtForgeBatch.BatchNum = (%v)", s.stateDB.CurrentBatch(), batchNum)) } + if s.stateDB.MT.Root().BigInt().Cmp(forgeBatchArgs.NewStRoot) != 0 { + return nil, tracerr.Wrap(fmt.Errorf("stateDB.MTRoot (%v) != "+ + "forgeBatchArgs.NewStRoot (%v)", + s.stateDB.MT.Root().BigInt(), forgeBatchArgs.NewStRoot)) + } // Transform processed PoolL2 txs to L2 and store in BatchData l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a big uggly, find a better way @@ -1106,7 +1122,7 @@ func (s *Synchronizer) auctionSync(ethBlock *common.Block) (*common.AuctionData, // Get auction events in the block auctionEvents, err := s.ethClient.AuctionEventsByBlock(blockNum, ðBlock.Hash) if err != nil { - return nil, tracerr.Wrap(err) + return nil, tracerr.Wrap(fmt.Errorf("AuctionEventsByBlock: %w", err)) } // No events in this block if auctionEvents == nil { @@ -1203,7 +1219,7 @@ func (s *Synchronizer) wdelayerSync(ethBlock *common.Block) (*common.WDelayerDat // Get wDelayer events in the block wDelayerEvents, err := s.ethClient.WDelayerEventsByBlock(blockNum, ðBlock.Hash) if err != nil { - return nil, tracerr.Wrap(err) + return nil, tracerr.Wrap(fmt.Errorf("WDelayerEventsByBlock: %w", err)) } // No events in this block if wDelayerEvents == nil { diff --git a/synchronizer/synchronizer_test.go b/synchronizer/synchronizer_test.go index 342caab..a0aa818 100644 --- a/synchronizer/synchronizer_test.go +++ b/synchronizer/synchronizer_test.go @@ -17,7 +17,6 @@ import ( "github.com/hermeznetwork/hermez-node/db/historydb" "github.com/hermeznetwork/hermez-node/db/statedb" "github.com/hermeznetwork/hermez-node/eth" - "github.com/hermeznetwork/hermez-node/log" "github.com/hermeznetwork/hermez-node/test" "github.com/hermeznetwork/hermez-node/test/til" "github.com/jinzhu/copier" @@ -321,6 +320,14 @@ func newTestModules(t *testing.T) (*statedb.StateDB, *historydb.HistoryDB) { return stateDB, historyDB } +func newBigInt(s string) *big.Int { + v, ok := new(big.Int).SetString(s, 10) + if !ok { + panic(fmt.Errorf("Can't set big.Int from %s", s)) + } + return v +} + func TestSyncGeneral(t *testing.T) { // // Setup @@ -339,7 +346,6 @@ func TestSyncGeneral(t *testing.T) { s, err := NewSynchronizer(client, historyDB, stateDB, Config{ StatsRefreshPeriod: 0 * time.Second, }) - log.Error(err) require.NoError(t, err) ctx := context.Background() @@ -434,12 +440,22 @@ func TestSyncGeneral(t *testing.T) { require.Equal(t, 5, len(blocks[i].Rollup.L1UserTxs)) require.Equal(t, 2, len(blocks[i].Rollup.Batches)) require.Equal(t, 2, len(blocks[i].Rollup.Batches[0].L1CoordinatorTxs)) + // Set StateRoots for batches manually (til doesn't set it) + blocks[i].Rollup.Batches[0].Batch.StateRoot = + newBigInt("18906357591508007884273218035694076596537737437965299189312069102730480717391") + blocks[i].Rollup.Batches[1].Batch.StateRoot = + newBigInt("9513185123401321669660637227182204000277156839501731093239187625486561933297") // blocks 1 (blockNum=3) i = 1 
require.Equal(t, 3, int(blocks[i].Block.Num)) require.Equal(t, 5, len(blocks[i].Rollup.L1UserTxs)) require.Equal(t, 2, len(blocks[i].Rollup.Batches)) require.Equal(t, 3, len(blocks[i].Rollup.Batches[0].L2Txs)) + // Set StateRoots for batches manually (til doesn't set it) + blocks[i].Rollup.Batches[0].Batch.StateRoot = + newBigInt("13060270878200012606074130020925677466793317216609491464427188889005039616594") + blocks[i].Rollup.Batches[1].Batch.StateRoot = + newBigInt("21427104994652624302859637783375978708867165042357535792408500519060088086054") // Generate extra required data ethAddTokens(blocks, client) @@ -614,6 +630,12 @@ func TestSyncGeneral(t *testing.T) { blocks, err = tc.GenerateBlocks(set2) require.NoError(t, err) + // Set StateRoots for batches manually (til doesn't set it) + blocks[0].Rollup.Batches[0].Batch.StateRoot = + newBigInt("11218510534825843475100588932060366395781087435899915642332104464234485046683") + blocks[0].Rollup.Batches[1].Batch.StateRoot = + newBigInt("20283020730369146334077598087403837297563965802277806438205710455191646998983") + for i := 0; i < 4; i++ { client.CtlRollback() } diff --git a/txselector/txselector.go b/txselector/txselector.go index 4bf5f41..0ff79e0 100644 --- a/txselector/txselector.go +++ b/txselector/txselector.go @@ -88,12 +88,8 @@ func (txsel *TxSelector) LocalAccountsDB() *statedb.LocalStateDB { // Reset tells the TxSelector to get it's internal AccountsDB // from the required `batchNum` -func (txsel *TxSelector) Reset(batchNum common.BatchNum) error { - err := txsel.localAccountsDB.Reset(batchNum, true) - if err != nil { - return tracerr.Wrap(err) - } - return nil +func (txsel *TxSelector) Reset(batchNum common.BatchNum, fromSynchronizer bool) error { + return tracerr.Wrap(txsel.localAccountsDB.Reset(batchNum, fromSynchronizer)) } func (txsel *TxSelector) getCoordIdx(tokenID common.TokenID) (common.Idx, error) {
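The new coordinator/txmanager_test.go pins down the percentage helper used by the gas-price handling. Below is a minimal sketch consistent with the TestAddPerc assertions; the name and arguments come from the test, but the real addPerc in coordinator/txmanager.go may differ in details.

package coordinator

import "math/big"

// addPerc returns v increased by perc percent using integer arithmetic,
// e.g. addPerc(100, 10) = 110 and addPerc(10, 20) = 12, matching TestAddPerc.
func addPerc(v *big.Int, perc int64) *big.Int {
	r := new(big.Int).Set(v)
	r.Mul(r, big.NewInt(perc))
	r.Div(r, big.NewInt(100))
	return r.Add(r, v)
}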
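The resend branch added to TxManager.Run combines three conditions. Restated in isolation for clarity; the helper name and signature below are illustrative only and not part of the patch.

package coordinator

import "time"

// shouldResendForgeBatch restates the resend condition used in TxManager.Run:
// resend only when nonce reuse is enabled (EthNoReuseNonce is false), the tx
// has no confirmation yet, and it has been pending longer than EthTxResendTimeout.
func shouldResendForgeBatch(noReuseNonce bool, confirm *int64,
	sentAt time.Time, resendTimeout time.Duration) bool {
	return !noReuseNonce && confirm == nil && time.Since(sentAt) > resendTimeout
}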
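mustL1L2Batch forces an L1L2 batch one block before the rollup's ForgeL1L2BatchTimeout expires. A worked example with illustrative numbers; the constant values below are assumptions, not taken from the patch.

package coordinator

// With ForgeL1L2BatchTimeout = 10 and the last L1 batch forged at block 100
// (illustrative values), blockNum-lastL1BatchBlockNum >= timeout-1 first holds
// at block 109, one block before the contract deadline.
func mustL1L2BatchExample(blockNum int64) bool {
	const (
		forgeL1L2BatchTimeout int64 = 10
		lastL1BatchBlockNum   int64 = 100
	)
	return blockNum-lastL1BatchBlockNum >= forgeL1L2BatchTimeout-1
}

With these values the check is false at block 108 and first becomes true at block 109.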
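The kvdb changes replace the old os.Stat handling, which conflated "not exist" with other Stat failures, with a three-way pattern. The same pattern in standalone form; the helper name is illustrative.

package kvdb

import "os"

// pathExists shows the corrected os.Stat pattern now used by MakeCheckpoint,
// CheckpointExists, DeleteCheckpoint and pebbleMakeCheckpoint: "not exist" is
// a normal outcome, while any other Stat error is propagated.
func pathExists(p string) (bool, error) {
	if _, err := os.Stat(p); os.IsNotExist(err) {
		return false, nil
	} else if err != nil {
		return false, err
	}
	return true, nil
}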
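resetState treats sql.ErrNoRows from the new HistoryDB.GetLastBatch as "no batch forged yet" and falls back to an empty batch. A sketch of the same caller pattern, assuming the tracerr and historydb import paths used elsewhere in the repository; the function itself is illustrative.

package synchronizer

import (
	"database/sql"

	"github.com/hermeznetwork/hermez-node/common"
	"github.com/hermeznetwork/hermez-node/db/historydb"
	"github.com/hermeznetwork/tracerr"
)

// lastBatchOrEmpty mirrors the sql.ErrNoRows handling that resetState applies
// to HistoryDB.GetLastBatch: no rows means no batch has been forged yet.
func lastBatchOrEmpty(hdb *historydb.HistoryDB) (*common.Batch, error) {
	batch, err := hdb.GetLastBatch()
	if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows {
		return nil, tracerr.Wrap(err)
	}
	if tracerr.Unwrap(err) == sql.ErrNoRows {
		return &common.Batch{}, nil
	}
	return batch, nil
}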