@@ -3,6 +3,7 @@ package coordinator
 import (
	"context"
	"fmt"
+	"strings"
	"sync"
	"time"
@@ -11,18 +12,16 @@ import (
	"github.com/hermeznetwork/hermez-node/batchbuilder"
	"github.com/hermeznetwork/hermez-node/common"
	"github.com/hermeznetwork/hermez-node/db/historydb"
+	"github.com/hermeznetwork/hermez-node/db/l2db"
	"github.com/hermeznetwork/hermez-node/eth"
	"github.com/hermeznetwork/hermez-node/log"
+	"github.com/hermeznetwork/hermez-node/prover"
	"github.com/hermeznetwork/hermez-node/synchronizer"
	"github.com/hermeznetwork/hermez-node/txselector"
	"github.com/hermeznetwork/tracerr"
 )
 
-var errTODO = fmt.Errorf("TODO")
-
-// ErrDone is returned when the function is stopped asynchronously via a done
-// (terminated) context. It doesn't indicate an error.
-var ErrDone = fmt.Errorf("done")
+const queueLen = 16
 
 // Config contains the Coordinator configuration
 type Config struct {
@@ -60,15 +59,16 @@ func (c *Config) debugBatchStore(batchInfo *BatchInfo) {
 // Coordinator implements the Coordinator type
 type Coordinator struct {
	// State
-	batchNum     common.BatchNum
-	serverProofs []ServerProofInterface
+	pipelineBatchNum common.BatchNum // batchNum from which we started the pipeline
+	provers          []prover.Client
	consts           synchronizer.SCConsts
	vars             synchronizer.SCVariables
	started          bool
 
	cfg Config
 
	historyDB    *historydb.HistoryDB
+	l2DB         *l2db.L2DB
	txSelector   *txselector.TxSelector
	batchBuilder *batchbuilder.BatchBuilder
@ -85,9 +85,10 @@ type Coordinator struct {
// NewCoordinator creates a new Coordinator
// NewCoordinator creates a new Coordinator
func NewCoordinator ( cfg Config ,
func NewCoordinator ( cfg Config ,
historyDB * historydb . HistoryDB ,
historyDB * historydb . HistoryDB ,
l2DB * l2db . L2DB ,
txSelector * txselector . TxSelector ,
txSelector * txselector . TxSelector ,
batchBuilder * batchbuilder . BatchBuilder ,
batchBuilder * batchbuilder . BatchBuilder ,
serverProofs [ ] ServerProofInterface ,
serverProofs [ ] prover . Client ,
ethClient eth . ClientInterface ,
ethClient eth . ClientInterface ,
scConsts * synchronizer . SCConsts ,
scConsts * synchronizer . SCConsts ,
initSCVars * synchronizer . SCVariables ,
initSCVars * synchronizer . SCVariables ,
@@ -102,18 +103,17 @@ func NewCoordinator(cfg Config,
			cfg.EthClientAttempts))
	}
-	txManager := NewTxManager(&cfg, ethClient)
	ctx, cancel := context.WithCancel(context.Background())
	c := Coordinator{
-		batchNum:     -1,
-		serverProofs: serverProofs,
+		pipelineBatchNum: -1,
+		provers:          serverProofs,
		consts:           *scConsts,
		vars:             *initSCVars,
		cfg:              cfg,
		historyDB:        historyDB,
+		l2DB:             l2DB,
		txSelector:       txSelector,
		batchBuilder:     batchBuilder,
@@ -123,15 +123,15 @@ func NewCoordinator(cfg Config,
		ctx: ctx,
		// wg
		cancel: cancel,
-		txManager: txManager,
	}
+	txManager := NewTxManager(&cfg, ethClient, l2DB, &c)
+	c.txManager = txManager
	return &c, nil
 }
 
-func (c *Coordinator) newPipeline() *Pipeline {
-	return NewPipeline(c.cfg, c.historyDB, c.txSelector, c.batchBuilder,
-		c.txManager, c.serverProofs, &c.consts)
+func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) {
+	return NewPipeline(ctx, c.cfg, c.historyDB, c.l2DB,
+		c.txSelector, c.batchBuilder, c.txManager, c.provers, &c.consts)
 }
 
 // MsgSyncStats indicates an update to the Synchronizer stats
@@ -148,6 +148,12 @@ type MsgSyncSCVars struct {
 // MsgSyncReorg indicates a reorg
 type MsgSyncReorg struct {
+	Stats synchronizer.Stats
+}
+
+// MsgStopPipeline indicates a signal to reset the pipeline
+type MsgStopPipeline struct {
+	Reason string
 }
 
 // SendMsg is a thread safe method to pass a message to the Coordinator
@ -180,7 +186,7 @@ func (c *Coordinator) canForge(stats *synchronizer.Stats) bool {
return false
return false
}
}
func ( c * Coordinator ) handleMsgSyncStats ( stats * synchronizer . Stats ) error {
func ( c * Coordinator ) handleMsgSyncStats ( ctx context . Context , stats * synchronizer . Stats ) error {
if ! stats . Synced ( ) {
if ! stats . Synced ( ) {
return nil
return nil
}
}
@@ -189,25 +195,43 @@ func (c *Coordinator) handleMsgSyncStats(stats *synchronizer.Stats) error {
	canForge := c.canForge(stats)
	if c.pipeline == nil {
		if canForge {
-			log.Info("Coordinator: forging state begin")
+			log.Infow("Coordinator: forging state begin", "block", stats.Eth.LastBlock.Num,
+				"batch", stats.Sync.LastBatch)
			batchNum := common.BatchNum(stats.Sync.LastBatch)
-			c.pipeline = c.newPipeline()
-			if err := c.pipeline.Start(batchNum, stats, &c.vars); err != nil {
+			var err error
+			if c.pipeline, err = c.newPipeline(ctx); err != nil {
+				return tracerr.Wrap(err)
+			}
+			if err := c.pipeline.Start(batchNum, stats.Sync.LastForgeL1TxsNum,
+				stats, &c.vars); err != nil {
+				c.pipeline = nil
				return tracerr.Wrap(err)
			}
+			c.pipelineBatchNum = batchNum
		}
	} else {
		if canForge {
			c.pipeline.SetSyncStats(stats)
		} else {
-			log.Info("Coordinator: forging state end")
-			c.pipeline.Stop()
+			log.Infow("Coordinator: forging state end", "block", stats.Eth.LastBlock.Num)
+			c.pipeline.Stop(c.ctx)
			c.pipeline = nil
		}
	}
	return nil
 }
 
+func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string) error {
+	if c.pipeline != nil {
+		c.pipeline.Stop(c.ctx)
+		c.pipeline = nil
+	}
+	if strings.Contains(reason, common.AuctionErrMsgCannotForge) { //nolint:staticcheck
+		// TODO: Check that we are in a slot in which we can't forge
+	}
+	return nil
+}
+
 // Start the coordinator
 func (c *Coordinator) Start() {
	if c.started {
@@ -232,15 +256,26 @@ func (c *Coordinator) Start() {
				switch msg := msg.(type) {
				case MsgSyncStats:
					stats := msg.Stats
-					if err := c.handleMsgSyncStats(&stats); err != nil {
+					if err := c.handleMsgSyncStats(c.ctx, &stats); common.IsErrDone(err) {
+						continue
+					} else if err != nil {
						log.Errorw("Coordinator.handleMsgSyncStats error", "err", err)
						continue
					}
				case MsgSyncReorg:
-					if err := c.handleReorg(); err != nil {
+					if err := c.handleReorg(c.ctx, &msg.Stats); common.IsErrDone(err) {
+						continue
+					} else if err != nil {
						log.Errorw("Coordinator.handleReorg error", "err", err)
						continue
					}
+				case MsgStopPipeline:
+					log.Infow("Coordinator received MsgStopPipeline", "reason", msg.Reason)
+					if err := c.handleStopPipeline(c.ctx, msg.Reason); common.IsErrDone(err) {
+						continue
+					} else if err != nil {
+						log.Errorw("Coordinator.handleStopPipeline", "err", err)
+					}
				case MsgSyncSCVars:
					c.handleMsgSyncSCVars(&msg)
				default:
@ -251,6 +286,8 @@ func (c *Coordinator) Start() {
} ( )
} ( )
}
}
const stopCtxTimeout = 200 * time . Millisecond
// Stop the coordinator
// Stop the coordinator
func ( c * Coordinator ) Stop ( ) {
func ( c * Coordinator ) Stop ( ) {
if ! c . started {
if ! c . started {
@@ -261,13 +298,31 @@ func (c *Coordinator) Stop() {
	c.cancel()
	c.wg.Wait()
	if c.pipeline != nil {
-		c.pipeline.Stop()
+		ctx, cancel := context.WithTimeout(context.Background(), stopCtxTimeout)
+		defer cancel()
+		c.pipeline.Stop(ctx)
		c.pipeline = nil
	}
 }
 
-func (c *Coordinator) handleReorg() error {
-	return nil // TODO
+func (c *Coordinator) handleReorg(ctx context.Context, stats *synchronizer.Stats) error {
+	if common.BatchNum(stats.Sync.LastBatch) < c.pipelineBatchNum {
+		// There's been a reorg and the batch from which the pipeline
+		// was started was in a block that was discarded. The batch
+		// may not be in the main chain, so we stop the pipeline as a
+		// precaution (it will be started again once the node is in
+		// sync).
+		log.Infow("Coordinator.handleReorg StopPipeline sync.LastBatch < c.pipelineBatchNum",
+			"sync.LastBatch", stats.Sync.LastBatch,
+			"c.pipelineBatchNum", c.pipelineBatchNum)
+		if err := c.handleStopPipeline(ctx, "reorg"); err != nil {
+			return tracerr.Wrap(err)
+		}
+		if err := c.l2DB.Reorg(common.BatchNum(stats.Sync.LastBatch)); err != nil {
+			return tracerr.Wrap(err)
+		}
+	}
+	return nil
 }
 
 // TxManager handles everything related to ethereum transactions: It makes the
@@ -276,21 +331,26 @@ func (c *Coordinator) handleReorg() error {
 type TxManager struct {
	cfg       Config
	ethClient eth.ClientInterface
+	l2DB      *l2db.L2DB   // Used only to mark forged txs as forged in the L2DB
+	coord     *Coordinator // Used only to send messages to stop the pipeline
	batchCh     chan *BatchInfo
	lastBlockCh chan int64
	queue       []*BatchInfo
	lastBlock   int64
+	// lastConfirmedBatch stores the last BatchNum whose forge call was confirmed
+	lastConfirmedBatch common.BatchNum
 }
 
 // NewTxManager creates a new TxManager
-func NewTxManager(cfg *Config, ethClient eth.ClientInterface) *TxManager {
+func NewTxManager(cfg *Config, ethClient eth.ClientInterface, l2DB *l2db.L2DB,
+	coord *Coordinator) *TxManager {
	return &TxManager{
-		cfg:       *cfg,
-		ethClient: ethClient,
-		// TODO: Find best queue size
-		batchCh: make(chan *BatchInfo, 16), //nolint:gomnd
-		// TODO: Find best queue size
-		lastBlockCh: make(chan int64, 16), //nolint:gomnd
+		cfg:         *cfg,
+		ethClient:   ethClient,
+		l2DB:        l2DB,
+		coord:       coord,
+		batchCh:     make(chan *BatchInfo, queueLen),
+		lastBlockCh: make(chan int64, queueLen),
		lastBlock:   -1,
	}
 }
@@ -312,14 +372,19 @@ func (t *TxManager) rollupForgeBatch(ctx context.Context, batchInfo *BatchInfo)
	for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
		ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs)
		if err != nil {
+			if strings.Contains(err.Error(), common.AuctionErrMsgCannotForge) {
+				log.Debugw("TxManager ethClient.RollupForgeBatch", "err", err,
+					"block", t.lastBlock)
+				return tracerr.Wrap(err)
+			}
			log.Errorw("TxManager ethClient.RollupForgeBatch",
-				"attempt", attempt, "err", err)
+				"attempt", attempt, "err", err, "block", t.lastBlock)
		} else {
			break
		}
		select {
		case <-ctx.Done():
-			return tracerr.Wrap(ErrDone)
+			return tracerr.Wrap(common.ErrDone)
		case <-time.After(t.cfg.EthClientAttemptsDelay):
		}
	}
@@ -327,7 +392,11 @@ func (t *TxManager) rollupForgeBatch(ctx context.Context, batchInfo *BatchInfo)
		return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err))
	}
	batchInfo.EthTx = ethTx
+	log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex())
	t.cfg.debugBatchStore(batchInfo)
+	if err := t.l2DB.DoneForging(l2TxsIDs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil {
+		return tracerr.Wrap(err)
+	}
	return nil
 }
@@ -345,7 +414,7 @@ func (t *TxManager) ethTransactionReceipt(ctx context.Context, batchInfo *BatchI
		}
		select {
		case <-ctx.Done():
-			return tracerr.Wrap(ErrDone)
+			return tracerr.Wrap(common.ErrDone)
		case <-time.After(t.cfg.EthClientAttemptsDelay):
		}
	}
@@ -364,6 +433,9 @@ func (t *TxManager) handleReceipt(batchInfo *BatchInfo) (*int64, error) {
		log.Errorw("TxManager receipt status is failed", "receipt", receipt)
		return nil, tracerr.Wrap(fmt.Errorf("ethereum transaction receipt statis is failed"))
	} else if receipt.Status == types.ReceiptStatusSuccessful {
+		if batchInfo.BatchNum > t.lastConfirmedBatch {
+			t.lastConfirmedBatch = batchInfo.BatchNum
+		}
		confirm := t.lastBlock - receipt.BlockNumber.Int64()
		return &confirm, nil
	}
@@ -385,10 +457,10 @@ func (t *TxManager) Run(ctx context.Context) {
		case lastBlock := <-t.lastBlockCh:
			t.lastBlock = lastBlock
		case batchInfo := <-t.batchCh:
-			if err := t.rollupForgeBatch(ctx, batchInfo); tracerr.Unwrap(err) == ErrDone {
+			if err := t.rollupForgeBatch(ctx, batchInfo); common.IsErrDone(err) {
				continue
			} else if err != nil {
-				// TODO: Reset pipeline
+				t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch call: %v", err)})
				continue
			}
			log.Debugf("ethClient ForgeCall sent, batchNum: %d", batchInfo.BatchNum)
@@ -398,35 +470,35 @@ func (t *TxManager) Run(ctx context.Context) {
			if len(t.queue) == 0 {
				continue
			}
-			batchInfo := t.queue[next]
+			current := next
+			next = (current + 1) % len(t.queue)
+			batchInfo := t.queue[current]
			err := t.ethTransactionReceipt(ctx, batchInfo)
-			if tracerr.Unwrap(err) == ErrDone {
+			if common.IsErrDone(err) {
				continue
			} else if err != nil { //nolint:staticcheck
				// We can't get the receipt for the
				// transaction, so we can't confirm if it was
				// mined
-				// TODO: Reset pipeline
+				t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch receipt: %v", err)})
			}
			confirm, err := t.handleReceipt(batchInfo)
			if err != nil { //nolint:staticcheck
				// Transaction was rejected
-				// TODO: Reset pipeline
+				t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch reject: %v", err)})
			}
			if confirm != nil && *confirm >= t.cfg.ConfirmBlocks {
				log.Debugw("TxManager tx for RollupForgeBatch confirmed",
-					"batchNum", batchInfo.BatchNum)
-				t.queue = t.queue[1:]
+					"batch", batchInfo.BatchNum)
+				t.queue = append(t.queue[:current], t.queue[current+1:]...)
				if len(t.queue) == 0 {
					waitTime = longWaitTime
+					next = 0
+				} else {
+					next = current % len(t.queue)
				}
			}
-			if len(t.queue) == 0 {
-				next = 0
-			} else {
-				next = (next + 1) % len(t.queue)
-			}
		}
	}
 }
@@ -440,13 +512,16 @@ type Pipeline struct {
	batchNum                     common.BatchNum
	vars                         synchronizer.SCVariables
	lastScheduledL1BatchBlockNum int64
+	lastForgeL1TxsNum            int64
	started                      bool
 
-	serverProofPool *ServerProofPool
+	proversPool  *ProversPool
+	provers      []prover.Client
	txManager    *TxManager
	historyDB    *historydb.HistoryDB
+	l2DB         *l2db.L2DB
	txSelector   *txselector.TxSelector
	batchBuilder *batchbuilder.BatchBuilder
 
	stats   synchronizer.Stats
	statsCh chan synchronizer.Stats
@@ -457,29 +532,41 @@ type Pipeline struct {
 }
 
 // NewPipeline creates a new Pipeline
-func NewPipeline(cfg Config,
+func NewPipeline(ctx context.Context,
+	cfg Config,
	historyDB *historydb.HistoryDB,
+	l2DB *l2db.L2DB,
	txSelector *txselector.TxSelector,
	batchBuilder *batchbuilder.BatchBuilder,
	txManager *TxManager,
-	serverProofs []ServerProofInterface,
+	provers []prover.Client,
	scConsts *synchronizer.SCConsts,
-) *Pipeline {
-	serverProofPool := NewServerProofPool(len(serverProofs))
-	for _, serverProof := range serverProofs {
-		serverProofPool.Add(serverProof)
+) (*Pipeline, error) {
+	proversPool := NewProversPool(len(provers))
+	proversPoolSize := 0
+	for _, prover := range provers {
+		if err := prover.WaitReady(ctx); err != nil {
+			log.Errorw("prover.WaitReady", "err", err)
+		} else {
+			proversPool.Add(prover)
+			proversPoolSize++
+		}
	}
-	return &Pipeline{
-		cfg:             cfg,
-		historyDB:       historyDB,
-		txSelector:      txSelector,
-		batchBuilder:    batchBuilder,
-		serverProofPool: serverProofPool,
-		txManager:       txManager,
-		consts:          *scConsts,
-		// TODO: Find best queue size
-		statsCh: make(chan synchronizer.Stats, 16), //nolint:gomnd
+	if proversPoolSize == 0 {
+		return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool"))
	}
+	return &Pipeline{
+		cfg:          cfg,
+		historyDB:    historyDB,
+		l2DB:         l2DB,
+		txSelector:   txSelector,
+		batchBuilder: batchBuilder,
+		provers:      provers,
+		proversPool:  proversPool,
+		txManager:    txManager,
+		consts:       *scConsts,
+		statsCh:      make(chan synchronizer.Stats, queueLen),
+	}, nil
 }
 
 // SetSyncStats is a thread safe method to set the synchronizer Stats
@@ -488,7 +575,7 @@ func (p *Pipeline) SetSyncStats(stats *synchronizer.Stats) {
 }
 
 // Start the forging pipeline
-func (p *Pipeline) Start(batchNum common.BatchNum,
+func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
	syncStats *synchronizer.Stats, initSCVars *synchronizer.SCVariables) error {
	if p.started {
		log.Fatal("Pipeline already started")
@@ -497,6 +584,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
	// Reset pipeline state
	p.batchNum = batchNum
+	p.lastForgeL1TxsNum = lastForgeL1TxsNum
	p.vars = *initSCVars
	p.lastScheduledL1BatchBlockNum = 0
@@ -504,12 +592,10 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
	err := p.txSelector.Reset(p.batchNum)
	if err != nil {
-		log.Errorw("Pipeline: TxSelector.Reset", "error", err)
		return tracerr.Wrap(err)
	}
	err = p.batchBuilder.Reset(p.batchNum, true)
	if err != nil {
-		log.Errorw("Pipeline: BatchBuilder.Reset", "error", err)
		return tracerr.Wrap(err)
	}
@@ -529,7 +615,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
			default:
				p.batchNum = p.batchNum + 1
				batchInfo, err := p.forgeSendServerProof(p.ctx, p.batchNum)
-				if tracerr.Unwrap(err) == ErrDone {
+				if common.IsErrDone(err) {
					continue
				}
				if err != nil {
@@ -551,7 +637,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
				return
			case batchInfo := <-batchChSentServerProof:
				err := p.waitServerProof(p.ctx, batchInfo)
-				if tracerr.Unwrap(err) == ErrDone {
+				if common.IsErrDone(err) {
					continue
				}
				if err != nil {
@@ -566,7 +652,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
 }
 
 // Stop the forging pipeline
-func (p *Pipeline) Stop() {
+func (p *Pipeline) Stop(ctx context.Context) {
	if !p.started {
		log.Fatal("Pipeline already stopped")
	}
@@ -574,14 +660,26 @@ func (p *Pipeline) Stop() {
	log.Debug("Stopping Pipeline...")
	p.cancel()
	p.wg.Wait()
-	// TODO: Cancel all proofServers with pending proofs
+	for _, prover := range p.provers {
+		if err := prover.Cancel(ctx); err != nil {
+			log.Errorw("prover.Cancel", "err", err)
+		}
+	}
+}
+
+func l2TxsIDs(txs []common.PoolL2Tx) []common.TxID {
+	txIDs := make([]common.TxID, len(txs))
+	for i, tx := range txs {
+		txIDs[i] = tx.TxID
+	}
+	return txIDs
 }
 
 // forgeSendServerProof the next batch, wait for a proof server to be available and send the
 // circuit inputs to the proof server.
 func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
	// remove transactions from the pool that have been there for too long
-	err := p.purgeRemoveByTimeout()
+	err := p.l2DB.Purge(common.BatchNum(p.stats.Sync.LastBatch))
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
@@ -590,25 +688,37 @@ func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.Bat
	var poolL2Txs []common.PoolL2Tx
	// var feesInfo
-	var l1UserTxsExtra, l1OperatorTxs []common.L1Tx
+	var l1UserTxsExtra, l1CoordTxs []common.L1Tx
	// 1. Decide if we forge L2Tx or L1+L2Tx
	if p.shouldL1L2Batch() {
-		p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBatch
+		p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num
		// 2a: L1+L2 txs
-		// l1UserTxs, toForgeL1TxsNumber := c.historyDB.GetNextL1UserTxs() // TODO once HistoryDB is ready, uncomment
-		var l1UserTxs []common.L1Tx = nil // tmp, depends on HistoryDB
-		l1UserTxsExtra, l1OperatorTxs, poolL2Txs, err = p.txSelector.GetL1L2TxSelection([]common.Idx{}, batchNum, l1UserTxs) // TODO once feesInfo is added to method return, add the var
+		p.lastForgeL1TxsNum++
+		l1UserTxs, err := p.historyDB.GetL1UserTxs(p.lastForgeL1TxsNum)
+		if err != nil {
+			return nil, tracerr.Wrap(err)
+		}
+		l1UserTxsExtra, l1CoordTxs, poolL2Txs, err = p.txSelector.GetL1L2TxSelection([]common.Idx{}, batchNum, l1UserTxs) // TODO once feesInfo is added to method return, add the var
		if err != nil {
			return nil, tracerr.Wrap(err)
		}
	} else {
		// 2b: only L2 txs
-		_, poolL2Txs, err = p.txSelector.GetL2TxSelection([]common.Idx{}, batchNum) // TODO once feesInfo is added to method return, add the var
+		l1CoordTxs, poolL2Txs, err = p.txSelector.GetL2TxSelection([]common.Idx{}, batchNum)
		if err != nil {
			return nil, tracerr.Wrap(err)
		}
		l1UserTxsExtra = nil
-		l1OperatorTxs = nil
	}
+
+	// 3. Save metadata from TxSelector output for BatchNum
+	// TODO feesInfo
+	batchInfo.L1UserTxsExtra = l1UserTxsExtra
+	batchInfo.L1CoordTxs = l1CoordTxs
+	batchInfo.L2Txs = poolL2Txs
+
+	if err := p.l2DB.StartForging(l2TxsIDs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil {
+		return nil, tracerr.Wrap(err)
+	}
 
 	// Run purger to invalidate transactions that become invalid because of
@@ -620,17 +730,12 @@ func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.Bat
		return nil, tracerr.Wrap(err)
	}
 
-	// 3. Save metadata from TxSelector output for BatchNum
-	// batchInfo.SetTxsInfo(l1UserTxsExtra, l1OperatorTxs, poolL2Txs) // TODO feesInfo
-	batchInfo.L1UserTxsExtra = l1UserTxsExtra
-	batchInfo.L1OperatorTxs = l1OperatorTxs
-	batchInfo.L2Txs = poolL2Txs
-
	// 4. Call BatchBuilder with TxSelector output
	configBatch := &batchbuilder.ConfigBatch{
		ForgerAddress: p.cfg.ForgerAddress,
	}
-	zkInputs, err := p.batchBuilder.BuildBatch([]common.Idx{}, configBatch, l1UserTxsExtra, l1OperatorTxs, poolL2Txs, nil) // TODO []common.TokenID --> feesInfo
+	zkInputs, err := p.batchBuilder.BuildBatch([]common.Idx{}, configBatch,
+		l1UserTxsExtra, l1CoordTxs, poolL2Txs, nil) // TODO []common.TokenID --> feesInfo
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
@@ -640,7 +745,7 @@ func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.Bat
	p.cfg.debugBatchStore(&batchInfo)
 
	// 6. Wait for an available server proof blocking call
-	serverProof, err := p.serverProofPool.Get(ctx)
+	serverProof, err := p.proversPool.Get(ctx)
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
@@ -649,7 +754,7 @@ func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.Bat
		// If there's an error further on, add the serverProof back to
		// the pool
		if err != nil {
-			p.serverProofPool.Add(serverProof)
+			p.proversPool.Add(serverProof)
		}
	}()
	p.cfg.debugBatchStore(&batchInfo)
@@ -670,7 +775,7 @@ func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) er
	if err != nil {
		return tracerr.Wrap(err)
	}
-	p.serverProofPool.Add(batchInfo.ServerProof)
+	p.proversPool.Add(batchInfo.ServerProof)
	batchInfo.ServerProof = nil
	batchInfo.Proof = proof
	batchInfo.ForgeBatchArgs = p.prepareForgeBatchArgs(batchInfo)
@@ -679,24 +784,6 @@ func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) er
	return nil
 }
 
-// isForgeSequence returns true if the node is the Forger in the current ethereum block
-// func (c *Coordinator) isForgeSequence() (bool, error) {
-// 	// TODO: Consider checking if we can forge by quering the Synchronizer instead of using ethClient
-// 	blockNum, err := c.ethClient.EthLastBlock()
-// 	if err != nil {
-// 		return false, err
-// 	}
-// 	addr, err := c.ethClient.EthAddress()
-// 	if err != nil {
-// 		return false, err
-// 	}
-// 	return c.ethClient.AuctionCanForge(*addr, blockNum+1)
-// }
-
-func (p *Pipeline) purgeRemoveByTimeout() error {
-	return nil // TODO
-}
-
 func (p *Pipeline) purgeInvalidDueToL2TxsSelection(l2Txs []common.PoolL2Tx) error {
	return nil // TODO
 }