Browse Source

Fix missing timer reset in TxManager

Also, replace usage of time.Duration by time.NewTimer, because the later allows
replacing timers by stopping them before so that we never leak resources.
feature/edu
Eduard S 3 years ago
parent
commit
2a5992d218
4 changed files with 37 additions and 25 deletions
  1. +11
    -8
      coordinator/coordinator.go
  2. +7
    -7
      coordinator/pipeline.go
  3. +8
    -5
      coordinator/txmanager.go
  4. +11
    -5
      test/proofserver/proofserver.go

+ 11
- 8
coordinator/coordinator.go

@ -508,7 +508,7 @@ func (c *Coordinator) Start() {
c.wg.Add(1)
go func() {
waitCh := time.After(longWaitDuration)
timer := time.NewTimer(longWaitDuration)
for {
select {
case <-c.ctx.Done():
@ -520,24 +520,27 @@ func (c *Coordinator) Start() {
continue
} else if err != nil {
log.Errorw("Coordinator.handleMsg", "err", err)
waitCh = time.After(c.cfg.SyncRetryInterval)
if !timer.Stop() {
<-timer.C
}
timer.Reset(c.cfg.SyncRetryInterval)
continue
}
waitCh = time.After(longWaitDuration)
case <-waitCh:
case <-timer.C:
timer.Reset(longWaitDuration)
if !c.stats.Synced() {
waitCh = time.After(longWaitDuration)
continue
}
if err := c.syncStats(c.ctx, &c.stats); c.ctx.Err() != nil {
waitCh = time.After(longWaitDuration)
continue
} else if err != nil {
log.Errorw("Coordinator.syncStats", "err", err)
waitCh = time.After(c.cfg.SyncRetryInterval)
if !timer.Stop() {
<-timer.C
}
timer.Reset(c.cfg.SyncRetryInterval)
continue
}
waitCh = time.After(longWaitDuration)
}
}
}()

+ 7
- 7
coordinator/pipeline.go

@ -271,7 +271,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
p.wg.Add(1)
go func() {
waitCh := time.After(zeroDuration)
timer := time.NewTimer(zeroDuration)
for {
select {
case <-p.ctx.Done():
@ -281,23 +281,21 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
case statsVars := <-p.statsVarsCh:
p.stats = statsVars.Stats
p.syncSCVars(statsVars.Vars)
case <-waitCh:
case <-timer.C:
timer.Reset(p.cfg.ForgeRetryInterval)
// Once errAtBatchNum != 0, we stop forging
// batches because there's been an error and we
// wait for the pipeline to be stopped.
if p.getErrAtBatchNum() != 0 {
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue
}
batchNum = p.state.batchNum + 1
batchInfo, err := p.handleForgeBatch(p.ctx, batchNum)
if p.ctx.Err() != nil {
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue
} else if tracerr.Unwrap(err) == errLastL1BatchNotSynced ||
tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay ||
tracerr.Unwrap(err) == errForgeBeforeDelay {
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue
} else if err != nil {
p.setErrAtBatchNum(batchNum)
@ -306,7 +304,6 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
"Pipeline.handleForgBatch: %v", err),
FailedBatchNum: batchNum,
})
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue
}
p.lastForgeTime = time.Now()
@ -316,7 +313,10 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
case batchChSentServerProof <- batchInfo:
case <-p.ctx.Done():
}
waitCh = time.After(zeroDuration)
if !timer.Stop() {
<-timer.C
}
timer.Reset(zeroDuration)
}
}
}()

+ 8
- 5
coordinator/txmanager.go

@ -419,8 +419,6 @@ func (q *Queue) Push(batchInfo *BatchInfo) {
// Run the TxManager
func (t *TxManager) Run(ctx context.Context) {
waitCh := time.After(longWaitDuration)
var statsVars statsVars
select {
case statsVars = <-t.statsVarsCh:
@ -431,6 +429,7 @@ func (t *TxManager) Run(ctx context.Context) {
log.Infow("TxManager: received initial statsVars",
"block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatchNum)
timer := time.NewTimer(longWaitDuration)
for {
select {
case <-ctx.Done():
@ -474,13 +473,17 @@ func (t *TxManager) Run(ctx context.Context) {
continue
}
t.queue.Push(batchInfo)
waitCh = time.After(t.cfg.TxManagerCheckInterval)
case <-waitCh:
if !timer.Stop() {
<-timer.C
}
timer.Reset(t.cfg.TxManagerCheckInterval)
case <-timer.C:
queuePosition, batchInfo := t.queue.Next()
if batchInfo == nil {
waitCh = time.After(longWaitDuration)
timer.Reset(longWaitDuration)
continue
}
timer.Reset(t.cfg.TxManagerCheckInterval)
if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
continue
} else if err != nil { //nolint:staticcheck

+ 11
- 5
test/proofserver/proofserver.go

@ -146,7 +146,7 @@ const longWaitDuration = 999 * time.Hour
// const provingDuration = 2 * time.Second
func (s *Mock) runProver(ctx context.Context) {
waitCh := time.After(longWaitDuration)
timer := time.NewTimer(longWaitDuration)
for {
select {
case <-ctx.Done():
@ -154,21 +154,27 @@ func (s *Mock) runProver(ctx context.Context) {
case msg := <-s.msgCh:
switch msg.value {
case "cancel":
waitCh = time.After(longWaitDuration)
if !timer.Stop() {
<-timer.C
}
timer.Reset(longWaitDuration)
s.Lock()
if !s.status.IsReady() {
s.status = prover.StatusCodeAborted
}
s.Unlock()
case "prove":
waitCh = time.After(s.provingDuration)
if !timer.Stop() {
<-timer.C
}
timer.Reset(s.provingDuration)
s.Lock()
s.status = prover.StatusCodeBusy
s.Unlock()
}
msg.ackCh <- true
case <-waitCh:
waitCh = time.After(longWaitDuration)
case <-timer.C:
timer.Reset(longWaitDuration)
s.Lock()
if s.status != prover.StatusCodeBusy {
s.Unlock()

Loading…
Cancel
Save