diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 6f7befa..1ae4b54 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -671,21 +671,33 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64, for { select { case <-p.ctx.Done(): - log.Debug("Pipeline forgeSendServerProof loop done") + log.Debug("Pipeline forgeBatch loop done") p.wg.Done() return case syncStats := <-p.statsCh: p.stats = syncStats default: - p.batchNum = p.batchNum + 1 - batchInfo, err := p.forgeSendServerProof(p.ctx, p.batchNum) + batchNum = p.batchNum + 1 + batchInfo, err := p.forgeBatch(p.ctx, batchNum) if common.IsErrDone(err) { continue + } else if err != nil { + log.Errorw("forgeBatch", "err", err) + continue } - if err != nil { - log.Errorw("forgeSendServerProof", "err", err) + // 6. Wait for an available server proof (blocking call) + serverProof, err := p.proversPool.Get(p.ctx) + if common.IsErrDone(err) { continue + } else if err != nil { + log.Errorw("proversPool.Get", "err", err) } + if err := p.sendServerProof(p.ctx, serverProof, batchInfo); err != nil { + log.Errorw("sendServerProof", "err", err) + p.proversPool.Add(serverProof) + continue + } + p.batchNum = batchNum batchChSentServerProof <- batchInfo } } @@ -704,6 +716,9 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64, if common.IsErrDone(err) { continue } + // We are done with this serverProof, add it back to the pool + p.proversPool.Add(batchInfo.ServerProof) + batchInfo.ServerProof = nil if err != nil { log.Errorw("waitServerProof", "err", err) continue @@ -739,9 +754,21 @@ func l2TxsIDs(txs []common.PoolL2Tx) []common.TxID { return txIDs } -// forgeSendServerProof the next batch, wait for a proof server to be available and send the -// circuit inputs to the proof server. -func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) { +// sendServerProof sends the circuit inputs to the proof server +func (p *Pipeline) sendServerProof(ctx context.Context, serverProof prover.Client, + batchInfo *BatchInfo) error { + p.cfg.debugBatchStore(batchInfo) + + // 7. Call the selected idle server proof with BatchBuilder output, + // save server proof info for batchNum + if err := batchInfo.ServerProof.CalculateProof(ctx, batchInfo.ZKInputs); err != nil { + return tracerr.Wrap(err) + } + return nil +} + +// forgeBatch the next batch. +func (p *Pipeline) forgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) { // remove transactions from the pool that have been there for too long _, err := p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(), p.stats.Sync.LastBlock.Num, int64(batchNum)) @@ -813,28 +840,6 @@ func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.Bat batchInfo.ZKInputs = zkInputs p.cfg.debugBatchStore(&batchInfo) - // 6. Wait for an available server proof blocking call - serverProof, err := p.proversPool.Get(ctx) - if err != nil { - return nil, tracerr.Wrap(err) - } - batchInfo.ServerProof = serverProof - defer func() { - // If there's an error further on, add the serverProof back to - // the pool - if err != nil { - p.proversPool.Add(serverProof) - } - }() - p.cfg.debugBatchStore(&batchInfo) - - // 7. Call the selected idle server proof with BatchBuilder output, - // save server proof info for batchNum - err = batchInfo.ServerProof.CalculateProof(ctx, zkInputs) - if err != nil { - return nil, tracerr.Wrap(err) - } - return &batchInfo, nil } @@ -844,8 +849,6 @@ func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) er if err != nil { return tracerr.Wrap(err) } - p.proversPool.Add(batchInfo.ServerProof) - batchInfo.ServerProof = nil batchInfo.Proof = proof batchInfo.ForgeBatchArgs = p.prepareForgeBatchArgs(batchInfo) batchInfo.TxStatus = TxStatusPending diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index c5d4fd0..0469826 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -417,6 +417,6 @@ func TestPipelineShouldL1L2Batch(t *testing.T) { // TODO: Test Reorg // TODO: Test Pipeline // TODO: Test TxMonitor -// TODO: Test forgeSendServerProof +// TODO: Test forgeBatch // TODO: Test waitServerProof // TODO: Test handleReorg