Browse Source

Merge pull request #583 from hermeznetwork/fix/checkinterval

Fix missing timer reset in TxManager
feature/serveapicli
arnau 3 years ago
committed by GitHub
parent
commit
5ff0350f51
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 37 additions and 25 deletions
  1. +11
    -8
      coordinator/coordinator.go
  2. +7
    -7
      coordinator/pipeline.go
  3. +8
    -5
      coordinator/txmanager.go
  4. +11
    -5
      test/proofserver/proofserver.go

+ 11
- 8
coordinator/coordinator.go

@ -508,7 +508,7 @@ func (c *Coordinator) Start() {
c.wg.Add(1) c.wg.Add(1)
go func() { go func() {
waitCh := time.After(longWaitDuration)
timer := time.NewTimer(longWaitDuration)
for { for {
select { select {
case <-c.ctx.Done(): case <-c.ctx.Done():
@ -520,24 +520,27 @@ func (c *Coordinator) Start() {
continue continue
} else if err != nil { } else if err != nil {
log.Errorw("Coordinator.handleMsg", "err", err) log.Errorw("Coordinator.handleMsg", "err", err)
waitCh = time.After(c.cfg.SyncRetryInterval)
if !timer.Stop() {
<-timer.C
}
timer.Reset(c.cfg.SyncRetryInterval)
continue continue
} }
waitCh = time.After(longWaitDuration)
case <-waitCh:
case <-timer.C:
timer.Reset(longWaitDuration)
if !c.stats.Synced() { if !c.stats.Synced() {
waitCh = time.After(longWaitDuration)
continue continue
} }
if err := c.syncStats(c.ctx, &c.stats); c.ctx.Err() != nil { if err := c.syncStats(c.ctx, &c.stats); c.ctx.Err() != nil {
waitCh = time.After(longWaitDuration)
continue continue
} else if err != nil { } else if err != nil {
log.Errorw("Coordinator.syncStats", "err", err) log.Errorw("Coordinator.syncStats", "err", err)
waitCh = time.After(c.cfg.SyncRetryInterval)
if !timer.Stop() {
<-timer.C
}
timer.Reset(c.cfg.SyncRetryInterval)
continue continue
} }
waitCh = time.After(longWaitDuration)
} }
} }
}() }()

+ 7
- 7
coordinator/pipeline.go

@ -271,7 +271,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
p.wg.Add(1) p.wg.Add(1)
go func() { go func() {
waitCh := time.After(zeroDuration)
timer := time.NewTimer(zeroDuration)
for { for {
select { select {
case <-p.ctx.Done(): case <-p.ctx.Done():
@ -281,23 +281,21 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
case statsVars := <-p.statsVarsCh: case statsVars := <-p.statsVarsCh:
p.stats = statsVars.Stats p.stats = statsVars.Stats
p.syncSCVars(statsVars.Vars) p.syncSCVars(statsVars.Vars)
case <-waitCh:
case <-timer.C:
timer.Reset(p.cfg.ForgeRetryInterval)
// Once errAtBatchNum != 0, we stop forging // Once errAtBatchNum != 0, we stop forging
// batches because there's been an error and we // batches because there's been an error and we
// wait for the pipeline to be stopped. // wait for the pipeline to be stopped.
if p.getErrAtBatchNum() != 0 { if p.getErrAtBatchNum() != 0 {
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue continue
} }
batchNum = p.state.batchNum + 1 batchNum = p.state.batchNum + 1
batchInfo, err := p.handleForgeBatch(p.ctx, batchNum) batchInfo, err := p.handleForgeBatch(p.ctx, batchNum)
if p.ctx.Err() != nil { if p.ctx.Err() != nil {
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue continue
} else if tracerr.Unwrap(err) == errLastL1BatchNotSynced || } else if tracerr.Unwrap(err) == errLastL1BatchNotSynced ||
tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay || tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay ||
tracerr.Unwrap(err) == errForgeBeforeDelay { tracerr.Unwrap(err) == errForgeBeforeDelay {
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue continue
} else if err != nil { } else if err != nil {
p.setErrAtBatchNum(batchNum) p.setErrAtBatchNum(batchNum)
@ -306,7 +304,6 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
"Pipeline.handleForgBatch: %v", err), "Pipeline.handleForgBatch: %v", err),
FailedBatchNum: batchNum, FailedBatchNum: batchNum,
}) })
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue continue
} }
p.lastForgeTime = time.Now() p.lastForgeTime = time.Now()
@ -316,7 +313,10 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
case batchChSentServerProof <- batchInfo: case batchChSentServerProof <- batchInfo:
case <-p.ctx.Done(): case <-p.ctx.Done():
} }
waitCh = time.After(zeroDuration)
if !timer.Stop() {
<-timer.C
}
timer.Reset(zeroDuration)
} }
} }
}() }()

