Browse Source

Make coordinator more robust against failed forges

feature/sql-semaphore1
Eduard S 3 years ago
parent
commit
f3505000dd
2 changed files with 36 additions and 33 deletions
  1. +35
    -32
      coordinator/coordinator.go
  2. +1
    -1
      coordinator/coordinator_test.go

+ 35
- 32
coordinator/coordinator.go

@ -671,21 +671,33 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
for { for {
select { select {
case <-p.ctx.Done(): case <-p.ctx.Done():
log.Debug("Pipeline forgeSendServerProof loop done")
log.Debug("Pipeline forgeBatch loop done")
p.wg.Done() p.wg.Done()
return return
case syncStats := <-p.statsCh: case syncStats := <-p.statsCh:
p.stats = syncStats p.stats = syncStats
default: default:
p.batchNum = p.batchNum + 1
batchInfo, err := p.forgeSendServerProof(p.ctx, p.batchNum)
batchNum = p.batchNum + 1
batchInfo, err := p.forgeBatch(p.ctx, batchNum)
if common.IsErrDone(err) { if common.IsErrDone(err) {
continue continue
} else if err != nil {
log.Errorw("forgeBatch", "err", err)
continue
} }
if err != nil {
log.Errorw("forgeSendServerProof", "err", err)
// 6. Wait for an available server proof (blocking call)
serverProof, err := p.proversPool.Get(p.ctx)
if common.IsErrDone(err) {
continue continue
} else if err != nil {
log.Errorw("proversPool.Get", "err", err)
} }
if err := p.sendServerProof(p.ctx, serverProof, batchInfo); err != nil {
log.Errorw("sendServerProof", "err", err)
p.proversPool.Add(serverProof)
continue
}
p.batchNum = batchNum
batchChSentServerProof <- batchInfo batchChSentServerProof <- batchInfo
} }
} }
@ -704,6 +716,9 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
if common.IsErrDone(err) { if common.IsErrDone(err) {
continue continue
} }
// We are done with this serverProof, add it back to the pool
p.proversPool.Add(batchInfo.ServerProof)
batchInfo.ServerProof = nil
if err != nil { if err != nil {
log.Errorw("waitServerProof", "err", err) log.Errorw("waitServerProof", "err", err)
continue continue
@ -739,9 +754,21 @@ func l2TxsIDs(txs []common.PoolL2Tx) []common.TxID {
return txIDs return txIDs
} }
// forgeSendServerProof the next batch, wait for a proof server to be available and send the
// circuit inputs to the proof server.
func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
// sendServerProof sends the circuit inputs to the proof server
func (p *Pipeline) sendServerProof(ctx context.Context, serverProof prover.Client,
batchInfo *BatchInfo) error {
p.cfg.debugBatchStore(batchInfo)
// 7. Call the selected idle server proof with BatchBuilder output,
// save server proof info for batchNum
if err := batchInfo.ServerProof.CalculateProof(ctx, batchInfo.ZKInputs); err != nil {
return tracerr.Wrap(err)
}
return nil
}
// forgeBatch the next batch.
func (p *Pipeline) forgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
// remove transactions from the pool that have been there for too long // remove transactions from the pool that have been there for too long
_, err := p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(), _, err := p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
p.stats.Sync.LastBlock.Num, int64(batchNum)) p.stats.Sync.LastBlock.Num, int64(batchNum))
@ -813,28 +840,6 @@ func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.Bat
batchInfo.ZKInputs = zkInputs batchInfo.ZKInputs = zkInputs
p.cfg.debugBatchStore(&batchInfo) p.cfg.debugBatchStore(&batchInfo)
// 6. Wait for an available server proof blocking call
serverProof, err := p.proversPool.Get(ctx)
if err != nil {
return nil, tracerr.Wrap(err)
}
batchInfo.ServerProof = serverProof
defer func() {
// If there's an error further on, add the serverProof back to
// the pool
if err != nil {
p.proversPool.Add(serverProof)
}
}()
p.cfg.debugBatchStore(&batchInfo)
// 7. Call the selected idle server proof with BatchBuilder output,
// save server proof info for batchNum
err = batchInfo.ServerProof.CalculateProof(ctx, zkInputs)
if err != nil {
return nil, tracerr.Wrap(err)
}
return &batchInfo, nil return &batchInfo, nil
} }
@ -844,8 +849,6 @@ func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) er
if err != nil { if err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
p.proversPool.Add(batchInfo.ServerProof)
batchInfo.ServerProof = nil
batchInfo.Proof = proof batchInfo.Proof = proof
batchInfo.ForgeBatchArgs = p.prepareForgeBatchArgs(batchInfo) batchInfo.ForgeBatchArgs = p.prepareForgeBatchArgs(batchInfo)
batchInfo.TxStatus = TxStatusPending batchInfo.TxStatus = TxStatusPending

+ 1
- 1
coordinator/coordinator_test.go

@ -417,6 +417,6 @@ func TestPipelineShouldL1L2Batch(t *testing.T) {
// TODO: Test Reorg // TODO: Test Reorg
// TODO: Test Pipeline // TODO: Test Pipeline
// TODO: Test TxMonitor // TODO: Test TxMonitor
// TODO: Test forgeSendServerProof
// TODO: Test forgeBatch
// TODO: Test waitServerProof // TODO: Test waitServerProof
// TODO: Test handleReorg // TODO: Test handleReorg

Loading…
Cancel
Save