mirror of
https://github.com/arnaucube/hermez-node.git
synced 2026-02-07 03:16:45 +01:00
Merge pull request #363 from hermeznetwork/feature/coordinator1
Make coordinator more robust agains failed forges
This commit is contained in:
@@ -678,21 +678,33 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
|
||||
for {
|
||||
select {
|
||||
case <-p.ctx.Done():
|
||||
log.Debug("Pipeline forgeSendServerProof loop done")
|
||||
log.Debug("Pipeline forgeBatch loop done")
|
||||
p.wg.Done()
|
||||
return
|
||||
case syncStats := <-p.statsCh:
|
||||
p.stats = syncStats
|
||||
default:
|
||||
p.batchNum = p.batchNum + 1
|
||||
batchInfo, err := p.forgeSendServerProof(p.ctx, p.batchNum)
|
||||
batchNum = p.batchNum + 1
|
||||
batchInfo, err := p.forgeBatch(p.ctx, batchNum)
|
||||
if common.IsErrDone(err) {
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
log.Errorw("forgeSendServerProof", "err", err)
|
||||
} else if err != nil {
|
||||
log.Errorw("forgeBatch", "err", err)
|
||||
continue
|
||||
}
|
||||
// 6. Wait for an available server proof (blocking call)
|
||||
serverProof, err := p.proversPool.Get(p.ctx)
|
||||
if common.IsErrDone(err) {
|
||||
continue
|
||||
} else if err != nil {
|
||||
log.Errorw("proversPool.Get", "err", err)
|
||||
}
|
||||
if err := p.sendServerProof(p.ctx, serverProof, batchInfo); err != nil {
|
||||
log.Errorw("sendServerProof", "err", err)
|
||||
p.proversPool.Add(serverProof)
|
||||
continue
|
||||
}
|
||||
p.batchNum = batchNum
|
||||
batchChSentServerProof <- batchInfo
|
||||
}
|
||||
}
|
||||
@@ -711,6 +723,9 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
|
||||
if common.IsErrDone(err) {
|
||||
continue
|
||||
}
|
||||
// We are done with this serverProof, add it back to the pool
|
||||
p.proversPool.Add(batchInfo.ServerProof)
|
||||
batchInfo.ServerProof = nil
|
||||
if err != nil {
|
||||
log.Errorw("waitServerProof", "err", err)
|
||||
continue
|
||||
@@ -746,9 +761,21 @@ func l2TxsIDs(txs []common.PoolL2Tx) []common.TxID {
|
||||
return txIDs
|
||||
}
|
||||
|
||||
// forgeSendServerProof the next batch, wait for a proof server to be available and send the
|
||||
// circuit inputs to the proof server.
|
||||
func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
|
||||
// sendServerProof sends the circuit inputs to the proof server
|
||||
func (p *Pipeline) sendServerProof(ctx context.Context, serverProof prover.Client,
|
||||
batchInfo *BatchInfo) error {
|
||||
p.cfg.debugBatchStore(batchInfo)
|
||||
|
||||
// 7. Call the selected idle server proof with BatchBuilder output,
|
||||
// save server proof info for batchNum
|
||||
if err := batchInfo.ServerProof.CalculateProof(ctx, batchInfo.ZKInputs); err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// forgeBatch the next batch.
|
||||
func (p *Pipeline) forgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
|
||||
// remove transactions from the pool that have been there for too long
|
||||
_, err := p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
|
||||
p.stats.Sync.LastBlock.Num, int64(batchNum))
|
||||
@@ -820,28 +847,6 @@ func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.Bat
|
||||
batchInfo.ZKInputs = zkInputs
|
||||
p.cfg.debugBatchStore(&batchInfo)
|
||||
|
||||
// 6. Wait for an available server proof blocking call
|
||||
serverProof, err := p.proversPool.Get(ctx)
|
||||
if err != nil {
|
||||
return nil, tracerr.Wrap(err)
|
||||
}
|
||||
batchInfo.ServerProof = serverProof
|
||||
defer func() {
|
||||
// If there's an error further on, add the serverProof back to
|
||||
// the pool
|
||||
if err != nil {
|
||||
p.proversPool.Add(serverProof)
|
||||
}
|
||||
}()
|
||||
p.cfg.debugBatchStore(&batchInfo)
|
||||
|
||||
// 7. Call the selected idle server proof with BatchBuilder output,
|
||||
// save server proof info for batchNum
|
||||
err = batchInfo.ServerProof.CalculateProof(ctx, zkInputs)
|
||||
if err != nil {
|
||||
return nil, tracerr.Wrap(err)
|
||||
}
|
||||
|
||||
return &batchInfo, nil
|
||||
}
|
||||
|
||||
@@ -851,8 +856,6 @@ func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) er
|
||||
if err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
p.proversPool.Add(batchInfo.ServerProof)
|
||||
batchInfo.ServerProof = nil
|
||||
batchInfo.Proof = proof
|
||||
batchInfo.PublicInputs = pubInputs
|
||||
batchInfo.ForgeBatchArgs = p.prepareForgeBatchArgs(batchInfo)
|
||||
|
||||
@@ -616,6 +616,6 @@ PoolTransfer(0) User2-User3: 300 (126)
|
||||
// TODO: Test Reorg
|
||||
// TODO: Test Pipeline
|
||||
// TODO: Test TxMonitor
|
||||
// TODO: Test forgeSendServerProof
|
||||
// TODO: Test forgeBatch
|
||||
// TODO: Test waitServerProof
|
||||
// TODO: Test handleReorg
|
||||
|
||||
Reference in New Issue
Block a user