@@ -224,8 +224,9 @@ func (p *Pipeline) handleForgeBatch(ctx context.Context,
 	// 2. Forge the batch internally (make a selection of txs and prepare
 	// all the smart contract arguments)
+	var skipReason *string
 	p.mutexL2DBUpdateDelete.Lock()
-	batchInfo, err = p.forgeBatch(batchNum)
+	batchInfo, skipReason, err = p.forgeBatch(batchNum)
 	p.mutexL2DBUpdateDelete.Unlock()
 	if ctx.Err() != nil {
 		return nil, ctx.Err()
@@ -234,13 +235,13 @@ func (p *Pipeline) handleForgeBatch(ctx context.Context,
 			log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err,
 				"lastForgeL1TxsNum", p.state.lastForgeL1TxsNum,
 				"syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum)
-		} else if tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay ||
-			tracerr.Unwrap(err) == errForgeBeforeDelay {
-			// no log
 		} else {
 			log.Errorw("forgeBatch", "err", err)
 		}
 		return nil, tracerr.Wrap(err)
+	} else if skipReason != nil {
+		log.Debugw("skipping batch", "batch", batchNum, "reason", *skipReason)
+		return nil, tracerr.Wrap(errSkipBatchByPolicy)
 	}
 	// 3. Send the ZKInputs to the proof server
@@ -295,8 +296,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
 				if p.ctx.Err() != nil {
 					continue
 				} else if tracerr.Unwrap(err) == errLastL1BatchNotSynced ||
-					tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay ||
-					tracerr.Unwrap(err) == errForgeBeforeDelay {
+					tracerr.Unwrap(err) == errSkipBatchByPolicy {
 					continue
 				} else if err != nil {
 					p.setErrAtBatchNum(batchNum)
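
Both hunks above route the "skip by policy" case through a single sentinel, errSkipBatchByPolicy, whose declaration is not part of these hunks. A minimal sketch of the assumed declaration (the name is taken from the hunks; the message and location are guesses):

package coordinator

import "fmt"

// Assumed declaration, for illustration only: Start compares
// tracerr.Unwrap(err) against this value, so it must be a package-level
// sentinel rather than an error constructed at the return site.
var errSkipBatchByPolicy = fmt.Errorf("skip batch by policy")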
@@ -389,25 +389,109 @@ func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) er
 	return nil
 }
 
-// check if we reach the ForgeDelay or not before batch forging
-func (p *Pipeline) preForgeBatchCheck(slotCommitted bool, now time.Time) error {
-	if slotCommitted && now.Sub(p.lastForgeTime) < p.cfg.ForgeDelay {
-		return errForgeBeforeDelay
-	}
-	return nil
-}
+// slotCommitted returns true if the current slot has already been committed
+func (p *Pipeline) slotCommitted() bool {
+	// Synchronizer has synchronized a batch in the current slot (setting
+	// CurrentSlot.ForgerCommitment) or the pipeline has already
+	// internally forged a batch in the current slot
+	return p.stats.Sync.Auction.CurrentSlot.ForgerCommitment ||
+		p.stats.Sync.Auction.CurrentSlot.SlotNum == p.state.lastSlotForged
+}
+
+// forgePolicySkipPreSelection is called before doing a tx selection in a batch
+// to determine by policy whether we should forge the batch.  Returns true and
+// the reason when the forging of the batch must be skipped.
+func (p *Pipeline) forgePolicySkipPreSelection(now time.Time) (bool, string) {
+	// Check if the slot is not yet fulfilled
+	slotCommitted := p.slotCommitted()
+	if p.cfg.ForgeOncePerSlotIfTxs {
+		if slotCommitted {
+			return true, "cfg.ForgeOncePerSlotIfTxs = true and slot already committed"
+		}
+		return false, ""
+	}
+	// Determine if we must commit the slot
+	if !p.cfg.IgnoreSlotCommitment && !slotCommitted {
+		return false, ""
+	}
+	// If we haven't reached the ForgeDelay, skip forging the batch
+	if now.Sub(p.lastForgeTime) < p.cfg.ForgeDelay {
+		return true, "we haven't reached the forge delay"
+	}
+	return false, ""
+}
+
+// forgePolicySkipPostSelection is called after doing a tx selection in a batch
+// to determine by policy whether we should forge the batch.  Returns true and
+// the reason when the forging of the batch must be skipped.
+func (p *Pipeline) forgePolicySkipPostSelection(now time.Time, l1UserTxsExtra, l1CoordTxs []common.L1Tx,
+	poolL2Txs []common.PoolL2Tx, batchInfo *BatchInfo) (bool, string, error) {
+	// Check if the slot is not yet fulfilled
+	slotCommitted := p.slotCommitted()
+
+	pendingTxs := true
+	if len(l1UserTxsExtra) == 0 && len(l1CoordTxs) == 0 && len(poolL2Txs) == 0 {
+		if batchInfo.L1Batch {
+			// Query the number of unforged L1UserTxs
+			// (either in an open queue or in a frozen
+			// not-yet-forged queue).
+			count, err := p.historyDB.GetUnforgedL1UserTxsCount()
+			if err != nil {
+				return false, "", err
+			}
+			// If there are future L1UserTxs, we forge a
+			// batch to advance the queues to be able to
+			// forge the L1UserTxs in the future.
+			// Otherwise, skip.
+			if count == 0 {
+				pendingTxs = false
+			}
+		} else {
+			pendingTxs = false
+		}
+	}
+
+	if p.cfg.ForgeOncePerSlotIfTxs {
+		if slotCommitted {
+			return true, "cfg.ForgeOncePerSlotIfTxs = true and slot already committed",
+				nil
+		}
+		if pendingTxs {
+			return false, "", nil
+		}
+		return true, "cfg.ForgeOncePerSlotIfTxs = true and no pending txs",
+			nil
+	}
+	// Determine if we must commit the slot
+	if !p.cfg.IgnoreSlotCommitment && !slotCommitted {
+		return false, "", nil
+	}
+	// Skip if there are no txs to forge, no l1UserTxs in the open queue to
+	// freeze, and we haven't reached the ForgeNoTxsDelay
+	if now.Sub(p.lastForgeTime) < p.cfg.ForgeNoTxsDelay {
+		if !pendingTxs {
+			return true, "no txs to forge and we haven't reached the forge no txs delay",
+				nil
+		}
+	}
+	return false, "", nil
+}
 
 // forgeBatch forges the batchNum batch.
-func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) {
+func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo,
+	skipReason *string, err error) {
 	// remove transactions from the pool that have been there for too long
 	_, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
 		p.stats.Sync.LastBlock.Num, int64(batchNum))
 	if err != nil {
-		return nil, tracerr.Wrap(err)
+		return nil, nil, tracerr.Wrap(err)
 	}
 	_, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum))
 	if err != nil {
-		return nil, tracerr.Wrap(err)
+		return nil, nil, tracerr.Wrap(err)
 	}
 	// Structure to accumulate data and metadata of the batch
 	now := time.Now()
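
The pre-selection policy above reads only a few pipeline fields, so its decision order (ForgeOncePerSlotIfTxs first, then slot commitment, then ForgeDelay) can be exercised in isolation. Below is a self-contained sketch over plain inputs; the type, function name, skip messages, and example values are illustrative and not part of this change:

package main

import (
	"fmt"
	"time"
)

// policyInputs captures the pipeline state read by the pre-selection policy.
type policyInputs struct {
	forgeOncePerSlotIfTxs bool
	ignoreSlotCommitment  bool
	slotCommitted         bool
	sinceLastForge        time.Duration
	forgeDelay            time.Duration
}

// skipPreSelection mirrors forgePolicySkipPreSelection: ForgeOncePerSlotIfTxs
// dominates, then slot commitment, then ForgeDelay.
func skipPreSelection(in policyInputs) (bool, string) {
	if in.forgeOncePerSlotIfTxs {
		if in.slotCommitted {
			return true, "slot already committed"
		}
		return false, ""
	}
	if !in.ignoreSlotCommitment && !in.slotCommitted {
		return false, "" // slot not committed yet: forge now to commit it
	}
	if in.sinceLastForge < in.forgeDelay {
		return true, "forge delay not reached"
	}
	return false, ""
}

func main() {
	// Slot already committed and ForgeDelay not yet reached: skip.
	skip, reason := skipPreSelection(policyInputs{
		slotCommitted:  true,
		sinceLastForge: 10 * time.Second,
		forgeDelay:     60 * time.Second,
	})
	fmt.Println(skip, reason) // prints: true forge delay not reached
}

The post-selection variant layers one more input on top of this: whether the selection (or the queue of unforged L1UserTxs) left any pending txs, checked against ForgeNoTxsDelay instead of ForgeDelay.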
@@ -417,53 +501,48 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
 	var poolL2Txs []common.PoolL2Tx
 	var discardedL2Txs []common.PoolL2Tx
-	var l1UserTxsExtra, l1CoordTxs []common.L1Tx
+	var l1UserTxs, l1CoordTxs []common.L1Tx
 	var auths [][]byte
 	var coordIdxs []common.Idx
 
-	// Check if the slot is not yet fulfilled
-	slotCommitted := p.cfg.IgnoreSlotCommitment
-	if p.stats.Sync.Auction.CurrentSlot.ForgerCommitment ||
-		p.stats.Sync.Auction.CurrentSlot.SlotNum == p.state.lastSlotForged {
-		slotCommitted = true
-	}
-	// If we haven't reached the ForgeDelay, skip forging the batch
-	if err = p.preForgeBatchCheck(slotCommitted, now); err != nil {
-		return nil, tracerr.Wrap(err)
+	if skip, reason := p.forgePolicySkipPreSelection(now); skip {
+		return nil, &reason, nil
 	}
 
 	// 1. Decide if we forge L2Tx or L1+L2Tx
 	if p.shouldL1L2Batch(batchInfo) {
 		batchInfo.L1Batch = true
 		if p.state.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum {
-			return nil, tracerr.Wrap(errLastL1BatchNotSynced)
+			return nil, nil, tracerr.Wrap(errLastL1BatchNotSynced)
 		}
 		// 2a: L1+L2 txs
-		l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.state.lastForgeL1TxsNum + 1)
+		_l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.state.lastForgeL1TxsNum + 1)
 		if err != nil {
-			return nil, tracerr.Wrap(err)
+			return nil, nil, tracerr.Wrap(err)
 		}
-		coordIdxs, auths, l1UserTxsExtra, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
-			p.txSelector.GetL1L2TxSelection(p.cfg.TxProcessorConfig, l1UserTxs)
+		coordIdxs, auths, l1UserTxs, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
+			p.txSelector.GetL1L2TxSelection(p.cfg.TxProcessorConfig, _l1UserTxs)
 		if err != nil {
-			return nil, tracerr.Wrap(err)
+			return nil, nil, tracerr.Wrap(err)
 		}
 	} else {
 		// 2b: only L2 txs
 		coordIdxs, auths, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
 			p.txSelector.GetL2TxSelection(p.cfg.TxProcessorConfig)
 		if err != nil {
-			return nil, tracerr.Wrap(err)
+			return nil, nil, tracerr.Wrap(err)
 		}
-		l1UserTxsExtra = nil
+		l1UserTxs = nil
 	}
 
-	// If there are no txs to forge, no l1UserTxs in the open queue to
-	// freeze, and we haven't reached the ForgeNoTxsDelay, skip forging the
-	// batch.
-	if err = p.postForgeBatchCheck(slotCommitted, now, l1UserTxsExtra, l1CoordTxs, poolL2Txs, batchInfo); err != nil {
-		return nil, tracerr.Wrap(err)
+	if skip, reason, err := p.forgePolicySkipPostSelection(now,
+		l1UserTxs, l1CoordTxs, poolL2Txs, batchInfo); err != nil {
+		return nil, nil, tracerr.Wrap(err)
+	} else if skip {
+		if err := p.txSelector.Reset(batchInfo.BatchNum-1, false); err != nil {
+			return nil, nil, tracerr.Wrap(err)
+		}
+		return nil, &reason, tracerr.Wrap(err)
 	}
 
 	if batchInfo.L1Batch {
@@ -472,7 +551,7 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
 	}
 
 	// 3. Save metadata from TxSelector output for BatchNum
-	batchInfo.L1UserTxsExtra = l1UserTxsExtra
+	batchInfo.L1UserTxs = l1UserTxs
 	batchInfo.L1CoordTxs = l1CoordTxs
 	batchInfo.L1CoordinatorTxsAuths = auths
 	batchInfo.CoordIdxs = coordIdxs
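
The rename from L1UserTxsExtra to L1UserTxs in this hunk implies a matching field rename on BatchInfo itself, which is not shown here. A sketch of the assumed shape of the fields referenced in these hunks (field set and types are inferred from usage, not from this diff):

// Assumed shape, for illustration only; BatchInfo has more fields than listed.
type BatchInfo struct {
	BatchNum              common.BatchNum
	L1Batch               bool
	L1UserTxs             []common.L1Tx // previously L1UserTxsExtra
	L1CoordTxs            []common.L1Tx
	L1CoordinatorTxsAuths [][]byte
	CoordIdxs             []common.Idx
	L2Txs                 []common.L2Tx
	// ... remaining fields unchanged
}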
@@ -480,10 +559,10 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
 	if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs),
 		batchInfo.BatchNum); err != nil {
-		return nil, tracerr.Wrap(err)
+		return nil, nil, tracerr.Wrap(err)
 	}
 	if err := p.l2DB.UpdateTxsInfo(discardedL2Txs); err != nil {
-		return nil, tracerr.Wrap(err)
+		return nil, nil, tracerr.Wrap(err)
 	}
 
 	// Invalidate transactions that become invalid because of
@@ -492,21 +571,21 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
 	// all the nonces smaller than the current one)
 	err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum)
 	if err != nil {
-		return nil, tracerr.Wrap(err)
+		return nil, nil, tracerr.Wrap(err)
 	}
 
 	// 4. Call BatchBuilder with TxSelector output
 	configBatch := &batchbuilder.ConfigBatch{
 		TxProcessorConfig: p.cfg.TxProcessorConfig,
 	}
-	zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxsExtra,
+	zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxs,
 		l1CoordTxs, poolL2Txs)
 	if err != nil {
-		return nil, tracerr.Wrap(err)
+		return nil, nil, tracerr.Wrap(err)
 	}
 	l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a bit ugly, find a better way
 	if err != nil {
-		return nil, tracerr.Wrap(err)
+		return nil, nil, tracerr.Wrap(err)
 	}
 	batchInfo.L2Txs = l2Txs
@@ -518,42 +597,7 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
 	p.state.lastSlotForged = p.stats.Sync.Auction.CurrentSlot.SlotNum
 
-	return batchInfo, nil
-}
-
-// check if there is no txs to forge, no l1UserTxs in the open queue to freeze and we haven't reached the ForgeNoTxsDelay
-func (p *Pipeline) postForgeBatchCheck(slotCommitted bool, now time.Time, l1UserTxsExtra, l1CoordTxs []common.L1Tx,
-	poolL2Txs []common.PoolL2Tx, batchInfo *BatchInfo) error {
-	if slotCommitted && now.Sub(p.lastForgeTime) < p.cfg.ForgeNoTxsDelay {
-		noTxs := false
-		if len(l1UserTxsExtra) == 0 && len(l1CoordTxs) == 0 && len(poolL2Txs) == 0 {
-			if batchInfo.L1Batch {
-				// Query the number of unforged L1UserTxs
-				// (either in a open queue or in a frozen
-				// not-yet-forged queue).
-				count, err := p.historyDB.GetUnforgedL1UserTxsCount()
-				if err != nil {
-					return err
-				}
-				// If there are future L1UserTxs, we forge a
-				// batch to advance the queues to be able to
-				// forge the L1UserTxs in the future.
-				// Otherwise, skip.
-				if count == 0 {
-					noTxs = true
-				}
-			} else {
-				noTxs = true
-			}
-		}
-		if noTxs {
-			if err := p.txSelector.Reset(batchInfo.BatchNum-1, false); err != nil {
-				return err
-			}
-			return errForgeNoTxsBeforeDelay
-		}
-	}
-	return nil
+	return batchInfo, nil, nil
 }
 
 // waitServerProof gets the generated zkProof & sends it to the SmartContract
@@ -598,7 +642,7 @@ func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs {
 		NewLastIdx:            int64(zki.Metadata.NewLastIdxRaw),
 		NewStRoot:             zki.Metadata.NewStateRootRaw.BigInt(),
 		NewExitRoot:           zki.Metadata.NewExitRootRaw.BigInt(),
-		L1UserTxs:             batchInfo.L1UserTxsExtra,
+		L1UserTxs:             batchInfo.L1UserTxs,
 		L1CoordinatorTxs:      batchInfo.L1CoordTxs,
 		L1CoordinatorTxsAuths: batchInfo.L1CoordinatorTxsAuths,
 		L2TxsData:             batchInfo.L2Txs,