diff --git a/config/config.go b/config/config.go index b9aa213..81f33ed 100644 --- a/config/config.go +++ b/config/config.go @@ -79,6 +79,15 @@ type Coordinator struct { // ForgeRetryInterval is the waiting interval between calls forge a // batch after an error ForgeRetryInterval Duration `validate:"required"` + // ForgeDelay is the delay after which a batch is forged if the slot is + // already committed. If set to 0s, the coordinator will continuously + // forge at the maximum rate. + ForgeDelay Duration `validate:"-"` + // ForgeNoTxsDelay is the delay after which a batch is forged even if + // there are no txs to forge if the slot is already committed. If set + // to 0s, the coordinator will continuously forge even if the batches + // are empty. + ForgeNoTxsDelay Duration `validate:"-"` // SyncRetryInterval is the waiting interval between calls to the main // handler of a synced block after an error SyncRetryInterval Duration `validate:"required"` @@ -133,15 +142,13 @@ type Coordinator struct { NLevels int64 `validate:"required"` } `validate:"required"` EthClient struct { - // CallGasLimit is the default gas limit set for ethereum - // calls, except for methods where a particular gas limit is - // harcoded because it's known to be a big value - CallGasLimit uint64 `validate:"required"` // MaxGasPrice is the maximum gas price allowed for ethereum // transactions MaxGasPrice *big.Int `validate:"required"` - // GasPriceDiv is the gas price division - GasPriceDiv uint64 `validate:"required"` + // GasPriceIncPerc is the percentage increase of gas price set + // in an ethereum transaction from the suggested gas price by + // the ehtereum node + GasPriceIncPerc int64 // CheckLoopInterval is the waiting interval between receipt // checks of ethereum transactions in the TxManager CheckLoopInterval Duration `validate:"required"` diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 0c2705d..ae232d6 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -23,7 +23,9 @@ import ( ) var ( - errLastL1BatchNotSynced = fmt.Errorf("last L1Batch not synced yet") + errLastL1BatchNotSynced = fmt.Errorf("last L1Batch not synced yet") + errForgeNoTxsBeforeDelay = fmt.Errorf("no txs to forge and we haven't reached the forge no txs delay") + errForgeBeforeDelay = fmt.Errorf("we haven't reached the forge delay") ) const ( @@ -71,6 +73,15 @@ type Config struct { // ForgeRetryInterval is the waiting interval between calls forge a // batch after an error ForgeRetryInterval time.Duration + // ForgeDelay is the delay after which a batch is forged if the slot is + // already committed. If set to 0s, the coordinator will continuously + // forge at the maximum rate. + ForgeDelay time.Duration + // ForgeNoTxsDelay is the delay after which a batch is forged even if + // there are no txs to forge if the slot is already committed. If set + // to 0s, the coordinator will continuously forge even if the batches + // are empty. + ForgeNoTxsDelay time.Duration // SyncRetryInterval is the waiting interval between calls to the main // handler of a synced block after an error SyncRetryInterval time.Duration @@ -87,6 +98,10 @@ type Config struct { // MaxGasPrice is the maximum gas price allowed for ethereum // transactions MaxGasPrice *big.Int + // GasPriceIncPerc is the percentage increase of gas price set in an + // ethereum transaction from the suggested gas price by the ehtereum + // node + GasPriceIncPerc int64 // TxManagerCheckInterval is the waiting interval between receipt // checks of ethereum transactions in the TxManager TxManagerCheckInterval time.Duration @@ -344,15 +359,22 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) } else if canForge { log.Infow("Coordinator: forging state begin", "block", stats.Eth.LastBlock.Num+1, "batch", stats.Sync.LastBatch.BatchNum) - batchNum := stats.Sync.LastBatch.BatchNum - if c.lastNonFailedBatchNum > batchNum { - batchNum = c.lastNonFailedBatchNum + fromBatch := fromBatch{ + BatchNum: stats.Sync.LastBatch.BatchNum, + ForgerAddr: stats.Sync.LastBatch.ForgerAddr, + StateRoot: stats.Sync.LastBatch.StateRoot, + } + if c.lastNonFailedBatchNum > fromBatch.BatchNum { + fromBatch.BatchNum = c.lastNonFailedBatchNum + fromBatch.ForgerAddr = c.cfg.ForgerAddress + fromBatch.StateRoot = big.NewInt(0) } var err error if c.pipeline, err = c.newPipeline(ctx); err != nil { return tracerr.Wrap(err) } - if err := c.pipeline.Start(batchNum, stats, &c.vars); err != nil { + c.pipelineFromBatch = fromBatch + if err := c.pipeline.Start(fromBatch.BatchNum, stats, &c.vars); err != nil { c.pipeline = nil return tracerr.Wrap(err) } @@ -398,7 +420,8 @@ func (c *Coordinator) handleReorg(ctx context.Context, msg *MsgSyncReorg) error c.pipeline.SetSyncStatsVars(ctx, &msg.Stats, &msg.Vars) } if c.stats.Sync.LastBatch.ForgerAddr != c.cfg.ForgerAddress && - c.stats.Sync.LastBatch.StateRoot.Cmp(c.pipelineFromBatch.StateRoot) != 0 { + (c.stats.Sync.LastBatch.StateRoot == nil || c.pipelineFromBatch.StateRoot == nil || + c.stats.Sync.LastBatch.StateRoot.Cmp(c.pipelineFromBatch.StateRoot) != 0) { // There's been a reorg and the batch state root from which the // pipeline was started has changed (probably because it was in // a block that was discarded), and it was sent by a different diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go index 9939079..08ba352 100644 --- a/coordinator/pipeline.go +++ b/coordinator/pipeline.go @@ -29,6 +29,7 @@ type state struct { batchNum common.BatchNum lastScheduledL1BatchBlockNum int64 lastForgeL1TxsNum int64 + lastSlotForged int64 } // Pipeline manages the forging of batches with parallel server proofs @@ -42,6 +43,7 @@ type Pipeline struct { started bool rw sync.RWMutex errAtBatchNum common.BatchNum + lastForgeTime time.Time proversPool *ProversPool provers []prover.Client @@ -133,6 +135,7 @@ func (p *Pipeline) reset(batchNum common.BatchNum, batchNum: batchNum, lastForgeL1TxsNum: stats.Sync.LastForgeL1TxsNum, lastScheduledL1BatchBlockNum: 0, + lastSlotForged: -1, } p.stats = *stats p.vars = *vars @@ -204,6 +207,9 @@ func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNu log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err, "lastForgeL1TxsNum", p.state.lastForgeL1TxsNum, "syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum) + } else if tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay || + tracerr.Unwrap(err) == errForgeBeforeDelay { + // no log } else { log.Errorw("forgeBatch", "err", err) } @@ -269,7 +275,9 @@ func (p *Pipeline) Start(batchNum common.BatchNum, batchInfo, err := p.handleForgeBatch(p.ctx, batchNum) if p.ctx.Err() != nil { continue - } else if tracerr.Unwrap(err) == errLastL1BatchNotSynced { + } else if tracerr.Unwrap(err) == errLastL1BatchNotSynced || + tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay || + tracerr.Unwrap(err) == errForgeBeforeDelay { waitDuration = p.cfg.ForgeRetryInterval continue } else if err != nil { @@ -282,6 +290,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum, }) continue } + p.lastForgeTime = time.Now() p.state.batchNum = batchNum select { @@ -373,8 +382,9 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e return nil, tracerr.Wrap(err) } // Structure to accumulate data and metadata of the batch + now := time.Now() batchInfo = &BatchInfo{PipelineNum: p.num, BatchNum: batchNum} - batchInfo.Debug.StartTimestamp = time.Now() + batchInfo.Debug.StartTimestamp = now batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1 selectionCfg := &txselector.SelectionConfig{ @@ -388,10 +398,17 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e var auths [][]byte var coordIdxs []common.Idx - // TODO: If there are no txs and we are behind the timeout, skip - // forging a batch and return a particular error that can be handleded - // in the loop where handleForgeBatch is called to retry after an - // interval + // Check if the slot is not yet fulfilled + slotCommitted := false + if p.stats.Sync.Auction.CurrentSlot.ForgerCommitment || + p.stats.Sync.Auction.CurrentSlot.SlotNum == p.state.lastSlotForged { + slotCommitted = true + } + + // If we haven't reached the ForgeDelay, skip forging the batch + if slotCommitted && now.Sub(p.lastForgeTime) < p.cfg.ForgeDelay { + return nil, errForgeBeforeDelay + } // 1. Decide if we forge L2Tx or L1+L2Tx if p.shouldL1L2Batch(batchInfo) { @@ -409,9 +426,6 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e if err != nil { return nil, tracerr.Wrap(err) } - - p.state.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1 - p.state.lastForgeL1TxsNum++ } else { // 2b: only L2 txs coordIdxs, auths, l1CoordTxs, poolL2Txs, discardedL2Txs, err = @@ -422,6 +436,43 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e l1UserTxsExtra = nil } + // If there are no txs to forge, no l1UserTxs in the open queue to + // freeze, and we haven't reached the ForgeNoTxsDelay, skip forging the + // batch. + if slotCommitted && now.Sub(p.lastForgeTime) < p.cfg.ForgeNoTxsDelay { + noTxs := false + if len(l1UserTxsExtra) == 0 && len(l1CoordTxs) == 0 && len(poolL2Txs) == 0 { + if batchInfo.L1Batch { + // Query the L1UserTxs in the queue following + // the one we are trying to forge. + nextL1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs( + p.state.lastForgeL1TxsNum + 1) + if err != nil { + return nil, tracerr.Wrap(err) + } + // If there are future L1UserTxs, we forge a + // batch to advance the queues and forge the + // L1UserTxs in the future. Otherwise, skip. + if len(nextL1UserTxs) == 0 { + noTxs = true + } + } else { + noTxs = true + } + } + if noTxs { + if err := p.txSelector.Reset(batchInfo.BatchNum-1, false); err != nil { + return nil, tracerr.Wrap(err) + } + return nil, errForgeNoTxsBeforeDelay + } + } + + if batchInfo.L1Batch { + p.state.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1 + p.state.lastForgeL1TxsNum++ + } + // 3. Save metadata from TxSelector output for BatchNum batchInfo.L1UserTxsExtra = l1UserTxsExtra batchInfo.L1CoordTxs = l1CoordTxs @@ -466,6 +517,8 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e p.cfg.debugBatchStore(batchInfo) log.Infow("Pipeline: batch forged internally", "batch", batchInfo.BatchNum) + p.state.lastSlotForged = p.stats.Sync.Auction.CurrentSlot.SlotNum + return batchInfo, nil } diff --git a/coordinator/txmanager.go b/coordinator/txmanager.go index 082d0f0..3b2ef6a 100644 --- a/coordinator/txmanager.go +++ b/coordinator/txmanager.go @@ -128,11 +128,14 @@ func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) { if err != nil { return nil, tracerr.Wrap(err) } - inc := new(big.Int).Set(gasPrice) - // TODO: Replace this by a value of percentage - const gasPriceDiv = 100 - inc.Div(inc, new(big.Int).SetUint64(gasPriceDiv)) - gasPrice.Add(gasPrice, inc) + if t.cfg.GasPriceIncPerc != 0 { + inc := new(big.Int).Set(gasPrice) + inc.Mul(inc, new(big.Int).SetInt64(t.cfg.GasPriceIncPerc)) + // nolint reason: to calculate percentages we use 100 + inc.Div(inc, new(big.Int).SetUint64(100)) //nolint:gomnd + gasPrice.Add(gasPrice, inc) + } + // log.Debugw("TxManager: transaction metadata", "gasPrice", gasPrice) auth, err := bind.NewKeyStoreTransactorWithChainID(t.ethClient.EthKeyStore(), t.account, t.chainID) @@ -141,6 +144,13 @@ func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) { } auth.Value = big.NewInt(0) // in wei // TODO: Calculate GasLimit based on the contents of the ForgeBatchArgs + // This requires a function that estimates the gas usage of the + // forgeBatch call based on the contents of the ForgeBatch args: + // - length of l2txs + // - length of l1Usertxs + // - length of l1CoordTxs with authorization signature + // - length of l1CoordTxs without authoriation signature + // - etc. auth.GasLimit = 1000000 auth.GasPrice = gasPrice auth.Nonce = nil diff --git a/eth/rollup.go b/eth/rollup.go index 992c2ca..5a77bd2 100644 --- a/eth/rollup.go +++ b/eth/rollup.go @@ -316,7 +316,7 @@ func NewRollupClient(client *EthereumClient, address ethCommon.Address, tokenHEZ } consts, err := c.RollupConstants() if err != nil { - return nil, tracerr.Wrap(err) + return nil, tracerr.Wrap(fmt.Errorf("RollupConstants at %v: %w", address, err)) } c.consts = consts return c, nil diff --git a/node/node.go b/node/node.go index a09736b..4d28ec8 100644 --- a/node/node.go +++ b/node/node.go @@ -103,8 +103,8 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { var keyStore *ethKeystore.KeyStore if mode == ModeCoordinator { ethCfg = eth.EthereumConfig{ - CallGasLimit: cfg.Coordinator.EthClient.CallGasLimit, - GasPriceDiv: cfg.Coordinator.EthClient.GasPriceDiv, + CallGasLimit: 0, // cfg.Coordinator.EthClient.CallGasLimit, + GasPriceDiv: 0, // cfg.Coordinator.EthClient.GasPriceDiv, } scryptN := ethKeystore.StandardScryptN @@ -299,12 +299,15 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { ConfirmBlocks: cfg.Coordinator.ConfirmBlocks, L1BatchTimeoutPerc: cfg.Coordinator.L1BatchTimeoutPerc, ForgeRetryInterval: cfg.Coordinator.ForgeRetryInterval.Duration, + ForgeDelay: cfg.Coordinator.ForgeDelay.Duration, + ForgeNoTxsDelay: cfg.Coordinator.ForgeNoTxsDelay.Duration, SyncRetryInterval: cfg.Coordinator.SyncRetryInterval.Duration, EthClientAttempts: cfg.Coordinator.EthClient.Attempts, EthClientAttemptsDelay: cfg.Coordinator.EthClient.AttemptsDelay.Duration, EthNoReuseNonce: cfg.Coordinator.EthClient.NoReuseNonce, EthTxResendTimeout: cfg.Coordinator.EthClient.TxResendTimeout.Duration, MaxGasPrice: cfg.Coordinator.EthClient.MaxGasPrice, + GasPriceIncPerc: cfg.Coordinator.EthClient.GasPriceIncPerc, TxManagerCheckInterval: cfg.Coordinator.EthClient.CheckLoopInterval.Duration, DebugBatchPath: cfg.Coordinator.Debug.BatchPath, Purger: coordinator.PurgerCfg{ @@ -583,6 +586,8 @@ func (n *Node) StartSynchronizer() { } if errors.Is(err, eth.ErrBlockHashMismatchEvent) { log.Warnw("Synchronizer.Sync", "err", err) + } else if errors.Is(err, synchronizer.ErrUnknownBlock) { + log.Warnw("Synchronizer.Sync", "err", err) } else { log.Errorw("Synchronizer.Sync", "err", err) } diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 361bdc8..9606188 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -18,6 +18,19 @@ import ( "github.com/hermeznetwork/tracerr" ) +const ( + // errStrUnknownBlock is the string returned by geth when querying an + // unknown block + errStrUnknownBlock = "unknown block" +) + +var ( + // ErrUnknownBlock is the error returned by the Synchronizer when a + // block is queried by hash but the ethereum node doesn't find it due + // to it being discarded from a reorg. + ErrUnknownBlock = fmt.Errorf("unknown block") +) + // Stats of the syncrhonizer type Stats struct { Eth struct { @@ -648,11 +661,6 @@ func (s *Synchronizer) Sync2(ctx context.Context, return nil, nil, tracerr.Wrap(err) } - log.Debugw("Synced block", - "syncLastBlockNum", s.stats.Sync.LastBlock.Num, - "syncBlocksPerc", s.stats.blocksPerc(), - "ethLastBlockNum", s.stats.Eth.LastBlock.Num, - ) for _, batchData := range rollupData.Batches { log.Debugw("Synced batch", "syncLastBatch", batchData.Batch.BatchNum, @@ -660,6 +668,11 @@ func (s *Synchronizer) Sync2(ctx context.Context, "ethLastBatch", s.stats.Eth.LastBatchNum, ) } + log.Debugw("Synced block", + "syncLastBlockNum", s.stats.Sync.LastBlock.Num, + "syncBlocksPerc", s.stats.blocksPerc(), + "ethLastBlockNum", s.stats.Eth.LastBlock.Num, + ) return blockData, nil, nil } @@ -811,7 +824,9 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e // Get rollup events in the block, and make sure the block hash matches // the expected one. rollupEvents, err := s.ethClient.RollupEventsByBlock(blockNum, ðBlock.Hash) - if err != nil { + if err != nil && err.Error() == errStrUnknownBlock { + return nil, tracerr.Wrap(ErrUnknownBlock) + } else if err != nil { return nil, tracerr.Wrap(fmt.Errorf("RollupEventsByBlock: %w", err)) } // No events in this block @@ -1121,7 +1136,9 @@ func (s *Synchronizer) auctionSync(ethBlock *common.Block) (*common.AuctionData, // Get auction events in the block auctionEvents, err := s.ethClient.AuctionEventsByBlock(blockNum, ðBlock.Hash) - if err != nil { + if err != nil && err.Error() == errStrUnknownBlock { + return nil, tracerr.Wrap(ErrUnknownBlock) + } else if err != nil { return nil, tracerr.Wrap(fmt.Errorf("AuctionEventsByBlock: %w", err)) } // No events in this block @@ -1218,7 +1235,9 @@ func (s *Synchronizer) wdelayerSync(ethBlock *common.Block) (*common.WDelayerDat // Get wDelayer events in the block wDelayerEvents, err := s.ethClient.WDelayerEventsByBlock(blockNum, ðBlock.Hash) - if err != nil { + if err != nil && err.Error() == errStrUnknownBlock { + return nil, tracerr.Wrap(ErrUnknownBlock) + } else if err != nil { return nil, tracerr.Wrap(fmt.Errorf("WDelayerEventsByBlock: %w", err)) } // No events in this block