Make coordinator more responsive

- API:
	- Replace `emergencyModeStartingTime` with `emergencyModeStartingBlock`
- Synchronizer:
	- Track emergency mode starting block
- cli/node:
	- Add working coordinator config
- coordinator:
	- Retry the synchronizer-stats handler in case of error, instead of
	  waiting for the next block to try again (a standalone sketch of the
	  pattern follows this list)
	- On init, trigger an initial call to the synced-block handler before
	  waiting for the synchronizer, forcing the coordinator to start its
	  logic even if no new block arrives right after the node starts (very
	  useful on testnets, where block frequency is variable)
	- Merge the Msgs for a synced block and updated vars into one:
	  `MsgSyncBlock`.
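
A standalone sketch of the retry pattern referenced above, using hypothetical
names and only the standard library (the real loop lives in
`Coordinator.Start`, shown in the diff below):

```go
package sketch

import "time"

// loop is a sketch, not the real coordinator: on a handler error the select
// timeout shrinks to retryInterval so the failed work is retried soon; on
// success it resets to a duration so long that, in practice, only incoming
// messages (or an earlier failure) fire the timer.
func loop(msgCh <-chan interface{}, handle func(interface{}) error,
	retry func() error, retryInterval time.Duration) {
	const longWait = 999 * time.Hour
	wait := longWait
	for {
		select {
		case msg := <-msgCh:
			if err := handle(msg); err != nil {
				wait = retryInterval // retry soon instead of waiting for the next block
				continue
			}
			wait = longWait
		case <-time.After(wait):
			if err := retry(); err != nil {
				wait = retryInterval
				continue
			}
			wait = longWait
		}
	}
}
```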
Eduard S
2020-12-11 19:35:33 +01:00
parent a165bda17d
commit a7351992cd
12 changed files with 273 additions and 228 deletions

@@ -36,6 +36,9 @@ type Config struct {
// EthClientAttempts is the number of attempts to do an eth client RPC
// call before giving up
EthClientAttempts int
// SyncRetryInterval is the interval to wait before retrying the main
// handler of a synced block after an error
SyncRetryInterval time.Duration
// EthClientAttemptsDelay is the delay between attempts to do an eth client
// RPC call
EthClientAttemptsDelay time.Duration
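
For illustration only (the values are hypothetical and the remaining fields
are left at their zero values), the new field sits next to the existing eth
client retry knobs:

```go
cfg := coordinator.Config{
	EthClientAttempts:      5,
	EthClientAttemptsDelay: 2 * time.Second,
	// Retry the synced-block handler shortly after an error rather than
	// waiting for the next block.
	SyncRetryInterval: time.Second,
}
```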
@@ -64,6 +67,7 @@ type Coordinator struct {
provers []prover.Client
consts synchronizer.SCConsts
vars synchronizer.SCVariables
stats *synchronizer.Stats
started bool
cfg Config
@@ -150,14 +154,9 @@ func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) {
type MsgSyncBlock struct {
Stats synchronizer.Stats
Batches []common.BatchData
}
// MsgSyncSCVars indicates an update to Smart Contract Vars
// TODO: Move this to MsgSyncBlock and remove MsgSyncSCVars
type MsgSyncSCVars struct {
Rollup *common.RollupVariables
Auction *common.AuctionVariables
WDelayer *common.WDelayerVariables
// Vars contains the Smart Contract variables that were updated in this
// block, or nil for those that haven't changed.
Vars synchronizer.SCVariablesPtr
}
// MsgSyncReorg indicates a reorg
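
With the merged message, the synchronizer side sends a single `MsgSyncBlock`
per block. A hypothetical call site, assuming only the rollup vars changed in
this block (`coord`, `stats`, `batchData`, and `rollupVars` are placeholders):

```go
coord.SendMsg(MsgSyncBlock{
	Stats:   *stats,
	Batches: batchData,
	// Contracts whose vars didn't change stay nil, so the coordinator
	// only copies what actually changed (see syncSCVars below).
	Vars: synchronizer.SCVariablesPtr{Rollup: rollupVars},
})
```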
@@ -175,15 +174,15 @@ func (c *Coordinator) SendMsg(msg interface{}) {
c.msgCh <- msg
}
func (c *Coordinator) handleMsgSyncSCVars(msg *MsgSyncSCVars) {
if msg.Rollup != nil {
c.vars.Rollup = *msg.Rollup
func (c *Coordinator) syncSCVars(vars synchronizer.SCVariablesPtr) {
if vars.Rollup != nil {
c.vars.Rollup = *vars.Rollup
}
if msg.Auction != nil {
c.vars.Auction = *msg.Auction
if vars.Auction != nil {
c.vars.Auction = *vars.Auction
}
if msg.WDelayer != nil {
c.vars.WDelayer = *msg.WDelayer
if vars.WDelayer != nil {
c.vars.WDelayer = *vars.WDelayer
}
}
@@ -200,12 +199,7 @@ func (c *Coordinator) canForge(stats *synchronizer.Stats) bool {
return false
}
func (c *Coordinator) handleMsgSyncBlock(ctx context.Context, msg *MsgSyncBlock) error {
stats := &msg.Stats
// batches := msg.Batches
if !stats.Synced() {
return nil
}
func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) error {
c.txManager.SetLastBlock(stats.Eth.LastBlock.Num)
canForge := c.canForge(stats)
@@ -260,6 +254,16 @@ func (c *Coordinator) handleMsgSyncBlock(ctx context.Context, msg *MsgSyncBlock)
return nil
}
func (c *Coordinator) handleMsgSyncBlock(ctx context.Context, msg *MsgSyncBlock) error {
c.stats = &msg.Stats
// batches := msg.Batches
if !c.stats.Synced() {
return nil
}
c.syncSCVars(msg.Vars)
return c.syncStats(ctx, c.stats)
}
func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string) error {
if c.pipeline != nil {
c.pipeline.Stop(c.ctx)
@@ -271,6 +275,33 @@ func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string) err
return nil
}
func (c *Coordinator) handleMsg(ctx context.Context, msg interface{}) error {
switch msg := msg.(type) {
case MsgSyncBlock:
if err := c.handleMsgSyncBlock(ctx, &msg); common.IsErrDone(err) {
return nil
} else if err != nil {
return tracerr.Wrap(fmt.Errorf("Coordinator.handleMsgSyncBlock error: %w", err))
}
case MsgSyncReorg:
if err := c.handleReorg(ctx, &msg.Stats); common.IsErrDone(err) {
return nil
} else if err != nil {
return tracerr.Wrap(fmt.Errorf("Coordinator.handleReorg error: %w", err))
}
case MsgStopPipeline:
log.Infow("Coordinator received MsgStopPipeline", "reason", msg.Reason)
if err := c.handleStopPipeline(ctx, msg.Reason); common.IsErrDone(err) {
return nil
} else if err != nil {
return tracerr.Wrap(fmt.Errorf("Coordinator.handleStopPipeline: %w", err))
}
default:
log.Fatalw("Coordinator Unexpected Coordinator msg of type %T: %+v", msg, msg)
}
return nil
}
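
For context, these messages arrive through the exported `SendMsg` shown
above; a hypothetical producer:

```go
// e.g. from a pipeline that hit an unrecoverable error while forging:
coord.SendMsg(MsgStopPipeline{Reason: "forge batch error"})
```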
// Start the coordinator
func (c *Coordinator) Start() {
if c.started {
@@ -285,6 +316,7 @@ func (c *Coordinator) Start() {
c.wg.Add(1)
go func() {
waitDuration := time.Duration(longWaitDuration)
for {
select {
case <-c.ctx.Done():
@@ -292,33 +324,23 @@ func (c *Coordinator) Start() {
c.wg.Done()
return
case msg := <-c.msgCh:
switch msg := msg.(type) {
case MsgSyncBlock:
if err := c.handleMsgSyncBlock(c.ctx, &msg); common.IsErrDone(err) {
continue
} else if err != nil {
log.Errorw("Coordinator.handleMsgSyncBlock error", "err", err)
continue
}
case MsgSyncReorg:
if err := c.handleReorg(c.ctx, &msg.Stats); common.IsErrDone(err) {
continue
} else if err != nil {
log.Errorw("Coordinator.handleReorg error", "err", err)
continue
}
case MsgStopPipeline:
log.Infow("Coordinator received MsgStopPipeline", "reason", msg.Reason)
if err := c.handleStopPipeline(c.ctx, msg.Reason); common.IsErrDone(err) {
continue
} else if err != nil {
log.Errorw("Coordinator.handleStopPipeline", "err", err)
}
case MsgSyncSCVars:
c.handleMsgSyncSCVars(&msg)
default:
log.Fatalw("Coordinator Unexpected Coordinator msg of type %T: %+v", msg, msg)
if err := c.handleMsg(c.ctx, msg); err != nil {
log.Errorw("Coordinator.handleMsg", "err", err)
waitDuration = time.Duration(c.cfg.SyncRetryInterval)
continue
}
waitDuration = time.Duration(longWaitDuration)
case <-time.After(waitDuration):
if c.stats == nil {
waitDuration = time.Duration(longWaitDuration)
continue
}
if err := c.syncStats(c.ctx, c.stats); err != nil {
log.Errorw("Coordinator.syncStats", "err", err)
waitDuration = time.Duration(c.cfg.SyncRetryInterval)
continue
}
waitDuration = time.Duration(longWaitDuration)
}
}
}()
@@ -344,19 +366,20 @@ func (c *Coordinator) Stop() {
}
func (c *Coordinator) handleReorg(ctx context.Context, stats *synchronizer.Stats) error {
if common.BatchNum(stats.Sync.LastBatch) < c.pipelineBatchNum {
c.stats = stats
if common.BatchNum(c.stats.Sync.LastBatch) < c.pipelineBatchNum {
// There's been a reorg and the batch from which the pipeline
// was started was in a block that was discarded. The batch
// may not be in the main chain, so we stop the pipeline as a
// precaution (it will be started again once the node is in
// sync).
log.Infow("Coordinator.handleReorg StopPipeline sync.LastBatch < c.pipelineBatchNum",
"sync.LastBatch", stats.Sync.LastBatch,
"sync.LastBatch", c.stats.Sync.LastBatch,
"c.pipelineBatchNum", c.pipelineBatchNum)
if err := c.handleStopPipeline(ctx, "reorg"); err != nil {
return tracerr.Wrap(err)
}
if err := c.l2DB.Reorg(common.BatchNum(stats.Sync.LastBatch)); err != nil {
if err := c.l2DB.Reorg(common.BatchNum(c.stats.Sync.LastBatch)); err != nil {
return tracerr.Wrap(err)
}
}
@@ -481,12 +504,12 @@ func (t *TxManager) handleReceipt(batchInfo *BatchInfo) (*int64, error) {
return nil, nil
}
const longWaitTime = 999 * time.Hour
const longWaitDuration = 999 * time.Hour
// Run the TxManager
func (t *TxManager) Run(ctx context.Context) {
next := 0
waitTime := time.Duration(longWaitTime)
waitDuration := time.Duration(longWaitDuration)
for {
select {
case <-ctx.Done():
@@ -503,8 +526,8 @@ func (t *TxManager) Run(ctx context.Context) {
}
log.Debugf("ethClient ForgeCall sent, batchNum: %d", batchInfo.BatchNum)
t.queue = append(t.queue, batchInfo)
waitTime = t.cfg.TxManagerCheckInterval
case <-time.After(waitTime):
waitDuration = t.cfg.TxManagerCheckInterval
case <-time.After(waitDuration):
if len(t.queue) == 0 {
continue
}
@@ -531,7 +554,7 @@ func (t *TxManager) Run(ctx context.Context) {
"batch", batchInfo.BatchNum)
t.queue = append(t.queue[:current], t.queue[current+1:]...)
if len(t.queue) == 0 {
waitTime = longWaitTime
waitDuration = longWaitDuration
next = 0
} else {
next = current % len(t.queue)