diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 5263713..75028b0 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -508,7 +508,7 @@ func (c *Coordinator) Start() { c.wg.Add(1) go func() { - waitCh := time.After(longWaitDuration) + timer := time.NewTimer(longWaitDuration) for { select { case <-c.ctx.Done(): @@ -520,24 +520,27 @@ func (c *Coordinator) Start() { continue } else if err != nil { log.Errorw("Coordinator.handleMsg", "err", err) - waitCh = time.After(c.cfg.SyncRetryInterval) + if !timer.Stop() { + <-timer.C + } + timer.Reset(c.cfg.SyncRetryInterval) continue } - waitCh = time.After(longWaitDuration) - case <-waitCh: + case <-timer.C: + timer.Reset(longWaitDuration) if !c.stats.Synced() { - waitCh = time.After(longWaitDuration) continue } if err := c.syncStats(c.ctx, &c.stats); c.ctx.Err() != nil { - waitCh = time.After(longWaitDuration) continue } else if err != nil { log.Errorw("Coordinator.syncStats", "err", err) - waitCh = time.After(c.cfg.SyncRetryInterval) + if !timer.Stop() { + <-timer.C + } + timer.Reset(c.cfg.SyncRetryInterval) continue } - waitCh = time.After(longWaitDuration) } } }() diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go index c2d4e65..b7e0b7e 100644 --- a/coordinator/pipeline.go +++ b/coordinator/pipeline.go @@ -271,7 +271,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum, p.wg.Add(1) go func() { - waitCh := time.After(zeroDuration) + timer := time.NewTimer(zeroDuration) for { select { case <-p.ctx.Done(): @@ -281,23 +281,21 @@ func (p *Pipeline) Start(batchNum common.BatchNum, case statsVars := <-p.statsVarsCh: p.stats = statsVars.Stats p.syncSCVars(statsVars.Vars) - case <-waitCh: + case <-timer.C: + timer.Reset(p.cfg.ForgeRetryInterval) // Once errAtBatchNum != 0, we stop forging // batches because there's been an error and we // wait for the pipeline to be stopped. if p.getErrAtBatchNum() != 0 { - waitCh = time.After(p.cfg.ForgeRetryInterval) continue } batchNum = p.state.batchNum + 1 batchInfo, err := p.handleForgeBatch(p.ctx, batchNum) if p.ctx.Err() != nil { - waitCh = time.After(p.cfg.ForgeRetryInterval) continue } else if tracerr.Unwrap(err) == errLastL1BatchNotSynced || tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay || tracerr.Unwrap(err) == errForgeBeforeDelay { - waitCh = time.After(p.cfg.ForgeRetryInterval) continue } else if err != nil { p.setErrAtBatchNum(batchNum) @@ -306,7 +304,6 @@ func (p *Pipeline) Start(batchNum common.BatchNum, "Pipeline.handleForgBatch: %v", err), FailedBatchNum: batchNum, }) - waitCh = time.After(p.cfg.ForgeRetryInterval) continue } p.lastForgeTime = time.Now() @@ -316,7 +313,10 @@ func (p *Pipeline) Start(batchNum common.BatchNum, case batchChSentServerProof <- batchInfo: case <-p.ctx.Done(): } - waitCh = time.After(zeroDuration) + if !timer.Stop() { + <-timer.C + } + timer.Reset(zeroDuration) } } }() diff --git a/coordinator/txmanager.go b/coordinator/txmanager.go index 674d717..3d753d6 100644 --- a/coordinator/txmanager.go +++ b/coordinator/txmanager.go @@ -419,8 +419,6 @@ func (q *Queue) Push(batchInfo *BatchInfo) { // Run the TxManager func (t *TxManager) Run(ctx context.Context) { - waitCh := time.After(longWaitDuration) - var statsVars statsVars select { case statsVars = <-t.statsVarsCh: @@ -431,6 +429,7 @@ func (t *TxManager) Run(ctx context.Context) { log.Infow("TxManager: received initial statsVars", "block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatchNum) + timer := time.NewTimer(longWaitDuration) for { select { case <-ctx.Done(): @@ -474,13 +473,17 @@ func (t *TxManager) Run(ctx context.Context) { continue } t.queue.Push(batchInfo) - waitCh = time.After(t.cfg.TxManagerCheckInterval) - case <-waitCh: + if !timer.Stop() { + <-timer.C + } + timer.Reset(t.cfg.TxManagerCheckInterval) + case <-timer.C: queuePosition, batchInfo := t.queue.Next() if batchInfo == nil { - waitCh = time.After(longWaitDuration) + timer.Reset(longWaitDuration) continue } + timer.Reset(t.cfg.TxManagerCheckInterval) if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil { continue } else if err != nil { //nolint:staticcheck diff --git a/test/proofserver/proofserver.go b/test/proofserver/proofserver.go index b995a53..01bedc9 100644 --- a/test/proofserver/proofserver.go +++ b/test/proofserver/proofserver.go @@ -146,7 +146,7 @@ const longWaitDuration = 999 * time.Hour // const provingDuration = 2 * time.Second func (s *Mock) runProver(ctx context.Context) { - waitCh := time.After(longWaitDuration) + timer := time.NewTimer(longWaitDuration) for { select { case <-ctx.Done(): @@ -154,21 +154,27 @@ func (s *Mock) runProver(ctx context.Context) { case msg := <-s.msgCh: switch msg.value { case "cancel": - waitCh = time.After(longWaitDuration) + if !timer.Stop() { + <-timer.C + } + timer.Reset(longWaitDuration) s.Lock() if !s.status.IsReady() { s.status = prover.StatusCodeAborted } s.Unlock() case "prove": - waitCh = time.After(s.provingDuration) + if !timer.Stop() { + <-timer.C + } + timer.Reset(s.provingDuration) s.Lock() s.status = prover.StatusCodeBusy s.Unlock() } msg.ackCh <- true - case <-waitCh: - waitCh = time.After(longWaitDuration) + case <-timer.C: + timer.Reset(longWaitDuration) s.Lock() if s.status != prover.StatusCodeBusy { s.Unlock()