From d284baf8c4ebbc7e6ee55e8c0f36f54436b3e6fc Mon Sep 17 00:00:00 2001 From: Eduard S Date: Thu, 28 Jan 2021 11:25:06 +0100 Subject: [PATCH] Make TxManager more robust --- cli/node/cfg.buidler.toml | 1 + common/batch.go | 37 ++++ common/ethauction.go | 4 +- config/config.go | 6 +- coordinator/batch.go | 14 +- coordinator/coordinator.go | 88 ++++++--- coordinator/coordinator_test.go | 13 +- coordinator/pipeline.go | 7 +- coordinator/pipeline_test.go | 2 +- coordinator/txmanager.go | 312 ++++++++++++++++++++++++++------ coordinator/txmanager_test.go | 15 ++ db/historydb/historydb.go | 12 ++ db/historydb/historydb_test.go | 4 + go.mod | 1 + node/node.go | 4 +- synchronizer/synchronizer.go | 32 ++-- 16 files changed, 438 insertions(+), 114 deletions(-) create mode 100644 coordinator/txmanager_test.go diff --git a/cli/node/cfg.buidler.toml b/cli/node/cfg.buidler.toml index 7b0e3b2..ce4f6ec 100644 --- a/cli/node/cfg.buidler.toml +++ b/cli/node/cfg.buidler.toml @@ -89,6 +89,7 @@ AttemptsDelay = "500ms" TxResendTimeout = "2m" CallGasLimit = 300000 GasPriceDiv = 100 +MaxGasPrice = "0" [Coordinator.EthClient.Keystore] Path = "/tmp/iden3-test/hermez/ethkeystore" diff --git a/common/batch.go b/common/batch.go index a6e11ce..8c687ed 100644 --- a/common/batch.go +++ b/common/batch.go @@ -27,6 +27,23 @@ type Batch struct { TotalFeesUSD *float64 `meddler:"total_fees_usd"` } +func NewEmptyBatch() *Batch { + return &Batch{ + BatchNum: 0, + EthBlockNum: 0, + ForgerAddr: ethCommon.Address{}, + CollectedFees: make(map[TokenID]*big.Int), + FeeIdxsCoordinator: make([]Idx, 0), + StateRoot: big.NewInt(0), + NumAccounts: 0, + LastIdx: 0, + ExitRoot: big.NewInt(0), + ForgeL1TxsNum: nil, + SlotNum: 0, + TotalFeesUSD: nil, + } +} + // BatchNum identifies a batch type BatchNum int64 @@ -75,3 +92,23 @@ func NewBatchData() *BatchData { Batch: Batch{}, } } + +// BatchSync is a subset of Batch that contains fileds needed for the +// synchronizer and coordinator +// type BatchSync struct { +// 
BatchNum BatchNum `meddler:"batch_num"` +// EthBlockNum int64 `meddler:"eth_block_num"` // Ethereum block in which the batch is forged +// ForgerAddr ethCommon.Address `meddler:"forger_addr"` +// StateRoot *big.Int `meddler:"state_root,bigint"` +// SlotNum int64 `meddler:"slot_num"` // Slot in which the batch is forged +// } +// +// func NewBatchSync() *BatchSync { +// return &BatchSync{ +// BatchNum: 0, +// EthBlockNum: 0, +// ForgerAddr: ethCommon.Address, +// StateRoot: big.NewInt(0), +// SlotNum: 0, +// } +// } diff --git a/common/ethauction.go b/common/ethauction.go index 0292407..edcf9a2 100644 --- a/common/ethauction.go +++ b/common/ethauction.go @@ -32,8 +32,10 @@ type AuctionConstants struct { func (c *AuctionConstants) SlotNum(blockNum int64) int64 { if blockNum >= c.GenesisBlockNum { return (blockNum - c.GenesisBlockNum) / int64(c.BlocksPerSlot) + } else { + // This result will be negative + return (blockNum - c.GenesisBlockNum) / int64(c.BlocksPerSlot) } - return -1 } // SlotBlocks returns the first and the last block numbers included in that slot diff --git a/config/config.go b/config/config.go index 02cd135..48d1767 100644 --- a/config/config.go +++ b/config/config.go @@ -3,6 +3,7 @@ package config import ( "fmt" "io/ioutil" + "math/big" "time" "github.com/BurntSushi/toml" @@ -122,6 +123,9 @@ type Coordinator struct { // calls, except for methods where a particular gas limit is // harcoded because it's known to be a big value CallGasLimit uint64 `validate:"required"` + // MaxGasPrice is the maximum gas price allowed for ethereum + // transactions + MaxGasPrice *big.Int `validate:"required"` // GasPriceDiv is the gas price division GasPriceDiv uint64 `validate:"required"` // CheckLoopInterval is the waiting interval between receipt @@ -136,7 +140,7 @@ type Coordinator struct { // TxResendTimeout is the timeout after which a non-mined // ethereum transaction will be resent (reusing the nonce) with // a newly calculated gas price - TxResendTimeout 
time.Duration `validate:"required"` + TxResendTimeout Duration `validate:"required"` // Keystore is the ethereum keystore where private keys are kept Keystore struct { // Path to the keystore diff --git a/coordinator/batch.go b/coordinator/batch.go index df0edf4..165859c 100644 --- a/coordinator/batch.go +++ b/coordinator/batch.go @@ -47,6 +47,8 @@ type Debug struct { MineBlockNum int64 // SendBlockNum is the blockNum when the batch was sent to ethereum SendBlockNum int64 + // ResendNum is the number of times the tx has been resent + ResendNum int // LastScheduledL1BatchBlockNum is the blockNum when the last L1Batch // was scheduled LastScheduledL1BatchBlockNum int64 @@ -64,13 +66,17 @@ type Debug struct { // StartToSendDelay is the delay between starting a batch and sending // it to ethereum, in seconds StartToSendDelay float64 - // StartToMineDelay is the delay between starting a batch and having + // StartToMineDelay is the delay between starting a batch and having // it mined in seconds StartToMineDelay float64 + // SendToMineDelay is the delay between sending a batch tx and having + // it mined in seconds + SendToMineDelay float64 } // BatchInfo contans the Batch information type BatchInfo struct { + PipelineNum int BatchNum common.BatchNum ServerProof prover.Client ZKInputs *common.ZKInputs @@ -89,7 +95,11 @@ type BatchInfo struct { // SendTimestamp the time of batch sent to ethereum SendTimestamp time.Time Receipt *types.Receipt - Debug Debug + // Fail is true if: + // - The receipt status is failed + // - A previous parent batch is failed + Fail bool + Debug Debug } // DebugStore is a debug function to store the BatchInfo as a json text file in diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 7d0604f..2bec1a7 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -3,6 +3,7 @@ package coordinator import ( "context" "fmt" + "math/big" "os" "strings" "sync" @@ -81,6 +82,9 @@ type Config struct { // transaction 
will be resent (reusing the nonce) with a newly // calculated gas price EthTxResendTimeout time.Duration + // MaxGasPrice is the maximum gas price allowed for ethereum + // transactions + MaxGasPrice *big.Int // TxManagerCheckInterval is the waiting interval between receipt // checks of ethereum transactions in the TxManager TxManagerCheckInterval time.Duration @@ -103,15 +107,22 @@ func (c *Config) debugBatchStore(batchInfo *BatchInfo) { } } +type fromBatch struct { + BatchNum common.BatchNum + ForgerAddr ethCommon.Address + StateRoot *big.Int +} + // Coordinator implements the Coordinator type type Coordinator struct { // State - pipelineBatchNum common.BatchNum // batchNum from which we started the pipeline - provers []prover.Client - consts synchronizer.SCConsts - vars synchronizer.SCVariables - stats synchronizer.Stats - started bool + pipelineNum int // Pipeline sequential number. The first pipeline is 1 + pipelineFromBatch fromBatch // batch from which we started the pipeline + provers []prover.Client + consts synchronizer.SCConsts + vars synchronizer.SCVariables + stats synchronizer.Stats + started bool cfg Config @@ -168,10 +179,15 @@ func NewCoordinator(cfg Config, ctx, cancel := context.WithCancel(context.Background()) c := Coordinator{ - pipelineBatchNum: -1, - provers: serverProofs, - consts: *scConsts, - vars: *initSCVars, + pipelineNum: 0, + pipelineFromBatch: fromBatch{ + BatchNum: 0, + ForgerAddr: ethCommon.Address{}, + StateRoot: big.NewInt(0), + }, + provers: serverProofs, + consts: *scConsts, + vars: *initSCVars, cfg: cfg, @@ -212,7 +228,8 @@ func (c *Coordinator) BatchBuilder() *batchbuilder.BatchBuilder { } func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) { - return NewPipeline(ctx, c.cfg, c.historyDB, c.l2DB, c.txSelector, + c.pipelineNum++ + return NewPipeline(ctx, c.cfg, c.pipelineNum, c.historyDB, c.l2DB, c.txSelector, c.batchBuilder, c.purger, c.txManager, c.provers, &c.consts) } @@ -262,13 +279,18 @@ func (c 
*Coordinator) syncSCVars(vars synchronizer.SCVariablesPtr) { func canForge(auctionConstants *common.AuctionConstants, auctionVars *common.AuctionVariables, currentSlot *common.Slot, nextSlot *common.Slot, addr ethCommon.Address, blockNum int64) bool { + if blockNum < auctionConstants.GenesisBlockNum { + log.Infow("canForge: requested blockNum is < genesis", "blockNum", blockNum, + "genesis", auctionConstants.GenesisBlockNum) + return false + } var slot *common.Slot if currentSlot.StartBlock <= blockNum && blockNum <= currentSlot.EndBlock { slot = currentSlot } else if nextSlot.StartBlock <= blockNum && blockNum <= nextSlot.EndBlock { slot = nextSlot } else { - log.Warnw("Coordinator: requested blockNum for canForge is outside slot", + log.Warnw("canForge: requested blockNum is outside current and next slot", "blockNum", blockNum, "currentSlot", currentSlot, "nextSlot", nextSlot, ) @@ -277,13 +299,14 @@ func canForge(auctionConstants *common.AuctionConstants, auctionVars *common.Auc anyoneForge := false if !slot.ForgerCommitment && auctionConstants.RelativeBlock(blockNum) >= int64(auctionVars.SlotDeadline) { - log.Debugw("Coordinator: anyone can forge in the current slot (slotDeadline passed)", + log.Debugw("canForge: anyone can forge in the current slot (slotDeadline passed)", "block", blockNum) anyoneForge = true } if slot.Forger == addr || anyoneForge { return true } + log.Debugw("canForge: can't forge", "slot.Forger", slot.Forger) return false } @@ -314,8 +337,8 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) relativeBlock, c.cfg.StartSlotBlocksDelay) } else if canForge { log.Infow("Coordinator: forging state begin", "block", - stats.Eth.LastBlock.Num+1, "batch", stats.Sync.LastBatch) - batchNum := common.BatchNum(stats.Sync.LastBatch) + stats.Eth.LastBlock.Num+1, "batch", stats.Sync.LastBatch.BatchNum) + batchNum := stats.Sync.LastBatch.BatchNum var err error if c.pipeline, err = c.newPipeline(ctx); err != nil { return 
tracerr.Wrap(err) @@ -324,7 +347,7 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) c.pipeline = nil return tracerr.Wrap(err) } - c.pipelineBatchNum = batchNum + // c.pipelineBatchNum = batchNum } } else { if !canForge { @@ -341,17 +364,18 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) // return err // } // } - if c.purger.CanInvalidate(stats.Sync.LastBlock.Num, stats.Sync.LastBatch) { - if err := c.txSelector.Reset(common.BatchNum(stats.Sync.LastBatch)); err != nil { + if c.purger.CanInvalidate(stats.Sync.LastBlock.Num, int64(stats.Sync.LastBatch.BatchNum)) { + if err := c.txSelector.Reset(stats.Sync.LastBatch.BatchNum); err != nil { return tracerr.Wrap(err) } } _, err := c.purger.InvalidateMaybe(c.l2DB, c.txSelector.LocalAccountsDB(), - stats.Sync.LastBlock.Num, stats.Sync.LastBatch) + stats.Sync.LastBlock.Num, int64(stats.Sync.LastBatch.BatchNum)) if err != nil { return tracerr.Wrap(err) } - _, err = c.purger.PurgeMaybe(c.l2DB, stats.Sync.LastBlock.Num, stats.Sync.LastBatch) + _, err = c.purger.PurgeMaybe(c.l2DB, stats.Sync.LastBlock.Num, + int64(stats.Sync.LastBatch.BatchNum)) if err != nil { return tracerr.Wrap(err) } @@ -379,15 +403,19 @@ func (c *Coordinator) handleReorg(ctx context.Context, msg *MsgSyncReorg) error if c.pipeline != nil { c.pipeline.SetSyncStatsVars(ctx, &msg.Stats, &msg.Vars) } - if common.BatchNum(c.stats.Sync.LastBatch) < c.pipelineBatchNum { - // There's been a reorg and the batch from which the pipeline - // was started was in a block that was discarded. The batch - // may not be in the main chain, so we stop the pipeline as a - // precaution (it will be started again once the node is in - // sync). 
- log.Infow("Coordinator.handleReorg StopPipeline sync.LastBatch < c.pipelineBatchNum", - "sync.LastBatch", c.stats.Sync.LastBatch, - "c.pipelineBatchNum", c.pipelineBatchNum) + if c.stats.Sync.LastBatch.ForgerAddr != c.cfg.ForgerAddress && + c.stats.Sync.LastBatch.StateRoot.Cmp(c.pipelineFromBatch.StateRoot) != 0 { + // There's been a reorg and the batch state root from which the + // pipeline was started has changed (probably because it was in + // a block that was discarded), and it was sent by a different + // coordinator than us. That batch may never be in the main + // chain, so we stop the pipeline (it will be started again + // once the node is in sync). + log.Infow("Coordinator.handleReorg StopPipeline sync.LastBatch.ForgerAddr != cfg.ForgerAddr "+ + "& sync.LastBatch.StateRoot != pipelineFromBatch.StateRoot", + "sync.LastBatch.StateRoot", c.stats.Sync.LastBatch.StateRoot, + "pipelineFromBatch.StateRoot", c.pipelineFromBatch.StateRoot) + c.txManager.DiscardPipeline(ctx, c.pipelineNum) if err := c.handleStopPipeline(ctx, "reorg"); err != nil { return tracerr.Wrap(err) } @@ -396,7 +424,7 @@ func (c *Coordinator) handleReorg(ctx context.Context, msg *MsgSyncReorg) error } func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string) error { - if err := c.l2DB.Reorg(common.BatchNum(c.stats.Sync.LastBatch)); err != nil { + if err := c.l2DB.Reorg(c.stats.Sync.LastBatch.BatchNum); err != nil { return tracerr.Wrap(err) } if c.pipeline != nil { diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index ebd853d..06b002d 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -2,6 +2,7 @@ package coordinator import ( "context" + "errors" "fmt" "io/ioutil" "math/big" @@ -11,6 +12,7 @@ import ( "time" ethCommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/hermeznetwork/hermez-node/batchbuilder" "github.com/hermeznetwork/hermez-node/common" dbUtils 
"github.com/hermeznetwork/hermez-node/db" @@ -261,8 +263,8 @@ func TestCoordinatorFlow(t *testing.T) { var stats synchronizer.Stats stats.Eth.LastBlock = *ethClient.CtlLastBlock() stats.Sync.LastBlock = stats.Eth.LastBlock - stats.Eth.LastBatch = ethClient.CtlLastForgedBatch() - stats.Sync.LastBatch = stats.Eth.LastBatch + stats.Eth.LastBatchNum = ethClient.CtlLastForgedBatch() + stats.Sync.LastBatch.BatchNum = common.BatchNum(stats.Eth.LastBatchNum) canForge, err := ethClient.AuctionCanForge(forger, blockNum+1) require.NoError(t, err) var slot common.Slot @@ -279,7 +281,7 @@ func TestCoordinatorFlow(t *testing.T) { // Copy stateDB to synchronizer if there was a new batch source := fmt.Sprintf("%v/BatchNum%v", batchBuilderDBPath, stats.Sync.LastBatch) dest := fmt.Sprintf("%v/BatchNum%v", syncDBPath, stats.Sync.LastBatch) - if stats.Sync.LastBatch != 0 { + if stats.Sync.LastBatch.BatchNum != 0 { if _, err := os.Stat(dest); os.IsNotExist(err) { log.Infow("Making pebble checkpoint for sync", "source", source, "dest", dest) @@ -566,3 +568,8 @@ func TestCoordinatorStress(t *testing.T) { // TODO: Test forgeBatch // TODO: Test waitServerProof // TODO: Test handleReorg + +func TestFoo(t *testing.T) { + a := tracerr.Wrap(fmt.Errorf("AAA: %w", core.ErrNonceTooLow)) + fmt.Println(errors.Is(a, core.ErrNonceTooLow)) +} diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go index 7b20f4b..6e5e2ee 100644 --- a/coordinator/pipeline.go +++ b/coordinator/pipeline.go @@ -26,6 +26,7 @@ type statsVars struct { // Pipeline manages the forging of batches with parallel server proofs type Pipeline struct { + num int cfg Config consts synchronizer.SCConsts @@ -56,6 +57,7 @@ type Pipeline struct { // NewPipeline creates a new Pipeline func NewPipeline(ctx context.Context, cfg Config, + num int, // Pipeline sequential number historyDB *historydb.HistoryDB, l2DB *l2db.L2DB, txSelector *txselector.TxSelector, @@ -79,6 +81,7 @@ func NewPipeline(ctx context.Context, return nil, 
tracerr.Wrap(fmt.Errorf("no provers in the pool")) } return &Pipeline{ + num: num, cfg: cfg, historyDB: historyDB, l2DB: l2DB, @@ -276,8 +279,8 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e if err != nil { return nil, tracerr.Wrap(err) } - - batchInfo = &BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch + // Structure to accumulate data and metadata of the batch + batchInfo = &BatchInfo{PipelineNum: p.num, BatchNum: batchNum} batchInfo.Debug.StartTimestamp = time.Now() batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1 diff --git a/coordinator/pipeline_test.go b/coordinator/pipeline_test.go index 1bd7bcf..9791d17 100644 --- a/coordinator/pipeline_test.go +++ b/coordinator/pipeline_test.go @@ -172,7 +172,7 @@ func TestPipelineForgeBatchWithTxs(t *testing.T) { // users with positive balances tilCtx := preloadSync(t, ethClient, sync, modules.historyDB, modules.stateDB) syncStats := sync.Stats() - batchNum := common.BatchNum(syncStats.Sync.LastBatch) + batchNum := syncStats.Sync.LastBatch.BatchNum syncSCVars := sync.SCVars() pipeline, err := coord.newPipeline(ctx) diff --git a/coordinator/txmanager.go b/coordinator/txmanager.go index 26e5dac..5188153 100644 --- a/coordinator/txmanager.go +++ b/coordinator/txmanager.go @@ -2,6 +2,7 @@ package coordinator import ( "context" + "errors" "fmt" "math/big" "time" @@ -9,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/db/l2db" @@ -35,12 +37,20 @@ type TxManager struct { vars synchronizer.SCVariables statsVarsCh chan statsVars - queue []*BatchInfo + discardPipelineCh chan int // int refers to the pipelineNum + + minPipelineNum int + queue Queue // lastSuccessBatch stores the last BatchNum that 
who's forge call was confirmed lastSuccessBatch common.BatchNum - lastPendingBatch common.BatchNum - lastSuccessNonce uint64 - lastPendingNonce uint64 + // lastPendingBatch common.BatchNum + // accNonce is the account nonce in the last mined block (due to mined txs) + accNonce uint64 + // accNextNonce is the nonce that we should use to send the next tx. + // In some cases this will be a reused nonce of an already pending tx. + accNextNonce uint64 + // accPendingNonce is the pending nonce of the account due to pending txs + // accPendingNonce uint64 lastSentL1BatchBlockNum int64 } @@ -56,26 +66,27 @@ func NewTxManager(ctx context.Context, cfg *Config, ethClient eth.ClientInterfac if err != nil { return nil, tracerr.Wrap(err) } - lastSuccessNonce, err := ethClient.EthNonceAt(ctx, *address, nil) + accNonce, err := ethClient.EthNonceAt(ctx, *address, nil) if err != nil { return nil, err } - lastPendingNonce, err := ethClient.EthPendingNonceAt(ctx, *address) - if err != nil { - return nil, err - } - if lastSuccessNonce != lastPendingNonce { - return nil, tracerr.Wrap(fmt.Errorf("lastSuccessNonce (%v) != lastPendingNonce (%v)", - lastSuccessNonce, lastPendingNonce)) - } - log.Infow("TxManager started", "nonce", lastSuccessNonce) + // accPendingNonce, err := ethClient.EthPendingNonceAt(ctx, *address) + // if err != nil { + // return nil, err + // } + // if accNonce != accPendingNonce { + // return nil, tracerr.Wrap(fmt.Errorf("currentNonce (%v) != accPendingNonce (%v)", + // accNonce, accPendingNonce)) + // } + log.Infow("TxManager started", "nonce", accNonce) return &TxManager{ - cfg: *cfg, - ethClient: ethClient, - l2DB: l2DB, - coord: coord, - batchCh: make(chan *BatchInfo, queueLen), - statsVarsCh: make(chan statsVars, queueLen), + cfg: *cfg, + ethClient: ethClient, + l2DB: l2DB, + coord: coord, + batchCh: make(chan *BatchInfo, queueLen), + statsVarsCh: make(chan statsVars, queueLen), + discardPipelineCh: make(chan int, queueLen), account: accounts.Account{ Address: 
*address, }, @@ -84,8 +95,11 @@ func NewTxManager(ctx context.Context, cfg *Config, ethClient eth.ClientInterfac vars: *initSCVars, - lastSuccessNonce: lastSuccessNonce, - lastPendingNonce: lastPendingNonce, + minPipelineNum: 0, + queue: NewQueue(), + accNonce: accNonce, + accNextNonce: accNonce, + // accPendingNonce: accPendingNonce, }, nil } @@ -106,6 +120,15 @@ func (t *TxManager) SetSyncStatsVars(ctx context.Context, stats *synchronizer.St } } +// DiscardPipeline is a thread safe method to notify about a discarded pipeline +// due to a reorg +func (t *TxManager) DiscardPipeline(ctx context.Context, pipelineNum int) { + select { + case t.discardPipelineCh <- pipelineNum: + case <-ctx.Done(): + } +} + func (t *TxManager) syncSCVars(vars synchronizer.SCVariablesPtr) { updateSCVars(&t.vars, vars) } @@ -157,18 +180,52 @@ func (t *TxManager) shouldSendRollupForgeBatch(batchInfo *BatchInfo) error { return nil } -func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) error { +func addPerc(v *big.Int, p int64) *big.Int { + r := new(big.Int).Set(v) + r.Mul(r, big.NewInt(p)) + r.Div(r, big.NewInt(100)) + return r.Add(v, r) +} + +func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo, resend bool) error { var ethTx *types.Transaction var err error auth, err := t.NewAuth(ctx) if err != nil { return tracerr.Wrap(err) } - auth.Nonce = big.NewInt(int64(t.lastPendingNonce)) - t.lastPendingNonce++ + auth.Nonce = big.NewInt(int64(t.accNextNonce)) + if resend { + auth.Nonce = big.NewInt(int64(batchInfo.EthTx.Nonce())) + } for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ { + if auth.GasPrice.Cmp(t.cfg.MaxGasPrice) > 0 { + return tracerr.Wrap(fmt.Errorf("calculated gasPrice (%v) > maxGasPrice (%v)", + auth.GasPrice, t.cfg.MaxGasPrice)) + } + // RollupForgeBatch() calls ethclient.SendTransaction() ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs, auth) - if err != nil { + if errors.Is(err, 
core.ErrNonceTooLow) { + log.Warnw("TxManager ethClient.RollupForgeBatch incrementing nonce", + "err", err, "nonce", auth.Nonce, "batchNum", batchInfo.BatchNum) + auth.Nonce.Add(auth.Nonce, big.NewInt(1)) + attempt-- + } else if errors.Is(err, core.ErrNonceTooHigh) { + log.Warnw("TxManager ethClient.RollupForgeBatch decrementing nonce", + "err", err, "nonce", auth.Nonce, "batchNum", batchInfo.BatchNum) + auth.Nonce.Sub(auth.Nonce, big.NewInt(1)) + attempt-- + } else if errors.Is(err, core.ErrUnderpriced) { + log.Warnw("TxManager ethClient.RollupForgeBatch incrementing gasPrice", + "err", err, "gasPrice", auth.GasPrice, "batchNum", batchInfo.BatchNum) + auth.GasPrice = addPerc(auth.GasPrice, 10) + attempt-- + } else if errors.Is(err, core.ErrReplaceUnderpriced) { + log.Warnw("TxManager ethClient.RollupForgeBatch incrementing gasPrice", + "err", err, "gasPrice", auth.GasPrice, "batchNum", batchInfo.BatchNum) + auth.GasPrice = addPerc(auth.GasPrice, 10) + attempt-- + } else if err != nil { log.Errorw("TxManager ethClient.RollupForgeBatch", "attempt", attempt, "err", err, "block", t.stats.Eth.LastBlock.Num+1, "batchNum", batchInfo.BatchNum) @@ -184,11 +241,17 @@ func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchIn if err != nil { return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err)) } + if !resend { + t.accNextNonce = auth.Nonce.Uint64() + 1 + } batchInfo.EthTx = ethTx - log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex()) + log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash()) now := time.Now() batchInfo.SendTimestamp = now + if resend { + batchInfo.Debug.ResendNum++ + } batchInfo.Debug.Status = StatusSent batchInfo.Debug.SendBlockNum = t.stats.Eth.LastBlock.Num + 1 batchInfo.Debug.SendTimestamp = batchInfo.SendTimestamp @@ -196,9 +259,11 @@ func (t *TxManager) sendRollupForgeBatch(ctx context.Context, 
batchInfo *BatchIn batchInfo.Debug.StartTimestamp).Seconds() t.cfg.debugBatchStore(batchInfo) - t.lastPendingBatch = batchInfo.BatchNum - if batchInfo.L1Batch { - t.lastSentL1BatchBlockNum = t.stats.Eth.LastBlock.Num + 1 + // t.lastPendingBatch = batchInfo.BatchNum + if !resend { + if batchInfo.L1Batch { + t.lastSentL1BatchBlockNum = t.stats.Eth.LastBlock.Num + 1 + } } if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil { return tracerr.Wrap(err) @@ -242,14 +307,14 @@ func (t *TxManager) checkEthTransactionReceipt(ctx context.Context, batchInfo *B func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*int64, error) { receipt := batchInfo.Receipt if receipt != nil { - if batchInfo.EthTx.Nonce > t.lastSuccessNonce { - t.lastSuccessNonce = batchInfo.EthTx.Nonce + if batchInfo.EthTx.Nonce()+1 > t.accNonce { + t.accNonce = batchInfo.EthTx.Nonce() + 1 } if receipt.Status == types.ReceiptStatusFailed { batchInfo.Debug.Status = StatusFailed t.cfg.debugBatchStore(batchInfo) _, err := t.ethClient.EthCall(ctx, batchInfo.EthTx, receipt.BlockNumber) - log.Warnw("TxManager receipt status is failed", "tx", receipt.TxHash.Hex(), + log.Warnw("TxManager receipt status is failed", "tx", receipt.TxHash, "batch", batchInfo.BatchNum, "block", receipt.BlockNumber.Int64(), "err", err) if batchInfo.BatchNum <= t.lastSuccessBatch { @@ -262,9 +327,17 @@ func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*i batchInfo.Debug.MineBlockNum = receipt.BlockNumber.Int64() batchInfo.Debug.StartToMineBlocksDelay = batchInfo.Debug.MineBlockNum - batchInfo.Debug.StartBlockNum - now := time.Now() - batchInfo.Debug.StartToMineDelay = now.Sub( - batchInfo.Debug.StartTimestamp).Seconds() + if batchInfo.Debug.StartToMineDelay == 0 { + if block, err := t.ethClient.EthBlockByNumber(ctx, + receipt.BlockNumber.Int64()); err != nil { + log.Warnw("TxManager: ethClient.EthBlockByNumber", "err", err) + } else { + 
batchInfo.Debug.SendToMineDelay = block.Timestamp.Sub( + batchInfo.Debug.SendTimestamp).Seconds() + batchInfo.Debug.StartToMineDelay = block.Timestamp.Sub( + batchInfo.Debug.StartTimestamp).Seconds() + } + } t.cfg.debugBatchStore(batchInfo) if batchInfo.BatchNum > t.lastSuccessBatch { t.lastSuccessBatch = batchInfo.BatchNum @@ -279,9 +352,62 @@ func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*i // TODO: // - After sending a message: CancelPipeline, stop all consecutive pending Batches (transactions) +type Queue struct { + list []*BatchInfo + // nonceByBatchNum map[common.BatchNum]uint64 + next int +} + +func NewQueue() Queue { + return Queue{ + list: make([]*BatchInfo, 0), + // nonceByBatchNum: make(map[common.BatchNum]uint64), + next: 0, + } +} + +func (q *Queue) Len() int { + return len(q.list) +} + +func (q *Queue) At(position int) *BatchInfo { + if position >= len(q.list) { + return nil + } + return q.list[position] +} + +func (q *Queue) Next() (int, *BatchInfo) { + if len(q.list) == 0 { + return 0, nil + } + defer func() { q.next = (q.next + 1) % len(q.list) }() + return q.next, q.list[q.next] +} + +func (q *Queue) Remove(position int) { + // batchInfo := q.list[position] + // delete(q.nonceByBatchNum, batchInfo.BatchNum) + q.list = append(q.list[:position], q.list[position+1:]...) 
+ if len(q.list) == 0 { + q.next = 0 + } else { + q.next = position % len(q.list) + } +} + +func (q *Queue) Push(batchInfo *BatchInfo) { + q.list = append(q.list, batchInfo) + // q.nonceByBatchNum[batchInfo.BatchNum] = batchInfo.EthTx.Nonce() +} + +// func (q *Queue) NonceByBatchNum(batchNum common.BatchNum) (uint64, bool) { +// nonce, ok := q.nonceByBatchNum[batchNum] +// return nonce, ok +// } + // Run the TxManager func (t *TxManager) Run(ctx context.Context) { - next := 0 waitDuration := longWaitDuration var statsVars statsVars @@ -292,7 +418,7 @@ func (t *TxManager) Run(ctx context.Context) { t.stats = statsVars.Stats t.syncSCVars(statsVars.Vars) log.Infow("TxManager: received initial statsVars", - "block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatch) + "block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatchNum) for { select { @@ -302,7 +428,19 @@ func (t *TxManager) Run(ctx context.Context) { case statsVars := <-t.statsVarsCh: t.stats = statsVars.Stats t.syncSCVars(statsVars.Vars) + case pipelineNum := <-t.discardPipelineCh: + t.minPipelineNum = pipelineNum + 1 + if err := t.removeBadBatchInfos(ctx); ctx.Err() != nil { + continue + } else if err != nil { + log.Errorw("TxManager: removeBadBatchInfos", "err", err) + continue + } case batchInfo := <-t.batchCh: + if batchInfo.PipelineNum < t.minPipelineNum { + log.Warnw("TxManager: batchInfo received pipelineNum < minPipelineNum", + "num", batchInfo.PipelineNum, "minNum", t.minPipelineNum) + } if err := t.shouldSendRollupForgeBatch(batchInfo); err != nil { log.Warnw("TxManager: shouldSend", "err", err, "batch", batchInfo.BatchNum) @@ -310,7 +448,7 @@ func (t *TxManager) Run(ctx context.Context) { Reason: fmt.Sprintf("forgeBatch shouldSend: %v", err)}) continue } - if err := t.sendRollupForgeBatch(ctx, batchInfo); ctx.Err() != nil { + if err := t.sendRollupForgeBatch(ctx, batchInfo, false); ctx.Err() != nil { continue } else if err != nil { // If we reach here it's because our ethNode has 
@@ -324,16 +462,14 @@ func (t *TxManager) Run(ctx context.Context) { Reason: fmt.Sprintf("forgeBatch send: %v", err)}) continue } - t.queue = append(t.queue, batchInfo) + t.queue.Push(batchInfo) waitDuration = t.cfg.TxManagerCheckInterval case <-time.After(waitDuration): - if len(t.queue) == 0 { + queuePosition, batchInfo := t.queue.Next() + if batchInfo == nil { waitDuration = longWaitDuration continue } - current := next - next = (current + 1) % len(t.queue) - batchInfo := t.queue[current] if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil { continue } else if err != nil { //nolint:staticcheck @@ -352,35 +488,93 @@ func (t *TxManager) Run(ctx context.Context) { continue } else if err != nil { //nolint:staticcheck // Transaction was rejected - next = t.removeFromQueue(current) + if err := t.removeBadBatchInfos(ctx); ctx.Err() != nil { + continue + } else if err != nil { + log.Errorw("TxManager: removeBadBatchInfos", "err", err) + continue + } t.coord.SendMsg(ctx, MsgStopPipeline{ Reason: fmt.Sprintf("forgeBatch reject: %v", err)}) continue } now := time.Now() - if confirm == nil && batchInfo.SendTimestamp > t.cfg.EthTxResendTimeout { - log.Infow("TxManager: forgeBatch tx not been mined timeout", - "tx", batchInfo.EthTx.Hex(), "batch", batchInfo.BatchNum) - // TODO: Resend Tx with same nonce + if confirm == nil && now.Sub(batchInfo.SendTimestamp) > t.cfg.EthTxResendTimeout { + log.Infow("TxManager: forgeBatch tx not been mined timeout, resending", + "tx", batchInfo.EthTx.Hash(), "batch", batchInfo.BatchNum) + if err := t.sendRollupForgeBatch(ctx, batchInfo, true); ctx.Err() != nil { + continue + } else if err != nil { + // If we reach here it's because our ethNode has + // been unable to send the transaction to + // ethereum. 
This could be due to the ethNode + // failure, or an invalid transaction (that + // can't be mined) + log.Warnw("TxManager: forgeBatch resend failed", "err", err, + "batch", batchInfo.BatchNum) + t.coord.SendMsg(ctx, MsgStopPipeline{ + Reason: fmt.Sprintf("forgeBatch resend: %v", err)}) + continue + } + } if confirm != nil && *confirm >= t.cfg.ConfirmBlocks { log.Debugw("TxManager: forgeBatch tx confirmed", - "tx", batchInfo.EthTx.Hex(), "batch", batchInfo.BatchNum) - next = t.removeFromQueue(current) + "tx", batchInfo.EthTx.Hash(), "batch", batchInfo.BatchNum) + t.queue.Remove(queuePosition) } } } } -// Removes batchInfo at position from the queue, and returns the next position -func (t *TxManager) removeFromQueue(position int) (next int) { - t.queue = append(t.queue[:current], t.queue[current+1:]...) - if len(t.queue) == 0 { - next = 0 - } else { - next = current % len(t.queue) +func (t *TxManager) removeBadBatchInfos(ctx context.Context) error { + next := 0 + // batchNum := 0 + for { + batchInfo := t.queue.At(next) + if batchInfo == nil { + break + } + if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil { + return nil + } else if err != nil { + // Our ethNode is giving an error different + // than "not found" when getting the receipt + // for the transaction, so we can't figure out + // if it was not mined, mined and successful or + // mined and failed. This could be due to the + // ethNode failure. 
+ next++ + continue + } + confirm, err := t.handleReceipt(ctx, batchInfo) + if ctx.Err() != nil { + return nil + } else if err != nil { + // Transaction was rejected + if t.minPipelineNum <= batchInfo.PipelineNum { + t.minPipelineNum = batchInfo.PipelineNum + 1 + } + t.queue.Remove(next) + continue + } + // If tx is pending but is from a cancelled pipeline, remove it + // from the queue + if confirm == nil { + if batchInfo.PipelineNum < t.minPipelineNum { + // batchNum++ + t.queue.Remove(next) + continue + } + } + next++ + } + accNonce, err := t.ethClient.EthNonceAt(ctx, t.account.Address, nil) + if err != nil { + return err } - return next + t.accNextNonce = accNonce + return nil } func (t *TxManager) canForgeAt(blockNum int64) bool { diff --git a/coordinator/txmanager_test.go b/coordinator/txmanager_test.go new file mode 100644 index 0000000..4c624d6 --- /dev/null +++ b/coordinator/txmanager_test.go @@ -0,0 +1,15 @@ +package coordinator + +import ( + "math/big" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAddPerc(t *testing.T) { + assert.Equal(t, "110", addPerc(big.NewInt(100), 10).String()) + assert.Equal(t, "101", addPerc(big.NewInt(100), 1).String()) + assert.Equal(t, "12", addPerc(big.NewInt(10), 20).String()) + assert.Equal(t, "1500", addPerc(big.NewInt(1000), 50).String()) +} diff --git a/db/historydb/historydb.go b/db/historydb/historydb.go index 1a36df2..1484ad1 100644 --- a/db/historydb/historydb.go +++ b/db/historydb/historydb.go @@ -320,6 +320,18 @@ func (hdb *HistoryDB) GetLastBatchNum() (common.BatchNum, error) { return batchNum, tracerr.Wrap(row.Scan(&batchNum)) } +// GetLastBatch returns the last forged batch +func (hdb *HistoryDB) GetLastBatch() (*common.Batch, error) { + var batch common.Batch + err := meddler.QueryRow( + hdb.db, &batch, `SELECT batch.batch_num, batch.eth_block_num, batch.forger_addr, + batch.fees_collected, batch.fee_idxs_coordinator, batch.state_root, + batch.num_accounts, batch.last_idx, batch.exit_root, 
batch.forge_l1_txs_num, + batch.slot_num, batch.total_fees_usd FROM batch ORDER BY batch_num DESC LIMIT 1;`, + ) + return &batch, err +} + // GetLastL1BatchBlockNum returns the blockNum of the latest forged l1Batch func (hdb *HistoryDB) GetLastL1BatchBlockNum() (int64, error) { row := hdb.db.QueryRow(`SELECT eth_block_num FROM batch diff --git a/db/historydb/historydb_test.go b/db/historydb/historydb_test.go index bb6c46a..b2f6941 100644 --- a/db/historydb/historydb_test.go +++ b/db/historydb/historydb_test.go @@ -200,6 +200,10 @@ func TestBatches(t *testing.T) { fetchedLastBatchNum, err := historyDB.GetLastBatchNum() assert.NoError(t, err) assert.Equal(t, batches[len(batches)-1].BatchNum, fetchedLastBatchNum) + // Test GetLastBatch + fetchedLastBatch, err := historyDB.GetLastBatch() + assert.NoError(t, err) + assert.Equal(t, &batches[len(batches)-1], fetchedLastBatch) // Test GetLastL1TxsNum fetchedLastL1TxsNum, err := historyDB.GetLastL1TxsNum() assert.NoError(t, err) diff --git a/go.mod b/go.mod index 638d4cf..1bc1586 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/mitchellh/mapstructure v1.3.0 github.com/rubenv/sql-migrate v0.0.0-20200616145509-8d140a17f351 github.com/russross/meddler v1.0.0 + github.com/sirupsen/logrus v1.5.0 // indirect github.com/status-im/keycard-go v0.0.0-20190424133014-d95853db0f48 // indirect github.com/stretchr/testify v1.6.1 github.com/urfave/cli/v2 v2.2.0 diff --git a/node/node.go b/node/node.go index 6e7e669..924bed0 100644 --- a/node/node.go +++ b/node/node.go @@ -293,6 +293,8 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { SyncRetryInterval: cfg.Coordinator.SyncRetryInterval.Duration, EthClientAttempts: cfg.Coordinator.EthClient.Attempts, EthClientAttemptsDelay: cfg.Coordinator.EthClient.AttemptsDelay.Duration, + EthTxResendTimeout: cfg.Coordinator.EthClient.TxResendTimeout.Duration, + MaxGasPrice: cfg.Coordinator.EthClient.MaxGasPrice, TxManagerCheckInterval: 
cfg.Coordinator.EthClient.CheckLoopInterval.Duration, DebugBatchPath: cfg.Coordinator.Debug.BatchPath, Purger: coordinator.PurgerCfg{ @@ -479,7 +481,7 @@ func (n *Node) handleNewBlock(ctx context.Context, stats *synchronizer.Stats, va if stats.Synced() { if err := n.nodeAPI.api.UpdateNetworkInfo( stats.Eth.LastBlock, stats.Sync.LastBlock, - common.BatchNum(stats.Eth.LastBatch), + common.BatchNum(stats.Eth.LastBatchNum), stats.Sync.Auction.CurrentSlot.SlotNum, ); err != nil { log.Errorw("API.UpdateNetworkInfo", "err", err) diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 984b66f..7e37c98 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -25,12 +25,12 @@ type Stats struct { Updated time.Time FirstBlockNum int64 LastBlock common.Block - LastBatch int64 + LastBatchNum int64 } Sync struct { Updated time.Time LastBlock common.Block - LastBatch int64 + LastBatch common.Batch // LastL1BatchBlock is the last ethereum block in which an // l1Batch was forged LastL1BatchBlock int64 @@ -77,13 +77,13 @@ func (s *StatsHolder) UpdateCurrentNextSlot(current *common.Slot, next *common.S } // UpdateSync updates the synchronizer stats -func (s *StatsHolder) UpdateSync(lastBlock *common.Block, lastBatch *common.BatchNum, +func (s *StatsHolder) UpdateSync(lastBlock *common.Block, lastBatch *common.Batch, lastL1BatchBlock *int64, lastForgeL1TxsNum *int64) { now := time.Now() s.rw.Lock() s.Sync.LastBlock = *lastBlock if lastBatch != nil { - s.Sync.LastBatch = int64(*lastBatch) + s.Sync.LastBatch = *lastBatch } if lastL1BatchBlock != nil { s.Sync.LastL1BatchBlock = *lastL1BatchBlock @@ -107,14 +107,14 @@ func (s *StatsHolder) UpdateEth(ethClient eth.ClientInterface) error { if err != nil { return tracerr.Wrap(err) } - lastBatch, err := ethClient.RollupLastForgedBatch() + lastBatchNum, err := ethClient.RollupLastForgedBatch() if err != nil { return tracerr.Wrap(err) } s.rw.Lock() s.Eth.Updated = now s.Eth.LastBlock = *lastBlock - 
s.Eth.LastBatch = lastBatch + s.Eth.LastBatchNum = lastBatchNum s.rw.Unlock() return nil } @@ -139,6 +139,10 @@ func (s *StatsHolder) CopyStats() *Stats { sCopy.Sync.Auction.NextSlot.DefaultSlotBid = common.CopyBigInt(s.Sync.Auction.NextSlot.DefaultSlotBid) } + if s.Sync.LastBatch.StateRoot != nil { + sCopy.Sync.LastBatch.StateRoot = + common.CopyBigInt(s.Sync.LastBatch.StateRoot) + } s.rw.RUnlock() return &sCopy } @@ -152,9 +156,9 @@ func (s *StatsHolder) blocksPerc() float64 { float64(s.Eth.LastBlock.Num-(s.Eth.FirstBlockNum-1)) } -func (s *StatsHolder) batchesPerc(batchNum int64) float64 { +func (s *StatsHolder) batchesPerc(batchNum common.BatchNum) float64 { return float64(batchNum) * 100.0 / - float64(s.Eth.LastBatch) + float64(s.Eth.LastBatchNum) } // StartBlockNums sets the first block used to start tracking the smart @@ -458,9 +462,9 @@ func (s *Synchronizer) init() error { "ethLastBlock", s.stats.Eth.LastBlock, ) log.Infow("Sync init batch", - "syncLastBatch", s.stats.Sync.LastBatch, - "syncBatchesPerc", s.stats.batchesPerc(s.stats.Sync.LastBatch), - "ethLastBatch", s.stats.Eth.LastBatch, + "syncLastBatch", s.stats.Sync.LastBatch.BatchNum, + "syncBatchesPerc", s.stats.batchesPerc(s.stats.Sync.LastBatch.BatchNum), + "ethLastBatch", s.stats.Eth.LastBatchNum, ) return nil } @@ -627,7 +631,7 @@ func (s *Synchronizer) Sync2(ctx context.Context, } } s.stats.UpdateSync(ethBlock, - &rollupData.Batches[batchesLen-1].Batch.BatchNum, + &rollupData.Batches[batchesLen-1].Batch, lastL1BatchBlock, lastForgeL1TxsNum) } var firstBatchBlockNum *int64 @@ -646,8 +650,8 @@ func (s *Synchronizer) Sync2(ctx context.Context, for _, batchData := range rollupData.Batches { log.Debugw("Synced batch", "syncLastBatch", batchData.Batch.BatchNum, - "syncBatchesPerc", s.stats.batchesPerc(int64(batchData.Batch.BatchNum)), - "ethLastBatch", s.stats.Eth.LastBatch, + "syncBatchesPerc", s.stats.batchesPerc(batchData.Batch.BatchNum), + "ethLastBatch", s.stats.Eth.LastBatchNum, ) }