@ -24,6 +24,8 @@ import (
"github.com/hermeznetwork/tracerr"
)
var errLastL1BatchNotSynced = fmt . Errorf ( "last L1Batch not synced yet" )
const queueLen = 16
// Config contains the Coordinator configuration
@ -39,6 +41,9 @@ type Config struct {
// EthClientAttempts is the number of attempts to do an eth client RPC
// call before giving up
EthClientAttempts int
// ForgeRetryInterval is the waiting interval between calls forge a
// batch after an error
ForgeRetryInterval time . Duration
// SyncRetryInterval is the waiting interval between calls to the main
// handler of a synced block after an error
SyncRetryInterval time . Duration
@ -225,8 +230,7 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
if c . pipeline , err = c . newPipeline ( ctx ) ; err != nil {
return tracerr . Wrap ( err )
}
if err := c . pipeline . Start ( batchNum , stats . Sync . LastForgeL1TxsNum ,
stats , & c . vars ) ; err != nil {
if err := c . pipeline . Start ( batchNum , stats , & c . vars ) ; err != nil {
c . pipeline = nil
return tracerr . Wrap ( err )
}
@ -348,7 +352,7 @@ func (c *Coordinator) Start() {
c . wg . Add ( 1 )
go func ( ) {
waitDuration := time . Duration ( longWaitDuration )
waitDuration := longWaitDuration
for {
select {
case <- c . ctx . Done ( ) :
@ -360,23 +364,23 @@ func (c *Coordinator) Start() {
continue
} else if err != nil {
log . Errorw ( "Coordinator.handleMsg" , "err" , err )
waitDuration = time . Duration ( c . cfg . SyncRetryInterval )
waitDuration = c . cfg . SyncRetryInterval
continue
}
waitDuration = time . Duration ( longWaitDuration )
waitDuration = longWaitDuration
case <- time . After ( waitDuration ) :
if c . stats == nil {
waitDuration = time . Duration ( longWaitDuration )
waitDuration = longWaitDuration
continue
}
if err := c . syncStats ( c . ctx , c . stats ) ; c . ctx . Err ( ) != nil {
continue
} else if err != nil {
log . Errorw ( "Coordinator.syncStats" , "err" , err )
waitDuration = time . Duration ( c . cfg . SyncRetryInterval )
waitDuration = c . cfg . SyncRetryInterval
continue
}
waitDuration = time . Duration ( longWaitDuration )
waitDuration = longWaitDuration
}
}
} ( )
@ -540,7 +544,7 @@ const longWaitDuration = 999 * time.Hour
// Run the TxManager
func ( t * TxManager ) Run ( ctx context . Context ) {
next := 0
waitDuration := time . Duration ( longWaitDuration )
waitDuration := longWaitDuration
for {
select {
@ -675,10 +679,10 @@ func (p *Pipeline) SetSyncStatsVars(stats *synchronizer.Stats, vars *synchronize
}
// reset pipeline state
func ( p * Pipeline ) reset ( batchNum common . BatchNum , lastForgeL1TxsNum int64 ,
func ( p * Pipeline ) reset ( batchNum common . BatchNum ,
stats * synchronizer . Stats , vars * synchronizer . SCVariables ) error {
p . batchNum = batchNum
p . lastForgeL1TxsNum = lastForgeL1TxsNum
p . lastForgeL1TxsNum = stats . Sync . L astForgeL1TxsNum
p . stats = * stats
p . vars = * vars
p . lastScheduledL1BatchBlockNum = 0
@ -706,15 +710,49 @@ func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) {
}
}
func ( p * Pipeline ) handleForgeBatch ( ctx context . Context , batchNum common . BatchNum ) ( * BatchInfo , error ) {
batchInfo , err := p . forgeBatch ( batchNum )
if ctx . Err ( ) != nil {
return nil , ctx . Err ( )
} else if err != nil {
if tracerr . Unwrap ( err ) == errLastL1BatchNotSynced {
log . Warnw ( "forgeBatch: scheduled L1Batch too early" , "err" , err ,
"lastForgeL1TxsNum" , p . lastForgeL1TxsNum ,
"syncLastForgeL1TxsNum" , p . stats . Sync . LastForgeL1TxsNum )
} else {
log . Errorw ( "forgeBatch" , "err" , err )
}
return nil , err
}
// 6. Wait for an available server proof (blocking call)
serverProof , err := p . proversPool . Get ( ctx )
if ctx . Err ( ) != nil {
return nil , ctx . Err ( )
} else if err != nil {
log . Errorw ( "proversPool.Get" , "err" , err )
return nil , err
}
batchInfo . ServerProof = serverProof
if err := p . sendServerProof ( ctx , batchInfo ) ; ctx . Err ( ) != nil {
return nil , ctx . Err ( )
} else if err != nil {
log . Errorw ( "sendServerProof" , "err" , err )
batchInfo . ServerProof = nil
p . proversPool . Add ( serverProof )
return nil , err
}
return batchInfo , nil
}
// Start the forging pipeline
func ( p * Pipeline ) Start ( batchNum common . BatchNum , lastForgeL1TxsNum int64 ,
func ( p * Pipeline ) Start ( batchNum common . BatchNum ,
stats * synchronizer . Stats , vars * synchronizer . SCVariables ) error {
if p . started {
log . Fatal ( "Pipeline already started" )
}
p . started = true
if err := p . reset ( batchNum , lastForgeL1TxsNum , stats , vars ) ; err != nil {
if err := p . reset ( batchNum , stats , vars ) ; err != nil {
return tracerr . Wrap ( err )
}
p . ctx , p . cancel = context . WithCancel ( context . Background ( ) )
@ -723,7 +761,9 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
batchChSentServerProof := make ( chan * BatchInfo , queueSize )
p . wg . Add ( 1 )
const zeroDuration = 0 * time . Second
go func ( ) {
waitDuration := zeroDuration
for {
select {
case <- p . ctx . Done ( ) :
@ -733,34 +773,15 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
case statsVars := <- p . statsVarsCh :
p . stats = statsVars . Stats
p . syncSCVars ( statsVars . Vars )
default :
case <- time . After ( waitDuration ) :
batchNum = p . batchNum + 1
batchInfo , err := p . forgeBatch ( batchNum )
if p . ctx . Err ( ) != nil {
continue
} else if err != nil {
log . Errorw ( "forgeBatch" , "err" , err )
continue
}
// 6. Wait for an available server proof (blocking call)
serverProof , err := p . proversPool . Get ( p . ctx )
if p . ctx . Err ( ) != nil {
continue
} else if err != nil {
log . Errorw ( "proversPool.Get" , "err" , err )
continue
}
batchInfo . ServerProof = serverProof
if err := p . sendServerProof ( p . ctx , batchInfo ) ; p . ctx . Err ( ) != nil {
continue
} else if err != nil {
log . Errorw ( "sendServerProof" , "err" , err )
batchInfo . ServerProof = nil
p . proversPool . Add ( serverProof )
if batchInfo , err := p . handleForgeBatch ( p . ctx , batchNum ) ; err != nil {
waitDuration = p . cfg . SyncRetryInterval
continue
} else {
p . batchNum = batchNum
batchChSentServerProof <- batchInfo
}
p . batchNum = batchNum
batchChSentServerProof <- batchInfo
}
}
} ( )
@ -823,9 +844,9 @@ func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) er
}
// forgeBatch the next batch.
func ( p * Pipeline ) forgeBatch ( batchNum common . BatchNum ) ( * BatchInfo , error ) {
func ( p * Pipeline ) forgeBatch ( batchNum common . BatchNum ) ( batchInfo * BatchInfo , err error ) {
// remove transactions from the pool that have been there for too long
_ , err : = p . purger . InvalidateMaybe ( p . l2DB , p . txSelector . LocalAccountsDB ( ) ,
_ , err = p . purger . InvalidateMaybe ( p . l2DB , p . txSelector . LocalAccountsDB ( ) ,
p . stats . Sync . LastBlock . Num , int64 ( batchNum ) )
if err != nil {
return nil , tracerr . Wrap ( err )
@ -835,7 +856,7 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) {
return nil , tracerr . Wrap ( err )
}
batchInfo := BatchInfo { BatchNum : batchNum } // to accumulate metadata of the batch
batchInfo = & BatchInfo { BatchNum : batchNum } // to accumulate metadata of the batch
batchInfo . Debug . StartTimestamp = time . Now ( )
batchInfo . Debug . StartBlockNum = p . stats . Eth . LastBlock . Num + 1
@ -851,12 +872,23 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) {
var coordIdxs [ ] common . Idx
// 1. Decide if we forge L2Tx or L1+L2Tx
if p . shouldL1L2Batch ( & batchInfo ) {
if p . shouldL1L2Batch ( batchInfo ) {
batchInfo . L1Batch = true
p . lastScheduledL1BatchBlockNum = p . stats . Eth . LastBlock . Num + 1
defer func ( ) {
// If there's no error, update the parameters related
// to the last L1Batch forged
if err == nil {
p . lastScheduledL1BatchBlockNum = p . stats . Eth . LastBlock . Num + 1
p . lastForgeL1TxsNum ++
}
} ( )
if p . lastForgeL1TxsNum != p . stats . Sync . LastForgeL1TxsNum {
return nil , tracerr . Wrap ( errLastL1BatchNotSynced )
//return nil, fmt.Errorf("Not synced yet LastForgeL1TxsNum. Expecting %v, got %v",
// p.lastForgeL1TxsNum, p.stats.Sync.LastForgeL1TxsNum)
}
// 2a: L1+L2 txs
p . lastForgeL1TxsNum ++
l1UserTxs , err := p . historyDB . GetUnforgedL1UserTxs ( p . lastForgeL1TxsNum )
l1UserTxs , err := p . historyDB . GetUnforgedL1UserTxs ( p . lastForgeL1TxsNum + 1 )
if err != nil {
return nil , tracerr . Wrap ( err )
}
@ -914,9 +946,9 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) {
// 5. Save metadata from BatchBuilder output for BatchNum
batchInfo . ZKInputs = zkInputs
batchInfo . Debug . Status = StatusForged
p . cfg . debugBatchStore ( & batchInfo )
p . cfg . debugBatchStore ( batchInfo )
return & batchInfo , nil
return batchInfo , nil
}
// waitServerProof gets the generated zkProof & sends it to the SmartContract