From deede9541b7d79984ee3b7e0b8780a2e7290886d Mon Sep 17 00:00:00 2001 From: Eduard S Date: Fri, 22 Jan 2021 11:56:45 +0100 Subject: [PATCH] WIP --- cli/node/cfg.buidler.toml | 6 +- config/config.go | 25 +++++++ coordinator/batch.go | 11 ++- coordinator/coordinator.go | 70 ++++++++++++++++--- coordinator/pipeline.go | 10 +-- coordinator/txmanager.go | 137 ++++++++++++++++++++++++++----------- node/node.go | 4 ++ 7 files changed, 198 insertions(+), 65 deletions(-) diff --git a/cli/node/cfg.buidler.toml b/cli/node/cfg.buidler.toml index 0edd4db..7b0e3b2 100644 --- a/cli/node/cfg.buidler.toml +++ b/cli/node/cfg.buidler.toml @@ -44,7 +44,10 @@ ForgerAddress = "0x05c23b938a85ab26A36E6314a0D02080E9ca6BeD" # Non-Boot Coordina # ForgerAddress = "0xb4124ceb3451635dacedd11767f004d8a28c6ee7" # Boot Coordinator # ForgerAddressPrivateKey = "0xa8a54b2d8197bc0b19bb8a084031be71835580a01e70a45a13babd16c9bc1563" ConfirmBlocks = 10 -L1BatchTimeoutPerc = 0.999 +L1BatchTimeoutPerc = 0.6 +StartSlotBlocksDelay = 2 +ScheduleBatchBlocksAheadCheck = 3 +SendBatchBlocksMarginCheck = 1 ProofServerPollInterval = "1s" ForgeRetryInterval = "500ms" SyncRetryInterval = "1s" @@ -83,6 +86,7 @@ ReceiptLoopInterval = "500ms" CheckLoopInterval = "500ms" Attempts = 4 AttemptsDelay = "500ms" +TxResendTimeout = "2m" CallGasLimit = 300000 GasPriceDiv = 100 diff --git a/config/config.go b/config/config.go index 9cf963a..02cd135 100644 --- a/config/config.go +++ b/config/config.go @@ -51,6 +51,27 @@ type Coordinator struct { // L1BatchTimeoutPerc is the portion of the range before the L1Batch // timeout that will trigger a schedule to forge an L1Batch L1BatchTimeoutPerc float64 `validate:"required"` + // StartSlotBlocksDelay is the number of blocks of delay to wait before + // starting the pipeline when we reach a slot in which we can forge. 
+ StartSlotBlocksDelay int64 + // ScheduleBatchBlocksAheadCheck is the number of blocks ahead in which + // the forger address is checked to be allowed to forge (apart from + // checking the next block), used to decide when to stop scheduling new + // batches (by stopping the pipeline). + // For example, if we are at block 10 and ScheduleBatchBlocksAheadCheck + // is 5, even though at block 11 we canForge, the pipeline will be + // stopped if we can't forge at block 15. + // This value should be the expected number of blocks it takes between + // scheduling a batch and having it mined. + ScheduleBatchBlocksAheadCheck int64 + // SendBatchBlocksMarginCheck is the number of margin blocks ahead in + // which the coordinator is also checked to be allowed to forge, apart + // from the next block; used to decide when to stop sending batches to + // the smart contract. + // For example, if we are at block 10 and SendBatchBlocksMarginCheck is + // 5, even though at block 11 we canForge, the batch will be discarded + // if we can't forge at block 15.
+ SendBatchBlocksMarginCheck int64 // ProofServerPollInterval is the waiting interval between polling the // ProofServer while waiting for a particular status ProofServerPollInterval Duration `validate:"required"` @@ -112,6 +133,10 @@ type Coordinator struct { // AttemptsDelay is delay between attempts do do an eth client // RPC call AttemptsDelay Duration `validate:"required"` + // TxResendTimeout is the timeout after which a non-mined + // ethereum transaction will be resent (reusing the nonce) with + // a newly calculated gas price + TxResendTimeout time.Duration `validate:"required"` // Keystore is the ethereum keystore where private keys are kept Keystore struct { // Path to the keystore diff --git a/coordinator/batch.go b/coordinator/batch.go index b2acf67..df0edf4 100644 --- a/coordinator/batch.go +++ b/coordinator/batch.go @@ -64,6 +64,9 @@ type Debug struct { // StartToSendDelay is the delay between starting a batch and sending // it to ethereum, in seconds StartToSendDelay float64 + // StartToMineDelay is the delay between starting a batch and having + // it mined in seconds + StartToMineDelay float64 } // BatchInfo contans the Batch information @@ -82,9 +85,11 @@ type BatchInfo struct { CoordIdxs []common.Idx ForgeBatchArgs *eth.RollupForgeBatchArgs // FeesInfo - EthTx *types.Transaction - Receipt *types.Receipt - Debug Debug + EthTx *types.Transaction + // SendTimestamp the time of batch sent to ethereum + SendTimestamp time.Time + Receipt *types.Receipt + Debug Debug } // DebugStore is a debug function to store the BatchInfo as a json text file in diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 5843c94..7d0604f 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -42,6 +42,29 @@ type Config struct { // L1BatchTimeoutPerc is the portion of the range before the L1Batch // timeout that will trigger a schedule to forge an L1Batch L1BatchTimeoutPerc float64 + // StartSlotBlocksDelay is the number of blocks 
of delay to wait before + starting the pipeline when we reach a slot in which we can forge. + StartSlotBlocksDelay int64 + // ScheduleBatchBlocksAheadCheck is the number of blocks ahead in which + // the forger address is checked to be allowed to forge (apart from + // checking the next block), used to decide when to stop scheduling new + // batches (by stopping the pipeline). + // For example, if we are at block 10 and ScheduleBatchBlocksAheadCheck + // is 5, even though at block 11 we canForge, the pipeline will be + // stopped if we can't forge at block 15. + // This value should be the expected number of blocks it takes between + // scheduling a batch and having it mined. + ScheduleBatchBlocksAheadCheck int64 + // SendBatchBlocksMarginCheck is the number of margin blocks ahead in + // which the coordinator is also checked to be allowed to forge, apart + // from the next block; used to decide when to stop sending batches to + // the smart contract. + // For example, if we are at block 10 and SendBatchBlocksMarginCheck is + // 5, even though at block 11 we canForge, the batch will be discarded + // if we can't forge at block 15. + // This value should be the expected number of blocks it takes between + // sending a batch and having it mined.
+ SendBatchBlocksMarginCheck int64 // EthClientAttempts is the number of attempts to do an eth client RPC // call before giving up EthClientAttempts int @@ -54,13 +77,19 @@ type Config struct { // EthClientAttemptsDelay is delay between attempts do do an eth client // RPC call EthClientAttemptsDelay time.Duration + // EthTxResendTimeout is the timeout after which a non-mined ethereum + // transaction will be resent (reusing the nonce) with a newly + // calculated gas price + EthTxResendTimeout time.Duration // TxManagerCheckInterval is the waiting interval between receipt // checks of ethereum transactions in the TxManager TxManagerCheckInterval time.Duration // DebugBatchPath if set, specifies the path where batchInfo is stored // in JSON in every step/update of the pipeline - DebugBatchPath string - Purger PurgerCfg + DebugBatchPath string + Purger PurgerCfg + // VerifierIdx is the index of the verifier contract registered in the + // smart contract VerifierIdx uint8 TxProcessorConfig txprocessor.Config } @@ -215,18 +244,22 @@ func (c *Coordinator) SendMsg(ctx context.Context, msg interface{}) { } } -func (c *Coordinator) syncSCVars(vars synchronizer.SCVariablesPtr) { - if vars.Rollup != nil { - c.vars.Rollup = *vars.Rollup +func updateSCVars(vars *synchronizer.SCVariables, update synchronizer.SCVariablesPtr) { + if update.Rollup != nil { + vars.Rollup = *update.Rollup } - if vars.Auction != nil { - c.vars.Auction = *vars.Auction + if update.Auction != nil { + vars.Auction = *update.Auction } - if vars.WDelayer != nil { - c.vars.WDelayer = *vars.WDelayer + if update.WDelayer != nil { + vars.WDelayer = *update.WDelayer } } +func (c *Coordinator) syncSCVars(vars synchronizer.SCVariablesPtr) { + updateSCVars(&c.vars, vars) +} + func canForge(auctionConstants *common.AuctionConstants, auctionVars *common.AuctionVariables, currentSlot *common.Slot, nextSlot *common.Slot, addr ethCommon.Address, blockNum int64) bool { var slot *common.Slot @@ -254,6 +287,12 @@ func 
canForge(auctionConstants *common.AuctionConstants, auctionVars *common.Auc return false } +func (c *Coordinator) canForgeAt(blockNum int64) bool { + return canForge(&c.consts.Auction, &c.vars.Auction, + &c.stats.Sync.Auction.CurrentSlot, &c.stats.Sync.Auction.NextSlot, + c.cfg.ForgerAddress, blockNum) +} + func (c *Coordinator) canForge() bool { blockNum := c.stats.Eth.LastBlock.Num + 1 return canForge(&c.consts.Auction, &c.vars.Auction, @@ -262,9 +301,18 @@ func (c *Coordinator) canForge() bool { } func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) error { - canForge := c.canForge() + nextBlock := c.stats.Eth.LastBlock.Num + 1 + canForge := c.canForgeAt(nextBlock) + if c.cfg.ScheduleBatchBlocksAheadCheck != 0 && canForge { + canForge = c.canForgeAt(nextBlock + c.cfg.ScheduleBatchBlocksAheadCheck) + } if c.pipeline == nil { - if canForge { + relativeBlock := c.consts.Auction.RelativeBlock(nextBlock) + if canForge && relativeBlock < c.cfg.StartSlotBlocksDelay { + log.Debugw("Coordinator: delaying pipeline start due to "+ + "relativeBlock (%v) < cfg.StartSlotBlocksDelay (%v)", + relativeBlock, c.cfg.StartSlotBlocksDelay) + } else if canForge { log.Infow("Coordinator: forging state begin", "block", stats.Eth.LastBlock.Num+1, "batch", stats.Sync.LastBatch) batchNum := common.BatchNum(stats.Sync.LastBatch) diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go index 81d5774..7b20f4b 100644 --- a/coordinator/pipeline.go +++ b/coordinator/pipeline.go @@ -122,15 +122,7 @@ func (p *Pipeline) reset(batchNum common.BatchNum, } func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) { - if vars.Rollup != nil { - p.vars.Rollup = *vars.Rollup - } - if vars.Auction != nil { - p.vars.Auction = *vars.Auction - } - if vars.WDelayer != nil { - p.vars.WDelayer = *vars.WDelayer - } + updateSCVars(&p.vars, vars) } // handleForgeBatch calls p.forgeBatch to forge the batch and get the zkInputs, diff --git a/coordinator/txmanager.go 
b/coordinator/txmanager.go index 5a449c7..26e5dac 100644 --- a/coordinator/txmanager.go +++ b/coordinator/txmanager.go @@ -41,6 +41,8 @@ type TxManager struct { lastPendingBatch common.BatchNum lastSuccessNonce uint64 lastPendingNonce uint64 + + lastSentL1BatchBlockNum int64 } // NewTxManager creates a new TxManager @@ -105,15 +107,7 @@ func (t *TxManager) SetSyncStatsVars(ctx context.Context, stats *synchronizer.St } func (t *TxManager) syncSCVars(vars synchronizer.SCVariablesPtr) { - if vars.Rollup != nil { - t.vars.Rollup = *vars.Rollup - } - if vars.Auction != nil { - t.vars.Auction = *vars.Auction - } - if vars.WDelayer != nil { - t.vars.WDelayer = *vars.WDelayer - } + updateSCVars(&t.vars, vars) } // NewAuth generates a new auth object for an ethereum transaction @@ -141,13 +135,29 @@ func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) { return auth, nil } +func (t *TxManager) shouldSendRollupForgeBatch(batchInfo *BatchInfo) error { + nextBlock := t.stats.Eth.LastBlock.Num + 1 + if !t.canForgeAt(nextBlock) { + return tracerr.Wrap(fmt.Errorf("can't forge in the next block: %v", nextBlock)) + } + if t.mustL1L2Batch(nextBlock) && !batchInfo.L1Batch { + return tracerr.Wrap(fmt.Errorf("can't forge non-L1Batch in the next block: %v", nextBlock)) + } + margin := t.cfg.SendBatchBlocksMarginCheck + if margin != 0 { + if !t.canForgeAt(nextBlock + margin) { + return tracerr.Wrap(fmt.Errorf("can't forge after %v blocks: %v", + margin, nextBlock)) + } + if t.mustL1L2Batch(nextBlock+margin) && !batchInfo.L1Batch { + return tracerr.Wrap(fmt.Errorf("can't forge non-L1Batch after %v blocks: %v", + margin, nextBlock)) + } + } + return nil +} + func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) error { - // TODO: Check if we can forge in the next blockNum, abort if we can't - batchInfo.Debug.Status = StatusSent - batchInfo.Debug.SendBlockNum = t.stats.Eth.LastBlock.Num + 1 - batchInfo.Debug.SendTimestamp = time.Now() - 
batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub( - batchInfo.Debug.StartTimestamp).Seconds() var ethTx *types.Transaction var err error auth, err := t.NewAuth(ctx) @@ -159,11 +169,6 @@ func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchIn for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ { ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs, auth) if err != nil { - // if strings.Contains(err.Error(), common.AuctionErrMsgCannotForge) { - // log.Errorw("TxManager ethClient.RollupForgeBatch", "err", err, - // "block", t.stats.Eth.LastBlock.Num+1) - // return tracerr.Wrap(err) - // } log.Errorw("TxManager ethClient.RollupForgeBatch", "attempt", attempt, "err", err, "block", t.stats.Eth.LastBlock.Num+1, "batchNum", batchInfo.BatchNum) @@ -181,8 +186,20 @@ func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchIn } batchInfo.EthTx = ethTx log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex()) + now := time.Now() + batchInfo.SendTimestamp = now + + batchInfo.Debug.Status = StatusSent + batchInfo.Debug.SendBlockNum = t.stats.Eth.LastBlock.Num + 1 + batchInfo.Debug.SendTimestamp = batchInfo.SendTimestamp + batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub( + batchInfo.Debug.StartTimestamp).Seconds() t.cfg.debugBatchStore(batchInfo) + t.lastPendingBatch = batchInfo.BatchNum + if batchInfo.L1Batch { + t.lastSentL1BatchBlockNum = t.stats.Eth.LastBlock.Num + 1 + } if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil { return tracerr.Wrap(err) } @@ -225,6 +242,9 @@ func (t *TxManager) checkEthTransactionReceipt(ctx context.Context, batchInfo *B func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*int64, error) { receipt := batchInfo.Receipt if receipt != nil { + if batchInfo.EthTx.Nonce() > t.lastSuccessNonce { + t.lastSuccessNonce =
batchInfo.EthTx.Nonce() + } if receipt.Status == types.ReceiptStatusFailed { batchInfo.Debug.Status = StatusFailed t.cfg.debugBatchStore(batchInfo) @@ -232,6 +252,9 @@ func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*i log.Warnw("TxManager receipt status is failed", "tx", receipt.TxHash.Hex(), "batch", batchInfo.BatchNum, "block", receipt.BlockNumber.Int64(), "err", err) + if batchInfo.BatchNum <= t.lastSuccessBatch { + t.lastSuccessBatch = batchInfo.BatchNum - 1 + } return nil, tracerr.Wrap(fmt.Errorf( "ethereum transaction receipt status is failed: %w", err)) } else if receipt.Status == types.ReceiptStatusSuccessful { @@ -239,6 +262,9 @@ func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*i batchInfo.Debug.MineBlockNum = receipt.BlockNumber.Int64() batchInfo.Debug.StartToMineBlocksDelay = batchInfo.Debug.MineBlockNum - batchInfo.Debug.StartBlockNum + now := time.Now() + batchInfo.Debug.StartToMineDelay = now.Sub( + batchInfo.Debug.StartTimestamp).Seconds() t.cfg.debugBatchStore(batchInfo) if batchInfo.BatchNum > t.lastSuccessBatch { t.lastSuccessBatch = batchInfo.BatchNum @@ -250,6 +276,9 @@ func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*i return nil, nil } +// TODO: +// - After sending a message: CancelPipeline, stop all consecutive pending Batches (transactions) + // Run the TxManager func (t *TxManager) Run(ctx context.Context) { next := 0 @@ -274,6 +303,13 @@ func (t *TxManager) Run(ctx context.Context) { t.stats = statsVars.Stats t.syncSCVars(statsVars.Vars) case batchInfo := <-t.batchCh: + if err := t.shouldSendRollupForgeBatch(batchInfo); err != nil { + log.Warnw("TxManager: shouldSend", "err", err, + "batch", batchInfo.BatchNum) + t.coord.SendMsg(ctx, MsgStopPipeline{ + Reason: fmt.Sprintf("forgeBatch shouldSend: %v", err)}) + continue + } if err := t.sendRollupForgeBatch(ctx, batchInfo); ctx.Err() != nil { continue } else if err != nil { @@ -282,7 +318,10 @@ func (t
*TxManager) Run(ctx context.Context) { // ethereum. This could be due to the ethNode // failure, or an invalid transaction (that // can't be mined) - t.coord.SendMsg(ctx, MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch send: %v", err)}) + log.Warnw("TxManager: forgeBatch send failed", "err", err, + "batch", batchInfo.BatchNum) + t.coord.SendMsg(ctx, MsgStopPipeline{ + Reason: fmt.Sprintf("forgeBatch send: %v", err)}) continue } t.queue = append(t.queue, batchInfo) @@ -304,7 +343,8 @@ func (t *TxManager) Run(ctx context.Context) { // if it was not mined, mined and succesfull or // mined and failed. This could be due to the // ethNode failure. - t.coord.SendMsg(ctx, MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch receipt: %v", err)}) + t.coord.SendMsg(ctx, MsgStopPipeline{ + Reason: fmt.Sprintf("forgeBatch receipt: %v", err)}) } confirm, err := t.handleReceipt(ctx, batchInfo) @@ -312,32 +352,47 @@ func (t *TxManager) Run(ctx context.Context) { continue } else if err != nil { //nolint:staticcheck // Transaction was rejected - t.queue = append(t.queue[:current], t.queue[current+1:]...) - if len(t.queue) == 0 { - next = 0 - } else { - next = current % len(t.queue) - } - t.coord.SendMsg(ctx, MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch reject: %v", err)}) + next = t.removeFromQueue(current) + t.coord.SendMsg(ctx, MsgStopPipeline{ + Reason: fmt.Sprintf("forgeBatch reject: %v", err)}) + continue + } + now := time.Now() + if confirm == nil && now.Sub(batchInfo.SendTimestamp) > t.cfg.EthTxResendTimeout { + log.Infow("TxManager: forgeBatch tx not been mined timeout", + "tx", batchInfo.EthTx.Hash().Hex(), "batch", batchInfo.BatchNum) + // TODO: Resend Tx with same nonce } if confirm != nil && *confirm >= t.cfg.ConfirmBlocks { - log.Debugw("TxManager tx for RollupForgeBatch confirmed", - "batch", batchInfo.BatchNum) - t.queue = append(t.queue[:current], t.queue[current+1:]...)
- if len(t.queue) == 0 { - next = 0 - } else { - next = current % len(t.queue) - } + log.Debugw("TxManager: forgeBatch tx confirmed", + "tx", batchInfo.EthTx.Hash().Hex(), "batch", batchInfo.BatchNum) + next = t.removeFromQueue(current) } } } } -// nolint reason: this function will be used in the future -//nolint:unused -func (t *TxManager) canForge(stats *synchronizer.Stats, blockNum int64) bool { +// removeFromQueue removes the batchInfo at position from the queue, and returns the next position +func (t *TxManager) removeFromQueue(position int) (next int) { + t.queue = append(t.queue[:position], t.queue[position+1:]...) + if len(t.queue) == 0 { + next = 0 + } else { + next = position % len(t.queue) + } + return next +} + +func (t *TxManager) canForgeAt(blockNum int64) bool { return canForge(&t.consts.Auction, &t.vars.Auction, - &stats.Sync.Auction.CurrentSlot, &stats.Sync.Auction.NextSlot, + &t.stats.Sync.Auction.CurrentSlot, &t.stats.Sync.Auction.NextSlot, t.cfg.ForgerAddress, blockNum) } + +func (t *TxManager) mustL1L2Batch(blockNum int64) bool { + lastL1BatchBlockNum := t.lastSentL1BatchBlockNum + if t.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum { + lastL1BatchBlockNum = t.stats.Sync.LastL1BatchBlock + } + return blockNum-lastL1BatchBlockNum >= t.vars.Rollup.ForgeL1L2BatchTimeout-1 +} diff --git a/node/node.go b/node/node.go index eb04b48..6e7e669 100644 --- a/node/node.go +++ b/node/node.go @@ -484,6 +484,10 @@ func (n *Node) handleNewBlock(ctx context.Context, stats *synchronizer.Stats, va ); err != nil { log.Errorw("API.UpdateNetworkInfo", "err", err) } + } else { + n.nodeAPI.api.UpdateNetworkInfoBlock( + stats.Eth.LastBlock, stats.Sync.LastBlock, + ) } } }