+ 8
- 5
coordinator/txmanager.go

@ -419,8 +419,6 @@ func (q *Queue) Push(batchInfo *BatchInfo) {
// Run the TxManager // Run the TxManager
func (t *TxManager) Run(ctx context.Context) { func (t *TxManager) Run(ctx context.Context) {
waitCh := time.After(longWaitDuration)
var statsVars statsVars var statsVars statsVars
select { select {
case statsVars = <-t.statsVarsCh: case statsVars = <-t.statsVarsCh:
@ -431,6 +429,7 @@ func (t *TxManager) Run(ctx context.Context) {
log.Infow("TxManager: received initial statsVars", log.Infow("TxManager: received initial statsVars",
"block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatchNum) "block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatchNum)
timer := time.NewTimer(longWaitDuration)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -474,13 +473,17 @@ func (t *TxManager) Run(ctx context.Context) {
continue continue
} }
t.queue.Push(batchInfo) t.queue.Push(batchInfo)
waitCh = time.After(t.cfg.TxManagerCheckInterval)
case <-waitCh:
if !timer.Stop() {
<-timer.C
}
timer.Reset(t.cfg.TxManagerCheckInterval)
case <-timer.C:
queuePosition, batchInfo := t.queue.Next() queuePosition, batchInfo := t.queue.Next()
if batchInfo == nil { if batchInfo == nil {
waitCh = time.After(longWaitDuration)
timer.Reset(longWaitDuration)
continue continue
} }
timer.Reset(t.cfg.TxManagerCheckInterval)
if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil { if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
continue continue
} else if err != nil { //nolint:staticcheck } else if err != nil { //nolint:staticcheck

+ 11
- 5
test/proofserver/proofserver.go

@ -146,7 +146,7 @@ const longWaitDuration = 999 * time.Hour
// const provingDuration = 2 * time.Second // const provingDuration = 2 * time.Second
func (s *Mock) runProver(ctx context.Context) { func (s *Mock) runProver(ctx context.Context) {
waitCh := time.After(longWaitDuration)
timer := time.NewTimer(longWaitDuration)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -154,21 +154,27 @@ func (s *Mock) runProver(ctx context.Context) {
case msg := <-s.msgCh: case msg := <-s.msgCh:
switch msg.value { switch msg.value {
case "cancel": case "cancel":
waitCh = time.After(longWaitDuration)
if !timer.Stop() {
<-timer.C
}
timer.Reset(longWaitDuration)
s.Lock() s.Lock()
if !s.status.IsReady() { if !s.status.IsReady() {
s.status = prover.StatusCodeAborted s.status = prover.StatusCodeAborted
} }
s.Unlock() s.Unlock()
case "prove": case "prove":
waitCh = time.After(s.provingDuration)
if !timer.Stop() {
<-timer.C
}
timer.Reset(s.provingDuration)
s.Lock() s.Lock()
s.status = prover.StatusCodeBusy s.status = prover.StatusCodeBusy
s.Unlock() s.Unlock()
} }
msg.ackCh <- true msg.ackCh <- true
case <-waitCh:
waitCh = time.After(longWaitDuration)
case <-timer.C:
timer.Reset(longWaitDuration)
s.Lock() s.Lock()
if s.status != prover.StatusCodeBusy { if s.status != prover.StatusCodeBusy {
s.Unlock() s.Unlock()

Loading…
Cancel
Save