@ -29,6 +29,7 @@ type state struct {
batchNum common . BatchNum
batchNum common . BatchNum
lastScheduledL1BatchBlockNum int64
lastScheduledL1BatchBlockNum int64
lastForgeL1TxsNum int64
lastForgeL1TxsNum int64
lastSlotForged int64
}
}
// Pipeline manages the forging of batches with parallel server proofs
// Pipeline manages the forging of batches with parallel server proofs
@ -42,6 +43,7 @@ type Pipeline struct {
started bool
started bool
rw sync . RWMutex
rw sync . RWMutex
errAtBatchNum common . BatchNum
errAtBatchNum common . BatchNum
lastForgeTime time . Time
proversPool * ProversPool
proversPool * ProversPool
provers [ ] prover . Client
provers [ ] prover . Client
@ -133,6 +135,7 @@ func (p *Pipeline) reset(batchNum common.BatchNum,
batchNum : batchNum ,
batchNum : batchNum ,
lastForgeL1TxsNum : stats . Sync . LastForgeL1TxsNum ,
lastForgeL1TxsNum : stats . Sync . LastForgeL1TxsNum ,
lastScheduledL1BatchBlockNum : 0 ,
lastScheduledL1BatchBlockNum : 0 ,
lastSlotForged : - 1 ,
}
}
p . stats = * stats
p . stats = * stats
p . vars = * vars
p . vars = * vars
@ -204,6 +207,9 @@ func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNu
log . Warnw ( "forgeBatch: scheduled L1Batch too early" , "err" , err ,
log . Warnw ( "forgeBatch: scheduled L1Batch too early" , "err" , err ,
"lastForgeL1TxsNum" , p . state . lastForgeL1TxsNum ,
"lastForgeL1TxsNum" , p . state . lastForgeL1TxsNum ,
"syncLastForgeL1TxsNum" , p . stats . Sync . LastForgeL1TxsNum )
"syncLastForgeL1TxsNum" , p . stats . Sync . LastForgeL1TxsNum )
} else if tracerr . Unwrap ( err ) == errForgeNoTxsBeforeDelay ||
tracerr . Unwrap ( err ) == errForgeBeforeDelay {
// no log
} else {
} else {
log . Errorw ( "forgeBatch" , "err" , err )
log . Errorw ( "forgeBatch" , "err" , err )
}
}
@ -269,7 +275,9 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
batchInfo , err := p . handleForgeBatch ( p . ctx , batchNum )
batchInfo , err := p . handleForgeBatch ( p . ctx , batchNum )
if p . ctx . Err ( ) != nil {
if p . ctx . Err ( ) != nil {
continue
continue
} else if tracerr . Unwrap ( err ) == errLastL1BatchNotSynced {
} else if tracerr . Unwrap ( err ) == errLastL1BatchNotSynced ||
tracerr . Unwrap ( err ) == errForgeNoTxsBeforeDelay ||
tracerr . Unwrap ( err ) == errForgeBeforeDelay {
waitDuration = p . cfg . ForgeRetryInterval
waitDuration = p . cfg . ForgeRetryInterval
continue
continue
} else if err != nil {
} else if err != nil {
@ -282,6 +290,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
} )
} )
continue
continue
}
}
p . lastForgeTime = time . Now ( )
p . state . batchNum = batchNum
p . state . batchNum = batchNum
select {
select {
@ -373,8 +382,9 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
return nil , tracerr . Wrap ( err )
return nil , tracerr . Wrap ( err )
}
}
// Structure to accumulate data and metadata of the batch
// Structure to accumulate data and metadata of the batch
now := time . Now ( )
batchInfo = & BatchInfo { PipelineNum : p . num , BatchNum : batchNum }
batchInfo = & BatchInfo { PipelineNum : p . num , BatchNum : batchNum }
batchInfo . Debug . StartTimestamp = time . Now ( )
batchInfo . Debug . StartTimestamp = now
batchInfo . Debug . StartBlockNum = p . stats . Eth . LastBlock . Num + 1
batchInfo . Debug . StartBlockNum = p . stats . Eth . LastBlock . Num + 1
selectionCfg := & txselector . SelectionConfig {
selectionCfg := & txselector . SelectionConfig {
@ -388,10 +398,17 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
var auths [ ] [ ] byte
var auths [ ] [ ] byte
var coordIdxs [ ] common . Idx
var coordIdxs [ ] common . Idx
// TODO: If there are no txs and we are behind the timeout, skip
// forging a batch and return a particular error that can be handleded
// in the loop where handleForgeBatch is called to retry after an
// interval
// Check if the slot is not yet fulfilled
slotCommitted := false
if p . stats . Sync . Auction . CurrentSlot . ForgerCommitment ||
p . stats . Sync . Auction . CurrentSlot . SlotNum == p . state . lastSlotForged {
slotCommitted = true
}
// If we haven't reached the ForgeDelay, skip forging the batch
if slotCommitted && now . Sub ( p . lastForgeTime ) < p . cfg . ForgeDelay {
return nil , errForgeBeforeDelay
}
// 1. Decide if we forge L2Tx or L1+L2Tx
// 1. Decide if we forge L2Tx or L1+L2Tx
if p . shouldL1L2Batch ( batchInfo ) {
if p . shouldL1L2Batch ( batchInfo ) {
@ -409,9 +426,6 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
if err != nil {
if err != nil {
return nil , tracerr . Wrap ( err )
return nil , tracerr . Wrap ( err )
}
}
p . state . lastScheduledL1BatchBlockNum = p . stats . Eth . LastBlock . Num + 1
p . state . lastForgeL1TxsNum ++
} else {
} else {
// 2b: only L2 txs
// 2b: only L2 txs
coordIdxs , auths , l1CoordTxs , poolL2Txs , discardedL2Txs , err =
coordIdxs , auths , l1CoordTxs , poolL2Txs , discardedL2Txs , err =
@ -422,6 +436,43 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
l1UserTxsExtra = nil
l1UserTxsExtra = nil
}
}
// If there are no txs to forge, no l1UserTxs in the open queue to
// freeze, and we haven't reached the ForgeNoTxsDelay, skip forging the
// batch.
if slotCommitted && now . Sub ( p . lastForgeTime ) < p . cfg . ForgeNoTxsDelay {
noTxs := false
if len ( l1UserTxsExtra ) == 0 && len ( l1CoordTxs ) == 0 && len ( poolL2Txs ) == 0 {
if batchInfo . L1Batch {
// Query the L1UserTxs in the queue following
// the one we are trying to forge.
nextL1UserTxs , err := p . historyDB . GetUnforgedL1UserTxs (
p . state . lastForgeL1TxsNum + 1 )
if err != nil {
return nil , tracerr . Wrap ( err )
}
// If there are future L1UserTxs, we forge a
// batch to advance the queues and forge the
// L1UserTxs in the future. Otherwise, skip.
if len ( nextL1UserTxs ) == 0 {
noTxs = true
}
} else {
noTxs = true
}
}
if noTxs {
if err := p . txSelector . Reset ( batchInfo . BatchNum - 1 , false ) ; err != nil {
return nil , tracerr . Wrap ( err )
}
return nil , errForgeNoTxsBeforeDelay
}
}
if batchInfo . L1Batch {
p . state . lastScheduledL1BatchBlockNum = p . stats . Eth . LastBlock . Num + 1
p . state . lastForgeL1TxsNum ++
}
// 3. Save metadata from TxSelector output for BatchNum
// 3. Save metadata from TxSelector output for BatchNum
batchInfo . L1UserTxsExtra = l1UserTxsExtra
batchInfo . L1UserTxsExtra = l1UserTxsExtra
batchInfo . L1CoordTxs = l1CoordTxs
batchInfo . L1CoordTxs = l1CoordTxs
@ -466,6 +517,8 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
p . cfg . debugBatchStore ( batchInfo )
p . cfg . debugBatchStore ( batchInfo )
log . Infow ( "Pipeline: batch forged internally" , "batch" , batchInfo . BatchNum )
log . Infow ( "Pipeline: batch forged internally" , "batch" , batchInfo . BatchNum )
p . state . lastSlotForged = p . stats . Sync . Auction . CurrentSlot . SlotNum
return batchInfo , nil
return batchInfo , nil
}
}