diff --git a/cli/node/cfg.buidler.toml b/cli/node/cfg.buidler.toml
index ec9edb9..6d11f7f 100644
--- a/cli/node/cfg.buidler.toml
+++ b/cli/node/cfg.buidler.toml
@@ -46,6 +46,7 @@ ForgerAddress = "0xb4124ceb3451635dacedd11767f004d8a28c6ee7" # Boot Coordinator
 ConfirmBlocks = 10
 L1BatchTimeoutPerc = 0.4
 ProofServerPollInterval = "1s"
+ForgeRetryInterval = "500ms"
 SyncRetryInterval = "1s"
 
 [Coordinator.FeeAccount]
diff --git a/config/config.go b/config/config.go
index bc092cc..421d34e 100644
--- a/config/config.go
+++ b/config/config.go
@@ -54,6 +54,9 @@ type Coordinator struct {
 	// ProofServerPollInterval is the waiting interval between polling the
 	// ProofServer while waiting for a particular status
 	ProofServerPollInterval Duration `validate:"required"`
+	// ForgeRetryInterval is the waiting interval between calls to forge a
+	// batch after an error
+	ForgeRetryInterval Duration `validate:"required"`
 	// SyncRetryInterval is the waiting interval between calls to the main
 	// handler of a synced block after an error
 	SyncRetryInterval Duration `validate:"required"`
diff --git a/coordinator/batch.go b/coordinator/batch.go
index 04af0bf..b2acf67 100644
--- a/coordinator/batch.go
+++ b/coordinator/batch.go
@@ -98,7 +98,7 @@ func (b *BatchInfo) DebugStore(storePath string) error {
 	// nolint reason: hardcoded 1_000_000 is the number of nanoseconds in a
 	// millisecond
 	//nolint:gomnd
-	filename := fmt.Sprintf("%08d-%v.%v.json", b.BatchNum,
+	filename := fmt.Sprintf("%08d-%v.%03d.json", b.BatchNum,
 		b.Debug.StartTimestamp.Unix(), b.Debug.StartTimestamp.Nanosecond()/1_000_000)
 	// nolint reason: 0640 allows rw to owner and r to group
 	//nolint:gosec
diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go
index 8a85a50..3b64520 100644
--- a/coordinator/coordinator.go
+++ b/coordinator/coordinator.go
@@ -24,6 +24,8 @@ import (
 	"github.com/hermeznetwork/tracerr"
 )
 
+var errLastL1BatchNotSynced = fmt.Errorf("last L1Batch not synced yet")
+
 const queueLen = 16
 
 // Config contains the Coordinator configuration
@@ -39,6 +41,9 @@ type Config struct {
 	// EthClientAttempts is the number of attempts to do an eth client RPC
 	// call before giving up
 	EthClientAttempts int
+	// ForgeRetryInterval is the waiting interval between calls to forge a
+	// batch after an error
+	ForgeRetryInterval time.Duration
 	// SyncRetryInterval is the waiting interval between calls to the main
 	// handler of a synced block after an error
 	SyncRetryInterval time.Duration
@@ -225,8 +230,7 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
 			if c.pipeline, err = c.newPipeline(ctx); err != nil {
 				return tracerr.Wrap(err)
 			}
-			if err := c.pipeline.Start(batchNum, stats.Sync.LastForgeL1TxsNum,
-				stats, &c.vars); err != nil {
+			if err := c.pipeline.Start(batchNum, stats, &c.vars); err != nil {
 				c.pipeline = nil
 				return tracerr.Wrap(err)
 			}
@@ -348,7 +352,7 @@ func (c *Coordinator) Start() {
 
 	c.wg.Add(1)
 	go func() {
-		waitDuration := time.Duration(longWaitDuration)
+		waitDuration := longWaitDuration
 		for {
 			select {
 			case <-c.ctx.Done():
@@ -360,23 +364,23 @@ func (c *Coordinator) Start() {
 					continue
 				} else if err != nil {
 					log.Errorw("Coordinator.handleMsg", "err", err)
-					waitDuration = time.Duration(c.cfg.SyncRetryInterval)
+					waitDuration = c.cfg.SyncRetryInterval
 					continue
 				}
-				waitDuration = time.Duration(longWaitDuration)
+				waitDuration = longWaitDuration
 			case <-time.After(waitDuration):
 				if c.stats == nil {
-					waitDuration = time.Duration(longWaitDuration)
+					waitDuration = longWaitDuration
 					continue
 				}
 				if err := c.syncStats(c.ctx, c.stats); c.ctx.Err() != nil {
 					continue
 				} else if err != nil {
 					log.Errorw("Coordinator.syncStats", "err", err)
-					waitDuration = time.Duration(c.cfg.SyncRetryInterval)
+					waitDuration = c.cfg.SyncRetryInterval
 					continue
 				}
-				waitDuration = time.Duration(longWaitDuration)
+				waitDuration = longWaitDuration
 			}
 		}
 	}()
@@ -540,7 +544,7 @@ const longWaitDuration = 999 * time.Hour
 // Run the TxManager
 func (t *TxManager) Run(ctx context.Context) {
 	next := 0
-	waitDuration := time.Duration(longWaitDuration)
+	waitDuration := longWaitDuration
 
 	for {
 		select {
@@ -675,10 +679,10 @@ func (p *Pipeline) SetSyncStatsVars(stats *synchronizer.Stats, vars *synchronize
 }
 
 // reset pipeline state
-func (p *Pipeline) reset(batchNum common.BatchNum, lastForgeL1TxsNum int64,
+func (p *Pipeline) reset(batchNum common.BatchNum,
 	stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
 	p.batchNum = batchNum
-	p.lastForgeL1TxsNum = lastForgeL1TxsNum
+	p.lastForgeL1TxsNum = stats.Sync.LastForgeL1TxsNum
 	p.stats = *stats
 	p.vars = *vars
 	p.lastScheduledL1BatchBlockNum = 0
@@ -706,15 +710,49 @@ func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) {
 	}
 }
 
+func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
+	batchInfo, err := p.forgeBatch(batchNum)
+	if ctx.Err() != nil {
+		return nil, ctx.Err()
+	} else if err != nil {
+		if tracerr.Unwrap(err) == errLastL1BatchNotSynced {
+			log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err,
+				"lastForgeL1TxsNum", p.lastForgeL1TxsNum,
+				"syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum)
+		} else {
+			log.Errorw("forgeBatch", "err", err)
+		}
+		return nil, err
+	}
+	// 6. Wait for an available server proof (blocking call)
+	serverProof, err := p.proversPool.Get(ctx)
+	if ctx.Err() != nil {
+		return nil, ctx.Err()
+	} else if err != nil {
+		log.Errorw("proversPool.Get", "err", err)
+		return nil, err
+	}
+	batchInfo.ServerProof = serverProof
+	if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil {
+		return nil, ctx.Err()
+	} else if err != nil {
+		log.Errorw("sendServerProof", "err", err)
+		batchInfo.ServerProof = nil
+		p.proversPool.Add(serverProof)
+		return nil, err
+	}
+	return batchInfo, nil
+}
+
 // Start the forging pipeline
-func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
+func (p *Pipeline) Start(batchNum common.BatchNum,
 	stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
 	if p.started {
 		log.Fatal("Pipeline already started")
 	}
 	p.started = true
 
-	if err := p.reset(batchNum, lastForgeL1TxsNum, stats, vars); err != nil {
+	if err := p.reset(batchNum, stats, vars); err != nil {
 		return tracerr.Wrap(err)
 	}
 	p.ctx, p.cancel = context.WithCancel(context.Background())
@@ -723,7 +761,9 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
 	batchChSentServerProof := make(chan *BatchInfo, queueSize)
 
 	p.wg.Add(1)
+	const zeroDuration = 0 * time.Second
 	go func() {
+		waitDuration := zeroDuration
 		for {
 			select {
 			case <-p.ctx.Done():
@@ -733,34 +773,15 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
 			case statsVars := <-p.statsVarsCh:
 				p.stats = statsVars.Stats
 				p.syncSCVars(statsVars.Vars)
-			default:
+			case <-time.After(waitDuration):
 				batchNum = p.batchNum + 1
-				batchInfo, err := p.forgeBatch(batchNum)
-				if p.ctx.Err() != nil {
-					continue
-				} else if err != nil {
-					log.Errorw("forgeBatch", "err", err)
-					continue
-				}
-				// 6. Wait for an available server proof (blocking call)
-				serverProof, err := p.proversPool.Get(p.ctx)
-				if p.ctx.Err() != nil {
-					continue
-				} else if err != nil {
-					log.Errorw("proversPool.Get", "err", err)
-					continue
-				}
-				batchInfo.ServerProof = serverProof
-				if err := p.sendServerProof(p.ctx, batchInfo); p.ctx.Err() != nil {
-					continue
-				} else if err != nil {
-					log.Errorw("sendServerProof", "err", err)
-					batchInfo.ServerProof = nil
-					p.proversPool.Add(serverProof)
+				if batchInfo, err := p.handleForgeBatch(p.ctx, batchNum); err != nil {
+					waitDuration = p.cfg.ForgeRetryInterval
 					continue
+				} else {
+					p.batchNum = batchNum
+					batchChSentServerProof <- batchInfo
 				}
-				p.batchNum = batchNum
-				batchChSentServerProof <- batchInfo
 			}
 		}
 	}()
@@ -823,9 +844,9 @@ func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) er
 }
 // forgeBatch the next batch.
-func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) {
+func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) {
 	// remove transactions from the pool that have been there for too long
-	_, err := p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
+	_, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
 		p.stats.Sync.LastBlock.Num, int64(batchNum))
 	if err != nil {
 		return nil, tracerr.Wrap(err)
 	}
@@ -835,7 +856,7 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) {
 		return nil, tracerr.Wrap(err)
 	}
 
-	batchInfo := BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch
+	batchInfo = &BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch
 	batchInfo.Debug.StartTimestamp = time.Now()
 	batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1
 
@@ -851,12 +872,23 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) {
 	var coordIdxs []common.Idx
 
 	// 1. Decide if we forge L2Tx or L1+L2Tx
-	if p.shouldL1L2Batch(&batchInfo) {
+	if p.shouldL1L2Batch(batchInfo) {
 		batchInfo.L1Batch = true
-		p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
+		defer func() {
+			// If there's no error, update the parameters related
+			// to the last L1Batch forged
+			if err == nil {
+				p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
+				p.lastForgeL1TxsNum++
+			}
+		}()
+		if p.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum {
+			return nil, tracerr.Wrap(errLastL1BatchNotSynced)
+			//return nil, fmt.Errorf("Not synced yet LastForgeL1TxsNum. Expecting %v, got %v",
+			//	p.lastForgeL1TxsNum, p.stats.Sync.LastForgeL1TxsNum)
+		}
 		// 2a: L1+L2 txs
-		p.lastForgeL1TxsNum++
-		l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum)
+		l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum + 1)
 		if err != nil {
 			return nil, tracerr.Wrap(err)
 		}
@@ -914,9 +946,9 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) {
 	// 5. Save metadata from BatchBuilder output for BatchNum
 	batchInfo.ZKInputs = zkInputs
 	batchInfo.Debug.Status = StatusForged
-	p.cfg.debugBatchStore(&batchInfo)
+	p.cfg.debugBatchStore(batchInfo)
 
-	return &batchInfo, nil
+	return batchInfo, nil
 }
 
 // waitServerProof gets the generated zkProof & sends it to the SmartContract
diff --git a/node/node.go b/node/node.go
index df16d8b..2ee8789 100644
--- a/node/node.go
+++ b/node/node.go
@@ -261,6 +261,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
 			ForgerAddress:          cfg.Coordinator.ForgerAddress,
 			ConfirmBlocks:          cfg.Coordinator.ConfirmBlocks,
 			L1BatchTimeoutPerc:     cfg.Coordinator.L1BatchTimeoutPerc,
+			ForgeRetryInterval:     cfg.Coordinator.ForgeRetryInterval.Duration,
 			SyncRetryInterval:      cfg.Coordinator.SyncRetryInterval.Duration,
 			EthClientAttempts:      cfg.Coordinator.EthClient.Attempts,
 			EthClientAttemptsDelay: cfg.Coordinator.EthClient.AttemptsDelay.Duration,
diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go
index 9155020..88d94a7 100644
--- a/synchronizer/synchronizer.go
+++ b/synchronizer/synchronizer.go
@@ -63,6 +63,7 @@ func NewStatsHolder(firstBlockNum int64, refreshPeriod time.Duration) *StatsHold
 	stats := Stats{}
 	stats.Eth.RefreshPeriod = refreshPeriod
 	stats.Eth.FirstBlockNum = firstBlockNum
+	stats.Sync.LastForgeL1TxsNum = -1
 	return &StatsHolder{Stats: stats}
 }
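
Note on the pipeline change above: the forging goroutine in Pipeline.Start no longer spins on a `default:` select branch; it blocks on `time.After(waitDuration)` and stretches the wait after a failed forge attempt, which is the role of the new `ForgeRetryInterval` setting. The standalone sketch below illustrates only that scheduling pattern, not the patch itself; `forgeLoop`, `forgeOnce`, and `retryInterval` are illustrative names that do not exist in the repository.

package main

import (
	"context"
	"fmt"
	"time"
)

// forgeLoop mimics the scheduling skeleton of Pipeline.Start after the patch:
// it waits on time.After(waitDuration) instead of busy-looping, and backs off
// by retryInterval whenever a forge attempt fails.
func forgeLoop(ctx context.Context, retryInterval time.Duration,
	forgeOnce func(context.Context) error) {
	waitDuration := time.Duration(0) // forge immediately on the first iteration
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(waitDuration):
			if ctx.Err() != nil {
				return
			}
			if err := forgeOnce(ctx); err != nil {
				// back off before retrying (what ForgeRetryInterval controls)
				waitDuration = retryInterval
				continue
			}
			// success: try to forge the next batch right away
			waitDuration = 0
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	attempt := 0
	forgeLoop(ctx, 500*time.Millisecond, func(ctx context.Context) error {
		attempt++
		if attempt < 3 {
			return fmt.Errorf("attempt %d: last L1Batch not synced yet", attempt)
		}
		fmt.Println("batch forged on attempt", attempt)
		cancel() // stop the demo after the first successful forge
		return nil
	})
}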