|
|
@ -4,6 +4,7 @@ import ( |
|
|
|
"context" |
|
|
|
"fmt" |
|
|
|
"math/big" |
|
|
|
"os" |
|
|
|
"strings" |
|
|
|
"sync" |
|
|
|
"time" |
|
|
@ -111,6 +112,12 @@ func NewCoordinator(cfg Config, |
|
|
|
cfg.EthClientAttempts)) |
|
|
|
} |
|
|
|
|
|
|
|
if cfg.DebugBatchPath != "" { |
|
|
|
if err := os.MkdirAll(cfg.DebugBatchPath, 0744); err != nil { |
|
|
|
return nil, tracerr.Wrap(err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
purger := Purger{ |
|
|
|
cfg: cfg.Purger, |
|
|
|
lastPurgeBlock: 0, |
|
|
@ -147,9 +154,10 @@ func NewCoordinator(cfg Config, |
|
|
|
return &c, nil |
|
|
|
} |
|
|
|
|
|
|
|
func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) { |
|
|
|
func (c *Coordinator) newPipeline(ctx context.Context, |
|
|
|
stats *synchronizer.Stats) (*Pipeline, error) { |
|
|
|
return NewPipeline(ctx, c.cfg, c.historyDB, c.l2DB, c.txSelector, |
|
|
|
c.batchBuilder, c.purger, c.txManager, c.provers, &c.consts) |
|
|
|
c.batchBuilder, c.purger, c.txManager, c.provers, stats, &c.consts) |
|
|
|
} |
|
|
|
|
|
|
|
// MsgSyncBlock indicates an update to the Synchronizer stats
|
|
|
@ -226,7 +234,7 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) |
|
|
|
stats.Eth.LastBlock.Num, "batch", stats.Sync.LastBatch) |
|
|
|
batchNum := common.BatchNum(stats.Sync.LastBatch) |
|
|
|
var err error |
|
|
|
if c.pipeline, err = c.newPipeline(ctx); err != nil { |
|
|
|
if c.pipeline, err = c.newPipeline(ctx, stats); err != nil { |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
if err := c.pipeline.Start(batchNum, stats.Sync.LastForgeL1TxsNum, |
|
|
@ -295,22 +303,16 @@ func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string) err |
|
|
|
func (c *Coordinator) handleMsg(ctx context.Context, msg interface{}) error { |
|
|
|
switch msg := msg.(type) { |
|
|
|
case MsgSyncBlock: |
|
|
|
if err := c.handleMsgSyncBlock(ctx, &msg); common.IsErrDone(err) { |
|
|
|
return nil |
|
|
|
} else if err != nil { |
|
|
|
if err := c.handleMsgSyncBlock(ctx, &msg); err != nil { |
|
|
|
return tracerr.Wrap(fmt.Errorf("Coordinator.handleMsgSyncBlock error: %w", err)) |
|
|
|
} |
|
|
|
case MsgSyncReorg: |
|
|
|
if err := c.handleReorg(ctx, &msg.Stats); common.IsErrDone(err) { |
|
|
|
return nil |
|
|
|
} else if err != nil { |
|
|
|
if err := c.handleReorg(ctx, &msg.Stats); err != nil { |
|
|
|
return tracerr.Wrap(fmt.Errorf("Coordinator.handleReorg error: %w", err)) |
|
|
|
} |
|
|
|
case MsgStopPipeline: |
|
|
|
log.Infow("Coordinator received MsgStopPipeline", "reason", msg.Reason) |
|
|
|
if err := c.handleStopPipeline(ctx, msg.Reason); common.IsErrDone(err) { |
|
|
|
return nil |
|
|
|
} else if err != nil { |
|
|
|
if err := c.handleStopPipeline(ctx, msg.Reason); err != nil { |
|
|
|
return tracerr.Wrap(fmt.Errorf("Coordinator.handleStopPipeline: %w", err)) |
|
|
|
} |
|
|
|
default: |
|
|
@ -341,7 +343,9 @@ func (c *Coordinator) Start() { |
|
|
|
c.wg.Done() |
|
|
|
return |
|
|
|
case msg := <-c.msgCh: |
|
|
|
if err := c.handleMsg(c.ctx, msg); err != nil { |
|
|
|
if err := c.handleMsg(c.ctx, msg); c.ctx.Err() != nil { |
|
|
|
continue |
|
|
|
} else if err != nil { |
|
|
|
log.Errorw("Coordinator.handleMsg", "err", err) |
|
|
|
waitDuration = time.Duration(c.cfg.SyncRetryInterval) |
|
|
|
continue |
|
|
@ -352,7 +356,9 @@ func (c *Coordinator) Start() { |
|
|
|
waitDuration = time.Duration(longWaitDuration) |
|
|
|
continue |
|
|
|
} |
|
|
|
if err := c.syncStats(c.ctx, c.stats); err != nil { |
|
|
|
if err := c.syncStats(c.ctx, c.stats); c.ctx.Err() != nil { |
|
|
|
continue |
|
|
|
} else if err != nil { |
|
|
|
log.Errorw("Coordinator.syncStats", "err", err) |
|
|
|
waitDuration = time.Duration(c.cfg.SyncRetryInterval) |
|
|
|
continue |
|
|
@ -456,7 +462,8 @@ func (t *TxManager) rollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) |
|
|
|
return tracerr.Wrap(err) |
|
|
|
} |
|
|
|
log.Errorw("TxManager ethClient.RollupForgeBatch", |
|
|
|
"attempt", attempt, "err", err, "block", t.lastBlock) |
|
|
|
"attempt", attempt, "err", err, "block", t.lastBlock, |
|
|
|
"batchNum", batchInfo.BatchNum) |
|
|
|
} else { |
|
|
|
break |
|
|
|
} |
|
|
@ -484,6 +491,9 @@ func (t *TxManager) ethTransactionReceipt(ctx context.Context, batchInfo *BatchI |
|
|
|
var err error |
|
|
|
for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ { |
|
|
|
receipt, err = t.ethClient.EthTransactionReceipt(ctx, txHash) |
|
|
|
if ctx.Err() != nil { |
|
|
|
continue |
|
|
|
} |
|
|
|
if err != nil { |
|
|
|
log.Errorw("TxManager ethClient.EthTransactionReceipt", |
|
|
|
"attempt", attempt, "err", err) |
|
|
@ -621,6 +631,7 @@ func NewPipeline(ctx context.Context, |
|
|
|
purger *Purger, |
|
|
|
txManager *TxManager, |
|
|
|
provers []prover.Client, |
|
|
|
stats *synchronizer.Stats, |
|
|
|
scConsts *synchronizer.SCConsts, |
|
|
|
) (*Pipeline, error) { |
|
|
|
proversPool := NewProversPool(len(provers)) |
|
|
@ -647,6 +658,7 @@ func NewPipeline(ctx context.Context, |
|
|
|
purger: purger, |
|
|
|
txManager: txManager, |
|
|
|
consts: *scConsts, |
|
|
|
stats: *stats, |
|
|
|
statsCh: make(chan synchronizer.Stats, queueLen), |
|
|
|
}, nil |
|
|
|
} |
|
|
@ -697,7 +709,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64, |
|
|
|
for { |
|
|
|
select { |
|
|
|
case <-p.ctx.Done(): |
|
|
|
log.Debug("Pipeline forgeBatch loop done") |
|
|
|
log.Info("Pipeline forgeBatch loop done") |
|
|
|
p.wg.Done() |
|
|
|
return |
|
|
|
case syncStats := <-p.statsCh: |
|
|
@ -705,7 +717,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64, |
|
|
|
default: |
|
|
|
batchNum = p.batchNum + 1 |
|
|
|
batchInfo, err := p.forgeBatch(p.ctx, batchNum, selectionConfig) |
|
|
|
if common.IsErrDone(err) { |
|
|
|
if p.ctx.Err() != nil { |
|
|
|
continue |
|
|
|
} else if err != nil { |
|
|
|
log.Errorw("forgeBatch", "err", err) |
|
|
@ -713,14 +725,16 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64, |
|
|
|
} |
|
|
|
// 6. Wait for an available server proof (blocking call)
|
|
|
|
serverProof, err := p.proversPool.Get(p.ctx) |
|
|
|
if common.IsErrDone(err) { |
|
|
|
if p.ctx.Err() != nil { |
|
|
|
continue |
|
|
|
} else if err != nil { |
|
|
|
log.Errorw("proversPool.Get", "err", err) |
|
|
|
continue |
|
|
|
} |
|
|
|
batchInfo.ServerProof = serverProof |
|
|
|
if err := p.sendServerProof(p.ctx, batchInfo); err != nil { |
|
|
|
if err := p.sendServerProof(p.ctx, batchInfo); p.ctx.Err() != nil { |
|
|
|
continue |
|
|
|
} else if err != nil { |
|
|
|
log.Errorw("sendServerProof", "err", err) |
|
|
|
batchInfo.ServerProof = nil |
|
|
|
p.proversPool.Add(serverProof) |
|
|
@ -737,17 +751,17 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64, |
|
|
|
for { |
|
|
|
select { |
|
|
|
case <-p.ctx.Done(): |
|
|
|
log.Debug("Pipeline waitServerProofSendEth loop done") |
|
|
|
log.Info("Pipeline waitServerProofSendEth loop done") |
|
|
|
p.wg.Done() |
|
|
|
return |
|
|
|
case batchInfo := <-batchChSentServerProof: |
|
|
|
err := p.waitServerProof(p.ctx, batchInfo) |
|
|
|
if common.IsErrDone(err) { |
|
|
|
continue |
|
|
|
} |
|
|
|
// We are done with this serverProof, add it back to the pool
|
|
|
|
p.proversPool.Add(batchInfo.ServerProof) |
|
|
|
batchInfo.ServerProof = nil |
|
|
|
if p.ctx.Err() != nil { |
|
|
|
continue |
|
|
|
} |
|
|
|
if err != nil { |
|
|
|
log.Errorw("waitServerProof", "err", err) |
|
|
|
continue |
|
|
@ -765,7 +779,7 @@ func (p *Pipeline) Stop(ctx context.Context) { |
|
|
|
log.Fatal("Pipeline already stopped") |
|
|
|
} |
|
|
|
p.started = false |
|
|
|
log.Debug("Stopping Pipeline...") |
|
|
|
log.Info("Stopping Pipeline...") |
|
|
|
p.cancel() |
|
|
|
p.wg.Wait() |
|
|
|
for _, prover := range p.provers { |
|
|
@ -899,7 +913,7 @@ func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) er |
|
|
|
} |
|
|
|
batchInfo.Proof = proof |
|
|
|
batchInfo.PublicInputs = pubInputs |
|
|
|
batchInfo.ForgeBatchArgs = p.prepareForgeBatchArgs(batchInfo) |
|
|
|
batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo) |
|
|
|
batchInfo.TxStatus = TxStatusPending |
|
|
|
p.cfg.debugBatchStore(batchInfo) |
|
|
|
return nil |
|
|
@ -921,7 +935,7 @@ func (p *Pipeline) shouldL1L2Batch() bool { |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
func (p *Pipeline) prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs { |
|
|
|
func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs { |
|
|
|
proof := batchInfo.Proof |
|
|
|
zki := batchInfo.ZKInputs |
|
|
|
return ð.RollupForgeBatchArgs{ |
|
|
|