@@ -156,10 +156,9 @@ func NewCoordinator(cfg Config,
return &c, nil
}
func (c *Coordinator) newPipeline(ctx context.Context,
stats *synchronizer.Stats) (*Pipeline, error) {
func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) {
return NewPipeline(ctx, c.cfg, c.historyDB, c.l2DB, c.txSelector,
c.batchBuilder, c.purger, c.txManager, c.provers, stats, &c.consts)
c.batchBuilder, c.purger, c.txManager, c.provers, &c.consts)
}
// MsgSyncBlock indicates an update to the Synchronizer stats
@@ -174,6 +173,7 @@ type MsgSyncBlock struct {
// MsgSyncReorg indicates a reorg
type MsgSyncReorg struct {
Stats synchronizer.Stats
Vars synchronizer.SCVariablesPtr
}
// MsgStopPipeline indicates a signal to reset the pipeline
@@ -222,7 +222,7 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
stats.Eth.LastBlock.Num, "batch", stats.Sync.LastBatch)
batchNum := common.BatchNum(stats.Sync.LastBatch)
var err error
if c.pipeline, err = c.newPipeline(ctx, stats); err != nil {
if c.pipeline, err = c.newPipeline(ctx); err != nil {
return tracerr.Wrap(err)
}
if err := c.pipeline.Start(batchNum, stats.Sync.LastForgeL1TxsNum,
@@ -233,9 +233,7 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
c.pipelineBatchNum = batchNum
}
} else {
if canForge {
c.pipeline.SetSyncStats(stats)
} else {
if !canForge {
log.Infow("Coordinator: forging state end", "block", stats.Eth.LastBlock.Num)
c.pipeline.Stop(c.ctx)
c.pipeline = nil
@@ -269,15 +267,42 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
func (c *Coordinator) handleMsgSyncBlock(ctx context.Context, msg *MsgSyncBlock) error {
c.stats = &msg.Stats
// batches := msg.Batches
c.syncSCVars(msg.Vars)
if c.pipeline != nil {
c.pipeline.SetSyncStatsVars(&msg.Stats, &msg.Vars)
}
if !c.stats.Synced() {
return nil
}
c.syncSCVars(msg.Vars)
return c.syncStats(ctx, c.stats)
}
func (c *Coordinator) handleReorg(ctx context.Context, msg *MsgSyncReorg) error {
c.stats = &msg.Stats
c.syncSCVars(msg.Vars)
if c.pipeline != nil {
c.pipeline.SetSyncStatsVars(&msg.Stats, &msg.Vars)
}
if common.BatchNum(c.stats.Sync.LastBatch) < c.pipelineBatchNum {
// There's been a reorg and the batch from which the pipeline
// was started was in a block that was discarded. The batch
// may not be in the main chain, so we stop the pipeline as a
// precaution (it will be started again once the node is in
// sync).
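// For example: if the pipeline was started at batch 100
// (pipelineBatchNum) and after the reorg the synchronizer reports
// LastBatch = 97, the pipeline is stopped and handleStopPipeline
// reorgs the L2DB back to batch 97.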
log . Infow ( "Coordinator.handleReorg StopPipeline sync.LastBatch < c.pipelineBatchNum" ,
"sync.LastBatch" , c . stats . Sync . LastBatch ,
"c.pipelineBatchNum" , c . pipelineBatchNum )
if err := c . handleStopPipeline ( ctx , "reorg" ) ; err != nil {
return tracerr . Wrap ( err )
}
}
return nil
}
func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string) error {
if err := c.l2DB.Reorg(common.BatchNum(c.stats.Sync.LastBatch)); err != nil {
return tracerr.Wrap(err)
}
if c.pipeline != nil {
c.pipeline.Stop(c.ctx)
c.pipeline = nil
@@ -295,7 +320,7 @@ func (c *Coordinator) handleMsg(ctx context.Context, msg interface{}) error {
return tracerr.Wrap(fmt.Errorf("Coordinator.handleMsgSyncBlock error: %w", err))
}
case MsgSyncReorg:
if err := c.handleReorg(ctx, &msg.Stats); err != nil {
if err := c.handleReorg(ctx, &msg); err != nil {
return tracerr.Wrap(fmt.Errorf("Coordinator.handleReorg error: %w", err))
}
case MsgStopPipeline:
@@ -376,27 +401,6 @@ func (c *Coordinator) Stop() {
}
}
func (c *Coordinator) handleReorg(ctx context.Context, stats *synchronizer.Stats) error {
c.stats = stats
if common.BatchNum(c.stats.Sync.LastBatch) < c.pipelineBatchNum {
// There's been a reorg and the batch from which the pipeline
// was started was in a block that was discarded. The batch
// may not be in the main chain, so we stop the pipeline as a
// precaution (it will be started again once the node is in
// sync).
log.Infow("Coordinator.handleReorg StopPipeline sync.LastBatch < c.pipelineBatchNum",
"sync.LastBatch", c.stats.Sync.LastBatch,
"c.pipelineBatchNum", c.pipelineBatchNum)
if err := c.handleStopPipeline(ctx, "reorg"); err != nil {
return tracerr.Wrap(err)
}
if err := c.l2DB.Reorg(common.BatchNum(c.stats.Sync.LastBatch)); err != nil {
return tracerr.Wrap(err)
}
}
return nil
}
// TxManager handles everything related to ethereum transactions: It makes the
// call to forge, waits for transaction confirmation, and keeps checking them
// until a number of confirmed blocks have passed.
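// In the Run loop below, each batch in the queue is polled with
// ethTransactionReceipt and then handleReceipt: a failed receipt marks the
// batch StatusFailed, a successful one marks it StatusMined and advances
// lastConfirmedBatch.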
@@ -465,6 +469,7 @@ func (t *TxManager) rollupForgeBatch(ctx context.Context, batchInfo *BatchInfo)
return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err))
}
batchInfo.EthTx = ethTx
batchInfo.Status = StatusSent
log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex())
t.cfg.debugBatchStore(batchInfo)
if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil {
@@ -506,9 +511,13 @@ func (t *TxManager) handleReceipt(batchInfo *BatchInfo) (*int64, error) {
receipt := batchInfo.Receipt
if receipt != nil {
if receipt.Status == types.ReceiptStatusFailed {
batchInfo.Status = StatusFailed
t.cfg.debugBatchStore(batchInfo)
log.Errorw("TxManager receipt status is failed", "receipt", receipt)
return nil, tracerr.Wrap(fmt.Errorf("ethereum transaction receipt status is failed"))
} else if receipt.Status == types.ReceiptStatusSuccessful {
batchInfo.Status = StatusMined
t.cfg.debugBatchStore(batchInfo)
if batchInfo.BatchNum > t.lastConfirmedBatch {
t.lastConfirmedBatch = batchInfo.BatchNum
}
@@ -534,7 +543,7 @@ func (t *TxManager) Run(ctx context.Context) {
case lastBlock := <-t.lastBlockCh:
t.lastBlock = lastBlock
case batchInfo := <-t.batchCh:
if err := t.rollupForgeBatch(ctx, batchInfo); common.IsErrDone(err) {
if err := t.rollupForgeBatch(ctx, batchInfo); ctx.Err() != nil {
continue
} else if err != nil {
t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch call: %v", err)})
@@ -550,8 +559,7 @@ func (t *TxManager) Run(ctx context.Context) {
current := next
next = (current + 1) % len(t.queue)
batchInfo := t.queue[current]
err := t.ethTransactionReceipt(ctx, batchInfo)
if common.IsErrDone(err) {
if err := t.ethTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
continue
} else if err != nil { //nolint:staticcheck
// We can't get the receipt for the
@@ -580,6 +588,11 @@ func (t *TxManager) Run(ctx context.Context) {
}
}
type statsVars struct {
Stats synchronizer.Stats
Vars synchronizer.SCVariablesPtr
}
// Pipeline manages the forging of batches with parallel server proofs
type Pipeline struct {
cfg Config
@@ -587,7 +600,6 @@ type Pipeline struct {
// state
batchNum common.BatchNum
vars synchronizer.SCVariables
lastScheduledL1BatchBlockNum int64
lastForgeL1TxsNum int64
started bool
@@ -601,8 +613,9 @@ type Pipeline struct {
batchBuilder *batchbuilder.BatchBuilder
purger *Purger
stats synchronizer.Stats
statsCh chan synchronizer.Stats
stats synchronizer.Stats
vars synchronizer.SCVariables
statsVarsCh chan statsVars
ctx context.Context
wg sync.WaitGroup
@@ -619,7 +632,6 @@ func NewPipeline(ctx context.Context,
purger *Purger,
txManager *TxManager,
provers []prover.Client,
stats *synchronizer.Stats,
scConsts *synchronizer.SCConsts,
) (*Pipeline, error) {
proversPool := NewProversPool(len(provers))
@@ -646,22 +658,22 @@ func NewPipeline(ctx context.Context,
purger: purger,
txManager: txManager,
consts: *scConsts,
stats: *stats,
statsCh: make(chan synchronizer.Stats, queueLen),
statsVarsCh: make(chan statsVars, queueLen),
}, nil
}
// SetSyncStats is a thread-safe method to set the synchronizer Stats
func (p *Pipeline) SetSyncStats(stats *synchronizer.Stats) {
p.statsCh <- *stats
// SetSyncStatsVars is a thread-safe method to set the synchronizer Stats and SCVariables
func (p *Pipeline) SetSyncStatsVars(stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) {
p.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars}
}
// reset pipeline state
func (p *Pipeline) reset(batchNum common.BatchNum, lastForgeL1TxsNum int64,
initSCVars *synchronizer.SCVariables) error {
stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
p.batchNum = batchNum
p.lastForgeL1TxsNum = lastForgeL1TxsNum
p.vars = *initSCVars
p.stats = *stats
p.vars = *vars
p.lastScheduledL1BatchBlockNum = 0
err := p.txSelector.Reset(p.batchNum)
@@ -675,15 +687,27 @@ func (p *Pipeline) reset(batchNum common.BatchNum, lastForgeL1TxsNum int64,
return nil
}
func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) {
if vars.Rollup != nil {
p.vars.Rollup = *vars.Rollup
}
if vars.Auction != nil {
p.vars.Auction = *vars.Auction
}
if vars.WDelayer != nil {
p.vars.WDelayer = *vars.WDelayer
}
}
// Start the forging pipeline
func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
syncStats *synchronizer.Stats, initSCVars *synchronizer.SCVariables) error {
stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
if p.started {
log.Fatal("Pipeline already started")
}
p.started = true
if err := p.reset(batchNum, lastForgeL1TxsNum, initSCVars); err != nil {
if err := p.reset(batchNum, lastForgeL1TxsNum, stats, vars); err != nil {
return tracerr.Wrap(err)
}
p.ctx, p.cancel = context.WithCancel(context.Background())
@@ -699,8 +723,9 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
log.Info("Pipeline forgeBatch loop done")
p.wg.Done()
return
case syncStats := <-p.statsCh:
p.stats = syncStats
case statsVars := <-p.statsVarsCh:
p.stats = statsVars.Stats
p.syncSCVars(statsVars.Vars)
default:
batchNum = p.batchNum + 1
batchInfo, err := p.forgeBatch(batchNum)
@@ -813,6 +838,7 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) {
var l1UserTxsExtra, l1CoordTxs []common.L1Tx
var auths [][]byte
var coordIdxs []common.Idx
// 1. Decide if we forge L2Tx or L1+L2Tx
if p.shouldL1L2Batch() {
batchInfo.L1Batch = true
@@ -876,6 +902,7 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) {
// 5. Save metadata from BatchBuilder output for BatchNum
batchInfo.ZKInputs = zkInputs
batchInfo.Status = StatusForged
p.cfg.debugBatchStore(&batchInfo)
return &batchInfo, nil
@@ -890,7 +917,7 @@ func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) er
batchInfo.Proof = proof
batchInfo.PublicInputs = pubInputs
batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo)
batchInfo.TxStatus = TxStatusPending
batchInfo.Status = StatusProof
p.cfg.debugBatchStore(batchInfo)
return nil
}
@@ -905,7 +932,7 @@ func (p *Pipeline) shouldL1L2Batch() bool {
// Return true if we have passed the l1BatchTimeoutPerc portion of the
// range before the l1batch timeout.
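// For example (illustrative numbers, not taken from any real config): with
// ForgeL1L2BatchTimeout = 10 and L1BatchTimeoutPerc = 0.5, the updated
// threshold below is int64(float64(10-1)*0.5) = 4, so an L1+L2 batch is
// forced once 4 or more blocks have passed since the last L1 batch.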
if p.stats.Eth.LastBlock.Num-lastL1BatchBlockNum >=
int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout)*p.cfg.L1BatchTimeoutPerc) {
int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1)*p.cfg.L1BatchTimeoutPerc) {
return true
}
return false