From 482c94d374d6dc03d3f762c75e696c55664a8a77 Mon Sep 17 00:00:00 2001 From: Eduard S Date: Tue, 1 Dec 2020 18:05:46 +0100 Subject: [PATCH] Advance coordinator implementation - Common - Move ErrTODO and ErrDone to common for usage where needed. - Coordinator - Move prover types to prover package - Handle reorgs, stopping the pipeline when necessary - Handle ethereum transaction errors by stopping the pipeline - In case of ethereum transaction revert, check for known revert causes (more revert causes can be added to handle more cases) - Fix skipped transactions in TxManager confirmation logic - Cancel and wait for provers to be ready - Connect L2DB to: - purge l2txs due to timeout - mark l2txs at the different states - Connect HistoryDB to query L1UserTxs to forge in an L1Batch - L2DB - Skip update functions when the input slices have no values (to avoid a query with no values that results in an SQL error) - StateDB - In LocalStateDB, fix Reset when mt == nil - Prover (new package) - Rename the interface to Prover - Rename the mock struct to Mock - Extend Prover interface methods to provide everything required by the coordinator - Begin implementing required http client code to interact with server proof (not tested) - Synchronizer: - Add LastForgeL1TxsNum to Stats - Test/Client - Update Auction logic to track slots in which there's no forge during the time before the deadline (following the solidity implementation) --- common/errors.go | 17 +- common/ethauction.go | 6 + coordinator/batch.go | 11 +- coordinator/coordinator.go | 327 ++++++++++++++++++++------------ coordinator/coordinator_test.go | 102 ++++++++-- coordinator/proofpool.go | 90 --------- coordinator/proverspool.go | 38 ++++ db/l2db/l2db.go | 16 +- db/statedb/statedb.go | 13 +- eth/auction.go | 9 +- node/node.go | 6 +- prover/prover.go | 291 ++++++++++++++++++++++++++++ synchronizer/synchronizer.go | 27 ++- test/ethclient.go | 52 +++-- 14 files changed, 735 insertions(+), 270 deletions(-) delete mode 100644 coordinator/proofpool.go create mode 100644 coordinator/proverspool.go create mode 100644 prover/prover.go diff --git a/common/errors.go b/common/errors.go index bb04f59..2ad2da0 100644 --- a/common/errors.go +++ b/common/errors.go @@ -1,6 +1,10 @@ package common -import "errors" +import ( + "errors" + + "github.com/hermeznetwork/tracerr" +) // ErrNotInFF is used when the *big.Int does not fit inside the Finite Field var ErrNotInFF = errors.New("BigInt not inside the Finite Field") @@ -16,3 +20,14 @@ var ErrIdxOverflow = errors.New("Idx overflow, max value: 2**48 -1") // ErrBatchQueueEmpty is used when the coordinator.BatchQueue.Pop() is called and has no elements var ErrBatchQueueEmpty = errors.New("BatchQueue empty") + +// ErrTODO is used when a function is not yet implemented +var ErrTODO = errors.New("TODO") + +// ErrDone is used when a function returns earlier due to a cancelled context +var ErrDone = errors.New("done") + +// IsErrDone returns true if the error or wrapped (with tracerr) error is ErrDone +func IsErrDone(err error) bool { + return tracerr.Unwrap(err) == ErrDone +} diff --git a/common/ethauction.go b/common/ethauction.go index 4005f82..7692348 100644 --- a/common/ethauction.go +++ b/common/ethauction.go @@ -6,6 +6,12 @@ import ( ethCommon "github.com/ethereum/go-ethereum/common" ) +const ( + // AuctionErrMsgCannotForge is the message returned in forge with the + // address cannot forge + AuctionErrMsgCannotForge = "HermezAuctionProtocol::forge: CANNOT_FORGE" +) + // AuctionConstants are the constants of the Rollup Smart Contract type AuctionConstants struct { // Blocks per slot diff --git a/coordinator/batch.go b/coordinator/batch.go index 5cc772d..10c7561 100644 --- a/coordinator/batch.go +++ b/coordinator/batch.go @@ -9,13 +9,10 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/eth" + "github.com/hermeznetwork/hermez-node/prover" "github.com/hermeznetwork/tracerr" ) -// Proof TBD this type will be received from the proof server -type Proof struct { -} - // TxStatus is used to mark the status of an ethereum transaction type TxStatus string @@ -29,11 +26,11 @@ const ( // BatchInfo contans the Batch information type BatchInfo struct { BatchNum common.BatchNum - ServerProof ServerProofInterface + ServerProof prover.Client ZKInputs *common.ZKInputs - Proof *Proof + Proof *prover.Proof L1UserTxsExtra []common.L1Tx - L1OperatorTxs []common.L1Tx + L1CoordTxs []common.L1Tx L2Txs []common.PoolL2Tx ForgeBatchArgs *eth.RollupForgeBatchArgs // FeesInfo diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index d619bdc..d1260bf 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -3,6 +3,7 @@ package coordinator import ( "context" "fmt" + "strings" "sync" "time" @@ -11,18 +12,16 @@ import ( "github.com/hermeznetwork/hermez-node/batchbuilder" "github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/db/historydb" + "github.com/hermeznetwork/hermez-node/db/l2db" "github.com/hermeznetwork/hermez-node/eth" "github.com/hermeznetwork/hermez-node/log" + "github.com/hermeznetwork/hermez-node/prover" "github.com/hermeznetwork/hermez-node/synchronizer" "github.com/hermeznetwork/hermez-node/txselector" "github.com/hermeznetwork/tracerr" ) -var errTODO = fmt.Errorf("TODO") - -// ErrDone is returned when the function is stopped asynchronously via a done -// (terminated) context. It doesn't indicate an error. -var ErrDone = fmt.Errorf("done") +const queueLen = 16 // Config contains the Coordinator configuration type Config struct { @@ -60,15 +59,16 @@ func (c *Config) debugBatchStore(batchInfo *BatchInfo) { // Coordinator implements the Coordinator type type Coordinator struct { // State - batchNum common.BatchNum - serverProofs []ServerProofInterface - consts synchronizer.SCConsts - vars synchronizer.SCVariables - started bool + pipelineBatchNum common.BatchNum // batchNum from which we started the pipeline + provers []prover.Client + consts synchronizer.SCConsts + vars synchronizer.SCVariables + started bool cfg Config historyDB *historydb.HistoryDB + l2DB *l2db.L2DB txSelector *txselector.TxSelector batchBuilder *batchbuilder.BatchBuilder @@ -85,9 +85,10 @@ type Coordinator struct { // NewCoordinator creates a new Coordinator func NewCoordinator(cfg Config, historyDB *historydb.HistoryDB, + l2DB *l2db.L2DB, txSelector *txselector.TxSelector, batchBuilder *batchbuilder.BatchBuilder, - serverProofs []ServerProofInterface, + serverProofs []prover.Client, ethClient eth.ClientInterface, scConsts *synchronizer.SCConsts, initSCVars *synchronizer.SCVariables, @@ -102,18 +103,17 @@ func NewCoordinator(cfg Config, cfg.EthClientAttempts)) } - txManager := NewTxManager(&cfg, ethClient) - ctx, cancel := context.WithCancel(context.Background()) c := Coordinator{ - batchNum: -1, - serverProofs: serverProofs, - consts: *scConsts, - vars: *initSCVars, + pipelineBatchNum: -1, + provers: serverProofs, + consts: *scConsts, + vars: *initSCVars, cfg: cfg, historyDB: historyDB, + l2DB: l2DB, txSelector: txSelector, batchBuilder: batchBuilder, @@ -123,15 +123,15 @@ func NewCoordinator(cfg Config, ctx: ctx, // wg cancel: cancel, - - txManager: txManager, } + txManager := NewTxManager(&cfg, ethClient, l2DB, &c) + c.txManager = txManager return &c, nil } -func (c *Coordinator) newPipeline() *Pipeline { - return NewPipeline(c.cfg, c.historyDB, c.txSelector, c.batchBuilder, - c.txManager, c.serverProofs, &c.consts) +func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) { + return NewPipeline(ctx, c.cfg, c.historyDB, c.l2DB, + c.txSelector, c.batchBuilder, c.txManager, c.provers, &c.consts) } // MsgSyncStats indicates an update to the Synchronizer stats @@ -148,6 +148,12 @@ type MsgSyncSCVars struct { // MsgSyncReorg indicates a reorg type MsgSyncReorg struct { + Stats synchronizer.Stats +} + +// MsgStopPipeline indicates a signal to reset the pipeline +type MsgStopPipeline struct { + Reason string } // SendMsg is a thread safe method to pass a message to the Coordinator @@ -180,7 +186,7 @@ func (c *Coordinator) canForge(stats *synchronizer.Stats) bool { return false } -func (c *Coordinator) handleMsgSyncStats(stats *synchronizer.Stats) error { +func (c *Coordinator) handleMsgSyncStats(ctx context.Context, stats *synchronizer.Stats) error { if !stats.Synced() { return nil } @@ -189,25 +195,43 @@ func (c *Coordinator) handleMsgSyncStats(stats *synchronizer.Stats) error { canForge := c.canForge(stats) if c.pipeline == nil { if canForge { - log.Info("Coordinator: forging state begin") + log.Infow("Coordinator: forging state begin", "block", stats.Eth.LastBlock.Num, + "batch", stats.Sync.LastBatch) batchNum := common.BatchNum(stats.Sync.LastBatch) - c.pipeline = c.newPipeline() - if err := c.pipeline.Start(batchNum, stats, &c.vars); err != nil { + var err error + if c.pipeline, err = c.newPipeline(ctx); err != nil { + return tracerr.Wrap(err) + } + if err := c.pipeline.Start(batchNum, stats.Sync.LastForgeL1TxsNum, + stats, &c.vars); err != nil { + c.pipeline = nil return tracerr.Wrap(err) } + c.pipelineBatchNum = batchNum } } else { if canForge { c.pipeline.SetSyncStats(stats) } else { - log.Info("Coordinator: forging state end") - c.pipeline.Stop() + log.Infow("Coordinator: forging state end", "block", stats.Eth.LastBlock.Num) + c.pipeline.Stop(c.ctx) c.pipeline = nil } } return nil } +func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string) error { + if c.pipeline != nil { + c.pipeline.Stop(c.ctx) + c.pipeline = nil + } + if strings.Contains(reason, common.AuctionErrMsgCannotForge) { //nolint:staticcheck + // TODO: Check that we are in a slot in which we can't forge + } + return nil +} + // Start the coordinator func (c *Coordinator) Start() { if c.started { @@ -232,15 +256,26 @@ func (c *Coordinator) Start() { switch msg := msg.(type) { case MsgSyncStats: stats := msg.Stats - if err := c.handleMsgSyncStats(&stats); err != nil { + if err := c.handleMsgSyncStats(c.ctx, &stats); common.IsErrDone(err) { + continue + } else if err != nil { log.Errorw("Coordinator.handleMsgSyncStats error", "err", err) continue } case MsgSyncReorg: - if err := c.handleReorg(); err != nil { + if err := c.handleReorg(c.ctx, &msg.Stats); common.IsErrDone(err) { + continue + } else if err != nil { log.Errorw("Coordinator.handleReorg error", "err", err) continue } + case MsgStopPipeline: + log.Infow("Coordinator received MsgStopPipeline", "reason", msg.Reason) + if err := c.handleStopPipeline(c.ctx, msg.Reason); common.IsErrDone(err) { + continue + } else if err != nil { + log.Errorw("Coordinator.handleStopPipeline", "err", err) + } case MsgSyncSCVars: c.handleMsgSyncSCVars(&msg) default: @@ -251,6 +286,8 @@ func (c *Coordinator) Start() { }() } +const stopCtxTimeout = 200 * time.Millisecond + // Stop the coordinator func (c *Coordinator) Stop() { if !c.started { @@ -261,13 +298,31 @@ func (c *Coordinator) Stop() { c.cancel() c.wg.Wait() if c.pipeline != nil { - c.pipeline.Stop() + ctx, cancel := context.WithTimeout(context.Background(), stopCtxTimeout) + defer cancel() + c.pipeline.Stop(ctx) c.pipeline = nil } } -func (c *Coordinator) handleReorg() error { - return nil // TODO +func (c *Coordinator) handleReorg(ctx context.Context, stats *synchronizer.Stats) error { + if common.BatchNum(stats.Sync.LastBatch) < c.pipelineBatchNum { + // There's been a reorg and the batch from which the pipeline + // was started was in a block that was discarded. The batch + // may not be in the main chain, so we stop the pipeline as a + // precaution (it will be started again once the node is in + // sync). + log.Infow("Coordinator.handleReorg StopPipeline sync.LastBatch < c.pipelineBatchNum", + "sync.LastBatch", stats.Sync.LastBatch, + "c.pipelineBatchNum", c.pipelineBatchNum) + if err := c.handleStopPipeline(ctx, "reorg"); err != nil { + return tracerr.Wrap(err) + } + if err := c.l2DB.Reorg(common.BatchNum(stats.Sync.LastBatch)); err != nil { + return tracerr.Wrap(err) + } + } + return nil } // TxManager handles everything related to ethereum transactions: It makes the @@ -276,21 +331,26 @@ func (c *Coordinator) handleReorg() error { type TxManager struct { cfg Config ethClient eth.ClientInterface + l2DB *l2db.L2DB // Used only to mark forged txs as forged in the L2DB + coord *Coordinator // Used only to send messages to stop the pipeline batchCh chan *BatchInfo lastBlockCh chan int64 queue []*BatchInfo lastBlock int64 + // lastConfirmedBatch stores the last BatchNum that who's forge call was confirmed + lastConfirmedBatch common.BatchNum } // NewTxManager creates a new TxManager -func NewTxManager(cfg *Config, ethClient eth.ClientInterface) *TxManager { +func NewTxManager(cfg *Config, ethClient eth.ClientInterface, l2DB *l2db.L2DB, + coord *Coordinator) *TxManager { return &TxManager{ - cfg: *cfg, - ethClient: ethClient, - // TODO: Find best queue size - batchCh: make(chan *BatchInfo, 16), //nolint:gomnd - // TODO: Find best queue size - lastBlockCh: make(chan int64, 16), //nolint:gomnd + cfg: *cfg, + ethClient: ethClient, + l2DB: l2DB, + coord: coord, + batchCh: make(chan *BatchInfo, queueLen), + lastBlockCh: make(chan int64, queueLen), lastBlock: -1, } } @@ -312,14 +372,19 @@ func (t *TxManager) rollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ { ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs) if err != nil { + if strings.Contains(err.Error(), common.AuctionErrMsgCannotForge) { + log.Debugw("TxManager ethClient.RollupForgeBatch", "err", err, + "block", t.lastBlock) + return tracerr.Wrap(err) + } log.Errorw("TxManager ethClient.RollupForgeBatch", - "attempt", attempt, "err", err) + "attempt", attempt, "err", err, "block", t.lastBlock) } else { break } select { case <-ctx.Done(): - return tracerr.Wrap(ErrDone) + return tracerr.Wrap(common.ErrDone) case <-time.After(t.cfg.EthClientAttemptsDelay): } } @@ -327,7 +392,11 @@ func (t *TxManager) rollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err)) } batchInfo.EthTx = ethTx + log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex()) t.cfg.debugBatchStore(batchInfo) + if err := t.l2DB.DoneForging(l2TxsIDs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil { + return tracerr.Wrap(err) + } return nil } @@ -345,7 +414,7 @@ func (t *TxManager) ethTransactionReceipt(ctx context.Context, batchInfo *BatchI } select { case <-ctx.Done(): - return tracerr.Wrap(ErrDone) + return tracerr.Wrap(common.ErrDone) case <-time.After(t.cfg.EthClientAttemptsDelay): } } @@ -364,6 +433,9 @@ func (t *TxManager) handleReceipt(batchInfo *BatchInfo) (*int64, error) { log.Errorw("TxManager receipt status is failed", "receipt", receipt) return nil, tracerr.Wrap(fmt.Errorf("ethereum transaction receipt statis is failed")) } else if receipt.Status == types.ReceiptStatusSuccessful { + if batchInfo.BatchNum > t.lastConfirmedBatch { + t.lastConfirmedBatch = batchInfo.BatchNum + } confirm := t.lastBlock - receipt.BlockNumber.Int64() return &confirm, nil } @@ -385,10 +457,10 @@ func (t *TxManager) Run(ctx context.Context) { case lastBlock := <-t.lastBlockCh: t.lastBlock = lastBlock case batchInfo := <-t.batchCh: - if err := t.rollupForgeBatch(ctx, batchInfo); tracerr.Unwrap(err) == ErrDone { + if err := t.rollupForgeBatch(ctx, batchInfo); common.IsErrDone(err) { continue } else if err != nil { - // TODO: Reset pipeline + t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch call: %v", err)}) continue } log.Debugf("ethClient ForgeCall sent, batchNum: %d", batchInfo.BatchNum) @@ -398,35 +470,35 @@ func (t *TxManager) Run(ctx context.Context) { if len(t.queue) == 0 { continue } - batchInfo := t.queue[next] + current := next + next = (current + 1) % len(t.queue) + batchInfo := t.queue[current] err := t.ethTransactionReceipt(ctx, batchInfo) - if tracerr.Unwrap(err) == ErrDone { + if common.IsErrDone(err) { continue } else if err != nil { //nolint:staticcheck // We can't get the receipt for the // transaction, so we can't confirm if it was // mined - // TODO: Reset pipeline + t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch receipt: %v", err)}) } confirm, err := t.handleReceipt(batchInfo) if err != nil { //nolint:staticcheck // Transaction was rejected - // TODO: Reset pipeline + t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch reject: %v", err)}) } if confirm != nil && *confirm >= t.cfg.ConfirmBlocks { log.Debugw("TxManager tx for RollupForgeBatch confirmed", - "batchNum", batchInfo.BatchNum) - t.queue = t.queue[1:] + "batch", batchInfo.BatchNum) + t.queue = append(t.queue[:current], t.queue[current+1:]...) if len(t.queue) == 0 { waitTime = longWaitTime + next = 0 + } else { + next = current % len(t.queue) } } - if len(t.queue) == 0 { - next = 0 - } else { - next = (next + 1) % len(t.queue) - } } } } @@ -440,13 +512,16 @@ type Pipeline struct { batchNum common.BatchNum vars synchronizer.SCVariables lastScheduledL1BatchBlockNum int64 + lastForgeL1TxsNum int64 started bool - serverProofPool *ServerProofPool - txManager *TxManager - historyDB *historydb.HistoryDB - txSelector *txselector.TxSelector - batchBuilder *batchbuilder.BatchBuilder + proversPool *ProversPool + provers []prover.Client + txManager *TxManager + historyDB *historydb.HistoryDB + l2DB *l2db.L2DB + txSelector *txselector.TxSelector + batchBuilder *batchbuilder.BatchBuilder stats synchronizer.Stats statsCh chan synchronizer.Stats @@ -457,29 +532,41 @@ type Pipeline struct { } // NewPipeline creates a new Pipeline -func NewPipeline(cfg Config, +func NewPipeline(ctx context.Context, + cfg Config, historyDB *historydb.HistoryDB, + l2DB *l2db.L2DB, txSelector *txselector.TxSelector, batchBuilder *batchbuilder.BatchBuilder, txManager *TxManager, - serverProofs []ServerProofInterface, + provers []prover.Client, scConsts *synchronizer.SCConsts, -) *Pipeline { - serverProofPool := NewServerProofPool(len(serverProofs)) - for _, serverProof := range serverProofs { - serverProofPool.Add(serverProof) +) (*Pipeline, error) { + proversPool := NewProversPool(len(provers)) + proversPoolSize := 0 + for _, prover := range provers { + if err := prover.WaitReady(ctx); err != nil { + log.Errorw("prover.WaitReady", "err", err) + } else { + proversPool.Add(prover) + proversPoolSize++ + } } - return &Pipeline{ - cfg: cfg, - historyDB: historyDB, - txSelector: txSelector, - batchBuilder: batchBuilder, - serverProofPool: serverProofPool, - txManager: txManager, - consts: *scConsts, - // TODO: Find best queue size - statsCh: make(chan synchronizer.Stats, 16), //nolint:gomnd + if proversPoolSize == 0 { + return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool")) } + return &Pipeline{ + cfg: cfg, + historyDB: historyDB, + l2DB: l2DB, + txSelector: txSelector, + batchBuilder: batchBuilder, + provers: provers, + proversPool: proversPool, + txManager: txManager, + consts: *scConsts, + statsCh: make(chan synchronizer.Stats, queueLen), + }, nil } // SetSyncStats is a thread safe method to sets the synchronizer Stats @@ -488,7 +575,7 @@ func (p *Pipeline) SetSyncStats(stats *synchronizer.Stats) { } // Start the forging pipeline -func (p *Pipeline) Start(batchNum common.BatchNum, +func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64, syncStats *synchronizer.Stats, initSCVars *synchronizer.SCVariables) error { if p.started { log.Fatal("Pipeline already started") @@ -497,6 +584,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum, // Reset pipeline state p.batchNum = batchNum + p.lastForgeL1TxsNum = lastForgeL1TxsNum p.vars = *initSCVars p.lastScheduledL1BatchBlockNum = 0 @@ -504,12 +592,10 @@ func (p *Pipeline) Start(batchNum common.BatchNum, err := p.txSelector.Reset(p.batchNum) if err != nil { - log.Errorw("Pipeline: TxSelector.Reset", "error", err) return tracerr.Wrap(err) } err = p.batchBuilder.Reset(p.batchNum, true) if err != nil { - log.Errorw("Pipeline: BatchBuilder.Reset", "error", err) return tracerr.Wrap(err) } @@ -529,7 +615,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum, default: p.batchNum = p.batchNum + 1 batchInfo, err := p.forgeSendServerProof(p.ctx, p.batchNum) - if tracerr.Unwrap(err) == ErrDone { + if common.IsErrDone(err) { continue } if err != nil { @@ -551,7 +637,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum, return case batchInfo := <-batchChSentServerProof: err := p.waitServerProof(p.ctx, batchInfo) - if tracerr.Unwrap(err) == ErrDone { + if common.IsErrDone(err) { continue } if err != nil { @@ -566,7 +652,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum, } // Stop the forging pipeline -func (p *Pipeline) Stop() { +func (p *Pipeline) Stop(ctx context.Context) { if !p.started { log.Fatal("Pipeline already stopped") } @@ -574,14 +660,26 @@ func (p *Pipeline) Stop() { log.Debug("Stopping Pipeline...") p.cancel() p.wg.Wait() - // TODO: Cancel all proofServers with pending proofs + for _, prover := range p.provers { + if err := prover.Cancel(ctx); err != nil { + log.Errorw("prover.Cancel", "err", err) + } + } +} + +func l2TxsIDs(txs []common.PoolL2Tx) []common.TxID { + txIDs := make([]common.TxID, len(txs)) + for i, tx := range txs { + txIDs[i] = tx.TxID + } + return txIDs } // forgeSendServerProof the next batch, wait for a proof server to be available and send the // circuit inputs to the proof server. func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) { // remove transactions from the pool that have been there for too long - err := p.purgeRemoveByTimeout() + err := p.l2DB.Purge(common.BatchNum(p.stats.Sync.LastBatch)) if err != nil { return nil, tracerr.Wrap(err) } @@ -590,25 +688,37 @@ func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.Bat var poolL2Txs []common.PoolL2Tx // var feesInfo - var l1UserTxsExtra, l1OperatorTxs []common.L1Tx + var l1UserTxsExtra, l1CoordTxs []common.L1Tx // 1. Decide if we forge L2Tx or L1+L2Tx if p.shouldL1L2Batch() { - p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBatch + p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num // 2a: L1+L2 txs - // l1UserTxs, toForgeL1TxsNumber := c.historyDB.GetNextL1UserTxs() // TODO once HistoryDB is ready, uncomment - var l1UserTxs []common.L1Tx = nil // tmp, depends on HistoryDB - l1UserTxsExtra, l1OperatorTxs, poolL2Txs, err = p.txSelector.GetL1L2TxSelection([]common.Idx{}, batchNum, l1UserTxs) // TODO once feesInfo is added to method return, add the var + p.lastForgeL1TxsNum++ + l1UserTxs, err := p.historyDB.GetL1UserTxs(p.lastForgeL1TxsNum) + if err != nil { + return nil, tracerr.Wrap(err) + } + l1UserTxsExtra, l1CoordTxs, poolL2Txs, err = p.txSelector.GetL1L2TxSelection([]common.Idx{}, batchNum, l1UserTxs) // TODO once feesInfo is added to method return, add the var if err != nil { return nil, tracerr.Wrap(err) } } else { // 2b: only L2 txs - _, poolL2Txs, err = p.txSelector.GetL2TxSelection([]common.Idx{}, batchNum) // TODO once feesInfo is added to method return, add the var + l1CoordTxs, poolL2Txs, err = p.txSelector.GetL2TxSelection([]common.Idx{}, batchNum) if err != nil { return nil, tracerr.Wrap(err) } l1UserTxsExtra = nil - l1OperatorTxs = nil + } + + // 3. Save metadata from TxSelector output for BatchNum + // TODO feesInfo + batchInfo.L1UserTxsExtra = l1UserTxsExtra + batchInfo.L1CoordTxs = l1CoordTxs + batchInfo.L2Txs = poolL2Txs + + if err := p.l2DB.StartForging(l2TxsIDs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil { + return nil, tracerr.Wrap(err) } // Run purger to invalidate transactions that become invalid beause of @@ -620,17 +730,12 @@ func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.Bat return nil, tracerr.Wrap(err) } - // 3. Save metadata from TxSelector output for BatchNum - // batchInfo.SetTxsInfo(l1UserTxsExtra, l1OperatorTxs, poolL2Txs) // TODO feesInfo - batchInfo.L1UserTxsExtra = l1UserTxsExtra - batchInfo.L1OperatorTxs = l1OperatorTxs - batchInfo.L2Txs = poolL2Txs - // 4. Call BatchBuilder with TxSelector output configBatch := &batchbuilder.ConfigBatch{ ForgerAddress: p.cfg.ForgerAddress, } - zkInputs, err := p.batchBuilder.BuildBatch([]common.Idx{}, configBatch, l1UserTxsExtra, l1OperatorTxs, poolL2Txs, nil) // TODO []common.TokenID --> feesInfo + zkInputs, err := p.batchBuilder.BuildBatch([]common.Idx{}, configBatch, + l1UserTxsExtra, l1CoordTxs, poolL2Txs, nil) // TODO []common.TokenID --> feesInfo if err != nil { return nil, tracerr.Wrap(err) } @@ -640,7 +745,7 @@ func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.Bat p.cfg.debugBatchStore(&batchInfo) // 6. Wait for an available server proof blocking call - serverProof, err := p.serverProofPool.Get(ctx) + serverProof, err := p.proversPool.Get(ctx) if err != nil { return nil, tracerr.Wrap(err) } @@ -649,7 +754,7 @@ func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.Bat // If there's an error further on, add the serverProof back to // the pool if err != nil { - p.serverProofPool.Add(serverProof) + p.proversPool.Add(serverProof) } }() p.cfg.debugBatchStore(&batchInfo) @@ -670,7 +775,7 @@ func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) er if err != nil { return tracerr.Wrap(err) } - p.serverProofPool.Add(batchInfo.ServerProof) + p.proversPool.Add(batchInfo.ServerProof) batchInfo.ServerProof = nil batchInfo.Proof = proof batchInfo.ForgeBatchArgs = p.prepareForgeBatchArgs(batchInfo) @@ -679,24 +784,6 @@ func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) er return nil } -// isForgeSequence returns true if the node is the Forger in the current ethereum block -// func (c *Coordinator) isForgeSequence() (bool, error) { -// // TODO: Consider checking if we can forge by quering the Synchronizer instead of using ethClient -// blockNum, err := c.ethClient.EthLastBlock() -// if err != nil { -// return false, err -// } -// addr, err := c.ethClient.EthAddress() -// if err != nil { -// return false, err -// } -// return c.ethClient.AuctionCanForge(*addr, blockNum+1) -// } - -func (p *Pipeline) purgeRemoveByTimeout() error { - return nil // TODO -} - func (p *Pipeline) purgeInvalidDueToL2TxsSelection(l2Txs []common.PoolL2Tx) error { return nil // TODO } diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index 024e162..1e6413d 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -1,6 +1,8 @@ package coordinator import ( + "context" + "fmt" "io/ioutil" "math/big" "os" @@ -10,18 +12,53 @@ import ( ethCommon "github.com/ethereum/go-ethereum/common" "github.com/hermeznetwork/hermez-node/batchbuilder" dbUtils "github.com/hermeznetwork/hermez-node/db" + "github.com/hermeznetwork/hermez-node/db/historydb" "github.com/hermeznetwork/hermez-node/db/l2db" "github.com/hermeznetwork/hermez-node/db/statedb" "github.com/hermeznetwork/hermez-node/log" + "github.com/hermeznetwork/hermez-node/prover" "github.com/hermeznetwork/hermez-node/synchronizer" "github.com/hermeznetwork/hermez-node/test" "github.com/hermeznetwork/hermez-node/txselector" + "github.com/hermeznetwork/tracerr" + "github.com/iden3/go-merkletree/db/pebble" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) var deleteme = []string{} +func pebbleMakeCheckpoint(source, dest string) error { + // Remove dest folder (if it exists) before doing the checkpoint + if _, err := os.Stat(dest); !os.IsNotExist(err) { + err := os.RemoveAll(dest) + if err != nil { + return tracerr.Wrap(err) + } + } else if err != nil && !os.IsNotExist(err) { + return tracerr.Wrap(err) + } + + sto, err := pebble.NewPebbleStorage(source, false) + if err != nil { + return tracerr.Wrap(err) + } + defer func() { + errClose := sto.Pebble().Close() + if errClose != nil { + log.Errorw("Pebble.Close", "err", errClose) + } + }() + + // execute Checkpoint + err = sto.Pebble().Checkpoint(dest) + if err != nil { + return tracerr.Wrap(err) + } + + return nil +} + func TestMain(m *testing.M) { exitVal := m.Run() for _, dir := range deleteme { @@ -32,10 +69,16 @@ func TestMain(m *testing.M) { os.Exit(exitVal) } -func newTestModules(t *testing.T) (*txselector.TxSelector, *batchbuilder.BatchBuilder) { // FUTURE once Synchronizer is ready, should return it also +var syncDBPath string +var txSelDBPath string +var batchBuilderDBPath string + +func newTestModules(t *testing.T) (*historydb.HistoryDB, *l2db.L2DB, + *txselector.TxSelector, *batchbuilder.BatchBuilder) { // FUTURE once Synchronizer is ready, should return it also nLevels := 32 - syncDBPath, err := ioutil.TempDir("", "tmpSyncDB") + var err error + syncDBPath, err = ioutil.TempDir("", "tmpSyncDB") require.Nil(t, err) deleteme = append(deleteme, syncDBPath) syncSdb, err := statedb.NewStateDB(syncDBPath, statedb.TypeSynchronizer, nLevels) @@ -45,22 +88,23 @@ func newTestModules(t *testing.T) (*txselector.TxSelector, *batchbuilder.BatchBu db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez") require.Nil(t, err) l2DB := l2db.NewL2DB(db, 10, 100, 24*time.Hour) + historyDB := historydb.NewHistoryDB(db) - txselDir, err := ioutil.TempDir("", "tmpTxSelDB") + txSelDBPath, err = ioutil.TempDir("", "tmpTxSelDB") require.Nil(t, err) - deleteme = append(deleteme, txselDir) - txsel, err := txselector.NewTxSelector(txselDir, syncSdb, l2DB, 10, 10, 10) + deleteme = append(deleteme, txSelDBPath) + txsel, err := txselector.NewTxSelector(txSelDBPath, syncSdb, l2DB, 10, 10, 10) assert.Nil(t, err) - bbDir, err := ioutil.TempDir("", "tmpBatchBuilderDB") + batchBuilderDBPath, err = ioutil.TempDir("", "tmpBatchBuilderDB") require.Nil(t, err) - deleteme = append(deleteme, bbDir) - bb, err := batchbuilder.NewBatchBuilder(bbDir, syncSdb, nil, 0, uint64(nLevels)) + deleteme = append(deleteme, batchBuilderDBPath) + bb, err := batchbuilder.NewBatchBuilder(batchBuilderDBPath, syncSdb, nil, 0, uint64(nLevels)) assert.Nil(t, err) // l1Txs, coordinatorL1Txs, poolL2Txs := test.GenerateTestTxsFromSet(t, test.SetTest0) - return txsel, bb + return historyDB, l2DB, txsel, bb } type timer struct { @@ -77,7 +121,7 @@ var bidder = ethCommon.HexToAddress("0x6b175474e89094c44da98b954eedeac495271d0f" var forger = ethCommon.HexToAddress("0xc344E203a046Da13b0B4467EB7B3629D0C99F6E6") func newTestCoordinator(t *testing.T, forgerAddr ethCommon.Address, ethClient *test.Client, ethClientSetup *test.ClientSetup) *Coordinator { - txsel, bb := newTestModules(t) + historyDB, l2DB, txsel, bb := newTestModules(t) debugBatchPath, err := ioutil.TempDir("", "tmpDebugBatch") require.Nil(t, err) @@ -89,10 +133,10 @@ func newTestCoordinator(t *testing.T, forgerAddr ethCommon.Address, ethClient *t L1BatchTimeoutPerc: 0.5, EthClientAttempts: 5, EthClientAttemptsDelay: 100 * time.Millisecond, - TxManagerCheckInterval: 500 * time.Millisecond, + TxManagerCheckInterval: 300 * time.Millisecond, DebugBatchPath: debugBatchPath, } - serverProofs := []ServerProofInterface{&ServerProofMock{}, &ServerProofMock{}} + serverProofs := []prover.Client{&prover.MockClient{}, &prover.MockClient{}} scConsts := &synchronizer.SCConsts{ Rollup: *ethClientSetup.RollupConstants, @@ -104,7 +148,8 @@ func newTestCoordinator(t *testing.T, forgerAddr ethCommon.Address, ethClient *t Auction: *ethClientSetup.AuctionVariables, WDelayer: *ethClientSetup.WDelayerVariables, } - coord, err := NewCoordinator(conf, nil, txsel, bb, serverProofs, ethClient, scConsts, initSCVars) + coord, err := NewCoordinator(conf, historyDB, l2DB, txsel, bb, serverProofs, + ethClient, scConsts, initSCVars) require.Nil(t, err) return coord } @@ -142,13 +187,26 @@ func TestCoordinatorFlow(t *testing.T) { time.Sleep(100 * time.Millisecond) var stats synchronizer.Stats stats.Eth.LastBlock = *ethClient.CtlLastBlock() - stats.Sync.LastBlock = *ethClient.CtlLastBlock() + stats.Sync.LastBlock = stats.Eth.LastBlock + stats.Eth.LastBatch = ethClient.CtlLastForgedBatch() + stats.Sync.LastBatch = stats.Eth.LastBatch canForge, err := ethClient.AuctionCanForge(forger, blockNum+1) require.Nil(t, err) if canForge { // fmt.Println("DBG canForge") stats.Sync.Auction.CurrentSlot.Forger = forger } + // Copy stateDB to synchronizer if there was a new batch + source := fmt.Sprintf("%v/BatchNum%v", batchBuilderDBPath, stats.Sync.LastBatch) + dest := fmt.Sprintf("%v/BatchNum%v", syncDBPath, stats.Sync.LastBatch) + if stats.Sync.LastBatch != 0 { + if _, err := os.Stat(dest); os.IsNotExist(err) { + log.Infow("Making pebble checkpoint for sync", + "source", source, "dest", dest) + err = pebbleMakeCheckpoint(source, dest) + require.NoError(t, err) + } + } coord.SendMsg(MsgSyncStats{ Stats: stats, }) @@ -247,6 +305,7 @@ func TestCoordHandleMsgSyncStats(t *testing.T) { require.Nil(t, err) var stats synchronizer.Stats + ctx := context.Background() // Slot 0. No bid, so the winner is the boot coordinator // pipelineStarted: false -> false @@ -254,7 +313,7 @@ func TestCoordHandleMsgSyncStats(t *testing.T) { stats.Sync.LastBlock = stats.Eth.LastBlock stats.Sync.Auction.CurrentSlot.Forger = bootForger assert.Equal(t, false, coord.canForge(&stats)) - require.Nil(t, coord.handleMsgSyncStats(&stats)) + require.Nil(t, coord.handleMsgSyncStats(ctx, &stats)) assert.Nil(t, coord.pipeline) // Slot 0. No bid, and we reach the deadline, so anyone can forge @@ -264,7 +323,7 @@ func TestCoordHandleMsgSyncStats(t *testing.T) { stats.Sync.LastBlock = stats.Eth.LastBlock stats.Sync.Auction.CurrentSlot.Forger = bootForger assert.Equal(t, true, coord.canForge(&stats)) - require.Nil(t, coord.handleMsgSyncStats(&stats)) + require.Nil(t, coord.handleMsgSyncStats(ctx, &stats)) assert.NotNil(t, coord.pipeline) // Slot 0. No bid, and we reach the deadline, so anyone can forge @@ -274,7 +333,7 @@ func TestCoordHandleMsgSyncStats(t *testing.T) { stats.Sync.LastBlock = stats.Eth.LastBlock stats.Sync.Auction.CurrentSlot.Forger = bootForger assert.Equal(t, true, coord.canForge(&stats)) - require.Nil(t, coord.handleMsgSyncStats(&stats)) + require.Nil(t, coord.handleMsgSyncStats(ctx, &stats)) assert.NotNil(t, coord.pipeline) // Slot 0. No bid, so the winner is the boot coordinator @@ -284,7 +343,7 @@ func TestCoordHandleMsgSyncStats(t *testing.T) { stats.Sync.LastBlock = stats.Eth.LastBlock stats.Sync.Auction.CurrentSlot.Forger = bootForger assert.Equal(t, false, coord.canForge(&stats)) - require.Nil(t, coord.handleMsgSyncStats(&stats)) + require.Nil(t, coord.handleMsgSyncStats(ctx, &stats)) assert.Nil(t, coord.pipeline) } @@ -292,9 +351,11 @@ func TestPipelineShouldL1L2Batch(t *testing.T) { ethClientSetup := test.NewClientSetupExample() var timer timer + ctx := context.Background() ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup) coord := newTestCoordinator(t, forger, ethClient, ethClientSetup) - pipeline := coord.newPipeline() + pipeline, err := coord.newPipeline(ctx) + require.NoError(t, err) pipeline.vars = coord.vars // Check that the parameters are the ones we expect and use in this test @@ -354,3 +415,6 @@ func TestPipelineShouldL1L2Batch(t *testing.T) { // TODO: Test Reorg // TODO: Test Pipeline // TODO: Test TxMonitor +// TODO: Test forgeSendServerProof +// TODO: Test waitServerProof +// TODO: Test handleReorg diff --git a/coordinator/proofpool.go b/coordinator/proofpool.go deleted file mode 100644 index 7e7a2bc..0000000 --- a/coordinator/proofpool.go +++ /dev/null @@ -1,90 +0,0 @@ -package coordinator - -import ( - "context" - "time" - - "github.com/hermeznetwork/hermez-node/common" - "github.com/hermeznetwork/hermez-node/log" - "github.com/hermeznetwork/tracerr" -) - -// ServerProofInterface is the interface to a ServerProof that calculates zk proofs -type ServerProofInterface interface { - CalculateProof(zkInputs *common.ZKInputs) error - GetProof(ctx context.Context) (*Proof, error) -} - -// ServerProof contains the data related to a ServerProof -type ServerProof struct { - // TODO - URL string - Available bool -} - -// NewServerProof creates a new ServerProof -func NewServerProof(URL string) *ServerProof { - return &ServerProof{URL: URL} -} - -// CalculateProof sends the *common.ZKInputs to the ServerProof to compute the -// Proof -func (p *ServerProof) CalculateProof(zkInputs *common.ZKInputs) error { - log.Error("TODO") - return tracerr.Wrap(errTODO) -} - -// GetProof retreives the Proof from the ServerProof -func (p *ServerProof) GetProof(ctx context.Context) (*Proof, error) { - log.Error("TODO") - return nil, tracerr.Wrap(errTODO) -} - -// ServerProofMock is a mock ServerProof to be used in tests. It doesn't calculate anything -type ServerProofMock struct { -} - -// CalculateProof sends the *common.ZKInputs to the ServerProof to compute the -// Proof -func (p *ServerProofMock) CalculateProof(zkInputs *common.ZKInputs) error { - return nil -} - -// GetProof retreives the Proof from the ServerProof -func (p *ServerProofMock) GetProof(ctx context.Context) (*Proof, error) { - // Simulate a delay - select { - case <-time.After(200 * time.Millisecond): //nolint:gomnd - return &Proof{}, nil - case <-ctx.Done(): - return nil, tracerr.Wrap(ErrDone) - } -} - -// ServerProofPool contains the multiple ServerProof -type ServerProofPool struct { - pool chan ServerProofInterface -} - -// NewServerProofPool creates a new pool of ServerProofs. -func NewServerProofPool(maxServerProofs int) *ServerProofPool { - return &ServerProofPool{ - pool: make(chan ServerProofInterface, maxServerProofs), - } -} - -// Add a ServerProof to the pool -func (p *ServerProofPool) Add(serverProof ServerProofInterface) { - p.pool <- serverProof -} - -// Get returns the next available ServerProof -func (p *ServerProofPool) Get(ctx context.Context) (ServerProofInterface, error) { - select { - case <-ctx.Done(): - log.Info("ServerProofPool.Get done") - return nil, tracerr.Wrap(ErrDone) - case serverProof := <-p.pool: - return serverProof, nil - } -} diff --git a/coordinator/proverspool.go b/coordinator/proverspool.go new file mode 100644 index 0000000..d873c4a --- /dev/null +++ b/coordinator/proverspool.go @@ -0,0 +1,38 @@ +package coordinator + +import ( + "context" + + "github.com/hermeznetwork/hermez-node/common" + "github.com/hermeznetwork/hermez-node/log" + "github.com/hermeznetwork/hermez-node/prover" + "github.com/hermeznetwork/tracerr" +) + +// ProversPool contains the multiple prover clients +type ProversPool struct { + pool chan prover.Client +} + +// NewProversPool creates a new pool of provers. +func NewProversPool(maxServerProofs int) *ProversPool { + return &ProversPool{ + pool: make(chan prover.Client, maxServerProofs), + } +} + +// Add a prover to the pool +func (p *ProversPool) Add(serverProof prover.Client) { + p.pool <- serverProof +} + +// Get returns the next available prover +func (p *ProversPool) Get(ctx context.Context) (prover.Client, error) { + select { + case <-ctx.Done(): + log.Info("ServerProofPool.Get done") + return nil, tracerr.Wrap(common.ErrDone) + case serverProof := <-p.pool: + return serverProof, nil + } +} diff --git a/db/l2db/l2db.go b/db/l2db/l2db.go index 6f0f2ae..b4b8b90 100644 --- a/db/l2db/l2db.go +++ b/db/l2db/l2db.go @@ -177,6 +177,9 @@ func (l2db *L2DB) GetPendingTxs() ([]common.PoolL2Tx, error) { // StartForging updates the state of the transactions that will begin the forging process. // The state of the txs referenced by txIDs will be changed from Pending -> Forging func (l2db *L2DB) StartForging(txIDs []common.TxID, batchNum common.BatchNum) error { + if len(txIDs) == 0 { + return nil + } query, args, err := sqlx.In( `UPDATE tx_pool SET state = ?, batch_num = ? @@ -197,6 +200,9 @@ func (l2db *L2DB) StartForging(txIDs []common.TxID, batchNum common.BatchNum) er // DoneForging updates the state of the transactions that have been forged // so the state of the txs referenced by txIDs will be changed from Forging -> Forged func (l2db *L2DB) DoneForging(txIDs []common.TxID, batchNum common.BatchNum) error { + if len(txIDs) == 0 { + return nil + } query, args, err := sqlx.In( `UPDATE tx_pool SET state = ?, batch_num = ? @@ -217,6 +223,9 @@ func (l2db *L2DB) DoneForging(txIDs []common.TxID, batchNum common.BatchNum) err // InvalidateTxs updates the state of the transactions that are invalid. // The state of the txs referenced by txIDs will be changed from * -> Invalid func (l2db *L2DB) InvalidateTxs(txIDs []common.TxID, batchNum common.BatchNum) error { + if len(txIDs) == 0 { + return nil + } query, args, err := sqlx.In( `UPDATE tx_pool SET state = ?, batch_num = ? @@ -236,6 +245,9 @@ func (l2db *L2DB) InvalidateTxs(txIDs []common.TxID, batchNum common.BatchNum) e // CheckNonces invalidate txs with nonces that are smaller or equal than their respective accounts nonces. // The state of the affected txs will be changed from Pending -> Invalid func (l2db *L2DB) CheckNonces(updatedAccounts []common.Account, batchNum common.BatchNum) (err error) { + if len(updatedAccounts) == 0 { + return nil + } txn, err := l2db.db.Beginx() if err != nil { return tracerr.Wrap(err) @@ -261,7 +273,7 @@ func (l2db *L2DB) CheckNonces(updatedAccounts []common.Account, batchNum common. return tracerr.Wrap(err) } } - return txn.Commit() + return tracerr.Wrap(txn.Commit()) } // Reorg updates the state of txs that were updated in a batch that has been discarted due to a blockchain reorg. @@ -312,5 +324,5 @@ func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) (err error) { if err != nil { return tracerr.Wrap(err) } - return txn.Commit() + return tracerr.Wrap(txn.Commit()) } diff --git a/db/statedb/statedb.go b/db/statedb/statedb.go index b621d16..166a1ec 100644 --- a/db/statedb/statedb.go +++ b/db/statedb/statedb.go @@ -541,7 +541,8 @@ func (l *LocalStateDB) Reset(batchNum common.BatchNum, fromSynchronizer bool) er // use checkpoint from SynchronizerStateDB if _, err := os.Stat(synchronizerCheckpointPath); os.IsNotExist(err) { // if synchronizerStateDB does not have checkpoint at batchNum, return err - return tracerr.Wrap(fmt.Errorf("Checkpoint not exist in Synchronizer")) + return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" not exist in Synchronizer", + synchronizerCheckpointPath)) } if err := l.db.Pebble().Close(); err != nil { @@ -576,11 +577,13 @@ func (l *LocalStateDB) Reset(batchNum common.BatchNum, fromSynchronizer bool) er return tracerr.Wrap(err) } // open the MT for the current s.db - mt, err := merkletree.NewMerkleTree(l.db.WithPrefix(PrefixKeyMT), l.mt.MaxLevels()) - if err != nil { - return tracerr.Wrap(err) + if l.mt != nil { + mt, err := merkletree.NewMerkleTree(l.db.WithPrefix(PrefixKeyMT), l.mt.MaxLevels()) + if err != nil { + return tracerr.Wrap(err) + } + l.mt = mt } - l.mt = mt return nil } diff --git a/eth/auction.go b/eth/auction.go index cf23bcf..da860b9 100644 --- a/eth/auction.go +++ b/eth/auction.go @@ -22,10 +22,11 @@ import ( // SlotState is the state of a slot type SlotState struct { - Bidder ethCommon.Address - Fulfilled bool - BidAmount *big.Int - ClosedMinBid *big.Int + Bidder ethCommon.Address + ForgerCommitment bool + Fulfilled bool + BidAmount *big.Int + ClosedMinBid *big.Int } // NewSlotState returns an empty SlotState diff --git a/node/node.go b/node/node.go index c3e317f..7f85c5e 100644 --- a/node/node.go +++ b/node/node.go @@ -21,6 +21,7 @@ import ( "github.com/hermeznetwork/hermez-node/db/statedb" "github.com/hermeznetwork/hermez-node/eth" "github.com/hermeznetwork/hermez-node/log" + "github.com/hermeznetwork/hermez-node/prover" "github.com/hermeznetwork/hermez-node/synchronizer" "github.com/hermeznetwork/hermez-node/test/debugapi" "github.com/hermeznetwork/hermez-node/txselector" @@ -162,9 +163,9 @@ func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node, if err != nil { return nil, tracerr.Wrap(err) } - serverProofs := make([]coordinator.ServerProofInterface, len(coordCfg.ServerProofs)) + serverProofs := make([]prover.Client, len(coordCfg.ServerProofs)) for i, serverProofCfg := range coordCfg.ServerProofs { - serverProofs[i] = coordinator.NewServerProof(serverProofCfg.URL) + serverProofs[i] = prover.NewProofServerClient(serverProofCfg.URL) } coord, err = coordinator.NewCoordinator( @@ -173,6 +174,7 @@ func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node, ConfirmBlocks: coordCfg.ConfirmBlocks, }, historyDB, + l2DB, txSelector, batchBuilder, serverProofs, diff --git a/prover/prover.go b/prover/prover.go new file mode 100644 index 0000000..acec14f --- /dev/null +++ b/prover/prover.go @@ -0,0 +1,291 @@ +package prover + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "mime/multipart" + "net/http" + "strings" + "time" + + "github.com/dghubble/sling" + "github.com/hermeznetwork/hermez-node/common" + "github.com/hermeznetwork/hermez-node/log" + "github.com/hermeznetwork/tracerr" +) + +// Proof TBD this type will be received from the proof server +type Proof struct { +} + +// Client is the interface to a ServerProof that calculates zk proofs +type Client interface { + // Non-blocking + CalculateProof(zkInputs *common.ZKInputs) error + // Blocking + GetProof(ctx context.Context) (*Proof, error) + // Non-Blocking + Cancel(ctx context.Context) error + // Blocking + WaitReady(ctx context.Context) error +} + +// StatusCode is the status string of the ProofServer +type StatusCode string + +const ( + // StatusCodeAborted means prover is ready to take new proof. Previous + // proof was aborted. + StatusCodeAborted StatusCode = "aborted" + // StatusCodeBusy means prover is busy computing proof. + StatusCodeBusy StatusCode = "busy" + // StatusCodeFailed means prover is ready to take new proof. Previous + // proof failed + StatusCodeFailed StatusCode = "failed" + // StatusCodeSuccess means prover is ready to take new proof. Previous + // proof succeeded + StatusCodeSuccess StatusCode = "success" + // StatusCodeUnverified means prover is ready to take new proof. + // Previous proof was unverified + StatusCodeUnverified StatusCode = "unverified" + // StatusCodeUninitialized means prover is not initialized + StatusCodeUninitialized StatusCode = "uninitialized" + // StatusCodeUndefined means prover is in an undefined state. Most + // likely is booting up. Keep trying + StatusCodeUndefined StatusCode = "undefined" + // StatusCodeInitializing means prover is initializing and not ready yet + StatusCodeInitializing StatusCode = "initializing" + // StatusCodeReady means prover initialized and ready to do first proof + StatusCodeReady StatusCode = "ready" +) + +// IsReady returns true when the prover is ready +func (status StatusCode) IsReady() bool { + if status == StatusCodeAborted || status == StatusCodeFailed || status == StatusCodeSuccess || + status == StatusCodeUnverified || status == StatusCodeReady { + return true + } + return false +} + +// IsInitialized returns true when the prover is initialized +func (status StatusCode) IsInitialized() bool { + if status == StatusCodeUninitialized || status == StatusCodeUndefined || + status == StatusCodeInitializing { + return false + } + return true +} + +// Status is the return struct for the status API endpoint +type Status struct { + Status StatusCode `json:"status"` + Proof string `json:"proof"` + PubData string `json:"pubData"` +} + +// ErrorServer is the return struct for an API error +type ErrorServer struct { + Status StatusCode `json:"status"` + Message string `json:"msg"` +} + +// Error message for ErrorServer +func (e ErrorServer) Error() string { + return fmt.Sprintf("server proof status (%v): %v", e.Status, e.Message) +} + +type apiMethod string + +const ( + // GET is an HTTP GET + GET apiMethod = "GET" + // POST is an HTTP POST with maybe JSON body + POST apiMethod = "POST" + // POSTFILE is an HTTP POST with a form file + POSTFILE apiMethod = "POSTFILE" +) + +// ProofServerClient contains the data related to a ProofServerClient +type ProofServerClient struct { + URL string + client *sling.Sling +} + +// NewProofServerClient creates a new ServerProof +func NewProofServerClient(URL string) *ProofServerClient { + if URL[len(URL)-1] != '/' { + URL += "/" + } + client := sling.New().Base(URL) + return &ProofServerClient{URL: URL, client: client} +} + +//nolint:unused +type formFileProvider struct { + writer *multipart.Writer + body []byte +} + +//nolint:unused +func newFormFileProvider(payload interface{}) (*formFileProvider, error) { + body := new(bytes.Buffer) + writer := multipart.NewWriter(body) + part, err := writer.CreateFormFile("file", "file.json") + if err != nil { + return nil, tracerr.Wrap(err) + } + if err := json.NewEncoder(part).Encode(payload); err != nil { + return nil, tracerr.Wrap(err) + } + if err := writer.Close(); err != nil { + return nil, tracerr.Wrap(err) + } + return &formFileProvider{ + writer: writer, + body: body.Bytes(), + }, nil +} + +func (p formFileProvider) ContentType() string { + return p.writer.FormDataContentType() +} + +func (p formFileProvider) Body() (io.Reader, error) { + return bytes.NewReader(p.body), nil +} + +//nolint:unused +func (p *ProofServerClient) apiRequest(ctx context.Context, method apiMethod, path string, + body interface{}, ret interface{}) error { + path = strings.TrimPrefix(path, "/") + var errSrv ErrorServer + var req *http.Request + var err error + switch method { + case GET: + req, err = p.client.New().Get(path).Request() + case POST: + req, err = p.client.New().Post(path).BodyJSON(body).Request() + case POSTFILE: + provider, err := newFormFileProvider(body) + if err != nil { + return tracerr.Wrap(err) + } + req, err = p.client.New().Post(path).BodyProvider(provider).Request() + if err != nil { + return tracerr.Wrap(err) + } + default: + return tracerr.Wrap(fmt.Errorf("invalid http method: %v", method)) + } + if err != nil { + return tracerr.Wrap(err) + } + res, err := p.client.Do(req.WithContext(ctx), ret, &errSrv) + if err != nil { + return tracerr.Wrap(err) + } + defer res.Body.Close() //nolint:errcheck + if !(200 <= res.StatusCode && res.StatusCode < 300) { + return tracerr.Wrap(errSrv) + } + return nil +} + +//nolint:unused +func (p *ProofServerClient) apiStatus(ctx context.Context) (*Status, error) { + var status Status + if err := p.apiRequest(ctx, GET, "/status", nil, &status); err != nil { + return nil, tracerr.Wrap(err) + } + return &status, nil +} + +//nolint:unused +func (p *ProofServerClient) apiCancel(ctx context.Context) error { + if err := p.apiRequest(ctx, POST, "/cancel", nil, nil); err != nil { + return tracerr.Wrap(err) + } + return nil +} + +//nolint:unused +func (p *ProofServerClient) apiInput(ctx context.Context, zkInputs *common.ZKInputs) error { + if err := p.apiRequest(ctx, POSTFILE, "/input", zkInputs, nil); err != nil { + return tracerr.Wrap(err) + } + return nil +} + +// CalculateProof sends the *common.ZKInputs to the ServerProof to compute the +// Proof +func (p *ProofServerClient) CalculateProof(zkInputs *common.ZKInputs) error { + log.Error("TODO") + return tracerr.Wrap(common.ErrTODO) +} + +// GetProof retreives the Proof from the ServerProof, blocking until the proof +// is ready. +func (p *ProofServerClient) GetProof(ctx context.Context) (*Proof, error) { + log.Error("TODO") + return nil, tracerr.Wrap(common.ErrTODO) +} + +// Cancel cancels any current proof computation +func (p *ProofServerClient) Cancel(ctx context.Context) error { + log.Error("TODO") + return tracerr.Wrap(common.ErrTODO) +} + +// WaitReady waits until the serverProof is ready +func (p *ProofServerClient) WaitReady(ctx context.Context) error { + log.Error("TODO") + return tracerr.Wrap(common.ErrTODO) +} + +// MockClient is a mock ServerProof to be used in tests. It doesn't calculate anything +type MockClient struct { +} + +// CalculateProof sends the *common.ZKInputs to the ServerProof to compute the +// Proof +func (p *MockClient) CalculateProof(zkInputs *common.ZKInputs) error { + return nil +} + +// GetProof retreives the Proof from the ServerProof +func (p *MockClient) GetProof(ctx context.Context) (*Proof, error) { + // Simulate a delay + select { + case <-time.After(500 * time.Millisecond): //nolint:gomnd + return &Proof{}, nil + case <-ctx.Done(): + return nil, tracerr.Wrap(common.ErrDone) + } +} + +// Cancel cancels any current proof computation +func (p *MockClient) Cancel(ctx context.Context) error { + // Simulate a delay + select { + case <-time.After(80 * time.Millisecond): //nolint:gomnd + return nil + case <-ctx.Done(): + return tracerr.Wrap(common.ErrDone) + } +} + +// WaitReady waits until the prover is ready +func (p *MockClient) WaitReady(ctx context.Context) error { + // Simulate a delay + select { + case <-time.After(200 * time.Millisecond): //nolint:gomnd + return nil + case <-ctx.Done(): + return tracerr.Wrap(common.ErrDone) + } +} diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 109f6af..7a56bfe 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -57,8 +57,9 @@ type Stats struct { LastBatch int64 // LastL1BatchBlock is the last ethereum block in which an // l1Batch was forged - LastL1BatchBlock int64 - Auction struct { + LastL1BatchBlock int64 + LastForgeL1TxsNum int64 + Auction struct { CurrentSlot common.Slot } } @@ -97,7 +98,8 @@ func (s *StatsHolder) UpdateCurrentSlot(slot common.Slot) { } // UpdateSync updates the synchronizer stats -func (s *StatsHolder) UpdateSync(lastBlock *common.Block, lastBatch *common.BatchNum, lastL1BatchBlock *int64) { +func (s *StatsHolder) UpdateSync(lastBlock *common.Block, lastBatch *common.BatchNum, + lastL1BatchBlock *int64, lastForgeL1TxsNum *int64) { now := time.Now() s.rw.Lock() s.Sync.LastBlock = *lastBlock @@ -106,6 +108,7 @@ func (s *StatsHolder) UpdateSync(lastBlock *common.Block, lastBatch *common.Batc } if lastL1BatchBlock != nil { s.Sync.LastL1BatchBlock = *lastL1BatchBlock + s.Sync.LastForgeL1TxsNum = *lastForgeL1TxsNum } s.Sync.Updated = now s.rw.Unlock() @@ -506,16 +509,18 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block) batchesLen := len(rollupData.Batches) if batchesLen == 0 { - s.stats.UpdateSync(ethBlock, nil, nil) + s.stats.UpdateSync(ethBlock, nil, nil, nil) } else { var lastL1BatchBlock *int64 + var lastForgeL1TxsNum *int64 for _, batchData := range rollupData.Batches { if batchData.L1Batch { lastL1BatchBlock = &batchData.Batch.EthBlockNum + lastForgeL1TxsNum = batchData.Batch.ForgeL1TxsNum } } s.stats.UpdateSync(ethBlock, - &rollupData.Batches[batchesLen-1].Batch.BatchNum, lastL1BatchBlock) + &rollupData.Batches[batchesLen-1].Batch.BatchNum, lastL1BatchBlock, lastForgeL1TxsNum) } if err := s.updateCurrentSlotIfSync(len(rollupData.Batches)); err != nil { return nil, nil, tracerr.Wrap(err) @@ -616,13 +621,23 @@ func (s *Synchronizer) resetState(block *common.Block) error { lastL1BatchBlockNum = 0 } + lastForgeL1TxsNum, err := s.historyDB.GetLastL1TxsNum() + if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows { + log.Errorw("historyDB.GetLastL1BatchBlockNum", "err", err) + return tracerr.Wrap(err) + } + if tracerr.Unwrap(err) == sql.ErrNoRows || lastForgeL1TxsNum == nil { + n := int64(-1) + lastForgeL1TxsNum = &n + } + err = s.stateDB.Reset(batchNum) if err != nil { log.Errorw("stateDB.Reset", "err", err) return tracerr.Wrap(err) } - s.stats.UpdateSync(block, &batchNum, &lastL1BatchBlockNum) // TODO + s.stats.UpdateSync(block, &batchNum, &lastL1BatchBlockNum, lastForgeL1TxsNum) if err := s.updateCurrentSlotIfSync(-1); err != nil { return tracerr.Wrap(err) diff --git a/test/ethclient.go b/test/ethclient.go index 65e8956..c38fcfb 100644 --- a/test/ethclient.go +++ b/test/ethclient.go @@ -152,6 +152,17 @@ func (a *AuctionBlock) forge(forger ethCommon.Address) error { slotState = eth.NewSlotState() a.State.Slots[slotToForge] = slotState } + + if !slotState.ForgerCommitment { + // Get the relativeBlock to check if the slotDeadline has been exceeded + relativeBlock := a.Eth.BlockNum - (a.Constants.GenesisBlockNum + + (slotToForge * int64(a.Constants.BlocksPerSlot))) + + if relativeBlock < int64(a.Vars.SlotDeadline) { + slotState.ForgerCommitment = true + } + } + slotState.Fulfilled = true a.Events.NewForge = append(a.Events.NewForge, eth.AuctionEventNewForge{ @@ -185,10 +196,9 @@ func (a *AuctionBlock) canForge(forger ethCommon.Address, blockNum int64) (bool, minBid = slotState.ClosedMinBid } - if !slotState.Fulfilled && (relativeBlock >= int64(a.Vars.SlotDeadline)) { + if !slotState.ForgerCommitment && (relativeBlock >= int64(a.Vars.SlotDeadline)) { // if the relative block has exceeded the slotDeadline and no batch has been forged, anyone can forge return true, nil - // TODO, find the forger set by the Bidder } else if coord, ok := a.State.Coordinators[slotState.Bidder]; ok && coord.Forger == forger && slotState.BidAmount.Cmp(minBid) >= 0 { // if forger bidAmount has exceeded the minBid it can forge @@ -208,6 +218,7 @@ type EthereumBlock struct { Hash ethCommon.Hash ParentHash ethCommon.Hash Tokens map[ethCommon.Address]eth.ERC20Consts + Nonce uint64 // state ethState } @@ -567,6 +578,16 @@ func (c *Client) CtlLastBlock() *common.Block { } } +// CtlLastForgedBatch returns the last batchNum without checks +func (c *Client) CtlLastForgedBatch() int64 { + c.rw.RLock() + defer c.rw.RUnlock() + + currentBlock := c.currentBlock() + e := currentBlock.Rollup + return int64(len(e.State.ExitRoots)) - 1 +} + // EthLastBlock returns the last blockNum func (c *Client) EthLastBlock() (int64, error) { c.rw.RLock() @@ -759,7 +780,7 @@ func (c *Client) RollupL1UserTxERC20ETH( r.Events.L1UserTx = append(r.Events.L1UserTx, eth.RollupEventL1UserTx{ L1UserTx: *l1Tx, }) - return r.addTransaction(newTransaction("l1UserTxERC20ETH", l1Tx)), nil + return r.addTransaction(c.newTransaction("l1UserTxERC20ETH", l1Tx)), nil } // RollupL1UserTxERC777 is the interface to call the smart contract function @@ -817,7 +838,7 @@ func (c *Client) RollupWithdrawMerkleProof(babyPubKey *babyjub.PublicKey, tokenI Siblings []*big.Int InstantWithdraw bool } - tx = r.addTransaction(newTransaction("withdrawMerkleProof", data{ + tx = r.addTransaction(c.newTransaction("withdrawMerkleProof", data{ BabyPubKey: babyPubKey, TokenID: tokenID, NumExitRoot: numExitRoot, @@ -845,12 +866,15 @@ type transactionData struct { Value interface{} } -func newTransaction(name string, value interface{}) *types.Transaction { +func (c *Client) newTransaction(name string, value interface{}) *types.Transaction { + eth := c.nextBlock().Eth + nonce := eth.Nonce + eth.Nonce++ data, err := json.Marshal(transactionData{name, value}) if err != nil { panic(err) } - return types.NewTransaction(0, ethCommon.Address{}, nil, 0, nil, + return types.NewTransaction(nonce, ethCommon.Address{}, nil, 0, nil, data) } @@ -870,7 +894,7 @@ func (c *Client) RollupForgeBatch(args *eth.RollupForgeBatchArgs) (tx *types.Tra return nil, tracerr.Wrap(err) } if !ok { - return nil, tracerr.Wrap(fmt.Errorf("incorrect slot")) + return nil, tracerr.Wrap(fmt.Errorf(common.AuctionErrMsgCannotForge)) } // TODO: Verify proof @@ -915,7 +939,7 @@ func (c *Client) addBatch(args *eth.RollupForgeBatchArgs) (*types.Transaction, e r.State.MapL1TxQueue[r.State.LastToForgeL1TxsNum] = eth.NewQueueStruct() } } - ethTx := r.addTransaction(newTransaction("forgebatch", args)) + ethTx := r.addTransaction(c.newTransaction("forgebatch", args)) c.forgeBatchArgsPending[ethTx.Hash()] = &batch{*args, *c.addr} r.Events.ForgeBatch = append(r.Events.ForgeBatch, eth.RollupEventForgeBatch{ BatchNum: int64(len(r.State.ExitRoots)) - 1, @@ -955,7 +979,7 @@ func (c *Client) RollupAddToken(tokenAddress ethCommon.Address, feeAddToken *big r.State.TokenList = append(r.State.TokenList, tokenAddress) r.Events.AddToken = append(r.Events.AddToken, eth.RollupEventAddToken{TokenAddress: tokenAddress, TokenID: uint32(len(r.State.TokenList) - 1)}) - return r.addTransaction(newTransaction("addtoken", tokenAddress)), nil + return r.addTransaction(c.newTransaction("addtoken", tokenAddress)), nil } // RollupGetCurrentTokens is the interface to call the smart contract function @@ -983,7 +1007,7 @@ func (c *Client) RollupUpdateForgeL1L2BatchTimeout(newForgeL1Timeout int64) (tx r.Events.UpdateForgeL1L2BatchTimeout = append(r.Events.UpdateForgeL1L2BatchTimeout, eth.RollupEventUpdateForgeL1L2BatchTimeout{NewForgeL1L2BatchTimeout: newForgeL1Timeout}) - return r.addTransaction(newTransaction("updateForgeL1L2BatchTimeout", newForgeL1Timeout)), nil + return r.addTransaction(c.newTransaction("updateForgeL1L2BatchTimeout", newForgeL1Timeout)), nil } // RollupUpdateFeeAddToken is the interface to call the smart contract function @@ -1091,7 +1115,7 @@ func (c *Client) AuctionSetOpenAuctionSlots(newOpenAuctionSlots uint16) (tx *typ a.Events.NewOpenAuctionSlots = append(a.Events.NewOpenAuctionSlots, eth.AuctionEventNewOpenAuctionSlots{NewOpenAuctionSlots: newOpenAuctionSlots}) - return a.addTransaction(newTransaction("setOpenAuctionSlots", newOpenAuctionSlots)), nil + return a.addTransaction(c.newTransaction("setOpenAuctionSlots", newOpenAuctionSlots)), nil } // AuctionGetOpenAuctionSlots is the interface to call the smart contract function @@ -1264,7 +1288,7 @@ func (c *Client) AuctionSetCoordinator(forger ethCommon.Address, URL string) (tx ForgerAddress ethCommon.Address URL string } - return a.addTransaction(newTransaction("registercoordinator", data{*c.addr, forger, URL})), nil + return a.addTransaction(c.newTransaction("registercoordinator", data{*c.addr, forger, URL})), nil } // AuctionIsRegisteredCoordinator is the interface to call the smart contract function @@ -1397,7 +1421,7 @@ func (c *Client) AuctionBid(amount *big.Int, slot int64, bidAmount *big.Int, BidAmount *big.Int Bidder ethCommon.Address } - return a.addTransaction(newTransaction("bid", data{slot, bidAmount, *c.addr})), nil + return a.addTransaction(c.newTransaction("bid", data{slot, bidAmount, *c.addr})), nil } // AuctionMultiBid is the interface to call the smart contract function. This @@ -1613,7 +1637,7 @@ func (c *Client) WDelayerChangeWithdrawalDelay(newWithdrawalDelay uint64) (tx *t w.Events.NewWithdrawalDelay = append(w.Events.NewWithdrawalDelay, eth.WDelayerEventNewWithdrawalDelay{WithdrawalDelay: newWithdrawalDelay}) - return w.addTransaction(newTransaction("changeWithdrawalDelay", newWithdrawalDelay)), nil + return w.addTransaction(c.newTransaction("changeWithdrawalDelay", newWithdrawalDelay)), nil } // WDelayerDepositInfo is the interface to call the smart contract function