From 101a9547754f8d1eb170cbdddb03aa1c42960c99 Mon Sep 17 00:00:00 2001
From: Eduard S
Date: Mon, 11 Jan 2021 14:07:47 +0100
Subject: [PATCH] Fix forging L1Batch too early

When scheduling an L1Batch, make sure the previous L1Batch has been
synchronized. Otherwise, an L1Batch will be forged that may not contain
all the L1UserTxs that are supposed to be included.
---
 cli/node/cfg.buidler.toml    |   1 +
 config/config.go             |   3 +
 coordinator/batch.go         |   2 +-
 coordinator/coordinator.go   | 126 ++++++++++++++++++++++-------------
 node/node.go                 |   1 +
 synchronizer/synchronizer.go |   1 +
 6 files changed, 86 insertions(+), 48 deletions(-)

diff --git a/cli/node/cfg.buidler.toml b/cli/node/cfg.buidler.toml
index ec9edb9..6d11f7f 100644
--- a/cli/node/cfg.buidler.toml
+++ b/cli/node/cfg.buidler.toml
@@ -46,6 +46,7 @@ ForgerAddress = "0xb4124ceb3451635dacedd11767f004d8a28c6ee7" # Boot Coordinator
 ConfirmBlocks = 10
 L1BatchTimeoutPerc = 0.4
 ProofServerPollInterval = "1s"
+ForgeRetryInterval = "500ms"
 SyncRetryInterval = "1s"

 [Coordinator.FeeAccount]
diff --git a/config/config.go b/config/config.go
index bc092cc..421d34e 100644
--- a/config/config.go
+++ b/config/config.go
@@ -54,6 +54,9 @@ type Coordinator struct {
     // ProofServerPollInterval is the waiting interval between polling the
     // ProofServer while waiting for a particular status
     ProofServerPollInterval Duration `validate:"required"`
+    // ForgeRetryInterval is the waiting interval between calls forge a
+    // batch after an error
+    ForgeRetryInterval Duration `validate:"required"`
     // SyncRetryInterval is the waiting interval between calls to the main
     // handler of a synced block after an error
     SyncRetryInterval Duration `validate:"required"`
diff --git a/coordinator/batch.go b/coordinator/batch.go
index 04af0bf..b2acf67 100644
--- a/coordinator/batch.go
+++ b/coordinator/batch.go
@@ -98,7 +98,7 @@ func (b *BatchInfo) DebugStore(storePath string) error {
     // nolint reason: hardcoded 1_000_000 is the number of nanoseconds in a
     // millisecond
     //nolint:gomnd
-    filename := fmt.Sprintf("%08d-%v.%v.json", b.BatchNum,
+    filename := fmt.Sprintf("%08d-%v.%03d.json", b.BatchNum,
         b.Debug.StartTimestamp.Unix(), b.Debug.StartTimestamp.Nanosecond()/1_000_000)
     // nolint reason: 0640 allows rw to owner and r to group
     //nolint:gosec
diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go
index 8a85a50..3b64520 100644
--- a/coordinator/coordinator.go
+++ b/coordinator/coordinator.go
@@ -24,6 +24,8 @@ import (
     "github.com/hermeznetwork/tracerr"
 )

+var errLastL1BatchNotSynced = fmt.Errorf("last L1Batch not synced yet")
+
 const queueLen = 16

 // Config contains the Coordinator configuration
@@ -39,6 +41,9 @@ type Config struct {
     // EthClientAttempts is the number of attempts to do an eth client RPC
     // call before giving up
     EthClientAttempts int
+    // ForgeRetryInterval is the waiting interval between calls forge a
+    // batch after an error
+    ForgeRetryInterval time.Duration
     // SyncRetryInterval is the waiting interval between calls to the main
     // handler of a synced block after an error
     SyncRetryInterval time.Duration
@@ -225,8 +230,7 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
         if c.pipeline, err = c.newPipeline(ctx); err != nil {
             return tracerr.Wrap(err)
         }
-        if err := c.pipeline.Start(batchNum, stats.Sync.LastForgeL1TxsNum,
-            stats, &c.vars); err != nil {
+        if err := c.pipeline.Start(batchNum, stats, &c.vars); err != nil {
             c.pipeline = nil
             return tracerr.Wrap(err)
         }
@@ -348,7 +352,7 @@ func (c *Coordinator) Start() {

     c.wg.Add(1)
     go func() {
-        waitDuration := time.Duration(longWaitDuration)
+        waitDuration := longWaitDuration
         for {
             select {
             case <-c.ctx.Done():
@@ -360,23 +364,23 @@ func (c *Coordinator) Start() {
                     continue
                 } else if err != nil {
                     log.Errorw("Coordinator.handleMsg", "err", err)
-                    waitDuration = time.Duration(c.cfg.SyncRetryInterval)
+                    waitDuration = c.cfg.SyncRetryInterval
                     continue
                 }
-                waitDuration = time.Duration(longWaitDuration)
+                waitDuration = longWaitDuration
             case <-time.After(waitDuration):
                 if c.stats == nil {
-                    waitDuration = time.Duration(longWaitDuration)
+                    waitDuration = longWaitDuration
                     continue
                 }
                 if err := c.syncStats(c.ctx, c.stats); c.ctx.Err() != nil {
                     continue
                 } else if err != nil {
                     log.Errorw("Coordinator.syncStats", "err", err)
-                    waitDuration = time.Duration(c.cfg.SyncRetryInterval)
+                    waitDuration = c.cfg.SyncRetryInterval
                     continue
                 }
-                waitDuration = time.Duration(longWaitDuration)
             }
         }
     }()
@@ -540,7 +544,7 @@ const longWaitDuration = 999 * time.Hour
 // Run the TxManager
 func (t *TxManager) Run(ctx context.Context) {
     next := 0
-    waitDuration := time.Duration(longWaitDuration)
+    waitDuration := longWaitDuration

     for {
         select {
@@ -675,10 +679,10 @@ func (p *Pipeline) SetSyncStatsVars(stats *synchronizer.Stats, vars *synchronize
 }

 // reset pipeline state
-func (p *Pipeline) reset(batchNum common.BatchNum, lastForgeL1TxsNum int64,
+func (p *Pipeline) reset(batchNum common.BatchNum,
     stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
     p.batchNum = batchNum
-    p.lastForgeL1TxsNum = lastForgeL1TxsNum
+    p.lastForgeL1TxsNum = stats.Sync.LastForgeL1TxsNum
     p.stats = *stats
     p.vars = *vars
     p.lastScheduledL1BatchBlockNum = 0
@@ -706,15 +710,49 @@ func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) {
     }
 }

+func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
+    batchInfo, err := p.forgeBatch(batchNum)
+    if ctx.Err() != nil {
+        return nil, ctx.Err()
+    } else if err != nil {
+        if tracerr.Unwrap(err) == errLastL1BatchNotSynced {
+            log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err,
+                "lastForgeL1TxsNum", p.lastForgeL1TxsNum,
+                "syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum)
+        } else {
+            log.Errorw("forgeBatch", "err", err)
+        }
+        return nil, err
+    }
+    // 6. Wait for an available server proof (blocking call)
+    serverProof, err := p.proversPool.Get(ctx)
+    if ctx.Err() != nil {
+        return nil, ctx.Err()
+    } else if err != nil {
+        log.Errorw("proversPool.Get", "err", err)
+        return nil, err
+    }
+    batchInfo.ServerProof = serverProof
+    if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil {
+        return nil, ctx.Err()
+    } else if err != nil {
+        log.Errorw("sendServerProof", "err", err)
+        batchInfo.ServerProof = nil
+        p.proversPool.Add(serverProof)
+        return nil, err
+    }
+    return batchInfo, nil
+}
+
 // Start the forging pipeline
-func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
+func (p *Pipeline) Start(batchNum common.BatchNum,
     stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
     if p.started {
         log.Fatal("Pipeline already started")
     }
     p.started = true

-    if err := p.reset(batchNum, lastForgeL1TxsNum, stats, vars); err != nil {
+    if err := p.reset(batchNum, stats, vars); err != nil {
         return tracerr.Wrap(err)
     }
     p.ctx, p.cancel = context.WithCancel(context.Background())
@@ -723,7 +761,9 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
     batchChSentServerProof := make(chan *BatchInfo, queueSize)

     p.wg.Add(1)
+    const zeroDuration = 0 * time.Second
     go func() {
+        waitDuration := zeroDuration
         for {
             select {
             case <-p.ctx.Done():
@@ -733,34 +773,15 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
             case statsVars := <-p.statsVarsCh:
                 p.stats = statsVars.Stats
                 p.syncSCVars(statsVars.Vars)
-            default:
+            case <-time.After(waitDuration):
                 batchNum = p.batchNum + 1
-                batchInfo, err := p.forgeBatch(batchNum)
-                if p.ctx.Err() != nil {
-                    continue
-                } else if err != nil {
-                    log.Errorw("forgeBatch", "err", err)
-                    continue
-                }
-                // 6. Wait for an available server proof (blocking call)
-                serverProof, err := p.proversPool.Get(p.ctx)
-                if p.ctx.Err() != nil {
-                    continue
-                } else if err != nil {
-                    log.Errorw("proversPool.Get", "err", err)
-                    continue
-                }
-                batchInfo.ServerProof = serverProof
-                if err := p.sendServerProof(p.ctx, batchInfo); p.ctx.Err() != nil {
-                    continue
-                } else if err != nil {
-                    log.Errorw("sendServerProof", "err", err)
-                    batchInfo.ServerProof = nil
-                    p.proversPool.Add(serverProof)
+                if batchInfo, err := p.handleForgeBatch(p.ctx, batchNum); err != nil {
+                    waitDuration = p.cfg.SyncRetryInterval
                     continue
+                } else {
+                    p.batchNum = batchNum
+                    batchChSentServerProof <- batchInfo
                 }
-                p.batchNum = batchNum
-                batchChSentServerProof <- batchInfo
             }
         }
     }()
@@ -823,9 +844,9 @@ func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) er
 }

 // forgeBatch the next batch.
-func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) {
+func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) {
     // remove transactions from the pool that have been there for too long
-    _, err := p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
+    _, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
         p.stats.Sync.LastBlock.Num, int64(batchNum))
     if err != nil {
         return nil, tracerr.Wrap(err)
     }
@@ -835,7 +856,7 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) {
         return nil, tracerr.Wrap(err)
     }

-    batchInfo := BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch
+    batchInfo = &BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch
     batchInfo.Debug.StartTimestamp = time.Now()
     batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1
@@ -851,12 +872,23 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) {
     var coordIdxs []common.Idx

     // 1. Decide if we forge L2Tx or L1+L2Tx
-    if p.shouldL1L2Batch(&batchInfo) {
+    if p.shouldL1L2Batch(batchInfo) {
         batchInfo.L1Batch = true
-        p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
+        defer func() {
+            // If there's no error, update the parameters related
+            // to the last L1Batch forged
+            if err == nil {
+                p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
+                p.lastForgeL1TxsNum++
+            }
+        }()
+        if p.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum {
+            return nil, tracerr.Wrap(errLastL1BatchNotSynced)
+            //return nil, fmt.Errorf("Not synced yet LastForgeL1TxsNum. Expecting %v, got %v",
+            //    p.lastForgeL1TxsNum, p.stats.Sync.LastForgeL1TxsNum)
+        }
         // 2a: L1+L2 txs
-        p.lastForgeL1TxsNum++
-        l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum)
+        l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum + 1)
         if err != nil {
             return nil, tracerr.Wrap(err)
         }
@@ -914,9 +946,9 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) {
     // 5. Save metadata from BatchBuilder output for BatchNum
     batchInfo.ZKInputs = zkInputs
     batchInfo.Debug.Status = StatusForged
-    p.cfg.debugBatchStore(&batchInfo)
+    p.cfg.debugBatchStore(batchInfo)

-    return &batchInfo, nil
+    return batchInfo, nil
 }

 // waitServerProof gets the generated zkProof & sends it to the SmartContract
diff --git a/node/node.go b/node/node.go
index df16d8b..2ee8789 100644
--- a/node/node.go
+++ b/node/node.go
@@ -261,6 +261,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
             ForgerAddress:          cfg.Coordinator.ForgerAddress,
             ConfirmBlocks:          cfg.Coordinator.ConfirmBlocks,
             L1BatchTimeoutPerc:     cfg.Coordinator.L1BatchTimeoutPerc,
+            ForgeRetryInterval:     cfg.Coordinator.ForgeRetryInterval.Duration,
             SyncRetryInterval:      cfg.Coordinator.SyncRetryInterval.Duration,
             EthClientAttempts:      cfg.Coordinator.EthClient.Attempts,
             EthClientAttemptsDelay: cfg.Coordinator.EthClient.AttemptsDelay.Duration,
diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go
index 9155020..88d94a7 100644
--- a/synchronizer/synchronizer.go
+++ b/synchronizer/synchronizer.go
@@ -63,6 +63,7 @@ func NewStatsHolder(firstBlockNum int64, refreshPeriod time.Duration) *StatsHold
     stats := Stats{}
     stats.Eth.RefreshPeriod = refreshPeriod
     stats.Eth.FirstBlockNum = firstBlockNum
+    stats.Sync.LastForgeL1TxsNum = -1
     return &StatsHolder{Stats: stats}
 }
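The guard added to forgeBatch above reduces to a small invariant: do not schedule a new L1Batch until the synchronizer reports that the last L1Batch this pipeline forged has been seen, so that GetUnforgedL1UserTxs(lastForgeL1TxsNum + 1) returns the complete set of pending L1UserTxs. Below is a minimal, self-contained sketch of that pattern, not the coordinator code itself: the pipeline and syncStats types and the scheduleL1Batch function are hypothetical stand-ins for Pipeline, synchronizer.Stats and forgeBatch, and using the new ForgeRetryInterval option to space out the retries is an assumption about how that option is meant to be consumed.

package main

import (
	"errors"
	"fmt"
	"time"
)

var errLastL1BatchNotSynced = errors.New("last L1Batch not synced yet")

// syncStats is a hypothetical stand-in for synchronizer.Stats; it only
// carries the counter that the guard compares against.
type syncStats struct {
	LastForgeL1TxsNum int64 // last L1TxsNum the synchronizer has seen forged
}

// pipeline is a hypothetical stand-in for the coordinator Pipeline.
type pipeline struct {
	lastForgeL1TxsNum  int64         // L1TxsNum consumed by the last L1Batch this pipeline forged
	forgeRetryInterval time.Duration // plays the role of Config.ForgeRetryInterval (assumption)
}

// scheduleL1Batch applies the guard: an L1Batch is only scheduled once the
// synchronizer has caught up with the previous one, and the counter advances
// only after a successful forge (the defer in the real forgeBatch).
func (p *pipeline) scheduleL1Batch(stats syncStats) error {
	if p.lastForgeL1TxsNum != stats.LastForgeL1TxsNum {
		return errLastL1BatchNotSynced
	}
	// ... select the L1UserTxs for lastForgeL1TxsNum+1 and forge the batch here ...
	p.lastForgeL1TxsNum++
	return nil
}

func main() {
	p := &pipeline{lastForgeL1TxsNum: 3, forgeRetryInterval: 500 * time.Millisecond}
	stats := syncStats{LastForgeL1TxsNum: 2} // synchronizer is still one L1Batch behind

	for {
		if err := p.scheduleL1Batch(stats); errors.Is(err, errLastL1BatchNotSynced) {
			fmt.Println("L1Batch scheduled too early, retrying:", err)
			time.Sleep(p.forgeRetryInterval)
			stats.LastForgeL1TxsNum++ // pretend the synchronizer caught up
			continue
		}
		fmt.Println("L1Batch forged, lastForgeL1TxsNum =", p.lastForgeL1TxsNum)
		break
	}
}

In the patch itself the same idea is spread across three places: reset seeds lastForgeL1TxsNum from stats.Sync.LastForgeL1TxsNum, forgeBatch returns errLastL1BatchNotSynced (wrapped with tracerr) when the counters disagree, and the forge loop in Start treats that error as a transient condition that only warrants a warning and a delayed retry rather than an error log.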