Browse Source

Fix timers resetting in coordinator loops

feature/txprocessor-update
Eduard S 3 years ago
parent
commit
264f01b572
4 changed files with 24 additions and 21 deletions
  1. +8
    -7
      coordinator/coordinator.go
  2. +7
    -5
      coordinator/pipeline.go
  3. +4
    -4
      coordinator/txmanager.go
  4. +5
    -5
      test/proofserver/proofserver.go

+ 8
- 7
coordinator/coordinator.go

@ -494,7 +494,7 @@ func (c *Coordinator) Start() {
c.wg.Add(1)
go func() {
waitDuration := longWaitDuration
waitCh := time.After(longWaitDuration)
for {
select {
case <-c.ctx.Done():
@ -506,23 +506,24 @@ func (c *Coordinator) Start() {
continue
} else if err != nil {
log.Errorw("Coordinator.handleMsg", "err", err)
waitDuration = c.cfg.SyncRetryInterval
waitCh = time.After(c.cfg.SyncRetryInterval)
continue
}
waitDuration = longWaitDuration
case <-time.After(waitDuration):
waitCh = time.After(longWaitDuration)
case <-waitCh:
if !c.stats.Synced() {
waitDuration = longWaitDuration
waitCh = time.After(longWaitDuration)
continue
}
if err := c.syncStats(c.ctx, &c.stats); c.ctx.Err() != nil {
waitCh = time.After(longWaitDuration)
continue
} else if err != nil {
log.Errorw("Coordinator.syncStats", "err", err)
waitDuration = c.cfg.SyncRetryInterval
waitCh = time.After(c.cfg.SyncRetryInterval)
continue
}
waitDuration = longWaitDuration
waitCh = time.After(longWaitDuration)
}
}
}()

+ 7
- 5
coordinator/pipeline.go

@ -253,7 +253,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
p.wg.Add(1)
go func() {
waitDuration := zeroDuration
waitCh := time.After(zeroDuration)
for {
select {
case <-p.ctx.Done():
@ -263,31 +263,32 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
case statsVars := <-p.statsVarsCh:
p.stats = statsVars.Stats
p.syncSCVars(statsVars.Vars)
case <-time.After(waitDuration):
case <-waitCh:
// Once errAtBatchNum != 0, we stop forging
// batches because there's been an error and we
// wait for the pipeline to be stopped.
if p.getErrAtBatchNum() != 0 {
waitDuration = p.cfg.ForgeRetryInterval
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue
}
batchNum = p.state.batchNum + 1
batchInfo, err := p.handleForgeBatch(p.ctx, batchNum)
if p.ctx.Err() != nil {
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue
} else if tracerr.Unwrap(err) == errLastL1BatchNotSynced ||
tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay ||
tracerr.Unwrap(err) == errForgeBeforeDelay {
waitDuration = p.cfg.ForgeRetryInterval
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue
} else if err != nil {
p.setErrAtBatchNum(batchNum)
waitDuration = p.cfg.ForgeRetryInterval
p.coord.SendMsg(p.ctx, MsgStopPipeline{
Reason: fmt.Sprintf(
"Pipeline.handleForgBatch: %v", err),
FailedBatchNum: batchNum,
})
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue
}
p.lastForgeTime = time.Now()
@ -297,6 +298,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
case batchChSentServerProof <- batchInfo:
case <-p.ctx.Done():
}
waitCh = time.After(zeroDuration)
}
}
}()

+ 4
- 4
coordinator/txmanager.go

@ -416,7 +416,7 @@ func (q *Queue) Push(batchInfo *BatchInfo) {
// Run the TxManager
func (t *TxManager) Run(ctx context.Context) {
waitDuration := longWaitDuration
waitCh := time.After(longWaitDuration)
var statsVars statsVars
select {
@ -471,11 +471,11 @@ func (t *TxManager) Run(ctx context.Context) {
continue
}
t.queue.Push(batchInfo)
waitDuration = t.cfg.TxManagerCheckInterval
case <-time.After(waitDuration):
waitCh = time.After(t.cfg.TxManagerCheckInterval)
case <-waitCh:
queuePosition, batchInfo := t.queue.Next()
if batchInfo == nil {
waitDuration = longWaitDuration
waitCh = time.After(longWaitDuration)
continue
}
if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {

+ 5
- 5
test/proofserver/proofserver.go

@ -145,7 +145,7 @@ const longWaitDuration = 999 * time.Hour
// const provingDuration = 2 * time.Second
func (s *Mock) runProver(ctx context.Context) {
waitDuration := longWaitDuration
waitCh := time.After(longWaitDuration)
for {
select {
case <-ctx.Done():
@ -153,21 +153,21 @@ func (s *Mock) runProver(ctx context.Context) {
case msg := <-s.msgCh:
switch msg.value {
case "cancel":
waitDuration = longWaitDuration
waitCh = time.After(longWaitDuration)
s.Lock()
if !s.status.IsReady() {
s.status = prover.StatusCodeAborted
}
s.Unlock()
case "prove":
waitDuration = s.provingDuration
waitCh = time.After(s.provingDuration)
s.Lock()
s.status = prover.StatusCodeBusy
s.Unlock()
}
msg.ackCh <- true
case <-time.After(waitDuration):
waitDuration = longWaitDuration
case <-waitCh:
waitCh = time.After(longWaitDuration)
s.Lock()
if s.status != prover.StatusCodeBusy {
s.Unlock()

Loading…
Cancel
Save