From 450fa08d80590dbca866be61204f8eee8f7bdfee Mon Sep 17 00:00:00 2001 From: Eduard S Date: Wed, 17 Mar 2021 11:44:01 +0100 Subject: [PATCH] Add config parameter ForgeOncePerSlotIfTxs New configuration Coordinator configuration parameter `ForgeOncePerSlotIfTxs`: ForgeOncePerSlotIfTxs will make the coordinator forge at most one batch per slot, only if there are included txs in that batch, or pending l1UserTxs in the smart contract. Setting this parameter overrides `ForgeDelay`, `ForgeNoTxsDelay`, `MustForgeAtSlotDeadline` and `IgnoreSlotCommitment`. Also restructure a bit the functions that check policies to decide whether or not to forge a batch. --- config/config.go | 14 ++- coordinator/batch.go | 2 +- coordinator/coordinator.go | 12 +- coordinator/pipeline.go | 208 +++++++++++++++++++++-------------- coordinator/pipeline_test.go | 4 +- coordinator/txmanager.go | 2 +- node/node.go | 1 + 7 files changed, 149 insertions(+), 94 deletions(-) diff --git a/config/config.go b/config/config.go index 343904d..1db7a32 100644 --- a/config/config.go +++ b/config/config.go @@ -112,11 +112,17 @@ type Coordinator struct { // MustForgeAtSlotDeadline enables the coordinator to forge slots if // the empty slots reach the slot deadline. MustForgeAtSlotDeadline bool - // IgnoreSlotCommitment IgnoreSlotCommitment disables forcing the - // coordinator to forge a slot immediately when the slot is not - // committed. If set to false, the coordinator will immediately forge - // a batch at the beginning of a slot if it's the slot winner. + // IgnoreSlotCommitment disables forcing the coordinator to forge a + // slot immediately when the slot is not committed. If set to false, + // the coordinator will immediately forge a batch at the beginning of a + // slot if it's the slot winner. IgnoreSlotCommitment bool + // ForgeOncePerSlotIfTxs will make the coordinator forge at most one + // batch per slot, only if there are included txs in that batch, or + // pending l1UserTxs in the smart contract. Setting this parameter + // overrides `ForgeDelay`, `ForgeNoTxsDelay`, `MustForgeAtSlotDeadline` + // and `IgnoreSlotCommitment`. + ForgeOncePerSlotIfTxs bool // SyncRetryInterval is the waiting interval between calls to the main // handler of a synced block after an error SyncRetryInterval Duration `validate:"required"` diff --git a/coordinator/batch.go b/coordinator/batch.go index 3ff236b..8df15d8 100644 --- a/coordinator/batch.go +++ b/coordinator/batch.go @@ -85,7 +85,7 @@ type BatchInfo struct { PublicInputs []*big.Int L1Batch bool VerifierIdx uint8 - L1UserTxsExtra []common.L1Tx + L1UserTxs []common.L1Tx L1CoordTxs []common.L1Tx L1CoordinatorTxsAuths [][]byte L2Txs []common.L2Tx diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 76481bc..a5a60ed 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -24,10 +24,8 @@ import ( ) var ( - errLastL1BatchNotSynced = fmt.Errorf("last L1Batch not synced yet") - errForgeNoTxsBeforeDelay = fmt.Errorf( - "no txs to forge and we haven't reached the forge no txs delay") - errForgeBeforeDelay = fmt.Errorf("we haven't reached the forge delay") + errLastL1BatchNotSynced = fmt.Errorf("last L1Batch not synced yet") + errSkipBatchByPolicy = fmt.Errorf("skip batch by policy") ) const ( @@ -92,6 +90,12 @@ type Config struct { // the coordinator will immediately forge a batch at the beginning of // a slot if it's the slot winner. IgnoreSlotCommitment bool + // ForgeOncePerSlotIfTxs will make the coordinator forge at most one + // batch per slot, only if there are included txs in that batch, or + // pending l1UserTxs in the smart contract. Setting this parameter + // overrides `ForgeDelay`, `ForgeNoTxsDelay`, `MustForgeAtSlotDeadline` + // and `IgnoreSlotCommitment`. + ForgeOncePerSlotIfTxs bool // SyncRetryInterval is the waiting interval between calls to the main // handler of a synced block after an error SyncRetryInterval time.Duration diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go index c816c49..aa690b4 100644 --- a/coordinator/pipeline.go +++ b/coordinator/pipeline.go @@ -224,8 +224,9 @@ func (p *Pipeline) handleForgeBatch(ctx context.Context, // 2. Forge the batch internally (make a selection of txs and prepare // all the smart contract arguments) + var skipReason *string p.mutexL2DBUpdateDelete.Lock() - batchInfo, err = p.forgeBatch(batchNum) + batchInfo, skipReason, err = p.forgeBatch(batchNum) p.mutexL2DBUpdateDelete.Unlock() if ctx.Err() != nil { return nil, ctx.Err() @@ -234,13 +235,13 @@ func (p *Pipeline) handleForgeBatch(ctx context.Context, log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err, "lastForgeL1TxsNum", p.state.lastForgeL1TxsNum, "syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum) - } else if tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay || - tracerr.Unwrap(err) == errForgeBeforeDelay { - // no log } else { log.Errorw("forgeBatch", "err", err) } return nil, tracerr.Wrap(err) + } else if skipReason != nil { + log.Debugw("skipping batch", "batch", batchNum, "reason", *skipReason) + return nil, tracerr.Wrap(errSkipBatchByPolicy) } // 3. Send the ZKInputs to the proof server @@ -295,8 +296,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum, if p.ctx.Err() != nil { continue } else if tracerr.Unwrap(err) == errLastL1BatchNotSynced || - tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay || - tracerr.Unwrap(err) == errForgeBeforeDelay { + tracerr.Unwrap(err) == errSkipBatchByPolicy { continue } else if err != nil { p.setErrAtBatchNum(batchNum) @@ -389,25 +389,109 @@ func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) er return nil } -// check if we reach the ForgeDelay or not before batch forging -func (p *Pipeline) preForgeBatchCheck(slotCommitted bool, now time.Time) error { - if slotCommitted && now.Sub(p.lastForgeTime) < p.cfg.ForgeDelay { - return errForgeBeforeDelay +// slotCommitted returns true if the current slot has already been committed +func (p *Pipeline) slotCommitted() bool { + // Synchronizer has synchronized a batch in the current slot (setting + // CurrentSlot.ForgerCommitment) or the pipeline has already + // internally-forged a batch in the current slot + return p.stats.Sync.Auction.CurrentSlot.ForgerCommitment || + p.stats.Sync.Auction.CurrentSlot.SlotNum == p.state.lastSlotForged +} + +// forgePolicySkipPreSelection is called before doing a tx selection in a batch to +// determine by policy if we should forge the batch or not. Returns true and +// the reason when the forging of the batch must be skipped. +func (p *Pipeline) forgePolicySkipPreSelection(now time.Time) (bool, string) { + // Check if the slot is not yet fulfilled + slotCommitted := p.slotCommitted() + if p.cfg.ForgeOncePerSlotIfTxs { + if slotCommitted { + return true, "cfg.ForgeOncePerSlotIfTxs = true and slot already committed" + } + return false, "" } - return nil + // Determine if we must commit the slot + if !p.cfg.IgnoreSlotCommitment && !slotCommitted { + return false, "" + } + + // If we haven't reached the ForgeDelay, skip forging the batch + if now.Sub(p.lastForgeTime) < p.cfg.ForgeDelay { + return true, "we haven't reached the forge delay" + } + return false, "" +} + +// forgePolicySkipPostSelection is called after doing a tx selection in a batch to +// determine by policy if we should forge the batch or not. Returns true and +// the reason when the forging of the batch must be skipped. +func (p *Pipeline) forgePolicySkipPostSelection(now time.Time, l1UserTxsExtra, l1CoordTxs []common.L1Tx, + poolL2Txs []common.PoolL2Tx, batchInfo *BatchInfo) (bool, string, error) { + // Check if the slot is not yet fulfilled + slotCommitted := p.slotCommitted() + + pendingTxs := true + if len(l1UserTxsExtra) == 0 && len(l1CoordTxs) == 0 && len(poolL2Txs) == 0 { + if batchInfo.L1Batch { + // Query the number of unforged L1UserTxs + // (either in a open queue or in a frozen + // not-yet-forged queue). + count, err := p.historyDB.GetUnforgedL1UserTxsCount() + if err != nil { + return false, "", err + } + // If there are future L1UserTxs, we forge a + // batch to advance the queues to be able to + // forge the L1UserTxs in the future. + // Otherwise, skip. + if count == 0 { + pendingTxs = false + } + } else { + pendingTxs = false + } + } + + if p.cfg.ForgeOncePerSlotIfTxs { + if slotCommitted { + return true, "cfg.ForgeOncePerSlotIfTxs = true and slot already committed", + nil + } + if pendingTxs { + return false, "", nil + } + return true, "cfg.ForgeOncePerSlotIfTxs = true and no pending txs", + nil + } + + // Determine if we must commit the slot + if !p.cfg.IgnoreSlotCommitment && !slotCommitted { + return false, "", nil + } + + // check if there is no txs to forge, no l1UserTxs in the open queue to + // freeze and we haven't reached the ForgeNoTxsDelay + if now.Sub(p.lastForgeTime) < p.cfg.ForgeNoTxsDelay { + if !pendingTxs { + return true, "no txs to forge and we haven't reached the forge no txs delay", + nil + } + } + return false, "", nil } // forgeBatch forges the batchNum batch. -func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) { +func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, + skipReason *string, err error) { // remove transactions from the pool that have been there for too long _, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(), p.stats.Sync.LastBlock.Num, int64(batchNum)) if err != nil { - return nil, tracerr.Wrap(err) + return nil, nil, tracerr.Wrap(err) } _, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum)) if err != nil { - return nil, tracerr.Wrap(err) + return nil, nil, tracerr.Wrap(err) } // Structure to accumulate data and metadata of the batch now := time.Now() @@ -417,53 +501,48 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e var poolL2Txs []common.PoolL2Tx var discardedL2Txs []common.PoolL2Tx - var l1UserTxsExtra, l1CoordTxs []common.L1Tx + var l1UserTxs, l1CoordTxs []common.L1Tx var auths [][]byte var coordIdxs []common.Idx - // Check if the slot is not yet fulfilled - slotCommitted := p.cfg.IgnoreSlotCommitment - if p.stats.Sync.Auction.CurrentSlot.ForgerCommitment || - p.stats.Sync.Auction.CurrentSlot.SlotNum == p.state.lastSlotForged { - slotCommitted = true - } - - // If we haven't reached the ForgeDelay, skip forging the batch - if err = p.preForgeBatchCheck(slotCommitted, now); err != nil { - return nil, tracerr.Wrap(err) + if skip, reason := p.forgePolicySkipPreSelection(now); skip { + return nil, &reason, nil } // 1. Decide if we forge L2Tx or L1+L2Tx if p.shouldL1L2Batch(batchInfo) { batchInfo.L1Batch = true if p.state.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum { - return nil, tracerr.Wrap(errLastL1BatchNotSynced) + return nil, nil, tracerr.Wrap(errLastL1BatchNotSynced) } // 2a: L1+L2 txs - l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.state.lastForgeL1TxsNum + 1) + _l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.state.lastForgeL1TxsNum + 1) if err != nil { - return nil, tracerr.Wrap(err) + return nil, nil, tracerr.Wrap(err) } - coordIdxs, auths, l1UserTxsExtra, l1CoordTxs, poolL2Txs, discardedL2Txs, err = - p.txSelector.GetL1L2TxSelection(p.cfg.TxProcessorConfig, l1UserTxs) + coordIdxs, auths, l1UserTxs, l1CoordTxs, poolL2Txs, discardedL2Txs, err = + p.txSelector.GetL1L2TxSelection(p.cfg.TxProcessorConfig, _l1UserTxs) if err != nil { - return nil, tracerr.Wrap(err) + return nil, nil, tracerr.Wrap(err) } } else { // 2b: only L2 txs coordIdxs, auths, l1CoordTxs, poolL2Txs, discardedL2Txs, err = p.txSelector.GetL2TxSelection(p.cfg.TxProcessorConfig) if err != nil { - return nil, tracerr.Wrap(err) + return nil, nil, tracerr.Wrap(err) } - l1UserTxsExtra = nil + l1UserTxs = nil } - // If there are no txs to forge, no l1UserTxs in the open queue to - // freeze, and we haven't reached the ForgeNoTxsDelay, skip forging the - // batch. - if err = p.postForgeBatchCheck(slotCommitted, now, l1UserTxsExtra, l1CoordTxs, poolL2Txs, batchInfo); err != nil { - return nil, tracerr.Wrap(err) + if skip, reason, err := p.forgePolicySkipPostSelection(now, + l1UserTxs, l1CoordTxs, poolL2Txs, batchInfo); err != nil { + return nil, nil, tracerr.Wrap(err) + } else if skip { + if err := p.txSelector.Reset(batchInfo.BatchNum-1, false); err != nil { + return nil, nil, tracerr.Wrap(err) + } + return nil, &reason, tracerr.Wrap(err) } if batchInfo.L1Batch { @@ -472,7 +551,7 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e } // 3. Save metadata from TxSelector output for BatchNum - batchInfo.L1UserTxsExtra = l1UserTxsExtra + batchInfo.L1UserTxs = l1UserTxs batchInfo.L1CoordTxs = l1CoordTxs batchInfo.L1CoordinatorTxsAuths = auths batchInfo.CoordIdxs = coordIdxs @@ -480,10 +559,10 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum); err != nil { - return nil, tracerr.Wrap(err) + return nil, nil, tracerr.Wrap(err) } if err := p.l2DB.UpdateTxsInfo(discardedL2Txs); err != nil { - return nil, tracerr.Wrap(err) + return nil, nil, tracerr.Wrap(err) } // Invalidate transactions that become invalid because of @@ -492,21 +571,21 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e // all the nonces smaller than the current one) err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum) if err != nil { - return nil, tracerr.Wrap(err) + return nil, nil, tracerr.Wrap(err) } // 4. Call BatchBuilder with TxSelector output configBatch := &batchbuilder.ConfigBatch{ TxProcessorConfig: p.cfg.TxProcessorConfig, } - zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxsExtra, + zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxs, l1CoordTxs, poolL2Txs) if err != nil { - return nil, tracerr.Wrap(err) + return nil, nil, tracerr.Wrap(err) } l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a big uggly, find a better way if err != nil { - return nil, tracerr.Wrap(err) + return nil, nil, tracerr.Wrap(err) } batchInfo.L2Txs = l2Txs @@ -518,42 +597,7 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e p.state.lastSlotForged = p.stats.Sync.Auction.CurrentSlot.SlotNum - return batchInfo, nil -} - -// check if there is no txs to forge, no l1UserTxs in the open queue to freeze and we haven't reached the ForgeNoTxsDelay -func (p *Pipeline) postForgeBatchCheck(slotCommitted bool, now time.Time, l1UserTxsExtra, l1CoordTxs []common.L1Tx, - poolL2Txs []common.PoolL2Tx, batchInfo *BatchInfo) error { - if slotCommitted && now.Sub(p.lastForgeTime) < p.cfg.ForgeNoTxsDelay { - noTxs := false - if len(l1UserTxsExtra) == 0 && len(l1CoordTxs) == 0 && len(poolL2Txs) == 0 { - if batchInfo.L1Batch { - // Query the number of unforged L1UserTxs - // (either in a open queue or in a frozen - // not-yet-forged queue). - count, err := p.historyDB.GetUnforgedL1UserTxsCount() - if err != nil { - return err - } - // If there are future L1UserTxs, we forge a - // batch to advance the queues to be able to - // forge the L1UserTxs in the future. - // Otherwise, skip. - if count == 0 { - noTxs = true - } - } else { - noTxs = true - } - } - if noTxs { - if err := p.txSelector.Reset(batchInfo.BatchNum-1, false); err != nil { - return err - } - return errForgeNoTxsBeforeDelay - } - } - return nil + return batchInfo, nil, nil } // waitServerProof gets the generated zkProof & sends it to the SmartContract @@ -598,7 +642,7 @@ func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs { NewLastIdx: int64(zki.Metadata.NewLastIdxRaw), NewStRoot: zki.Metadata.NewStateRootRaw.BigInt(), NewExitRoot: zki.Metadata.NewExitRootRaw.BigInt(), - L1UserTxs: batchInfo.L1UserTxsExtra, + L1UserTxs: batchInfo.L1UserTxs, L1CoordinatorTxs: batchInfo.L1CoordTxs, L1CoordinatorTxsAuths: batchInfo.L1CoordinatorTxsAuths, L2TxsData: batchInfo.L2Txs, diff --git a/coordinator/pipeline_test.go b/coordinator/pipeline_test.go index 1a4a1e4..a43e63d 100644 --- a/coordinator/pipeline_test.go +++ b/coordinator/pipeline_test.go @@ -224,12 +224,12 @@ PoolTransfer(0) User2-User3: 300 (126) batchNum++ - batchInfo, err := pipeline.forgeBatch(batchNum) + batchInfo, _, err := pipeline.forgeBatch(batchNum) require.NoError(t, err) assert.Equal(t, 3, len(batchInfo.L2Txs)) batchNum++ - batchInfo, err = pipeline.forgeBatch(batchNum) + batchInfo, _, err = pipeline.forgeBatch(batchNum) require.NoError(t, err) assert.Equal(t, 0, len(batchInfo.L2Txs)) } diff --git a/coordinator/txmanager.go b/coordinator/txmanager.go index 0a8aacb..d949caa 100644 --- a/coordinator/txmanager.go +++ b/coordinator/txmanager.go @@ -147,7 +147,7 @@ func (t *TxManager) NewAuth(ctx context.Context, batchInfo *BatchInfo) (*bind.Tr auth.Value = big.NewInt(0) // in wei gasLimit := t.cfg.ForgeBatchGasCost.Fixed + - uint64(len(batchInfo.L1UserTxsExtra))*t.cfg.ForgeBatchGasCost.L1UserTx + + uint64(len(batchInfo.L1UserTxs))*t.cfg.ForgeBatchGasCost.L1UserTx + uint64(len(batchInfo.L1CoordTxs))*t.cfg.ForgeBatchGasCost.L1CoordTx + uint64(len(batchInfo.L2Txs))*t.cfg.ForgeBatchGasCost.L2Tx auth.GasLimit = gasLimit diff --git a/node/node.go b/node/node.go index 7a951a2..870b8fe 100644 --- a/node/node.go +++ b/node/node.go @@ -367,6 +367,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { ForgeDelay: cfg.Coordinator.ForgeDelay.Duration, MustForgeAtSlotDeadline: cfg.Coordinator.MustForgeAtSlotDeadline, IgnoreSlotCommitment: cfg.Coordinator.IgnoreSlotCommitment, + ForgeOncePerSlotIfTxs: cfg.Coordinator.ForgeOncePerSlotIfTxs, ForgeNoTxsDelay: cfg.Coordinator.ForgeNoTxsDelay.Duration, SyncRetryInterval: cfg.Coordinator.SyncRetryInterval.Duration, PurgeByExtDelInterval: cfg.Coordinator.PurgeByExtDelInterval.Duration,