From df729b3b71c6e3085e0b9c880863ce4cb9c5f946 Mon Sep 17 00:00:00 2001 From: Eduard S Date: Wed, 24 Feb 2021 15:43:55 +0100 Subject: [PATCH] Wait for serverProof before starting batch --- coordinator/pipeline.go | 44 ++++++++++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go index a60423a..c2d4e65 100644 --- a/coordinator/pipeline.go +++ b/coordinator/pipeline.go @@ -198,12 +198,33 @@ func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) { updateSCVars(&p.vars, vars) } -// handleForgeBatch calls p.forgeBatch to forge the batch and get the zkInputs, -// and then waits for an available proof server and sends the zkInputs to it so -// that the proof computation begins. -func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) { +// handleForgeBatch waits for an available proof server, calls p.forgeBatch to +// forge the batch and get the zkInputs, and then sends the zkInputs to the +// selected proof server so that the proof computation begins. +func (p *Pipeline) handleForgeBatch(ctx context.Context, + batchNum common.BatchNum) (batchInfo *BatchInfo, err error) { + // 1. Wait for an available serverProof (blocking call) + serverProof, err := p.proversPool.Get(ctx) + if ctx.Err() != nil { + return nil, ctx.Err() + } else if err != nil { + log.Errorw("proversPool.Get", "err", err) + return nil, err + } + defer func() { + // If we encounter any error (notice that this function returns + // errors to notify that a batch is not forged not only because + // of unexpected errors but also due to benign causes), add the + // serverProof back to the pool + if err != nil { + p.proversPool.Add(ctx, serverProof) + } + }() + + // 2. Forge the batch internally (make a selection of txs and prepare + // all the smart contract arguments) p.mutexL2DBUpdateDelete.Lock() - batchInfo, err := p.forgeBatch(batchNum) + batchInfo, err = p.forgeBatch(batchNum) p.mutexL2DBUpdateDelete.Unlock() if ctx.Err() != nil { return nil, ctx.Err() @@ -220,21 +241,13 @@ func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNu } return nil, err } - // 6. Wait for an available server proof (blocking call) - serverProof, err := p.proversPool.Get(ctx) - if ctx.Err() != nil { - return nil, ctx.Err() - } else if err != nil { - log.Errorw("proversPool.Get", "err", err) - return nil, err - } + + // 3. Send the ZKInputs to the proof server batchInfo.ServerProof = serverProof if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil { return nil, ctx.Err() } else if err != nil { log.Errorw("sendServerProof", "err", err) - batchInfo.ServerProof = nil - p.proversPool.Add(ctx, serverProof) return nil, err } return batchInfo, nil @@ -338,7 +351,6 @@ func (p *Pipeline) Start(batchNum common.BatchNum, } // We are done with this serverProof, add it back to the pool p.proversPool.Add(p.ctx, batchInfo.ServerProof) - // batchInfo.ServerProof = nil p.txManager.AddBatch(p.ctx, batchInfo) } }