|
|
@ -156,10 +156,9 @@ func NewCoordinator(cfg Config, |
|
|
|
return &c, nil |
|
|
|
} |
|
|
|
|
|
|
|
func (c *Coordinator) newPipeline(ctx context.Context, |
|
|
|
stats *synchronizer.Stats) (*Pipeline, error) { |
|
|
|
func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) { |
|
|
|
return NewPipeline(ctx, c.cfg, c.historyDB, c.l2DB, c.txSelector, |
|
|
|
c.batchBuilder, c.purger, c.txManager, c.provers, stats, &c.consts) |
|
|
|
c.batchBuilder, c.purger, c.txManager, c.provers, &c.consts) |
|
|
|
} |
|
|
|
|
|
|
|
// MsgSyncBlock indicates an update to the Synchronizer stats
|
|
|
@ -174,6 +173,7 @@ type MsgSyncBlock struct { |
|
|
|
// MsgSyncReorg indicates a reorg
|
|
|
|
type MsgSyncReorg struct { |
|
|
|
Stats synchronizer.Stats |
|
|
|
Vars synchronizer.SCVariablesPtr |
|
|
|
} |
|
|
|
|
|
|
|
// MsgStopPipeline indicates a signal to reset the pipeline
|
|
|
@ -222,7 +222,7 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) |
|
|
|
stats.Eth.LastBlock.Num, "batch", stats.Sync.LastBatch) |
|
|
|
batchNum := common.BatchNum(stats.Sync.LastBatch) |
|
|
|
var err error |
|
|
|
if c.pipeline, err = c.newPipeline(ctx, stats); err != nil { |
|
|
|
if c.pipeline, err = c.newPipeline(ctx); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
if err := c.pipeline.Start(batchNum, stats.Sync.LastForgeL1TxsNum, |
|
|
@ -233,9 +233,7 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) |
|
|
|
c.pipelineBatchNum = batchNum |
|
|
|
} |
|
|
|
} else { |
|
|
|
if canForge { |
|
|
|
c.pipeline.SetSyncStats(stats) |
|
|
|
} else { |
|
|
|
if !canForge { |
|
|
|
log.Infow("Coordinator: forging state end", "block", stats.Eth.LastBlock.Num) |
|
|
|
c.pipeline.Stop(c.ctx) |
|
|
|
c.pipeline = nil |
|
|
@ -269,15 +267,42 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) |
|
|
|
|
|
|
|
func (c *Coordinator) handleMsgSyncBlock(ctx context.Context, msg *MsgSyncBlock) error { |
|
|
|
c.stats = &msg.Stats |
|
|
|
// batches := msg.Batches
|
|
|
|
c.syncSCVars(msg.Vars) |
|
|
|
if c.pipeline != nil { |
|
|
|
c.pipeline.SetSyncStatsVars(&msg.Stats, &msg.Vars) |
|
|
|
} |
|
|
|
if !c.stats.Synced() { |
|
|
|
return nil |
|
|
|
} |
|
|
|
c.syncSCVars(msg.Vars) |
|
|
|
return c.syncStats(ctx, c.stats) |
|
|
|
} |
|
|
|
|
|
|
|
func (c *Coordinator) handleReorg(ctx context.Context, msg *MsgSyncReorg) error { |
|
|
|
c.stats = &msg.Stats |
|
|
|
c.syncSCVars(msg.Vars) |
|
|
|
if c.pipeline != nil { |
|
|
|
c.pipeline.SetSyncStatsVars(&msg.Stats, &msg.Vars) |
|
|
|
} |
|
|
|
if common.BatchNum(c.stats.Sync.LastBatch) < c.pipelineBatchNum { |
|
|
|
// There's been a reorg and the batch from which the pipeline
|
|
|
|
// was started was in a block that was discarded. The batch
|
|
|
|
// may not be in the main chain, so we stop the pipeline as a
|
|
|
|
// precaution (it will be started again once the node is in
|
|
|
|
// sync).
|
|
|
|
log.Infow("Coordinator.handleReorg StopPipeline sync.LastBatch < c.pipelineBatchNum", |
|
|
|
"sync.LastBatch", c.stats.Sync.LastBatch, |
|
|
|
"c.pipelineBatchNum", c.pipelineBatchNum) |
|
|
|
if err := c.handleStopPipeline(ctx, "reorg"); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string) error { |
|
|
|
if err := c.l2DB.Reorg(common.BatchNum(c.stats.Sync.LastBatch)); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
if c.pipeline != nil { |
|
|
|
c.pipeline.Stop(c.ctx) |
|
|
|
c.pipeline = nil |
|
|
@ -295,7 +320,7 @@ func (c *Coordinator) handleMsg(ctx context.Context, msg interface{}) error { |
|
|
|
return tracerr.Wrap(fmt.Errorf("Coordinator.handleMsgSyncBlock error: %w", err)) |
|
|
|
} |
|
|
|
case MsgSyncReorg: |
|
|
|
if err := c.handleReorg(ctx, &msg.Stats); err != nil { |
|
|
|
if err := c.handleReorg(ctx, &msg); err != nil { |
|
|
|
return tracerr.Wrap(fmt.Errorf("Coordinator.handleReorg error: %w", err)) |
|
|
|
} |
|
|
|
case MsgStopPipeline: |
|
|
@ -376,27 +401,6 @@ func (c *Coordinator) Stop() { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (c *Coordinator) handleReorg(ctx context.Context, stats *synchronizer.Stats) error { |
|
|
|
c.stats = stats |
|
|
|
if common.BatchNum(c.stats.Sync.LastBatch) < c.pipelineBatchNum { |
|
|
|
// There's been a reorg and the batch from which the pipeline
|
|
|
|
// was started was in a block that was discarded. The batch
|
|
|
|
// may not be in the main chain, so we stop the pipeline as a
|
|
|
|
// precaution (it will be started again once the node is in
|
|
|
|
// sync).
|
|
|
|
log.Infow("Coordinator.handleReorg StopPipeline sync.LastBatch < c.pipelineBatchNum", |
|
|
|
"sync.LastBatch", c.stats.Sync.LastBatch, |
|
|
|
"c.pipelineBatchNum", c.pipelineBatchNum) |
|
|
|
if err := c.handleStopPipeline(ctx, "reorg"); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
if err := c.l2DB.Reorg(common.BatchNum(c.stats.Sync.LastBatch)); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
} |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// TxManager handles everything related to ethereum transactions: It makes the
|
|
|
|
// call to forge, waits for transaction confirmation, and keeps checking them
|
|
|
|
// until a number of confirmed blocks have passed.
|
|
|
@ -465,6 +469,7 @@ func (t *TxManager) rollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) |
|
|
|
return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err)) |
|
|
|
} |
|
|
|
batchInfo.EthTx = ethTx |
|
|
|
batchInfo.Status = StatusSent |
|
|
|
log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex()) |
|
|
|
t.cfg.debugBatchStore(batchInfo) |
|
|
|
if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil { |
|
|
@ -506,9 +511,13 @@ func (t *TxManager) handleReceipt(batchInfo *BatchInfo) (*int64, error) { |
|
|
|
receipt := batchInfo.Receipt |
|
|
|
if receipt != nil { |
|
|
|
if receipt.Status == types.ReceiptStatusFailed { |
|
|
|
batchInfo.Status = StatusFailed |
|
|
|
t.cfg.debugBatchStore(batchInfo) |
|
|
|
log.Errorw("TxManager receipt status is failed", "receipt", receipt) |
|
|
|
return nil, tracerr.Wrap(fmt.Errorf("ethereum transaction receipt statis is failed")) |
|
|
|
} else if receipt.Status == types.ReceiptStatusSuccessful { |
|
|
|
batchInfo.Status = StatusMined |
|
|
|
t.cfg.debugBatchStore(batchInfo) |
|
|
|
if batchInfo.BatchNum > t.lastConfirmedBatch { |
|
|
|
t.lastConfirmedBatch = batchInfo.BatchNum |
|
|
|
} |
|
|
@ -534,7 +543,7 @@ func (t *TxManager) Run(ctx context.Context) { |
|
|
|
case lastBlock := <-t.lastBlockCh: |
|
|
|
t.lastBlock = lastBlock |
|
|
|
case batchInfo := <-t.batchCh: |
|
|
|
if err := t.rollupForgeBatch(ctx, batchInfo); common.IsErrDone(err) { |
|
|
|
if err := t.rollupForgeBatch(ctx, batchInfo); ctx.Err() != nil { |
|
|
|
continue |
|
|
|
} else if err != nil { |
|
|
|
t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch call: %v", err)}) |
|
|
@ -550,8 +559,7 @@ func (t *TxManager) Run(ctx context.Context) { |
|
|
|
current := next |
|
|
|
next = (current + 1) % len(t.queue) |
|
|
|
batchInfo := t.queue[current] |
|
|
|
err := t.ethTransactionReceipt(ctx, batchInfo) |
|
|
|
if common.IsErrDone(err) { |
|
|
|
if err := t.ethTransactionReceipt(ctx, batchInfo); ctx.Err() != nil { |
|
|
|
continue |
|
|
|
} else if err != nil { //nolint:staticcheck
|
|
|
|
// We can't get the receipt for the
|
|
|
@ -580,6 +588,11 @@ func (t *TxManager) Run(ctx context.Context) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
type statsVars struct { |
|
|
|
Stats synchronizer.Stats |
|
|
|
Vars synchronizer.SCVariablesPtr |
|
|
|
} |
|
|
|
|
|
|
|
// Pipeline manages the forging of batches with parallel server proofs
|
|
|
|
type Pipeline struct { |
|
|
|
cfg Config |
|
|
@ -587,7 +600,6 @@ type Pipeline struct { |
|
|
|
|
|
|
|
// state
|
|
|
|
batchNum common.BatchNum |
|
|
|
vars synchronizer.SCVariables |
|
|
|
lastScheduledL1BatchBlockNum int64 |
|
|
|
lastForgeL1TxsNum int64 |
|
|
|
started bool |
|
|
@ -601,8 +613,9 @@ type Pipeline struct { |
|
|
|
batchBuilder *batchbuilder.BatchBuilder |
|
|
|
purger *Purger |
|
|
|
|
|
|
|
stats synchronizer.Stats |
|
|
|
statsCh chan synchronizer.Stats |
|
|
|
stats synchronizer.Stats |
|
|
|
vars synchronizer.SCVariables |
|
|
|
statsVarsCh chan statsVars |
|
|
|
|
|
|
|
ctx context.Context |
|
|
|
wg sync.WaitGroup |
|
|
@ -619,7 +632,6 @@ func NewPipeline(ctx context.Context, |
|
|
|
purger *Purger, |
|
|
|
txManager *TxManager, |
|
|
|
provers []prover.Client, |
|
|
|
stats *synchronizer.Stats, |
|
|
|
scConsts *synchronizer.SCConsts, |
|
|
|
) (*Pipeline, error) { |
|
|
|
proversPool := NewProversPool(len(provers)) |
|
|
@ -646,22 +658,22 @@ func NewPipeline(ctx context.Context, |
|
|
|
purger: purger, |
|
|
|
txManager: txManager, |
|
|
|
consts: *scConsts, |
|
|
|
stats: *stats, |
|
|
|
statsCh: make(chan synchronizer.Stats, queueLen), |
|
|
|
statsVarsCh: make(chan statsVars, queueLen), |
|
|
|
}, nil |
|
|
|
} |
|
|
|
|
|
|
|
// SetSyncStats is a thread safe method to sets the synchronizer Stats
|
|
|
|
func (p *Pipeline) SetSyncStats(stats *synchronizer.Stats) { |
|
|
|
p.statsCh <- *stats |
|
|
|
// SetSyncStatsVars is a thread safe method to sets the synchronizer Stats
|
|
|
|
func (p *Pipeline) SetSyncStatsVars(stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) { |
|
|
|
p.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars} |
|
|
|
} |
|
|
|
|
|
|
|
// reset pipeline state
|
|
|
|
func (p *Pipeline) reset(batchNum common.BatchNum, lastForgeL1TxsNum int64, |
|
|
|
initSCVars *synchronizer.SCVariables) error { |
|
|
|
stats *synchronizer.Stats, vars *synchronizer.SCVariables) error { |
|
|
|
p.batchNum = batchNum |
|
|
|
p.lastForgeL1TxsNum = lastForgeL1TxsNum |
|
|
|
p.vars = *initSCVars |
|
|
|
p.stats = *stats |
|
|
|
p.vars = *vars |
|
|
|
p.lastScheduledL1BatchBlockNum = 0 |
|
|
|
|
|
|
|
err := p.txSelector.Reset(p.batchNum) |
|
|
@ -675,15 +687,27 @@ func (p *Pipeline) reset(batchNum common.BatchNum, lastForgeL1TxsNum int64, |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) { |
|
|
|
if vars.Rollup != nil { |
|
|
|
p.vars.Rollup = *vars.Rollup |
|
|
|
} |
|
|
|
if vars.Auction != nil { |
|
|
|
p.vars.Auction = *vars.Auction |
|
|
|
} |
|
|
|
if vars.WDelayer != nil { |
|
|
|
p.vars.WDelayer = *vars.WDelayer |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Start the forging pipeline
|
|
|
|
func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64, |
|
|
|
syncStats *synchronizer.Stats, initSCVars *synchronizer.SCVariables) error { |
|
|
|
stats *synchronizer.Stats, vars *synchronizer.SCVariables) error { |
|
|
|
if p.started { |
|
|
|
log.Fatal("Pipeline already started") |
|
|
|
} |
|
|
|
p.started = true |
|
|
|
|
|
|
|
if err := p.reset(batchNum, lastForgeL1TxsNum, initSCVars); err != nil { |
|
|
|
if err := p.reset(batchNum, lastForgeL1TxsNum, stats, vars); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
p.ctx, p.cancel = context.WithCancel(context.Background()) |
|
|
@ -699,8 +723,9 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64, |
|
|
|
log.Info("Pipeline forgeBatch loop done") |
|
|
|
p.wg.Done() |
|
|
|
return |
|
|
|
case syncStats := <-p.statsCh: |
|
|
|
p.stats = syncStats |
|
|
|
case statsVars := <-p.statsVarsCh: |
|
|
|
p.stats = statsVars.Stats |
|
|
|
p.syncSCVars(statsVars.Vars) |
|
|
|
default: |
|
|
|
batchNum = p.batchNum + 1 |
|
|
|
batchInfo, err := p.forgeBatch(batchNum) |
|
|
@ -813,6 +838,7 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) { |
|
|
|
var l1UserTxsExtra, l1CoordTxs []common.L1Tx |
|
|
|
var auths [][]byte |
|
|
|
var coordIdxs []common.Idx |
|
|
|
|
|
|
|
// 1. Decide if we forge L2Tx or L1+L2Tx
|
|
|
|
if p.shouldL1L2Batch() { |
|
|
|
batchInfo.L1Batch = true |
|
|
@ -876,6 +902,7 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) { |
|
|
|
|
|
|
|
// 5. Save metadata from BatchBuilder output for BatchNum
|
|
|
|
batchInfo.ZKInputs = zkInputs |
|
|
|
batchInfo.Status = StatusForged |
|
|
|
p.cfg.debugBatchStore(&batchInfo) |
|
|
|
|
|
|
|
return &batchInfo, nil |
|
|
@ -890,7 +917,7 @@ func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) er |
|
|
|
batchInfo.Proof = proof |
|
|
|
batchInfo.PublicInputs = pubInputs |
|
|
|
batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo) |
|
|
|
batchInfo.TxStatus = TxStatusPending |
|
|
|
batchInfo.Status = StatusProof |
|
|
|
p.cfg.debugBatchStore(batchInfo) |
|
|
|
return nil |
|
|
|
} |
|
|
@ -905,7 +932,7 @@ func (p *Pipeline) shouldL1L2Batch() bool { |
|
|
|
// Return true if we have passed the l1BatchTimeoutPerc portion of the
|
|
|
|
// range before the l1batch timeout.
|
|
|
|
if p.stats.Eth.LastBlock.Num-lastL1BatchBlockNum >= |
|
|
|
int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout)*p.cfg.L1BatchTimeoutPerc) { |
|
|
|
int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1)*p.cfg.L1BatchTimeoutPerc) { |
|
|
|
return true |
|
|
|
} |
|
|
|
return false |
|
|
|