@ -4,6 +4,7 @@ import (
"context"
"context"
"fmt"
"fmt"
"math/big"
"math/big"
"os"
"strings"
"strings"
"sync"
"sync"
"time"
"time"
@ -111,6 +112,12 @@ func NewCoordinator(cfg Config,
cfg . EthClientAttempts ) )
cfg . EthClientAttempts ) )
}
}
if cfg . DebugBatchPath != "" {
if err := os . MkdirAll ( cfg . DebugBatchPath , 0744 ) ; err != nil {
return nil , tracerr . Wrap ( err )
}
}
purger := Purger {
purger := Purger {
cfg : cfg . Purger ,
cfg : cfg . Purger ,
lastPurgeBlock : 0 ,
lastPurgeBlock : 0 ,
@ -147,9 +154,10 @@ func NewCoordinator(cfg Config,
return & c , nil
return & c , nil
}
}
func ( c * Coordinator ) newPipeline ( ctx context . Context ) ( * Pipeline , error ) {
func ( c * Coordinator ) newPipeline ( ctx context . Context ,
stats * synchronizer . Stats ) ( * Pipeline , error ) {
return NewPipeline ( ctx , c . cfg , c . historyDB , c . l2DB , c . txSelector ,
return NewPipeline ( ctx , c . cfg , c . historyDB , c . l2DB , c . txSelector ,
c . batchBuilder , c . purger , c . txManager , c . provers , & c . consts )
c . batchBuilder , c . purger , c . txManager , c . provers , stats , & c . consts )
}
}
// MsgSyncBlock indicates an update to the Synchronizer stats
// MsgSyncBlock indicates an update to the Synchronizer stats
@ -226,7 +234,7 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
stats . Eth . LastBlock . Num , "batch" , stats . Sync . LastBatch )
stats . Eth . LastBlock . Num , "batch" , stats . Sync . LastBatch )
batchNum := common . BatchNum ( stats . Sync . LastBatch )
batchNum := common . BatchNum ( stats . Sync . LastBatch )
var err error
var err error
if c . pipeline , err = c . newPipeline ( ctx ) ; err != nil {
if c . pipeline , err = c . newPipeline ( ctx , stats ) ; err != nil {
return tracerr . Wrap ( err )
return tracerr . Wrap ( err )
}
}
if err := c . pipeline . Start ( batchNum , stats . Sync . LastForgeL1TxsNum ,
if err := c . pipeline . Start ( batchNum , stats . Sync . LastForgeL1TxsNum ,
@ -295,22 +303,16 @@ func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string) err
func ( c * Coordinator ) handleMsg ( ctx context . Context , msg interface { } ) error {
func ( c * Coordinator ) handleMsg ( ctx context . Context , msg interface { } ) error {
switch msg := msg . ( type ) {
switch msg := msg . ( type ) {
case MsgSyncBlock :
case MsgSyncBlock :
if err := c . handleMsgSyncBlock ( ctx , & msg ) ; common . IsErrDone ( err ) {
return nil
} else if err != nil {
if err := c . handleMsgSyncBlock ( ctx , & msg ) ; err != nil {
return tracerr . Wrap ( fmt . Errorf ( "Coordinator.handleMsgSyncBlock error: %w" , err ) )
return tracerr . Wrap ( fmt . Errorf ( "Coordinator.handleMsgSyncBlock error: %w" , err ) )
}
}
case MsgSyncReorg :
case MsgSyncReorg :
if err := c . handleReorg ( ctx , & msg . Stats ) ; common . IsErrDone ( err ) {
return nil
} else if err != nil {
if err := c . handleReorg ( ctx , & msg . Stats ) ; err != nil {
return tracerr . Wrap ( fmt . Errorf ( "Coordinator.handleReorg error: %w" , err ) )
return tracerr . Wrap ( fmt . Errorf ( "Coordinator.handleReorg error: %w" , err ) )
}
}
case MsgStopPipeline :
case MsgStopPipeline :
log . Infow ( "Coordinator received MsgStopPipeline" , "reason" , msg . Reason )
log . Infow ( "Coordinator received MsgStopPipeline" , "reason" , msg . Reason )
if err := c . handleStopPipeline ( ctx , msg . Reason ) ; common . IsErrDone ( err ) {
return nil
} else if err != nil {
if err := c . handleStopPipeline ( ctx , msg . Reason ) ; err != nil {
return tracerr . Wrap ( fmt . Errorf ( "Coordinator.handleStopPipeline: %w" , err ) )
return tracerr . Wrap ( fmt . Errorf ( "Coordinator.handleStopPipeline: %w" , err ) )
}
}
default :
default :
@ -341,7 +343,9 @@ func (c *Coordinator) Start() {
c . wg . Done ( )
c . wg . Done ( )
return
return
case msg := <- c . msgCh :
case msg := <- c . msgCh :
if err := c . handleMsg ( c . ctx , msg ) ; err != nil {
if err := c . handleMsg ( c . ctx , msg ) ; c . ctx . Err ( ) != nil {
continue
} else if err != nil {
log . Errorw ( "Coordinator.handleMsg" , "err" , err )
log . Errorw ( "Coordinator.handleMsg" , "err" , err )
waitDuration = time . Duration ( c . cfg . SyncRetryInterval )
waitDuration = time . Duration ( c . cfg . SyncRetryInterval )
continue
continue
@ -352,7 +356,9 @@ func (c *Coordinator) Start() {
waitDuration = time . Duration ( longWaitDuration )
waitDuration = time . Duration ( longWaitDuration )
continue
continue
}
}
if err := c . syncStats ( c . ctx , c . stats ) ; err != nil {
if err := c . syncStats ( c . ctx , c . stats ) ; c . ctx . Err ( ) != nil {
continue
} else if err != nil {
log . Errorw ( "Coordinator.syncStats" , "err" , err )
log . Errorw ( "Coordinator.syncStats" , "err" , err )
waitDuration = time . Duration ( c . cfg . SyncRetryInterval )
waitDuration = time . Duration ( c . cfg . SyncRetryInterval )
continue
continue
@ -456,7 +462,8 @@ func (t *TxManager) rollupForgeBatch(ctx context.Context, batchInfo *BatchInfo)
return tracerr . Wrap ( err )
return tracerr . Wrap ( err )
}
}
log . Errorw ( "TxManager ethClient.RollupForgeBatch" ,
log . Errorw ( "TxManager ethClient.RollupForgeBatch" ,
"attempt" , attempt , "err" , err , "block" , t . lastBlock )
"attempt" , attempt , "err" , err , "block" , t . lastBlock ,
"batchNum" , batchInfo . BatchNum )
} else {
} else {
break
break
}
}
@ -484,6 +491,9 @@ func (t *TxManager) ethTransactionReceipt(ctx context.Context, batchInfo *BatchI
var err error
var err error
for attempt := 0 ; attempt < t . cfg . EthClientAttempts ; attempt ++ {
for attempt := 0 ; attempt < t . cfg . EthClientAttempts ; attempt ++ {
receipt , err = t . ethClient . EthTransactionReceipt ( ctx , txHash )
receipt , err = t . ethClient . EthTransactionReceipt ( ctx , txHash )
if ctx . Err ( ) != nil {
continue
}
if err != nil {
if err != nil {
log . Errorw ( "TxManager ethClient.EthTransactionReceipt" ,
log . Errorw ( "TxManager ethClient.EthTransactionReceipt" ,
"attempt" , attempt , "err" , err )
"attempt" , attempt , "err" , err )
@ -621,6 +631,7 @@ func NewPipeline(ctx context.Context,
purger * Purger ,
purger * Purger ,
txManager * TxManager ,
txManager * TxManager ,
provers [ ] prover . Client ,
provers [ ] prover . Client ,
stats * synchronizer . Stats ,
scConsts * synchronizer . SCConsts ,
scConsts * synchronizer . SCConsts ,
) ( * Pipeline , error ) {
) ( * Pipeline , error ) {
proversPool := NewProversPool ( len ( provers ) )
proversPool := NewProversPool ( len ( provers ) )
@ -647,6 +658,7 @@ func NewPipeline(ctx context.Context,
purger : purger ,
purger : purger ,
txManager : txManager ,
txManager : txManager ,
consts : * scConsts ,
consts : * scConsts ,
stats : * stats ,
statsCh : make ( chan synchronizer . Stats , queueLen ) ,
statsCh : make ( chan synchronizer . Stats , queueLen ) ,
} , nil
} , nil
}
}
@ -697,7 +709,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
for {
for {
select {
select {
case <- p . ctx . Done ( ) :
case <- p . ctx . Done ( ) :
log . Debug ( "Pipeline forgeBatch loop done" )
log . Info ( "Pipeline forgeBatch loop done" )
p . wg . Done ( )
p . wg . Done ( )
return
return
case syncStats := <- p . statsCh :
case syncStats := <- p . statsCh :
@ -705,7 +717,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
default :
default :
batchNum = p . batchNum + 1
batchNum = p . batchNum + 1
batchInfo , err := p . forgeBatch ( p . ctx , batchNum , selectionConfig )
batchInfo , err := p . forgeBatch ( p . ctx , batchNum , selectionConfig )
if common . IsErrDone ( err ) {
if p . ctx . Err ( ) != nil {
continue
continue
} else if err != nil {
} else if err != nil {
log . Errorw ( "forgeBatch" , "err" , err )
log . Errorw ( "forgeBatch" , "err" , err )
@ -713,14 +725,16 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
}
}
// 6. Wait for an available server proof (blocking call)
// 6. Wait for an available server proof (blocking call)
serverProof , err := p . proversPool . Get ( p . ctx )
serverProof , err := p . proversPool . Get ( p . ctx )
if common . IsErrDone ( err ) {
if p . ctx . Err ( ) != nil {
continue
continue
} else if err != nil {
} else if err != nil {
log . Errorw ( "proversPool.Get" , "err" , err )
log . Errorw ( "proversPool.Get" , "err" , err )
continue
continue
}
}
batchInfo . ServerProof = serverProof
batchInfo . ServerProof = serverProof
if err := p . sendServerProof ( p . ctx , batchInfo ) ; err != nil {
if err := p . sendServerProof ( p . ctx , batchInfo ) ; p . ctx . Err ( ) != nil {
continue
} else if err != nil {
log . Errorw ( "sendServerProof" , "err" , err )
log . Errorw ( "sendServerProof" , "err" , err )
batchInfo . ServerProof = nil
batchInfo . ServerProof = nil
p . proversPool . Add ( serverProof )
p . proversPool . Add ( serverProof )
@ -737,17 +751,17 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
for {
for {
select {
select {
case <- p . ctx . Done ( ) :
case <- p . ctx . Done ( ) :
log . Debug ( "Pipeline waitServerProofSendEth loop done" )
log . Info ( "Pipeline waitServerProofSendEth loop done" )
p . wg . Done ( )
p . wg . Done ( )
return
return
case batchInfo := <- batchChSentServerProof :
case batchInfo := <- batchChSentServerProof :
err := p . waitServerProof ( p . ctx , batchInfo )
err := p . waitServerProof ( p . ctx , batchInfo )
if common . IsErrDone ( err ) {
continue
}
// We are done with this serverProof, add it back to the pool
// We are done with this serverProof, add it back to the pool
p . proversPool . Add ( batchInfo . ServerProof )
p . proversPool . Add ( batchInfo . ServerProof )
batchInfo . ServerProof = nil
batchInfo . ServerProof = nil
if p . ctx . Err ( ) != nil {
continue
}
if err != nil {
if err != nil {
log . Errorw ( "waitServerProof" , "err" , err )
log . Errorw ( "waitServerProof" , "err" , err )
continue
continue
@ -765,7 +779,7 @@ func (p *Pipeline) Stop(ctx context.Context) {
log . Fatal ( "Pipeline already stopped" )
log . Fatal ( "Pipeline already stopped" )
}
}
p . started = false
p . started = false
log . Debug ( "Stopping Pipeline..." )
log . Info ( "Stopping Pipeline..." )
p . cancel ( )
p . cancel ( )
p . wg . Wait ( )
p . wg . Wait ( )
for _ , prover := range p . provers {
for _ , prover := range p . provers {
@ -899,7 +913,7 @@ func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) er
}
}
batchInfo . Proof = proof
batchInfo . Proof = proof
batchInfo . PublicInputs = pubInputs
batchInfo . PublicInputs = pubInputs
batchInfo . ForgeBatchArgs = p . prepareForgeBatchArgs ( batchInfo )
batchInfo . ForgeBatchArgs = prepareForgeBatchArgs ( batchInfo )
batchInfo . TxStatus = TxStatusPending
batchInfo . TxStatus = TxStatusPending
p . cfg . debugBatchStore ( batchInfo )
p . cfg . debugBatchStore ( batchInfo )
return nil
return nil
@ -921,7 +935,7 @@ func (p *Pipeline) shouldL1L2Batch() bool {
return false
return false
}
}
func ( p * Pipeline ) prepareForgeBatchArgs ( batchInfo * BatchInfo ) * eth . RollupForgeBatchArgs {
func prepareForgeBatchArgs ( batchInfo * BatchInfo ) * eth . RollupForgeBatchArgs {
proof := batchInfo . Proof
proof := batchInfo . Proof
zki := batchInfo . ZKInputs
zki := batchInfo . ZKInputs
return & eth . RollupForgeBatchArgs {
return & eth . RollupForgeBatchArgs {