From f0e79f3d553074279a71e2ef8bcf6382c4bee287 Mon Sep 17 00:00:00 2001 From: Eduard S Date: Tue, 16 Feb 2021 14:22:51 +0100 Subject: [PATCH 1/3] Update coordinator to work better under real net - cli / node - Update handler of SIGINT so that after 3 SIGINTs, the process terminates unconditionally - coordinator - Store stats without pointer - In all functions that send a variable via channel, check for context done to avoid deadlock (due to no process reading from the channel, which has no queue) when the node is stopped. - Abstract `canForge` so that it can be used outside of the `Coordinator` - In `canForge` check the blockNumber in current and next slot. - Update tests due to smart contract changes in slot handling, and minimum bid defaults - TxManager - Add consts, vars and stats to allow evaluating `canForge` - Add `canForge` method (not used yet) - Store batch and nonces status (last success and last pending) - Track nonces internally instead of relying on the ethereum node (this is required to work with ganache when there are pending txs) - Handle the (common) case of the receipt not being found after the tx is sent. - Don't start the main loop until we get an initial messae fo the stats and vars (so that in the loop the stats and vars are set to synchronizer values) - When a tx fails, check and discard all the failed transactions before sending the message to stop the pipeline. This will avoid sending consecutive messages of stop the pipeline when multiple txs are detected to be failed consecutively. Also, future txs of the same pipeline after a discarded txs are discarded, and their nonces reused. - Robust handling of nonces: - If geth returns nonce is too low, increase it - If geth returns nonce too hight, decrease it - If geth returns underpriced, increase gas price - If geth returns replace underpriced, increase gas price - Add support for resending transactions after a timeout - Store `BatchInfos` in a queue - Pipeline - When an error is found, stop forging batches and send a message to the coordinator to stop the pipeline with information of the failed batch number so that in a restart, non-failed batches are not repated. - When doing a reset of the stateDB, if possible reset from the local checkpoint instead of resetting from the synchronizer. This allows resetting from a batch that is valid but not yet sent / synced. - Every time a pipeline is started, assign it a number from a counter. This allows the TxManager to ignore batches from stopped pipelines, via a message sent by the coordinator. - Avoid forging when we haven't reached the rollup genesis block number. - Add config parameter `StartSlotBlocksDelay`: StartSlotBlocksDelay is the number of blocks of delay to wait before starting the pipeline when we reach a slot in which we can forge. - When detecting a reorg, only reset the pipeline if the batch from which the pipeline started changed and wasn't sent by us. - Add config parameter `ScheduleBatchBlocksAheadCheck`: ScheduleBatchBlocksAheadCheck is the number of blocks ahead in which the forger address is checked to be allowed to forge (apart from checking the next block), used to decide when to stop scheduling new batches (by stopping the pipeline). For example, if we are at block 10 and ScheduleBatchBlocksAheadCheck is 5, eventhough at block 11 we canForge, the pipeline will be stopped if we can't forge at block 15. This value should be the expected number of blocks it takes between scheduling a batch and having it mined. - Add config parameter `SendBatchBlocksMarginCheck`: SendBatchBlocksMarginCheck is the number of margin blocks ahead in which the coordinator is also checked to be allowed to forge, apart from the next block; used to decide when to stop sending batches to the smart contract. For example, if we are at block 10 and SendBatchBlocksMarginCheck is 5, eventhough at block 11 we canForge, the batch will be discarded if we can't forge at block 15. - Add config parameter `TxResendTimeout`: TxResendTimeout is the timeout after which a non-mined ethereum transaction will be resent (reusing the nonce) with a newly calculated gas price - Add config parameter `MaxGasPrice`: MaxGasPrice is the maximum gas price allowed for ethereum transactions - Add config parameter `NoReuseNonce`: NoReuseNonce disables reusing nonces of pending transactions for new replacement transactions. This is useful for testing with Ganache. - Extend BatchInfo with more useful information for debugging - eth / ethereum client - Add necessary methods to create the auth object for transactions manually so that we can set the nonce, gas price, gas limit, etc manually - Update `RollupForgeBatch` to take an auth object as input (so that the coordinator can set parameters manually) - synchronizer - In stats, add `NextSlot` - In stats, store full last batch instead of just last batch number - Instead of calculating a nextSlot from scratch every time, update the current struct (only updating the forger info if we are Synced) - Afer every processed batch, check that the calculated StateDB MTRoot matches the StateRoot found in the forgeBatch event. --- batchbuilder/batchbuilder.go | 2 +- cli/node/cfg.buidler.toml | 12 +- common/batch.go | 18 ++ common/ethauction.go | 3 +- config/config.go | 58 ++++- coordinator/batch.go | 22 +- coordinator/coordinator.go | 201 ++++++++++----- coordinator/coordinator_test.go | 6 +- coordinator/pipeline.go | 166 +++++++++--- coordinator/pipeline_test.go | 17 +- coordinator/purger.go | 18 +- coordinator/txmanager.go | 404 ++++++++++++++++++++++++------ coordinator/txmanager_test.go | 15 ++ db/historydb/historydb.go | 25 ++ db/historydb/historydb_test.go | 10 + db/kvdb/kvdb.go | 29 ++- db/statedb/statedb.go | 6 + go.sum | 10 + node/node.go | 16 +- synchronizer/synchronizer.go | 144 ++++++----- synchronizer/synchronizer_test.go | 26 +- txselector/txselector.go | 8 +- 22 files changed, 933 insertions(+), 283 deletions(-) create mode 100644 coordinator/txmanager_test.go diff --git a/batchbuilder/batchbuilder.go b/batchbuilder/batchbuilder.go index 8f662fe..ed57501 100644 --- a/batchbuilder/batchbuilder.go +++ b/batchbuilder/batchbuilder.go @@ -54,7 +54,7 @@ func NewBatchBuilder(dbpath string, synchronizerStateDB *statedb.StateDB, batchN // copy of the rollup state from the Synchronizer at that `batchNum`, otherwise // it can just roll back the internal copy. func (bb *BatchBuilder) Reset(batchNum common.BatchNum, fromSynchronizer bool) error { - return bb.localStateDB.Reset(batchNum, fromSynchronizer) + return tracerr.Wrap(bb.localStateDB.Reset(batchNum, fromSynchronizer)) } // BuildBatch takes the transactions and returns the common.ZKInputs of the next batch diff --git a/cli/node/cfg.buidler.toml b/cli/node/cfg.buidler.toml index 8e33845..dd63f9d 100644 --- a/cli/node/cfg.buidler.toml +++ b/cli/node/cfg.buidler.toml @@ -41,12 +41,15 @@ TokenHEZ = "0x5D94e3e7aeC542aB0F9129B9a7BAdeb5B3Ca0f77" TokenHEZName = "Hermez Network Token" [Coordinator] -# ForgerAddress = "0x05c23b938a85ab26A36E6314a0D02080E9ca6BeD" # Non-Boot Coordinator +ForgerAddress = "0x05c23b938a85ab26A36E6314a0D02080E9ca6BeD" # Non-Boot Coordinator # ForgerAddressPrivateKey = "0x30f5fddb34cd4166adb2c6003fa6b18f380fd2341376be42cf1c7937004ac7a3" -ForgerAddress = "0xb4124ceb3451635dacedd11767f004d8a28c6ee7" # Boot Coordinator +# ForgerAddress = "0xb4124ceb3451635dacedd11767f004d8a28c6ee7" # Boot Coordinator # ForgerAddressPrivateKey = "0xa8a54b2d8197bc0b19bb8a084031be71835580a01e70a45a13babd16c9bc1563" ConfirmBlocks = 10 -L1BatchTimeoutPerc = 0.999 +L1BatchTimeoutPerc = 0.6 +StartSlotBlocksDelay = 2 +ScheduleBatchBlocksAheadCheck = 3 +SendBatchBlocksMarginCheck = 1 ProofServerPollInterval = "1s" ForgeRetryInterval = "500ms" SyncRetryInterval = "1s" @@ -85,8 +88,11 @@ ReceiptLoopInterval = "500ms" CheckLoopInterval = "500ms" Attempts = 4 AttemptsDelay = "500ms" +TxResendTimeout = "2m" +NoReuseNonce = false CallGasLimit = 300000 GasPriceDiv = 100 +MaxGasPrice = "5000000000" [Coordinator.EthClient.Keystore] Path = "/tmp/iden3-test/hermez/ethkeystore" diff --git a/common/batch.go b/common/batch.go index a6e11ce..972b53a 100644 --- a/common/batch.go +++ b/common/batch.go @@ -27,6 +27,24 @@ type Batch struct { TotalFeesUSD *float64 `meddler:"total_fees_usd"` } +// NewEmptyBatch creates a new empty batch +func NewEmptyBatch() *Batch { + return &Batch{ + BatchNum: 0, + EthBlockNum: 0, + ForgerAddr: ethCommon.Address{}, + CollectedFees: make(map[TokenID]*big.Int), + FeeIdxsCoordinator: make([]Idx, 0), + StateRoot: big.NewInt(0), + NumAccounts: 0, + LastIdx: 0, + ExitRoot: big.NewInt(0), + ForgeL1TxsNum: nil, + SlotNum: 0, + TotalFeesUSD: nil, + } +} + // BatchNum identifies a batch type BatchNum int64 diff --git a/common/ethauction.go b/common/ethauction.go index 0292407..9425ea4 100644 --- a/common/ethauction.go +++ b/common/ethauction.go @@ -33,7 +33,8 @@ func (c *AuctionConstants) SlotNum(blockNum int64) int64 { if blockNum >= c.GenesisBlockNum { return (blockNum - c.GenesisBlockNum) / int64(c.BlocksPerSlot) } - return -1 + // This result will be negative + return (blockNum - c.GenesisBlockNum) / int64(c.BlocksPerSlot) } // SlotBlocks returns the first and the last block numbers included in that slot diff --git a/config/config.go b/config/config.go index 92330c8..b9aa213 100644 --- a/config/config.go +++ b/config/config.go @@ -3,6 +3,7 @@ package config import ( "fmt" "io/ioutil" + "math/big" "time" "github.com/BurntSushi/toml" @@ -51,6 +52,27 @@ type Coordinator struct { // L1BatchTimeoutPerc is the portion of the range before the L1Batch // timeout that will trigger a schedule to forge an L1Batch L1BatchTimeoutPerc float64 `validate:"required"` + // StartSlotBlocksDelay is the number of blocks of delay to wait before + // starting the pipeline when we reach a slot in which we can forge. + StartSlotBlocksDelay int64 + // ScheduleBatchBlocksAheadCheck is the number of blocks ahead in which + // the forger address is checked to be allowed to forge (apart from + // checking the next block), used to decide when to stop scheduling new + // batches (by stopping the pipeline). + // For example, if we are at block 10 and ScheduleBatchBlocksAheadCheck + // is 5, eventhough at block 11 we canForge, the pipeline will be + // stopped if we can't forge at block 15. + // This value should be the expected number of blocks it takes between + // scheduling a batch and having it mined. + ScheduleBatchBlocksAheadCheck int64 + // SendBatchBlocksMarginCheck is the number of margin blocks ahead in + // which the coordinator is also checked to be allowed to forge, apart + // from the next block; used to decide when to stop sending batches to + // the smart contract. + // For example, if we are at block 10 and SendBatchBlocksMarginCheck is + // 5, eventhough at block 11 we canForge, the batch will be discarded + // if we can't forge at block 15. + SendBatchBlocksMarginCheck int64 // ProofServerPollInterval is the waiting interval between polling the // ProofServer while waiting for a particular status ProofServerPollInterval Duration `validate:"required"` @@ -65,19 +87,33 @@ type Coordinator struct { // SafetyPeriod is the number of batches after which // non-pending L2Txs are deleted from the pool SafetyPeriod common.BatchNum `validate:"required"` - // MaxTxs is the number of L2Txs that once reached triggers - // deletion of old L2Txs + // MaxTxs is the maximum number of pending L2Txs that can be + // stored in the pool. Once this number of pending L2Txs is + // reached, inserts to the pool will be denied until some of + // the pending txs are forged. MaxTxs uint32 `validate:"required"` // TTL is the Time To Live for L2Txs in the pool. Once MaxTxs // L2Txs is reached, L2Txs older than TTL will be deleted. TTL Duration `validate:"required"` - // PurgeBatchDelay is the delay between batches to purge outdated transactions + // PurgeBatchDelay is the delay between batches to purge + // outdated transactions. Oudated L2Txs are those that have + // been forged or marked as invalid for longer than the + // SafetyPeriod and pending L2Txs that have been in the pool + // for longer than TTL once there are MaxTxs. PurgeBatchDelay int64 `validate:"required"` - // InvalidateBatchDelay is the delay between batches to mark invalid transactions + // InvalidateBatchDelay is the delay between batches to mark + // invalid transactions due to nonce lower than the account + // nonce. InvalidateBatchDelay int64 `validate:"required"` - // PurgeBlockDelay is the delay between blocks to purge outdated transactions + // PurgeBlockDelay is the delay between blocks to purge + // outdated transactions. Oudated L2Txs are those that have + // been forged or marked as invalid for longer than the + // SafetyPeriod and pending L2Txs that have been in the pool + // for longer than TTL once there are MaxTxs. PurgeBlockDelay int64 `validate:"required"` - // InvalidateBlockDelay is the delay between blocks to mark invalid transactions + // InvalidateBlockDelay is the delay between blocks to mark + // invalid transactions due to nonce lower than the account + // nonce. InvalidateBlockDelay int64 `validate:"required"` } `validate:"required"` TxSelector struct { @@ -101,6 +137,9 @@ type Coordinator struct { // calls, except for methods where a particular gas limit is // harcoded because it's known to be a big value CallGasLimit uint64 `validate:"required"` + // MaxGasPrice is the maximum gas price allowed for ethereum + // transactions + MaxGasPrice *big.Int `validate:"required"` // GasPriceDiv is the gas price division GasPriceDiv uint64 `validate:"required"` // CheckLoopInterval is the waiting interval between receipt @@ -112,6 +151,13 @@ type Coordinator struct { // AttemptsDelay is delay between attempts do do an eth client // RPC call AttemptsDelay Duration `validate:"required"` + // TxResendTimeout is the timeout after which a non-mined + // ethereum transaction will be resent (reusing the nonce) with + // a newly calculated gas price + TxResendTimeout Duration `validate:"required"` + // NoReuseNonce disables reusing nonces of pending transactions for + // new replacement transactions + NoReuseNonce bool // Keystore is the ethereum keystore where private keys are kept Keystore struct { // Path to the keystore diff --git a/coordinator/batch.go b/coordinator/batch.go index b2acf67..56a4088 100644 --- a/coordinator/batch.go +++ b/coordinator/batch.go @@ -47,6 +47,8 @@ type Debug struct { MineBlockNum int64 // SendBlockNum is the blockNum when the batch was sent to ethereum SendBlockNum int64 + // ResendNum is the number of times the tx has been resent + ResendNum int // LastScheduledL1BatchBlockNum is the blockNum when the last L1Batch // was scheduled LastScheduledL1BatchBlockNum int64 @@ -64,10 +66,17 @@ type Debug struct { // StartToSendDelay is the delay between starting a batch and sending // it to ethereum, in seconds StartToSendDelay float64 + // StartToMineDelay is the delay between starting a batch and having + // it mined in seconds + StartToMineDelay float64 + // SendToMineDelay is the delay between sending a batch tx and having + // it mined in seconds + SendToMineDelay float64 } // BatchInfo contans the Batch information type BatchInfo struct { + PipelineNum int BatchNum common.BatchNum ServerProof prover.Client ZKInputs *common.ZKInputs @@ -82,9 +91,16 @@ type BatchInfo struct { CoordIdxs []common.Idx ForgeBatchArgs *eth.RollupForgeBatchArgs // FeesInfo - EthTx *types.Transaction - Receipt *types.Receipt - Debug Debug + EthTx *types.Transaction + EthTxErr error + // SendTimestamp the time of batch sent to ethereum + SendTimestamp time.Time + Receipt *types.Receipt + // Fail is true if: + // - The receipt status is failed + // - A previous parent batch is failed + Fail bool + Debug Debug } // DebugStore is a debug function to store the BatchInfo as a json text file in diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index ee6caf9..0c2705d 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -3,8 +3,8 @@ package coordinator import ( "context" "fmt" + "math/big" "os" - "strings" "sync" "time" @@ -42,6 +42,29 @@ type Config struct { // L1BatchTimeoutPerc is the portion of the range before the L1Batch // timeout that will trigger a schedule to forge an L1Batch L1BatchTimeoutPerc float64 + // StartSlotBlocksDelay is the number of blocks of delay to wait before + // starting the pipeline when we reach a slot in which we can forge. + StartSlotBlocksDelay int64 + // ScheduleBatchBlocksAheadCheck is the number of blocks ahead in which + // the forger address is checked to be allowed to forge (apart from + // checking the next block), used to decide when to stop scheduling new + // batches (by stopping the pipeline). + // For example, if we are at block 10 and ScheduleBatchBlocksAheadCheck + // is 5, eventhough at block 11 we canForge, the pipeline will be + // stopped if we can't forge at block 15. + // This value should be the expected number of blocks it takes between + // scheduling a batch and having it mined. + ScheduleBatchBlocksAheadCheck int64 + // SendBatchBlocksMarginCheck is the number of margin blocks ahead in + // which the coordinator is also checked to be allowed to forge, apart + // from the next block; used to decide when to stop sending batches to + // the smart contract. + // For example, if we are at block 10 and SendBatchBlocksMarginCheck is + // 5, eventhough at block 11 we canForge, the batch will be discarded + // if we can't forge at block 15. + // This value should be the expected number of blocks it takes between + // sending a batch and having it mined. + SendBatchBlocksMarginCheck int64 // EthClientAttempts is the number of attempts to do an eth client RPC // call before giving up EthClientAttempts int @@ -54,13 +77,25 @@ type Config struct { // EthClientAttemptsDelay is delay between attempts do do an eth client // RPC call EthClientAttemptsDelay time.Duration + // EthTxResendTimeout is the timeout after which a non-mined ethereum + // transaction will be resent (reusing the nonce) with a newly + // calculated gas price + EthTxResendTimeout time.Duration + // EthNoReuseNonce disables reusing nonces of pending transactions for + // new replacement transactions + EthNoReuseNonce bool + // MaxGasPrice is the maximum gas price allowed for ethereum + // transactions + MaxGasPrice *big.Int // TxManagerCheckInterval is the waiting interval between receipt // checks of ethereum transactions in the TxManager TxManagerCheckInterval time.Duration // DebugBatchPath if set, specifies the path where batchInfo is stored // in JSON in every step/update of the pipeline - DebugBatchPath string - Purger PurgerCfg + DebugBatchPath string + Purger PurgerCfg + // VerifierIdx is the index of the verifier contract registered in the + // smart contract VerifierIdx uint8 TxProcessorConfig txprocessor.Config } @@ -74,15 +109,22 @@ func (c *Config) debugBatchStore(batchInfo *BatchInfo) { } } +type fromBatch struct { + BatchNum common.BatchNum + ForgerAddr ethCommon.Address + StateRoot *big.Int +} + // Coordinator implements the Coordinator type type Coordinator struct { // State - pipelineBatchNum common.BatchNum // batchNum from which we started the pipeline - provers []prover.Client - consts synchronizer.SCConsts - vars synchronizer.SCVariables - stats synchronizer.Stats - started bool + pipelineNum int // Pipeline sequential number. The first pipeline is 1 + pipelineFromBatch fromBatch // batch from which we started the pipeline + provers []prover.Client + consts synchronizer.SCConsts + vars synchronizer.SCVariables + stats synchronizer.Stats + started bool cfg Config @@ -96,7 +138,8 @@ type Coordinator struct { wg sync.WaitGroup cancel context.CancelFunc - pipeline *Pipeline + pipeline *Pipeline + lastNonFailedBatchNum common.BatchNum purger *Purger txManager *TxManager @@ -139,10 +182,15 @@ func NewCoordinator(cfg Config, ctx, cancel := context.WithCancel(context.Background()) c := Coordinator{ - pipelineBatchNum: -1, - provers: serverProofs, - consts: *scConsts, - vars: *initSCVars, + pipelineNum: 0, + pipelineFromBatch: fromBatch{ + BatchNum: 0, + ForgerAddr: ethCommon.Address{}, + StateRoot: big.NewInt(0), + }, + provers: serverProofs, + consts: *scConsts, + vars: *initSCVars, cfg: cfg, @@ -183,8 +231,9 @@ func (c *Coordinator) BatchBuilder() *batchbuilder.BatchBuilder { } func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) { - return NewPipeline(ctx, c.cfg, c.historyDB, c.l2DB, c.txSelector, - c.batchBuilder, c.purger, c.txManager, c.provers, &c.consts) + c.pipelineNum++ + return NewPipeline(ctx, c.cfg, c.pipelineNum, c.historyDB, c.l2DB, c.txSelector, + c.batchBuilder, c.purger, c, c.txManager, c.provers, &c.consts) } // MsgSyncBlock indicates an update to the Synchronizer stats @@ -205,6 +254,9 @@ type MsgSyncReorg struct { // MsgStopPipeline indicates a signal to reset the pipeline type MsgStopPipeline struct { Reason string + // FailedBatchNum indicates the first batchNum that failed in the + // pipeline. If FailedBatchNum is 0, it should be ignored. + FailedBatchNum common.BatchNum } // SendMsg is a thread safe method to pass a message to the Coordinator @@ -215,27 +267,36 @@ func (c *Coordinator) SendMsg(ctx context.Context, msg interface{}) { } } -func (c *Coordinator) syncSCVars(vars synchronizer.SCVariablesPtr) { - if vars.Rollup != nil { - c.vars.Rollup = *vars.Rollup +func updateSCVars(vars *synchronizer.SCVariables, update synchronizer.SCVariablesPtr) { + if update.Rollup != nil { + vars.Rollup = *update.Rollup } - if vars.Auction != nil { - c.vars.Auction = *vars.Auction + if update.Auction != nil { + vars.Auction = *update.Auction } - if vars.WDelayer != nil { - c.vars.WDelayer = *vars.WDelayer + if update.WDelayer != nil { + vars.WDelayer = *update.WDelayer } } +func (c *Coordinator) syncSCVars(vars synchronizer.SCVariablesPtr) { + updateSCVars(&c.vars, vars) +} + func canForge(auctionConstants *common.AuctionConstants, auctionVars *common.AuctionVariables, currentSlot *common.Slot, nextSlot *common.Slot, addr ethCommon.Address, blockNum int64) bool { + if blockNum < auctionConstants.GenesisBlockNum { + log.Infow("canForge: requested blockNum is < genesis", "blockNum", blockNum, + "genesis", auctionConstants.GenesisBlockNum) + return false + } var slot *common.Slot if currentSlot.StartBlock <= blockNum && blockNum <= currentSlot.EndBlock { slot = currentSlot } else if nextSlot.StartBlock <= blockNum && blockNum <= nextSlot.EndBlock { slot = nextSlot } else { - log.Warnw("Coordinator: requested blockNum for canForge is outside slot", + log.Warnw("canForge: requested blockNum is outside current and next slot", "blockNum", blockNum, "currentSlot", currentSlot, "nextSlot", nextSlot, ) @@ -244,16 +305,23 @@ func canForge(auctionConstants *common.AuctionConstants, auctionVars *common.Auc anyoneForge := false if !slot.ForgerCommitment && auctionConstants.RelativeBlock(blockNum) >= int64(auctionVars.SlotDeadline) { - log.Debugw("Coordinator: anyone can forge in the current slot (slotDeadline passed)", + log.Debugw("canForge: anyone can forge in the current slot (slotDeadline passed)", "block", blockNum) anyoneForge = true } if slot.Forger == addr || anyoneForge { return true } + log.Debugw("canForge: can't forge", "slot.Forger", slot.Forger) return false } +func (c *Coordinator) canForgeAt(blockNum int64) bool { + return canForge(&c.consts.Auction, &c.vars.Auction, + &c.stats.Sync.Auction.CurrentSlot, &c.stats.Sync.Auction.NextSlot, + c.cfg.ForgerAddress, blockNum) +} + func (c *Coordinator) canForge() bool { blockNum := c.stats.Eth.LastBlock.Num + 1 return canForge(&c.consts.Auction, &c.vars.Auction, @@ -262,12 +330,24 @@ func (c *Coordinator) canForge() bool { } func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) error { - canForge := c.canForge() + nextBlock := c.stats.Eth.LastBlock.Num + 1 + canForge := c.canForgeAt(nextBlock) + if c.cfg.ScheduleBatchBlocksAheadCheck != 0 && canForge { + canForge = c.canForgeAt(nextBlock + c.cfg.ScheduleBatchBlocksAheadCheck) + } if c.pipeline == nil { - if canForge { + relativeBlock := c.consts.Auction.RelativeBlock(nextBlock) + if canForge && relativeBlock < c.cfg.StartSlotBlocksDelay { + log.Debugf("Coordinator: delaying pipeline start due to "+ + "relativeBlock (%v) < cfg.StartSlotBlocksDelay (%v)", + relativeBlock, c.cfg.StartSlotBlocksDelay) + } else if canForge { log.Infow("Coordinator: forging state begin", "block", - stats.Eth.LastBlock.Num+1, "batch", stats.Sync.LastBatch) - batchNum := common.BatchNum(stats.Sync.LastBatch) + stats.Eth.LastBlock.Num+1, "batch", stats.Sync.LastBatch.BatchNum) + batchNum := stats.Sync.LastBatch.BatchNum + if c.lastNonFailedBatchNum > batchNum { + batchNum = c.lastNonFailedBatchNum + } var err error if c.pipeline, err = c.newPipeline(ctx); err != nil { return tracerr.Wrap(err) @@ -276,7 +356,6 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) c.pipeline = nil return tracerr.Wrap(err) } - c.pipelineBatchNum = batchNum } } else { if !canForge { @@ -286,25 +365,12 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) } } if c.pipeline == nil { - // Mark invalid in Pool due to forged L2Txs - // for _, batch := range batches { - // if err := c.l2DB.InvalidateOldNonces( - // idxsNonceFromL2Txs(batch.L2Txs), batch.Batch.BatchNum); err != nil { - // return err - // } - // } - if c.purger.CanInvalidate(stats.Sync.LastBlock.Num, stats.Sync.LastBatch) { - if err := c.txSelector.Reset(common.BatchNum(stats.Sync.LastBatch)); err != nil { - return tracerr.Wrap(err) - } - } - _, err := c.purger.InvalidateMaybe(c.l2DB, c.txSelector.LocalAccountsDB(), - stats.Sync.LastBlock.Num, stats.Sync.LastBatch) - if err != nil { + if _, err := c.purger.InvalidateMaybe(c.l2DB, c.txSelector.LocalAccountsDB(), + stats.Sync.LastBlock.Num, int64(stats.Sync.LastBatch.BatchNum)); err != nil { return tracerr.Wrap(err) } - _, err = c.purger.PurgeMaybe(c.l2DB, stats.Sync.LastBlock.Num, stats.Sync.LastBatch) - if err != nil { + if _, err := c.purger.PurgeMaybe(c.l2DB, stats.Sync.LastBlock.Num, + int64(stats.Sync.LastBatch.BatchNum)); err != nil { return tracerr.Wrap(err) } } @@ -331,33 +397,42 @@ func (c *Coordinator) handleReorg(ctx context.Context, msg *MsgSyncReorg) error if c.pipeline != nil { c.pipeline.SetSyncStatsVars(ctx, &msg.Stats, &msg.Vars) } - if common.BatchNum(c.stats.Sync.LastBatch) < c.pipelineBatchNum { - // There's been a reorg and the batch from which the pipeline - // was started was in a block that was discarded. The batch - // may not be in the main chain, so we stop the pipeline as a - // precaution (it will be started again once the node is in - // sync). - log.Infow("Coordinator.handleReorg StopPipeline sync.LastBatch < c.pipelineBatchNum", - "sync.LastBatch", c.stats.Sync.LastBatch, - "c.pipelineBatchNum", c.pipelineBatchNum) - if err := c.handleStopPipeline(ctx, "reorg"); err != nil { + if c.stats.Sync.LastBatch.ForgerAddr != c.cfg.ForgerAddress && + c.stats.Sync.LastBatch.StateRoot.Cmp(c.pipelineFromBatch.StateRoot) != 0 { + // There's been a reorg and the batch state root from which the + // pipeline was started has changed (probably because it was in + // a block that was discarded), and it was sent by a different + // coordinator than us. That batch may never be in the main + // chain, so we stop the pipeline (it will be started again + // once the node is in sync). + log.Infow("Coordinator.handleReorg StopPipeline sync.LastBatch.ForgerAddr != cfg.ForgerAddr "+ + "& sync.LastBatch.StateRoot != pipelineFromBatch.StateRoot", + "sync.LastBatch.StateRoot", c.stats.Sync.LastBatch.StateRoot, + "pipelineFromBatch.StateRoot", c.pipelineFromBatch.StateRoot) + c.txManager.DiscardPipeline(ctx, c.pipelineNum) + if err := c.handleStopPipeline(ctx, "reorg", 0); err != nil { return tracerr.Wrap(err) } } return nil } -func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string) error { +// handleStopPipeline handles stopping the pipeline. If failedBatchNum is 0, +// the next pipeline will start from the last state of the synchronizer, +// otherwise, it will state from failedBatchNum-1. +func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string, failedBatchNum common.BatchNum) error { + batchNum := c.stats.Sync.LastBatch.BatchNum + if failedBatchNum != 0 { + batchNum = failedBatchNum - 1 + } if c.pipeline != nil { c.pipeline.Stop(c.ctx) c.pipeline = nil } - if err := c.l2DB.Reorg(common.BatchNum(c.stats.Sync.LastBatch)); err != nil { + if err := c.l2DB.Reorg(batchNum); err != nil { return tracerr.Wrap(err) } - if strings.Contains(reason, common.AuctionErrMsgCannotForge) { //nolint:staticcheck - // TODO: Check that we are in a slot in which we can't forge - } + c.lastNonFailedBatchNum = batchNum return nil } @@ -373,7 +448,7 @@ func (c *Coordinator) handleMsg(ctx context.Context, msg interface{}) error { } case MsgStopPipeline: log.Infow("Coordinator received MsgStopPipeline", "reason", msg.Reason) - if err := c.handleStopPipeline(ctx, msg.Reason); err != nil { + if err := c.handleStopPipeline(ctx, msg.Reason, msg.FailedBatchNum); err != nil { return tracerr.Wrap(fmt.Errorf("Coordinator.handleStopPipeline: %w", err)) } default: diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index a9189e8..3bb6902 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -261,8 +261,8 @@ func TestCoordinatorFlow(t *testing.T) { var stats synchronizer.Stats stats.Eth.LastBlock = *ethClient.CtlLastBlock() stats.Sync.LastBlock = stats.Eth.LastBlock - stats.Eth.LastBatch = ethClient.CtlLastForgedBatch() - stats.Sync.LastBatch = stats.Eth.LastBatch + stats.Eth.LastBatchNum = ethClient.CtlLastForgedBatch() + stats.Sync.LastBatch.BatchNum = common.BatchNum(stats.Eth.LastBatchNum) canForge, err := ethClient.AuctionCanForge(forger, blockNum+1) require.NoError(t, err) var slot common.Slot @@ -279,7 +279,7 @@ func TestCoordinatorFlow(t *testing.T) { // Copy stateDB to synchronizer if there was a new batch source := fmt.Sprintf("%v/BatchNum%v", batchBuilderDBPath, stats.Sync.LastBatch) dest := fmt.Sprintf("%v/BatchNum%v", syncDBPath, stats.Sync.LastBatch) - if stats.Sync.LastBatch != 0 { + if stats.Sync.LastBatch.BatchNum != 0 { if _, err := os.Stat(dest); os.IsNotExist(err) { log.Infow("Making pebble checkpoint for sync", "source", source, "dest", dest) diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go index 81d5774..9939079 100644 --- a/coordinator/pipeline.go +++ b/coordinator/pipeline.go @@ -2,6 +2,7 @@ package coordinator import ( "context" + "database/sql" "fmt" "math/big" "sync" @@ -24,19 +25,27 @@ type statsVars struct { Vars synchronizer.SCVariablesPtr } +type state struct { + batchNum common.BatchNum + lastScheduledL1BatchBlockNum int64 + lastForgeL1TxsNum int64 +} + // Pipeline manages the forging of batches with parallel server proofs type Pipeline struct { + num int cfg Config consts synchronizer.SCConsts // state - batchNum common.BatchNum - lastScheduledL1BatchBlockNum int64 - lastForgeL1TxsNum int64 - started bool + state state + started bool + rw sync.RWMutex + errAtBatchNum common.BatchNum proversPool *ProversPool provers []prover.Client + coord *Coordinator txManager *TxManager historyDB *historydb.HistoryDB l2DB *l2db.L2DB @@ -53,14 +62,28 @@ type Pipeline struct { cancel context.CancelFunc } +func (p *Pipeline) setErrAtBatchNum(batchNum common.BatchNum) { + p.rw.Lock() + defer p.rw.Unlock() + p.errAtBatchNum = batchNum +} + +func (p *Pipeline) getErrAtBatchNum() common.BatchNum { + p.rw.RLock() + defer p.rw.RUnlock() + return p.errAtBatchNum +} + // NewPipeline creates a new Pipeline func NewPipeline(ctx context.Context, cfg Config, + num int, // Pipeline sequential number historyDB *historydb.HistoryDB, l2DB *l2db.L2DB, txSelector *txselector.TxSelector, batchBuilder *batchbuilder.BatchBuilder, purger *Purger, + coord *Coordinator, txManager *TxManager, provers []prover.Client, scConsts *synchronizer.SCConsts, @@ -79,6 +102,7 @@ func NewPipeline(ctx context.Context, return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool")) } return &Pipeline{ + num: num, cfg: cfg, historyDB: historyDB, l2DB: l2DB, @@ -87,6 +111,7 @@ func NewPipeline(ctx context.Context, provers: provers, proversPool: proversPool, purger: purger, + coord: coord, txManager: txManager, consts: *scConsts, statsVarsCh: make(chan statsVars, queueLen), @@ -104,33 +129,67 @@ func (p *Pipeline) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Sta // reset pipeline state func (p *Pipeline) reset(batchNum common.BatchNum, stats *synchronizer.Stats, vars *synchronizer.SCVariables) error { - p.batchNum = batchNum - p.lastForgeL1TxsNum = stats.Sync.LastForgeL1TxsNum + p.state = state{ + batchNum: batchNum, + lastForgeL1TxsNum: stats.Sync.LastForgeL1TxsNum, + lastScheduledL1BatchBlockNum: 0, + } p.stats = *stats p.vars = *vars - p.lastScheduledL1BatchBlockNum = 0 - err := p.txSelector.Reset(p.batchNum) + // Reset the StateDB in TxSelector and BatchBuilder from the + // synchronizer only if the checkpoint we reset from either: + // a. Doesn't exist in the TxSelector/BatchBuilder + // b. The batch has already been synced by the synchronizer and has a + // different MTRoot than the BatchBuilder + // Otherwise, reset from the local checkpoint. + + // First attempt to reset from local checkpoint if such checkpoint exists + existsTxSelector, err := p.txSelector.LocalAccountsDB().CheckpointExists(p.state.batchNum) if err != nil { return tracerr.Wrap(err) } - err = p.batchBuilder.Reset(p.batchNum, true) + fromSynchronizerTxSelector := !existsTxSelector + if err := p.txSelector.Reset(p.state.batchNum, fromSynchronizerTxSelector); err != nil { + return tracerr.Wrap(err) + } + existsBatchBuilder, err := p.batchBuilder.LocalStateDB().CheckpointExists(p.state.batchNum) if err != nil { return tracerr.Wrap(err) } + fromSynchronizerBatchBuilder := !existsBatchBuilder + if err := p.batchBuilder.Reset(p.state.batchNum, fromSynchronizerBatchBuilder); err != nil { + return tracerr.Wrap(err) + } + + // After reset, check that if the batch exists in the historyDB, the + // stateRoot matches with the local one, if not, force a reset from + // synchronizer + batch, err := p.historyDB.GetBatch(p.state.batchNum) + if tracerr.Unwrap(err) == sql.ErrNoRows { + // nothing to do + } else if err != nil { + return tracerr.Wrap(err) + } else { + localStateRoot := p.batchBuilder.LocalStateDB().MT.Root().BigInt() + if batch.StateRoot.Cmp(localStateRoot) != 0 { + log.Debugw("localStateRoot (%v) != historyDB stateRoot (%v). "+ + "Forcing reset from Synchronizer", localStateRoot, batch.StateRoot) + // StateRoot from synchronizer doesn't match StateRoot + // from batchBuilder, force a reset from synchronizer + if err := p.txSelector.Reset(p.state.batchNum, true); err != nil { + return tracerr.Wrap(err) + } + if err := p.batchBuilder.Reset(p.state.batchNum, true); err != nil { + return tracerr.Wrap(err) + } + } + } return nil } func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) { - if vars.Rollup != nil { - p.vars.Rollup = *vars.Rollup - } - if vars.Auction != nil { - p.vars.Auction = *vars.Auction - } - if vars.WDelayer != nil { - p.vars.WDelayer = *vars.WDelayer - } + updateSCVars(&p.vars, vars) } // handleForgeBatch calls p.forgeBatch to forge the batch and get the zkInputs, @@ -143,7 +202,7 @@ func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNu } else if err != nil { if tracerr.Unwrap(err) == errLastL1BatchNotSynced { log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err, - "lastForgeL1TxsNum", p.lastForgeL1TxsNum, + "lastForgeL1TxsNum", p.state.lastForgeL1TxsNum, "syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum) } else { log.Errorw("forgeBatch", "err", err) @@ -199,15 +258,32 @@ func (p *Pipeline) Start(batchNum common.BatchNum, p.stats = statsVars.Stats p.syncSCVars(statsVars.Vars) case <-time.After(waitDuration): - batchNum = p.batchNum + 1 + // Once errAtBatchNum != 0, we stop forging + // batches because there's been an error and we + // wait for the pipeline to be stopped. + if p.getErrAtBatchNum() != 0 { + waitDuration = p.cfg.ForgeRetryInterval + continue + } + batchNum = p.state.batchNum + 1 batchInfo, err := p.handleForgeBatch(p.ctx, batchNum) if p.ctx.Err() != nil { continue + } else if tracerr.Unwrap(err) == errLastL1BatchNotSynced { + waitDuration = p.cfg.ForgeRetryInterval + continue } else if err != nil { - waitDuration = p.cfg.SyncRetryInterval + p.setErrAtBatchNum(batchNum) + waitDuration = p.cfg.ForgeRetryInterval + p.coord.SendMsg(p.ctx, MsgStopPipeline{ + Reason: fmt.Sprintf( + "Pipeline.handleForgBatch: %v", err), + FailedBatchNum: batchNum, + }) continue } - p.batchNum = batchNum + + p.state.batchNum = batchNum select { case batchChSentServerProof <- batchInfo: case <-p.ctx.Done(): @@ -225,16 +301,28 @@ func (p *Pipeline) Start(batchNum common.BatchNum, p.wg.Done() return case batchInfo := <-batchChSentServerProof: + // Once errAtBatchNum != 0, we stop forging + // batches because there's been an error and we + // wait for the pipeline to be stopped. + if p.getErrAtBatchNum() != 0 { + continue + } err := p.waitServerProof(p.ctx, batchInfo) - // We are done with this serverProof, add it back to the pool - p.proversPool.Add(p.ctx, batchInfo.ServerProof) - batchInfo.ServerProof = nil if p.ctx.Err() != nil { continue } else if err != nil { log.Errorw("waitServerProof", "err", err) + p.setErrAtBatchNum(batchInfo.BatchNum) + p.coord.SendMsg(p.ctx, MsgStopPipeline{ + Reason: fmt.Sprintf( + "Pipeline.waitServerProof: %v", err), + FailedBatchNum: batchInfo.BatchNum, + }) continue } + // We are done with this serverProof, add it back to the pool + p.proversPool.Add(p.ctx, batchInfo.ServerProof) + // batchInfo.ServerProof = nil p.txManager.AddBatch(p.ctx, batchInfo) } } @@ -284,8 +372,8 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e if err != nil { return nil, tracerr.Wrap(err) } - - batchInfo = &BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch + // Structure to accumulate data and metadata of the batch + batchInfo = &BatchInfo{PipelineNum: p.num, BatchNum: batchNum} batchInfo.Debug.StartTimestamp = time.Now() batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1 @@ -300,22 +388,19 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e var auths [][]byte var coordIdxs []common.Idx + // TODO: If there are no txs and we are behind the timeout, skip + // forging a batch and return a particular error that can be handleded + // in the loop where handleForgeBatch is called to retry after an + // interval + // 1. Decide if we forge L2Tx or L1+L2Tx if p.shouldL1L2Batch(batchInfo) { batchInfo.L1Batch = true - defer func() { - // If there's no error, update the parameters related - // to the last L1Batch forged - if err == nil { - p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1 - p.lastForgeL1TxsNum++ - } - }() - if p.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum { + if p.state.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum { return nil, tracerr.Wrap(errLastL1BatchNotSynced) } // 2a: L1+L2 txs - l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum + 1) + l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.state.lastForgeL1TxsNum + 1) if err != nil { return nil, tracerr.Wrap(err) } @@ -324,6 +409,9 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e if err != nil { return nil, tracerr.Wrap(err) } + + p.state.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1 + p.state.lastForgeL1TxsNum++ } else { // 2b: only L2 txs coordIdxs, auths, l1CoordTxs, poolL2Txs, discardedL2Txs, err = @@ -399,12 +487,12 @@ func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) er func (p *Pipeline) shouldL1L2Batch(batchInfo *BatchInfo) bool { // Take the lastL1BatchBlockNum as the biggest between the last // scheduled one, and the synchronized one. - lastL1BatchBlockNum := p.lastScheduledL1BatchBlockNum + lastL1BatchBlockNum := p.state.lastScheduledL1BatchBlockNum if p.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum { lastL1BatchBlockNum = p.stats.Sync.LastL1BatchBlock } // Set Debug information - batchInfo.Debug.LastScheduledL1BatchBlockNum = p.lastScheduledL1BatchBlockNum + batchInfo.Debug.LastScheduledL1BatchBlockNum = p.state.lastScheduledL1BatchBlockNum batchInfo.Debug.LastL1BatchBlock = p.stats.Sync.LastL1BatchBlock batchInfo.Debug.LastL1BatchBlockDelta = p.stats.Eth.LastBlock.Num + 1 - lastL1BatchBlockNum batchInfo.Debug.L1BatchBlockScheduleDeadline = diff --git a/coordinator/pipeline_test.go b/coordinator/pipeline_test.go index 1bd7bcf..454c446 100644 --- a/coordinator/pipeline_test.go +++ b/coordinator/pipeline_test.go @@ -25,6 +25,14 @@ import ( "github.com/stretchr/testify/require" ) +func newBigInt(s string) *big.Int { + v, ok := new(big.Int).SetString(s, 10) + if !ok { + panic(fmt.Errorf("Can't set big.Int from %s", s)) + } + return v +} + func TestPipelineShouldL1L2Batch(t *testing.T) { ethClientSetup := test.NewClientSetupExample() ethClientSetup.ChainID = big.NewInt(int64(chainID)) @@ -77,7 +85,7 @@ func TestPipelineShouldL1L2Batch(t *testing.T) { // // Scheduled L1Batch // - pipeline.lastScheduledL1BatchBlockNum = startBlock + pipeline.state.lastScheduledL1BatchBlockNum = startBlock stats.Sync.LastL1BatchBlock = startBlock - 10 // We are are one block before the timeout range * 0.5 @@ -128,6 +136,11 @@ func preloadSync(t *testing.T, ethClient *test.Client, sync *synchronizer.Synchr blocks, err := tc.GenerateBlocksFromInstructions(set) require.NoError(t, err) require.NotNil(t, blocks) + // Set StateRoots for batches manually (til doesn't set it) + blocks[0].Rollup.Batches[0].Batch.StateRoot = + newBigInt("0") + blocks[0].Rollup.Batches[1].Batch.StateRoot = + newBigInt("10941365282189107056349764238909072001483688090878331371699519307087372995595") ethAddTokens(blocks, ethClient) err = ethClient.CtlAddBlocks(blocks) @@ -172,7 +185,7 @@ func TestPipelineForgeBatchWithTxs(t *testing.T) { // users with positive balances tilCtx := preloadSync(t, ethClient, sync, modules.historyDB, modules.stateDB) syncStats := sync.Stats() - batchNum := common.BatchNum(syncStats.Sync.LastBatch) + batchNum := syncStats.Sync.LastBatch.BatchNum syncSCVars := sync.SCVars() pipeline, err := coord.newPipeline(ctx) diff --git a/coordinator/purger.go b/coordinator/purger.go index fa0256d..8e11b1d 100644 --- a/coordinator/purger.go +++ b/coordinator/purger.go @@ -13,13 +13,23 @@ import ( // PurgerCfg is the purger configuration type PurgerCfg struct { - // PurgeBatchDelay is the delay between batches to purge outdated transactions + // PurgeBatchDelay is the delay between batches to purge outdated + // transactions. Oudated L2Txs are those that have been forged or + // marked as invalid for longer than the SafetyPeriod and pending L2Txs + // that have been in the pool for longer than TTL once there are + // MaxTxs. PurgeBatchDelay int64 - // InvalidateBatchDelay is the delay between batches to mark invalid transactions + // InvalidateBatchDelay is the delay between batches to mark invalid + // transactions due to nonce lower than the account nonce. InvalidateBatchDelay int64 - // PurgeBlockDelay is the delay between blocks to purge outdated transactions + // PurgeBlockDelay is the delay between blocks to purge outdated + // transactions. Oudated L2Txs are those that have been forged or + // marked as invalid for longer than the SafetyPeriod and pending L2Txs + // that have been in the pool for longer than TTL once there are + // MaxTxs. PurgeBlockDelay int64 - // InvalidateBlockDelay is the delay between blocks to mark invalid transactions + // InvalidateBlockDelay is the delay between blocks to mark invalid + // transactions due to nonce lower than the account nonce. InvalidateBlockDelay int64 } diff --git a/coordinator/txmanager.go b/coordinator/txmanager.go index 5a449c7..082d0f0 100644 --- a/coordinator/txmanager.go +++ b/coordinator/txmanager.go @@ -2,6 +2,7 @@ package coordinator import ( "context" + "errors" "fmt" "math/big" "time" @@ -9,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/db/l2db" @@ -35,12 +37,20 @@ type TxManager struct { vars synchronizer.SCVariables statsVarsCh chan statsVars - queue []*BatchInfo + discardPipelineCh chan int // int refers to the pipelineNum + + minPipelineNum int + queue Queue // lastSuccessBatch stores the last BatchNum that who's forge call was confirmed lastSuccessBatch common.BatchNum - lastPendingBatch common.BatchNum - lastSuccessNonce uint64 - lastPendingNonce uint64 + // lastPendingBatch common.BatchNum + // accNonce is the account nonce in the last mined block (due to mined txs) + accNonce uint64 + // accNextNonce is the nonce that we should use to send the next tx. + // In some cases this will be a reused nonce of an already pending tx. + accNextNonce uint64 + + lastSentL1BatchBlockNum int64 } // NewTxManager creates a new TxManager @@ -54,26 +64,19 @@ func NewTxManager(ctx context.Context, cfg *Config, ethClient eth.ClientInterfac if err != nil { return nil, tracerr.Wrap(err) } - lastSuccessNonce, err := ethClient.EthNonceAt(ctx, *address, nil) + accNonce, err := ethClient.EthNonceAt(ctx, *address, nil) if err != nil { return nil, err } - lastPendingNonce, err := ethClient.EthPendingNonceAt(ctx, *address) - if err != nil { - return nil, err - } - if lastSuccessNonce != lastPendingNonce { - return nil, tracerr.Wrap(fmt.Errorf("lastSuccessNonce (%v) != lastPendingNonce (%v)", - lastSuccessNonce, lastPendingNonce)) - } - log.Infow("TxManager started", "nonce", lastSuccessNonce) + log.Infow("TxManager started", "nonce", accNonce) return &TxManager{ - cfg: *cfg, - ethClient: ethClient, - l2DB: l2DB, - coord: coord, - batchCh: make(chan *BatchInfo, queueLen), - statsVarsCh: make(chan statsVars, queueLen), + cfg: *cfg, + ethClient: ethClient, + l2DB: l2DB, + coord: coord, + batchCh: make(chan *BatchInfo, queueLen), + statsVarsCh: make(chan statsVars, queueLen), + discardPipelineCh: make(chan int, queueLen), account: accounts.Account{ Address: *address, }, @@ -82,8 +85,10 @@ func NewTxManager(ctx context.Context, cfg *Config, ethClient eth.ClientInterfac vars: *initSCVars, - lastSuccessNonce: lastSuccessNonce, - lastPendingNonce: lastPendingNonce, + minPipelineNum: 0, + queue: NewQueue(), + accNonce: accNonce, + accNextNonce: accNonce, }, nil } @@ -104,18 +109,19 @@ func (t *TxManager) SetSyncStatsVars(ctx context.Context, stats *synchronizer.St } } -func (t *TxManager) syncSCVars(vars synchronizer.SCVariablesPtr) { - if vars.Rollup != nil { - t.vars.Rollup = *vars.Rollup - } - if vars.Auction != nil { - t.vars.Auction = *vars.Auction - } - if vars.WDelayer != nil { - t.vars.WDelayer = *vars.WDelayer +// DiscardPipeline is a thread safe method to notify about a discarded pipeline +// due to a reorg +func (t *TxManager) DiscardPipeline(ctx context.Context, pipelineNum int) { + select { + case t.discardPipelineCh <- pipelineNum: + case <-ctx.Done(): } } +func (t *TxManager) syncSCVars(vars synchronizer.SCVariablesPtr) { + updateSCVars(&t.vars, vars) +} + // NewAuth generates a new auth object for an ethereum transaction func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) { gasPrice, err := t.ethClient.EthSuggestGasPrice(ctx) @@ -123,6 +129,7 @@ func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) { return nil, tracerr.Wrap(err) } inc := new(big.Int).Set(gasPrice) + // TODO: Replace this by a value of percentage const gasPriceDiv = 100 inc.Div(inc, new(big.Int).SetUint64(gasPriceDiv)) gasPrice.Add(gasPrice, inc) @@ -141,29 +148,75 @@ func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) { return auth, nil } -func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) error { - // TODO: Check if we can forge in the next blockNum, abort if we can't - batchInfo.Debug.Status = StatusSent - batchInfo.Debug.SendBlockNum = t.stats.Eth.LastBlock.Num + 1 - batchInfo.Debug.SendTimestamp = time.Now() - batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub( - batchInfo.Debug.StartTimestamp).Seconds() +func (t *TxManager) shouldSendRollupForgeBatch(batchInfo *BatchInfo) error { + nextBlock := t.stats.Eth.LastBlock.Num + 1 + if !t.canForgeAt(nextBlock) { + return tracerr.Wrap(fmt.Errorf("can't forge in the next block: %v", nextBlock)) + } + if t.mustL1L2Batch(nextBlock) && !batchInfo.L1Batch { + return tracerr.Wrap(fmt.Errorf("can't forge non-L1Batch in the next block: %v", nextBlock)) + } + margin := t.cfg.SendBatchBlocksMarginCheck + if margin != 0 { + if !t.canForgeAt(nextBlock + margin) { + return tracerr.Wrap(fmt.Errorf("can't forge after %v blocks: %v", + margin, nextBlock)) + } + if t.mustL1L2Batch(nextBlock+margin) && !batchInfo.L1Batch { + return tracerr.Wrap(fmt.Errorf("can't forge non-L1Batch after %v blocks: %v", + margin, nextBlock)) + } + } + return nil +} + +func addPerc(v *big.Int, p int64) *big.Int { + r := new(big.Int).Set(v) + r.Mul(r, big.NewInt(p)) + // nolint reason: to calculate percentages we divide by 100 + r.Div(r, big.NewInt(100)) //nolit:gomnd + return r.Add(v, r) +} + +func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo, resend bool) error { var ethTx *types.Transaction var err error auth, err := t.NewAuth(ctx) if err != nil { return tracerr.Wrap(err) } - auth.Nonce = big.NewInt(int64(t.lastPendingNonce)) - t.lastPendingNonce++ + auth.Nonce = big.NewInt(int64(t.accNextNonce)) + if resend { + auth.Nonce = big.NewInt(int64(batchInfo.EthTx.Nonce())) + } for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ { + if auth.GasPrice.Cmp(t.cfg.MaxGasPrice) > 0 { + return tracerr.Wrap(fmt.Errorf("calculated gasPrice (%v) > maxGasPrice (%v)", + auth.GasPrice, t.cfg.MaxGasPrice)) + } + // RollupForgeBatch() calls ethclient.SendTransaction() ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs, auth) - if err != nil { - // if strings.Contains(err.Error(), common.AuctionErrMsgCannotForge) { - // log.Errorw("TxManager ethClient.RollupForgeBatch", "err", err, - // "block", t.stats.Eth.LastBlock.Num+1) - // return tracerr.Wrap(err) - // } + if errors.Is(err, core.ErrNonceTooLow) { + log.Warnw("TxManager ethClient.RollupForgeBatch incrementing nonce", + "err", err, "nonce", auth.Nonce, "batchNum", batchInfo.BatchNum) + auth.Nonce.Add(auth.Nonce, big.NewInt(1)) + attempt-- + } else if errors.Is(err, core.ErrNonceTooHigh) { + log.Warnw("TxManager ethClient.RollupForgeBatch decrementing nonce", + "err", err, "nonce", auth.Nonce, "batchNum", batchInfo.BatchNum) + auth.Nonce.Sub(auth.Nonce, big.NewInt(1)) + attempt-- + } else if errors.Is(err, core.ErrUnderpriced) { + log.Warnw("TxManager ethClient.RollupForgeBatch incrementing gasPrice", + "err", err, "gasPrice", auth.GasPrice, "batchNum", batchInfo.BatchNum) + auth.GasPrice = addPerc(auth.GasPrice, 10) + attempt-- + } else if errors.Is(err, core.ErrReplaceUnderpriced) { + log.Warnw("TxManager ethClient.RollupForgeBatch incrementing gasPrice", + "err", err, "gasPrice", auth.GasPrice, "batchNum", batchInfo.BatchNum) + auth.GasPrice = addPerc(auth.GasPrice, 10) + attempt-- + } else if err != nil { log.Errorw("TxManager ethClient.RollupForgeBatch", "attempt", attempt, "err", err, "block", t.stats.Eth.LastBlock.Num+1, "batchNum", batchInfo.BatchNum) @@ -179,10 +232,29 @@ func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchIn if err != nil { return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err)) } + if !resend { + t.accNextNonce = auth.Nonce.Uint64() + 1 + } batchInfo.EthTx = ethTx - log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex()) + log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash()) + now := time.Now() + batchInfo.SendTimestamp = now + + if resend { + batchInfo.Debug.ResendNum++ + } + batchInfo.Debug.Status = StatusSent + batchInfo.Debug.SendBlockNum = t.stats.Eth.LastBlock.Num + 1 + batchInfo.Debug.SendTimestamp = batchInfo.SendTimestamp + batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub( + batchInfo.Debug.StartTimestamp).Seconds() t.cfg.debugBatchStore(batchInfo) - t.lastPendingBatch = batchInfo.BatchNum + + if !resend { + if batchInfo.L1Batch { + t.lastSentL1BatchBlockNum = t.stats.Eth.LastBlock.Num + 1 + } + } if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil { return tracerr.Wrap(err) } @@ -225,13 +297,20 @@ func (t *TxManager) checkEthTransactionReceipt(ctx context.Context, batchInfo *B func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*int64, error) { receipt := batchInfo.Receipt if receipt != nil { + if batchInfo.EthTx.Nonce()+1 > t.accNonce { + t.accNonce = batchInfo.EthTx.Nonce() + 1 + } if receipt.Status == types.ReceiptStatusFailed { batchInfo.Debug.Status = StatusFailed - t.cfg.debugBatchStore(batchInfo) _, err := t.ethClient.EthCall(ctx, batchInfo.EthTx, receipt.BlockNumber) - log.Warnw("TxManager receipt status is failed", "tx", receipt.TxHash.Hex(), + log.Warnw("TxManager receipt status is failed", "tx", receipt.TxHash, "batch", batchInfo.BatchNum, "block", receipt.BlockNumber.Int64(), "err", err) + batchInfo.EthTxErr = err + if batchInfo.BatchNum <= t.lastSuccessBatch { + t.lastSuccessBatch = batchInfo.BatchNum - 1 + } + t.cfg.debugBatchStore(batchInfo) return nil, tracerr.Wrap(fmt.Errorf( "ethereum transaction receipt status is failed: %w", err)) } else if receipt.Status == types.ReceiptStatusSuccessful { @@ -239,6 +318,17 @@ func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*i batchInfo.Debug.MineBlockNum = receipt.BlockNumber.Int64() batchInfo.Debug.StartToMineBlocksDelay = batchInfo.Debug.MineBlockNum - batchInfo.Debug.StartBlockNum + if batchInfo.Debug.StartToMineDelay == 0 { + if block, err := t.ethClient.EthBlockByNumber(ctx, + receipt.BlockNumber.Int64()); err != nil { + log.Warnw("TxManager: ethClient.EthBlockByNumber", "err", err) + } else { + batchInfo.Debug.SendToMineDelay = block.Timestamp.Sub( + batchInfo.Debug.SendTimestamp).Seconds() + batchInfo.Debug.StartToMineDelay = block.Timestamp.Sub( + batchInfo.Debug.StartTimestamp).Seconds() + } + } t.cfg.debugBatchStore(batchInfo) if batchInfo.BatchNum > t.lastSuccessBatch { t.lastSuccessBatch = batchInfo.BatchNum @@ -250,9 +340,72 @@ func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*i return nil, nil } +// TODO: +// - After sending a message: CancelPipeline, stop all consecutive pending Batches (transactions) + +// Queue of BatchInfos +type Queue struct { + list []*BatchInfo + // nonceByBatchNum map[common.BatchNum]uint64 + next int +} + +// NewQueue returns a new queue +func NewQueue() Queue { + return Queue{ + list: make([]*BatchInfo, 0), + // nonceByBatchNum: make(map[common.BatchNum]uint64), + next: 0, + } +} + +// Len is the length of the queue +func (q *Queue) Len() int { + return len(q.list) +} + +// At returns the BatchInfo at position (or nil if position is out of bounds) +func (q *Queue) At(position int) *BatchInfo { + if position >= len(q.list) { + return nil + } + return q.list[position] +} + +// Next returns the next BatchInfo (or nil if queue is empty) +func (q *Queue) Next() (int, *BatchInfo) { + if len(q.list) == 0 { + return 0, nil + } + defer func() { q.next = (q.next + 1) % len(q.list) }() + return q.next, q.list[q.next] +} + +// Remove removes the BatchInfo at position +func (q *Queue) Remove(position int) { + // batchInfo := q.list[position] + // delete(q.nonceByBatchNum, batchInfo.BatchNum) + q.list = append(q.list[:position], q.list[position+1:]...) + if len(q.list) == 0 { + q.next = 0 + } else { + q.next = position % len(q.list) + } +} + +// Push adds a new BatchInfo +func (q *Queue) Push(batchInfo *BatchInfo) { + q.list = append(q.list, batchInfo) + // q.nonceByBatchNum[batchInfo.BatchNum] = batchInfo.EthTx.Nonce() +} + +// func (q *Queue) NonceByBatchNum(batchNum common.BatchNum) (uint64, bool) { +// nonce, ok := q.nonceByBatchNum[batchNum] +// return nonce, ok +// } + // Run the TxManager func (t *TxManager) Run(ctx context.Context) { - next := 0 waitDuration := longWaitDuration var statsVars statsVars @@ -263,7 +416,7 @@ func (t *TxManager) Run(ctx context.Context) { t.stats = statsVars.Stats t.syncSCVars(statsVars.Vars) log.Infow("TxManager: received initial statsVars", - "block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatch) + "block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatchNum) for { select { @@ -273,8 +426,27 @@ func (t *TxManager) Run(ctx context.Context) { case statsVars := <-t.statsVarsCh: t.stats = statsVars.Stats t.syncSCVars(statsVars.Vars) + case pipelineNum := <-t.discardPipelineCh: + t.minPipelineNum = pipelineNum + 1 + if err := t.removeBadBatchInfos(ctx); ctx.Err() != nil { + continue + } else if err != nil { + log.Errorw("TxManager: removeBadBatchInfos", "err", err) + continue + } case batchInfo := <-t.batchCh: - if err := t.sendRollupForgeBatch(ctx, batchInfo); ctx.Err() != nil { + if batchInfo.PipelineNum < t.minPipelineNum { + log.Warnw("TxManager: batchInfo received pipelineNum < minPipelineNum", + "num", batchInfo.PipelineNum, "minNum", t.minPipelineNum) + } + if err := t.shouldSendRollupForgeBatch(batchInfo); err != nil { + log.Warnw("TxManager: shouldSend", "err", err, + "batch", batchInfo.BatchNum) + t.coord.SendMsg(ctx, MsgStopPipeline{ + Reason: fmt.Sprintf("forgeBatch shouldSend: %v", err)}) + continue + } + if err := t.sendRollupForgeBatch(ctx, batchInfo, false); ctx.Err() != nil { continue } else if err != nil { // If we reach here it's because our ethNode has @@ -282,19 +454,20 @@ func (t *TxManager) Run(ctx context.Context) { // ethereum. This could be due to the ethNode // failure, or an invalid transaction (that // can't be mined) - t.coord.SendMsg(ctx, MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch send: %v", err)}) + log.Warnw("TxManager: forgeBatch send failed", "err", err, + "batch", batchInfo.BatchNum) + t.coord.SendMsg(ctx, MsgStopPipeline{ + Reason: fmt.Sprintf("forgeBatch send: %v", err)}) continue } - t.queue = append(t.queue, batchInfo) + t.queue.Push(batchInfo) waitDuration = t.cfg.TxManagerCheckInterval case <-time.After(waitDuration): - if len(t.queue) == 0 { + queuePosition, batchInfo := t.queue.Next() + if batchInfo == nil { waitDuration = longWaitDuration continue } - current := next - next = (current + 1) % len(t.queue) - batchInfo := t.queue[current] if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil { continue } else if err != nil { //nolint:staticcheck @@ -304,7 +477,8 @@ func (t *TxManager) Run(ctx context.Context) { // if it was not mined, mined and succesfull or // mined and failed. This could be due to the // ethNode failure. - t.coord.SendMsg(ctx, MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch receipt: %v", err)}) + t.coord.SendMsg(ctx, MsgStopPipeline{ + Reason: fmt.Sprintf("forgeBatch receipt: %v", err)}) } confirm, err := t.handleReceipt(ctx, batchInfo) @@ -312,32 +486,106 @@ func (t *TxManager) Run(ctx context.Context) { continue } else if err != nil { //nolint:staticcheck // Transaction was rejected - t.queue = append(t.queue[:current], t.queue[current+1:]...) - if len(t.queue) == 0 { - next = 0 - } else { - next = current % len(t.queue) + if err := t.removeBadBatchInfos(ctx); ctx.Err() != nil { + continue + } else if err != nil { + log.Errorw("TxManager: removeBadBatchInfos", "err", err) + continue } - t.coord.SendMsg(ctx, MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch reject: %v", err)}) + t.coord.SendMsg(ctx, MsgStopPipeline{ + Reason: fmt.Sprintf("forgeBatch reject: %v", err)}) + continue } - if confirm != nil && *confirm >= t.cfg.ConfirmBlocks { - log.Debugw("TxManager tx for RollupForgeBatch confirmed", - "batch", batchInfo.BatchNum) - t.queue = append(t.queue[:current], t.queue[current+1:]...) - if len(t.queue) == 0 { - next = 0 - } else { - next = current % len(t.queue) + now := time.Now() + if !t.cfg.EthNoReuseNonce && confirm == nil && + now.Sub(batchInfo.SendTimestamp) > t.cfg.EthTxResendTimeout { + log.Infow("TxManager: forgeBatch tx not been mined timeout, resending", + "tx", batchInfo.EthTx.Hash(), "batch", batchInfo.BatchNum) + if err := t.sendRollupForgeBatch(ctx, batchInfo, true); ctx.Err() != nil { + continue + } else if err != nil { + // If we reach here it's because our ethNode has + // been unable to send the transaction to + // ethereum. This could be due to the ethNode + // failure, or an invalid transaction (that + // can't be mined) + log.Warnw("TxManager: forgeBatch resend failed", "err", err, + "batch", batchInfo.BatchNum) + t.coord.SendMsg(ctx, MsgStopPipeline{ + Reason: fmt.Sprintf("forgeBatch resend: %v", err)}) + continue } } + + if confirm != nil && *confirm >= t.cfg.ConfirmBlocks { + log.Debugw("TxManager: forgeBatch tx confirmed", + "tx", batchInfo.EthTx.Hash(), "batch", batchInfo.BatchNum) + t.queue.Remove(queuePosition) + } } } } -// nolint reason: this function will be used in the future -//nolint:unused -func (t *TxManager) canForge(stats *synchronizer.Stats, blockNum int64) bool { +func (t *TxManager) removeBadBatchInfos(ctx context.Context) error { + next := 0 + for { + batchInfo := t.queue.At(next) + if batchInfo == nil { + break + } + if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil { + return nil + } else if err != nil { + // Our ethNode is giving an error different + // than "not found" when getting the receipt + // for the transaction, so we can't figure out + // if it was not mined, mined and succesfull or + // mined and failed. This could be due to the + // ethNode failure. + next++ + continue + } + confirm, err := t.handleReceipt(ctx, batchInfo) + if ctx.Err() != nil { + return nil + } else if err != nil { + // Transaction was rejected + if t.minPipelineNum <= batchInfo.PipelineNum { + t.minPipelineNum = batchInfo.PipelineNum + 1 + } + t.queue.Remove(next) + continue + } + // If tx is pending but is from a cancelled pipeline, remove it + // from the queue + if confirm == nil { + if batchInfo.PipelineNum < t.minPipelineNum { + t.queue.Remove(next) + continue + } + } + next++ + } + accNonce, err := t.ethClient.EthNonceAt(ctx, t.account.Address, nil) + if err != nil { + return err + } + if !t.cfg.EthNoReuseNonce { + t.accNextNonce = accNonce + } + return nil +} + +func (t *TxManager) canForgeAt(blockNum int64) bool { return canForge(&t.consts.Auction, &t.vars.Auction, - &stats.Sync.Auction.CurrentSlot, &stats.Sync.Auction.NextSlot, + &t.stats.Sync.Auction.CurrentSlot, &t.stats.Sync.Auction.NextSlot, t.cfg.ForgerAddress, blockNum) } + +func (t *TxManager) mustL1L2Batch(blockNum int64) bool { + lastL1BatchBlockNum := t.lastSentL1BatchBlockNum + if t.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum { + lastL1BatchBlockNum = t.stats.Sync.LastL1BatchBlock + } + return blockNum-lastL1BatchBlockNum >= t.vars.Rollup.ForgeL1L2BatchTimeout-1 +} diff --git a/coordinator/txmanager_test.go b/coordinator/txmanager_test.go new file mode 100644 index 0000000..4c624d6 --- /dev/null +++ b/coordinator/txmanager_test.go @@ -0,0 +1,15 @@ +package coordinator + +import ( + "math/big" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAddPerc(t *testing.T) { + assert.Equal(t, "110", addPerc(big.NewInt(100), 10).String()) + assert.Equal(t, "101", addPerc(big.NewInt(100), 1).String()) + assert.Equal(t, "12", addPerc(big.NewInt(10), 20).String()) + assert.Equal(t, "1500", addPerc(big.NewInt(1000), 50).String()) +} diff --git a/db/historydb/historydb.go b/db/historydb/historydb.go index 8abd994..38dafa4 100644 --- a/db/historydb/historydb.go +++ b/db/historydb/historydb.go @@ -164,6 +164,19 @@ func (hdb *HistoryDB) addBatches(d meddler.DB, batches []common.Batch) error { return nil } +// GetBatch returns the batch with the given batchNum +func (hdb *HistoryDB) GetBatch(batchNum common.BatchNum) (*common.Batch, error) { + var batch common.Batch + err := meddler.QueryRow( + hdb.db, &batch, `SELECT batch.batch_num, batch.eth_block_num, batch.forger_addr, + batch.fees_collected, batch.fee_idxs_coordinator, batch.state_root, + batch.num_accounts, batch.last_idx, batch.exit_root, batch.forge_l1_txs_num, + batch.slot_num, batch.total_fees_usd FROM batch WHERE batch_num = $1;`, + batchNum, + ) + return &batch, err +} + // GetAllBatches retrieve all batches from the DB func (hdb *HistoryDB) GetAllBatches() ([]common.Batch, error) { var batches []*common.Batch @@ -208,6 +221,18 @@ func (hdb *HistoryDB) GetLastBatchNum() (common.BatchNum, error) { return batchNum, tracerr.Wrap(row.Scan(&batchNum)) } +// GetLastBatch returns the last forged batch +func (hdb *HistoryDB) GetLastBatch() (*common.Batch, error) { + var batch common.Batch + err := meddler.QueryRow( + hdb.db, &batch, `SELECT batch.batch_num, batch.eth_block_num, batch.forger_addr, + batch.fees_collected, batch.fee_idxs_coordinator, batch.state_root, + batch.num_accounts, batch.last_idx, batch.exit_root, batch.forge_l1_txs_num, + batch.slot_num, batch.total_fees_usd FROM batch ORDER BY batch_num DESC LIMIT 1;`, + ) + return &batch, err +} + // GetLastL1BatchBlockNum returns the blockNum of the latest forged l1Batch func (hdb *HistoryDB) GetLastL1BatchBlockNum() (int64, error) { row := hdb.db.QueryRow(`SELECT eth_block_num FROM batch diff --git a/db/historydb/historydb_test.go b/db/historydb/historydb_test.go index 80bcc0f..e7db118 100644 --- a/db/historydb/historydb_test.go +++ b/db/historydb/historydb_test.go @@ -203,6 +203,10 @@ func TestBatches(t *testing.T) { fetchedLastBatchNum, err := historyDB.GetLastBatchNum() assert.NoError(t, err) assert.Equal(t, batches[len(batches)-1].BatchNum, fetchedLastBatchNum) + // Test GetLastBatch + fetchedLastBatch, err := historyDB.GetLastBatch() + assert.NoError(t, err) + assert.Equal(t, &batches[len(batches)-1], fetchedLastBatch) // Test GetLastL1TxsNum fetchedLastL1TxsNum, err := historyDB.GetLastL1TxsNum() assert.NoError(t, err) @@ -211,6 +215,12 @@ func TestBatches(t *testing.T) { fetchedLastL1BatchBlockNum, err := historyDB.GetLastL1BatchBlockNum() assert.NoError(t, err) assert.Equal(t, lastL1BatchBlockNum, fetchedLastL1BatchBlockNum) + // Test GetBatch + fetchedBatch, err := historyDB.GetBatch(1) + require.NoError(t, err) + assert.Equal(t, &batches[0], fetchedBatch) + _, err = historyDB.GetBatch(common.BatchNum(len(batches) + 1)) + assert.Equal(t, sql.ErrNoRows, tracerr.Unwrap(err)) } func TestBids(t *testing.T) { diff --git a/db/kvdb/kvdb.go b/db/kvdb/kvdb.go index c2f5633..a367bf3 100644 --- a/db/kvdb/kvdb.go +++ b/db/kvdb/kvdb.go @@ -425,12 +425,13 @@ func (k *KVDB) MakeCheckpoint() error { } // if checkpoint BatchNum already exist in disk, delete it - if _, err := os.Stat(checkpointPath); !os.IsNotExist(err) { + if _, err := os.Stat(checkpointPath); os.IsNotExist(err) { + } else if err != nil { + return tracerr.Wrap(err) + } else { if err := os.RemoveAll(checkpointPath); err != nil { return tracerr.Wrap(err) } - } else if err != nil && !os.IsNotExist(err) { - return tracerr.Wrap(err) } // execute Checkpoint @@ -451,12 +452,25 @@ func (k *KVDB) MakeCheckpoint() error { return nil } +// CheckpointExists returns true if the checkpoint exists +func (k *KVDB) CheckpointExists(batchNum common.BatchNum) (bool, error) { + source := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) + if _, err := os.Stat(source); os.IsNotExist(err) { + return false, nil + } else if err != nil { + return false, err + } + return true, nil +} + // DeleteCheckpoint removes if exist the checkpoint of the given batchNum func (k *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error { checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum)) if _, err := os.Stat(checkpointPath); os.IsNotExist(err) { return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum)) + } else if err != nil { + return tracerr.Wrap(err) } return os.RemoveAll(checkpointPath) @@ -520,6 +534,8 @@ func (k *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) e if _, err := os.Stat(source); os.IsNotExist(err) { // if kvdb does not have checkpoint at batchNum, return err return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" does not exist", source)) + } else if err != nil { + return tracerr.Wrap(err) } // By locking we allow calling MakeCheckpointFromTo from multiple // places at the same time for the same stateDB. This allows the @@ -533,12 +549,13 @@ func (k *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) e func pebbleMakeCheckpoint(source, dest string) error { // Remove dest folder (if it exists) before doing the checkpoint - if _, err := os.Stat(dest); !os.IsNotExist(err) { + if _, err := os.Stat(dest); os.IsNotExist(err) { + } else if err != nil { + return tracerr.Wrap(err) + } else { if err := os.RemoveAll(dest); err != nil { return tracerr.Wrap(err) } - } else if err != nil && !os.IsNotExist(err) { - return tracerr.Wrap(err) } sto, err := pebble.NewPebbleStorage(source, false) diff --git a/db/statedb/statedb.go b/db/statedb/statedb.go index bcf44a5..12c8200 100644 --- a/db/statedb/statedb.go +++ b/db/statedb/statedb.go @@ -498,11 +498,17 @@ func NewLocalStateDB(cfg Config, synchronizerDB *StateDB) (*LocalStateDB, error) }, nil } +// CheckpointExists returns true if the checkpoint exists +func (l *LocalStateDB) CheckpointExists(batchNum common.BatchNum) (bool, error) { + return l.db.CheckpointExists(batchNum) +} + // Reset performs a reset in the LocaStateDB. If fromSynchronizer is true, it // gets the state from LocalStateDB.synchronizerStateDB for the given batchNum. // If fromSynchronizer is false, get the state from LocalStateDB checkpoints. func (l *LocalStateDB) Reset(batchNum common.BatchNum, fromSynchronizer bool) error { if fromSynchronizer { + log.Debugw("Making StateDB ResetFromSynchronizer", "batch", batchNum, "type", l.cfg.Type) if err := l.db.ResetFromSynchronizer(batchNum, l.synchronizerStateDB.db); err != nil { return tracerr.Wrap(err) } diff --git a/go.sum b/go.sum index e06a659..06a6019 100644 --- a/go.sum +++ b/go.sum @@ -24,6 +24,8 @@ github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/uf github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY= github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= +github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= +github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OneOfOne/xxhash v1.2.5/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0= @@ -84,6 +86,8 @@ github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QH github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/cp v0.1.0 h1:SE+dxFebS7Iik5LK0tsi1k9ZCxEaFX4AjQmoyA+1dJk= github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s= +github.com/cespare/cp v1.1.1 h1:nCb6ZLdB7NRaqsm91JtQTAme2SKJzXVsdPIPkyJr1MU= +github.com/cespare/cp v1.1.1/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.0.1-0.20190104013014-3767db7a7e18/go.mod h1:HD5P3vAIAh+Y2GAxg0PrPN1P8WkepXGpjbUPDHJqqKM= @@ -169,6 +173,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/fjl/memsize v0.0.0-20180418122429-ca190fb6ffbc h1:jtW8jbpkO4YirRSyepBOH8E+2HEw6/hKkBvFPwhUN8c= github.com/fjl/memsize v0.0.0-20180418122429-ca190fb6ffbc/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0= +github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 h1:FtmdgXiUlNeRsoNMFlKLDt+S+6hbjVMEW6RGQ7aUf7c= +github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0= github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= @@ -596,6 +602,8 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/sirupsen/logrus v1.5.0 h1:1N5EYkVAPEywqZRJd7cwnRtCb6xJx7NH3T3WUTF980Q= +github.com/sirupsen/logrus v1.5.0/go.mod h1:+F7Ogzej0PZc/94MaYx/nvG9jOFMD2osvC3s+Squfpo= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= @@ -614,6 +622,8 @@ github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DM github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4 h1:Gb2Tyox57NRNuZ2d3rmvB3pcmbu7O1RS3m8WRx7ilrg= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= +github.com/status-im/keycard-go v0.0.0-20190424133014-d95853db0f48 h1:ju5UTwk5Odtm4trrY+4Ca4RMj5OyXbmVeDAVad2T0Jw= +github.com/status-im/keycard-go v0.0.0-20190424133014-d95853db0f48/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 h1:gIlAHnH1vJb5vwEjIp5kBj/eu99p/bl0Ay2goiPe5xE= github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570/go.mod h1:8OR4w3TdeIHIh1g6EMY5p0gVNOovcWC+1vpc7naMuAw= github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 h1:njlZPzLwU639dk2kqnCPPv+wNjq7Xb6EfUxe/oX0/NM= diff --git a/node/node.go b/node/node.go index 44eded3..a09736b 100644 --- a/node/node.go +++ b/node/node.go @@ -2,6 +2,7 @@ package node import ( "context" + "errors" "fmt" "net/http" "sync" @@ -301,6 +302,9 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { SyncRetryInterval: cfg.Coordinator.SyncRetryInterval.Duration, EthClientAttempts: cfg.Coordinator.EthClient.Attempts, EthClientAttemptsDelay: cfg.Coordinator.EthClient.AttemptsDelay.Duration, + EthNoReuseNonce: cfg.Coordinator.EthClient.NoReuseNonce, + EthTxResendTimeout: cfg.Coordinator.EthClient.TxResendTimeout.Duration, + MaxGasPrice: cfg.Coordinator.EthClient.MaxGasPrice, TxManagerCheckInterval: cfg.Coordinator.EthClient.CheckLoopInterval.Duration, DebugBatchPath: cfg.Coordinator.Debug.BatchPath, Purger: coordinator.PurgerCfg{ @@ -487,11 +491,15 @@ func (n *Node) handleNewBlock(ctx context.Context, stats *synchronizer.Stats, va if stats.Synced() { if err := n.nodeAPI.api.UpdateNetworkInfo( stats.Eth.LastBlock, stats.Sync.LastBlock, - common.BatchNum(stats.Eth.LastBatch), + common.BatchNum(stats.Eth.LastBatchNum), stats.Sync.Auction.CurrentSlot.SlotNum, ); err != nil { log.Errorw("API.UpdateNetworkInfo", "err", err) } + } else { + n.nodeAPI.api.UpdateNetworkInfoBlock( + stats.Eth.LastBlock, stats.Sync.LastBlock, + ) } } } @@ -573,7 +581,11 @@ func (n *Node) StartSynchronizer() { if n.ctx.Err() != nil { continue } - log.Errorw("Synchronizer.Sync", "err", err) + if errors.Is(err, eth.ErrBlockHashMismatchEvent) { + log.Warnw("Synchronizer.Sync", "err", err) + } else { + log.Errorw("Synchronizer.Sync", "err", err) + } } } } diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 984b66f..361bdc8 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -25,12 +25,12 @@ type Stats struct { Updated time.Time FirstBlockNum int64 LastBlock common.Block - LastBatch int64 + LastBatchNum int64 } Sync struct { Updated time.Time LastBlock common.Block - LastBatch int64 + LastBatch common.Batch // LastL1BatchBlock is the last ethereum block in which an // l1Batch was forged LastL1BatchBlock int64 @@ -77,13 +77,13 @@ func (s *StatsHolder) UpdateCurrentNextSlot(current *common.Slot, next *common.S } // UpdateSync updates the synchronizer stats -func (s *StatsHolder) UpdateSync(lastBlock *common.Block, lastBatch *common.BatchNum, +func (s *StatsHolder) UpdateSync(lastBlock *common.Block, lastBatch *common.Batch, lastL1BatchBlock *int64, lastForgeL1TxsNum *int64) { now := time.Now() s.rw.Lock() s.Sync.LastBlock = *lastBlock if lastBatch != nil { - s.Sync.LastBatch = int64(*lastBatch) + s.Sync.LastBatch = *lastBatch } if lastL1BatchBlock != nil { s.Sync.LastL1BatchBlock = *lastL1BatchBlock @@ -105,16 +105,16 @@ func (s *StatsHolder) UpdateEth(ethClient eth.ClientInterface) error { lastBlock, err := ethClient.EthBlockByNumber(context.TODO(), -1) if err != nil { - return tracerr.Wrap(err) + return tracerr.Wrap(fmt.Errorf("EthBlockByNumber: %w", err)) } - lastBatch, err := ethClient.RollupLastForgedBatch() + lastBatchNum, err := ethClient.RollupLastForgedBatch() if err != nil { - return tracerr.Wrap(err) + return tracerr.Wrap(fmt.Errorf("RollupLastForgedBatch: %w", err)) } s.rw.Lock() s.Eth.Updated = now s.Eth.LastBlock = *lastBlock - s.Eth.LastBatch = lastBatch + s.Eth.LastBatchNum = lastBatchNum s.rw.Unlock() return nil } @@ -139,6 +139,10 @@ func (s *StatsHolder) CopyStats() *Stats { sCopy.Sync.Auction.NextSlot.DefaultSlotBid = common.CopyBigInt(s.Sync.Auction.NextSlot.DefaultSlotBid) } + if s.Sync.LastBatch.StateRoot != nil { + sCopy.Sync.LastBatch.StateRoot = + common.CopyBigInt(s.Sync.LastBatch.StateRoot) + } s.rw.RUnlock() return &sCopy } @@ -152,9 +156,9 @@ func (s *StatsHolder) blocksPerc() float64 { float64(s.Eth.LastBlock.Num-(s.Eth.FirstBlockNum-1)) } -func (s *StatsHolder) batchesPerc(batchNum int64) float64 { +func (s *StatsHolder) batchesPerc(batchNum common.BatchNum) float64 { return float64(batchNum) * 100.0 / - float64(s.Eth.LastBatch) + float64(s.Eth.LastBatchNum) } // StartBlockNums sets the first block used to start tracking the smart @@ -329,23 +333,25 @@ func (s *Synchronizer) setSlotCoordinator(slot *common.Slot) error { return nil } -// firstBatchBlockNum is the blockNum of first batch in that block, if any -func (s *Synchronizer) getCurrentSlot(reset bool, firstBatchBlockNum *int64) (*common.Slot, error) { - slot := common.Slot{ - SlotNum: s.stats.Sync.Auction.CurrentSlot.SlotNum, - ForgerCommitment: s.stats.Sync.Auction.CurrentSlot.ForgerCommitment, - } +// updateCurrentSlot updates the slot with information of the current slot. +// The information abouth which coordinator is allowed to forge is only updated +// when we are Synced. +// hasBatch is true when the last synced block contained at least one batch. +func (s *Synchronizer) updateCurrentSlot(slot *common.Slot, reset bool, hasBatch bool) error { // We want the next block because the current one is already mined blockNum := s.stats.Sync.LastBlock.Num + 1 slotNum := s.consts.Auction.SlotNum(blockNum) + firstBatchBlockNum := s.stats.Sync.LastBlock.Num if reset { + // Using this query only to know if there dbFirstBatchBlockNum, err := s.historyDB.GetFirstBatchBlockNumBySlot(slotNum) if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows { - return nil, tracerr.Wrap(fmt.Errorf("historyDB.GetFirstBatchBySlot: %w", err)) + return tracerr.Wrap(fmt.Errorf("historyDB.GetFirstBatchBySlot: %w", err)) } else if tracerr.Unwrap(err) == sql.ErrNoRows { - firstBatchBlockNum = nil + hasBatch = false } else { - firstBatchBlockNum = &dbFirstBatchBlockNum + hasBatch = true + firstBatchBlockNum = dbFirstBatchBlockNum } slot.ForgerCommitment = false } else if slotNum > slot.SlotNum { @@ -356,11 +362,11 @@ func (s *Synchronizer) getCurrentSlot(reset bool, firstBatchBlockNum *int64) (*c slot.StartBlock, slot.EndBlock = s.consts.Auction.SlotBlocks(slot.SlotNum) // If Synced, update the current coordinator if s.stats.Synced() && blockNum >= s.consts.Auction.GenesisBlockNum { - if err := s.setSlotCoordinator(&slot); err != nil { - return nil, tracerr.Wrap(err) + if err := s.setSlotCoordinator(slot); err != nil { + return tracerr.Wrap(err) } - if firstBatchBlockNum != nil && - s.consts.Auction.RelativeBlock(*firstBatchBlockNum) < + if hasBatch && + s.consts.Auction.RelativeBlock(firstBatchBlockNum) < int64(s.vars.Auction.SlotDeadline) { slot.ForgerCommitment = true } @@ -369,57 +375,61 @@ func (s *Synchronizer) getCurrentSlot(reset bool, firstBatchBlockNum *int64) (*c // BEGIN SANITY CHECK canForge, err := s.ethClient.AuctionCanForge(slot.Forger, blockNum) if err != nil { - return nil, tracerr.Wrap(err) + return tracerr.Wrap(fmt.Errorf("AuctionCanForge: %w", err)) } if !canForge { - return nil, tracerr.Wrap(fmt.Errorf("Synchronized value of forger address for closed slot "+ + return tracerr.Wrap(fmt.Errorf("Synchronized value of forger address for closed slot "+ "differs from smart contract: %+v", slot)) } // END SANITY CHECK } - return &slot, nil + return nil } -func (s *Synchronizer) getNextSlot() (*common.Slot, error) { +// updateNextSlot updates the slot with information of the next slot. +// The information abouth which coordinator is allowed to forge is only updated +// when we are Synced. +func (s *Synchronizer) updateNextSlot(slot *common.Slot) error { // We want the next block because the current one is already mined blockNum := s.stats.Sync.LastBlock.Num + 1 slotNum := s.consts.Auction.SlotNum(blockNum) + 1 - slot := common.Slot{ - SlotNum: slotNum, - ForgerCommitment: false, - } + slot.SlotNum = slotNum + slot.ForgerCommitment = false slot.StartBlock, slot.EndBlock = s.consts.Auction.SlotBlocks(slot.SlotNum) // If Synced, update the current coordinator if s.stats.Synced() && blockNum >= s.consts.Auction.GenesisBlockNum { - if err := s.setSlotCoordinator(&slot); err != nil { - return nil, tracerr.Wrap(err) + if err := s.setSlotCoordinator(slot); err != nil { + return tracerr.Wrap(err) } // TODO: Remove this SANITY CHECK once this code is tested enough // BEGIN SANITY CHECK canForge, err := s.ethClient.AuctionCanForge(slot.Forger, slot.StartBlock) if err != nil { - return nil, tracerr.Wrap(err) + return tracerr.Wrap(fmt.Errorf("AuctionCanForge: %w", err)) } if !canForge { - return nil, tracerr.Wrap(fmt.Errorf("Synchronized value of forger address for closed slot "+ + return tracerr.Wrap(fmt.Errorf("Synchronized value of forger address for closed slot "+ "differs from smart contract: %+v", slot)) } // END SANITY CHECK } - return &slot, nil + return nil } -func (s *Synchronizer) updateCurrentNextSlotIfSync(reset bool, firstBatchBlockNum *int64) error { - current, err := s.getCurrentSlot(reset, firstBatchBlockNum) - if err != nil { +// updateCurrentNextSlotIfSync updates the current and next slot. Information +// about forger address that is allowed to forge is only updated if we are +// Synced. +func (s *Synchronizer) updateCurrentNextSlotIfSync(reset bool, hasBatch bool) error { + current := s.stats.Sync.Auction.CurrentSlot + next := s.stats.Sync.Auction.NextSlot + if err := s.updateCurrentSlot(¤t, reset, hasBatch); err != nil { return tracerr.Wrap(err) } - next, err := s.getNextSlot() - if err != nil { + if err := s.updateNextSlot(&next); err != nil { return tracerr.Wrap(err) } - s.stats.UpdateCurrentNextSlot(current, next) + s.stats.UpdateCurrentNextSlot(¤t, &next) return nil } @@ -458,9 +468,9 @@ func (s *Synchronizer) init() error { "ethLastBlock", s.stats.Eth.LastBlock, ) log.Infow("Sync init batch", - "syncLastBatch", s.stats.Sync.LastBatch, - "syncBatchesPerc", s.stats.batchesPerc(s.stats.Sync.LastBatch), - "ethLastBatch", s.stats.Eth.LastBatch, + "syncLastBatch", s.stats.Sync.LastBatch.BatchNum, + "syncBatchesPerc", s.stats.batchesPerc(s.stats.Sync.LastBatch.BatchNum), + "ethLastBatch", s.stats.Eth.LastBatchNum, ) return nil } @@ -521,7 +531,7 @@ func (s *Synchronizer) Sync2(ctx context.Context, if tracerr.Unwrap(err) == ethereum.NotFound { return nil, nil, nil } else if err != nil { - return nil, nil, tracerr.Wrap(err) + return nil, nil, tracerr.Wrap(fmt.Errorf("EthBlockByNumber: %w", err)) } log.Debugf("ethBlock: num: %v, parent: %v, hash: %v", ethBlock.Num, ethBlock.ParentHash.String(), ethBlock.Hash.String()) @@ -627,14 +637,14 @@ func (s *Synchronizer) Sync2(ctx context.Context, } } s.stats.UpdateSync(ethBlock, - &rollupData.Batches[batchesLen-1].Batch.BatchNum, + &rollupData.Batches[batchesLen-1].Batch, lastL1BatchBlock, lastForgeL1TxsNum) } - var firstBatchBlockNum *int64 + hasBatch := false if len(rollupData.Batches) > 0 { - firstBatchBlockNum = &rollupData.Batches[0].Batch.EthBlockNum + hasBatch = true } - if err := s.updateCurrentNextSlotIfSync(false, firstBatchBlockNum); err != nil { + if err := s.updateCurrentNextSlotIfSync(false, hasBatch); err != nil { return nil, nil, tracerr.Wrap(err) } @@ -646,8 +656,8 @@ func (s *Synchronizer) Sync2(ctx context.Context, for _, batchData := range rollupData.Batches { log.Debugw("Synced batch", "syncLastBatch", batchData.Batch.BatchNum, - "syncBatchesPerc", s.stats.batchesPerc(int64(batchData.Batch.BatchNum)), - "ethLastBatch", s.stats.Eth.LastBatch, + "syncBatchesPerc", s.stats.batchesPerc(batchData.Batch.BatchNum), + "ethLastBatch", s.stats.Eth.LastBatchNum, ) } @@ -700,15 +710,15 @@ func getInitialVariables(ethClient eth.ClientInterface, consts *SCConsts) (*SCVariables, *StartBlockNums, error) { rollupInit, rollupInitBlock, err := ethClient.RollupEventInit() if err != nil { - return nil, nil, tracerr.Wrap(err) + return nil, nil, tracerr.Wrap(fmt.Errorf("RollupEventInit: %w", err)) } auctionInit, auctionInitBlock, err := ethClient.AuctionEventInit() if err != nil { - return nil, nil, tracerr.Wrap(err) + return nil, nil, tracerr.Wrap(fmt.Errorf("AuctionEventInit: %w", err)) } wDelayerInit, wDelayerInitBlock, err := ethClient.WDelayerEventInit() if err != nil { - return nil, nil, tracerr.Wrap(err) + return nil, nil, tracerr.Wrap(fmt.Errorf("WDelayerEventInit: %w", err)) } rollupVars := rollupInit.RollupVariables() auctionVars := auctionInit.AuctionVariables(consts.Auction.InitialMinimalBidding) @@ -753,15 +763,15 @@ func (s *Synchronizer) resetState(block *common.Block) error { s.vars.WDelayer = *wDelayer } - batchNum, err := s.historyDB.GetLastBatchNum() + batch, err := s.historyDB.GetLastBatch() if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows { return tracerr.Wrap(fmt.Errorf("historyDB.GetLastBatchNum: %w", err)) } if tracerr.Unwrap(err) == sql.ErrNoRows { - batchNum = 0 + batch = &common.Batch{} } - err = s.stateDB.Reset(batchNum) + err = s.stateDB.Reset(batch.BatchNum) if err != nil { return tracerr.Wrap(fmt.Errorf("stateDB.Reset: %w", err)) } @@ -783,9 +793,9 @@ func (s *Synchronizer) resetState(block *common.Block) error { lastForgeL1TxsNum = &n } - s.stats.UpdateSync(block, &batchNum, &lastL1BatchBlockNum, lastForgeL1TxsNum) + s.stats.UpdateSync(block, batch, &lastL1BatchBlockNum, lastForgeL1TxsNum) - if err := s.updateCurrentNextSlotIfSync(true, nil); err != nil { + if err := s.updateCurrentNextSlotIfSync(true, false); err != nil { return tracerr.Wrap(err) } return nil @@ -802,7 +812,7 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e // the expected one. rollupEvents, err := s.ethClient.RollupEventsByBlock(blockNum, ðBlock.Hash) if err != nil { - return nil, tracerr.Wrap(err) + return nil, tracerr.Wrap(fmt.Errorf("RollupEventsByBlock: %w", err)) } // No events in this block if rollupEvents == nil { @@ -919,9 +929,15 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e return nil, tracerr.Wrap(err) } if s.stateDB.CurrentBatch() != batchNum { - return nil, tracerr.Wrap(fmt.Errorf("stateDB.BatchNum (%v) != evtForgeBatch.BatchNum = (%v)", + return nil, tracerr.Wrap(fmt.Errorf("stateDB.BatchNum (%v) != "+ + "evtForgeBatch.BatchNum = (%v)", s.stateDB.CurrentBatch(), batchNum)) } + if s.stateDB.MT.Root().BigInt().Cmp(forgeBatchArgs.NewStRoot) != 0 { + return nil, tracerr.Wrap(fmt.Errorf("stateDB.MTRoot (%v) != "+ + "forgeBatchArgs.NewStRoot (%v)", + s.stateDB.MT.Root().BigInt(), forgeBatchArgs.NewStRoot)) + } // Transform processed PoolL2 txs to L2 and store in BatchData l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a big uggly, find a better way @@ -1106,7 +1122,7 @@ func (s *Synchronizer) auctionSync(ethBlock *common.Block) (*common.AuctionData, // Get auction events in the block auctionEvents, err := s.ethClient.AuctionEventsByBlock(blockNum, ðBlock.Hash) if err != nil { - return nil, tracerr.Wrap(err) + return nil, tracerr.Wrap(fmt.Errorf("AuctionEventsByBlock: %w", err)) } // No events in this block if auctionEvents == nil { @@ -1203,7 +1219,7 @@ func (s *Synchronizer) wdelayerSync(ethBlock *common.Block) (*common.WDelayerDat // Get wDelayer events in the block wDelayerEvents, err := s.ethClient.WDelayerEventsByBlock(blockNum, ðBlock.Hash) if err != nil { - return nil, tracerr.Wrap(err) + return nil, tracerr.Wrap(fmt.Errorf("WDelayerEventsByBlock: %w", err)) } // No events in this block if wDelayerEvents == nil { diff --git a/synchronizer/synchronizer_test.go b/synchronizer/synchronizer_test.go index 342caab..a0aa818 100644 --- a/synchronizer/synchronizer_test.go +++ b/synchronizer/synchronizer_test.go @@ -17,7 +17,6 @@ import ( "github.com/hermeznetwork/hermez-node/db/historydb" "github.com/hermeznetwork/hermez-node/db/statedb" "github.com/hermeznetwork/hermez-node/eth" - "github.com/hermeznetwork/hermez-node/log" "github.com/hermeznetwork/hermez-node/test" "github.com/hermeznetwork/hermez-node/test/til" "github.com/jinzhu/copier" @@ -321,6 +320,14 @@ func newTestModules(t *testing.T) (*statedb.StateDB, *historydb.HistoryDB) { return stateDB, historyDB } +func newBigInt(s string) *big.Int { + v, ok := new(big.Int).SetString(s, 10) + if !ok { + panic(fmt.Errorf("Can't set big.Int from %s", s)) + } + return v +} + func TestSyncGeneral(t *testing.T) { // // Setup @@ -339,7 +346,6 @@ func TestSyncGeneral(t *testing.T) { s, err := NewSynchronizer(client, historyDB, stateDB, Config{ StatsRefreshPeriod: 0 * time.Second, }) - log.Error(err) require.NoError(t, err) ctx := context.Background() @@ -434,12 +440,22 @@ func TestSyncGeneral(t *testing.T) { require.Equal(t, 5, len(blocks[i].Rollup.L1UserTxs)) require.Equal(t, 2, len(blocks[i].Rollup.Batches)) require.Equal(t, 2, len(blocks[i].Rollup.Batches[0].L1CoordinatorTxs)) + // Set StateRoots for batches manually (til doesn't set it) + blocks[i].Rollup.Batches[0].Batch.StateRoot = + newBigInt("18906357591508007884273218035694076596537737437965299189312069102730480717391") + blocks[i].Rollup.Batches[1].Batch.StateRoot = + newBigInt("9513185123401321669660637227182204000277156839501731093239187625486561933297") // blocks 1 (blockNum=3) i = 1 require.Equal(t, 3, int(blocks[i].Block.Num)) require.Equal(t, 5, len(blocks[i].Rollup.L1UserTxs)) require.Equal(t, 2, len(blocks[i].Rollup.Batches)) require.Equal(t, 3, len(blocks[i].Rollup.Batches[0].L2Txs)) + // Set StateRoots for batches manually (til doesn't set it) + blocks[i].Rollup.Batches[0].Batch.StateRoot = + newBigInt("13060270878200012606074130020925677466793317216609491464427188889005039616594") + blocks[i].Rollup.Batches[1].Batch.StateRoot = + newBigInt("21427104994652624302859637783375978708867165042357535792408500519060088086054") // Generate extra required data ethAddTokens(blocks, client) @@ -614,6 +630,12 @@ func TestSyncGeneral(t *testing.T) { blocks, err = tc.GenerateBlocks(set2) require.NoError(t, err) + // Set StateRoots for batches manually (til doesn't set it) + blocks[0].Rollup.Batches[0].Batch.StateRoot = + newBigInt("11218510534825843475100588932060366395781087435899915642332104464234485046683") + blocks[0].Rollup.Batches[1].Batch.StateRoot = + newBigInt("20283020730369146334077598087403837297563965802277806438205710455191646998983") + for i := 0; i < 4; i++ { client.CtlRollback() } diff --git a/txselector/txselector.go b/txselector/txselector.go index 4bf5f41..0ff79e0 100644 --- a/txselector/txselector.go +++ b/txselector/txselector.go @@ -88,12 +88,8 @@ func (txsel *TxSelector) LocalAccountsDB() *statedb.LocalStateDB { // Reset tells the TxSelector to get it's internal AccountsDB // from the required `batchNum` -func (txsel *TxSelector) Reset(batchNum common.BatchNum) error { - err := txsel.localAccountsDB.Reset(batchNum, true) - if err != nil { - return tracerr.Wrap(err) - } - return nil +func (txsel *TxSelector) Reset(batchNum common.BatchNum, fromSynchronizer bool) error { + return tracerr.Wrap(txsel.localAccountsDB.Reset(batchNum, fromSynchronizer)) } func (txsel *TxSelector) getCoordIdx(tokenID common.TokenID) (common.Idx, error) { From f2e5800ebd17a58c1f27a63660353c9f46ded72f Mon Sep 17 00:00:00 2001 From: Eduard S Date: Fri, 12 Feb 2021 17:49:26 +0100 Subject: [PATCH 2/3] Delay forging of batches via config parameters coordinator: - Add config `ForgeDelay`: ForgeDelay is the delay after which a batch is forged if the slot is already commited. If set to 0s, the coordinator will continously forge at the maximum rate. - Add config `ForgeNoTxsDelay`: ForgeNoTxsDelay is the delay after which a batch is forged even if there are no txs to forge if the slot is already commited. If set to 0s, the coordinator will continously forge even if the batches are empty. - Add config `GasPriceIncPerc`: GasPriceIncPerc is the percentage increase of gas price set in an ethereum transaction from the suggested gas price by the ehtereum node - Remove unused configuration parameters `CallGasLimit` and `GasPriceDiv` - Forge always regardless of configured forge delay when the current slot is not yet commited and we are the winner of the slot synchronizer: - Don't log with error (use warning) level when there's a reorg and the queried events by block using the block hash returns "unknown block". --- config/config.go | 19 +++++++--- coordinator/coordinator.go | 35 +++++++++++++++--- coordinator/pipeline.go | 71 +++++++++++++++++++++++++++++++----- coordinator/txmanager.go | 20 +++++++--- eth/rollup.go | 2 +- node/node.go | 9 ++++- synchronizer/synchronizer.go | 35 ++++++++++++++---- 7 files changed, 154 insertions(+), 37 deletions(-) diff --git a/config/config.go b/config/config.go index b9aa213..81f33ed 100644 --- a/config/config.go +++ b/config/config.go @@ -79,6 +79,15 @@ type Coordinator struct { // ForgeRetryInterval is the waiting interval between calls forge a // batch after an error ForgeRetryInterval Duration `validate:"required"` + // ForgeDelay is the delay after which a batch is forged if the slot is + // already committed. If set to 0s, the coordinator will continuously + // forge at the maximum rate. + ForgeDelay Duration `validate:"-"` + // ForgeNoTxsDelay is the delay after which a batch is forged even if + // there are no txs to forge if the slot is already committed. If set + // to 0s, the coordinator will continuously forge even if the batches + // are empty. + ForgeNoTxsDelay Duration `validate:"-"` // SyncRetryInterval is the waiting interval between calls to the main // handler of a synced block after an error SyncRetryInterval Duration `validate:"required"` @@ -133,15 +142,13 @@ type Coordinator struct { NLevels int64 `validate:"required"` } `validate:"required"` EthClient struct { - // CallGasLimit is the default gas limit set for ethereum - // calls, except for methods where a particular gas limit is - // harcoded because it's known to be a big value - CallGasLimit uint64 `validate:"required"` // MaxGasPrice is the maximum gas price allowed for ethereum // transactions MaxGasPrice *big.Int `validate:"required"` - // GasPriceDiv is the gas price division - GasPriceDiv uint64 `validate:"required"` + // GasPriceIncPerc is the percentage increase of gas price set + // in an ethereum transaction from the suggested gas price by + // the ehtereum node + GasPriceIncPerc int64 // CheckLoopInterval is the waiting interval between receipt // checks of ethereum transactions in the TxManager CheckLoopInterval Duration `validate:"required"` diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 0c2705d..ae232d6 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -23,7 +23,9 @@ import ( ) var ( - errLastL1BatchNotSynced = fmt.Errorf("last L1Batch not synced yet") + errLastL1BatchNotSynced = fmt.Errorf("last L1Batch not synced yet") + errForgeNoTxsBeforeDelay = fmt.Errorf("no txs to forge and we haven't reached the forge no txs delay") + errForgeBeforeDelay = fmt.Errorf("we haven't reached the forge delay") ) const ( @@ -71,6 +73,15 @@ type Config struct { // ForgeRetryInterval is the waiting interval between calls forge a // batch after an error ForgeRetryInterval time.Duration + // ForgeDelay is the delay after which a batch is forged if the slot is + // already committed. If set to 0s, the coordinator will continuously + // forge at the maximum rate. + ForgeDelay time.Duration + // ForgeNoTxsDelay is the delay after which a batch is forged even if + // there are no txs to forge if the slot is already committed. If set + // to 0s, the coordinator will continuously forge even if the batches + // are empty. + ForgeNoTxsDelay time.Duration // SyncRetryInterval is the waiting interval between calls to the main // handler of a synced block after an error SyncRetryInterval time.Duration @@ -87,6 +98,10 @@ type Config struct { // MaxGasPrice is the maximum gas price allowed for ethereum // transactions MaxGasPrice *big.Int + // GasPriceIncPerc is the percentage increase of gas price set in an + // ethereum transaction from the suggested gas price by the ehtereum + // node + GasPriceIncPerc int64 // TxManagerCheckInterval is the waiting interval between receipt // checks of ethereum transactions in the TxManager TxManagerCheckInterval time.Duration @@ -344,15 +359,22 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) } else if canForge { log.Infow("Coordinator: forging state begin", "block", stats.Eth.LastBlock.Num+1, "batch", stats.Sync.LastBatch.BatchNum) - batchNum := stats.Sync.LastBatch.BatchNum - if c.lastNonFailedBatchNum > batchNum { - batchNum = c.lastNonFailedBatchNum + fromBatch := fromBatch{ + BatchNum: stats.Sync.LastBatch.BatchNum, + ForgerAddr: stats.Sync.LastBatch.ForgerAddr, + StateRoot: stats.Sync.LastBatch.StateRoot, + } + if c.lastNonFailedBatchNum > fromBatch.BatchNum { + fromBatch.BatchNum = c.lastNonFailedBatchNum + fromBatch.ForgerAddr = c.cfg.ForgerAddress + fromBatch.StateRoot = big.NewInt(0) } var err error if c.pipeline, err = c.newPipeline(ctx); err != nil { return tracerr.Wrap(err) } - if err := c.pipeline.Start(batchNum, stats, &c.vars); err != nil { + c.pipelineFromBatch = fromBatch + if err := c.pipeline.Start(fromBatch.BatchNum, stats, &c.vars); err != nil { c.pipeline = nil return tracerr.Wrap(err) } @@ -398,7 +420,8 @@ func (c *Coordinator) handleReorg(ctx context.Context, msg *MsgSyncReorg) error c.pipeline.SetSyncStatsVars(ctx, &msg.Stats, &msg.Vars) } if c.stats.Sync.LastBatch.ForgerAddr != c.cfg.ForgerAddress && - c.stats.Sync.LastBatch.StateRoot.Cmp(c.pipelineFromBatch.StateRoot) != 0 { + (c.stats.Sync.LastBatch.StateRoot == nil || c.pipelineFromBatch.StateRoot == nil || + c.stats.Sync.LastBatch.StateRoot.Cmp(c.pipelineFromBatch.StateRoot) != 0) { // There's been a reorg and the batch state root from which the // pipeline was started has changed (probably because it was in // a block that was discarded), and it was sent by a different diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go index 9939079..08ba352 100644 --- a/coordinator/pipeline.go +++ b/coordinator/pipeline.go @@ -29,6 +29,7 @@ type state struct { batchNum common.BatchNum lastScheduledL1BatchBlockNum int64 lastForgeL1TxsNum int64 + lastSlotForged int64 } // Pipeline manages the forging of batches with parallel server proofs @@ -42,6 +43,7 @@ type Pipeline struct { started bool rw sync.RWMutex errAtBatchNum common.BatchNum + lastForgeTime time.Time proversPool *ProversPool provers []prover.Client @@ -133,6 +135,7 @@ func (p *Pipeline) reset(batchNum common.BatchNum, batchNum: batchNum, lastForgeL1TxsNum: stats.Sync.LastForgeL1TxsNum, lastScheduledL1BatchBlockNum: 0, + lastSlotForged: -1, } p.stats = *stats p.vars = *vars @@ -204,6 +207,9 @@ func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNu log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err, "lastForgeL1TxsNum", p.state.lastForgeL1TxsNum, "syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum) + } else if tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay || + tracerr.Unwrap(err) == errForgeBeforeDelay { + // no log } else { log.Errorw("forgeBatch", "err", err) } @@ -269,7 +275,9 @@ func (p *Pipeline) Start(batchNum common.BatchNum, batchInfo, err := p.handleForgeBatch(p.ctx, batchNum) if p.ctx.Err() != nil { continue - } else if tracerr.Unwrap(err) == errLastL1BatchNotSynced { + } else if tracerr.Unwrap(err) == errLastL1BatchNotSynced || + tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay || + tracerr.Unwrap(err) == errForgeBeforeDelay { waitDuration = p.cfg.ForgeRetryInterval continue } else if err != nil { @@ -282,6 +290,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum, }) continue } + p.lastForgeTime = time.Now() p.state.batchNum = batchNum select { @@ -373,8 +382,9 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e return nil, tracerr.Wrap(err) } // Structure to accumulate data and metadata of the batch + now := time.Now() batchInfo = &BatchInfo{PipelineNum: p.num, BatchNum: batchNum} - batchInfo.Debug.StartTimestamp = time.Now() + batchInfo.Debug.StartTimestamp = now batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1 selectionCfg := &txselector.SelectionConfig{ @@ -388,10 +398,17 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e var auths [][]byte var coordIdxs []common.Idx - // TODO: If there are no txs and we are behind the timeout, skip - // forging a batch and return a particular error that can be handleded - // in the loop where handleForgeBatch is called to retry after an - // interval + // Check if the slot is not yet fulfilled + slotCommitted := false + if p.stats.Sync.Auction.CurrentSlot.ForgerCommitment || + p.stats.Sync.Auction.CurrentSlot.SlotNum == p.state.lastSlotForged { + slotCommitted = true + } + + // If we haven't reached the ForgeDelay, skip forging the batch + if slotCommitted && now.Sub(p.lastForgeTime) < p.cfg.ForgeDelay { + return nil, errForgeBeforeDelay + } // 1. Decide if we forge L2Tx or L1+L2Tx if p.shouldL1L2Batch(batchInfo) { @@ -409,9 +426,6 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e if err != nil { return nil, tracerr.Wrap(err) } - - p.state.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1 - p.state.lastForgeL1TxsNum++ } else { // 2b: only L2 txs coordIdxs, auths, l1CoordTxs, poolL2Txs, discardedL2Txs, err = @@ -422,6 +436,43 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e l1UserTxsExtra = nil } + // If there are no txs to forge, no l1UserTxs in the open queue to + // freeze, and we haven't reached the ForgeNoTxsDelay, skip forging the + // batch. + if slotCommitted && now.Sub(p.lastForgeTime) < p.cfg.ForgeNoTxsDelay { + noTxs := false + if len(l1UserTxsExtra) == 0 && len(l1CoordTxs) == 0 && len(poolL2Txs) == 0 { + if batchInfo.L1Batch { + // Query the L1UserTxs in the queue following + // the one we are trying to forge. + nextL1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs( + p.state.lastForgeL1TxsNum + 1) + if err != nil { + return nil, tracerr.Wrap(err) + } + // If there are future L1UserTxs, we forge a + // batch to advance the queues and forge the + // L1UserTxs in the future. Otherwise, skip. + if len(nextL1UserTxs) == 0 { + noTxs = true + } + } else { + noTxs = true + } + } + if noTxs { + if err := p.txSelector.Reset(batchInfo.BatchNum-1, false); err != nil { + return nil, tracerr.Wrap(err) + } + return nil, errForgeNoTxsBeforeDelay + } + } + + if batchInfo.L1Batch { + p.state.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1 + p.state.lastForgeL1TxsNum++ + } + // 3. Save metadata from TxSelector output for BatchNum batchInfo.L1UserTxsExtra = l1UserTxsExtra batchInfo.L1CoordTxs = l1CoordTxs @@ -466,6 +517,8 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e p.cfg.debugBatchStore(batchInfo) log.Infow("Pipeline: batch forged internally", "batch", batchInfo.BatchNum) + p.state.lastSlotForged = p.stats.Sync.Auction.CurrentSlot.SlotNum + return batchInfo, nil } diff --git a/coordinator/txmanager.go b/coordinator/txmanager.go index 082d0f0..3b2ef6a 100644 --- a/coordinator/txmanager.go +++ b/coordinator/txmanager.go @@ -128,11 +128,14 @@ func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) { if err != nil { return nil, tracerr.Wrap(err) } - inc := new(big.Int).Set(gasPrice) - // TODO: Replace this by a value of percentage - const gasPriceDiv = 100 - inc.Div(inc, new(big.Int).SetUint64(gasPriceDiv)) - gasPrice.Add(gasPrice, inc) + if t.cfg.GasPriceIncPerc != 0 { + inc := new(big.Int).Set(gasPrice) + inc.Mul(inc, new(big.Int).SetInt64(t.cfg.GasPriceIncPerc)) + // nolint reason: to calculate percentages we use 100 + inc.Div(inc, new(big.Int).SetUint64(100)) //nolint:gomnd + gasPrice.Add(gasPrice, inc) + } + // log.Debugw("TxManager: transaction metadata", "gasPrice", gasPrice) auth, err := bind.NewKeyStoreTransactorWithChainID(t.ethClient.EthKeyStore(), t.account, t.chainID) @@ -141,6 +144,13 @@ func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) { } auth.Value = big.NewInt(0) // in wei // TODO: Calculate GasLimit based on the contents of the ForgeBatchArgs + // This requires a function that estimates the gas usage of the + // forgeBatch call based on the contents of the ForgeBatch args: + // - length of l2txs + // - length of l1Usertxs + // - length of l1CoordTxs with authorization signature + // - length of l1CoordTxs without authoriation signature + // - etc. auth.GasLimit = 1000000 auth.GasPrice = gasPrice auth.Nonce = nil diff --git a/eth/rollup.go b/eth/rollup.go index 992c2ca..5a77bd2 100644 --- a/eth/rollup.go +++ b/eth/rollup.go @@ -316,7 +316,7 @@ func NewRollupClient(client *EthereumClient, address ethCommon.Address, tokenHEZ } consts, err := c.RollupConstants() if err != nil { - return nil, tracerr.Wrap(err) + return nil, tracerr.Wrap(fmt.Errorf("RollupConstants at %v: %w", address, err)) } c.consts = consts return c, nil diff --git a/node/node.go b/node/node.go index a09736b..4d28ec8 100644 --- a/node/node.go +++ b/node/node.go @@ -103,8 +103,8 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { var keyStore *ethKeystore.KeyStore if mode == ModeCoordinator { ethCfg = eth.EthereumConfig{ - CallGasLimit: cfg.Coordinator.EthClient.CallGasLimit, - GasPriceDiv: cfg.Coordinator.EthClient.GasPriceDiv, + CallGasLimit: 0, // cfg.Coordinator.EthClient.CallGasLimit, + GasPriceDiv: 0, // cfg.Coordinator.EthClient.GasPriceDiv, } scryptN := ethKeystore.StandardScryptN @@ -299,12 +299,15 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { ConfirmBlocks: cfg.Coordinator.ConfirmBlocks, L1BatchTimeoutPerc: cfg.Coordinator.L1BatchTimeoutPerc, ForgeRetryInterval: cfg.Coordinator.ForgeRetryInterval.Duration, + ForgeDelay: cfg.Coordinator.ForgeDelay.Duration, + ForgeNoTxsDelay: cfg.Coordinator.ForgeNoTxsDelay.Duration, SyncRetryInterval: cfg.Coordinator.SyncRetryInterval.Duration, EthClientAttempts: cfg.Coordinator.EthClient.Attempts, EthClientAttemptsDelay: cfg.Coordinator.EthClient.AttemptsDelay.Duration, EthNoReuseNonce: cfg.Coordinator.EthClient.NoReuseNonce, EthTxResendTimeout: cfg.Coordinator.EthClient.TxResendTimeout.Duration, MaxGasPrice: cfg.Coordinator.EthClient.MaxGasPrice, + GasPriceIncPerc: cfg.Coordinator.EthClient.GasPriceIncPerc, TxManagerCheckInterval: cfg.Coordinator.EthClient.CheckLoopInterval.Duration, DebugBatchPath: cfg.Coordinator.Debug.BatchPath, Purger: coordinator.PurgerCfg{ @@ -583,6 +586,8 @@ func (n *Node) StartSynchronizer() { } if errors.Is(err, eth.ErrBlockHashMismatchEvent) { log.Warnw("Synchronizer.Sync", "err", err) + } else if errors.Is(err, synchronizer.ErrUnknownBlock) { + log.Warnw("Synchronizer.Sync", "err", err) } else { log.Errorw("Synchronizer.Sync", "err", err) } diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 361bdc8..9606188 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -18,6 +18,19 @@ import ( "github.com/hermeznetwork/tracerr" ) +const ( + // errStrUnknownBlock is the string returned by geth when querying an + // unknown block + errStrUnknownBlock = "unknown block" +) + +var ( + // ErrUnknownBlock is the error returned by the Synchronizer when a + // block is queried by hash but the ethereum node doesn't find it due + // to it being discarded from a reorg. + ErrUnknownBlock = fmt.Errorf("unknown block") +) + // Stats of the syncrhonizer type Stats struct { Eth struct { @@ -648,11 +661,6 @@ func (s *Synchronizer) Sync2(ctx context.Context, return nil, nil, tracerr.Wrap(err) } - log.Debugw("Synced block", - "syncLastBlockNum", s.stats.Sync.LastBlock.Num, - "syncBlocksPerc", s.stats.blocksPerc(), - "ethLastBlockNum", s.stats.Eth.LastBlock.Num, - ) for _, batchData := range rollupData.Batches { log.Debugw("Synced batch", "syncLastBatch", batchData.Batch.BatchNum, @@ -660,6 +668,11 @@ func (s *Synchronizer) Sync2(ctx context.Context, "ethLastBatch", s.stats.Eth.LastBatchNum, ) } + log.Debugw("Synced block", + "syncLastBlockNum", s.stats.Sync.LastBlock.Num, + "syncBlocksPerc", s.stats.blocksPerc(), + "ethLastBlockNum", s.stats.Eth.LastBlock.Num, + ) return blockData, nil, nil } @@ -811,7 +824,9 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e // Get rollup events in the block, and make sure the block hash matches // the expected one. rollupEvents, err := s.ethClient.RollupEventsByBlock(blockNum, ðBlock.Hash) - if err != nil { + if err != nil && err.Error() == errStrUnknownBlock { + return nil, tracerr.Wrap(ErrUnknownBlock) + } else if err != nil { return nil, tracerr.Wrap(fmt.Errorf("RollupEventsByBlock: %w", err)) } // No events in this block @@ -1121,7 +1136,9 @@ func (s *Synchronizer) auctionSync(ethBlock *common.Block) (*common.AuctionData, // Get auction events in the block auctionEvents, err := s.ethClient.AuctionEventsByBlock(blockNum, ðBlock.Hash) - if err != nil { + if err != nil && err.Error() == errStrUnknownBlock { + return nil, tracerr.Wrap(ErrUnknownBlock) + } else if err != nil { return nil, tracerr.Wrap(fmt.Errorf("AuctionEventsByBlock: %w", err)) } // No events in this block @@ -1218,7 +1235,9 @@ func (s *Synchronizer) wdelayerSync(ethBlock *common.Block) (*common.WDelayerDat // Get wDelayer events in the block wDelayerEvents, err := s.ethClient.WDelayerEventsByBlock(blockNum, ðBlock.Hash) - if err != nil { + if err != nil && err.Error() == errStrUnknownBlock { + return nil, tracerr.Wrap(ErrUnknownBlock) + } else if err != nil { return nil, tracerr.Wrap(fmt.Errorf("WDelayerEventsByBlock: %w", err)) } // No events in this block From 7b01f6a2884ac03fe42eb1801f7e4dba35127205 Mon Sep 17 00:00:00 2001 From: Eduard S Date: Fri, 12 Feb 2021 17:49:47 +0100 Subject: [PATCH 3/3] Add discard cli to discard synced blocks The command is useful in case the synchronizer reaches an invalid state and you want to roll back a few blocks and try again (maybe with some fixes in the code). --- cli/node/README.md | 11 +++++++- cli/node/cfg.buidler.toml | 7 +++-- cli/node/main.go | 56 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 5 deletions(-) diff --git a/cli/node/README.md b/cli/node/README.md index 9f2a27e..073ac82 100644 --- a/cli/node/README.md +++ b/cli/node/README.md @@ -117,7 +117,16 @@ Generate a new BabyJubJub key pair: ./node --mode coord --cfg cfg.buidler.toml genbjj ``` -Wipe the entier SQL database (this will destroy all synchronized and pool data): +Wipe the entier SQL database (this will destroy all synchronized and pool +data): ``` ./node --mode coord --cfg cfg.buidler.toml wipesql ``` + +Discard all synchronized blocks and associated state up to a given block +number. This command is useful in case the synchronizer reaches an invalid +state and you want to roll back a few blocks and try again (maybe with some +fixes in the code). +``` +./node --mode coord --cfg cfg.buidler.toml discard --block 8061330 +``` diff --git a/cli/node/cfg.buidler.toml b/cli/node/cfg.buidler.toml index dd63f9d..f80efa8 100644 --- a/cli/node/cfg.buidler.toml +++ b/cli/node/cfg.buidler.toml @@ -53,6 +53,8 @@ SendBatchBlocksMarginCheck = 1 ProofServerPollInterval = "1s" ForgeRetryInterval = "500ms" SyncRetryInterval = "1s" +ForgeDelay = "10s" +ForgeNoTxsDelay = "0s" [Coordinator.FeeAccount] Address = "0x56232B1c5B10038125Bc7345664B4AFD745bcF8E" @@ -83,16 +85,13 @@ MaxTx = 512 NLevels = 32 [Coordinator.EthClient] -ReceiptTimeout = "60s" -ReceiptLoopInterval = "500ms" CheckLoopInterval = "500ms" Attempts = 4 AttemptsDelay = "500ms" TxResendTimeout = "2m" NoReuseNonce = false -CallGasLimit = 300000 -GasPriceDiv = 100 MaxGasPrice = "5000000000" +GasPriceIncPerc = 10 [Coordinator.EthClient.Keystore] Path = "/tmp/iden3-test/hermez/ethkeystore" diff --git a/cli/node/main.go b/cli/node/main.go index 3c21331..132b5ab 100644 --- a/cli/node/main.go +++ b/cli/node/main.go @@ -11,6 +11,8 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/hermeznetwork/hermez-node/config" dbUtils "github.com/hermeznetwork/hermez-node/db" + "github.com/hermeznetwork/hermez-node/db/historydb" + "github.com/hermeznetwork/hermez-node/db/l2db" "github.com/hermeznetwork/hermez-node/log" "github.com/hermeznetwork/hermez-node/node" "github.com/hermeznetwork/tracerr" @@ -23,6 +25,7 @@ const ( flagMode = "mode" flagSK = "privatekey" flagYes = "yes" + flagBlock = "block" modeSync = "sync" modeCoord = "coord" ) @@ -139,6 +142,47 @@ func cmdRun(c *cli.Context) error { return nil } +func cmdDiscard(c *cli.Context) error { + _cfg, err := parseCli(c) + if err != nil { + return tracerr.Wrap(fmt.Errorf("error parsing flags and config: %w", err)) + } + cfg := _cfg.node + blockNum := c.Int64(flagBlock) + log.Infof("Discarding all blocks up to block %v...", blockNum) + + db, err := dbUtils.InitSQLDB( + cfg.PostgreSQL.Port, + cfg.PostgreSQL.Host, + cfg.PostgreSQL.User, + cfg.PostgreSQL.Password, + cfg.PostgreSQL.Name, + ) + if err != nil { + return tracerr.Wrap(fmt.Errorf("dbUtils.InitSQLDB: %w", err)) + } + historyDB := historydb.NewHistoryDB(db, nil) + if err := historyDB.Reorg(blockNum); err != nil { + return tracerr.Wrap(fmt.Errorf("historyDB.Reorg: %w", err)) + } + batchNum, err := historyDB.GetLastBatchNum() + if err != nil { + return tracerr.Wrap(fmt.Errorf("historyDB.GetLastBatchNum: %w", err)) + } + l2DB := l2db.NewL2DB( + db, + cfg.Coordinator.L2DB.SafetyPeriod, + cfg.Coordinator.L2DB.MaxTxs, + cfg.Coordinator.L2DB.TTL.Duration, + nil, + ) + if err := l2DB.Reorg(batchNum); err != nil { + return tracerr.Wrap(fmt.Errorf("l2DB.Reorg: %w", err)) + } + + return nil +} + // Config is the configuration of the hermez node execution type Config struct { mode node.Mode @@ -239,6 +283,18 @@ func main() { Usage: "Run the hermez-node in the indicated mode", Action: cmdRun, }, + { + Name: "discard", + Aliases: []string{}, + Usage: "Discard blocks up to a specified block number", + Action: cmdDiscard, + Flags: []cli.Flag{ + &cli.Int64Flag{ + Name: flagBlock, + Usage: "last block number to keep", + Required: false, + }}, + }, } err := app.Run(os.Args)