diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 3b64520..8865cfd 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -3,14 +3,12 @@ package coordinator import ( "context" "fmt" - "math/big" "os" "strings" "sync" "time" ethCommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" "github.com/hermeznetwork/hermez-node/batchbuilder" "github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/db/historydb" @@ -24,9 +22,15 @@ import ( "github.com/hermeznetwork/tracerr" ) -var errLastL1BatchNotSynced = fmt.Errorf("last L1Batch not synced yet") +var ( + errLastL1BatchNotSynced = fmt.Errorf("last L1Batch not synced yet") +) -const queueLen = 16 +const ( + queueLen = 16 + longWaitDuration = 999 * time.Hour + zeroDuration = 0 * time.Second +) // Config contains the Coordinator configuration type Config struct { @@ -404,606 +408,3 @@ func (c *Coordinator) Stop() { c.pipeline = nil } } - -// TxManager handles everything related to ethereum transactions: It makes the -// call to forge, waits for transaction confirmation, and keeps checking them -// until a number of confirmed blocks have passed. -type TxManager struct { - cfg Config - ethClient eth.ClientInterface - l2DB *l2db.L2DB // Used only to mark forged txs as forged in the L2DB - coord *Coordinator // Used only to send messages to stop the pipeline - batchCh chan *BatchInfo - lastBlockCh chan int64 - queue []*BatchInfo - lastBlock int64 - // lastConfirmedBatch stores the last BatchNum that who's forge call was confirmed - lastConfirmedBatch common.BatchNum -} - -// NewTxManager creates a new TxManager -func NewTxManager(cfg *Config, ethClient eth.ClientInterface, l2DB *l2db.L2DB, - coord *Coordinator) *TxManager { - return &TxManager{ - cfg: *cfg, - ethClient: ethClient, - l2DB: l2DB, - coord: coord, - batchCh: make(chan *BatchInfo, queueLen), - lastBlockCh: make(chan int64, queueLen), - lastBlock: -1, - } -} - -// AddBatch is a thread safe method to pass a new batch TxManager to be sent to -// the smart contract via the forge call -func (t *TxManager) AddBatch(batchInfo *BatchInfo) { - t.batchCh <- batchInfo -} - -// SetLastBlock is a thread safe method to pass the lastBlock to the TxManager -func (t *TxManager) SetLastBlock(lastBlock int64) { - t.lastBlockCh <- lastBlock -} - -func (t *TxManager) rollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) error { - batchInfo.Debug.Status = StatusSent - batchInfo.Debug.SendBlockNum = t.lastBlock + 1 - batchInfo.Debug.SendTimestamp = time.Now() - batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub( - batchInfo.Debug.StartTimestamp).Seconds() - var ethTx *types.Transaction - var err error - for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ { - ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs) - if err != nil { - if strings.Contains(err.Error(), common.AuctionErrMsgCannotForge) { - log.Debugw("TxManager ethClient.RollupForgeBatch", "err", err, - "block", t.lastBlock+1) - return tracerr.Wrap(err) - } - log.Errorw("TxManager ethClient.RollupForgeBatch", - "attempt", attempt, "err", err, "block", t.lastBlock+1, - "batchNum", batchInfo.BatchNum) - } else { - break - } - select { - case <-ctx.Done(): - return tracerr.Wrap(common.ErrDone) - case <-time.After(t.cfg.EthClientAttemptsDelay): - } - } - if err != nil { - return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err)) - } - batchInfo.EthTx = ethTx - 
log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex()) - t.cfg.debugBatchStore(batchInfo) - if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil { - return tracerr.Wrap(err) - } - return nil -} - -func (t *TxManager) ethTransactionReceipt(ctx context.Context, batchInfo *BatchInfo) error { - txHash := batchInfo.EthTx.Hash() - var receipt *types.Receipt - var err error - for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ { - receipt, err = t.ethClient.EthTransactionReceipt(ctx, txHash) - if ctx.Err() != nil { - continue - } - if err != nil { - log.Errorw("TxManager ethClient.EthTransactionReceipt", - "attempt", attempt, "err", err) - } else { - break - } - select { - case <-ctx.Done(): - return tracerr.Wrap(common.ErrDone) - case <-time.After(t.cfg.EthClientAttemptsDelay): - } - } - if err != nil { - return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.EthTransactionReceipt: %w", err)) - } - batchInfo.Receipt = receipt - t.cfg.debugBatchStore(batchInfo) - return nil -} - -func (t *TxManager) handleReceipt(batchInfo *BatchInfo) (*int64, error) { - receipt := batchInfo.Receipt - if receipt != nil { - if receipt.Status == types.ReceiptStatusFailed { - batchInfo.Debug.Status = StatusFailed - t.cfg.debugBatchStore(batchInfo) - log.Errorw("TxManager receipt status is failed", "receipt", receipt) - return nil, tracerr.Wrap(fmt.Errorf("ethereum transaction receipt statis is failed")) - } else if receipt.Status == types.ReceiptStatusSuccessful { - batchInfo.Debug.Status = StatusMined - batchInfo.Debug.MineBlockNum = receipt.BlockNumber.Int64() - batchInfo.Debug.StartToMineBlocksDelay = batchInfo.Debug.MineBlockNum - - batchInfo.Debug.StartBlockNum - t.cfg.debugBatchStore(batchInfo) - if batchInfo.BatchNum > t.lastConfirmedBatch { - t.lastConfirmedBatch = batchInfo.BatchNum - } - confirm := t.lastBlock - receipt.BlockNumber.Int64() - return &confirm, nil - } - } - return nil, nil -} - -const longWaitDuration = 999 * time.Hour - -// Run the TxManager -func (t *TxManager) Run(ctx context.Context) { - next := 0 - waitDuration := longWaitDuration - - for { - select { - case <-ctx.Done(): - log.Info("TxManager done") - return - case lastBlock := <-t.lastBlockCh: - t.lastBlock = lastBlock - case batchInfo := <-t.batchCh: - if err := t.rollupForgeBatch(ctx, batchInfo); ctx.Err() != nil { - continue - } else if err != nil { - t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch call: %v", err)}) - continue - } - log.Debugf("ethClient ForgeCall sent, batchNum: %d", batchInfo.BatchNum) - t.queue = append(t.queue, batchInfo) - waitDuration = t.cfg.TxManagerCheckInterval - case <-time.After(waitDuration): - if len(t.queue) == 0 { - continue - } - current := next - next = (current + 1) % len(t.queue) - batchInfo := t.queue[current] - if err := t.ethTransactionReceipt(ctx, batchInfo); ctx.Err() != nil { - continue - } else if err != nil { //nolint:staticcheck - // We can't get the receipt for the - // transaction, so we can't confirm if it was - // mined - t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch receipt: %v", err)}) - } - - confirm, err := t.handleReceipt(batchInfo) - if err != nil { //nolint:staticcheck - // Transaction was rejected - t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch reject: %v", err)}) - } - if confirm != nil && *confirm >= t.cfg.ConfirmBlocks { - log.Debugw("TxManager tx for RollupForgeBatch confirmed", - "batch", 
batchInfo.BatchNum) - t.queue = append(t.queue[:current], t.queue[current+1:]...) - if len(t.queue) == 0 { - waitDuration = longWaitDuration - next = 0 - } else { - next = current % len(t.queue) - } - } - } - } -} - -type statsVars struct { - Stats synchronizer.Stats - Vars synchronizer.SCVariablesPtr -} - -// Pipeline manages the forging of batches with parallel server proofs -type Pipeline struct { - cfg Config - consts synchronizer.SCConsts - - // state - batchNum common.BatchNum - lastScheduledL1BatchBlockNum int64 - lastForgeL1TxsNum int64 - started bool - - proversPool *ProversPool - provers []prover.Client - txManager *TxManager - historyDB *historydb.HistoryDB - l2DB *l2db.L2DB - txSelector *txselector.TxSelector - batchBuilder *batchbuilder.BatchBuilder - purger *Purger - - stats synchronizer.Stats - vars synchronizer.SCVariables - statsVarsCh chan statsVars - - ctx context.Context - wg sync.WaitGroup - cancel context.CancelFunc -} - -// NewPipeline creates a new Pipeline -func NewPipeline(ctx context.Context, - cfg Config, - historyDB *historydb.HistoryDB, - l2DB *l2db.L2DB, - txSelector *txselector.TxSelector, - batchBuilder *batchbuilder.BatchBuilder, - purger *Purger, - txManager *TxManager, - provers []prover.Client, - scConsts *synchronizer.SCConsts, -) (*Pipeline, error) { - proversPool := NewProversPool(len(provers)) - proversPoolSize := 0 - for _, prover := range provers { - if err := prover.WaitReady(ctx); err != nil { - log.Errorw("prover.WaitReady", "err", err) - } else { - proversPool.Add(prover) - proversPoolSize++ - } - } - if proversPoolSize == 0 { - return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool")) - } - return &Pipeline{ - cfg: cfg, - historyDB: historyDB, - l2DB: l2DB, - txSelector: txSelector, - batchBuilder: batchBuilder, - provers: provers, - proversPool: proversPool, - purger: purger, - txManager: txManager, - consts: *scConsts, - statsVarsCh: make(chan statsVars, queueLen), - }, nil -} - -// SetSyncStatsVars is a thread safe method to sets the synchronizer Stats -func (p *Pipeline) SetSyncStatsVars(stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) { - p.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars} -} - -// reset pipeline state -func (p *Pipeline) reset(batchNum common.BatchNum, - stats *synchronizer.Stats, vars *synchronizer.SCVariables) error { - p.batchNum = batchNum - p.lastForgeL1TxsNum = stats.Sync.LastForgeL1TxsNum - p.stats = *stats - p.vars = *vars - p.lastScheduledL1BatchBlockNum = 0 - - err := p.txSelector.Reset(p.batchNum) - if err != nil { - return tracerr.Wrap(err) - } - err = p.batchBuilder.Reset(p.batchNum, true) - if err != nil { - return tracerr.Wrap(err) - } - return nil -} - -func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) { - if vars.Rollup != nil { - p.vars.Rollup = *vars.Rollup - } - if vars.Auction != nil { - p.vars.Auction = *vars.Auction - } - if vars.WDelayer != nil { - p.vars.WDelayer = *vars.WDelayer - } -} - -func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) { - batchInfo, err := p.forgeBatch(batchNum) - if ctx.Err() != nil { - return nil, ctx.Err() - } else if err != nil { - if tracerr.Unwrap(err) == errLastL1BatchNotSynced { - log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err, - "lastForgeL1TxsNum", p.lastForgeL1TxsNum, - "syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum) - } else { - log.Errorw("forgeBatch", "err", err) - } - return nil, err - } - // 6. 
Wait for an available server proof (blocking call) - serverProof, err := p.proversPool.Get(ctx) - if ctx.Err() != nil { - return nil, ctx.Err() - } else if err != nil { - log.Errorw("proversPool.Get", "err", err) - return nil, err - } - batchInfo.ServerProof = serverProof - if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil { - return nil, ctx.Err() - } else if err != nil { - log.Errorw("sendServerProof", "err", err) - batchInfo.ServerProof = nil - p.proversPool.Add(serverProof) - return nil, err - } - return batchInfo, nil -} - -// Start the forging pipeline -func (p *Pipeline) Start(batchNum common.BatchNum, - stats *synchronizer.Stats, vars *synchronizer.SCVariables) error { - if p.started { - log.Fatal("Pipeline already started") - } - p.started = true - - if err := p.reset(batchNum, stats, vars); err != nil { - return tracerr.Wrap(err) - } - p.ctx, p.cancel = context.WithCancel(context.Background()) - - queueSize := 1 - batchChSentServerProof := make(chan *BatchInfo, queueSize) - - p.wg.Add(1) - const zeroDuration = 0 * time.Second - go func() { - waitDuration := zeroDuration - for { - select { - case <-p.ctx.Done(): - log.Info("Pipeline forgeBatch loop done") - p.wg.Done() - return - case statsVars := <-p.statsVarsCh: - p.stats = statsVars.Stats - p.syncSCVars(statsVars.Vars) - case <-time.After(waitDuration): - batchNum = p.batchNum + 1 - if batchInfo, err := p.handleForgeBatch(p.ctx, batchNum); err != nil { - waitDuration = p.cfg.SyncRetryInterval - continue - } else { - p.batchNum = batchNum - batchChSentServerProof <- batchInfo - } - } - } - }() - - p.wg.Add(1) - go func() { - for { - select { - case <-p.ctx.Done(): - log.Info("Pipeline waitServerProofSendEth loop done") - p.wg.Done() - return - case batchInfo := <-batchChSentServerProof: - err := p.waitServerProof(p.ctx, batchInfo) - // We are done with this serverProof, add it back to the pool - p.proversPool.Add(batchInfo.ServerProof) - batchInfo.ServerProof = nil - if p.ctx.Err() != nil { - continue - } - if err != nil { - log.Errorw("waitServerProof", "err", err) - continue - } - p.txManager.AddBatch(batchInfo) - } - } - }() - return nil -} - -// Stop the forging pipeline -func (p *Pipeline) Stop(ctx context.Context) { - if !p.started { - log.Fatal("Pipeline already stopped") - } - p.started = false - log.Info("Stopping Pipeline...") - p.cancel() - p.wg.Wait() - for _, prover := range p.provers { - if err := prover.Cancel(ctx); ctx.Err() != nil { - continue - } else if err != nil { - log.Errorw("prover.Cancel", "err", err) - } - } -} - -// sendServerProof sends the circuit inputs to the proof server -func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) error { - p.cfg.debugBatchStore(batchInfo) - - // 7. Call the selected idle server proof with BatchBuilder output, - // save server proof info for batchNum - if err := batchInfo.ServerProof.CalculateProof(ctx, batchInfo.ZKInputs); err != nil { - return tracerr.Wrap(err) - } - return nil -} - -// forgeBatch the next batch. 
-func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) { - // remove transactions from the pool that have been there for too long - _, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(), - p.stats.Sync.LastBlock.Num, int64(batchNum)) - if err != nil { - return nil, tracerr.Wrap(err) - } - _, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum)) - if err != nil { - return nil, tracerr.Wrap(err) - } - - batchInfo = &BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch - batchInfo.Debug.StartTimestamp = time.Now() - batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1 - - selectionCfg := &txselector.SelectionConfig{ - MaxL1UserTxs: common.RollupConstMaxL1UserTx, - TxProcessorConfig: p.cfg.TxProcessorConfig, - } - - var poolL2Txs []common.PoolL2Tx - // var feesInfo - var l1UserTxsExtra, l1CoordTxs []common.L1Tx - var auths [][]byte - var coordIdxs []common.Idx - - // 1. Decide if we forge L2Tx or L1+L2Tx - if p.shouldL1L2Batch(batchInfo) { - batchInfo.L1Batch = true - defer func() { - // If there's no error, update the parameters related - // to the last L1Batch forged - if err == nil { - p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1 - p.lastForgeL1TxsNum++ - } - }() - if p.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum { - return nil, tracerr.Wrap(errLastL1BatchNotSynced) - //return nil, fmt.Errorf("Not synced yet LastForgeL1TxsNum. Expecting %v, got %v", - // p.lastForgeL1TxsNum, p.stats.Sync.LastForgeL1TxsNum) - } - // 2a: L1+L2 txs - l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum + 1) - if err != nil { - return nil, tracerr.Wrap(err) - } - coordIdxs, auths, l1UserTxsExtra, l1CoordTxs, poolL2Txs, err = - p.txSelector.GetL1L2TxSelection(selectionCfg, l1UserTxs) - if err != nil { - return nil, tracerr.Wrap(err) - } - } else { - // 2b: only L2 txs - coordIdxs, auths, l1CoordTxs, poolL2Txs, err = - p.txSelector.GetL2TxSelection(selectionCfg) - if err != nil { - return nil, tracerr.Wrap(err) - } - l1UserTxsExtra = nil - } - - // 3. Save metadata from TxSelector output for BatchNum - batchInfo.L1UserTxsExtra = l1UserTxsExtra - batchInfo.L1CoordTxs = l1CoordTxs - batchInfo.L1CoordinatorTxsAuths = auths - batchInfo.CoordIdxs = coordIdxs - batchInfo.VerifierIdx = p.cfg.VerifierIdx - - if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum); err != nil { - return nil, tracerr.Wrap(err) - } - - // Invalidate transactions that become invalid beause of - // the poolL2Txs selected. Will mark as invalid the txs that have a - // (fromIdx, nonce) which already appears in the selected txs (includes - // all the nonces smaller than the current one) - err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum) - if err != nil { - return nil, tracerr.Wrap(err) - } - - // 4. Call BatchBuilder with TxSelector output - configBatch := &batchbuilder.ConfigBatch{ - ForgerAddress: p.cfg.ForgerAddress, - TxProcessorConfig: p.cfg.TxProcessorConfig, - } - zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxsExtra, - l1CoordTxs, poolL2Txs, nil) - if err != nil { - return nil, tracerr.Wrap(err) - } - l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a big uggly, find a better way - if err != nil { - return nil, tracerr.Wrap(err) - } - batchInfo.L2Txs = l2Txs - - // 5. 
Save metadata from BatchBuilder output for BatchNum - batchInfo.ZKInputs = zkInputs - batchInfo.Debug.Status = StatusForged - p.cfg.debugBatchStore(batchInfo) - - return batchInfo, nil -} - -// waitServerProof gets the generated zkProof & sends it to the SmartContract -func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) error { - proof, pubInputs, err := batchInfo.ServerProof.GetProof(ctx) // blocking call, until not resolved don't continue. Returns when the proof server has calculated the proof - if err != nil { - return tracerr.Wrap(err) - } - batchInfo.Proof = proof - batchInfo.PublicInputs = pubInputs - batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo) - batchInfo.Debug.Status = StatusProof - p.cfg.debugBatchStore(batchInfo) - return nil -} - -func (p *Pipeline) shouldL1L2Batch(batchInfo *BatchInfo) bool { - // Take the lastL1BatchBlockNum as the biggest between the last - // scheduled one, and the synchronized one. - lastL1BatchBlockNum := p.lastScheduledL1BatchBlockNum - if p.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum { - lastL1BatchBlockNum = p.stats.Sync.LastL1BatchBlock - } - // Set Debug information - batchInfo.Debug.LastScheduledL1BatchBlockNum = p.lastScheduledL1BatchBlockNum - batchInfo.Debug.LastL1BatchBlock = p.stats.Sync.LastL1BatchBlock - batchInfo.Debug.LastL1BatchBlockDelta = p.stats.Eth.LastBlock.Num + 1 - lastL1BatchBlockNum - batchInfo.Debug.L1BatchBlockScheduleDeadline = - int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1) * p.cfg.L1BatchTimeoutPerc) - // Return true if we have passed the l1BatchTimeoutPerc portion of the - // range before the l1batch timeout. - return p.stats.Eth.LastBlock.Num+1-lastL1BatchBlockNum >= - int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1)*p.cfg.L1BatchTimeoutPerc) -} - -func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs { - proof := batchInfo.Proof - zki := batchInfo.ZKInputs - return ð.RollupForgeBatchArgs{ - NewLastIdx: int64(zki.Metadata.NewLastIdxRaw), - NewStRoot: zki.Metadata.NewStateRootRaw.BigInt(), - NewExitRoot: zki.Metadata.NewExitRootRaw.BigInt(), - L1UserTxs: batchInfo.L1UserTxsExtra, - L1CoordinatorTxs: batchInfo.L1CoordTxs, - L1CoordinatorTxsAuths: batchInfo.L1CoordinatorTxsAuths, - L2TxsData: batchInfo.L2Txs, - FeeIdxCoordinator: batchInfo.CoordIdxs, - // Circuit selector - VerifierIdx: batchInfo.VerifierIdx, - L1Batch: batchInfo.L1Batch, - ProofA: [2]*big.Int{proof.PiA[0], proof.PiA[1]}, - ProofB: [2][2]*big.Int{ - {proof.PiB[0][0], proof.PiB[0][1]}, - {proof.PiB[1][0], proof.PiB[1][1]}, - }, - ProofC: [2]*big.Int{proof.PiC[0], proof.PiC[1]}, - } -} diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index a09443c..496be03 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -10,10 +10,7 @@ import ( "testing" "time" - ethKeystore "github.com/ethereum/go-ethereum/accounts/keystore" ethCommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/ethclient" "github.com/hermeznetwork/hermez-node/batchbuilder" "github.com/hermeznetwork/hermez-node/common" dbUtils "github.com/hermeznetwork/hermez-node/db" @@ -25,12 +22,10 @@ import ( "github.com/hermeznetwork/hermez-node/prover" "github.com/hermeznetwork/hermez-node/synchronizer" "github.com/hermeznetwork/hermez-node/test" - "github.com/hermeznetwork/hermez-node/test/til" "github.com/hermeznetwork/hermez-node/txprocessor" "github.com/hermeznetwork/hermez-node/txselector" 
"github.com/hermeznetwork/tracerr" "github.com/iden3/go-iden3-crypto/babyjub" - "github.com/iden3/go-merkletree" "github.com/iden3/go-merkletree/db/pebble" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -434,74 +429,6 @@ func TestCoordHandleMsgSyncBlock(t *testing.T) { assert.Nil(t, coord.pipeline) } -func TestPipelineShouldL1L2Batch(t *testing.T) { - ethClientSetup := test.NewClientSetupExample() - ethClientSetup.ChainID = big.NewInt(int64(chainID)) - - var timer timer - ctx := context.Background() - ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup) - modules := newTestModules(t) - var stats synchronizer.Stats - coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules) - pipeline, err := coord.newPipeline(ctx) - require.NoError(t, err) - pipeline.vars = coord.vars - - // Check that the parameters are the ones we expect and use in this test - require.Equal(t, 0.5, pipeline.cfg.L1BatchTimeoutPerc) - require.Equal(t, int64(10), ethClientSetup.RollupVariables.ForgeL1L2BatchTimeout) - l1BatchTimeoutPerc := pipeline.cfg.L1BatchTimeoutPerc - l1BatchTimeout := ethClientSetup.RollupVariables.ForgeL1L2BatchTimeout - - startBlock := int64(100) - // Empty batchInfo to pass to shouldL1L2Batch() which sets debug information - batchInfo := BatchInfo{} - - // - // No scheduled L1Batch - // - - // Last L1Batch was a long time ago - stats.Eth.LastBlock.Num = startBlock - stats.Sync.LastBlock = stats.Eth.LastBlock - stats.Sync.LastL1BatchBlock = 0 - pipeline.stats = stats - assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo)) - - stats.Sync.LastL1BatchBlock = startBlock - - // We are are one block before the timeout range * 0.5 - stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) - 1 - stats.Sync.LastBlock = stats.Eth.LastBlock - pipeline.stats = stats - assert.Equal(t, false, pipeline.shouldL1L2Batch(&batchInfo)) - - // We are are at timeout range * 0.5 - stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) - stats.Sync.LastBlock = stats.Eth.LastBlock - pipeline.stats = stats - assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo)) - - // - // Scheduled L1Batch - // - pipeline.lastScheduledL1BatchBlockNum = startBlock - stats.Sync.LastL1BatchBlock = startBlock - 10 - - // We are are one block before the timeout range * 0.5 - stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) - 1 - stats.Sync.LastBlock = stats.Eth.LastBlock - pipeline.stats = stats - assert.Equal(t, false, pipeline.shouldL1L2Batch(&batchInfo)) - - // We are are at timeout range * 0.5 - stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) - stats.Sync.LastBlock = stats.Eth.LastBlock - pipeline.stats = stats - assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo)) -} - // ethAddTokens adds the tokens from the blocks to the blockchain func ethAddTokens(blocks []common.BlockData, client *test.Client) { for _, block := range blocks { @@ -517,138 +444,6 @@ func ethAddTokens(blocks []common.BlockData, client *test.Client) { } } -const testTokensLen = 3 -const testUsersLen = 4 - -func preloadSync(t *testing.T, ethClient *test.Client, sync *synchronizer.Synchronizer, - historyDB *historydb.HistoryDB, stateDB *statedb.StateDB) *til.Context { - // Create a set with `testTokensLen` tokens and for each token - // `testUsersLen` accounts. 
- var set []til.Instruction - // set = append(set, til.Instruction{Typ: "Blockchain"}) - for tokenID := 1; tokenID < testTokensLen; tokenID++ { - set = append(set, til.Instruction{ - Typ: til.TypeAddToken, - TokenID: common.TokenID(tokenID), - }) - } - depositAmount, ok := new(big.Int).SetString("10225000000000000000000000000000000", 10) - require.True(t, ok) - for tokenID := 0; tokenID < testTokensLen; tokenID++ { - for user := 0; user < testUsersLen; user++ { - set = append(set, til.Instruction{ - Typ: common.TxTypeCreateAccountDeposit, - TokenID: common.TokenID(tokenID), - DepositAmount: depositAmount, - From: fmt.Sprintf("User%d", user), - }) - } - } - set = append(set, til.Instruction{Typ: til.TypeNewBatchL1}) - set = append(set, til.Instruction{Typ: til.TypeNewBatchL1}) - set = append(set, til.Instruction{Typ: til.TypeNewBlock}) - - tc := til.NewContext(chainID, common.RollupConstMaxL1UserTx) - blocks, err := tc.GenerateBlocksFromInstructions(set) - require.NoError(t, err) - require.NotNil(t, blocks) - - ethAddTokens(blocks, ethClient) - err = ethClient.CtlAddBlocks(blocks) - require.NoError(t, err) - - ctx := context.Background() - for { - syncBlock, discards, err := sync.Sync2(ctx, nil) - require.NoError(t, err) - require.Nil(t, discards) - if syncBlock == nil { - break - } - } - dbTokens, err := historyDB.GetAllTokens() - require.Nil(t, err) - require.Equal(t, testTokensLen, len(dbTokens)) - - dbAccounts, err := historyDB.GetAllAccounts() - require.Nil(t, err) - require.Equal(t, testTokensLen*testUsersLen, len(dbAccounts)) - - sdbAccounts, err := stateDB.GetAccounts() - require.Nil(t, err) - require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts)) - - return tc -} - -func TestPipeline1(t *testing.T) { - ethClientSetup := test.NewClientSetupExample() - ethClientSetup.ChainID = big.NewInt(int64(chainID)) - - var timer timer - ctx := context.Background() - ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup) - modules := newTestModules(t) - coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules) - sync := newTestSynchronizer(t, ethClient, ethClientSetup, modules) - - // preload the synchronier (via the test ethClient) some tokens and - // users with positive balances - tilCtx := preloadSync(t, ethClient, sync, modules.historyDB, modules.stateDB) - syncStats := sync.Stats() - batchNum := common.BatchNum(syncStats.Sync.LastBatch) - syncSCVars := sync.SCVars() - - pipeline, err := coord.newPipeline(ctx) - require.NoError(t, err) - - // Insert some l2txs in the Pool - setPool := ` -Type: PoolL2 - -PoolTransfer(0) User0-User1: 100 (126) -PoolTransfer(0) User1-User2: 200 (126) -PoolTransfer(0) User2-User3: 300 (126) - ` - l2txs, err := tilCtx.GeneratePoolL2Txs(setPool) - require.NoError(t, err) - for _, tx := range l2txs { - err := modules.l2DB.AddTxTest(&tx) //nolint:gosec - require.NoError(t, err) - } - - err = pipeline.reset(batchNum, syncStats.Sync.LastForgeL1TxsNum, syncStats, &synchronizer.SCVariables{ - Rollup: *syncSCVars.Rollup, - Auction: *syncSCVars.Auction, - WDelayer: *syncSCVars.WDelayer, - }) - require.NoError(t, err) - // Sanity check - sdbAccounts, err := pipeline.txSelector.LocalAccountsDB().GetAccounts() - require.Nil(t, err) - require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts)) - - // Sanity check - sdbAccounts, err = pipeline.batchBuilder.LocalStateDB().GetAccounts() - require.Nil(t, err) - require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts)) - - // Sanity check - require.Equal(t, modules.stateDB.MT.Root(), - 
pipeline.batchBuilder.LocalStateDB().MT.Root()) - - batchNum++ - - batchInfo, err := pipeline.forgeBatch(batchNum) - require.NoError(t, err) - assert.Equal(t, 3, len(batchInfo.L2Txs)) - - batchNum++ - batchInfo, err = pipeline.forgeBatch(batchNum) - require.NoError(t, err) - assert.Equal(t, 0, len(batchInfo.L2Txs)) -} - func TestCoordinatorStress(t *testing.T) { if os.Getenv("TEST_COORD_STRESS") == "" { return @@ -714,79 +509,7 @@ func TestCoordinatorStress(t *testing.T) { coord.Stop() } -func TestRollupForgeBatch(t *testing.T) { - if os.Getenv("TEST_ROLLUP_FORGE_BATCH") == "" { - return - } - const web3URL = "http://localhost:8545" - const password = "test" - addr := ethCommon.HexToAddress("0xb4124ceb3451635dacedd11767f004d8a28c6ee7") - sk, err := crypto.HexToECDSA( - "a8a54b2d8197bc0b19bb8a084031be71835580a01e70a45a13babd16c9bc1563") - require.NoError(t, err) - rollupAddr := ethCommon.HexToAddress("0x8EEaea23686c319133a7cC110b840d1591d9AeE0") - pathKeystore, err := ioutil.TempDir("", "tmpKeystore") - require.NoError(t, err) - deleteme = append(deleteme, pathKeystore) - ctx := context.Background() - batchInfo := &BatchInfo{} - proofClient := &prover.MockClient{} - chainID := uint16(0) - - ethClient, err := ethclient.Dial(web3URL) - require.NoError(t, err) - ethCfg := eth.EthereumConfig{ - CallGasLimit: 300000, - GasPriceDiv: 100, - } - scryptN := ethKeystore.LightScryptN - scryptP := ethKeystore.LightScryptP - keyStore := ethKeystore.NewKeyStore(pathKeystore, - scryptN, scryptP) - account, err := keyStore.ImportECDSA(sk, password) - require.NoError(t, err) - require.Equal(t, account.Address, addr) - err = keyStore.Unlock(account, password) - require.NoError(t, err) - - client, err := eth.NewClient(ethClient, &account, keyStore, ð.ClientConfig{ - Ethereum: ethCfg, - Rollup: eth.RollupConfig{ - Address: rollupAddr, - }, - Auction: eth.AuctionConfig{ - Address: ethCommon.Address{}, - TokenHEZ: eth.TokenConfig{ - Address: ethCommon.Address{}, - Name: "HEZ", - }, - }, - WDelayer: eth.WDelayerConfig{ - Address: ethCommon.Address{}, - }, - }) - require.NoError(t, err) - - zkInputs := common.NewZKInputs(chainID, 100, 24, 512, 32, big.NewInt(1)) - zkInputs.Metadata.NewStateRootRaw = &merkletree.Hash{1} - zkInputs.Metadata.NewExitRootRaw = &merkletree.Hash{2} - batchInfo.ZKInputs = zkInputs - err = proofClient.CalculateProof(ctx, batchInfo.ZKInputs) - require.NoError(t, err) - - proof, pubInputs, err := proofClient.GetProof(ctx) - require.NoError(t, err) - batchInfo.Proof = proof - batchInfo.PublicInputs = pubInputs - - batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo) - _, err = client.RollupForgeBatch(batchInfo.ForgeBatchArgs) - require.NoError(t, err) - batchInfo.Proof = proof -} - // TODO: Test Reorg -// TODO: Test Pipeline // TODO: Test TxMonitor // TODO: Test forgeBatch // TODO: Test waitServerProof diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go new file mode 100644 index 0000000..60f6417 --- /dev/null +++ b/coordinator/pipeline.go @@ -0,0 +1,428 @@ +package coordinator + +import ( + "context" + "fmt" + "math/big" + "sync" + "time" + + "github.com/hermeznetwork/hermez-node/batchbuilder" + "github.com/hermeznetwork/hermez-node/common" + "github.com/hermeznetwork/hermez-node/db/historydb" + "github.com/hermeznetwork/hermez-node/db/l2db" + "github.com/hermeznetwork/hermez-node/eth" + "github.com/hermeznetwork/hermez-node/log" + "github.com/hermeznetwork/hermez-node/prover" + "github.com/hermeznetwork/hermez-node/synchronizer" + 
"github.com/hermeznetwork/hermez-node/txselector" + "github.com/hermeznetwork/tracerr" +) + +type statsVars struct { + Stats synchronizer.Stats + Vars synchronizer.SCVariablesPtr +} + +// Pipeline manages the forging of batches with parallel server proofs +type Pipeline struct { + cfg Config + consts synchronizer.SCConsts + + // state + batchNum common.BatchNum + lastScheduledL1BatchBlockNum int64 + lastForgeL1TxsNum int64 + started bool + + proversPool *ProversPool + provers []prover.Client + txManager *TxManager + historyDB *historydb.HistoryDB + l2DB *l2db.L2DB + txSelector *txselector.TxSelector + batchBuilder *batchbuilder.BatchBuilder + purger *Purger + + stats synchronizer.Stats + vars synchronizer.SCVariables + statsVarsCh chan statsVars + + ctx context.Context + wg sync.WaitGroup + cancel context.CancelFunc +} + +// NewPipeline creates a new Pipeline +func NewPipeline(ctx context.Context, + cfg Config, + historyDB *historydb.HistoryDB, + l2DB *l2db.L2DB, + txSelector *txselector.TxSelector, + batchBuilder *batchbuilder.BatchBuilder, + purger *Purger, + txManager *TxManager, + provers []prover.Client, + scConsts *synchronizer.SCConsts, +) (*Pipeline, error) { + proversPool := NewProversPool(len(provers)) + proversPoolSize := 0 + for _, prover := range provers { + if err := prover.WaitReady(ctx); err != nil { + log.Errorw("prover.WaitReady", "err", err) + } else { + proversPool.Add(prover) + proversPoolSize++ + } + } + if proversPoolSize == 0 { + return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool")) + } + return &Pipeline{ + cfg: cfg, + historyDB: historyDB, + l2DB: l2DB, + txSelector: txSelector, + batchBuilder: batchBuilder, + provers: provers, + proversPool: proversPool, + purger: purger, + txManager: txManager, + consts: *scConsts, + statsVarsCh: make(chan statsVars, queueLen), + }, nil +} + +// SetSyncStatsVars is a thread safe method to sets the synchronizer Stats +func (p *Pipeline) SetSyncStatsVars(stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) { + p.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars} +} + +// reset pipeline state +func (p *Pipeline) reset(batchNum common.BatchNum, + stats *synchronizer.Stats, vars *synchronizer.SCVariables) error { + p.batchNum = batchNum + p.lastForgeL1TxsNum = stats.Sync.LastForgeL1TxsNum + p.stats = *stats + p.vars = *vars + p.lastScheduledL1BatchBlockNum = 0 + + err := p.txSelector.Reset(p.batchNum) + if err != nil { + return tracerr.Wrap(err) + } + err = p.batchBuilder.Reset(p.batchNum, true) + if err != nil { + return tracerr.Wrap(err) + } + return nil +} + +func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) { + if vars.Rollup != nil { + p.vars.Rollup = *vars.Rollup + } + if vars.Auction != nil { + p.vars.Auction = *vars.Auction + } + if vars.WDelayer != nil { + p.vars.WDelayer = *vars.WDelayer + } +} + +// handleForgeBatch calls p.forgeBatch to forge the batch and get the zkInputs, +// and then waits for an available proof server and sends the zkInputs to it so +// that the proof computation begins. 
+func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) { + batchInfo, err := p.forgeBatch(batchNum) + if ctx.Err() != nil { + return nil, ctx.Err() + } else if err != nil { + if tracerr.Unwrap(err) == errLastL1BatchNotSynced { + log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err, + "lastForgeL1TxsNum", p.lastForgeL1TxsNum, + "syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum) + } else { + log.Errorw("forgeBatch", "err", err) + } + return nil, err + } + // 6. Wait for an available server proof (blocking call) + serverProof, err := p.proversPool.Get(ctx) + if ctx.Err() != nil { + return nil, ctx.Err() + } else if err != nil { + log.Errorw("proversPool.Get", "err", err) + return nil, err + } + batchInfo.ServerProof = serverProof + if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil { + return nil, ctx.Err() + } else if err != nil { + log.Errorw("sendServerProof", "err", err) + batchInfo.ServerProof = nil + p.proversPool.Add(serverProof) + return nil, err + } + return batchInfo, nil +} + +// Start the forging pipeline +func (p *Pipeline) Start(batchNum common.BatchNum, + stats *synchronizer.Stats, vars *synchronizer.SCVariables) error { + if p.started { + log.Fatal("Pipeline already started") + } + p.started = true + + if err := p.reset(batchNum, stats, vars); err != nil { + return tracerr.Wrap(err) + } + p.ctx, p.cancel = context.WithCancel(context.Background()) + + queueSize := 1 + batchChSentServerProof := make(chan *BatchInfo, queueSize) + + p.wg.Add(1) + go func() { + waitDuration := zeroDuration + for { + select { + case <-p.ctx.Done(): + log.Info("Pipeline forgeBatch loop done") + p.wg.Done() + return + case statsVars := <-p.statsVarsCh: + p.stats = statsVars.Stats + p.syncSCVars(statsVars.Vars) + case <-time.After(waitDuration): + batchNum = p.batchNum + 1 + if batchInfo, err := p.handleForgeBatch(p.ctx, batchNum); err != nil { + waitDuration = p.cfg.SyncRetryInterval + continue + } else { + p.batchNum = batchNum + batchChSentServerProof <- batchInfo + } + } + } + }() + + p.wg.Add(1) + go func() { + for { + select { + case <-p.ctx.Done(): + log.Info("Pipeline waitServerProofSendEth loop done") + p.wg.Done() + return + case batchInfo := <-batchChSentServerProof: + err := p.waitServerProof(p.ctx, batchInfo) + // We are done with this serverProof, add it back to the pool + p.proversPool.Add(batchInfo.ServerProof) + batchInfo.ServerProof = nil + if p.ctx.Err() != nil { + continue + } + if err != nil { + log.Errorw("waitServerProof", "err", err) + continue + } + p.txManager.AddBatch(batchInfo) + } + } + }() + return nil +} + +// Stop the forging pipeline +func (p *Pipeline) Stop(ctx context.Context) { + if !p.started { + log.Fatal("Pipeline already stopped") + } + p.started = false + log.Info("Stopping Pipeline...") + p.cancel() + p.wg.Wait() + for _, prover := range p.provers { + if err := prover.Cancel(ctx); ctx.Err() != nil { + continue + } else if err != nil { + log.Errorw("prover.Cancel", "err", err) + } + } +} + +// sendServerProof sends the circuit inputs to the proof server +func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) error { + p.cfg.debugBatchStore(batchInfo) + + // 7. Call the selected idle server proof with BatchBuilder output, + // save server proof info for batchNum + if err := batchInfo.ServerProof.CalculateProof(ctx, batchInfo.ZKInputs); err != nil { + return tracerr.Wrap(err) + } + return nil +} + +// forgeBatch forges the batchNum batch. 
+func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) {
+	// remove transactions from the pool that have been there for too long
+	_, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
+		p.stats.Sync.LastBlock.Num, int64(batchNum))
+	if err != nil {
+		return nil, tracerr.Wrap(err)
+	}
+	_, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum))
+	if err != nil {
+		return nil, tracerr.Wrap(err)
+	}
+
+	batchInfo = &BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch
+	batchInfo.Debug.StartTimestamp = time.Now()
+	batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1
+
+	selectionCfg := &txselector.SelectionConfig{
+		MaxL1UserTxs:      common.RollupConstMaxL1UserTx,
+		TxProcessorConfig: p.cfg.TxProcessorConfig,
+	}
+
+	var poolL2Txs []common.PoolL2Tx
+	var l1UserTxsExtra, l1CoordTxs []common.L1Tx
+	var auths [][]byte
+	var coordIdxs []common.Idx
+
+	// 1. Decide if we forge L2Tx or L1+L2Tx
+	if p.shouldL1L2Batch(batchInfo) {
+		batchInfo.L1Batch = true
+		defer func() {
+			// If there's no error, update the parameters related
+			// to the last L1Batch forged
+			if err == nil {
+				p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
+				p.lastForgeL1TxsNum++
+			}
+		}()
+		if p.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum {
+			return nil, tracerr.Wrap(errLastL1BatchNotSynced)
+		}
+		// 2a: L1+L2 txs
+		l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum + 1)
+		if err != nil {
+			return nil, tracerr.Wrap(err)
+		}
+		coordIdxs, auths, l1UserTxsExtra, l1CoordTxs, poolL2Txs, err =
+			p.txSelector.GetL1L2TxSelection(selectionCfg, l1UserTxs)
+		if err != nil {
+			return nil, tracerr.Wrap(err)
+		}
+	} else {
+		// 2b: only L2 txs
+		coordIdxs, auths, l1CoordTxs, poolL2Txs, err =
+			p.txSelector.GetL2TxSelection(selectionCfg)
+		if err != nil {
+			return nil, tracerr.Wrap(err)
+		}
+		l1UserTxsExtra = nil
+	}
+
+	// 3. Save metadata from TxSelector output for BatchNum
+	batchInfo.L1UserTxsExtra = l1UserTxsExtra
+	batchInfo.L1CoordTxs = l1CoordTxs
+	batchInfo.L1CoordinatorTxsAuths = auths
+	batchInfo.CoordIdxs = coordIdxs
+	batchInfo.VerifierIdx = p.cfg.VerifierIdx
+
+	if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum); err != nil {
+		return nil, tracerr.Wrap(err)
+	}
+
+	// Invalidate transactions that become invalid because of
+	// the poolL2Txs selected. Will mark as invalid the txs that have a
+	// (fromIdx, nonce) which already appears in the selected txs (includes
+	// all the nonces smaller than the current one)
+	err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum)
+	if err != nil {
+		return nil, tracerr.Wrap(err)
+	}
+
+	// 4. Call BatchBuilder with TxSelector output
+	configBatch := &batchbuilder.ConfigBatch{
+		ForgerAddress:     p.cfg.ForgerAddress,
+		TxProcessorConfig: p.cfg.TxProcessorConfig,
+	}
+	zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxsExtra,
+		l1CoordTxs, poolL2Txs, nil)
+	if err != nil {
+		return nil, tracerr.Wrap(err)
+	}
+	l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a bit ugly, find a better way
+	if err != nil {
+		return nil, tracerr.Wrap(err)
+	}
+	batchInfo.L2Txs = l2Txs
+
+	// 5. 
Save metadata from BatchBuilder output for BatchNum + batchInfo.ZKInputs = zkInputs + batchInfo.Debug.Status = StatusForged + p.cfg.debugBatchStore(batchInfo) + + return batchInfo, nil +} + +// waitServerProof gets the generated zkProof & sends it to the SmartContract +func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) error { + proof, pubInputs, err := batchInfo.ServerProof.GetProof(ctx) // blocking call, until not resolved don't continue. Returns when the proof server has calculated the proof + if err != nil { + return tracerr.Wrap(err) + } + batchInfo.Proof = proof + batchInfo.PublicInputs = pubInputs + batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo) + batchInfo.Debug.Status = StatusProof + p.cfg.debugBatchStore(batchInfo) + return nil +} + +func (p *Pipeline) shouldL1L2Batch(batchInfo *BatchInfo) bool { + // Take the lastL1BatchBlockNum as the biggest between the last + // scheduled one, and the synchronized one. + lastL1BatchBlockNum := p.lastScheduledL1BatchBlockNum + if p.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum { + lastL1BatchBlockNum = p.stats.Sync.LastL1BatchBlock + } + // Set Debug information + batchInfo.Debug.LastScheduledL1BatchBlockNum = p.lastScheduledL1BatchBlockNum + batchInfo.Debug.LastL1BatchBlock = p.stats.Sync.LastL1BatchBlock + batchInfo.Debug.LastL1BatchBlockDelta = p.stats.Eth.LastBlock.Num + 1 - lastL1BatchBlockNum + batchInfo.Debug.L1BatchBlockScheduleDeadline = + int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1) * p.cfg.L1BatchTimeoutPerc) + // Return true if we have passed the l1BatchTimeoutPerc portion of the + // range before the l1batch timeout. + return p.stats.Eth.LastBlock.Num+1-lastL1BatchBlockNum >= + int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1)*p.cfg.L1BatchTimeoutPerc) +} + +func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs { + proof := batchInfo.Proof + zki := batchInfo.ZKInputs + return ð.RollupForgeBatchArgs{ + NewLastIdx: int64(zki.Metadata.NewLastIdxRaw), + NewStRoot: zki.Metadata.NewStateRootRaw.BigInt(), + NewExitRoot: zki.Metadata.NewExitRootRaw.BigInt(), + L1UserTxs: batchInfo.L1UserTxsExtra, + L1CoordinatorTxs: batchInfo.L1CoordTxs, + L1CoordinatorTxsAuths: batchInfo.L1CoordinatorTxsAuths, + L2TxsData: batchInfo.L2Txs, + FeeIdxCoordinator: batchInfo.CoordIdxs, + // Circuit selector + VerifierIdx: batchInfo.VerifierIdx, + L1Batch: batchInfo.L1Batch, + ProofA: [2]*big.Int{proof.PiA[0], proof.PiA[1]}, + ProofB: [2][2]*big.Int{ + {proof.PiB[0][0], proof.PiB[0][1]}, + {proof.PiB[1][0], proof.PiB[1][1]}, + }, + ProofC: [2]*big.Int{proof.PiC[0], proof.PiC[1]}, + } +} diff --git a/coordinator/pipeline_test.go b/coordinator/pipeline_test.go new file mode 100644 index 0000000..65bde7d --- /dev/null +++ b/coordinator/pipeline_test.go @@ -0,0 +1,297 @@ +package coordinator + +import ( + "context" + "fmt" + "io/ioutil" + "math/big" + "os" + "testing" + + ethKeystore "github.com/ethereum/go-ethereum/accounts/keystore" + ethCommon "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/hermeznetwork/hermez-node/common" + "github.com/hermeznetwork/hermez-node/db/historydb" + "github.com/hermeznetwork/hermez-node/db/statedb" + "github.com/hermeznetwork/hermez-node/eth" + "github.com/hermeznetwork/hermez-node/prover" + "github.com/hermeznetwork/hermez-node/synchronizer" + "github.com/hermeznetwork/hermez-node/test" + "github.com/hermeznetwork/hermez-node/test/til" + 
"github.com/iden3/go-merkletree" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPipelineShouldL1L2Batch(t *testing.T) { + ethClientSetup := test.NewClientSetupExample() + ethClientSetup.ChainID = big.NewInt(int64(chainID)) + + var timer timer + ctx := context.Background() + ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup) + modules := newTestModules(t) + var stats synchronizer.Stats + coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules) + pipeline, err := coord.newPipeline(ctx) + require.NoError(t, err) + pipeline.vars = coord.vars + + // Check that the parameters are the ones we expect and use in this test + require.Equal(t, 0.5, pipeline.cfg.L1BatchTimeoutPerc) + require.Equal(t, int64(10), ethClientSetup.RollupVariables.ForgeL1L2BatchTimeout) + l1BatchTimeoutPerc := pipeline.cfg.L1BatchTimeoutPerc + l1BatchTimeout := ethClientSetup.RollupVariables.ForgeL1L2BatchTimeout + + startBlock := int64(100) + // Empty batchInfo to pass to shouldL1L2Batch() which sets debug information + batchInfo := BatchInfo{} + + // + // No scheduled L1Batch + // + + // Last L1Batch was a long time ago + stats.Eth.LastBlock.Num = startBlock + stats.Sync.LastBlock = stats.Eth.LastBlock + stats.Sync.LastL1BatchBlock = 0 + pipeline.stats = stats + assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo)) + + stats.Sync.LastL1BatchBlock = startBlock + + // We are are one block before the timeout range * 0.5 + stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) - 1 + stats.Sync.LastBlock = stats.Eth.LastBlock + pipeline.stats = stats + assert.Equal(t, false, pipeline.shouldL1L2Batch(&batchInfo)) + + // We are are at timeout range * 0.5 + stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) + stats.Sync.LastBlock = stats.Eth.LastBlock + pipeline.stats = stats + assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo)) + + // + // Scheduled L1Batch + // + pipeline.lastScheduledL1BatchBlockNum = startBlock + stats.Sync.LastL1BatchBlock = startBlock - 10 + + // We are are one block before the timeout range * 0.5 + stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) - 1 + stats.Sync.LastBlock = stats.Eth.LastBlock + pipeline.stats = stats + assert.Equal(t, false, pipeline.shouldL1L2Batch(&batchInfo)) + + // We are are at timeout range * 0.5 + stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) + stats.Sync.LastBlock = stats.Eth.LastBlock + pipeline.stats = stats + assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo)) +} + +const testTokensLen = 3 +const testUsersLen = 4 + +func preloadSync(t *testing.T, ethClient *test.Client, sync *synchronizer.Synchronizer, + historyDB *historydb.HistoryDB, stateDB *statedb.StateDB) *til.Context { + // Create a set with `testTokensLen` tokens and for each token + // `testUsersLen` accounts. 
+ var set []til.Instruction + // set = append(set, til.Instruction{Typ: "Blockchain"}) + for tokenID := 1; tokenID < testTokensLen; tokenID++ { + set = append(set, til.Instruction{ + Typ: til.TypeAddToken, + TokenID: common.TokenID(tokenID), + }) + } + depositAmount, ok := new(big.Int).SetString("10225000000000000000000000000000000", 10) + require.True(t, ok) + for tokenID := 0; tokenID < testTokensLen; tokenID++ { + for user := 0; user < testUsersLen; user++ { + set = append(set, til.Instruction{ + Typ: common.TxTypeCreateAccountDeposit, + TokenID: common.TokenID(tokenID), + DepositAmount: depositAmount, + From: fmt.Sprintf("User%d", user), + }) + } + } + set = append(set, til.Instruction{Typ: til.TypeNewBatchL1}) + set = append(set, til.Instruction{Typ: til.TypeNewBatchL1}) + set = append(set, til.Instruction{Typ: til.TypeNewBlock}) + + tc := til.NewContext(chainID, common.RollupConstMaxL1UserTx) + blocks, err := tc.GenerateBlocksFromInstructions(set) + require.NoError(t, err) + require.NotNil(t, blocks) + + ethAddTokens(blocks, ethClient) + err = ethClient.CtlAddBlocks(blocks) + require.NoError(t, err) + + ctx := context.Background() + for { + syncBlock, discards, err := sync.Sync2(ctx, nil) + require.NoError(t, err) + require.Nil(t, discards) + if syncBlock == nil { + break + } + } + dbTokens, err := historyDB.GetAllTokens() + require.Nil(t, err) + require.Equal(t, testTokensLen, len(dbTokens)) + + dbAccounts, err := historyDB.GetAllAccounts() + require.Nil(t, err) + require.Equal(t, testTokensLen*testUsersLen, len(dbAccounts)) + + sdbAccounts, err := stateDB.GetAccounts() + require.Nil(t, err) + require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts)) + + return tc +} + +func TestPipelineForgeBatchWithTxs(t *testing.T) { + ethClientSetup := test.NewClientSetupExample() + ethClientSetup.ChainID = big.NewInt(int64(chainID)) + + var timer timer + ctx := context.Background() + ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup) + modules := newTestModules(t) + coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules) + sync := newTestSynchronizer(t, ethClient, ethClientSetup, modules) + + // preload the synchronier (via the test ethClient) some tokens and + // users with positive balances + tilCtx := preloadSync(t, ethClient, sync, modules.historyDB, modules.stateDB) + syncStats := sync.Stats() + batchNum := common.BatchNum(syncStats.Sync.LastBatch) + syncSCVars := sync.SCVars() + + pipeline, err := coord.newPipeline(ctx) + require.NoError(t, err) + + // Insert some l2txs in the Pool + setPool := ` +Type: PoolL2 + +PoolTransfer(0) User0-User1: 100 (126) +PoolTransfer(0) User1-User2: 200 (126) +PoolTransfer(0) User2-User3: 300 (126) + ` + l2txs, err := tilCtx.GeneratePoolL2Txs(setPool) + require.NoError(t, err) + for _, tx := range l2txs { + err := modules.l2DB.AddTxTest(&tx) //nolint:gosec + require.NoError(t, err) + } + + err = pipeline.reset(batchNum, syncStats, &synchronizer.SCVariables{ + Rollup: *syncSCVars.Rollup, + Auction: *syncSCVars.Auction, + WDelayer: *syncSCVars.WDelayer, + }) + require.NoError(t, err) + // Sanity check + sdbAccounts, err := pipeline.txSelector.LocalAccountsDB().GetAccounts() + require.Nil(t, err) + require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts)) + + // Sanity check + sdbAccounts, err = pipeline.batchBuilder.LocalStateDB().GetAccounts() + require.Nil(t, err) + require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts)) + + // Sanity check + require.Equal(t, modules.stateDB.MT.Root(), + 
pipeline.batchBuilder.LocalStateDB().MT.Root()) + + batchNum++ + + batchInfo, err := pipeline.forgeBatch(batchNum) + require.NoError(t, err) + assert.Equal(t, 3, len(batchInfo.L2Txs)) + + batchNum++ + batchInfo, err = pipeline.forgeBatch(batchNum) + require.NoError(t, err) + assert.Equal(t, 0, len(batchInfo.L2Txs)) +} + +func TestEthRollupForgeBatch(t *testing.T) { + if os.Getenv("TEST_ROLLUP_FORGE_BATCH") == "" { + return + } + const web3URL = "http://localhost:8545" + const password = "test" + addr := ethCommon.HexToAddress("0xb4124ceb3451635dacedd11767f004d8a28c6ee7") + sk, err := crypto.HexToECDSA( + "a8a54b2d8197bc0b19bb8a084031be71835580a01e70a45a13babd16c9bc1563") + require.NoError(t, err) + rollupAddr := ethCommon.HexToAddress("0x8EEaea23686c319133a7cC110b840d1591d9AeE0") + pathKeystore, err := ioutil.TempDir("", "tmpKeystore") + require.NoError(t, err) + deleteme = append(deleteme, pathKeystore) + ctx := context.Background() + batchInfo := &BatchInfo{} + proofClient := &prover.MockClient{} + chainID := uint16(0) + + ethClient, err := ethclient.Dial(web3URL) + require.NoError(t, err) + ethCfg := eth.EthereumConfig{ + CallGasLimit: 300000, + GasPriceDiv: 100, + } + scryptN := ethKeystore.LightScryptN + scryptP := ethKeystore.LightScryptP + keyStore := ethKeystore.NewKeyStore(pathKeystore, + scryptN, scryptP) + account, err := keyStore.ImportECDSA(sk, password) + require.NoError(t, err) + require.Equal(t, account.Address, addr) + err = keyStore.Unlock(account, password) + require.NoError(t, err) + + client, err := eth.NewClient(ethClient, &account, keyStore, ð.ClientConfig{ + Ethereum: ethCfg, + Rollup: eth.RollupConfig{ + Address: rollupAddr, + }, + Auction: eth.AuctionConfig{ + Address: ethCommon.Address{}, + TokenHEZ: eth.TokenConfig{ + Address: ethCommon.Address{}, + Name: "HEZ", + }, + }, + WDelayer: eth.WDelayerConfig{ + Address: ethCommon.Address{}, + }, + }) + require.NoError(t, err) + + zkInputs := common.NewZKInputs(chainID, 100, 24, 512, 32, big.NewInt(1)) + zkInputs.Metadata.NewStateRootRaw = &merkletree.Hash{1} + zkInputs.Metadata.NewExitRootRaw = &merkletree.Hash{2} + batchInfo.ZKInputs = zkInputs + err = proofClient.CalculateProof(ctx, batchInfo.ZKInputs) + require.NoError(t, err) + + proof, pubInputs, err := proofClient.GetProof(ctx) + require.NoError(t, err) + batchInfo.Proof = proof + batchInfo.PublicInputs = pubInputs + + batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo) + _, err = client.RollupForgeBatch(batchInfo.ForgeBatchArgs) + require.NoError(t, err) + batchInfo.Proof = proof +} diff --git a/coordinator/txmanager.go b/coordinator/txmanager.go new file mode 100644 index 0000000..020a79f --- /dev/null +++ b/coordinator/txmanager.go @@ -0,0 +1,207 @@ +package coordinator + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/hermeznetwork/hermez-node/common" + "github.com/hermeznetwork/hermez-node/db/l2db" + "github.com/hermeznetwork/hermez-node/eth" + "github.com/hermeznetwork/hermez-node/log" + "github.com/hermeznetwork/tracerr" +) + +// TxManager handles everything related to ethereum transactions: It makes the +// call to forge, waits for transaction confirmation, and keeps checking them +// until a number of confirmed blocks have passed. 
+type TxManager struct {
+	cfg         Config
+	ethClient   eth.ClientInterface
+	l2DB        *l2db.L2DB   // Used only to mark forged txs as forged in the L2DB
+	coord       *Coordinator // Used only to send messages to stop the pipeline
+	batchCh     chan *BatchInfo
+	lastBlockCh chan int64
+	queue       []*BatchInfo
+	lastBlock   int64
+	// lastConfirmedBatch stores the last BatchNum whose forge call was confirmed
+	lastConfirmedBatch common.BatchNum
+}
+
+// NewTxManager creates a new TxManager
+func NewTxManager(cfg *Config, ethClient eth.ClientInterface, l2DB *l2db.L2DB,
+	coord *Coordinator) *TxManager {
+	return &TxManager{
+		cfg:         *cfg,
+		ethClient:   ethClient,
+		l2DB:        l2DB,
+		coord:       coord,
+		batchCh:     make(chan *BatchInfo, queueLen),
+		lastBlockCh: make(chan int64, queueLen),
+		lastBlock:   -1,
+	}
+}
+
+// AddBatch is a thread safe method to pass a new batch to the TxManager so that
+// it is sent to the smart contract via the forge call
+func (t *TxManager) AddBatch(batchInfo *BatchInfo) {
+	t.batchCh <- batchInfo
+}
+
+// SetLastBlock is a thread safe method to pass the lastBlock to the TxManager
+func (t *TxManager) SetLastBlock(lastBlock int64) {
+	t.lastBlockCh <- lastBlock
+}
+
+func (t *TxManager) callRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) error {
+	batchInfo.Debug.Status = StatusSent
+	batchInfo.Debug.SendBlockNum = t.lastBlock + 1
+	batchInfo.Debug.SendTimestamp = time.Now()
+	batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub(
+		batchInfo.Debug.StartTimestamp).Seconds()
+	var ethTx *types.Transaction
+	var err error
+	for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
+		ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs)
+		if err != nil {
+			if strings.Contains(err.Error(), common.AuctionErrMsgCannotForge) {
+				log.Debugw("TxManager ethClient.RollupForgeBatch", "err", err,
+					"block", t.lastBlock+1)
+				return tracerr.Wrap(err)
+			}
+			log.Errorw("TxManager ethClient.RollupForgeBatch",
+				"attempt", attempt, "err", err, "block", t.lastBlock+1,
+				"batchNum", batchInfo.BatchNum)
+		} else {
+			break
+		}
+		select {
+		case <-ctx.Done():
+			return tracerr.Wrap(common.ErrDone)
+		case <-time.After(t.cfg.EthClientAttemptsDelay):
+		}
+	}
+	if err != nil {
+		return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err))
+	}
+	batchInfo.EthTx = ethTx
+	log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex())
+	t.cfg.debugBatchStore(batchInfo)
+	if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil {
+		return tracerr.Wrap(err)
+	}
+	return nil
+}
+
+func (t *TxManager) checkEthTransactionReceipt(ctx context.Context, batchInfo *BatchInfo) error {
+	txHash := batchInfo.EthTx.Hash()
+	var receipt *types.Receipt
+	var err error
+	for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
+		receipt, err = t.ethClient.EthTransactionReceipt(ctx, txHash)
+		if ctx.Err() != nil {
+			continue
+		}
+		if err != nil {
+			log.Errorw("TxManager ethClient.EthTransactionReceipt",
+				"attempt", attempt, "err", err)
+		} else {
+			break
+		}
+		select {
+		case <-ctx.Done():
+			return tracerr.Wrap(common.ErrDone)
+		case <-time.After(t.cfg.EthClientAttemptsDelay):
+		}
+	}
+	if err != nil {
+		return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.EthTransactionReceipt: %w", err))
+	}
+	batchInfo.Receipt = receipt
+	t.cfg.debugBatchStore(batchInfo)
+	return nil
+}
+
+func (t *TxManager) handleReceipt(batchInfo *BatchInfo) (*int64, error) {
+	receipt := batchInfo.Receipt
+	if receipt != nil {
+		if receipt.Status == types.ReceiptStatusFailed {
+			batchInfo.Debug.Status = StatusFailed
+			t.cfg.debugBatchStore(batchInfo)
+			log.Errorw("TxManager receipt status is failed", "receipt", receipt)
+			return nil, tracerr.Wrap(fmt.Errorf("ethereum transaction receipt status is failed"))
+		} else if receipt.Status == types.ReceiptStatusSuccessful {
+			batchInfo.Debug.Status = StatusMined
+			batchInfo.Debug.MineBlockNum = receipt.BlockNumber.Int64()
+			batchInfo.Debug.StartToMineBlocksDelay = batchInfo.Debug.MineBlockNum -
+				batchInfo.Debug.StartBlockNum
+			t.cfg.debugBatchStore(batchInfo)
+			if batchInfo.BatchNum > t.lastConfirmedBatch {
+				t.lastConfirmedBatch = batchInfo.BatchNum
+			}
+			confirm := t.lastBlock - receipt.BlockNumber.Int64()
+			return &confirm, nil
+		}
+	}
+	return nil, nil
+}
+
+// Run the TxManager
+func (t *TxManager) Run(ctx context.Context) {
+	next := 0
+	waitDuration := longWaitDuration
+
+	for {
+		select {
+		case <-ctx.Done():
+			log.Info("TxManager done")
+			return
+		case lastBlock := <-t.lastBlockCh:
+			t.lastBlock = lastBlock
+		case batchInfo := <-t.batchCh:
+			if err := t.callRollupForgeBatch(ctx, batchInfo); ctx.Err() != nil {
+				continue
+			} else if err != nil {
+				t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch call: %v", err)})
+				continue
+			}
+			log.Debugf("ethClient ForgeCall sent, batchNum: %d", batchInfo.BatchNum)
+			t.queue = append(t.queue, batchInfo)
+			waitDuration = t.cfg.TxManagerCheckInterval
+		case <-time.After(waitDuration):
+			if len(t.queue) == 0 {
+				continue
+			}
+			current := next
+			next = (current + 1) % len(t.queue)
+			batchInfo := t.queue[current]
+			if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
+				continue
+			} else if err != nil { //nolint:staticcheck
+				// We can't get the receipt for the
+				// transaction, so we can't confirm if it was
+				// mined
+				t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch receipt: %v", err)})
+			}
+
+			confirm, err := t.handleReceipt(batchInfo)
+			if err != nil { //nolint:staticcheck
+				// Transaction was rejected
+				t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch reject: %v", err)})
+			}
+			if confirm != nil && *confirm >= t.cfg.ConfirmBlocks {
+				log.Debugw("TxManager tx for RollupForgeBatch confirmed",
+					"batch", batchInfo.BatchNum)
+				t.queue = append(t.queue[:current], t.queue[current+1:]...)
+				if len(t.queue) == 0 {
+					waitDuration = longWaitDuration
+					next = 0
+				} else {
+					next = current % len(t.queue)
+				}
+			}
+		}
+	}
+}
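For reference, the confirmation rule that handleReceipt and TxManager.Run implement reduces to simple block arithmetic: handleReceipt returns confirm = lastBlock - minedBlock for a successfully mined forge transaction, and Run drops the batch from the queue once confirm reaches cfg.ConfirmBlocks. The sketch below is only an illustration of that rule under those assumptions; isConfirmed, lastBlock, minedBlock and confirmBlocks are placeholder names that do not exist in the codebase.

package main

import "fmt"

// isConfirmed mirrors the check done in TxManager.Run: the number of blocks
// elapsed since the forge tx was mined must reach the configured
// ConfirmBlocks threshold before the batch is considered confirmed.
func isConfirmed(lastBlock, minedBlock, confirmBlocks int64) bool {
	return lastBlock-minedBlock >= confirmBlocks
}

func main() {
	// With ConfirmBlocks = 5, a batch mined at block 100 is still pending
	// at block 104 and is treated as confirmed from block 105 onwards.
	fmt.Println(isConfirmed(104, 100, 5)) // false
	fmt.Println(isConfirmed(105, 100, 5)) // true
}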