diff --git a/config/config.go b/config/config.go
index 343904d..1db7a32 100644
--- a/config/config.go
+++ b/config/config.go
@@ -112,11 +112,17 @@ type Coordinator struct {
 	// MustForgeAtSlotDeadline enables the coordinator to forge slots if
 	// the empty slots reach the slot deadline.
 	MustForgeAtSlotDeadline bool
-	// IgnoreSlotCommitment IgnoreSlotCommitment disables forcing the
-	// coordinator to forge a slot immediately when the slot is not
-	// committed. If set to false, the coordinator will immediately forge
-	// a batch at the beginning of a slot if it's the slot winner.
+	// IgnoreSlotCommitment disables forcing the coordinator to forge a
+	// slot immediately when the slot is not committed. If set to false,
+	// the coordinator will immediately forge a batch at the beginning of a
+	// slot if it's the slot winner.
 	IgnoreSlotCommitment bool
+	// ForgeOncePerSlotIfTxs will make the coordinator forge at most one
+	// batch per slot, and only if there are txs included in that batch or
+	// pending l1UserTxs in the smart contract. Setting this parameter
+	// overrides `ForgeDelay`, `ForgeNoTxsDelay`, `MustForgeAtSlotDeadline`
+	// and `IgnoreSlotCommitment`.
+	ForgeOncePerSlotIfTxs bool
 	// SyncRetryInterval is the waiting interval between calls to the main
 	// handler of a synced block after an error
 	SyncRetryInterval Duration `validate:"required"`
diff --git a/coordinator/batch.go b/coordinator/batch.go
index 3ff236b..8df15d8 100644
--- a/coordinator/batch.go
+++ b/coordinator/batch.go
@@ -85,7 +85,7 @@ type BatchInfo struct {
 	PublicInputs          []*big.Int
 	L1Batch               bool
 	VerifierIdx           uint8
-	L1UserTxsExtra        []common.L1Tx
+	L1UserTxs             []common.L1Tx
 	L1CoordTxs            []common.L1Tx
 	L1CoordinatorTxsAuths [][]byte
 	L2Txs                 []common.L2Tx
diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go
index 76481bc..a5a60ed 100644
--- a/coordinator/coordinator.go
+++ b/coordinator/coordinator.go
@@ -24,10 +24,8 @@ import (
 )
 
 var (
-	errLastL1BatchNotSynced  = fmt.Errorf("last L1Batch not synced yet")
-	errForgeNoTxsBeforeDelay = fmt.Errorf(
-		"no txs to forge and we haven't reached the forge no txs delay")
-	errForgeBeforeDelay = fmt.Errorf("we haven't reached the forge delay")
+	errLastL1BatchNotSynced = fmt.Errorf("last L1Batch not synced yet")
+	errSkipBatchByPolicy    = fmt.Errorf("skip batch by policy")
 )
 
 const (
@@ -92,6 +90,12 @@ type Config struct {
 	// the coordinator will immediately forge a batch at the beginning of
 	// a slot if it's the slot winner.
 	IgnoreSlotCommitment bool
+	// ForgeOncePerSlotIfTxs will make the coordinator forge at most one
+	// batch per slot, and only if there are txs included in that batch or
+	// pending l1UserTxs in the smart contract. Setting this parameter
+	// overrides `ForgeDelay`, `ForgeNoTxsDelay`, `MustForgeAtSlotDeadline`
+	// and `IgnoreSlotCommitment`.
+	ForgeOncePerSlotIfTxs bool
 	// SyncRetryInterval is the waiting interval between calls to the main
 	// handler of a synced block after an error
 	SyncRetryInterval time.Duration
diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go
index c816c49..aa690b4 100644
--- a/coordinator/pipeline.go
+++ b/coordinator/pipeline.go
@@ -224,8 +224,9 @@ func (p *Pipeline) handleForgeBatch(ctx context.Context,
 	// 2. Forge the batch internally (make a selection of txs and prepare
 	// all the smart contract arguments)
+	var skipReason *string
 	p.mutexL2DBUpdateDelete.Lock()
-	batchInfo, err = p.forgeBatch(batchNum)
+	batchInfo, skipReason, err = p.forgeBatch(batchNum)
 	p.mutexL2DBUpdateDelete.Unlock()
 	if ctx.Err() != nil {
 		return nil, ctx.Err()
 	}
@@ -234,13 +235,13 @@
 			log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err,
 				"lastForgeL1TxsNum", p.state.lastForgeL1TxsNum,
 				"syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum)
-		} else if tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay ||
-			tracerr.Unwrap(err) == errForgeBeforeDelay {
-			// no log
 		} else {
 			log.Errorw("forgeBatch", "err", err)
 		}
 		return nil, tracerr.Wrap(err)
+	} else if skipReason != nil {
+		log.Debugw("skipping batch", "batch", batchNum, "reason", *skipReason)
+		return nil, tracerr.Wrap(errSkipBatchByPolicy)
 	}
 
 	// 3. Send the ZKInputs to the proof server
@@ -295,8 +296,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
 			if p.ctx.Err() != nil {
 				continue
 			} else if tracerr.Unwrap(err) == errLastL1BatchNotSynced ||
-				tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay ||
-				tracerr.Unwrap(err) == errForgeBeforeDelay {
+				tracerr.Unwrap(err) == errSkipBatchByPolicy {
 				continue
 			} else if err != nil {
 				p.setErrAtBatchNum(batchNum)
@@ -389,25 +389,109 @@ func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) er
 	return nil
 }
 
-// check if we reach the ForgeDelay or not before batch forging
-func (p *Pipeline) preForgeBatchCheck(slotCommitted bool, now time.Time) error {
-	if slotCommitted && now.Sub(p.lastForgeTime) < p.cfg.ForgeDelay {
-		return errForgeBeforeDelay
+// slotCommitted returns true if the current slot has already been committed
+func (p *Pipeline) slotCommitted() bool {
+	// Synchronizer has synchronized a batch in the current slot (setting
+	// CurrentSlot.ForgerCommitment) or the pipeline has already
+	// internally-forged a batch in the current slot
+	return p.stats.Sync.Auction.CurrentSlot.ForgerCommitment ||
+		p.stats.Sync.Auction.CurrentSlot.SlotNum == p.state.lastSlotForged
+}
+
+// forgePolicySkipPreSelection is called before doing the tx selection of a
+// batch to determine, by policy, whether we should forge the batch. It
+// returns true and the reason when the forging of the batch must be skipped.
+func (p *Pipeline) forgePolicySkipPreSelection(now time.Time) (bool, string) {
+	// Check whether the current slot is already committed
+	slotCommitted := p.slotCommitted()
+	if p.cfg.ForgeOncePerSlotIfTxs {
+		if slotCommitted {
+			return true, "cfg.ForgeOncePerSlotIfTxs = true and slot already committed"
+		}
+		return false, ""
 	}
-	return nil
+	// Determine if we must commit the slot
+	if !p.cfg.IgnoreSlotCommitment && !slotCommitted {
+		return false, ""
+	}
+
+	// If we haven't reached the ForgeDelay, skip forging the batch
+	if now.Sub(p.lastForgeTime) < p.cfg.ForgeDelay {
+		return true, "we haven't reached the forge delay"
+	}
+	return false, ""
+}
+
+// forgePolicySkipPostSelection is called after doing the tx selection of a
+// batch to determine, by policy, whether we should forge the batch. It
+// returns true and the reason when the forging of the batch must be skipped.
+func (p *Pipeline) forgePolicySkipPostSelection(now time.Time, l1UserTxs, l1CoordTxs []common.L1Tx,
+	poolL2Txs []common.PoolL2Tx, batchInfo *BatchInfo) (bool, string, error) {
+	// Check whether the current slot is already committed
+	slotCommitted := p.slotCommitted()
+
+	pendingTxs := true
+	if len(l1UserTxs) == 0 && len(l1CoordTxs) == 0 && len(poolL2Txs) == 0 {
+		if batchInfo.L1Batch {
+			// Query the number of unforged L1UserTxs
+			// (either in an open queue or in a frozen
+			// not-yet-forged queue).
+			count, err := p.historyDB.GetUnforgedL1UserTxsCount()
+			if err != nil {
+				return false, "", err
+			}
+			// If there are future L1UserTxs, we forge a
+			// batch to advance the queues to be able to
+			// forge the L1UserTxs in the future.
+			// Otherwise, skip.
+			if count == 0 {
+				pendingTxs = false
+			}
+		} else {
+			pendingTxs = false
+		}
+	}
+
+	if p.cfg.ForgeOncePerSlotIfTxs {
+		if slotCommitted {
+			return true, "cfg.ForgeOncePerSlotIfTxs = true and slot already committed",
+				nil
+		}
+		if pendingTxs {
+			return false, "", nil
+		}
+		return true, "cfg.ForgeOncePerSlotIfTxs = true and no pending txs",
+			nil
+	}
+
+	// Determine if we must commit the slot
+	if !p.cfg.IgnoreSlotCommitment && !slotCommitted {
+		return false, "", nil
+	}
+
+	// Check if there are no txs to forge, no l1UserTxs in the open queue
+	// to freeze, and we haven't reached the ForgeNoTxsDelay
+	if now.Sub(p.lastForgeTime) < p.cfg.ForgeNoTxsDelay {
+		if !pendingTxs {
+			return true, "no txs to forge and we haven't reached the forge no txs delay",
+				nil
+		}
+	}
+	return false, "", nil
+}
 
 // forgeBatch forges the batchNum batch.
-func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) {
+func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo,
+	skipReason *string, err error) {
 	// remove transactions from the pool that have been there for too long
 	_, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
 		p.stats.Sync.LastBlock.Num, int64(batchNum))
 	if err != nil {
-		return nil, tracerr.Wrap(err)
+		return nil, nil, tracerr.Wrap(err)
 	}
 	_, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum))
 	if err != nil {
-		return nil, tracerr.Wrap(err)
+		return nil, nil, tracerr.Wrap(err)
 	}
 	// Structure to accumulate data and metadata of the batch
 	now := time.Now()
@@ -417,53 +501,48 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
 	var poolL2Txs []common.PoolL2Tx
 	var discardedL2Txs []common.PoolL2Tx
-	var l1UserTxsExtra, l1CoordTxs []common.L1Tx
+	var l1UserTxs, l1CoordTxs []common.L1Tx
 	var auths [][]byte
 	var coordIdxs []common.Idx
 
-	// Check if the slot is not yet fulfilled
-	slotCommitted := p.cfg.IgnoreSlotCommitment
-	if p.stats.Sync.Auction.CurrentSlot.ForgerCommitment ||
-		p.stats.Sync.Auction.CurrentSlot.SlotNum == p.state.lastSlotForged {
-		slotCommitted = true
-	}
-
-	// If we haven't reached the ForgeDelay, skip forging the batch
-	if err = p.preForgeBatchCheck(slotCommitted, now); err != nil {
-		return nil, tracerr.Wrap(err)
+	if skip, reason := p.forgePolicySkipPreSelection(now); skip {
+		return nil, &reason, nil
 	}
 
 	// 1. Decide if we forge L2Tx or L1+L2Tx
 	if p.shouldL1L2Batch(batchInfo) {
 		batchInfo.L1Batch = true
 		if p.state.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum {
-			return nil, tracerr.Wrap(errLastL1BatchNotSynced)
+			return nil, nil, tracerr.Wrap(errLastL1BatchNotSynced)
 		}
 		// 2a: L1+L2 txs
-		l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.state.lastForgeL1TxsNum + 1)
+		_l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.state.lastForgeL1TxsNum + 1)
 		if err != nil {
-			return nil, tracerr.Wrap(err)
+			return nil, nil, tracerr.Wrap(err)
 		}
-		coordIdxs, auths, l1UserTxsExtra, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
-			p.txSelector.GetL1L2TxSelection(p.cfg.TxProcessorConfig, l1UserTxs)
+		coordIdxs, auths, l1UserTxs, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
+			p.txSelector.GetL1L2TxSelection(p.cfg.TxProcessorConfig, _l1UserTxs)
 		if err != nil {
-			return nil, tracerr.Wrap(err)
+			return nil, nil, tracerr.Wrap(err)
 		}
 	} else {
 		// 2b: only L2 txs
 		coordIdxs, auths, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
			p.txSelector.GetL2TxSelection(p.cfg.TxProcessorConfig)
 		if err != nil {
-			return nil, tracerr.Wrap(err)
+			return nil, nil, tracerr.Wrap(err)
 		}
-		l1UserTxsExtra = nil
+		l1UserTxs = nil
 	}
 
-	// If there are no txs to forge, no l1UserTxs in the open queue to
-	// freeze, and we haven't reached the ForgeNoTxsDelay, skip forging the
-	// batch.
-	if err = p.postForgeBatchCheck(slotCommitted, now, l1UserTxsExtra, l1CoordTxs, poolL2Txs, batchInfo); err != nil {
-		return nil, tracerr.Wrap(err)
+	if skip, reason, err := p.forgePolicySkipPostSelection(now,
+		l1UserTxs, l1CoordTxs, poolL2Txs, batchInfo); err != nil {
+		return nil, nil, tracerr.Wrap(err)
+	} else if skip {
+		if err := p.txSelector.Reset(batchInfo.BatchNum-1, false); err != nil {
+			return nil, nil, tracerr.Wrap(err)
+		}
+		return nil, &reason, nil
 	}
 
 	if batchInfo.L1Batch {
@@ -472,7 +551,7 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
 	}
 
 	// 3. Save metadata from TxSelector output for BatchNum
-	batchInfo.L1UserTxsExtra = l1UserTxsExtra
+	batchInfo.L1UserTxs = l1UserTxs
 	batchInfo.L1CoordTxs = l1CoordTxs
 	batchInfo.L1CoordinatorTxsAuths = auths
 	batchInfo.CoordIdxs = coordIdxs
@@ -480,10 +559,10 @@
 
 	if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs),
 		batchInfo.BatchNum); err != nil {
-		return nil, tracerr.Wrap(err)
+		return nil, nil, tracerr.Wrap(err)
 	}
 	if err := p.l2DB.UpdateTxsInfo(discardedL2Txs); err != nil {
-		return nil, tracerr.Wrap(err)
+		return nil, nil, tracerr.Wrap(err)
 	}
 
 	// Invalidate transactions that become invalid because of
@@ -492,21 +571,21 @@
 	// all the nonces smaller than the current one)
 	err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum)
 	if err != nil {
-		return nil, tracerr.Wrap(err)
+		return nil, nil, tracerr.Wrap(err)
 	}
 
 	// 4. Call BatchBuilder with TxSelector output
 	configBatch := &batchbuilder.ConfigBatch{
 		TxProcessorConfig: p.cfg.TxProcessorConfig,
 	}
-	zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxsExtra,
+	zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxs,
 		l1CoordTxs, poolL2Txs)
 	if err != nil {
-		return nil, tracerr.Wrap(err)
+		return nil, nil, tracerr.Wrap(err)
 	}
 	l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a big uggly, find a better way
 	if err != nil {
-		return nil, tracerr.Wrap(err)
+		return nil, nil, tracerr.Wrap(err)
 	}
 
 	batchInfo.L2Txs = l2Txs
@@ -518,42 +597,7 @@
 	p.state.lastSlotForged = p.stats.Sync.Auction.CurrentSlot.SlotNum
 
-	return batchInfo, nil
-}
-
-// check if there is no txs to forge, no l1UserTxs in the open queue to freeze and we haven't reached the ForgeNoTxsDelay
-func (p *Pipeline) postForgeBatchCheck(slotCommitted bool, now time.Time, l1UserTxsExtra, l1CoordTxs []common.L1Tx,
-	poolL2Txs []common.PoolL2Tx, batchInfo *BatchInfo) error {
-	if slotCommitted && now.Sub(p.lastForgeTime) < p.cfg.ForgeNoTxsDelay {
-		noTxs := false
-		if len(l1UserTxsExtra) == 0 && len(l1CoordTxs) == 0 && len(poolL2Txs) == 0 {
-			if batchInfo.L1Batch {
-				// Query the number of unforged L1UserTxs
-				// (either in a open queue or in a frozen
-				// not-yet-forged queue).
-				count, err := p.historyDB.GetUnforgedL1UserTxsCount()
-				if err != nil {
-					return err
-				}
-				// If there are future L1UserTxs, we forge a
-				// batch to advance the queues to be able to
-				// forge the L1UserTxs in the future.
-				// Otherwise, skip.
-				if count == 0 {
-					noTxs = true
-				}
-			} else {
-				noTxs = true
-			}
-		}
-		if noTxs {
-			if err := p.txSelector.Reset(batchInfo.BatchNum-1, false); err != nil {
-				return err
-			}
-			return errForgeNoTxsBeforeDelay
-		}
-	}
-	return nil
+	return batchInfo, nil, nil
 }
 
 // waitServerProof gets the generated zkProof & sends it to the SmartContract
@@ -598,7 +642,7 @@ func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs {
 		NewLastIdx:            int64(zki.Metadata.NewLastIdxRaw),
 		NewStRoot:             zki.Metadata.NewStateRootRaw.BigInt(),
 		NewExitRoot:           zki.Metadata.NewExitRootRaw.BigInt(),
-		L1UserTxs:             batchInfo.L1UserTxsExtra,
+		L1UserTxs:             batchInfo.L1UserTxs,
 		L1CoordinatorTxs:      batchInfo.L1CoordTxs,
 		L1CoordinatorTxsAuths: batchInfo.L1CoordinatorTxsAuths,
 		L2TxsData:             batchInfo.L2Txs,
diff --git a/coordinator/pipeline_test.go b/coordinator/pipeline_test.go
index 1a4a1e4..a43e63d 100644
--- a/coordinator/pipeline_test.go
+++ b/coordinator/pipeline_test.go
@@ -224,12 +224,12 @@ PoolTransfer(0) User2-User3: 300 (126)
 
 	batchNum++
 
-	batchInfo, err := pipeline.forgeBatch(batchNum)
+	batchInfo, _, err := pipeline.forgeBatch(batchNum)
 	require.NoError(t, err)
 	assert.Equal(t, 3, len(batchInfo.L2Txs))
 
 	batchNum++
 
-	batchInfo, err = pipeline.forgeBatch(batchNum)
+	batchInfo, _, err = pipeline.forgeBatch(batchNum)
 	require.NoError(t, err)
 	assert.Equal(t, 0, len(batchInfo.L2Txs))
 }
diff --git a/coordinator/txmanager.go b/coordinator/txmanager.go
index 0a8aacb..d949caa 100644
--- a/coordinator/txmanager.go
+++ b/coordinator/txmanager.go
@@ -147,7 +147,7 @@ func (t *TxManager) NewAuth(ctx context.Context, batchInfo *BatchInfo) (*bind.Tr
 	auth.Value = big.NewInt(0) // in wei
 	gasLimit := t.cfg.ForgeBatchGasCost.Fixed +
-		uint64(len(batchInfo.L1UserTxsExtra))*t.cfg.ForgeBatchGasCost.L1UserTx +
+		uint64(len(batchInfo.L1UserTxs))*t.cfg.ForgeBatchGasCost.L1UserTx +
 		uint64(len(batchInfo.L1CoordTxs))*t.cfg.ForgeBatchGasCost.L1CoordTx +
 		uint64(len(batchInfo.L2Txs))*t.cfg.ForgeBatchGasCost.L2Tx
 	auth.GasLimit = gasLimit
diff --git a/node/node.go b/node/node.go
index 7a951a2..870b8fe 100644
--- a/node/node.go
+++ b/node/node.go
@@ -367,6 +367,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
 			ForgeDelay:              cfg.Coordinator.ForgeDelay.Duration,
 			MustForgeAtSlotDeadline: cfg.Coordinator.MustForgeAtSlotDeadline,
 			IgnoreSlotCommitment:    cfg.Coordinator.IgnoreSlotCommitment,
+			ForgeOncePerSlotIfTxs:   cfg.Coordinator.ForgeOncePerSlotIfTxs,
 			ForgeNoTxsDelay:         cfg.Coordinator.ForgeNoTxsDelay.Duration,
 			SyncRetryInterval:       cfg.Coordinator.SyncRetryInterval.Duration,
 			PurgeByExtDelInterval:   cfg.Coordinator.PurgeByExtDelInterval.Duration,
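
A note on the policy precedence this patch introduces: the sketch below restates `forgePolicySkipPreSelection` as a pure function so the decision order is explicit. The `policy` struct and `skipPreSelection` helper are hypothetical stand-ins for `coordinator.Config` and the pipeline method (which reads `p.stats` and `p.state` internally); only the branching order is taken from the diff.

```go
package main

import (
	"fmt"
	"time"
)

// policy mirrors the three Config fields consulted before tx selection.
// It is an illustrative stand-in, not the coordinator's actual type.
type policy struct {
	ForgeOncePerSlotIfTxs bool
	IgnoreSlotCommitment  bool
	ForgeDelay            time.Duration
}

// skipPreSelection restates the decision order of forgePolicySkipPreSelection:
// ForgeOncePerSlotIfTxs short-circuits everything else; otherwise an
// uncommitted slot (when commitment matters) forces an immediate forge, and
// only then does ForgeDelay throttle forging.
func skipPreSelection(p policy, slotCommitted bool, sinceLastForge time.Duration) (bool, string) {
	if p.ForgeOncePerSlotIfTxs {
		if slotCommitted {
			return true, "slot already committed"
		}
		return false, "" // forge: first batch of the slot
	}
	if !p.IgnoreSlotCommitment && !slotCommitted {
		return false, "" // forge now to commit the slot
	}
	if sinceLastForge < p.ForgeDelay {
		return true, "forge delay not reached"
	}
	return false, ""
}

func main() {
	p := policy{ForgeOncePerSlotIfTxs: true, ForgeDelay: 10 * time.Second}
	// Even though ForgeDelay has long elapsed, the committed slot wins:
	skip, reason := skipPreSelection(p, true, time.Minute)
	fmt.Println(skip, reason) // true "slot already committed"
}
```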
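The post-selection check has one subtlety worth calling out: a batch with an empty selection can still be worth forging, because forging an L1 batch advances the L1UserTx queues. The sketch below restates that rule; `pendingTxs` is a hypothetical helper and `unforgedL1Count` stands in for the result of `historyDB.GetUnforgedL1UserTxsCount`.

```go
package main

import "fmt"

// pendingTxs restates the emptiness check in forgePolicySkipPostSelection:
// an apparently empty batch is still worth forging when it is an L1 batch
// and unforged L1UserTxs wait in an open or frozen queue, because forging
// advances the queues so those txs become forgeable later.
func pendingTxs(nL1User, nL1Coord, nL2, unforgedL1Count int, l1Batch bool) bool {
	if nL1User+nL1Coord+nL2 > 0 {
		return true
	}
	return l1Batch && unforgedL1Count > 0
}

func main() {
	// Empty selection, but an L1 batch with one queued L1UserTx: still forge.
	fmt.Println(pendingTxs(0, 0, 0, 1, true)) // true
	// Empty selection in an L2-only batch: nothing to do, skip.
	fmt.Println(pendingTxs(0, 0, 0, 1, false)) // false
}
```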
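The new `(batchInfo, skipReason, err)` return shape separates policy skips from real failures: a skip comes back as a non-nil reason with a nil error, and `handleForgeBatch` converts it into the `errSkipBatchByPolicy` sentinel so `Start`'s loop continues without logging an error. A minimal sketch of that plumbing, with hypothetical `forge`/`handle` functions and the standard library in place of `tracerr` and the structured logger:

```go
package main

import (
	"errors"
	"fmt"
)

// errSkipByPolicy plays the role of errSkipBatchByPolicy: a sentinel the
// pipeline loop recognizes and silently skips, unlike a real forge error.
var errSkipByPolicy = errors.New("skip batch by policy")

// forge mimics forgeBatch's new signature: on a policy skip it returns a
// non-nil skipReason together with a nil error; real failures are errors.
func forge(batchNum int) (info string, skipReason *string, err error) {
	reason := "slot already committed"
	return "", &reason, nil // this run decides to skip
}

// handle mimics handleForgeBatch: a nil error plus a skip reason is logged
// at debug level and converted into the sentinel for the caller's loop.
func handle(batchNum int) error {
	_, skipReason, err := forge(batchNum)
	if err != nil {
		return err // a real error, worth logging loudly
	} else if skipReason != nil {
		fmt.Printf("skipping batch %d: %s\n", batchNum, *skipReason)
		return errSkipByPolicy
	}
	return nil
}

func main() {
	if err := handle(42); errors.Is(err, errSkipByPolicy) {
		fmt.Println("continue loop without treating this as a failure")
	}
}
```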
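Finally, the `txmanager.go` hunk only renames a field, but the gas-limit formula it touches is worth restating: a fixed base cost plus a per-tx cost for each class of tx in the batch. A sketch with invented numbers (the real values come from `cfg.ForgeBatchGasCost`):

```go
package main

import "fmt"

// gasCost mirrors the shape of cfg.ForgeBatchGasCost; the numbers used in
// main are invented for the example, not Hermez's real calibration.
type gasCost struct {
	Fixed, L1UserTx, L1CoordTx, L2Tx uint64
}

// forgeBatchGasLimit restates the estimate in TxManager.NewAuth: a fixed
// base cost plus a per-tx cost for each class of tx included in the batch.
func forgeBatchGasLimit(c gasCost, nL1User, nL1Coord, nL2 int) uint64 {
	return c.Fixed +
		uint64(nL1User)*c.L1UserTx +
		uint64(nL1Coord)*c.L1CoordTx +
		uint64(nL2)*c.L2Tx
}

func main() {
	c := gasCost{Fixed: 300000, L1UserTx: 15000, L1CoordTx: 7000, L2Tx: 600}
	// A batch with 2 L1UserTxs, 1 L1CoordTx and 100 L2 txs:
	fmt.Println(forgeBatchGasLimit(c, 2, 1, 100)) // 300000+30000+7000+60000 = 397000
}
```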