From 101a9547754f8d1eb170cbdddb03aa1c42960c99 Mon Sep 17 00:00:00 2001 From: Eduard S Date: Mon, 11 Jan 2021 14:07:47 +0100 Subject: [PATCH 1/2] Fix forging L1Batch too early When scheduling an L1Batch, make sure the previous L1Batch has been synchronized. Otherwise, an L1Batch will be forged that may not contain all the L1UserTxs that are supposed to be included. --- cli/node/cfg.buidler.toml | 1 + config/config.go | 3 + coordinator/batch.go | 2 +- coordinator/coordinator.go | 126 ++++++++++++++++++++++------------- node/node.go | 1 + synchronizer/synchronizer.go | 1 + 6 files changed, 86 insertions(+), 48 deletions(-) diff --git a/cli/node/cfg.buidler.toml b/cli/node/cfg.buidler.toml index ec9edb9..6d11f7f 100644 --- a/cli/node/cfg.buidler.toml +++ b/cli/node/cfg.buidler.toml @@ -46,6 +46,7 @@ ForgerAddress = "0xb4124ceb3451635dacedd11767f004d8a28c6ee7" # Boot Coordinator ConfirmBlocks = 10 L1BatchTimeoutPerc = 0.4 ProofServerPollInterval = "1s" +ForgeRetryInterval = "500ms" SyncRetryInterval = "1s" [Coordinator.FeeAccount] diff --git a/config/config.go b/config/config.go index bc092cc..421d34e 100644 --- a/config/config.go +++ b/config/config.go @@ -54,6 +54,9 @@ type Coordinator struct { // ProofServerPollInterval is the waiting interval between polling the // ProofServer while waiting for a particular status ProofServerPollInterval Duration `validate:"required"` + // ForgeRetryInterval is the waiting interval between calls forge a + // batch after an error + ForgeRetryInterval Duration `validate:"required"` // SyncRetryInterval is the waiting interval between calls to the main // handler of a synced block after an error SyncRetryInterval Duration `validate:"required"` diff --git a/coordinator/batch.go b/coordinator/batch.go index 04af0bf..b2acf67 100644 --- a/coordinator/batch.go +++ b/coordinator/batch.go @@ -98,7 +98,7 @@ func (b *BatchInfo) DebugStore(storePath string) error { // nolint reason: hardcoded 1_000_000 is the number of nanoseconds in a // millisecond //nolint:gomnd - filename := fmt.Sprintf("%08d-%v.%v.json", b.BatchNum, + filename := fmt.Sprintf("%08d-%v.%03d.json", b.BatchNum, b.Debug.StartTimestamp.Unix(), b.Debug.StartTimestamp.Nanosecond()/1_000_000) // nolint reason: 0640 allows rw to owner and r to group //nolint:gosec diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 8a85a50..3b64520 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -24,6 +24,8 @@ import ( "github.com/hermeznetwork/tracerr" ) +var errLastL1BatchNotSynced = fmt.Errorf("last L1Batch not synced yet") + const queueLen = 16 // Config contains the Coordinator configuration @@ -39,6 +41,9 @@ type Config struct { // EthClientAttempts is the number of attempts to do an eth client RPC // call before giving up EthClientAttempts int + // ForgeRetryInterval is the waiting interval between calls forge a + // batch after an error + ForgeRetryInterval time.Duration // SyncRetryInterval is the waiting interval between calls to the main // handler of a synced block after an error SyncRetryInterval time.Duration @@ -225,8 +230,7 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) if c.pipeline, err = c.newPipeline(ctx); err != nil { return tracerr.Wrap(err) } - if err := c.pipeline.Start(batchNum, stats.Sync.LastForgeL1TxsNum, - stats, &c.vars); err != nil { + if err := c.pipeline.Start(batchNum, stats, &c.vars); err != nil { c.pipeline = nil return tracerr.Wrap(err) } @@ -348,7 +352,7 @@ func (c *Coordinator) Start() { c.wg.Add(1) 
go func() { - waitDuration := time.Duration(longWaitDuration) + waitDuration := longWaitDuration for { select { case <-c.ctx.Done(): @@ -360,23 +364,23 @@ func (c *Coordinator) Start() { continue } else if err != nil { log.Errorw("Coordinator.handleMsg", "err", err) - waitDuration = time.Duration(c.cfg.SyncRetryInterval) + waitDuration = c.cfg.SyncRetryInterval continue } - waitDuration = time.Duration(longWaitDuration) + waitDuration = longWaitDuration case <-time.After(waitDuration): if c.stats == nil { - waitDuration = time.Duration(longWaitDuration) + waitDuration = longWaitDuration continue } if err := c.syncStats(c.ctx, c.stats); c.ctx.Err() != nil { continue } else if err != nil { log.Errorw("Coordinator.syncStats", "err", err) - waitDuration = time.Duration(c.cfg.SyncRetryInterval) + waitDuration = c.cfg.SyncRetryInterval continue } - waitDuration = time.Duration(longWaitDuration) + waitDuration = longWaitDuration } } }() @@ -540,7 +544,7 @@ const longWaitDuration = 999 * time.Hour // Run the TxManager func (t *TxManager) Run(ctx context.Context) { next := 0 - waitDuration := time.Duration(longWaitDuration) + waitDuration := longWaitDuration for { select { @@ -675,10 +679,10 @@ func (p *Pipeline) SetSyncStatsVars(stats *synchronizer.Stats, vars *synchronize } // reset pipeline state -func (p *Pipeline) reset(batchNum common.BatchNum, lastForgeL1TxsNum int64, +func (p *Pipeline) reset(batchNum common.BatchNum, stats *synchronizer.Stats, vars *synchronizer.SCVariables) error { p.batchNum = batchNum - p.lastForgeL1TxsNum = lastForgeL1TxsNum + p.lastForgeL1TxsNum = stats.Sync.LastForgeL1TxsNum p.stats = *stats p.vars = *vars p.lastScheduledL1BatchBlockNum = 0 @@ -706,15 +710,49 @@ func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) { } } +func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) { + batchInfo, err := p.forgeBatch(batchNum) + if ctx.Err() != nil { + return nil, ctx.Err() + } else if err != nil { + if tracerr.Unwrap(err) == errLastL1BatchNotSynced { + log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err, + "lastForgeL1TxsNum", p.lastForgeL1TxsNum, + "syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum) + } else { + log.Errorw("forgeBatch", "err", err) + } + return nil, err + } + // 6. 
Wait for an available server proof (blocking call) + serverProof, err := p.proversPool.Get(ctx) + if ctx.Err() != nil { + return nil, ctx.Err() + } else if err != nil { + log.Errorw("proversPool.Get", "err", err) + return nil, err + } + batchInfo.ServerProof = serverProof + if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil { + return nil, ctx.Err() + } else if err != nil { + log.Errorw("sendServerProof", "err", err) + batchInfo.ServerProof = nil + p.proversPool.Add(serverProof) + return nil, err + } + return batchInfo, nil +} + // Start the forging pipeline -func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64, +func (p *Pipeline) Start(batchNum common.BatchNum, stats *synchronizer.Stats, vars *synchronizer.SCVariables) error { if p.started { log.Fatal("Pipeline already started") } p.started = true - if err := p.reset(batchNum, lastForgeL1TxsNum, stats, vars); err != nil { + if err := p.reset(batchNum, stats, vars); err != nil { return tracerr.Wrap(err) } p.ctx, p.cancel = context.WithCancel(context.Background()) @@ -723,7 +761,9 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64, batchChSentServerProof := make(chan *BatchInfo, queueSize) p.wg.Add(1) + const zeroDuration = 0 * time.Second go func() { + waitDuration := zeroDuration for { select { case <-p.ctx.Done(): @@ -733,34 +773,15 @@ func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64, case statsVars := <-p.statsVarsCh: p.stats = statsVars.Stats p.syncSCVars(statsVars.Vars) - default: + case <-time.After(waitDuration): batchNum = p.batchNum + 1 - batchInfo, err := p.forgeBatch(batchNum) - if p.ctx.Err() != nil { - continue - } else if err != nil { - log.Errorw("forgeBatch", "err", err) - continue - } - // 6. Wait for an available server proof (blocking call) - serverProof, err := p.proversPool.Get(p.ctx) - if p.ctx.Err() != nil { - continue - } else if err != nil { - log.Errorw("proversPool.Get", "err", err) - continue - } - batchInfo.ServerProof = serverProof - if err := p.sendServerProof(p.ctx, batchInfo); p.ctx.Err() != nil { - continue - } else if err != nil { - log.Errorw("sendServerProof", "err", err) - batchInfo.ServerProof = nil - p.proversPool.Add(serverProof) + if batchInfo, err := p.handleForgeBatch(p.ctx, batchNum); err != nil { + waitDuration = p.cfg.SyncRetryInterval continue + } else { + p.batchNum = batchNum + batchChSentServerProof <- batchInfo } - p.batchNum = batchNum - batchChSentServerProof <- batchInfo } } }() @@ -823,9 +844,9 @@ func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) er } // forgeBatch the next batch. 
-func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) { +func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) { // remove transactions from the pool that have been there for too long - _, err := p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(), + _, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(), p.stats.Sync.LastBlock.Num, int64(batchNum)) if err != nil { return nil, tracerr.Wrap(err) @@ -835,7 +856,7 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) { return nil, tracerr.Wrap(err) } - batchInfo := BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch + batchInfo = &BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch batchInfo.Debug.StartTimestamp = time.Now() batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1 @@ -851,12 +872,23 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) { var coordIdxs []common.Idx // 1. Decide if we forge L2Tx or L1+L2Tx - if p.shouldL1L2Batch(&batchInfo) { + if p.shouldL1L2Batch(batchInfo) { batchInfo.L1Batch = true - p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1 + defer func() { + // If there's no error, update the parameters related + // to the last L1Batch forged + if err == nil { + p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1 + p.lastForgeL1TxsNum++ + } + }() + if p.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum { + return nil, tracerr.Wrap(errLastL1BatchNotSynced) + //return nil, fmt.Errorf("Not synced yet LastForgeL1TxsNum. Expecting %v, got %v", + // p.lastForgeL1TxsNum, p.stats.Sync.LastForgeL1TxsNum) + } // 2a: L1+L2 txs - p.lastForgeL1TxsNum++ - l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum) + l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum + 1) if err != nil { return nil, tracerr.Wrap(err) } @@ -914,9 +946,9 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) { // 5. 
Save metadata from BatchBuilder output for BatchNum batchInfo.ZKInputs = zkInputs batchInfo.Debug.Status = StatusForged - p.cfg.debugBatchStore(&batchInfo) + p.cfg.debugBatchStore(batchInfo) - return &batchInfo, nil + return batchInfo, nil } // waitServerProof gets the generated zkProof & sends it to the SmartContract diff --git a/node/node.go b/node/node.go index df16d8b..2ee8789 100644 --- a/node/node.go +++ b/node/node.go @@ -261,6 +261,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { ForgerAddress: cfg.Coordinator.ForgerAddress, ConfirmBlocks: cfg.Coordinator.ConfirmBlocks, L1BatchTimeoutPerc: cfg.Coordinator.L1BatchTimeoutPerc, + ForgeRetryInterval: cfg.Coordinator.ForgeRetryInterval.Duration, SyncRetryInterval: cfg.Coordinator.SyncRetryInterval.Duration, EthClientAttempts: cfg.Coordinator.EthClient.Attempts, EthClientAttemptsDelay: cfg.Coordinator.EthClient.AttemptsDelay.Duration, diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 9155020..88d94a7 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -63,6 +63,7 @@ func NewStatsHolder(firstBlockNum int64, refreshPeriod time.Duration) *StatsHold stats := Stats{} stats.Eth.RefreshPeriod = refreshPeriod stats.Eth.FirstBlockNum = firstBlockNum + stats.Sync.LastForgeL1TxsNum = -1 return &StatsHolder{Stats: stats} } From 484fca12f83d6b0816e2a4af739ceafb2acc6406 Mon Sep 17 00:00:00 2001 From: Eduard S Date: Mon, 11 Jan 2021 18:08:24 +0100 Subject: [PATCH 2/2] Organize coordinator code, and rename some funcs --- coordinator/coordinator.go | 615 +------------------------------- coordinator/coordinator_test.go | 277 -------------- coordinator/pipeline.go | 428 ++++++++++++++++++++++ coordinator/pipeline_test.go | 297 +++++++++++++++ coordinator/txmanager.go | 207 +++++++++++ 5 files changed, 940 insertions(+), 884 deletions(-) create mode 100644 coordinator/pipeline.go create mode 100644 coordinator/pipeline_test.go create mode 100644 coordinator/txmanager.go diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 3b64520..8865cfd 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -3,14 +3,12 @@ package coordinator import ( "context" "fmt" - "math/big" "os" "strings" "sync" "time" ethCommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" "github.com/hermeznetwork/hermez-node/batchbuilder" "github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/db/historydb" @@ -24,9 +22,15 @@ import ( "github.com/hermeznetwork/tracerr" ) -var errLastL1BatchNotSynced = fmt.Errorf("last L1Batch not synced yet") +var ( + errLastL1BatchNotSynced = fmt.Errorf("last L1Batch not synced yet") +) -const queueLen = 16 +const ( + queueLen = 16 + longWaitDuration = 999 * time.Hour + zeroDuration = 0 * time.Second +) // Config contains the Coordinator configuration type Config struct { @@ -404,606 +408,3 @@ func (c *Coordinator) Stop() { c.pipeline = nil } } - -// TxManager handles everything related to ethereum transactions: It makes the -// call to forge, waits for transaction confirmation, and keeps checking them -// until a number of confirmed blocks have passed. 
-type TxManager struct { - cfg Config - ethClient eth.ClientInterface - l2DB *l2db.L2DB // Used only to mark forged txs as forged in the L2DB - coord *Coordinator // Used only to send messages to stop the pipeline - batchCh chan *BatchInfo - lastBlockCh chan int64 - queue []*BatchInfo - lastBlock int64 - // lastConfirmedBatch stores the last BatchNum that who's forge call was confirmed - lastConfirmedBatch common.BatchNum -} - -// NewTxManager creates a new TxManager -func NewTxManager(cfg *Config, ethClient eth.ClientInterface, l2DB *l2db.L2DB, - coord *Coordinator) *TxManager { - return &TxManager{ - cfg: *cfg, - ethClient: ethClient, - l2DB: l2DB, - coord: coord, - batchCh: make(chan *BatchInfo, queueLen), - lastBlockCh: make(chan int64, queueLen), - lastBlock: -1, - } -} - -// AddBatch is a thread safe method to pass a new batch TxManager to be sent to -// the smart contract via the forge call -func (t *TxManager) AddBatch(batchInfo *BatchInfo) { - t.batchCh <- batchInfo -} - -// SetLastBlock is a thread safe method to pass the lastBlock to the TxManager -func (t *TxManager) SetLastBlock(lastBlock int64) { - t.lastBlockCh <- lastBlock -} - -func (t *TxManager) rollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) error { - batchInfo.Debug.Status = StatusSent - batchInfo.Debug.SendBlockNum = t.lastBlock + 1 - batchInfo.Debug.SendTimestamp = time.Now() - batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub( - batchInfo.Debug.StartTimestamp).Seconds() - var ethTx *types.Transaction - var err error - for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ { - ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs) - if err != nil { - if strings.Contains(err.Error(), common.AuctionErrMsgCannotForge) { - log.Debugw("TxManager ethClient.RollupForgeBatch", "err", err, - "block", t.lastBlock+1) - return tracerr.Wrap(err) - } - log.Errorw("TxManager ethClient.RollupForgeBatch", - "attempt", attempt, "err", err, "block", t.lastBlock+1, - "batchNum", batchInfo.BatchNum) - } else { - break - } - select { - case <-ctx.Done(): - return tracerr.Wrap(common.ErrDone) - case <-time.After(t.cfg.EthClientAttemptsDelay): - } - } - if err != nil { - return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err)) - } - batchInfo.EthTx = ethTx - log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex()) - t.cfg.debugBatchStore(batchInfo) - if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil { - return tracerr.Wrap(err) - } - return nil -} - -func (t *TxManager) ethTransactionReceipt(ctx context.Context, batchInfo *BatchInfo) error { - txHash := batchInfo.EthTx.Hash() - var receipt *types.Receipt - var err error - for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ { - receipt, err = t.ethClient.EthTransactionReceipt(ctx, txHash) - if ctx.Err() != nil { - continue - } - if err != nil { - log.Errorw("TxManager ethClient.EthTransactionReceipt", - "attempt", attempt, "err", err) - } else { - break - } - select { - case <-ctx.Done(): - return tracerr.Wrap(common.ErrDone) - case <-time.After(t.cfg.EthClientAttemptsDelay): - } - } - if err != nil { - return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.EthTransactionReceipt: %w", err)) - } - batchInfo.Receipt = receipt - t.cfg.debugBatchStore(batchInfo) - return nil -} - -func (t *TxManager) handleReceipt(batchInfo *BatchInfo) (*int64, error) { - receipt := batchInfo.Receipt - 
if receipt != nil { - if receipt.Status == types.ReceiptStatusFailed { - batchInfo.Debug.Status = StatusFailed - t.cfg.debugBatchStore(batchInfo) - log.Errorw("TxManager receipt status is failed", "receipt", receipt) - return nil, tracerr.Wrap(fmt.Errorf("ethereum transaction receipt statis is failed")) - } else if receipt.Status == types.ReceiptStatusSuccessful { - batchInfo.Debug.Status = StatusMined - batchInfo.Debug.MineBlockNum = receipt.BlockNumber.Int64() - batchInfo.Debug.StartToMineBlocksDelay = batchInfo.Debug.MineBlockNum - - batchInfo.Debug.StartBlockNum - t.cfg.debugBatchStore(batchInfo) - if batchInfo.BatchNum > t.lastConfirmedBatch { - t.lastConfirmedBatch = batchInfo.BatchNum - } - confirm := t.lastBlock - receipt.BlockNumber.Int64() - return &confirm, nil - } - } - return nil, nil -} - -const longWaitDuration = 999 * time.Hour - -// Run the TxManager -func (t *TxManager) Run(ctx context.Context) { - next := 0 - waitDuration := longWaitDuration - - for { - select { - case <-ctx.Done(): - log.Info("TxManager done") - return - case lastBlock := <-t.lastBlockCh: - t.lastBlock = lastBlock - case batchInfo := <-t.batchCh: - if err := t.rollupForgeBatch(ctx, batchInfo); ctx.Err() != nil { - continue - } else if err != nil { - t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch call: %v", err)}) - continue - } - log.Debugf("ethClient ForgeCall sent, batchNum: %d", batchInfo.BatchNum) - t.queue = append(t.queue, batchInfo) - waitDuration = t.cfg.TxManagerCheckInterval - case <-time.After(waitDuration): - if len(t.queue) == 0 { - continue - } - current := next - next = (current + 1) % len(t.queue) - batchInfo := t.queue[current] - if err := t.ethTransactionReceipt(ctx, batchInfo); ctx.Err() != nil { - continue - } else if err != nil { //nolint:staticcheck - // We can't get the receipt for the - // transaction, so we can't confirm if it was - // mined - t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch receipt: %v", err)}) - } - - confirm, err := t.handleReceipt(batchInfo) - if err != nil { //nolint:staticcheck - // Transaction was rejected - t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch reject: %v", err)}) - } - if confirm != nil && *confirm >= t.cfg.ConfirmBlocks { - log.Debugw("TxManager tx for RollupForgeBatch confirmed", - "batch", batchInfo.BatchNum) - t.queue = append(t.queue[:current], t.queue[current+1:]...) 
- if len(t.queue) == 0 { - waitDuration = longWaitDuration - next = 0 - } else { - next = current % len(t.queue) - } - } - } - } -} - -type statsVars struct { - Stats synchronizer.Stats - Vars synchronizer.SCVariablesPtr -} - -// Pipeline manages the forging of batches with parallel server proofs -type Pipeline struct { - cfg Config - consts synchronizer.SCConsts - - // state - batchNum common.BatchNum - lastScheduledL1BatchBlockNum int64 - lastForgeL1TxsNum int64 - started bool - - proversPool *ProversPool - provers []prover.Client - txManager *TxManager - historyDB *historydb.HistoryDB - l2DB *l2db.L2DB - txSelector *txselector.TxSelector - batchBuilder *batchbuilder.BatchBuilder - purger *Purger - - stats synchronizer.Stats - vars synchronizer.SCVariables - statsVarsCh chan statsVars - - ctx context.Context - wg sync.WaitGroup - cancel context.CancelFunc -} - -// NewPipeline creates a new Pipeline -func NewPipeline(ctx context.Context, - cfg Config, - historyDB *historydb.HistoryDB, - l2DB *l2db.L2DB, - txSelector *txselector.TxSelector, - batchBuilder *batchbuilder.BatchBuilder, - purger *Purger, - txManager *TxManager, - provers []prover.Client, - scConsts *synchronizer.SCConsts, -) (*Pipeline, error) { - proversPool := NewProversPool(len(provers)) - proversPoolSize := 0 - for _, prover := range provers { - if err := prover.WaitReady(ctx); err != nil { - log.Errorw("prover.WaitReady", "err", err) - } else { - proversPool.Add(prover) - proversPoolSize++ - } - } - if proversPoolSize == 0 { - return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool")) - } - return &Pipeline{ - cfg: cfg, - historyDB: historyDB, - l2DB: l2DB, - txSelector: txSelector, - batchBuilder: batchBuilder, - provers: provers, - proversPool: proversPool, - purger: purger, - txManager: txManager, - consts: *scConsts, - statsVarsCh: make(chan statsVars, queueLen), - }, nil -} - -// SetSyncStatsVars is a thread safe method to sets the synchronizer Stats -func (p *Pipeline) SetSyncStatsVars(stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) { - p.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars} -} - -// reset pipeline state -func (p *Pipeline) reset(batchNum common.BatchNum, - stats *synchronizer.Stats, vars *synchronizer.SCVariables) error { - p.batchNum = batchNum - p.lastForgeL1TxsNum = stats.Sync.LastForgeL1TxsNum - p.stats = *stats - p.vars = *vars - p.lastScheduledL1BatchBlockNum = 0 - - err := p.txSelector.Reset(p.batchNum) - if err != nil { - return tracerr.Wrap(err) - } - err = p.batchBuilder.Reset(p.batchNum, true) - if err != nil { - return tracerr.Wrap(err) - } - return nil -} - -func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) { - if vars.Rollup != nil { - p.vars.Rollup = *vars.Rollup - } - if vars.Auction != nil { - p.vars.Auction = *vars.Auction - } - if vars.WDelayer != nil { - p.vars.WDelayer = *vars.WDelayer - } -} - -func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) { - batchInfo, err := p.forgeBatch(batchNum) - if ctx.Err() != nil { - return nil, ctx.Err() - } else if err != nil { - if tracerr.Unwrap(err) == errLastL1BatchNotSynced { - log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err, - "lastForgeL1TxsNum", p.lastForgeL1TxsNum, - "syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum) - } else { - log.Errorw("forgeBatch", "err", err) - } - return nil, err - } - // 6. 
Wait for an available server proof (blocking call) - serverProof, err := p.proversPool.Get(ctx) - if ctx.Err() != nil { - return nil, ctx.Err() - } else if err != nil { - log.Errorw("proversPool.Get", "err", err) - return nil, err - } - batchInfo.ServerProof = serverProof - if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil { - return nil, ctx.Err() - } else if err != nil { - log.Errorw("sendServerProof", "err", err) - batchInfo.ServerProof = nil - p.proversPool.Add(serverProof) - return nil, err - } - return batchInfo, nil -} - -// Start the forging pipeline -func (p *Pipeline) Start(batchNum common.BatchNum, - stats *synchronizer.Stats, vars *synchronizer.SCVariables) error { - if p.started { - log.Fatal("Pipeline already started") - } - p.started = true - - if err := p.reset(batchNum, stats, vars); err != nil { - return tracerr.Wrap(err) - } - p.ctx, p.cancel = context.WithCancel(context.Background()) - - queueSize := 1 - batchChSentServerProof := make(chan *BatchInfo, queueSize) - - p.wg.Add(1) - const zeroDuration = 0 * time.Second - go func() { - waitDuration := zeroDuration - for { - select { - case <-p.ctx.Done(): - log.Info("Pipeline forgeBatch loop done") - p.wg.Done() - return - case statsVars := <-p.statsVarsCh: - p.stats = statsVars.Stats - p.syncSCVars(statsVars.Vars) - case <-time.After(waitDuration): - batchNum = p.batchNum + 1 - if batchInfo, err := p.handleForgeBatch(p.ctx, batchNum); err != nil { - waitDuration = p.cfg.SyncRetryInterval - continue - } else { - p.batchNum = batchNum - batchChSentServerProof <- batchInfo - } - } - } - }() - - p.wg.Add(1) - go func() { - for { - select { - case <-p.ctx.Done(): - log.Info("Pipeline waitServerProofSendEth loop done") - p.wg.Done() - return - case batchInfo := <-batchChSentServerProof: - err := p.waitServerProof(p.ctx, batchInfo) - // We are done with this serverProof, add it back to the pool - p.proversPool.Add(batchInfo.ServerProof) - batchInfo.ServerProof = nil - if p.ctx.Err() != nil { - continue - } - if err != nil { - log.Errorw("waitServerProof", "err", err) - continue - } - p.txManager.AddBatch(batchInfo) - } - } - }() - return nil -} - -// Stop the forging pipeline -func (p *Pipeline) Stop(ctx context.Context) { - if !p.started { - log.Fatal("Pipeline already stopped") - } - p.started = false - log.Info("Stopping Pipeline...") - p.cancel() - p.wg.Wait() - for _, prover := range p.provers { - if err := prover.Cancel(ctx); ctx.Err() != nil { - continue - } else if err != nil { - log.Errorw("prover.Cancel", "err", err) - } - } -} - -// sendServerProof sends the circuit inputs to the proof server -func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) error { - p.cfg.debugBatchStore(batchInfo) - - // 7. Call the selected idle server proof with BatchBuilder output, - // save server proof info for batchNum - if err := batchInfo.ServerProof.CalculateProof(ctx, batchInfo.ZKInputs); err != nil { - return tracerr.Wrap(err) - } - return nil -} - -// forgeBatch the next batch. 
-func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) { - // remove transactions from the pool that have been there for too long - _, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(), - p.stats.Sync.LastBlock.Num, int64(batchNum)) - if err != nil { - return nil, tracerr.Wrap(err) - } - _, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum)) - if err != nil { - return nil, tracerr.Wrap(err) - } - - batchInfo = &BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch - batchInfo.Debug.StartTimestamp = time.Now() - batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1 - - selectionCfg := &txselector.SelectionConfig{ - MaxL1UserTxs: common.RollupConstMaxL1UserTx, - TxProcessorConfig: p.cfg.TxProcessorConfig, - } - - var poolL2Txs []common.PoolL2Tx - // var feesInfo - var l1UserTxsExtra, l1CoordTxs []common.L1Tx - var auths [][]byte - var coordIdxs []common.Idx - - // 1. Decide if we forge L2Tx or L1+L2Tx - if p.shouldL1L2Batch(batchInfo) { - batchInfo.L1Batch = true - defer func() { - // If there's no error, update the parameters related - // to the last L1Batch forged - if err == nil { - p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1 - p.lastForgeL1TxsNum++ - } - }() - if p.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum { - return nil, tracerr.Wrap(errLastL1BatchNotSynced) - //return nil, fmt.Errorf("Not synced yet LastForgeL1TxsNum. Expecting %v, got %v", - // p.lastForgeL1TxsNum, p.stats.Sync.LastForgeL1TxsNum) - } - // 2a: L1+L2 txs - l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum + 1) - if err != nil { - return nil, tracerr.Wrap(err) - } - coordIdxs, auths, l1UserTxsExtra, l1CoordTxs, poolL2Txs, err = - p.txSelector.GetL1L2TxSelection(selectionCfg, l1UserTxs) - if err != nil { - return nil, tracerr.Wrap(err) - } - } else { - // 2b: only L2 txs - coordIdxs, auths, l1CoordTxs, poolL2Txs, err = - p.txSelector.GetL2TxSelection(selectionCfg) - if err != nil { - return nil, tracerr.Wrap(err) - } - l1UserTxsExtra = nil - } - - // 3. Save metadata from TxSelector output for BatchNum - batchInfo.L1UserTxsExtra = l1UserTxsExtra - batchInfo.L1CoordTxs = l1CoordTxs - batchInfo.L1CoordinatorTxsAuths = auths - batchInfo.CoordIdxs = coordIdxs - batchInfo.VerifierIdx = p.cfg.VerifierIdx - - if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum); err != nil { - return nil, tracerr.Wrap(err) - } - - // Invalidate transactions that become invalid beause of - // the poolL2Txs selected. Will mark as invalid the txs that have a - // (fromIdx, nonce) which already appears in the selected txs (includes - // all the nonces smaller than the current one) - err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum) - if err != nil { - return nil, tracerr.Wrap(err) - } - - // 4. Call BatchBuilder with TxSelector output - configBatch := &batchbuilder.ConfigBatch{ - ForgerAddress: p.cfg.ForgerAddress, - TxProcessorConfig: p.cfg.TxProcessorConfig, - } - zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxsExtra, - l1CoordTxs, poolL2Txs, nil) - if err != nil { - return nil, tracerr.Wrap(err) - } - l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a big uggly, find a better way - if err != nil { - return nil, tracerr.Wrap(err) - } - batchInfo.L2Txs = l2Txs - - // 5. 
Save metadata from BatchBuilder output for BatchNum - batchInfo.ZKInputs = zkInputs - batchInfo.Debug.Status = StatusForged - p.cfg.debugBatchStore(batchInfo) - - return batchInfo, nil -} - -// waitServerProof gets the generated zkProof & sends it to the SmartContract -func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) error { - proof, pubInputs, err := batchInfo.ServerProof.GetProof(ctx) // blocking call, until not resolved don't continue. Returns when the proof server has calculated the proof - if err != nil { - return tracerr.Wrap(err) - } - batchInfo.Proof = proof - batchInfo.PublicInputs = pubInputs - batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo) - batchInfo.Debug.Status = StatusProof - p.cfg.debugBatchStore(batchInfo) - return nil -} - -func (p *Pipeline) shouldL1L2Batch(batchInfo *BatchInfo) bool { - // Take the lastL1BatchBlockNum as the biggest between the last - // scheduled one, and the synchronized one. - lastL1BatchBlockNum := p.lastScheduledL1BatchBlockNum - if p.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum { - lastL1BatchBlockNum = p.stats.Sync.LastL1BatchBlock - } - // Set Debug information - batchInfo.Debug.LastScheduledL1BatchBlockNum = p.lastScheduledL1BatchBlockNum - batchInfo.Debug.LastL1BatchBlock = p.stats.Sync.LastL1BatchBlock - batchInfo.Debug.LastL1BatchBlockDelta = p.stats.Eth.LastBlock.Num + 1 - lastL1BatchBlockNum - batchInfo.Debug.L1BatchBlockScheduleDeadline = - int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1) * p.cfg.L1BatchTimeoutPerc) - // Return true if we have passed the l1BatchTimeoutPerc portion of the - // range before the l1batch timeout. - return p.stats.Eth.LastBlock.Num+1-lastL1BatchBlockNum >= - int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1)*p.cfg.L1BatchTimeoutPerc) -} - -func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs { - proof := batchInfo.Proof - zki := batchInfo.ZKInputs - return ð.RollupForgeBatchArgs{ - NewLastIdx: int64(zki.Metadata.NewLastIdxRaw), - NewStRoot: zki.Metadata.NewStateRootRaw.BigInt(), - NewExitRoot: zki.Metadata.NewExitRootRaw.BigInt(), - L1UserTxs: batchInfo.L1UserTxsExtra, - L1CoordinatorTxs: batchInfo.L1CoordTxs, - L1CoordinatorTxsAuths: batchInfo.L1CoordinatorTxsAuths, - L2TxsData: batchInfo.L2Txs, - FeeIdxCoordinator: batchInfo.CoordIdxs, - // Circuit selector - VerifierIdx: batchInfo.VerifierIdx, - L1Batch: batchInfo.L1Batch, - ProofA: [2]*big.Int{proof.PiA[0], proof.PiA[1]}, - ProofB: [2][2]*big.Int{ - {proof.PiB[0][0], proof.PiB[0][1]}, - {proof.PiB[1][0], proof.PiB[1][1]}, - }, - ProofC: [2]*big.Int{proof.PiC[0], proof.PiC[1]}, - } -} diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index a09443c..496be03 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -10,10 +10,7 @@ import ( "testing" "time" - ethKeystore "github.com/ethereum/go-ethereum/accounts/keystore" ethCommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/ethclient" "github.com/hermeznetwork/hermez-node/batchbuilder" "github.com/hermeznetwork/hermez-node/common" dbUtils "github.com/hermeznetwork/hermez-node/db" @@ -25,12 +22,10 @@ import ( "github.com/hermeznetwork/hermez-node/prover" "github.com/hermeznetwork/hermez-node/synchronizer" "github.com/hermeznetwork/hermez-node/test" - "github.com/hermeznetwork/hermez-node/test/til" "github.com/hermeznetwork/hermez-node/txprocessor" "github.com/hermeznetwork/hermez-node/txselector" 
"github.com/hermeznetwork/tracerr" "github.com/iden3/go-iden3-crypto/babyjub" - "github.com/iden3/go-merkletree" "github.com/iden3/go-merkletree/db/pebble" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -434,74 +429,6 @@ func TestCoordHandleMsgSyncBlock(t *testing.T) { assert.Nil(t, coord.pipeline) } -func TestPipelineShouldL1L2Batch(t *testing.T) { - ethClientSetup := test.NewClientSetupExample() - ethClientSetup.ChainID = big.NewInt(int64(chainID)) - - var timer timer - ctx := context.Background() - ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup) - modules := newTestModules(t) - var stats synchronizer.Stats - coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules) - pipeline, err := coord.newPipeline(ctx) - require.NoError(t, err) - pipeline.vars = coord.vars - - // Check that the parameters are the ones we expect and use in this test - require.Equal(t, 0.5, pipeline.cfg.L1BatchTimeoutPerc) - require.Equal(t, int64(10), ethClientSetup.RollupVariables.ForgeL1L2BatchTimeout) - l1BatchTimeoutPerc := pipeline.cfg.L1BatchTimeoutPerc - l1BatchTimeout := ethClientSetup.RollupVariables.ForgeL1L2BatchTimeout - - startBlock := int64(100) - // Empty batchInfo to pass to shouldL1L2Batch() which sets debug information - batchInfo := BatchInfo{} - - // - // No scheduled L1Batch - // - - // Last L1Batch was a long time ago - stats.Eth.LastBlock.Num = startBlock - stats.Sync.LastBlock = stats.Eth.LastBlock - stats.Sync.LastL1BatchBlock = 0 - pipeline.stats = stats - assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo)) - - stats.Sync.LastL1BatchBlock = startBlock - - // We are are one block before the timeout range * 0.5 - stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) - 1 - stats.Sync.LastBlock = stats.Eth.LastBlock - pipeline.stats = stats - assert.Equal(t, false, pipeline.shouldL1L2Batch(&batchInfo)) - - // We are are at timeout range * 0.5 - stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) - stats.Sync.LastBlock = stats.Eth.LastBlock - pipeline.stats = stats - assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo)) - - // - // Scheduled L1Batch - // - pipeline.lastScheduledL1BatchBlockNum = startBlock - stats.Sync.LastL1BatchBlock = startBlock - 10 - - // We are are one block before the timeout range * 0.5 - stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) - 1 - stats.Sync.LastBlock = stats.Eth.LastBlock - pipeline.stats = stats - assert.Equal(t, false, pipeline.shouldL1L2Batch(&batchInfo)) - - // We are are at timeout range * 0.5 - stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) - stats.Sync.LastBlock = stats.Eth.LastBlock - pipeline.stats = stats - assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo)) -} - // ethAddTokens adds the tokens from the blocks to the blockchain func ethAddTokens(blocks []common.BlockData, client *test.Client) { for _, block := range blocks { @@ -517,138 +444,6 @@ func ethAddTokens(blocks []common.BlockData, client *test.Client) { } } -const testTokensLen = 3 -const testUsersLen = 4 - -func preloadSync(t *testing.T, ethClient *test.Client, sync *synchronizer.Synchronizer, - historyDB *historydb.HistoryDB, stateDB *statedb.StateDB) *til.Context { - // Create a set with `testTokensLen` tokens and for each token - // `testUsersLen` accounts. 
- var set []til.Instruction - // set = append(set, til.Instruction{Typ: "Blockchain"}) - for tokenID := 1; tokenID < testTokensLen; tokenID++ { - set = append(set, til.Instruction{ - Typ: til.TypeAddToken, - TokenID: common.TokenID(tokenID), - }) - } - depositAmount, ok := new(big.Int).SetString("10225000000000000000000000000000000", 10) - require.True(t, ok) - for tokenID := 0; tokenID < testTokensLen; tokenID++ { - for user := 0; user < testUsersLen; user++ { - set = append(set, til.Instruction{ - Typ: common.TxTypeCreateAccountDeposit, - TokenID: common.TokenID(tokenID), - DepositAmount: depositAmount, - From: fmt.Sprintf("User%d", user), - }) - } - } - set = append(set, til.Instruction{Typ: til.TypeNewBatchL1}) - set = append(set, til.Instruction{Typ: til.TypeNewBatchL1}) - set = append(set, til.Instruction{Typ: til.TypeNewBlock}) - - tc := til.NewContext(chainID, common.RollupConstMaxL1UserTx) - blocks, err := tc.GenerateBlocksFromInstructions(set) - require.NoError(t, err) - require.NotNil(t, blocks) - - ethAddTokens(blocks, ethClient) - err = ethClient.CtlAddBlocks(blocks) - require.NoError(t, err) - - ctx := context.Background() - for { - syncBlock, discards, err := sync.Sync2(ctx, nil) - require.NoError(t, err) - require.Nil(t, discards) - if syncBlock == nil { - break - } - } - dbTokens, err := historyDB.GetAllTokens() - require.Nil(t, err) - require.Equal(t, testTokensLen, len(dbTokens)) - - dbAccounts, err := historyDB.GetAllAccounts() - require.Nil(t, err) - require.Equal(t, testTokensLen*testUsersLen, len(dbAccounts)) - - sdbAccounts, err := stateDB.GetAccounts() - require.Nil(t, err) - require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts)) - - return tc -} - -func TestPipeline1(t *testing.T) { - ethClientSetup := test.NewClientSetupExample() - ethClientSetup.ChainID = big.NewInt(int64(chainID)) - - var timer timer - ctx := context.Background() - ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup) - modules := newTestModules(t) - coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules) - sync := newTestSynchronizer(t, ethClient, ethClientSetup, modules) - - // preload the synchronier (via the test ethClient) some tokens and - // users with positive balances - tilCtx := preloadSync(t, ethClient, sync, modules.historyDB, modules.stateDB) - syncStats := sync.Stats() - batchNum := common.BatchNum(syncStats.Sync.LastBatch) - syncSCVars := sync.SCVars() - - pipeline, err := coord.newPipeline(ctx) - require.NoError(t, err) - - // Insert some l2txs in the Pool - setPool := ` -Type: PoolL2 - -PoolTransfer(0) User0-User1: 100 (126) -PoolTransfer(0) User1-User2: 200 (126) -PoolTransfer(0) User2-User3: 300 (126) - ` - l2txs, err := tilCtx.GeneratePoolL2Txs(setPool) - require.NoError(t, err) - for _, tx := range l2txs { - err := modules.l2DB.AddTxTest(&tx) //nolint:gosec - require.NoError(t, err) - } - - err = pipeline.reset(batchNum, syncStats.Sync.LastForgeL1TxsNum, syncStats, &synchronizer.SCVariables{ - Rollup: *syncSCVars.Rollup, - Auction: *syncSCVars.Auction, - WDelayer: *syncSCVars.WDelayer, - }) - require.NoError(t, err) - // Sanity check - sdbAccounts, err := pipeline.txSelector.LocalAccountsDB().GetAccounts() - require.Nil(t, err) - require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts)) - - // Sanity check - sdbAccounts, err = pipeline.batchBuilder.LocalStateDB().GetAccounts() - require.Nil(t, err) - require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts)) - - // Sanity check - require.Equal(t, modules.stateDB.MT.Root(), - 
pipeline.batchBuilder.LocalStateDB().MT.Root()) - - batchNum++ - - batchInfo, err := pipeline.forgeBatch(batchNum) - require.NoError(t, err) - assert.Equal(t, 3, len(batchInfo.L2Txs)) - - batchNum++ - batchInfo, err = pipeline.forgeBatch(batchNum) - require.NoError(t, err) - assert.Equal(t, 0, len(batchInfo.L2Txs)) -} - func TestCoordinatorStress(t *testing.T) { if os.Getenv("TEST_COORD_STRESS") == "" { return @@ -714,79 +509,7 @@ func TestCoordinatorStress(t *testing.T) { coord.Stop() } -func TestRollupForgeBatch(t *testing.T) { - if os.Getenv("TEST_ROLLUP_FORGE_BATCH") == "" { - return - } - const web3URL = "http://localhost:8545" - const password = "test" - addr := ethCommon.HexToAddress("0xb4124ceb3451635dacedd11767f004d8a28c6ee7") - sk, err := crypto.HexToECDSA( - "a8a54b2d8197bc0b19bb8a084031be71835580a01e70a45a13babd16c9bc1563") - require.NoError(t, err) - rollupAddr := ethCommon.HexToAddress("0x8EEaea23686c319133a7cC110b840d1591d9AeE0") - pathKeystore, err := ioutil.TempDir("", "tmpKeystore") - require.NoError(t, err) - deleteme = append(deleteme, pathKeystore) - ctx := context.Background() - batchInfo := &BatchInfo{} - proofClient := &prover.MockClient{} - chainID := uint16(0) - - ethClient, err := ethclient.Dial(web3URL) - require.NoError(t, err) - ethCfg := eth.EthereumConfig{ - CallGasLimit: 300000, - GasPriceDiv: 100, - } - scryptN := ethKeystore.LightScryptN - scryptP := ethKeystore.LightScryptP - keyStore := ethKeystore.NewKeyStore(pathKeystore, - scryptN, scryptP) - account, err := keyStore.ImportECDSA(sk, password) - require.NoError(t, err) - require.Equal(t, account.Address, addr) - err = keyStore.Unlock(account, password) - require.NoError(t, err) - - client, err := eth.NewClient(ethClient, &account, keyStore, ð.ClientConfig{ - Ethereum: ethCfg, - Rollup: eth.RollupConfig{ - Address: rollupAddr, - }, - Auction: eth.AuctionConfig{ - Address: ethCommon.Address{}, - TokenHEZ: eth.TokenConfig{ - Address: ethCommon.Address{}, - Name: "HEZ", - }, - }, - WDelayer: eth.WDelayerConfig{ - Address: ethCommon.Address{}, - }, - }) - require.NoError(t, err) - - zkInputs := common.NewZKInputs(chainID, 100, 24, 512, 32, big.NewInt(1)) - zkInputs.Metadata.NewStateRootRaw = &merkletree.Hash{1} - zkInputs.Metadata.NewExitRootRaw = &merkletree.Hash{2} - batchInfo.ZKInputs = zkInputs - err = proofClient.CalculateProof(ctx, batchInfo.ZKInputs) - require.NoError(t, err) - - proof, pubInputs, err := proofClient.GetProof(ctx) - require.NoError(t, err) - batchInfo.Proof = proof - batchInfo.PublicInputs = pubInputs - - batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo) - _, err = client.RollupForgeBatch(batchInfo.ForgeBatchArgs) - require.NoError(t, err) - batchInfo.Proof = proof -} - // TODO: Test Reorg -// TODO: Test Pipeline // TODO: Test TxMonitor // TODO: Test forgeBatch // TODO: Test waitServerProof diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go new file mode 100644 index 0000000..60f6417 --- /dev/null +++ b/coordinator/pipeline.go @@ -0,0 +1,428 @@ +package coordinator + +import ( + "context" + "fmt" + "math/big" + "sync" + "time" + + "github.com/hermeznetwork/hermez-node/batchbuilder" + "github.com/hermeznetwork/hermez-node/common" + "github.com/hermeznetwork/hermez-node/db/historydb" + "github.com/hermeznetwork/hermez-node/db/l2db" + "github.com/hermeznetwork/hermez-node/eth" + "github.com/hermeznetwork/hermez-node/log" + "github.com/hermeznetwork/hermez-node/prover" + "github.com/hermeznetwork/hermez-node/synchronizer" + 
"github.com/hermeznetwork/hermez-node/txselector" + "github.com/hermeznetwork/tracerr" +) + +type statsVars struct { + Stats synchronizer.Stats + Vars synchronizer.SCVariablesPtr +} + +// Pipeline manages the forging of batches with parallel server proofs +type Pipeline struct { + cfg Config + consts synchronizer.SCConsts + + // state + batchNum common.BatchNum + lastScheduledL1BatchBlockNum int64 + lastForgeL1TxsNum int64 + started bool + + proversPool *ProversPool + provers []prover.Client + txManager *TxManager + historyDB *historydb.HistoryDB + l2DB *l2db.L2DB + txSelector *txselector.TxSelector + batchBuilder *batchbuilder.BatchBuilder + purger *Purger + + stats synchronizer.Stats + vars synchronizer.SCVariables + statsVarsCh chan statsVars + + ctx context.Context + wg sync.WaitGroup + cancel context.CancelFunc +} + +// NewPipeline creates a new Pipeline +func NewPipeline(ctx context.Context, + cfg Config, + historyDB *historydb.HistoryDB, + l2DB *l2db.L2DB, + txSelector *txselector.TxSelector, + batchBuilder *batchbuilder.BatchBuilder, + purger *Purger, + txManager *TxManager, + provers []prover.Client, + scConsts *synchronizer.SCConsts, +) (*Pipeline, error) { + proversPool := NewProversPool(len(provers)) + proversPoolSize := 0 + for _, prover := range provers { + if err := prover.WaitReady(ctx); err != nil { + log.Errorw("prover.WaitReady", "err", err) + } else { + proversPool.Add(prover) + proversPoolSize++ + } + } + if proversPoolSize == 0 { + return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool")) + } + return &Pipeline{ + cfg: cfg, + historyDB: historyDB, + l2DB: l2DB, + txSelector: txSelector, + batchBuilder: batchBuilder, + provers: provers, + proversPool: proversPool, + purger: purger, + txManager: txManager, + consts: *scConsts, + statsVarsCh: make(chan statsVars, queueLen), + }, nil +} + +// SetSyncStatsVars is a thread safe method to sets the synchronizer Stats +func (p *Pipeline) SetSyncStatsVars(stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) { + p.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars} +} + +// reset pipeline state +func (p *Pipeline) reset(batchNum common.BatchNum, + stats *synchronizer.Stats, vars *synchronizer.SCVariables) error { + p.batchNum = batchNum + p.lastForgeL1TxsNum = stats.Sync.LastForgeL1TxsNum + p.stats = *stats + p.vars = *vars + p.lastScheduledL1BatchBlockNum = 0 + + err := p.txSelector.Reset(p.batchNum) + if err != nil { + return tracerr.Wrap(err) + } + err = p.batchBuilder.Reset(p.batchNum, true) + if err != nil { + return tracerr.Wrap(err) + } + return nil +} + +func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) { + if vars.Rollup != nil { + p.vars.Rollup = *vars.Rollup + } + if vars.Auction != nil { + p.vars.Auction = *vars.Auction + } + if vars.WDelayer != nil { + p.vars.WDelayer = *vars.WDelayer + } +} + +// handleForgeBatch calls p.forgeBatch to forge the batch and get the zkInputs, +// and then waits for an available proof server and sends the zkInputs to it so +// that the proof computation begins. 
+func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) { + batchInfo, err := p.forgeBatch(batchNum) + if ctx.Err() != nil { + return nil, ctx.Err() + } else if err != nil { + if tracerr.Unwrap(err) == errLastL1BatchNotSynced { + log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err, + "lastForgeL1TxsNum", p.lastForgeL1TxsNum, + "syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum) + } else { + log.Errorw("forgeBatch", "err", err) + } + return nil, err + } + // 6. Wait for an available server proof (blocking call) + serverProof, err := p.proversPool.Get(ctx) + if ctx.Err() != nil { + return nil, ctx.Err() + } else if err != nil { + log.Errorw("proversPool.Get", "err", err) + return nil, err + } + batchInfo.ServerProof = serverProof + if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil { + return nil, ctx.Err() + } else if err != nil { + log.Errorw("sendServerProof", "err", err) + batchInfo.ServerProof = nil + p.proversPool.Add(serverProof) + return nil, err + } + return batchInfo, nil +} + +// Start the forging pipeline +func (p *Pipeline) Start(batchNum common.BatchNum, + stats *synchronizer.Stats, vars *synchronizer.SCVariables) error { + if p.started { + log.Fatal("Pipeline already started") + } + p.started = true + + if err := p.reset(batchNum, stats, vars); err != nil { + return tracerr.Wrap(err) + } + p.ctx, p.cancel = context.WithCancel(context.Background()) + + queueSize := 1 + batchChSentServerProof := make(chan *BatchInfo, queueSize) + + p.wg.Add(1) + go func() { + waitDuration := zeroDuration + for { + select { + case <-p.ctx.Done(): + log.Info("Pipeline forgeBatch loop done") + p.wg.Done() + return + case statsVars := <-p.statsVarsCh: + p.stats = statsVars.Stats + p.syncSCVars(statsVars.Vars) + case <-time.After(waitDuration): + batchNum = p.batchNum + 1 + if batchInfo, err := p.handleForgeBatch(p.ctx, batchNum); err != nil { + waitDuration = p.cfg.SyncRetryInterval + continue + } else { + p.batchNum = batchNum + batchChSentServerProof <- batchInfo + } + } + } + }() + + p.wg.Add(1) + go func() { + for { + select { + case <-p.ctx.Done(): + log.Info("Pipeline waitServerProofSendEth loop done") + p.wg.Done() + return + case batchInfo := <-batchChSentServerProof: + err := p.waitServerProof(p.ctx, batchInfo) + // We are done with this serverProof, add it back to the pool + p.proversPool.Add(batchInfo.ServerProof) + batchInfo.ServerProof = nil + if p.ctx.Err() != nil { + continue + } + if err != nil { + log.Errorw("waitServerProof", "err", err) + continue + } + p.txManager.AddBatch(batchInfo) + } + } + }() + return nil +} + +// Stop the forging pipeline +func (p *Pipeline) Stop(ctx context.Context) { + if !p.started { + log.Fatal("Pipeline already stopped") + } + p.started = false + log.Info("Stopping Pipeline...") + p.cancel() + p.wg.Wait() + for _, prover := range p.provers { + if err := prover.Cancel(ctx); ctx.Err() != nil { + continue + } else if err != nil { + log.Errorw("prover.Cancel", "err", err) + } + } +} + +// sendServerProof sends the circuit inputs to the proof server +func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) error { + p.cfg.debugBatchStore(batchInfo) + + // 7. Call the selected idle server proof with BatchBuilder output, + // save server proof info for batchNum + if err := batchInfo.ServerProof.CalculateProof(ctx, batchInfo.ZKInputs); err != nil { + return tracerr.Wrap(err) + } + return nil +} + +// forgeBatch forges the batchNum batch. 
+func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) { + // remove transactions from the pool that have been there for too long + _, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(), + p.stats.Sync.LastBlock.Num, int64(batchNum)) + if err != nil { + return nil, tracerr.Wrap(err) + } + _, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum)) + if err != nil { + return nil, tracerr.Wrap(err) + } + + batchInfo = &BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch + batchInfo.Debug.StartTimestamp = time.Now() + batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1 + + selectionCfg := &txselector.SelectionConfig{ + MaxL1UserTxs: common.RollupConstMaxL1UserTx, + TxProcessorConfig: p.cfg.TxProcessorConfig, + } + + var poolL2Txs []common.PoolL2Tx + var l1UserTxsExtra, l1CoordTxs []common.L1Tx + var auths [][]byte + var coordIdxs []common.Idx + + // 1. Decide if we forge L2Tx or L1+L2Tx + if p.shouldL1L2Batch(batchInfo) { + batchInfo.L1Batch = true + defer func() { + // If there's no error, update the parameters related + // to the last L1Batch forged + if err == nil { + p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1 + p.lastForgeL1TxsNum++ + } + }() + if p.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum { + return nil, tracerr.Wrap(errLastL1BatchNotSynced) + } + // 2a: L1+L2 txs + l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum + 1) + if err != nil { + return nil, tracerr.Wrap(err) + } + coordIdxs, auths, l1UserTxsExtra, l1CoordTxs, poolL2Txs, err = + p.txSelector.GetL1L2TxSelection(selectionCfg, l1UserTxs) + if err != nil { + return nil, tracerr.Wrap(err) + } + } else { + // 2b: only L2 txs + coordIdxs, auths, l1CoordTxs, poolL2Txs, err = + p.txSelector.GetL2TxSelection(selectionCfg) + if err != nil { + return nil, tracerr.Wrap(err) + } + l1UserTxsExtra = nil + } + + // 3. Save metadata from TxSelector output for BatchNum + batchInfo.L1UserTxsExtra = l1UserTxsExtra + batchInfo.L1CoordTxs = l1CoordTxs + batchInfo.L1CoordinatorTxsAuths = auths + batchInfo.CoordIdxs = coordIdxs + batchInfo.VerifierIdx = p.cfg.VerifierIdx + + if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum); err != nil { + return nil, tracerr.Wrap(err) + } + + // Invalidate transactions that become invalid beause of + // the poolL2Txs selected. Will mark as invalid the txs that have a + // (fromIdx, nonce) which already appears in the selected txs (includes + // all the nonces smaller than the current one) + err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum) + if err != nil { + return nil, tracerr.Wrap(err) + } + + // 4. Call BatchBuilder with TxSelector output + configBatch := &batchbuilder.ConfigBatch{ + ForgerAddress: p.cfg.ForgerAddress, + TxProcessorConfig: p.cfg.TxProcessorConfig, + } + zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxsExtra, + l1CoordTxs, poolL2Txs, nil) + if err != nil { + return nil, tracerr.Wrap(err) + } + l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a big uggly, find a better way + if err != nil { + return nil, tracerr.Wrap(err) + } + batchInfo.L2Txs = l2Txs + + // 5. 
Save metadata from BatchBuilder output for BatchNum + batchInfo.ZKInputs = zkInputs + batchInfo.Debug.Status = StatusForged + p.cfg.debugBatchStore(batchInfo) + + return batchInfo, nil +} + +// waitServerProof gets the generated zkProof & sends it to the SmartContract +func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) error { + proof, pubInputs, err := batchInfo.ServerProof.GetProof(ctx) // blocking call, until not resolved don't continue. Returns when the proof server has calculated the proof + if err != nil { + return tracerr.Wrap(err) + } + batchInfo.Proof = proof + batchInfo.PublicInputs = pubInputs + batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo) + batchInfo.Debug.Status = StatusProof + p.cfg.debugBatchStore(batchInfo) + return nil +} + +func (p *Pipeline) shouldL1L2Batch(batchInfo *BatchInfo) bool { + // Take the lastL1BatchBlockNum as the biggest between the last + // scheduled one, and the synchronized one. + lastL1BatchBlockNum := p.lastScheduledL1BatchBlockNum + if p.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum { + lastL1BatchBlockNum = p.stats.Sync.LastL1BatchBlock + } + // Set Debug information + batchInfo.Debug.LastScheduledL1BatchBlockNum = p.lastScheduledL1BatchBlockNum + batchInfo.Debug.LastL1BatchBlock = p.stats.Sync.LastL1BatchBlock + batchInfo.Debug.LastL1BatchBlockDelta = p.stats.Eth.LastBlock.Num + 1 - lastL1BatchBlockNum + batchInfo.Debug.L1BatchBlockScheduleDeadline = + int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1) * p.cfg.L1BatchTimeoutPerc) + // Return true if we have passed the l1BatchTimeoutPerc portion of the + // range before the l1batch timeout. + return p.stats.Eth.LastBlock.Num+1-lastL1BatchBlockNum >= + int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1)*p.cfg.L1BatchTimeoutPerc) +} + +func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs { + proof := batchInfo.Proof + zki := batchInfo.ZKInputs + return ð.RollupForgeBatchArgs{ + NewLastIdx: int64(zki.Metadata.NewLastIdxRaw), + NewStRoot: zki.Metadata.NewStateRootRaw.BigInt(), + NewExitRoot: zki.Metadata.NewExitRootRaw.BigInt(), + L1UserTxs: batchInfo.L1UserTxsExtra, + L1CoordinatorTxs: batchInfo.L1CoordTxs, + L1CoordinatorTxsAuths: batchInfo.L1CoordinatorTxsAuths, + L2TxsData: batchInfo.L2Txs, + FeeIdxCoordinator: batchInfo.CoordIdxs, + // Circuit selector + VerifierIdx: batchInfo.VerifierIdx, + L1Batch: batchInfo.L1Batch, + ProofA: [2]*big.Int{proof.PiA[0], proof.PiA[1]}, + ProofB: [2][2]*big.Int{ + {proof.PiB[0][0], proof.PiB[0][1]}, + {proof.PiB[1][0], proof.PiB[1][1]}, + }, + ProofC: [2]*big.Int{proof.PiC[0], proof.PiC[1]}, + } +} diff --git a/coordinator/pipeline_test.go b/coordinator/pipeline_test.go new file mode 100644 index 0000000..65bde7d --- /dev/null +++ b/coordinator/pipeline_test.go @@ -0,0 +1,297 @@ +package coordinator + +import ( + "context" + "fmt" + "io/ioutil" + "math/big" + "os" + "testing" + + ethKeystore "github.com/ethereum/go-ethereum/accounts/keystore" + ethCommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/hermeznetwork/hermez-node/common" + "github.com/hermeznetwork/hermez-node/db/historydb" + "github.com/hermeznetwork/hermez-node/db/statedb" + "github.com/hermeznetwork/hermez-node/eth" + "github.com/hermeznetwork/hermez-node/prover" + "github.com/hermeznetwork/hermez-node/synchronizer" + "github.com/hermeznetwork/hermez-node/test" + "github.com/hermeznetwork/hermez-node/test/til" + 
"github.com/iden3/go-merkletree" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPipelineShouldL1L2Batch(t *testing.T) { + ethClientSetup := test.NewClientSetupExample() + ethClientSetup.ChainID = big.NewInt(int64(chainID)) + + var timer timer + ctx := context.Background() + ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup) + modules := newTestModules(t) + var stats synchronizer.Stats + coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules) + pipeline, err := coord.newPipeline(ctx) + require.NoError(t, err) + pipeline.vars = coord.vars + + // Check that the parameters are the ones we expect and use in this test + require.Equal(t, 0.5, pipeline.cfg.L1BatchTimeoutPerc) + require.Equal(t, int64(10), ethClientSetup.RollupVariables.ForgeL1L2BatchTimeout) + l1BatchTimeoutPerc := pipeline.cfg.L1BatchTimeoutPerc + l1BatchTimeout := ethClientSetup.RollupVariables.ForgeL1L2BatchTimeout + + startBlock := int64(100) + // Empty batchInfo to pass to shouldL1L2Batch() which sets debug information + batchInfo := BatchInfo{} + + // + // No scheduled L1Batch + // + + // Last L1Batch was a long time ago + stats.Eth.LastBlock.Num = startBlock + stats.Sync.LastBlock = stats.Eth.LastBlock + stats.Sync.LastL1BatchBlock = 0 + pipeline.stats = stats + assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo)) + + stats.Sync.LastL1BatchBlock = startBlock + + // We are are one block before the timeout range * 0.5 + stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) - 1 + stats.Sync.LastBlock = stats.Eth.LastBlock + pipeline.stats = stats + assert.Equal(t, false, pipeline.shouldL1L2Batch(&batchInfo)) + + // We are are at timeout range * 0.5 + stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) + stats.Sync.LastBlock = stats.Eth.LastBlock + pipeline.stats = stats + assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo)) + + // + // Scheduled L1Batch + // + pipeline.lastScheduledL1BatchBlockNum = startBlock + stats.Sync.LastL1BatchBlock = startBlock - 10 + + // We are are one block before the timeout range * 0.5 + stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) - 1 + stats.Sync.LastBlock = stats.Eth.LastBlock + pipeline.stats = stats + assert.Equal(t, false, pipeline.shouldL1L2Batch(&batchInfo)) + + // We are are at timeout range * 0.5 + stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) + stats.Sync.LastBlock = stats.Eth.LastBlock + pipeline.stats = stats + assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo)) +} + +const testTokensLen = 3 +const testUsersLen = 4 + +func preloadSync(t *testing.T, ethClient *test.Client, sync *synchronizer.Synchronizer, + historyDB *historydb.HistoryDB, stateDB *statedb.StateDB) *til.Context { + // Create a set with `testTokensLen` tokens and for each token + // `testUsersLen` accounts. 
+	var set []til.Instruction
+	// set = append(set, til.Instruction{Typ: "Blockchain"})
+	for tokenID := 1; tokenID < testTokensLen; tokenID++ {
+		set = append(set, til.Instruction{
+			Typ:     til.TypeAddToken,
+			TokenID: common.TokenID(tokenID),
+		})
+	}
+	depositAmount, ok := new(big.Int).SetString("10225000000000000000000000000000000", 10)
+	require.True(t, ok)
+	for tokenID := 0; tokenID < testTokensLen; tokenID++ {
+		for user := 0; user < testUsersLen; user++ {
+			set = append(set, til.Instruction{
+				Typ:           common.TxTypeCreateAccountDeposit,
+				TokenID:       common.TokenID(tokenID),
+				DepositAmount: depositAmount,
+				From:          fmt.Sprintf("User%d", user),
+			})
+		}
+	}
+	set = append(set, til.Instruction{Typ: til.TypeNewBatchL1})
+	set = append(set, til.Instruction{Typ: til.TypeNewBatchL1})
+	set = append(set, til.Instruction{Typ: til.TypeNewBlock})
+
+	tc := til.NewContext(chainID, common.RollupConstMaxL1UserTx)
+	blocks, err := tc.GenerateBlocksFromInstructions(set)
+	require.NoError(t, err)
+	require.NotNil(t, blocks)
+
+	ethAddTokens(blocks, ethClient)
+	err = ethClient.CtlAddBlocks(blocks)
+	require.NoError(t, err)
+
+	ctx := context.Background()
+	for {
+		syncBlock, discards, err := sync.Sync2(ctx, nil)
+		require.NoError(t, err)
+		require.Nil(t, discards)
+		if syncBlock == nil {
+			break
+		}
+	}
+	dbTokens, err := historyDB.GetAllTokens()
+	require.Nil(t, err)
+	require.Equal(t, testTokensLen, len(dbTokens))
+
+	dbAccounts, err := historyDB.GetAllAccounts()
+	require.Nil(t, err)
+	require.Equal(t, testTokensLen*testUsersLen, len(dbAccounts))
+
+	sdbAccounts, err := stateDB.GetAccounts()
+	require.Nil(t, err)
+	require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts))
+
+	return tc
+}
+
+func TestPipelineForgeBatchWithTxs(t *testing.T) {
+	ethClientSetup := test.NewClientSetupExample()
+	ethClientSetup.ChainID = big.NewInt(int64(chainID))
+
+	var timer timer
+	ctx := context.Background()
+	ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
+	modules := newTestModules(t)
+	coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules)
+	sync := newTestSynchronizer(t, ethClient, ethClientSetup, modules)
+
+	// preload the synchronizer (via the test ethClient) with some tokens
+	// and users with positive balances
+	tilCtx := preloadSync(t, ethClient, sync, modules.historyDB, modules.stateDB)
+	syncStats := sync.Stats()
+	batchNum := common.BatchNum(syncStats.Sync.LastBatch)
+	syncSCVars := sync.SCVars()
+
+	pipeline, err := coord.newPipeline(ctx)
+	require.NoError(t, err)
+
+	// Insert some l2txs in the Pool
+	setPool := `
+Type: PoolL2
+
+PoolTransfer(0) User0-User1: 100 (126)
+PoolTransfer(0) User1-User2: 200 (126)
+PoolTransfer(0) User2-User3: 300 (126)
+	`
+	l2txs, err := tilCtx.GeneratePoolL2Txs(setPool)
+	require.NoError(t, err)
+	for _, tx := range l2txs {
+		err := modules.l2DB.AddTxTest(&tx) //nolint:gosec
+		require.NoError(t, err)
+	}
+
+	err = pipeline.reset(batchNum, syncStats, &synchronizer.SCVariables{
+		Rollup:   *syncSCVars.Rollup,
+		Auction:  *syncSCVars.Auction,
+		WDelayer: *syncSCVars.WDelayer,
+	})
+	require.NoError(t, err)
+	// Sanity check
+	sdbAccounts, err := pipeline.txSelector.LocalAccountsDB().GetAccounts()
+	require.Nil(t, err)
+	require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts))
+
+	// Sanity check
+	sdbAccounts, err = pipeline.batchBuilder.LocalStateDB().GetAccounts()
+	require.Nil(t, err)
+	require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts))
+
+	// Sanity check
+	require.Equal(t, modules.stateDB.MT.Root(),
+		pipeline.batchBuilder.LocalStateDB().MT.Root())
+
+	batchNum++
+
+	batchInfo, err := pipeline.forgeBatch(batchNum)
+	require.NoError(t, err)
+	assert.Equal(t, 3, len(batchInfo.L2Txs))
+
+	batchNum++
+	batchInfo, err = pipeline.forgeBatch(batchNum)
+	require.NoError(t, err)
+	assert.Equal(t, 0, len(batchInfo.L2Txs))
+}
+
+func TestEthRollupForgeBatch(t *testing.T) {
+	if os.Getenv("TEST_ROLLUP_FORGE_BATCH") == "" {
+		return
+	}
+	const web3URL = "http://localhost:8545"
+	const password = "test"
+	addr := ethCommon.HexToAddress("0xb4124ceb3451635dacedd11767f004d8a28c6ee7")
+	sk, err := crypto.HexToECDSA(
+		"a8a54b2d8197bc0b19bb8a084031be71835580a01e70a45a13babd16c9bc1563")
+	require.NoError(t, err)
+	rollupAddr := ethCommon.HexToAddress("0x8EEaea23686c319133a7cC110b840d1591d9AeE0")
+	pathKeystore, err := ioutil.TempDir("", "tmpKeystore")
+	require.NoError(t, err)
+	deleteme = append(deleteme, pathKeystore)
+	ctx := context.Background()
+	batchInfo := &BatchInfo{}
+	proofClient := &prover.MockClient{}
+	chainID := uint16(0)
+
+	ethClient, err := ethclient.Dial(web3URL)
+	require.NoError(t, err)
+	ethCfg := eth.EthereumConfig{
+		CallGasLimit: 300000,
+		GasPriceDiv:  100,
+	}
+	scryptN := ethKeystore.LightScryptN
+	scryptP := ethKeystore.LightScryptP
+	keyStore := ethKeystore.NewKeyStore(pathKeystore,
+		scryptN, scryptP)
+	account, err := keyStore.ImportECDSA(sk, password)
+	require.NoError(t, err)
+	require.Equal(t, account.Address, addr)
+	err = keyStore.Unlock(account, password)
+	require.NoError(t, err)
+
+	client, err := eth.NewClient(ethClient, &account, keyStore, &eth.ClientConfig{
+		Ethereum: ethCfg,
+		Rollup: eth.RollupConfig{
+			Address: rollupAddr,
+		},
+		Auction: eth.AuctionConfig{
+			Address: ethCommon.Address{},
+			TokenHEZ: eth.TokenConfig{
+				Address: ethCommon.Address{},
+				Name:    "HEZ",
+			},
+		},
+		WDelayer: eth.WDelayerConfig{
+			Address: ethCommon.Address{},
+		},
+	})
+	require.NoError(t, err)
+
+	zkInputs := common.NewZKInputs(chainID, 100, 24, 512, 32, big.NewInt(1))
+	zkInputs.Metadata.NewStateRootRaw = &merkletree.Hash{1}
+	zkInputs.Metadata.NewExitRootRaw = &merkletree.Hash{2}
+	batchInfo.ZKInputs = zkInputs
+	err = proofClient.CalculateProof(ctx, batchInfo.ZKInputs)
+	require.NoError(t, err)
+
+	proof, pubInputs, err := proofClient.GetProof(ctx)
+	require.NoError(t, err)
+	batchInfo.Proof = proof
+	batchInfo.PublicInputs = pubInputs
+
+	batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo)
+	_, err = client.RollupForgeBatch(batchInfo.ForgeBatchArgs)
+	require.NoError(t, err)
+	batchInfo.Proof = proof
+}
diff --git a/coordinator/txmanager.go b/coordinator/txmanager.go
new file mode 100644
index 0000000..020a79f
--- /dev/null
+++ b/coordinator/txmanager.go
@@ -0,0 +1,207 @@
+package coordinator
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/hermeznetwork/hermez-node/common"
+	"github.com/hermeznetwork/hermez-node/db/l2db"
+	"github.com/hermeznetwork/hermez-node/eth"
+	"github.com/hermeznetwork/hermez-node/log"
+	"github.com/hermeznetwork/tracerr"
+)
+
+// TxManager handles everything related to ethereum transactions: It makes the
+// call to forge, waits for transaction confirmation, and keeps checking them
+// until a number of confirmed blocks have passed.
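+//
+// Batches to forge are received through AddBatch and the last synced block
+// through SetLastBlock; sent forge transactions are kept in an internal queue
+// and their receipts are polled periodically from Run until confirmed.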
+type TxManager struct {
+	cfg         Config
+	ethClient   eth.ClientInterface
+	l2DB        *l2db.L2DB   // Used only to mark forged txs as forged in the L2DB
+	coord       *Coordinator // Used only to send messages to stop the pipeline
+	batchCh     chan *BatchInfo
+	lastBlockCh chan int64
+	queue       []*BatchInfo
+	lastBlock   int64
+	// lastConfirmedBatch stores the last BatchNum whose forge call was confirmed
+	lastConfirmedBatch common.BatchNum
+}
+
+// NewTxManager creates a new TxManager
+func NewTxManager(cfg *Config, ethClient eth.ClientInterface, l2DB *l2db.L2DB,
+	coord *Coordinator) *TxManager {
+	return &TxManager{
+		cfg:         *cfg,
+		ethClient:   ethClient,
+		l2DB:        l2DB,
+		coord:       coord,
+		batchCh:     make(chan *BatchInfo, queueLen),
+		lastBlockCh: make(chan int64, queueLen),
+		lastBlock:   -1,
+	}
+}
+
+// AddBatch is a thread-safe method to pass a new batch to the TxManager, to be
+// sent to the smart contract via the forge call
+func (t *TxManager) AddBatch(batchInfo *BatchInfo) {
+	t.batchCh <- batchInfo
+}
+
+// SetLastBlock is a thread-safe method to pass the lastBlock to the TxManager
+func (t *TxManager) SetLastBlock(lastBlock int64) {
+	t.lastBlockCh <- lastBlock
+}
+
+func (t *TxManager) callRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) error {
+	batchInfo.Debug.Status = StatusSent
+	batchInfo.Debug.SendBlockNum = t.lastBlock + 1
+	batchInfo.Debug.SendTimestamp = time.Now()
+	batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub(
+		batchInfo.Debug.StartTimestamp).Seconds()
+	var ethTx *types.Transaction
+	var err error
+	for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
+		ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs)
+		if err != nil {
+			if strings.Contains(err.Error(), common.AuctionErrMsgCannotForge) {
+				log.Debugw("TxManager ethClient.RollupForgeBatch", "err", err,
+					"block", t.lastBlock+1)
+				return tracerr.Wrap(err)
+			}
+			log.Errorw("TxManager ethClient.RollupForgeBatch",
+				"attempt", attempt, "err", err, "block", t.lastBlock+1,
+				"batchNum", batchInfo.BatchNum)
+		} else {
+			break
+		}
+		select {
+		case <-ctx.Done():
+			return tracerr.Wrap(common.ErrDone)
+		case <-time.After(t.cfg.EthClientAttemptsDelay):
+		}
+	}
+	if err != nil {
+		return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err))
+	}
+	batchInfo.EthTx = ethTx
+	log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex())
+	t.cfg.debugBatchStore(batchInfo)
+	if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil {
+		return tracerr.Wrap(err)
+	}
+	return nil
+}
+
+func (t *TxManager) checkEthTransactionReceipt(ctx context.Context, batchInfo *BatchInfo) error {
+	txHash := batchInfo.EthTx.Hash()
+	var receipt *types.Receipt
+	var err error
+	for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
+		receipt, err = t.ethClient.EthTransactionReceipt(ctx, txHash)
+		if ctx.Err() != nil {
+			continue
+		}
+		if err != nil {
+			log.Errorw("TxManager ethClient.EthTransactionReceipt",
+				"attempt", attempt, "err", err)
+		} else {
+			break
+		}
+		select {
+		case <-ctx.Done():
+			return tracerr.Wrap(common.ErrDone)
+		case <-time.After(t.cfg.EthClientAttemptsDelay):
+		}
+	}
+	if err != nil {
+		return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.EthTransactionReceipt: %w", err))
+	}
+	batchInfo.Receipt = receipt
+	t.cfg.debugBatchStore(batchInfo)
+	return nil
+}
+
+func (t *TxManager) handleReceipt(batchInfo *BatchInfo) (*int64, error) {
+	receipt := batchInfo.Receipt
+	if receipt != nil {
+		if receipt.Status == types.ReceiptStatusFailed {
+			batchInfo.Debug.Status = StatusFailed
+			t.cfg.debugBatchStore(batchInfo)
+			log.Errorw("TxManager receipt status is failed", "receipt", receipt)
+			return nil, tracerr.Wrap(fmt.Errorf("ethereum transaction receipt status is failed"))
+		} else if receipt.Status == types.ReceiptStatusSuccessful {
+			batchInfo.Debug.Status = StatusMined
+			batchInfo.Debug.MineBlockNum = receipt.BlockNumber.Int64()
+			batchInfo.Debug.StartToMineBlocksDelay = batchInfo.Debug.MineBlockNum -
+				batchInfo.Debug.StartBlockNum
+			t.cfg.debugBatchStore(batchInfo)
+			if batchInfo.BatchNum > t.lastConfirmedBatch {
+				t.lastConfirmedBatch = batchInfo.BatchNum
+			}
+			confirm := t.lastBlock - receipt.BlockNumber.Int64()
+			return &confirm, nil
+		}
+	}
+	return nil, nil
+}
+
+// Run the TxManager
+func (t *TxManager) Run(ctx context.Context) {
+	next := 0
+	waitDuration := longWaitDuration
+
+	for {
+		select {
+		case <-ctx.Done():
+			log.Info("TxManager done")
+			return
+		case lastBlock := <-t.lastBlockCh:
+			t.lastBlock = lastBlock
+		case batchInfo := <-t.batchCh:
+			if err := t.callRollupForgeBatch(ctx, batchInfo); ctx.Err() != nil {
+				continue
+			} else if err != nil {
+				t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch call: %v", err)})
+				continue
+			}
+			log.Debugf("ethClient ForgeCall sent, batchNum: %d", batchInfo.BatchNum)
+			t.queue = append(t.queue, batchInfo)
+			waitDuration = t.cfg.TxManagerCheckInterval
+		case <-time.After(waitDuration):
+			if len(t.queue) == 0 {
+				continue
+			}
+			current := next
+			next = (current + 1) % len(t.queue)
+			batchInfo := t.queue[current]
+			if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
+				continue
+			} else if err != nil { //nolint:staticcheck
+				// We can't get the receipt for the
+				// transaction, so we can't confirm if it was
+				// mined
+				t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch receipt: %v", err)})
+			}
+
+			confirm, err := t.handleReceipt(batchInfo)
+			if err != nil { //nolint:staticcheck
+				// Transaction was rejected
+				t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch reject: %v", err)})
+			}
+			if confirm != nil && *confirm >= t.cfg.ConfirmBlocks {
+				log.Debugw("TxManager tx for RollupForgeBatch confirmed",
+					"batch", batchInfo.BatchNum)
+				t.queue = append(t.queue[:current], t.queue[current+1:]...)
+				if len(t.queue) == 0 {
+					waitDuration = longWaitDuration
+					next = 0
+				} else {
+					next = current % len(t.queue)
+				}
+			}
+		}
+	}
+}