From f2e5800ebd17a58c1f27a63660353c9f46ded72f Mon Sep 17 00:00:00 2001
From: Eduard S
Date: Fri, 12 Feb 2021 17:49:26 +0100
Subject: [PATCH] Delay forging of batches via config parameters

coordinator:
- Add config `ForgeDelay`: the delay after which a batch is forged if the
  slot is already committed. If set to 0s, the coordinator will continuously
  forge at the maximum rate.
- Add config `ForgeNoTxsDelay`: the delay after which a batch is forged even
  if there are no txs to forge, as long as the slot is already committed. If
  set to 0s, the coordinator will continuously forge even if the batches are
  empty.
- Add config `GasPriceIncPerc`: the percentage increase applied to the gas
  price of an ethereum transaction over the gas price suggested by the
  ethereum node (see the sketch after the diffstat).
- Remove unused configuration parameters `CallGasLimit` and `GasPriceDiv`.
- Always forge, regardless of the configured forge delay, when the current
  slot is not yet committed and we are the winner of the slot.

synchronizer:
- Log at warning level instead of error level when there's a reorg and
  querying events by block hash returns "unknown block".
---
 config/config.go             | 19 +++++++---
 coordinator/coordinator.go   | 35 +++++++++++++++---
 coordinator/pipeline.go      | 71 +++++++++++++++++++++++++++++++-----
 coordinator/txmanager.go     | 20 +++++++---
 eth/rollup.go                |  2 +-
 node/node.go                 |  9 ++++-
 synchronizer/synchronizer.go | 35 ++++++++++++++----
 7 files changed, 154 insertions(+), 37 deletions(-)
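For example, with `GasPriceIncPerc = 20` and a suggested gas price of 100
gwei, the transaction is sent at 120 gwei. A minimal standalone sketch of the
integer arithmetic used below in `TxManager.NewAuth` (the values are
illustrative, not defaults):

    package main

    import (
    	"fmt"
    	"math/big"
    )

    func main() {
    	// Suggested gas price from the node (illustrative value): 100 gwei.
    	gasPrice := big.NewInt(100_000_000_000)
    	gasPriceIncPerc := int64(20) // example config value, not a default

    	// Same big.Int arithmetic as TxManager.NewAuth:
    	// gasPrice += gasPrice * GasPriceIncPerc / 100
    	inc := new(big.Int).Set(gasPrice)
    	inc.Mul(inc, big.NewInt(gasPriceIncPerc))
    	inc.Div(inc, big.NewInt(100))
    	gasPrice.Add(gasPrice, inc)

    	fmt.Println(gasPrice) // 120000000000 (120 gwei)
    }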
diff --git a/config/config.go b/config/config.go
index b9aa213..81f33ed 100644
--- a/config/config.go
+++ b/config/config.go
@@ -79,6 +79,15 @@ type Coordinator struct {
 	// ForgeRetryInterval is the waiting interval between calls forge a
 	// batch after an error
 	ForgeRetryInterval Duration `validate:"required"`
+	// ForgeDelay is the delay after which a batch is forged if the slot is
+	// already committed. If set to 0s, the coordinator will continuously
+	// forge at the maximum rate.
+	ForgeDelay Duration `validate:"-"`
+	// ForgeNoTxsDelay is the delay after which a batch is forged even if
+	// there are no txs to forge if the slot is already committed. If set
+	// to 0s, the coordinator will continuously forge even if the batches
+	// are empty.
+	ForgeNoTxsDelay Duration `validate:"-"`
 	// SyncRetryInterval is the waiting interval between calls to the main
 	// handler of a synced block after an error
 	SyncRetryInterval Duration `validate:"required"`
@@ -133,15 +142,13 @@ type Coordinator struct {
 		NLevels int64 `validate:"required"`
 	} `validate:"required"`
 	EthClient struct {
-		// CallGasLimit is the default gas limit set for ethereum
-		// calls, except for methods where a particular gas limit is
-		// harcoded because it's known to be a big value
-		CallGasLimit uint64 `validate:"required"`
 		// MaxGasPrice is the maximum gas price allowed for ethereum
 		// transactions
 		MaxGasPrice *big.Int `validate:"required"`
-		// GasPriceDiv is the gas price division
-		GasPriceDiv uint64 `validate:"required"`
+		// GasPriceIncPerc is the percentage increase of gas price set
+		// in an ethereum transaction from the suggested gas price by
+		// the ethereum node
+		GasPriceIncPerc int64
 		// CheckLoopInterval is the waiting interval between receipt
 		// checks of ethereum transactions in the TxManager
 		CheckLoopInterval Duration `validate:"required"`
diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go
index 0c2705d..ae232d6 100644
--- a/coordinator/coordinator.go
+++ b/coordinator/coordinator.go
@@ -23,7 +23,9 @@ import (
 )

 var (
-	errLastL1BatchNotSynced = fmt.Errorf("last L1Batch not synced yet")
+	errLastL1BatchNotSynced  = fmt.Errorf("last L1Batch not synced yet")
+	errForgeNoTxsBeforeDelay = fmt.Errorf("no txs to forge and we haven't reached the forge no txs delay")
+	errForgeBeforeDelay      = fmt.Errorf("we haven't reached the forge delay")
 )

 const (
@@ -71,6 +73,15 @@ type Config struct {
 	// ForgeRetryInterval is the waiting interval between calls forge a
 	// batch after an error
 	ForgeRetryInterval time.Duration
+	// ForgeDelay is the delay after which a batch is forged if the slot is
+	// already committed. If set to 0s, the coordinator will continuously
+	// forge at the maximum rate.
+	ForgeDelay time.Duration
+	// ForgeNoTxsDelay is the delay after which a batch is forged even if
+	// there are no txs to forge if the slot is already committed. If set
+	// to 0s, the coordinator will continuously forge even if the batches
+	// are empty.
+	ForgeNoTxsDelay time.Duration
 	// SyncRetryInterval is the waiting interval between calls to the main
 	// handler of a synced block after an error
 	SyncRetryInterval time.Duration
@@ -87,6 +98,10 @@ type Config struct {
 	// MaxGasPrice is the maximum gas price allowed for ethereum
 	// transactions
 	MaxGasPrice *big.Int
+	// GasPriceIncPerc is the percentage increase of gas price set in an
+	// ethereum transaction from the suggested gas price by the ethereum
+	// node
+	GasPriceIncPerc int64
 	// TxManagerCheckInterval is the waiting interval between receipt
 	// checks of ethereum transactions in the TxManager
 	TxManagerCheckInterval time.Duration
@@ -344,15 +359,22 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
 	} else if canForge {
 		log.Infow("Coordinator: forging state begin", "block",
 			stats.Eth.LastBlock.Num+1, "batch", stats.Sync.LastBatch.BatchNum)
-		batchNum := stats.Sync.LastBatch.BatchNum
-		if c.lastNonFailedBatchNum > batchNum {
-			batchNum = c.lastNonFailedBatchNum
+		fromBatch := fromBatch{
+			BatchNum:   stats.Sync.LastBatch.BatchNum,
+			ForgerAddr: stats.Sync.LastBatch.ForgerAddr,
+			StateRoot:  stats.Sync.LastBatch.StateRoot,
+		}
+		if c.lastNonFailedBatchNum > fromBatch.BatchNum {
+			fromBatch.BatchNum = c.lastNonFailedBatchNum
+			fromBatch.ForgerAddr = c.cfg.ForgerAddress
+			fromBatch.StateRoot = big.NewInt(0)
 		}
 		var err error
 		if c.pipeline, err = c.newPipeline(ctx); err != nil {
 			return tracerr.Wrap(err)
 		}
-		if err := c.pipeline.Start(batchNum, stats, &c.vars); err != nil {
+		c.pipelineFromBatch = fromBatch
+		if err := c.pipeline.Start(fromBatch.BatchNum, stats, &c.vars); err != nil {
 			c.pipeline = nil
 			return tracerr.Wrap(err)
 		}
@@ -398,7 +420,8 @@ func (c *Coordinator) handleReorg(ctx context.Context, msg *MsgSyncReorg) error
 		c.pipeline.SetSyncStatsVars(ctx, &msg.Stats, &msg.Vars)
 	}
 	if c.stats.Sync.LastBatch.ForgerAddr != c.cfg.ForgerAddress &&
-		c.stats.Sync.LastBatch.StateRoot.Cmp(c.pipelineFromBatch.StateRoot) != 0 {
+		(c.stats.Sync.LastBatch.StateRoot == nil || c.pipelineFromBatch.StateRoot == nil ||
+			c.stats.Sync.LastBatch.StateRoot.Cmp(c.pipelineFromBatch.StateRoot) != 0) {
 		// There's been a reorg and the batch state root from which the
 		// pipeline was started has changed (probably because it was in
 		// a block that was discarded), and it was sent by a different
diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go
index 9939079..08ba352 100644
--- a/coordinator/pipeline.go
+++ b/coordinator/pipeline.go
@@ -29,6 +29,7 @@ type state struct {
 	batchNum                     common.BatchNum
 	lastScheduledL1BatchBlockNum int64
 	lastForgeL1TxsNum            int64
+	lastSlotForged               int64
 }

 // Pipeline manages the forging of batches with parallel server proofs
@@ -42,6 +43,7 @@ type Pipeline struct {
 	started       bool
 	rw            sync.RWMutex
 	errAtBatchNum common.BatchNum
+	lastForgeTime time.Time

 	proversPool *ProversPool
 	provers     []prover.Client
@@ -133,6 +135,7 @@ func (p *Pipeline) reset(batchNum common.BatchNum,
 		batchNum:                     batchNum,
 		lastForgeL1TxsNum:            stats.Sync.LastForgeL1TxsNum,
 		lastScheduledL1BatchBlockNum: 0,
+		lastSlotForged:               -1,
 	}
 	p.stats = *stats
 	p.vars = *vars
@@ -204,6 +207,9 @@ func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNu
 			log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err,
 				"lastForgeL1TxsNum", p.state.lastForgeL1TxsNum,
 				"syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum)
+		} else if tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay ||
+			tracerr.Unwrap(err) == errForgeBeforeDelay {
+			// no log
 		} else {
 			log.Errorw("forgeBatch", "err", err)
 		}
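Before the `Start` and `forgeBatch` hunks below, it may help to see the new
gating logic in isolation. The following is a simplified, self-contained
restatement of the checks this patch adds to `forgeBatch`; the standalone
function is illustrative and not part of the patch:

    package main

    import (
    	"errors"
    	"fmt"
    	"time"
    )

    var (
    	errForgeBeforeDelay      = errors.New("we haven't reached the forge delay")
    	errForgeNoTxsBeforeDelay = errors.New("no txs to forge and we haven't reached the forge no txs delay")
    )

    // shouldSkipForge restates the gating added to forgeBatch. If the slot is
    // not yet committed (we won it but haven't forged in it), forge
    // immediately. Otherwise wait ForgeDelay between batches, and
    // ForgeNoTxsDelay before forging an empty batch; 0 disables the
    // corresponding wait.
    func shouldSkipForge(slotCommitted, haveTxs bool,
    	sinceLastForge, forgeDelay, forgeNoTxsDelay time.Duration) error {
    	if !slotCommitted {
    		return nil
    	}
    	if sinceLastForge < forgeDelay {
    		return errForgeBeforeDelay
    	}
    	if !haveTxs && sinceLastForge < forgeNoTxsDelay {
    		return errForgeNoTxsBeforeDelay
    	}
    	return nil
    }

    func main() {
    	// ForgeDelay = 10m, ForgeNoTxsDelay = 1h, 20m since the last forge:
    	// an empty batch is still skipped, a batch with txs is forged.
    	fmt.Println(shouldSkipForge(true, false, 20*time.Minute, 10*time.Minute, time.Hour))
    	fmt.Println(shouldSkipForge(true, true, 20*time.Minute, 10*time.Minute, time.Hour))
    }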
@@ -269,7 +275,9 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
 			batchInfo, err := p.handleForgeBatch(p.ctx, batchNum)
 			if p.ctx.Err() != nil {
 				continue
-			} else if tracerr.Unwrap(err) == errLastL1BatchNotSynced {
+			} else if tracerr.Unwrap(err) == errLastL1BatchNotSynced ||
+				tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay ||
+				tracerr.Unwrap(err) == errForgeBeforeDelay {
 				waitDuration = p.cfg.ForgeRetryInterval
 				continue
 			} else if err != nil {
@@ -282,6 +290,7 @@
 				})
 				continue
 			}
+			p.lastForgeTime = time.Now()
 			p.state.batchNum = batchNum

 			select {
@@ -373,8 +382,9 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
 		return nil, tracerr.Wrap(err)
 	}
 	// Structure to accumulate data and metadata of the batch
+	now := time.Now()
 	batchInfo = &BatchInfo{PipelineNum: p.num, BatchNum: batchNum}
-	batchInfo.Debug.StartTimestamp = time.Now()
+	batchInfo.Debug.StartTimestamp = now
 	batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1

 	selectionCfg := &txselector.SelectionConfig{
@@ -388,10 +398,17 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
 	var auths [][]byte
 	var coordIdxs []common.Idx

-	// TODO: If there are no txs and we are behind the timeout, skip
-	// forging a batch and return a particular error that can be handleded
-	// in the loop where handleForgeBatch is called to retry after an
-	// interval
+	// Check if the slot is already committed: either a forger commitment
+	// exists, or we have already forged in this slot
+	slotCommitted := false
+	if p.stats.Sync.Auction.CurrentSlot.ForgerCommitment ||
+		p.stats.Sync.Auction.CurrentSlot.SlotNum == p.state.lastSlotForged {
+		slotCommitted = true
+	}
+
+	// If we haven't reached the ForgeDelay, skip forging the batch
+	if slotCommitted && now.Sub(p.lastForgeTime) < p.cfg.ForgeDelay {
+		return nil, errForgeBeforeDelay
+	}

 	// 1. Decide if we forge L2Tx or L1+L2Tx
 	if p.shouldL1L2Batch(batchInfo) {
@@ -409,9 +426,6 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
 		if err != nil {
 			return nil, tracerr.Wrap(err)
 		}
-
-		p.state.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
-		p.state.lastForgeL1TxsNum++
 	} else {
 		// 2b: only L2 txs
 		coordIdxs, auths, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
@@ -422,6 +436,43 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
 		l1UserTxsExtra = nil
 	}

+	// If there are no txs to forge, no l1UserTxs in the open queue to
+	// freeze, and we haven't reached the ForgeNoTxsDelay, skip forging the
+	// batch.
+	if slotCommitted && now.Sub(p.lastForgeTime) < p.cfg.ForgeNoTxsDelay {
+		noTxs := false
+		if len(l1UserTxsExtra) == 0 && len(l1CoordTxs) == 0 && len(poolL2Txs) == 0 {
+			if batchInfo.L1Batch {
+				// Query the L1UserTxs in the queue following
+				// the one we are trying to forge.
+				nextL1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(
+					p.state.lastForgeL1TxsNum + 1)
+				if err != nil {
+					return nil, tracerr.Wrap(err)
+				}
+				// If there are future L1UserTxs, we forge a
+				// batch to advance the queues and forge the
+				// L1UserTxs in the future. Otherwise, skip.
+				if len(nextL1UserTxs) == 0 {
+					noTxs = true
+				}
+			} else {
+				noTxs = true
+			}
+		}
+		if noTxs {
+			if err := p.txSelector.Reset(batchInfo.BatchNum-1, false); err != nil {
+				return nil, tracerr.Wrap(err)
+			}
+			return nil, errForgeNoTxsBeforeDelay
+		}
+	}
+
+	if batchInfo.L1Batch {
+		p.state.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
+		p.state.lastForgeL1TxsNum++
+	}
+
 	// 3. Save metadata from TxSelector output for BatchNum
 	batchInfo.L1UserTxsExtra = l1UserTxsExtra
 	batchInfo.L1CoordTxs = l1CoordTxs
@@ -466,6 +517,8 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
 	p.cfg.debugBatchStore(batchInfo)
 	log.Infow("Pipeline: batch forged internally", "batch", batchInfo.BatchNum)

+	p.state.lastSlotForged = p.stats.Sync.Auction.CurrentSlot.SlotNum
+
 	return batchInfo, nil
 }

diff --git a/coordinator/txmanager.go b/coordinator/txmanager.go
index 082d0f0..3b2ef6a 100644
--- a/coordinator/txmanager.go
+++ b/coordinator/txmanager.go
@@ -128,11 +128,14 @@ func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) {
 	if err != nil {
 		return nil, tracerr.Wrap(err)
 	}
-	inc := new(big.Int).Set(gasPrice)
-	// TODO: Replace this by a value of percentage
-	const gasPriceDiv = 100
-	inc.Div(inc, new(big.Int).SetUint64(gasPriceDiv))
-	gasPrice.Add(gasPrice, inc)
+	if t.cfg.GasPriceIncPerc != 0 {
+		inc := new(big.Int).Set(gasPrice)
+		inc.Mul(inc, new(big.Int).SetInt64(t.cfg.GasPriceIncPerc))
+		// nolint reason: to calculate percentages we use 100
+		inc.Div(inc, new(big.Int).SetUint64(100)) //nolint:gomnd
+		gasPrice.Add(gasPrice, inc)
+	}
+	// log.Debugw("TxManager: transaction metadata", "gasPrice", gasPrice)

 	auth, err := bind.NewKeyStoreTransactorWithChainID(t.ethClient.EthKeyStore(),
 		t.account, t.chainID)
@@ -141,6 +144,13 @@
 	}
 	auth.Value = big.NewInt(0) // in wei
 	// TODO: Calculate GasLimit based on the contents of the ForgeBatchArgs
+	// This requires a function that estimates the gas usage of the
+	// forgeBatch call based on the contents of the ForgeBatch args:
+	// - length of l2txs
+	// - length of l1UserTxs
+	// - length of l1CoordTxs with authorization signature
+	// - length of l1CoordTxs without authorization signature
+	// - etc.
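One possible shape for the estimator that this TODO asks for is sketched
below. All names and cost constants are assumptions made for illustration;
they are not measured values and the function does not exist in the codebase:

    // estimateForgeBatchGasLimit is a hypothetical estimator of the gas used
    // by a forgeBatch call. The constants are placeholders; a real
    // implementation would derive them from benchmarks of the rollup smart
    // contract.
    func estimateForgeBatchGasLimit(nL2Txs, nL1UserTxs, nCoordTxsAuth, nCoordTxsNoAuth int) uint64 {
    	const (
    		baseGas        = 250_000 // assumed fixed cost of the call
    		gasPerL2Tx     = 5_000   // assumed cost per l2tx
    		gasPerL1UserTx = 7_000   // assumed cost per l1UserTx
    		gasPerAuthTx   = 15_000  // assumed cost per l1CoordTx with auth signature
    		gasPerNoAuthTx = 9_000   // assumed cost per l1CoordTx without auth signature
    	)
    	return uint64(baseGas +
    		nL2Txs*gasPerL2Tx +
    		nL1UserTxs*gasPerL1UserTx +
    		nCoordTxsAuth*gasPerAuthTx +
    		nCoordTxsNoAuth*gasPerNoAuthTx)
    }

With such a function, the hardcoded `auth.GasLimit = 1000000` that follows
could be replaced by a call to the estimator.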
 	auth.GasLimit = 1000000
 	auth.GasPrice = gasPrice
 	auth.Nonce = nil
diff --git a/eth/rollup.go b/eth/rollup.go
index 992c2ca..5a77bd2 100644
--- a/eth/rollup.go
+++ b/eth/rollup.go
@@ -316,7 +316,7 @@ func NewRollupClient(client *EthereumClient, address ethCommon.Address, tokenHEZ
 	}
 	consts, err := c.RollupConstants()
 	if err != nil {
-		return nil, tracerr.Wrap(err)
+		return nil, tracerr.Wrap(fmt.Errorf("RollupConstants at %v: %w", address, err))
 	}
 	c.consts = consts
 	return c, nil
diff --git a/node/node.go b/node/node.go
index a09736b..4d28ec8 100644
--- a/node/node.go
+++ b/node/node.go
@@ -103,8 +103,8 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
 	var keyStore *ethKeystore.KeyStore
 	if mode == ModeCoordinator {
 		ethCfg = eth.EthereumConfig{
-			CallGasLimit: cfg.Coordinator.EthClient.CallGasLimit,
-			GasPriceDiv:  cfg.Coordinator.EthClient.GasPriceDiv,
+			CallGasLimit: 0, // cfg.Coordinator.EthClient.CallGasLimit,
+			GasPriceDiv:  0, // cfg.Coordinator.EthClient.GasPriceDiv,
 		}
 		scryptN := ethKeystore.StandardScryptN
@@ -299,12 +299,15 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
 			ConfirmBlocks:          cfg.Coordinator.ConfirmBlocks,
 			L1BatchTimeoutPerc:     cfg.Coordinator.L1BatchTimeoutPerc,
 			ForgeRetryInterval:     cfg.Coordinator.ForgeRetryInterval.Duration,
+			ForgeDelay:             cfg.Coordinator.ForgeDelay.Duration,
+			ForgeNoTxsDelay:        cfg.Coordinator.ForgeNoTxsDelay.Duration,
 			SyncRetryInterval:      cfg.Coordinator.SyncRetryInterval.Duration,
 			EthClientAttempts:      cfg.Coordinator.EthClient.Attempts,
 			EthClientAttemptsDelay: cfg.Coordinator.EthClient.AttemptsDelay.Duration,
 			EthNoReuseNonce:        cfg.Coordinator.EthClient.NoReuseNonce,
 			EthTxResendTimeout:     cfg.Coordinator.EthClient.TxResendTimeout.Duration,
 			MaxGasPrice:            cfg.Coordinator.EthClient.MaxGasPrice,
+			GasPriceIncPerc:        cfg.Coordinator.EthClient.GasPriceIncPerc,
 			TxManagerCheckInterval: cfg.Coordinator.EthClient.CheckLoopInterval.Duration,
 			DebugBatchPath:         cfg.Coordinator.Debug.BatchPath,
 			Purger: coordinator.PurgerCfg{
@@ -583,6 +586,8 @@ func (n *Node) StartSynchronizer() {
 				}
 				if errors.Is(err, eth.ErrBlockHashMismatchEvent) {
 					log.Warnw("Synchronizer.Sync", "err", err)
+				} else if errors.Is(err, synchronizer.ErrUnknownBlock) {
+					log.Warnw("Synchronizer.Sync", "err", err)
 				} else {
 					log.Errorw("Synchronizer.Sync", "err", err)
 				}
diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go
index 361bdc8..9606188 100644
--- a/synchronizer/synchronizer.go
+++ b/synchronizer/synchronizer.go
@@ -18,6 +18,19 @@ import (
 	"github.com/hermeznetwork/tracerr"
 )

+const (
+	// errStrUnknownBlock is the string returned by geth when querying an
+	// unknown block
+	errStrUnknownBlock = "unknown block"
+)
+
+var (
+	// ErrUnknownBlock is the error returned by the Synchronizer when a
+	// block is queried by hash but the ethereum node doesn't find it
+	// because it was discarded in a reorg.
+	ErrUnknownBlock = fmt.Errorf("unknown block")
+)
+
 // Stats of the syncrhonizer
 type Stats struct {
 	Eth struct {
@@ -648,11 +661,6 @@ func (s *Synchronizer) Sync2(ctx context.Context,
 		return nil, nil, tracerr.Wrap(err)
 	}

-	log.Debugw("Synced block",
-		"syncLastBlockNum", s.stats.Sync.LastBlock.Num,
-		"syncBlocksPerc", s.stats.blocksPerc(),
-		"ethLastBlockNum", s.stats.Eth.LastBlock.Num,
-	)
 	for _, batchData := range rollupData.Batches {
 		log.Debugw("Synced batch",
 			"syncLastBatch", batchData.Batch.BatchNum,
 			"syncBatchesPerc", s.stats.batchesPerc(batchData.Batch.BatchNum),
 			"ethLastBatch", s.stats.Eth.LastBatchNum,
 		)
 	}
+	log.Debugw("Synced block",
+		"syncLastBlockNum", s.stats.Sync.LastBlock.Num,
+		"syncBlocksPerc", s.stats.blocksPerc(),
+		"ethLastBlockNum", s.stats.Eth.LastBlock.Num,
+	)

 	return blockData, nil, nil
 }
@@ -811,7 +824,9 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e
 	// Get rollup events in the block, and make sure the block hash matches
 	// the expected one.
 	rollupEvents, err := s.ethClient.RollupEventsByBlock(blockNum, &ethBlock.Hash)
-	if err != nil {
+	if err != nil && err.Error() == errStrUnknownBlock {
+		return nil, tracerr.Wrap(ErrUnknownBlock)
+	} else if err != nil {
 		return nil, tracerr.Wrap(fmt.Errorf("RollupEventsByBlock: %w", err))
 	}
 	// No events in this block
@@ -1121,7 +1136,9 @@ func (s *Synchronizer) auctionSync(ethBlock *common.Block) (*common.AuctionData,
 	// Get auction events in the block
 	auctionEvents, err := s.ethClient.AuctionEventsByBlock(blockNum, &ethBlock.Hash)
-	if err != nil {
+	if err != nil && err.Error() == errStrUnknownBlock {
+		return nil, tracerr.Wrap(ErrUnknownBlock)
+	} else if err != nil {
 		return nil, tracerr.Wrap(fmt.Errorf("AuctionEventsByBlock: %w", err))
 	}
 	// No events in this block
@@ -1218,7 +1235,9 @@ func (s *Synchronizer) wdelayerSync(ethBlock *common.Block) (*common.WDelayerDat
 	// Get wDelayer events in the block
 	wDelayerEvents, err := s.ethClient.WDelayerEventsByBlock(blockNum, &ethBlock.Hash)
-	if err != nil {
+	if err != nil && err.Error() == errStrUnknownBlock {
+		return nil, tracerr.Wrap(ErrUnknownBlock)
+	} else if err != nil {
 		return nil, tracerr.Wrap(fmt.Errorf("WDelayerEventsByBlock: %w", err))
 	}
 	// No events in this block
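Finally, the translation from geth's string-only error to a typed sentinel,
and the `errors.Is` check in node.go, can be exercised in isolation. A
minimal sketch; `fetchEvents` is a hypothetical stand-in for
`RollupEventsByBlock` and its siblings:

    package main

    import (
    	"errors"
    	"fmt"
    )

    const errStrUnknownBlock = "unknown block"

    // ErrUnknownBlock mirrors the sentinel added to the synchronizer: geth
    // answers "unknown block" when a block queried by hash was dropped by a
    // reorg.
    var ErrUnknownBlock = fmt.Errorf("unknown block")

    // fetchEvents is a hypothetical stand-in for the *EventsByBlock calls.
    func fetchEvents(reorged bool) error {
    	if reorged {
    		return errors.New(errStrUnknownBlock) // geth-style opaque error
    	}
    	return nil
    }

    func sync(reorged bool) error {
    	// Translate the string match into a typed sentinel so callers can
    	// use errors.Is instead of comparing strings, as rollupSync does
    	// above.
    	if err := fetchEvents(reorged); err != nil && err.Error() == errStrUnknownBlock {
    		return fmt.Errorf("sync: %w", ErrUnknownBlock)
    	} else if err != nil {
    		return fmt.Errorf("fetchEvents: %w", err)
    	}
    	return nil
    }

    func main() {
    	if err := sync(true); errors.Is(err, ErrUnknownBlock) {
    		fmt.Println("warn:", err) // a reorg is in progress: warn, don't error
    	}
    }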