diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index ae232d6..b6d922f 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -494,7 +494,7 @@ func (c *Coordinator) Start() { c.wg.Add(1) go func() { - waitDuration := longWaitDuration + waitCh := time.After(longWaitDuration) for { select { case <-c.ctx.Done(): @@ -506,23 +506,24 @@ func (c *Coordinator) Start() { continue } else if err != nil { log.Errorw("Coordinator.handleMsg", "err", err) - waitDuration = c.cfg.SyncRetryInterval + waitCh = time.After(c.cfg.SyncRetryInterval) continue } - waitDuration = longWaitDuration - case <-time.After(waitDuration): + waitCh = time.After(longWaitDuration) + case <-waitCh: if !c.stats.Synced() { - waitDuration = longWaitDuration + waitCh = time.After(longWaitDuration) continue } if err := c.syncStats(c.ctx, &c.stats); c.ctx.Err() != nil { + waitCh = time.After(longWaitDuration) continue } else if err != nil { log.Errorw("Coordinator.syncStats", "err", err) - waitDuration = c.cfg.SyncRetryInterval + waitCh = time.After(c.cfg.SyncRetryInterval) continue } - waitDuration = longWaitDuration + waitCh = time.After(longWaitDuration) } } }() diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go index 08ba352..aaeea59 100644 --- a/coordinator/pipeline.go +++ b/coordinator/pipeline.go @@ -253,7 +253,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum, p.wg.Add(1) go func() { - waitDuration := zeroDuration + waitCh := time.After(zeroDuration) for { select { case <-p.ctx.Done(): @@ -263,31 +263,32 @@ func (p *Pipeline) Start(batchNum common.BatchNum, case statsVars := <-p.statsVarsCh: p.stats = statsVars.Stats p.syncSCVars(statsVars.Vars) - case <-time.After(waitDuration): + case <-waitCh: // Once errAtBatchNum != 0, we stop forging // batches because there's been an error and we // wait for the pipeline to be stopped. if p.getErrAtBatchNum() != 0 { - waitDuration = p.cfg.ForgeRetryInterval + waitCh = time.After(p.cfg.ForgeRetryInterval) continue } batchNum = p.state.batchNum + 1 batchInfo, err := p.handleForgeBatch(p.ctx, batchNum) if p.ctx.Err() != nil { + waitCh = time.After(p.cfg.ForgeRetryInterval) continue } else if tracerr.Unwrap(err) == errLastL1BatchNotSynced || tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay || tracerr.Unwrap(err) == errForgeBeforeDelay { - waitDuration = p.cfg.ForgeRetryInterval + waitCh = time.After(p.cfg.ForgeRetryInterval) continue } else if err != nil { p.setErrAtBatchNum(batchNum) - waitDuration = p.cfg.ForgeRetryInterval p.coord.SendMsg(p.ctx, MsgStopPipeline{ Reason: fmt.Sprintf( "Pipeline.handleForgBatch: %v", err), FailedBatchNum: batchNum, }) + waitCh = time.After(p.cfg.ForgeRetryInterval) continue } p.lastForgeTime = time.Now() @@ -297,6 +298,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum, case batchChSentServerProof <- batchInfo: case <-p.ctx.Done(): } + waitCh = time.After(zeroDuration) } } }() diff --git a/coordinator/txmanager.go b/coordinator/txmanager.go index 3b2ef6a..b7f6bcb 100644 --- a/coordinator/txmanager.go +++ b/coordinator/txmanager.go @@ -416,7 +416,7 @@ func (q *Queue) Push(batchInfo *BatchInfo) { // Run the TxManager func (t *TxManager) Run(ctx context.Context) { - waitDuration := longWaitDuration + waitCh := time.After(longWaitDuration) var statsVars statsVars select { @@ -471,11 +471,11 @@ func (t *TxManager) Run(ctx context.Context) { continue } t.queue.Push(batchInfo) - waitDuration = t.cfg.TxManagerCheckInterval - case <-time.After(waitDuration): + waitCh = time.After(t.cfg.TxManagerCheckInterval) + case <-waitCh: queuePosition, batchInfo := t.queue.Next() if batchInfo == nil { - waitDuration = longWaitDuration + waitCh = time.After(longWaitDuration) continue } if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil { diff --git a/test/proofserver/proofserver.go b/test/proofserver/proofserver.go index 791e110..8f6f3b6 100644 --- a/test/proofserver/proofserver.go +++ b/test/proofserver/proofserver.go @@ -145,7 +145,7 @@ const longWaitDuration = 999 * time.Hour // const provingDuration = 2 * time.Second func (s *Mock) runProver(ctx context.Context) { - waitDuration := longWaitDuration + waitCh := time.After(longWaitDuration) for { select { case <-ctx.Done(): @@ -153,21 +153,21 @@ func (s *Mock) runProver(ctx context.Context) { case msg := <-s.msgCh: switch msg.value { case "cancel": - waitDuration = longWaitDuration + waitCh = time.After(longWaitDuration) s.Lock() if !s.status.IsReady() { s.status = prover.StatusCodeAborted } s.Unlock() case "prove": - waitDuration = s.provingDuration + waitCh = time.After(s.provingDuration) s.Lock() s.status = prover.StatusCodeBusy s.Unlock() } msg.ackCh <- true - case <-time.After(waitDuration): - waitDuration = longWaitDuration + case <-waitCh: + waitCh = time.After(longWaitDuration) s.Lock() if s.status != prover.StatusCodeBusy { s.Unlock()