diff --git a/api/api_test.go b/api/api_test.go index 7c215c1..eb1652f 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -412,7 +412,7 @@ func TestMain(m *testing.M) { bids: testBids, slots: api.genTestSlots( 20, - commonBlocks[len(commonBlocks)-1].EthBlockNum, + commonBlocks[len(commonBlocks)-1].Num, testBids, auctionVars, ), @@ -611,7 +611,7 @@ func doBadReq(method, path string, reqBody io.Reader, expectedResponseCode int) func getTimestamp(blockNum int64, blocks []common.Block) time.Time { for i := 0; i < len(blocks); i++ { - if blocks[i].EthBlockNum == blockNum { + if blocks[i].Num == blockNum { return blocks[i].Timestamp } } @@ -647,7 +647,7 @@ func getAccountByIdx(idx common.Idx, accs []common.Account) *common.Account { func getBlockByNum(ethBlockNum int64, blocks []common.Block) common.Block { for _, b := range blocks { - if b.EthBlockNum == ethBlockNum { + if b.Num == ethBlockNum { return b } } diff --git a/api/aux_test.go b/api/aux_test.go index a9139cd..6505bd3 100644 --- a/api/aux_test.go +++ b/api/aux_test.go @@ -13,7 +13,7 @@ import ( func AddAditionalInformation(blocks []common.BlockData) { for i := range blocks { blocks[i].Block.Timestamp = time.Now().Add(time.Second * 13).UTC() - blocks[i].Block.Hash = ethCommon.BigToHash(big.NewInt(blocks[i].Block.EthBlockNum)) + blocks[i].Block.Hash = ethCommon.BigToHash(big.NewInt(blocks[i].Block.Num)) for j := range blocks[i].Rollup.AddedTokens { blocks[i].Rollup.AddedTokens[j].Name = "NAME" + strconv.Itoa(int(blocks[i].Rollup.AddedTokens[j].TokenID)) blocks[i].Rollup.AddedTokens[j].Symbol = strconv.Itoa(int(blocks[i].Rollup.AddedTokens[j].TokenID)) diff --git a/api/batch_test.go b/api/batch_test.go index 8a615f9..5611311 100644 --- a/api/batch_test.go +++ b/api/batch_test.go @@ -60,7 +60,7 @@ func genTestBatches( block := common.Block{} found := false for _, b := range blocks { - if b.EthBlockNum == cBatches[i].EthBlockNum { + if b.Num == cBatches[i].EthBlockNum { block = b found = true break diff --git a/api/slots.go b/api/slots.go index fbfa0ff..6c14603 100644 --- a/api/slots.go +++ b/api/slots.go @@ -116,9 +116,9 @@ func (a *API) getSlot(c *gin.Context) { var slot SlotAPI if err == sql.ErrNoRows { - slot = a.newSlotAPI(slotNum, currentBlock.EthBlockNum, nil, auctionVars) + slot = a.newSlotAPI(slotNum, currentBlock.Num, nil, auctionVars) } else { - slot = a.newSlotAPI(bid.SlotNum, currentBlock.EthBlockNum, &bid, auctionVars) + slot = a.newSlotAPI(bid.SlotNum, currentBlock.Num, &bid, auctionVars) } // JSON response @@ -221,7 +221,7 @@ func (a *API) getSlots(c *gin.Context) { retBadReq(err, c) return } - currentSlot := a.getCurrentSlot(currentBlock.EthBlockNum) + currentSlot := a.getCurrentSlot(currentBlock.Num) auctionVars, err := a.h.GetAuctionVars() if err != nil { retBadReq(err, c) @@ -268,7 +268,7 @@ func (a *API) getSlots(c *gin.Context) { // Build the slot information with previous bids var slotsBids []SlotAPI if len(bids) > 0 { - slotsBids = a.newSlotsAPIFromWinnerBids(fromItem, order, bids, currentBlock.EthBlockNum, auctionVars) + slotsBids = a.newSlotsAPIFromWinnerBids(fromItem, order, bids, currentBlock.Num, auctionVars) if err != nil { retBadReq(err, c) return @@ -296,7 +296,7 @@ func (a *API) getSlots(c *gin.Context) { } } if !found { - slots, err = a.addEmptySlot(slots, i, currentBlock.EthBlockNum, auctionVars, fromItem, order) + slots, err = a.addEmptySlot(slots, i, currentBlock.Num, auctionVars, fromItem, order) if err != nil { retBadReq(err, c) return diff --git a/api/slots_test.go b/api/slots_test.go index 4a4209e..df10585 100644 --- a/api/slots_test.go +++ b/api/slots_test.go @@ -181,7 +181,7 @@ func TestGetSlots(t *testing.T) { err = doGoodReqPaginated(path, historydb.OrderAsc, &testSlotsResponse{}, appendIter) assert.NoError(t, err) - currentSlot := api.getCurrentSlot(tc.blocks[len(tc.blocks)-1].EthBlockNum) + currentSlot := api.getCurrentSlot(tc.blocks[len(tc.blocks)-1].Num) finishedAuctionSlots := []testSlot{} for i := 0; i < len(tc.slots); i++ { finishAuction := currentSlot + int64(tc.auctionVars.ClosedAuctionSlots) diff --git a/api/state.go b/api/state.go index dfae473..e4fd075 100644 --- a/api/state.go +++ b/api/state.go @@ -65,13 +65,21 @@ func (a *API) SetAuctionVariables(auctionVariables common.AuctionVariables) { // Network +// UpdateNetworkInfoBlock update Status.Network block related information +func (a *API) UpdateNetworkInfoBlock( + lastEthBlock, lastSyncBlock common.Block, +) { + a.status.Network.LastSyncBlock = lastSyncBlock.Num + a.status.Network.LastEthBlock = lastEthBlock.Num +} + // UpdateNetworkInfo update Status.Network information func (a *API) UpdateNetworkInfo( lastEthBlock, lastSyncBlock common.Block, lastBatchNum common.BatchNum, currentSlot int64, ) error { - a.status.Network.LastSyncBlock = lastSyncBlock.EthBlockNum - a.status.Network.LastEthBlock = lastEthBlock.EthBlockNum + a.status.Network.LastSyncBlock = lastSyncBlock.Num + a.status.Network.LastEthBlock = lastEthBlock.Num lastBatch, err := a.h.GetBatchAPI(lastBatchNum) if err != nil { return err @@ -106,8 +114,8 @@ func (a *API) GetNextForgers(lastBlock common.Block, currentSlot, lastClosedSlot SlotNum: i, FromBlock: fromBlock, ToBlock: toBlock, - FromTimestamp: lastBlock.Timestamp.Add(time.Second * time.Duration(secondsPerBlock*(fromBlock-lastBlock.EthBlockNum))), - ToTimestamp: lastBlock.Timestamp.Add(time.Second * time.Duration(secondsPerBlock*(toBlock-lastBlock.EthBlockNum))), + FromTimestamp: lastBlock.Timestamp.Add(time.Second * time.Duration(secondsPerBlock*(fromBlock-lastBlock.Num))), + ToTimestamp: lastBlock.Timestamp.Add(time.Second * time.Duration(secondsPerBlock*(toBlock-lastBlock.Num))), }, } foundBid := false diff --git a/api/state_test.go b/api/state_test.go index bcec9cb..1eabdde 100644 --- a/api/state_test.go +++ b/api/state_test.go @@ -69,8 +69,8 @@ func TestNextForgers(t *testing.T) { assert.Equal(t, bootCoordinator.Bidder, nextForgers[q].Coordinator.Bidder) } firstBlockSlot, lastBlockSlot := api.getFirstLastBlock(j) - fromTimestamp := lastBlock.Timestamp.Add(time.Second * time.Duration(secondsPerBlock*(firstBlockSlot-lastBlock.EthBlockNum))) - toTimestamp := lastBlock.Timestamp.Add(time.Second * time.Duration(secondsPerBlock*(lastBlockSlot-lastBlock.EthBlockNum))) + fromTimestamp := lastBlock.Timestamp.Add(time.Second * time.Duration(secondsPerBlock*(firstBlockSlot-lastBlock.Num))) + toTimestamp := lastBlock.Timestamp.Add(time.Second * time.Duration(secondsPerBlock*(lastBlockSlot-lastBlock.Num))) assert.Equal(t, fromTimestamp.Unix(), nextForgers[q].Period.FromTimestamp.Unix()) assert.Equal(t, toTimestamp.Unix(), nextForgers[q].Period.ToTimestamp.Unix()) } @@ -90,7 +90,7 @@ func TestUpdateNetworkInfo(t *testing.T) { currentSlotNum := int64(1) err := api.UpdateNetworkInfo(lastBlock, lastBlock, lastBatchNum, currentSlotNum) assert.NoError(t, err) - assert.Equal(t, lastBlock.EthBlockNum, api.status.Network.LastSyncBlock) + assert.Equal(t, lastBlock.Num, api.status.Network.LastSyncBlock) assert.Equal(t, lastBatchNum, api.status.Network.LastBatch.BatchNum) assert.Equal(t, currentSlotNum, api.status.Network.CurrentSlot) assert.Equal(t, int(api.status.Auction.ClosedAuctionSlots)+1, len(api.status.Network.NextForgers)) @@ -146,8 +146,8 @@ func TestGetState(t *testing.T) { assert.Equal(t, tc.rollupVars, status.Rollup) assert.Equal(t, tc.auctionVars, status.Auction) assert.Equal(t, tc.wdelayerVars, status.WithdrawalDelayer) - assert.Equal(t, lastBlock.EthBlockNum, status.Network.LastEthBlock) - assert.Equal(t, lastBlock.EthBlockNum, status.Network.LastSyncBlock) + assert.Equal(t, lastBlock.Num, status.Network.LastEthBlock) + assert.Equal(t, lastBlock.Num, status.Network.LastSyncBlock) assert.Equal(t, lastBatchNum, status.Network.LastBatch.BatchNum) assert.Equal(t, currentSlotNum, status.Network.CurrentSlot) assert.Equal(t, int(api.status.Auction.ClosedAuctionSlots)+1, len(status.Network.NextForgers)) diff --git a/cli/node/cfg.buidler.toml b/cli/node/cfg.buidler.toml index 25ef21d..8791919 100644 --- a/cli/node/cfg.buidler.toml +++ b/cli/node/cfg.buidler.toml @@ -1,6 +1,8 @@ [API] Address = "localhost:8086" Explorer = true +UpdateMetricsInterval = "10s" +UpdateRecommendedFeeInterval = "10s" [Debug] APIAddress = "localhost:12345" diff --git a/common/bid.go b/common/bid.go index a5c673b..b006c28 100644 --- a/common/bid.go +++ b/common/bid.go @@ -33,7 +33,10 @@ type Slot struct { BatchesLen int BidValue *big.Int BootCoord bool - Bidder ethCommon.Address - Forger ethCommon.Address - URL string + // Bidder, Forer and URL correspond to the winner of the slot (which is + // not always the highest bidder). These are the values of the + // coordinator that is able to forge exclusively before the deadline. + Bidder ethCommon.Address + Forger ethCommon.Address + URL string } diff --git a/common/block.go b/common/block.go index 45fe960..6134ba0 100644 --- a/common/block.go +++ b/common/block.go @@ -9,10 +9,10 @@ import ( // Block represents of an Ethereum block type Block struct { - EthBlockNum int64 `meddler:"eth_block_num"` - Timestamp time.Time `meddler:"timestamp,utctime"` - Hash ethCommon.Hash `meddler:"hash"` - ParentHash ethCommon.Hash `meddler:"-"` + Num int64 `meddler:"eth_block_num"` + Timestamp time.Time `meddler:"timestamp,utctime"` + Hash ethCommon.Hash `meddler:"hash"` + ParentHash ethCommon.Hash `meddler:"-" json:"-"` } // RollupData contains information returned by the Rollup smart contract @@ -63,9 +63,10 @@ type WDelayerTransfer struct { // WDelayerData contains information returned by the WDelayer smart contract type WDelayerData struct { - Vars *WDelayerVariables - Deposits []WDelayerTransfer - DepositsByTxHash map[ethCommon.Hash]*WDelayerTransfer + Vars *WDelayerVariables + Deposits []WDelayerTransfer + // We use an array because there can be multiple deposits in a single eth transaction + DepositsByTxHash map[ethCommon.Hash][]*WDelayerTransfer Withdrawals []WDelayerTransfer } @@ -74,7 +75,7 @@ func NewWDelayerData() WDelayerData { return WDelayerData{ Vars: nil, Deposits: make([]WDelayerTransfer, 0), - DepositsByTxHash: make(map[ethCommon.Hash]*WDelayerTransfer), + DepositsByTxHash: make(map[ethCommon.Hash][]*WDelayerTransfer), Withdrawals: make([]WDelayerTransfer, 0), } } diff --git a/config/config.go b/config/config.go index 0cc4cec..e0d3176 100644 --- a/config/config.go +++ b/config/config.go @@ -90,8 +90,10 @@ type Node struct { TokenHEZName string `validate:"required"` } `validate:"required"` API struct { - Address string - Explorer bool + Address string + Explorer bool + UpdateMetricsInterval Duration + UpdateRecommendedFeeInterval Duration } `validate:"required"` Debug struct { APIAddress string diff --git a/coordinator/batch.go b/coordinator/batch.go index 461946c..372119d 100644 --- a/coordinator/batch.go +++ b/coordinator/batch.go @@ -1,6 +1,11 @@ package coordinator import ( + "encoding/json" + "fmt" + "io/ioutil" + "path" + "github.com/ethereum/go-ethereum/core/types" "github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/eth" @@ -33,4 +38,22 @@ type BatchInfo struct { // FeesInfo TxStatus TxStatus EthTx *types.Transaction + Receipt *types.Receipt +} + +// DebugStore is a debug function to store the BatchInfo as a json text file in +// storePath +func (b *BatchInfo) DebugStore(storePath string) error { + batchJSON, err := json.Marshal(b) + if err != nil { + return err + } + oldStateRoot := "null" + if b.ZKInputs != nil && b.ZKInputs.OldStateRoot != nil { + oldStateRoot = b.ZKInputs.OldStateRoot.String() + } + filename := fmt.Sprintf("%010d-%s.json", b.BatchNum, oldStateRoot) + // nolint reason: 0640 allows rw to owner and r to group + //nolint:gosec + return ioutil.WriteFile(path.Join(storePath, filename), batchJSON, 0640) } diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index cbae8f7..2c74368 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -25,71 +25,98 @@ var ErrDone = fmt.Errorf("done") // Config contains the Coordinator configuration type Config struct { + // ForgerAddress is the address under which this coordinator is forging ForgerAddress ethCommon.Address + // ConfirmBlocks is the number of confirmation blocks to wait for sent + // ethereum transactions before forgetting about them ConfirmBlocks int64 + // L1BatchTimeoutPerc is the portion of the range before the L1Batch + // timeout that will trigger a schedule to forge an L1Batch + L1BatchTimeoutPerc float64 + // EthClientAttempts is the number of attempts to do an eth client RPC + // call before giving up + EthClientAttempts int + // EthClientAttemptsDelay is delay between attempts do do an eth client + // RPC call + EthClientAttemptsDelay time.Duration + // TxManagerCheckInterval is the waiting interval between receipt + // checks of ethereum transactions in the TxManager + TxManagerCheckInterval time.Duration + // DebugBatchPath if set, specifies the path where batchInfo is stored + // in JSON in every step/update of the pipeline + DebugBatchPath string +} + +func (c *Config) debugBatchStore(batchInfo *BatchInfo) { + if c.DebugBatchPath != "" { + if err := batchInfo.DebugStore(c.DebugBatchPath); err != nil { + log.Warnw("Error storing debug BatchInfo", + "path", c.DebugBatchPath, "err", err) + } + } } // Coordinator implements the Coordinator type type Coordinator struct { // State - forging bool - batchNum common.BatchNum - serverProofPool *ServerProofPool - consts synchronizer.SCConsts - vars synchronizer.SCVariables + batchNum common.BatchNum + serverProofs []ServerProofInterface + consts synchronizer.SCConsts + vars synchronizer.SCVariables + started bool cfg Config - hdb *historydb.HistoryDB - txsel *txselector.TxSelector + historyDB *historydb.HistoryDB + txSelector *txselector.TxSelector batchBuilder *batchbuilder.BatchBuilder - ethClient eth.ClientInterface - msgCh chan interface{} ctx context.Context wg sync.WaitGroup cancel context.CancelFunc - pipelineCtx context.Context - pipelineWg sync.WaitGroup - pipelineCancel context.CancelFunc + pipeline *Pipeline txManager *TxManager } // NewCoordinator creates a new Coordinator func NewCoordinator(cfg Config, - hdb *historydb.HistoryDB, - txsel *txselector.TxSelector, - bb *batchbuilder.BatchBuilder, + historyDB *historydb.HistoryDB, + txSelector *txselector.TxSelector, + batchBuilder *batchbuilder.BatchBuilder, serverProofs []ServerProofInterface, ethClient eth.ClientInterface, scConsts *synchronizer.SCConsts, initSCVars *synchronizer.SCVariables, -) *Coordinator { // once synchronizer is ready, synchronizer.Synchronizer will be passed as parameter here - serverProofPool := NewServerProofPool(len(serverProofs)) - for _, serverProof := range serverProofs { - serverProofPool.Add(serverProof) +) (*Coordinator, error) { + // nolint reason: hardcoded `1.0`, by design the percentage can't be over 100% + if cfg.L1BatchTimeoutPerc >= 1.0 { //nolint:gomnd + return nil, fmt.Errorf("invalid value for Config.L1BatchTimeoutPerc (%v >= 1.0)", + cfg.L1BatchTimeoutPerc) + } + if cfg.EthClientAttempts < 1 { + return nil, fmt.Errorf("invalid value for Config.EthClientAttempts (%v < 1)", + cfg.EthClientAttempts) } - txManager := NewTxManager(ethClient, cfg.ConfirmBlocks) + txManager := NewTxManager(&cfg, ethClient) ctx, cancel := context.WithCancel(context.Background()) c := Coordinator{ - forging: false, - batchNum: -1, - serverProofPool: serverProofPool, - consts: *scConsts, - vars: *initSCVars, + batchNum: -1, + serverProofs: serverProofs, + consts: *scConsts, + vars: *initSCVars, cfg: cfg, - hdb: hdb, - txsel: txsel, - batchBuilder: bb, + historyDB: historyDB, + txSelector: txSelector, + batchBuilder: batchBuilder, - ethClient: ethClient, + // ethClient: ethClient, msgCh: make(chan interface{}), ctx: ctx, @@ -98,7 +125,12 @@ func NewCoordinator(cfg Config, txManager: txManager, } - return &c + return &c, nil +} + +func (c *Coordinator) newPipeline() *Pipeline { + return NewPipeline(c.cfg, c.historyDB, c.txSelector, c.batchBuilder, + c.txManager, c.serverProofs, &c.consts) } // MsgSyncStats indicates an update to the Synchronizer stats @@ -134,50 +166,53 @@ func (c *Coordinator) handleMsgSyncSCVars(msg *MsgSyncSCVars) { } } -func (c *Coordinator) handleMsgSyncStats(stats *synchronizer.Stats) error { - if !stats.Synced() { - return nil - } - c.txManager.SetLastBlock(stats.Eth.LastBlock) - +func (c *Coordinator) canForge(stats *synchronizer.Stats) bool { anyoneForge := false if stats.Sync.Auction.CurrentSlot.BatchesLen == 0 && - c.consts.Auction.RelativeBlock(stats.Eth.LastBlock) > int64(c.vars.Auction.SlotDeadline) { + c.consts.Auction.RelativeBlock(stats.Eth.LastBlock.Num+1) > int64(c.vars.Auction.SlotDeadline) { log.Debug("Coordinator: anyone can forge in the current slot (slotDeadline passed)") anyoneForge = true } - if stats.Sync.Auction.CurrentSlot.Forger != c.cfg.ForgerAddress && !anyoneForge { - if c.forging { - log.Info("Coordinator: forging state end") - c.forging = false - c.PipelineStop() - } - // log.Debug("Coordinator: not in forge time") // DBG + if stats.Sync.Auction.CurrentSlot.Forger == c.cfg.ForgerAddress || anyoneForge { + return true + } + return false +} + +func (c *Coordinator) handleMsgSyncStats(stats *synchronizer.Stats) error { + if !stats.Synced() { return nil } - // log.Debug("Coordinator: forge time") // DBG - if !c.forging { - // Start pipeline from a batchNum state taken from synchronizer - log.Info("Coordinator: forging state begin") - c.batchNum = common.BatchNum(stats.Sync.LastBatch) - err := c.txsel.Reset(c.batchNum) - if err != nil { - log.Errorw("Coordinator: TxSelector.Reset", "error", err) - return err + c.txManager.SetLastBlock(stats.Eth.LastBlock.Num) + + canForge := c.canForge(stats) + if c.pipeline == nil { + if canForge { + log.Info("Coordinator: forging state begin") + batchNum := common.BatchNum(stats.Sync.LastBatch) + c.pipeline = c.newPipeline() + if err := c.pipeline.Start(batchNum, stats, &c.vars); err != nil { + return err + } } - err = c.batchBuilder.Reset(c.batchNum, true) - if err != nil { - log.Errorw("Coordinator: BatchBuilder.Reset", "error", err) - return err + } else { + if canForge { + c.pipeline.SetSyncStats(stats) + } else { + log.Info("Coordinator: forging state end") + c.pipeline.Stop() + c.pipeline = nil } - c.forging = true - c.PipelineStart() } return nil } // Start the coordinator func (c *Coordinator) Start() { + if c.started { + log.Fatal("Coordinator already started") + } + c.started = true c.wg.Add(1) go func() { c.txManager.Run(c.ctx) @@ -198,10 +233,12 @@ func (c *Coordinator) Start() { stats := msg.Stats if err := c.handleMsgSyncStats(&stats); err != nil { log.Errorw("Coordinator.handleMsgSyncStats error", "err", err) + continue } case MsgSyncReorg: if err := c.handleReorg(); err != nil { log.Errorw("Coordinator.handleReorg error", "err", err) + continue } case MsgSyncSCVars: c.handleMsgSyncSCVars(&msg) @@ -215,97 +252,45 @@ func (c *Coordinator) Start() { // Stop the coordinator func (c *Coordinator) Stop() { - log.Infow("Stopping coordinator...") + if !c.started { + log.Fatal("Coordinator already stopped") + } + c.started = false + log.Infow("Stopping Coordinator...") c.cancel() c.wg.Wait() - if c.forging { - c.forging = false - c.PipelineStop() + if c.pipeline != nil { + c.pipeline.Stop() + c.pipeline = nil } } -// PipelineStart starts the forging pipeline -func (c *Coordinator) PipelineStart() { - c.pipelineCtx, c.pipelineCancel = context.WithCancel(context.Background()) - - queueSize := 1 - batchChSentServerProof := make(chan *BatchInfo, queueSize) - - c.pipelineWg.Add(1) - go func() { - for { - select { - case <-c.pipelineCtx.Done(): - log.Debug("Pipeline forgeSendServerProof loop done") - c.pipelineWg.Done() - return - default: - c.batchNum = c.batchNum + 1 - batchInfo, err := c.forgeSendServerProof(c.pipelineCtx, c.batchNum) - if err == ErrDone { - continue - } - if err != nil { - log.Errorw("forgeSendServerProof", "err", err) - continue - } - batchChSentServerProof <- batchInfo - } - } - }() - - c.pipelineWg.Add(1) - go func() { - for { - select { - case <-c.pipelineCtx.Done(): - log.Debug("Pipeline waitServerProofSendEth loop done") - c.pipelineWg.Done() - return - case batchInfo := <-batchChSentServerProof: - err := c.waitServerProof(c.pipelineCtx, batchInfo) - if err == ErrDone { - continue - } - if err != nil { - log.Errorw("waitServerProof", "err", err) - continue - } - c.txManager.AddBatch(batchInfo) - } - } - }() -} - -// PipelineStop stops the forging pipeline -func (c *Coordinator) PipelineStop() { - log.Debug("Stopping pipeline...") - c.pipelineCancel() - c.pipelineWg.Wait() +func (c *Coordinator) handleReorg() error { + return nil // TODO } // TxManager handles everything related to ethereum transactions: It makes the // call to forge, waits for transaction confirmation, and keeps checking them // until a number of confirmed blocks have passed. type TxManager struct { - ethClient eth.ClientInterface - batchCh chan *BatchInfo - lastBlockCh chan int64 - queue []*BatchInfo - confirmation int64 - lastBlock int64 + cfg Config + ethClient eth.ClientInterface + batchCh chan *BatchInfo + lastBlockCh chan int64 + queue []*BatchInfo + lastBlock int64 } // NewTxManager creates a new TxManager -func NewTxManager(ethClient eth.ClientInterface, confirmation int64) *TxManager { +func NewTxManager(cfg *Config, ethClient eth.ClientInterface) *TxManager { return &TxManager{ + cfg: *cfg, ethClient: ethClient, // TODO: Find best queue size batchCh: make(chan *BatchInfo, 16), //nolint:gomnd // TODO: Find best queue size - lastBlockCh: make(chan int64, 16), //nolint:gomnd - confirmation: confirmation, - lastBlock: -1, + lastBlockCh: make(chan int64, 16), //nolint:gomnd + lastBlock: -1, } } @@ -320,13 +305,77 @@ func (t *TxManager) SetLastBlock(lastBlock int64) { t.lastBlockCh <- lastBlock } -const waitTime = 200 * time.Millisecond +func (t *TxManager) rollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) error { + var ethTx *types.Transaction + var err error + for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ { + ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs) + if err != nil { + log.Errorw("TxManager ethClient.RollupForgeBatch", + "attempt", attempt, "err", err) + } else { + break + } + select { + case <-ctx.Done(): + return ErrDone + case <-time.After(t.cfg.EthClientAttemptsDelay): + } + } + if err != nil { + return fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err) + } + batchInfo.EthTx = ethTx + t.cfg.debugBatchStore(batchInfo) + return nil +} + +func (t *TxManager) ethTransactionReceipt(ctx context.Context, batchInfo *BatchInfo) error { + txHash := batchInfo.EthTx.Hash() + var receipt *types.Receipt + var err error + for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ { + receipt, err = t.ethClient.EthTransactionReceipt(ctx, txHash) + if err != nil { + log.Errorw("TxManager ethClient.EthTransactionReceipt", + "attempt", attempt, "err", err) + } else { + break + } + select { + case <-ctx.Done(): + return ErrDone + case <-time.After(t.cfg.EthClientAttemptsDelay): + } + } + if err != nil { + return fmt.Errorf("reached max attempts for ethClient.EthTransactionReceipt: %w", err) + } + batchInfo.Receipt = receipt + t.cfg.debugBatchStore(batchInfo) + return nil +} + +func (t *TxManager) handleReceipt(batchInfo *BatchInfo) (*int64, error) { + receipt := batchInfo.Receipt + if receipt != nil { + if receipt.Status == types.ReceiptStatusFailed { + log.Errorw("TxManager receipt status is failed", "receipt", receipt) + return nil, fmt.Errorf("ethereum transaction receipt statis is failed") + } else if receipt.Status == types.ReceiptStatusSuccessful { + confirm := t.lastBlock - receipt.BlockNumber.Int64() + return &confirm, nil + } + } + return nil, nil +} + const longWaitTime = 999 * time.Hour // Run the TxManager func (t *TxManager) Run(ctx context.Context) { next := 0 - d := time.Duration(longWaitTime) + waitTime := time.Duration(longWaitTime) for { select { case <-ctx.Done(): @@ -335,41 +384,41 @@ func (t *TxManager) Run(ctx context.Context) { case lastBlock := <-t.lastBlockCh: t.lastBlock = lastBlock case batchInfo := <-t.batchCh: - ethTx, err := t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs) - if err != nil { - // TODO: Figure out different error cases and handle them properly - log.Errorw("TxManager ethClient.RollupForgeBatch", "err", err) + if err := t.rollupForgeBatch(ctx, batchInfo); err == ErrDone { + continue + } else if err != nil { + // TODO: Reset pipeline continue } log.Debugf("ethClient ForgeCall sent, batchNum: %d", batchInfo.BatchNum) - batchInfo.EthTx = ethTx t.queue = append(t.queue, batchInfo) - d = waitTime - case <-time.After(d): + waitTime = t.cfg.TxManagerCheckInterval + case <-time.After(waitTime): if len(t.queue) == 0 { continue } batchInfo := t.queue[next] - txID := batchInfo.EthTx.Hash() - receipt, err := t.ethClient.EthTransactionReceipt(ctx, txID) - if err != nil { - log.Errorw("TxManager ethClient.EthTransactionReceipt", "err", err) - // TODO: Figure out different error cases and handle them properly - // TODO: Notify the Coordinator to maybe reset the pipeline + err := t.ethTransactionReceipt(ctx, batchInfo) + if err == ErrDone { continue + } else if err != nil { //nolint:staticcheck + // We can't get the receipt for the + // transaction, so we can't confirm if it was + // mined + // TODO: Reset pipeline } - if receipt != nil { - if receipt.Status == types.ReceiptStatusFailed { - log.Errorw("TxManager receipt status is failed", "receipt", receipt) - } else if receipt.Status == types.ReceiptStatusSuccessful { - if t.lastBlock-receipt.BlockNumber.Int64() >= t.confirmation { - log.Debugw("TxManager tx for RollupForgeBatch confirmed", "batchNum", batchInfo.BatchNum) - t.queue = t.queue[1:] - if len(t.queue) == 0 { - d = longWaitTime - } - } + confirm, err := t.handleReceipt(batchInfo) + if err != nil { //nolint:staticcheck + // Transaction was rejected + // TODO: Reset pipeline + } + if confirm != nil && *confirm >= t.cfg.ConfirmBlocks { + log.Debugw("TxManager tx for RollupForgeBatch confirmed", + "batchNum", batchInfo.BatchNum) + t.queue = t.queue[1:] + if len(t.queue) == 0 { + waitTime = longWaitTime } } if len(t.queue) == 0 { @@ -381,11 +430,157 @@ func (t *TxManager) Run(ctx context.Context) { } } +// Pipeline manages the forging of batches with parallel server proofs +type Pipeline struct { + cfg Config + consts synchronizer.SCConsts + + // state + batchNum common.BatchNum + vars synchronizer.SCVariables + lastScheduledL1BatchBlockNum int64 + started bool + + serverProofPool *ServerProofPool + txManager *TxManager + historyDB *historydb.HistoryDB + txSelector *txselector.TxSelector + batchBuilder *batchbuilder.BatchBuilder + + stats synchronizer.Stats + statsCh chan synchronizer.Stats + + ctx context.Context + wg sync.WaitGroup + cancel context.CancelFunc +} + +// NewPipeline creates a new Pipeline +func NewPipeline(cfg Config, + historyDB *historydb.HistoryDB, + txSelector *txselector.TxSelector, + batchBuilder *batchbuilder.BatchBuilder, + txManager *TxManager, + serverProofs []ServerProofInterface, + scConsts *synchronizer.SCConsts, +) *Pipeline { + serverProofPool := NewServerProofPool(len(serverProofs)) + for _, serverProof := range serverProofs { + serverProofPool.Add(serverProof) + } + return &Pipeline{ + cfg: cfg, + historyDB: historyDB, + txSelector: txSelector, + batchBuilder: batchBuilder, + serverProofPool: serverProofPool, + txManager: txManager, + consts: *scConsts, + // TODO: Find best queue size + statsCh: make(chan synchronizer.Stats, 16), //nolint:gomnd + } +} + +// SetSyncStats is a thread safe method to sets the synchronizer Stats +func (p *Pipeline) SetSyncStats(stats *synchronizer.Stats) { + p.statsCh <- *stats +} + +// Start the forging pipeline +func (p *Pipeline) Start(batchNum common.BatchNum, + syncStats *synchronizer.Stats, initSCVars *synchronizer.SCVariables) error { + if p.started { + log.Fatal("Pipeline already started") + } + p.started = true + + // Reset pipeline state + p.batchNum = batchNum + p.vars = *initSCVars + p.lastScheduledL1BatchBlockNum = 0 + + p.ctx, p.cancel = context.WithCancel(context.Background()) + + err := p.txSelector.Reset(p.batchNum) + if err != nil { + log.Errorw("Pipeline: TxSelector.Reset", "error", err) + return err + } + err = p.batchBuilder.Reset(p.batchNum, true) + if err != nil { + log.Errorw("Pipeline: BatchBuilder.Reset", "error", err) + return err + } + + queueSize := 1 + batchChSentServerProof := make(chan *BatchInfo, queueSize) + + p.wg.Add(1) + go func() { + for { + select { + case <-p.ctx.Done(): + log.Debug("Pipeline forgeSendServerProof loop done") + p.wg.Done() + return + case syncStats := <-p.statsCh: + p.stats = syncStats + default: + p.batchNum = p.batchNum + 1 + batchInfo, err := p.forgeSendServerProof(p.ctx, p.batchNum) + if err == ErrDone { + continue + } + if err != nil { + log.Errorw("forgeSendServerProof", "err", err) + continue + } + batchChSentServerProof <- batchInfo + } + } + }() + + p.wg.Add(1) + go func() { + for { + select { + case <-p.ctx.Done(): + log.Debug("Pipeline waitServerProofSendEth loop done") + p.wg.Done() + return + case batchInfo := <-batchChSentServerProof: + err := p.waitServerProof(p.ctx, batchInfo) + if err == ErrDone { + continue + } + if err != nil { + log.Errorw("waitServerProof", "err", err) + continue + } + p.txManager.AddBatch(batchInfo) + } + } + }() + return nil +} + +// Stop the forging pipeline +func (p *Pipeline) Stop() { + if !p.started { + log.Fatal("Pipeline already stopped") + } + p.started = false + log.Debug("Stopping Pipeline...") + p.cancel() + p.wg.Wait() + // TODO: Cancel all proofServers with pending proofs +} + // forgeSendServerProof the next batch, wait for a proof server to be available and send the // circuit inputs to the proof server. -func (c *Coordinator) forgeSendServerProof(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) { +func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) { // remove transactions from the pool that have been there for too long - err := c.purgeRemoveByTimeout() + err := p.purgeRemoveByTimeout() if err != nil { return nil, err } @@ -396,17 +591,18 @@ func (c *Coordinator) forgeSendServerProof(ctx context.Context, batchNum common. // var feesInfo var l1UserTxsExtra, l1OperatorTxs []common.L1Tx // 1. Decide if we forge L2Tx or L1+L2Tx - if c.shouldL1L2Batch() { + if p.shouldL1L2Batch() { + p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBatch // 2a: L1+L2 txs - // l1UserTxs, toForgeL1TxsNumber := c.hdb.GetNextL1UserTxs() // TODO once HistoryDB is ready, uncomment - var l1UserTxs []common.L1Tx = nil // tmp, depends on HistoryDB - l1UserTxsExtra, l1OperatorTxs, poolL2Txs, err = c.txsel.GetL1L2TxSelection([]common.Idx{}, batchNum, l1UserTxs) // TODO once feesInfo is added to method return, add the var + // l1UserTxs, toForgeL1TxsNumber := c.historyDB.GetNextL1UserTxs() // TODO once HistoryDB is ready, uncomment + var l1UserTxs []common.L1Tx = nil // tmp, depends on HistoryDB + l1UserTxsExtra, l1OperatorTxs, poolL2Txs, err = p.txSelector.GetL1L2TxSelection([]common.Idx{}, batchNum, l1UserTxs) // TODO once feesInfo is added to method return, add the var if err != nil { return nil, err } } else { // 2b: only L2 txs - _, poolL2Txs, err = c.txsel.GetL2TxSelection([]common.Idx{}, batchNum) // TODO once feesInfo is added to method return, add the var + _, poolL2Txs, err = p.txSelector.GetL2TxSelection([]common.Idx{}, batchNum) // TODO once feesInfo is added to method return, add the var if err != nil { return nil, err } @@ -418,7 +614,7 @@ func (c *Coordinator) forgeSendServerProof(ctx context.Context, batchNum common. // the poolL2Txs selected. Will mark as invalid the txs that have a // (fromIdx, nonce) which already appears in the selected txs (includes // all the nonces smaller than the current one) - err = c.purgeInvalidDueToL2TxsSelection(poolL2Txs) + err = p.purgeInvalidDueToL2TxsSelection(poolL2Txs) if err != nil { return nil, err } @@ -431,18 +627,19 @@ func (c *Coordinator) forgeSendServerProof(ctx context.Context, batchNum common. // 4. Call BatchBuilder with TxSelector output configBatch := &batchbuilder.ConfigBatch{ - ForgerAddress: c.cfg.ForgerAddress, + ForgerAddress: p.cfg.ForgerAddress, } - zkInputs, err := c.batchBuilder.BuildBatch([]common.Idx{}, configBatch, l1UserTxsExtra, l1OperatorTxs, poolL2Txs, nil) // TODO []common.TokenID --> feesInfo + zkInputs, err := p.batchBuilder.BuildBatch([]common.Idx{}, configBatch, l1UserTxsExtra, l1OperatorTxs, poolL2Txs, nil) // TODO []common.TokenID --> feesInfo if err != nil { return nil, err } // 5. Save metadata from BatchBuilder output for BatchNum batchInfo.ZKInputs = zkInputs + p.cfg.debugBatchStore(&batchInfo) // 6. Wait for an available server proof blocking call - serverProof, err := c.serverProofPool.Get(ctx) + serverProof, err := p.serverProofPool.Get(ctx) if err != nil { return nil, err } @@ -451,9 +648,10 @@ func (c *Coordinator) forgeSendServerProof(ctx context.Context, batchNum common. // If there's an error further on, add the serverProof back to // the pool if err != nil { - c.serverProofPool.Add(serverProof) + p.serverProofPool.Add(serverProof) } }() + p.cfg.debugBatchStore(&batchInfo) // 7. Call the selected idle server proof with BatchBuilder output, // save server proof info for batchNum @@ -466,35 +664,20 @@ func (c *Coordinator) forgeSendServerProof(ctx context.Context, batchNum common. } // waitServerProof gets the generated zkProof & sends it to the SmartContract -func (c *Coordinator) waitServerProof(ctx context.Context, batchInfo *BatchInfo) error { +func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) error { proof, err := batchInfo.ServerProof.GetProof(ctx) // blocking call, until not resolved don't continue. Returns when the proof server has calculated the proof if err != nil { return err } - c.serverProofPool.Add(batchInfo.ServerProof) + p.serverProofPool.Add(batchInfo.ServerProof) batchInfo.ServerProof = nil batchInfo.Proof = proof - batchInfo.ForgeBatchArgs = c.prepareForgeBatchArgs(batchInfo) + batchInfo.ForgeBatchArgs = p.prepareForgeBatchArgs(batchInfo) batchInfo.TxStatus = TxStatusPending - - // TODO(FUTURE) once tx data type is defined, store ethTx (returned by ForgeCall) - // TBD if use ethTxStore as a disk k-v database, or use a Queue - // tx, err := c.ethTxStore.NewTx() - // if err != nil { - // return err - // } - // tx.Put(ethTx.Hash(), ethTx.Bytes()) - // if err := tx.Commit(); err!=nil { - // return nil - // } - + p.cfg.debugBatchStore(batchInfo) return nil } -func (c *Coordinator) handleReorg() error { - return nil // TODO -} - // isForgeSequence returns true if the node is the Forger in the current ethereum block // func (c *Coordinator) isForgeSequence() (bool, error) { // // TODO: Consider checking if we can forge by quering the Synchronizer instead of using ethClient @@ -509,19 +692,31 @@ func (c *Coordinator) handleReorg() error { // return c.ethClient.AuctionCanForge(*addr, blockNum+1) // } -func (c *Coordinator) purgeRemoveByTimeout() error { +func (p *Pipeline) purgeRemoveByTimeout() error { return nil // TODO } -func (c *Coordinator) purgeInvalidDueToL2TxsSelection(l2Txs []common.PoolL2Tx) error { +func (p *Pipeline) purgeInvalidDueToL2TxsSelection(l2Txs []common.PoolL2Tx) error { return nil // TODO } -func (c *Coordinator) shouldL1L2Batch() bool { - return false // TODO +func (p *Pipeline) shouldL1L2Batch() bool { + // Take the lastL1BatchBlockNum as the biggest between the last + // scheduled one, and the synchronized one. + lastL1BatchBlockNum := p.lastScheduledL1BatchBlockNum + if p.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum { + lastL1BatchBlockNum = p.stats.Sync.LastL1BatchBlock + } + // Return true if we have passed the l1BatchTimeoutPerc portion of the + // range before the l1batch timeout. + if p.stats.Eth.LastBlock.Num-lastL1BatchBlockNum >= + int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout)*p.cfg.L1BatchTimeoutPerc) { + return true + } + return false } -func (c *Coordinator) prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs { +func (p *Pipeline) prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs { // TODO return ð.RollupForgeBatchArgs{} } diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index d88f584..024e162 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -10,7 +10,6 @@ import ( ethCommon "github.com/ethereum/go-ethereum/common" "github.com/hermeznetwork/hermez-node/batchbuilder" dbUtils "github.com/hermeznetwork/hermez-node/db" - "github.com/hermeznetwork/hermez-node/db/historydb" "github.com/hermeznetwork/hermez-node/db/l2db" "github.com/hermeznetwork/hermez-node/db/statedb" "github.com/hermeznetwork/hermez-node/log" @@ -21,13 +20,25 @@ import ( "github.com/stretchr/testify/require" ) +var deleteme = []string{} + +func TestMain(m *testing.M) { + exitVal := m.Run() + for _, dir := range deleteme { + if err := os.RemoveAll(dir); err != nil { + panic(err) + } + } + os.Exit(exitVal) +} + func newTestModules(t *testing.T) (*txselector.TxSelector, *batchbuilder.BatchBuilder) { // FUTURE once Synchronizer is ready, should return it also nLevels := 32 - synchDBPath, err := ioutil.TempDir("", "tmpSynchDB") + syncDBPath, err := ioutil.TempDir("", "tmpSyncDB") require.Nil(t, err) - defer assert.Nil(t, os.RemoveAll(synchDBPath)) - synchSdb, err := statedb.NewStateDB(synchDBPath, statedb.TypeSynchronizer, nLevels) + deleteme = append(deleteme, syncDBPath) + syncSdb, err := statedb.NewStateDB(syncDBPath, statedb.TypeSynchronizer, nLevels) assert.Nil(t, err) pass := os.Getenv("POSTGRES_PASS") @@ -37,14 +48,14 @@ func newTestModules(t *testing.T) (*txselector.TxSelector, *batchbuilder.BatchBu txselDir, err := ioutil.TempDir("", "tmpTxSelDB") require.Nil(t, err) - defer assert.Nil(t, os.RemoveAll(txselDir)) - txsel, err := txselector.NewTxSelector(txselDir, synchSdb, l2DB, 10, 10, 10) + deleteme = append(deleteme, txselDir) + txsel, err := txselector.NewTxSelector(txselDir, syncSdb, l2DB, 10, 10, 10) assert.Nil(t, err) bbDir, err := ioutil.TempDir("", "tmpBatchBuilderDB") require.Nil(t, err) - defer assert.Nil(t, os.RemoveAll(bbDir)) - bb, err := batchbuilder.NewBatchBuilder(bbDir, synchSdb, nil, 0, uint64(nLevels)) + deleteme = append(deleteme, bbDir) + bb, err := batchbuilder.NewBatchBuilder(bbDir, syncSdb, nil, 0, uint64(nLevels)) assert.Nil(t, err) // l1Txs, coordinatorL1Txs, poolL2Txs := test.GenerateTestTxsFromSet(t, test.SetTest0) @@ -62,49 +73,50 @@ func (t *timer) Time() int64 { return currentTime } -var forger ethCommon.Address -var bidder ethCommon.Address - -func waitForSlot(t *testing.T, coord *Coordinator, c *test.Client, slot int64) { - for { - blockNum, err := c.EthLastBlock() - require.Nil(t, err) - nextBlockSlot, err := c.AuctionGetSlotNumber(blockNum + 1) - require.Nil(t, err) - if nextBlockSlot == slot { - break - } - c.CtlMineBlock() - time.Sleep(100 * time.Millisecond) - var stats synchronizer.Stats - stats.Eth.LastBlock = c.CtlLastBlock() - stats.Sync.LastBlock = c.CtlLastBlock() - canForge, err := c.AuctionCanForge(forger, blockNum+1) - require.Nil(t, err) - if canForge { - // fmt.Println("DBG canForge") - stats.Sync.Auction.CurrentSlot.Forger = forger - } - coord.SendMsg(MsgSyncStats{ - Stats: stats, - }) - } -} +var bidder = ethCommon.HexToAddress("0x6b175474e89094c44da98b954eedeac495271d0f") +var forger = ethCommon.HexToAddress("0xc344E203a046Da13b0B4467EB7B3629D0C99F6E6") -func TestCoordinator(t *testing.T) { +func newTestCoordinator(t *testing.T, forgerAddr ethCommon.Address, ethClient *test.Client, ethClientSetup *test.ClientSetup) *Coordinator { txsel, bb := newTestModules(t) - bidder = ethCommon.HexToAddress("0x6b175474e89094c44da98b954eedeac495271d0f") - forger = ethCommon.HexToAddress("0xc344E203a046Da13b0B4467EB7B3629D0C99F6E6") + + debugBatchPath, err := ioutil.TempDir("", "tmpDebugBatch") + require.Nil(t, err) + deleteme = append(deleteme, debugBatchPath) conf := Config{ - ForgerAddress: forger, + ForgerAddress: forgerAddr, + ConfirmBlocks: 5, + L1BatchTimeoutPerc: 0.5, + EthClientAttempts: 5, + EthClientAttemptsDelay: 100 * time.Millisecond, + TxManagerCheckInterval: 500 * time.Millisecond, + DebugBatchPath: debugBatchPath, } - hdb := &historydb.HistoryDB{} serverProofs := []ServerProofInterface{&ServerProofMock{}, &ServerProofMock{}} - var timer timer + scConsts := &synchronizer.SCConsts{ + Rollup: *ethClientSetup.RollupConstants, + Auction: *ethClientSetup.AuctionConstants, + WDelayer: *ethClientSetup.WDelayerConstants, + } + initSCVars := &synchronizer.SCVariables{ + Rollup: *ethClientSetup.RollupVariables, + Auction: *ethClientSetup.AuctionVariables, + WDelayer: *ethClientSetup.WDelayerVariables, + } + coord, err := NewCoordinator(conf, nil, txsel, bb, serverProofs, ethClient, scConsts, initSCVars) + require.Nil(t, err) + return coord +} + +func TestCoordinatorFlow(t *testing.T) { + if os.Getenv("TEST_COORD_FLOW") == "" { + return + } ethClientSetup := test.NewClientSetupExample() + var timer timer ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup) + coord := newTestCoordinator(t, forger, ethClient, ethClientSetup) // Bid for slot 2 and 4 _, err := ethClient.AuctionSetCoordinator(forger, "https://foo.bar") @@ -114,40 +126,231 @@ func TestCoordinator(t *testing.T) { _, err = ethClient.AuctionBidSimple(4, big.NewInt(9999)) require.Nil(t, err) - scConsts := &synchronizer.SCConsts{ - Rollup: *ethClientSetup.RollupConstants, - Auction: *ethClientSetup.AuctionConstants, - WDelayer: *ethClientSetup.WDelayerConstants, - } - initSCVars := &synchronizer.SCVariables{ - Rollup: *ethClientSetup.RollupVariables, - Auction: *ethClientSetup.AuctionVariables, - WDelayer: *ethClientSetup.WDelayerVariables, - } - c := NewCoordinator(conf, hdb, txsel, bb, serverProofs, ethClient, scConsts, initSCVars) - c.Start() + coord.Start() time.Sleep(1 * time.Second) + waitForSlot := func(slot int64) { + for { + blockNum, err := ethClient.EthLastBlock() + require.Nil(t, err) + nextBlockSlot, err := ethClient.AuctionGetSlotNumber(blockNum + 1) + require.Nil(t, err) + if nextBlockSlot == slot { + break + } + ethClient.CtlMineBlock() + time.Sleep(100 * time.Millisecond) + var stats synchronizer.Stats + stats.Eth.LastBlock = *ethClient.CtlLastBlock() + stats.Sync.LastBlock = *ethClient.CtlLastBlock() + canForge, err := ethClient.AuctionCanForge(forger, blockNum+1) + require.Nil(t, err) + if canForge { + // fmt.Println("DBG canForge") + stats.Sync.Auction.CurrentSlot.Forger = forger + } + coord.SendMsg(MsgSyncStats{ + Stats: stats, + }) + } + } + // NOTE: With the current test, the coordinator will enter in forge // time before the bidded slot because no one else is forging in the // other slots before the slot deadline. // simulate forgeSequence time - waitForSlot(t, c, ethClient, 2) + waitForSlot(2) log.Info("~~~ simulate entering in forge time") time.Sleep(1 * time.Second) // simulate going out from forgeSequence - waitForSlot(t, c, ethClient, 3) + waitForSlot(3) log.Info("~~~ simulate going out from forge time") time.Sleep(1 * time.Second) // simulate entering forgeSequence time again - waitForSlot(t, c, ethClient, 4) + waitForSlot(4) log.Info("~~~ simulate entering in forge time again") time.Sleep(2 * time.Second) // simulate stopping forgerLoop by channel log.Info("~~~ simulate stopping forgerLoop by closing coordinator stopch") - c.Stop() + coord.Stop() time.Sleep(1 * time.Second) } + +func TestCoordinatorStartStop(t *testing.T) { + ethClientSetup := test.NewClientSetupExample() + var timer timer + ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup) + coord := newTestCoordinator(t, forger, ethClient, ethClientSetup) + coord.Start() + coord.Stop() +} + +func TestCoordCanForge(t *testing.T) { + ethClientSetup := test.NewClientSetupExample() + bootForger := ethClientSetup.AuctionVariables.BootCoordinator + + var timer timer + ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup) + coord := newTestCoordinator(t, forger, ethClient, ethClientSetup) + _, err := ethClient.AuctionSetCoordinator(forger, "https://foo.bar") + require.Nil(t, err) + _, err = ethClient.AuctionBidSimple(2, big.NewInt(9999)) + require.Nil(t, err) + + bootCoord := newTestCoordinator(t, bootForger, ethClient, ethClientSetup) + + assert.Equal(t, forger, coord.cfg.ForgerAddress) + assert.Equal(t, bootForger, bootCoord.cfg.ForgerAddress) + ethBootCoord, err := ethClient.AuctionGetBootCoordinator() + require.Nil(t, err) + assert.Equal(t, &bootForger, ethBootCoord) + + var stats synchronizer.Stats + + // Slot 0. No bid, so the winner is the boot coordinator + stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum + stats.Sync.LastBlock = stats.Eth.LastBlock + stats.Sync.Auction.CurrentSlot.Forger = bootForger + assert.Equal(t, false, coord.canForge(&stats)) + assert.Equal(t, true, bootCoord.canForge(&stats)) + + // Slot 0. No bid, and we reach the deadline, so anyone can forge + stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum + + int64(ethClientSetup.AuctionVariables.SlotDeadline) + stats.Sync.LastBlock = stats.Eth.LastBlock + stats.Sync.Auction.CurrentSlot.Forger = bootForger + assert.Equal(t, true, coord.canForge(&stats)) + assert.Equal(t, true, bootCoord.canForge(&stats)) + + // Slot 1. coordinator bid, so the winner is the coordinator + stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum + + 1*int64(ethClientSetup.AuctionConstants.BlocksPerSlot) + stats.Sync.LastBlock = stats.Eth.LastBlock + stats.Sync.Auction.CurrentSlot.Forger = forger + assert.Equal(t, true, coord.canForge(&stats)) + assert.Equal(t, false, bootCoord.canForge(&stats)) +} + +func TestCoordHandleMsgSyncStats(t *testing.T) { + ethClientSetup := test.NewClientSetupExample() + bootForger := ethClientSetup.AuctionVariables.BootCoordinator + + var timer timer + ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup) + coord := newTestCoordinator(t, forger, ethClient, ethClientSetup) + _, err := ethClient.AuctionSetCoordinator(forger, "https://foo.bar") + require.Nil(t, err) + _, err = ethClient.AuctionBidSimple(2, big.NewInt(9999)) + require.Nil(t, err) + + var stats synchronizer.Stats + + // Slot 0. No bid, so the winner is the boot coordinator + // pipelineStarted: false -> false + stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum + stats.Sync.LastBlock = stats.Eth.LastBlock + stats.Sync.Auction.CurrentSlot.Forger = bootForger + assert.Equal(t, false, coord.canForge(&stats)) + require.Nil(t, coord.handleMsgSyncStats(&stats)) + assert.Nil(t, coord.pipeline) + + // Slot 0. No bid, and we reach the deadline, so anyone can forge + // pipelineStarted: false -> true + stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum + + int64(ethClientSetup.AuctionVariables.SlotDeadline) + stats.Sync.LastBlock = stats.Eth.LastBlock + stats.Sync.Auction.CurrentSlot.Forger = bootForger + assert.Equal(t, true, coord.canForge(&stats)) + require.Nil(t, coord.handleMsgSyncStats(&stats)) + assert.NotNil(t, coord.pipeline) + + // Slot 0. No bid, and we reach the deadline, so anyone can forge + // pipelineStarted: true -> true + stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum + + int64(ethClientSetup.AuctionVariables.SlotDeadline) + 1 + stats.Sync.LastBlock = stats.Eth.LastBlock + stats.Sync.Auction.CurrentSlot.Forger = bootForger + assert.Equal(t, true, coord.canForge(&stats)) + require.Nil(t, coord.handleMsgSyncStats(&stats)) + assert.NotNil(t, coord.pipeline) + + // Slot 0. No bid, so the winner is the boot coordinator + // pipelineStarted: true -> false + stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum + + 1*int64(ethClientSetup.AuctionConstants.BlocksPerSlot) + stats.Sync.LastBlock = stats.Eth.LastBlock + stats.Sync.Auction.CurrentSlot.Forger = bootForger + assert.Equal(t, false, coord.canForge(&stats)) + require.Nil(t, coord.handleMsgSyncStats(&stats)) + assert.Nil(t, coord.pipeline) +} + +func TestPipelineShouldL1L2Batch(t *testing.T) { + ethClientSetup := test.NewClientSetupExample() + + var timer timer + ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup) + coord := newTestCoordinator(t, forger, ethClient, ethClientSetup) + pipeline := coord.newPipeline() + pipeline.vars = coord.vars + + // Check that the parameters are the ones we expect and use in this test + require.Equal(t, 0.5, pipeline.cfg.L1BatchTimeoutPerc) + require.Equal(t, int64(9), ethClientSetup.RollupVariables.ForgeL1L2BatchTimeout) + l1BatchTimeoutPerc := pipeline.cfg.L1BatchTimeoutPerc + l1BatchTimeout := ethClientSetup.RollupVariables.ForgeL1L2BatchTimeout + + var stats synchronizer.Stats + + startBlock := int64(100) + + // + // No scheduled L1Batch + // + + // Last L1Batch was a long time ago + stats.Eth.LastBlock.Num = startBlock + stats.Sync.LastBlock = stats.Eth.LastBlock + stats.Sync.LastL1BatchBlock = 0 + pipeline.stats = stats + assert.Equal(t, true, pipeline.shouldL1L2Batch()) + + stats.Sync.LastL1BatchBlock = startBlock + + // We are are one block before the timeout range * 0.5 + stats.Eth.LastBlock.Num = startBlock + int64(float64(l1BatchTimeout)*l1BatchTimeoutPerc) - 1 + stats.Sync.LastBlock = stats.Eth.LastBlock + pipeline.stats = stats + assert.Equal(t, false, pipeline.shouldL1L2Batch()) + + // We are are at timeout range * 0.5 + stats.Eth.LastBlock.Num = startBlock + int64(float64(l1BatchTimeout)*l1BatchTimeoutPerc) + stats.Sync.LastBlock = stats.Eth.LastBlock + pipeline.stats = stats + assert.Equal(t, true, pipeline.shouldL1L2Batch()) + + // + // Scheduled L1Batch + // + pipeline.lastScheduledL1BatchBlockNum = startBlock + stats.Sync.LastL1BatchBlock = startBlock - 10 + + // We are are one block before the timeout range * 0.5 + stats.Eth.LastBlock.Num = startBlock + int64(float64(l1BatchTimeout)*l1BatchTimeoutPerc) - 1 + stats.Sync.LastBlock = stats.Eth.LastBlock + pipeline.stats = stats + assert.Equal(t, false, pipeline.shouldL1L2Batch()) + + // We are are at timeout range * 0.5 + stats.Eth.LastBlock.Num = startBlock + int64(float64(l1BatchTimeout)*l1BatchTimeoutPerc) + stats.Sync.LastBlock = stats.Eth.LastBlock + pipeline.stats = stats + assert.Equal(t, true, pipeline.shouldL1L2Batch()) +} + +// TODO: Test Reorg +// TODO: Test Pipeline +// TODO: Test TxMonitor diff --git a/db/historydb/historydb.go b/db/historydb/historydb.go index 2d915ed..7afa09b 100644 --- a/db/historydb/historydb.go +++ b/db/historydb/historydb.go @@ -304,6 +304,15 @@ func (hdb *HistoryDB) GetLastBatchNum() (common.BatchNum, error) { return batchNum, row.Scan(&batchNum) } +// GetLastL1BatchBlockNum returns the blockNum of the latest forged l1Batch +func (hdb *HistoryDB) GetLastL1BatchBlockNum() (int64, error) { + row := hdb.db.QueryRow(`SELECT eth_block_num FROM batch + WHERE forge_l1_txs_num IS NOT NULL + ORDER BY batch_num DESC LIMIT 1;`) + var blockNum int64 + return blockNum, row.Scan(&blockNum) +} + // GetLastL1TxsNum returns the greatest ForgeL1TxsNum in the DB from forged // batches. If there's no batch in the DB (nil, nil) is returned. func (hdb *HistoryDB) GetLastL1TxsNum() (*int64, error) { @@ -1410,7 +1419,7 @@ func (hdb *HistoryDB) AddBlockSCData(blockData *common.BlockData) (err error) { } } - if err := hdb.updateExitTree(txn, blockData.Block.EthBlockNum, + if err := hdb.updateExitTree(txn, blockData.Block.Num, blockData.Rollup.Withdrawals, blockData.WDelayer.Withdrawals); err != nil { return err } diff --git a/db/historydb/historydb_test.go b/db/historydb/historydb_test.go index f25888d..49f8ca4 100644 --- a/db/historydb/historydb_test.go +++ b/db/historydb/historydb_test.go @@ -107,7 +107,7 @@ func TestBlocks(t *testing.T) { } func assertEqualBlock(t *testing.T, expected *common.Block, actual *common.Block) { - assert.Equal(t, expected.EthBlockNum, actual.EthBlockNum) + assert.Equal(t, expected.Num, actual.Num) assert.Equal(t, expected.Hash, actual.Hash) assert.Equal(t, expected.Timestamp.Unix(), actual.Timestamp.Unix()) } @@ -150,6 +150,7 @@ func TestBatches(t *testing.T) { batches := []common.Batch{} tokensValue := make(map[common.TokenID]float64) lastL1TxsNum := new(int64) + lastL1BatchBlockNum := int64(0) for _, block := range blocks { // Insert block assert.NoError(t, historyDB.AddBlock(&block.Block)) @@ -169,6 +170,7 @@ func TestBatches(t *testing.T) { forgeTxsNum := batch.Batch.ForgeL1TxsNum if forgeTxsNum != nil && (lastL1TxsNum == nil || *lastL1TxsNum < *forgeTxsNum) { *lastL1TxsNum = *forgeTxsNum + lastL1BatchBlockNum = batch.Batch.EthBlockNum } } } @@ -199,6 +201,10 @@ func TestBatches(t *testing.T) { fetchedLastL1TxsNum, err := historyDB.GetLastL1TxsNum() assert.NoError(t, err) assert.Equal(t, lastL1TxsNum, fetchedLastL1TxsNum) + // Test GetLastL1BatchBlockNum + fetchedLastL1BatchBlockNum, err := historyDB.GetLastL1BatchBlockNum() + assert.NoError(t, err) + assert.Equal(t, lastL1BatchBlockNum, fetchedLastL1BatchBlockNum) } func TestBids(t *testing.T) { @@ -735,7 +741,7 @@ func TestUpdateExitTree(t *testing.T) { // Add withdraws to the second-to-last block, and insert block into the DB block := &blocks[len(blocks)-2] - require.Equal(t, int64(4), block.Block.EthBlockNum) + require.Equal(t, int64(4), block.Block.Num) tokenAddr := blocks[0].Rollup.AddedTokens[0].EthAddr // block.WDelayer.Deposits = append(block.WDelayer.Deposits, // common.WDelayerTransfer{Owner: tc.UsersByIdx[257].Addr, Token: tokenAddr, Amount: big.NewInt(80)}, // 257 @@ -752,7 +758,7 @@ func TestUpdateExitTree(t *testing.T) { err = historyDB.addBlock(historyDB.db, &block.Block) require.Nil(t, err) - err = historyDB.updateExitTree(historyDB.db, block.Block.EthBlockNum, + err = historyDB.updateExitTree(historyDB.db, block.Block.Num, block.Rollup.Withdrawals, block.WDelayer.Withdrawals) require.Nil(t, err) @@ -767,15 +773,15 @@ func TestUpdateExitTree(t *testing.T) { for _, withdraw := range block.Rollup.Withdrawals { assert.Equal(t, withdraw.NumExitRoot, dbExitsByIdx[withdraw.Idx].BatchNum) if withdraw.InstantWithdraw { - assert.Equal(t, &block.Block.EthBlockNum, dbExitsByIdx[withdraw.Idx].InstantWithdrawn) + assert.Equal(t, &block.Block.Num, dbExitsByIdx[withdraw.Idx].InstantWithdrawn) } else { - assert.Equal(t, &block.Block.EthBlockNum, dbExitsByIdx[withdraw.Idx].DelayedWithdrawRequest) + assert.Equal(t, &block.Block.Num, dbExitsByIdx[withdraw.Idx].DelayedWithdrawRequest) } } // Add delayed withdraw to the last block, and insert block into the DB block = &blocks[len(blocks)-1] - require.Equal(t, int64(5), block.Block.EthBlockNum) + require.Equal(t, int64(5), block.Block.Num) block.WDelayer.Withdrawals = append(block.WDelayer.Withdrawals, common.WDelayerTransfer{ Owner: tc.UsersByIdx[257].Addr, @@ -785,7 +791,7 @@ func TestUpdateExitTree(t *testing.T) { err = historyDB.addBlock(historyDB.db, &block.Block) require.Nil(t, err) - err = historyDB.updateExitTree(historyDB.db, block.Block.EthBlockNum, + err = historyDB.updateExitTree(historyDB.db, block.Block.Num, block.Rollup.Withdrawals, block.WDelayer.Withdrawals) require.Nil(t, err) @@ -795,7 +801,7 @@ func TestUpdateExitTree(t *testing.T) { for _, dbExit := range dbExits { dbExitsByIdx[dbExit.AccountIdx] = dbExit } - require.Equal(t, &block.Block.EthBlockNum, dbExitsByIdx[257].DelayedWithdrawn) + require.Equal(t, &block.Block.Num, dbExitsByIdx[257].DelayedWithdrawn) } func TestGetBestBidCoordinator(t *testing.T) { diff --git a/eth/ethereum.go b/eth/ethereum.go index 28f09a5..4a01f5e 100644 --- a/eth/ethereum.go +++ b/eth/ethereum.go @@ -257,10 +257,11 @@ func (c *EthereumClient) EthLastBlock() (int64, error) { // return c.client.HeaderByNumber(ctx, number) // } -// EthBlockByNumber internally calls ethclient.Client BlockByNumber and returns *common.Block +// EthBlockByNumber internally calls ethclient.Client BlockByNumber and returns +// *common.Block. If number == -1, the latests known block is returned. func (c *EthereumClient) EthBlockByNumber(ctx context.Context, number int64) (*common.Block, error) { blockNum := big.NewInt(number) - if number == 0 { + if number == -1 { blockNum = nil } block, err := c.client.BlockByNumber(ctx, blockNum) @@ -268,10 +269,10 @@ func (c *EthereumClient) EthBlockByNumber(ctx context.Context, number int64) (*c return nil, err } b := &common.Block{ - EthBlockNum: block.Number().Int64(), - Timestamp: time.Unix(int64(block.Time()), 0), - ParentHash: block.ParentHash(), - Hash: block.Hash(), + Num: block.Number().Int64(), + Timestamp: time.Unix(int64(block.Time()), 0), + ParentHash: block.ParentHash(), + Hash: block.Hash(), } return b, nil } diff --git a/node/node.go b/node/node.go index bdfbfd4..329b6e4 100644 --- a/node/node.go +++ b/node/node.go @@ -2,6 +2,7 @@ package node import ( "context" + "fmt" "net/http" "sync" "time" @@ -165,7 +166,7 @@ func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node, serverProofs[i] = coordinator.NewServerProof(serverProofCfg.URL) } - coord = coordinator.NewCoordinator( + coord, err = coordinator.NewCoordinator( coordinator.Config{ ForgerAddress: coordCfg.ForgerAddress, ConfirmBlocks: coordCfg.ConfirmBlocks, @@ -178,9 +179,20 @@ func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node, &scConsts, &initSCVars, ) + if err != nil { + return nil, err + } } var nodeAPI *NodeAPI if cfg.API.Address != "" { + if cfg.API.UpdateMetricsInterval.Duration == 0 { + return nil, fmt.Errorf("invalid cfg.API.UpdateMetricsInterval: %v", + cfg.API.UpdateMetricsInterval.Duration) + } + if cfg.API.UpdateRecommendedFeeInterval.Duration == 0 { + return nil, fmt.Errorf("invalid cfg.API.UpdateRecommendedFeeInterval: %v", + cfg.API.UpdateRecommendedFeeInterval.Duration) + } server := gin.Default() coord := false if mode == ModeCoordinator { @@ -303,9 +315,11 @@ func (a *NodeAPI) Run(ctx context.Context) error { // TODO(Edu): Consider keeping the `lastBlock` inside synchronizer so that we // don't have to pass it around. func (n *Node) syncLoopFn(lastBlock *common.Block) (*common.Block, time.Duration) { - if blockData, discarded, err := n.sync.Sync2(n.ctx, lastBlock); err != nil { + blockData, discarded, err := n.sync.Sync2(n.ctx, lastBlock) + stats := n.sync.Stats() + if err != nil { // case: error - log.Errorw("Synchronizer.Sync", "error", err) + log.Errorw("Synchronizer.Sync", "err", err) return nil, n.cfg.Synchronizer.SyncLoopInterval.Duration } else if discarded != nil { // case: reorg @@ -318,13 +332,13 @@ func (n *Node) syncLoopFn(lastBlock *common.Block) (*common.Block, time.Duration n.nodeAPI.api.SetRollupVariables(*rollup) n.nodeAPI.api.SetAuctionVariables(*auction) n.nodeAPI.api.SetWDelayerVariables(*wDelayer) - - // TODO: n.nodeAPI.api.UpdateNetworkInfo() + n.nodeAPI.api.UpdateNetworkInfoBlock( + stats.Eth.LastBlock, stats.Sync.LastBlock, + ) } return nil, time.Duration(0) } else if blockData != nil { // case: new block - stats := n.sync.Stats() if n.mode == ModeCoordinator { if stats.Synced() && (blockData.Rollup.Vars != nil || blockData.Auction.Vars != nil || @@ -350,7 +364,15 @@ func (n *Node) syncLoopFn(lastBlock *common.Block) (*common.Block, time.Duration n.nodeAPI.api.SetWDelayerVariables(*blockData.WDelayer.Vars) } - // TODO: n.nodeAPI.api.UpdateNetworkInfo() + if stats.Synced() { + if err := n.nodeAPI.api.UpdateNetworkInfo( + stats.Eth.LastBlock, stats.Sync.LastBlock, + common.BatchNum(stats.Eth.LastBatch), + stats.Sync.Auction.CurrentSlot.SlotNum, + ); err != nil { + log.Errorw("API.UpdateNetworkInfo", "err", err) + } + } } return &blockData.Block, time.Duration(0) } else { @@ -408,6 +430,38 @@ func (n *Node) StartNodeAPI() { log.Fatalw("NodeAPI.Run", "err", err) } }() + + n.wg.Add(1) + go func() { + for { + select { + case <-n.ctx.Done(): + log.Info("API.UpdateMetrics loop done") + n.wg.Done() + return + case <-time.After(n.cfg.API.UpdateMetricsInterval.Duration): + if err := n.nodeAPI.api.UpdateMetrics(); err != nil { + log.Errorw("API.UpdateMetrics", "err", err) + } + } + } + }() + + n.wg.Add(1) + go func() { + for { + select { + case <-n.ctx.Done(): + log.Info("API.UpdateRecommendedFee loop done") + n.wg.Done() + return + case <-time.After(n.cfg.API.UpdateRecommendedFeeInterval.Duration): + if err := n.nodeAPI.api.UpdateRecommendedFee(); err != nil { + log.Errorw("API.UpdateRecommendedFee", "err", err) + } + } + } + }() } // Start the node diff --git a/priceupdater/priceupdater_test.go b/priceupdater/priceupdater_test.go index 7d35538..85e1a46 100644 --- a/priceupdater/priceupdater_test.go +++ b/priceupdater/priceupdater_test.go @@ -29,7 +29,7 @@ func TestPriceUpdater(t *testing.T) { tokens := []common.Token{} tokens = append(tokens, common.Token{ TokenID: 1, - EthBlockNum: blocks[0].EthBlockNum, + EthBlockNum: blocks[0].Num, EthAddr: ethCommon.BigToAddress(big.NewInt(2)), Name: "DAI", Symbol: "DAI", diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index b12d9d5..60bd5e6 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -46,15 +46,18 @@ type Stats struct { Eth struct { RefreshPeriod time.Duration Updated time.Time - FirstBlock int64 - LastBlock int64 + FirstBlockNum int64 + LastBlock common.Block LastBatch int64 } Sync struct { Updated time.Time - LastBlock int64 + LastBlock common.Block LastBatch int64 - Auction struct { + // LastL1BatchBlock is the last ethereum block in which an + // l1Batch was forged + LastL1BatchBlock int64 + Auction struct { CurrentSlot common.Slot } } @@ -78,10 +81,10 @@ type StatsHolder struct { } // NewStatsHolder creates a new StatsHolder -func NewStatsHolder(firstBlock int64, refreshPeriod time.Duration) *StatsHolder { +func NewStatsHolder(firstBlockNum int64, refreshPeriod time.Duration) *StatsHolder { stats := Stats{} stats.Eth.RefreshPeriod = refreshPeriod - stats.Eth.FirstBlock = firstBlock + stats.Eth.FirstBlockNum = firstBlockNum return &StatsHolder{Stats: stats} } @@ -93,13 +96,16 @@ func (s *StatsHolder) UpdateCurrentSlot(slot common.Slot) { } // UpdateSync updates the synchronizer stats -func (s *StatsHolder) UpdateSync(lastBlock int64, lastBatch *common.BatchNum) { +func (s *StatsHolder) UpdateSync(lastBlock *common.Block, lastBatch *common.BatchNum, lastL1BatchBlock *int64) { now := time.Now() s.rw.Lock() - s.Sync.LastBlock = lastBlock + s.Sync.LastBlock = *lastBlock if lastBatch != nil { s.Sync.LastBatch = int64(*lastBatch) } + if lastL1BatchBlock != nil { + s.Sync.LastL1BatchBlock = *lastL1BatchBlock + } s.Sync.Updated = now s.rw.Unlock() } @@ -114,7 +120,7 @@ func (s *StatsHolder) UpdateEth(ethClient eth.ClientInterface) error { return nil } - lastBlock, err := ethClient.EthLastBlock() + lastBlock, err := ethClient.EthBlockByNumber(context.TODO(), -1) if err != nil { return err } @@ -124,7 +130,7 @@ func (s *StatsHolder) UpdateEth(ethClient eth.ClientInterface) error { } s.rw.Lock() s.Eth.Updated = now - s.Eth.LastBlock = lastBlock + s.Eth.LastBlock = *lastBlock s.Eth.LastBatch = lastBatch s.rw.Unlock() return nil @@ -138,17 +144,21 @@ func (s *StatsHolder) CopyStats() *Stats { sCopy.Sync.Auction.CurrentSlot.BidValue = common.CopyBigInt(s.Sync.Auction.CurrentSlot.BidValue) } + if s.Sync.Auction.CurrentSlot.DefaultSlotBid != nil { + sCopy.Sync.Auction.CurrentSlot.DefaultSlotBid = + common.CopyBigInt(s.Sync.Auction.CurrentSlot.DefaultSlotBid) + } s.rw.RUnlock() return &sCopy } func (s *StatsHolder) blocksPerc() float64 { - syncLastBlock := s.Sync.LastBlock - if s.Sync.LastBlock == 0 { - syncLastBlock = s.Eth.FirstBlock - 1 + syncLastBlockNum := s.Sync.LastBlock.Num + if s.Sync.LastBlock.Num == 0 { + syncLastBlockNum = s.Eth.FirstBlockNum - 1 } - return float64(syncLastBlock-(s.Eth.FirstBlock-1)) * 100.0 / - float64(s.Eth.LastBlock-(s.Eth.FirstBlock-1)) + return float64(syncLastBlockNum-(s.Eth.FirstBlockNum-1)) * 100.0 / + float64(s.Eth.LastBlock.Num-(s.Eth.FirstBlockNum-1)) } func (s *StatsHolder) batchesPerc(batchNum int64) float64 { @@ -279,7 +289,7 @@ func (s *Synchronizer) updateCurrentSlotIfSync(batchesLen int) error { BatchesLen: int(s.stats.Sync.Auction.CurrentSlot.BatchesLen), } // We want the next block because the current one is already mined - blockNum := s.stats.Sync.LastBlock + 1 + blockNum := s.stats.Sync.LastBlock.Num + 1 slotNum := s.consts.Auction.SlotNum(blockNum) if batchesLen == -1 { dbBatchesLen, err := s.historyDB.GetBatchesLen(slotNum) @@ -310,8 +320,11 @@ func (s *Synchronizer) updateCurrentSlotIfSync(batchesLen int) error { slot.URL = "???" } else if err == nil { slot.BidValue = bidCoord.BidValue - defaultSlotBid := bidCoord.DefaultSlotSetBid[slot.SlotNum%6] - if slot.BidValue.Cmp(defaultSlotBid) >= 0 { + slot.DefaultSlotBid = bidCoord.DefaultSlotSetBid[slot.SlotNum%6] + // Only if the highest bid value is higher than the + // default slot bid, the bidder is the winner of the + // slot. Otherwise the boot coordinator is the winner. + if slot.BidValue.Cmp(slot.DefaultSlotBid) >= 0 { slot.Bidder = bidCoord.Bidder slot.Forger = bidCoord.Forger slot.URL = bidCoord.URL @@ -344,28 +357,28 @@ func (s *Synchronizer) init() error { if err := s.stats.UpdateEth(s.ethClient); err != nil { return err } - var lastBlockNum int64 + lastBlock := &common.Block{} lastSavedBlock, err := s.historyDB.GetLastBlock() if err != nil && err != sql.ErrNoRows { return err } // If there's no block in the DB (or we only have the default block 0), // make sure that the stateDB is clean - if err == sql.ErrNoRows || lastSavedBlock.EthBlockNum == 0 { + if err == sql.ErrNoRows || lastSavedBlock.Num == 0 { if err := s.stateDB.Reset(0); err != nil { return err } } else { - lastBlockNum = lastSavedBlock.EthBlockNum + lastBlock = lastSavedBlock } - if err := s.resetState(lastBlockNum); err != nil { + if err := s.resetState(lastBlock); err != nil { return err } log.Infow("Sync init block", "syncLastBlock", s.stats.Sync.LastBlock, "syncBlocksPerc", s.stats.blocksPerc(), - "ethFirstBlock", s.stats.Eth.FirstBlock, + "ethFirstBlockNum", s.stats.Eth.FirstBlockNum, "ethLastBlock", s.stats.Eth.LastBlock, ) log.Infow("Sync init batch", @@ -393,13 +406,13 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block) } // If we don't have any stored block, we must do a full sync // starting from the startBlockNum - if err == sql.ErrNoRows || lastSavedBlock.EthBlockNum == 0 { + if err == sql.ErrNoRows || lastSavedBlock.Num == 0 { nextBlockNum = s.startBlockNum lastSavedBlock = nil } } if lastSavedBlock != nil { - nextBlockNum = lastSavedBlock.EthBlockNum + 1 + nextBlockNum = lastSavedBlock.Num + 1 } ethBlock, err := s.ethClient.EthBlockByNumber(ctx, nextBlockNum) @@ -408,7 +421,8 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block) } else if err != nil { return nil, nil, err } - log.Debugf("ethBlock: num: %v, parent: %v, hash: %v", ethBlock.EthBlockNum, ethBlock.ParentHash.String(), ethBlock.Hash.String()) + log.Debugf("ethBlock: num: %v, parent: %v, hash: %v", + ethBlock.Num, ethBlock.ParentHash.String(), ethBlock.Hash.String()) if err := s.stats.UpdateEth(s.ethClient); err != nil { return nil, nil, err @@ -424,13 +438,13 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block) if lastSavedBlock.Hash != ethBlock.ParentHash { // Reorg detected log.Debugw("Reorg Detected", - "blockNum", ethBlock.EthBlockNum, + "blockNum", ethBlock.Num, "block.parent(got)", ethBlock.ParentHash, "parent.hash(exp)", lastSavedBlock.Hash) lastDBBlockNum, err := s.reorg(lastSavedBlock) if err != nil { return nil, nil, err } - discarded := lastSavedBlock.EthBlockNum - lastDBBlockNum + discarded := lastSavedBlock.Num - lastDBBlockNum return nil, &discarded, nil } } @@ -456,11 +470,16 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block) for i := range rollupData.Withdrawals { withdrawal := &rollupData.Withdrawals[i] if !withdrawal.InstantWithdraw { - wDelayerTransfer, ok := wDelayerData.DepositsByTxHash[withdrawal.TxHash] - if !ok { + wDelayerTransfers := wDelayerData.DepositsByTxHash[withdrawal.TxHash] + if len(wDelayerTransfers) == 0 { return nil, nil, fmt.Errorf("WDelayer deposit corresponding to " + "non-instant rollup withdrawal not found") } + // Pop the first wDelayerTransfer to consume them in chronological order + wDelayerTransfer := wDelayerTransfers[0] + wDelayerData.DepositsByTxHash[withdrawal.TxHash] = + wDelayerData.DepositsByTxHash[withdrawal.TxHash][1:] + withdrawal.Owner = wDelayerTransfer.Owner withdrawal.Token = wDelayerTransfer.Token } @@ -486,18 +505,25 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block) batchesLen := len(rollupData.Batches) if batchesLen == 0 { - s.stats.UpdateSync(ethBlock.EthBlockNum, nil) + s.stats.UpdateSync(ethBlock, nil, nil) } else { - s.stats.UpdateSync(ethBlock.EthBlockNum, - &rollupData.Batches[batchesLen-1].Batch.BatchNum) + var lastL1BatchBlock *int64 + for _, batchData := range rollupData.Batches { + if batchData.L1Batch { + lastL1BatchBlock = &batchData.Batch.EthBlockNum + } + } + s.stats.UpdateSync(ethBlock, + &rollupData.Batches[batchesLen-1].Batch.BatchNum, lastL1BatchBlock) } if err := s.updateCurrentSlotIfSync(len(rollupData.Batches)); err != nil { return nil, nil, err } + log.Debugw("Synced block", - "syncLastBlock", s.stats.Sync.LastBlock, + "syncLastBlockNum", s.stats.Sync.LastBlock.Num, "syncBlocksPerc", s.stats.blocksPerc(), - "ethLastBlock", s.stats.Eth.LastBlock, + "ethLastBlockNum", s.stats.Eth.LastBlock.Num, ) for _, batchData := range rollupData.Batches { log.Debugw("Synced batch", @@ -516,8 +542,9 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block) // corresponding batches in StateBD are discarded. Returns the last valid // blockNum from the HistoryDB. func (s *Synchronizer) reorg(uncleBlock *common.Block) (int64, error) { - blockNum := uncleBlock.EthBlockNum + blockNum := uncleBlock.Num + var block *common.Block for blockNum >= s.startBlockNum { ethBlock, err := s.ethClient.EthBlockByNumber(context.Background(), blockNum) if err != nil { @@ -525,7 +552,7 @@ func (s *Synchronizer) reorg(uncleBlock *common.Block) (int64, error) { return 0, err } - block, err := s.historyDB.GetBlock(blockNum) + block, err = s.historyDB.GetBlock(blockNum) if err != nil { log.Errorw("historyDB.GetBlock", "err", err) return 0, err @@ -536,23 +563,23 @@ func (s *Synchronizer) reorg(uncleBlock *common.Block) (int64, error) { } blockNum-- } - total := uncleBlock.EthBlockNum - blockNum - log.Debugw("Discarding blocks", "total", total, "from", uncleBlock.EthBlockNum, "to", blockNum+1) + total := uncleBlock.Num - block.Num + log.Debugw("Discarding blocks", "total", total, "from", uncleBlock.Num, "to", block.Num+1) // Set History DB and State DB to the correct state - err := s.historyDB.Reorg(blockNum) + err := s.historyDB.Reorg(block.Num) if err != nil { return 0, err } - if err := s.resetState(blockNum); err != nil { + if err := s.resetState(block); err != nil { return 0, err } - return blockNum, nil + return block.Num, nil } -func (s *Synchronizer) resetState(blockNum int64) error { +func (s *Synchronizer) resetState(block *common.Block) error { rollup, auction, wDelayer, err := s.historyDB.GetSCVars() // If SCVars are not in the HistoryDB, this is probably the first run // of the Synchronizer: store the initial vars taken from config @@ -578,13 +605,23 @@ func (s *Synchronizer) resetState(blockNum int64) error { if err == sql.ErrNoRows { batchNum = 0 } + + lastL1BatchBlockNum, err := s.historyDB.GetLastL1BatchBlockNum() + if err != nil && err != sql.ErrNoRows { + log.Errorw("historyDB.GetLastL1BatchBlockNum", "err", err) + return err + } + if err == sql.ErrNoRows { + lastL1BatchBlockNum = 0 + } + err = s.stateDB.Reset(batchNum) if err != nil { log.Errorw("stateDB.Reset", "err", err) return err } - s.stats.UpdateSync(blockNum, &batchNum) + s.stats.UpdateSync(block, &batchNum, &lastL1BatchBlockNum) // TODO if err := s.updateCurrentSlotIfSync(-1); err != nil { return err @@ -640,7 +677,7 @@ func (s *Synchronizer) Status() (*common.SyncStatus, error) { // rollupSync retreives all the Rollup Smart Contract Data that happened at // ethBlock.blockNum with ethBlock.Hash. func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, error) { - blockNum := ethBlock.EthBlockNum + blockNum := ethBlock.Num var rollupData = common.NewRollupData() // var forgeL1TxsNum int64 @@ -788,8 +825,8 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e batchData.CreatedAccounts = processTxsOut.CreatedAccounts slotNum := int64(0) - if ethBlock.EthBlockNum >= s.consts.Auction.GenesisBlockNum { - slotNum = (ethBlock.EthBlockNum - s.consts.Auction.GenesisBlockNum) / + if ethBlock.Num >= s.consts.Auction.GenesisBlockNum { + slotNum = (ethBlock.Num - s.consts.Auction.GenesisBlockNum) / int64(s.consts.Auction.BlocksPerSlot) } @@ -880,7 +917,7 @@ func cutStringMax(s string, max int) string { // auctionSync gets information from the Auction Contract func (s *Synchronizer) auctionSync(ethBlock *common.Block) (*common.AuctionData, error) { - blockNum := ethBlock.EthBlockNum + blockNum := ethBlock.Num var auctionData = common.NewAuctionData() // Get auction events in the block @@ -955,7 +992,8 @@ func (s *Synchronizer) auctionSync(ethBlock *common.Block) (*common.AuctionData, "auctionEvents.NewDefaultSlotSetBid: %v", evt.SlotSet) } s.vars.Auction.DefaultSlotSetBid[evt.SlotSet] = evt.NewInitialMinBid - s.vars.Auction.DefaultSlotSetBidSlotNum = s.consts.Auction.SlotNum(blockNum) + int64(s.vars.Auction.ClosedAuctionSlots) + 1 + s.vars.Auction.DefaultSlotSetBidSlotNum = s.consts.Auction.SlotNum(blockNum) + + int64(s.vars.Auction.ClosedAuctionSlots) + 1 varsUpdate = true } @@ -973,7 +1011,7 @@ func (s *Synchronizer) auctionSync(ethBlock *common.Block) (*common.AuctionData, // wdelayerSync gets information from the Withdrawal Delayer Contract func (s *Synchronizer) wdelayerSync(ethBlock *common.Block) (*common.WDelayerData, error) { - blockNum := ethBlock.EthBlockNum + blockNum := ethBlock.Num wDelayerData := common.NewWDelayerData() // Get wDelayer events in the block @@ -998,7 +1036,8 @@ func (s *Synchronizer) wdelayerSync(ethBlock *common.Block) (*common.WDelayerDat Amount: evt.Amount, }) wDelayerData.DepositsByTxHash[evt.TxHash] = - &wDelayerData.Deposits[len(wDelayerData.Deposits)-1] + append(wDelayerData.DepositsByTxHash[evt.TxHash], + &wDelayerData.Deposits[len(wDelayerData.Deposits)-1]) } for _, evt := range wDelayerEvents.Withdraw { wDelayerData.Withdrawals = append(wDelayerData.Withdrawals, common.WDelayerTransfer{ diff --git a/synchronizer/synchronizer_test.go b/synchronizer/synchronizer_test.go index 9a33ce8..6ac8b2b 100644 --- a/synchronizer/synchronizer_test.go +++ b/synchronizer/synchronizer_test.go @@ -47,7 +47,7 @@ func checkSyncBlock(t *testing.T, s *Synchronizer, blockNum int, block, syncBloc require.Nil(t, err) dbBlocks = dbBlocks[1:] // ignore block 0, added by default in the DB assert.Equal(t, blockNum, len(dbBlocks)) - assert.Equal(t, int64(blockNum), dbBlocks[blockNum-1].EthBlockNum) + assert.Equal(t, int64(blockNum), dbBlocks[blockNum-1].Num) assert.NotEqual(t, dbBlocks[blockNum-1].Hash, dbBlocks[blockNum-2].Hash) assert.Greater(t, dbBlocks[blockNum-1].Timestamp.Unix(), dbBlocks[blockNum-2].Timestamp.Unix()) @@ -60,7 +60,7 @@ func checkSyncBlock(t *testing.T, s *Synchronizer, blockNum int, block, syncBloc dbToken := dbTokens[i] syncToken := syncBlock.Rollup.AddedTokens[i] - assert.Equal(t, block.Block.EthBlockNum, syncToken.EthBlockNum) + assert.Equal(t, block.Block.Num, syncToken.EthBlockNum) assert.Equal(t, token.TokenID, syncToken.TokenID) assert.Equal(t, token.EthAddr, syncToken.EthAddr) tokenConst := tokenConsts[token.TokenID] @@ -321,17 +321,36 @@ func TestSync(t *testing.T) { // // First Sync from an initial state // + var vars struct { + Rollup *common.RollupVariables + Auction *common.AuctionVariables + WDelayer *common.WDelayerVariables + } + stats := s.Stats() + assert.Equal(t, false, stats.Synced()) // Test Sync for rollup genesis block syncBlock, discards, err := s.Sync2(ctx, nil) require.Nil(t, err) require.Nil(t, discards) require.NotNil(t, syncBlock) - assert.Equal(t, int64(1), syncBlock.Block.EthBlockNum) + require.Nil(t, syncBlock.Rollup.Vars) + require.Nil(t, syncBlock.Auction.Vars) + require.Nil(t, syncBlock.WDelayer.Vars) + assert.Equal(t, int64(1), syncBlock.Block.Num) + stats = s.Stats() + assert.Equal(t, int64(1), stats.Eth.FirstBlockNum) + assert.Equal(t, int64(1), stats.Eth.LastBlock.Num) + assert.Equal(t, int64(1), stats.Sync.LastBlock.Num) + vars.Rollup, vars.Auction, vars.WDelayer = s.SCVars() + assert.Equal(t, clientSetup.RollupVariables, vars.Rollup) + assert.Equal(t, clientSetup.AuctionVariables, vars.Auction) + assert.Equal(t, clientSetup.WDelayerVariables, vars.WDelayer) + dbBlocks, err := s.historyDB.GetAllBlocks() require.Nil(t, err) assert.Equal(t, 2, len(dbBlocks)) - assert.Equal(t, int64(1), dbBlocks[1].EthBlockNum) + assert.Equal(t, int64(1), dbBlocks[1].Num) // Sync again and expect no new blocks syncBlock, discards, err = s.Sync2(ctx, nil) @@ -388,14 +407,14 @@ func TestSync(t *testing.T) { require.Equal(t, 2, len(blocks)) // blocks 0 (blockNum=2) i := 0 - require.Equal(t, 2, int(blocks[i].Block.EthBlockNum)) + require.Equal(t, 2, int(blocks[i].Block.Num)) require.Equal(t, 3, len(blocks[i].Rollup.AddedTokens)) require.Equal(t, 5, len(blocks[i].Rollup.L1UserTxs)) require.Equal(t, 2, len(blocks[i].Rollup.Batches)) require.Equal(t, 2, len(blocks[i].Rollup.Batches[0].L1CoordinatorTxs)) // blocks 1 (blockNum=3) i = 1 - require.Equal(t, 3, int(blocks[i].Block.EthBlockNum)) + require.Equal(t, 3, int(blocks[i].Block.Num)) require.Equal(t, 4, len(blocks[i].Rollup.L1UserTxs)) require.Equal(t, 2, len(blocks[i].Rollup.Batches)) require.Equal(t, 3, len(blocks[i].Rollup.Batches[0].L2Txs)) @@ -421,7 +440,14 @@ func TestSync(t *testing.T) { require.Nil(t, err) require.Nil(t, discards) require.NotNil(t, syncBlock) - assert.Equal(t, int64(2), syncBlock.Block.EthBlockNum) + assert.Nil(t, syncBlock.Rollup.Vars) + assert.Nil(t, syncBlock.Auction.Vars) + assert.Nil(t, syncBlock.WDelayer.Vars) + assert.Equal(t, int64(2), syncBlock.Block.Num) + stats = s.Stats() + assert.Equal(t, int64(1), stats.Eth.FirstBlockNum) + assert.Equal(t, int64(3), stats.Eth.LastBlock.Num) + assert.Equal(t, int64(2), stats.Sync.LastBlock.Num) checkSyncBlock(t, s, 2, &blocks[0], syncBlock) @@ -431,7 +457,14 @@ func TestSync(t *testing.T) { require.Nil(t, err) require.Nil(t, discards) require.NotNil(t, syncBlock) - assert.Equal(t, int64(3), syncBlock.Block.EthBlockNum) + assert.Nil(t, syncBlock.Rollup.Vars) + assert.Nil(t, syncBlock.Auction.Vars) + assert.Nil(t, syncBlock.WDelayer.Vars) + assert.Equal(t, int64(3), syncBlock.Block.Num) + stats = s.Stats() + assert.Equal(t, int64(1), stats.Eth.FirstBlockNum) + assert.Equal(t, int64(3), stats.Eth.LastBlock.Num) + assert.Equal(t, int64(3), stats.Sync.LastBlock.Num) checkSyncBlock(t, s, 3, &blocks[1], syncBlock) @@ -447,7 +480,19 @@ func TestSync(t *testing.T) { require.Nil(t, err) require.Nil(t, discards) require.NotNil(t, syncBlock) - assert.Equal(t, int64(4), syncBlock.Block.EthBlockNum) + assert.Nil(t, syncBlock.Rollup.Vars) + assert.Nil(t, syncBlock.Auction.Vars) + assert.Nil(t, syncBlock.WDelayer.Vars) + assert.Equal(t, int64(4), syncBlock.Block.Num) + stats = s.Stats() + assert.Equal(t, int64(1), stats.Eth.FirstBlockNum) + assert.Equal(t, int64(4), stats.Eth.LastBlock.Num) + assert.Equal(t, int64(4), stats.Sync.LastBlock.Num) + vars.Rollup, vars.Auction, vars.WDelayer = s.SCVars() + assert.Equal(t, clientSetup.RollupVariables, vars.Rollup) + assert.Equal(t, clientSetup.AuctionVariables, vars.Auction) + assert.Equal(t, clientSetup.WDelayerVariables, vars.WDelayer) + dbExits, err := s.historyDB.GetAllExits() require.Nil(t, err) foundA1, foundC1 := false, false @@ -486,14 +531,25 @@ func TestSync(t *testing.T) { require.Nil(t, err) require.Nil(t, discards) require.NotNil(t, syncBlock) - assert.Equal(t, int64(5), syncBlock.Block.EthBlockNum) + assert.NotNil(t, syncBlock.Rollup.Vars) + assert.NotNil(t, syncBlock.Auction.Vars) + assert.NotNil(t, syncBlock.WDelayer.Vars) + assert.Equal(t, int64(5), syncBlock.Block.Num) + stats = s.Stats() + assert.Equal(t, int64(1), stats.Eth.FirstBlockNum) + assert.Equal(t, int64(5), stats.Eth.LastBlock.Num) + assert.Equal(t, int64(5), stats.Sync.LastBlock.Num) + vars.Rollup, vars.Auction, vars.WDelayer = s.SCVars() + assert.NotEqual(t, clientSetup.RollupVariables, vars.Rollup) + assert.NotEqual(t, clientSetup.AuctionVariables, vars.Auction) + assert.NotEqual(t, clientSetup.WDelayerVariables, vars.WDelayer) dbRollupVars, dbAuctionVars, dbWDelayerVars, err := s.historyDB.GetSCVars() require.Nil(t, err) // Set EthBlockNum for Vars to the blockNum in which they were updated (should be 5) - rollupVars.EthBlockNum = syncBlock.Block.EthBlockNum - auctionVars.EthBlockNum = syncBlock.Block.EthBlockNum - wDelayerVars.EthBlockNum = syncBlock.Block.EthBlockNum + rollupVars.EthBlockNum = syncBlock.Block.Num + auctionVars.EthBlockNum = syncBlock.Block.Num + wDelayerVars.EthBlockNum = syncBlock.Block.Num assert.Equal(t, rollupVars, dbRollupVars) assert.Equal(t, auctionVars, dbAuctionVars) assert.Equal(t, wDelayerVars, dbWDelayerVars) @@ -535,8 +591,8 @@ func TestSync(t *testing.T) { for i := 0; i < 4; i++ { client.CtlRollback() } - blockNum := client.CtlLastBlock() - require.Equal(t, int64(1), blockNum) + block := client.CtlLastBlock() + require.Equal(t, int64(1), block.Num) // Generate extra required data ethAddTokens(blocks, client) @@ -554,11 +610,18 @@ func TestSync(t *testing.T) { expetedDiscards := int64(4) require.Equal(t, &expetedDiscards, discards) require.Nil(t, syncBlock) + stats = s.Stats() + assert.Equal(t, false, stats.Synced()) + assert.Equal(t, int64(6), stats.Eth.LastBlock.Num) + vars.Rollup, vars.Auction, vars.WDelayer = s.SCVars() + assert.Equal(t, clientSetup.RollupVariables, vars.Rollup) + assert.Equal(t, clientSetup.AuctionVariables, vars.Auction) + assert.Equal(t, clientSetup.WDelayerVariables, vars.WDelayer) // At this point, the DB only has data up to block 1 dbBlock, err := s.historyDB.GetLastBlock() require.Nil(t, err) - assert.Equal(t, int64(1), dbBlock.EthBlockNum) + assert.Equal(t, int64(1), dbBlock.Num) // Accounts in HistoryDB and StateDB must be empty dbAccounts, err := s.historyDB.GetAllAccounts() @@ -574,12 +637,30 @@ func TestSync(t *testing.T) { require.Nil(t, err) require.Nil(t, discards) require.NotNil(t, syncBlock) - assert.Equal(t, int64(2+i), syncBlock.Block.EthBlockNum) + assert.Nil(t, syncBlock.Rollup.Vars) + assert.Nil(t, syncBlock.Auction.Vars) + assert.Nil(t, syncBlock.WDelayer.Vars) + assert.Equal(t, int64(2+i), syncBlock.Block.Num) + + stats = s.Stats() + assert.Equal(t, int64(1), stats.Eth.FirstBlockNum) + assert.Equal(t, int64(6), stats.Eth.LastBlock.Num) + assert.Equal(t, int64(2+i), stats.Sync.LastBlock.Num) + if i == 4 { + assert.Equal(t, true, stats.Synced()) + } else { + assert.Equal(t, false, stats.Synced()) + } + + vars.Rollup, vars.Auction, vars.WDelayer = s.SCVars() + assert.Equal(t, clientSetup.RollupVariables, vars.Rollup) + assert.Equal(t, clientSetup.AuctionVariables, vars.Auction) + assert.Equal(t, clientSetup.WDelayerVariables, vars.WDelayer) } dbBlock, err = s.historyDB.GetLastBlock() require.Nil(t, err) - assert.Equal(t, int64(6), dbBlock.EthBlockNum) + assert.Equal(t, int64(6), dbBlock.Num) // Accounts in HistoryDB and StateDB is only 2 entries dbAccounts, err = s.historyDB.GetAllAccounts() diff --git a/test/ethclient.go b/test/ethclient.go index 23d8a4f..74f2b24 100644 --- a/test/ethclient.go +++ b/test/ethclient.go @@ -553,10 +553,17 @@ func (c *Client) CtlRollback() { // // CtlLastBlock returns the last blockNum without checks -func (c *Client) CtlLastBlock() int64 { +func (c *Client) CtlLastBlock() *common.Block { c.rw.RLock() defer c.rw.RUnlock() - return c.blockNum + + block := c.blocks[c.blockNum] + return &common.Block{ + Num: c.blockNum, + Timestamp: time.Unix(block.Eth.Time, 0), + Hash: block.Eth.Hash, + ParentHash: block.Eth.ParentHash, + } } // EthLastBlock returns the last blockNum @@ -626,7 +633,7 @@ func (c *Client) EthERC20Consts(tokenAddr ethCommon.Address) (*eth.ERC20Consts, // } // EthBlockByNumber returns the *common.Block for the given block number in a -// deterministic way. +// deterministic way. If number == -1, the latests known block is returned. func (c *Client) EthBlockByNumber(ctx context.Context, blockNum int64) (*common.Block, error) { c.rw.RLock() defer c.rw.RUnlock() @@ -634,12 +641,15 @@ func (c *Client) EthBlockByNumber(ctx context.Context, blockNum int64) (*common. if blockNum > c.blockNum { return nil, ethereum.NotFound } + if blockNum == -1 { + blockNum = c.blockNum + } block := c.blocks[blockNum] return &common.Block{ - EthBlockNum: blockNum, - Timestamp: time.Unix(block.Eth.Time, 0), - Hash: block.Eth.Hash, - ParentHash: block.Eth.ParentHash, + Num: blockNum, + Timestamp: time.Unix(block.Eth.Time, 0), + Hash: block.Eth.Hash, + ParentHash: block.Eth.ParentHash, }, nil } diff --git a/test/ethclient_test.go b/test/ethclient_test.go index fe7f02b..47c3b33 100644 --- a/test/ethclient_test.go +++ b/test/ethclient_test.go @@ -46,7 +46,7 @@ func TestClientEth(t *testing.T) { block, err := c.EthBlockByNumber(context.TODO(), 0) require.Nil(t, err) - assert.Equal(t, int64(0), block.EthBlockNum) + assert.Equal(t, int64(0), block.Num) assert.Equal(t, time.Unix(0, 0), block.Timestamp) assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000000000000", block.Hash.Hex()) @@ -60,7 +60,7 @@ func TestClientEth(t *testing.T) { block, err = c.EthBlockByNumber(context.TODO(), 2) require.Nil(t, err) - assert.Equal(t, int64(2), block.EthBlockNum) + assert.Equal(t, int64(2), block.Num) assert.Equal(t, time.Unix(2, 0), block.Timestamp) // Add a token diff --git a/test/historydb.go b/test/historydb.go index 38898bb..2a35516 100644 --- a/test/historydb.go +++ b/test/historydb.go @@ -15,7 +15,7 @@ import ( // Block0 represents Ethereum's genesis block, // which is stored by default at HistoryDB var Block0 common.Block = common.Block{ - EthBlockNum: 0, + Num: 0, Hash: ethCommon.Hash([32]byte{ 212, 229, 103, 64, 248, 118, 174, 248, 192, 16, 184, 106, 64, 213, 245, 103, @@ -44,7 +44,7 @@ func GenBlocks(from, to int64) []common.Block { var blocks []common.Block for i := from; i < to; i++ { blocks = append(blocks, common.Block{ - EthBlockNum: i, + Num: i, //nolint:gomnd Timestamp: time.Now().Add(time.Second * 13).UTC(), Hash: ethCommon.BigToHash(big.NewInt(int64(i))), @@ -62,7 +62,7 @@ func GenTokens(nTokens int, blocks []common.Block) (tokensToAddInDB []common.Tok Name: "NAME" + fmt.Sprint(i), Symbol: fmt.Sprint(i), Decimals: uint64(i + 1), - EthBlockNum: blocks[i%len(blocks)].EthBlockNum, + EthBlockNum: blocks[i%len(blocks)].Num, EthAddr: ethCommon.BigToAddress(big.NewInt(int64(i))), } tokensToAddInDB = append(tokensToAddInDB, token) @@ -87,7 +87,7 @@ func GenBatches(nBatches int, blocks []common.Block) []common.Batch { for i := 0; i < nBatches; i++ { batch := common.Batch{ BatchNum: common.BatchNum(i + 1), - EthBlockNum: blocks[i%len(blocks)].EthBlockNum, + EthBlockNum: blocks[i%len(blocks)].Num, //nolint:gomnd ForgerAddr: ethCommon.BigToAddress(big.NewInt(6886723)), CollectedFees: collectedFees, @@ -161,7 +161,7 @@ func GenL1Txs( TokenID: token.TokenID, Amount: amount, LoadAmount: amount, - EthBlockNum: blocks[i%len(blocks)].EthBlockNum, + EthBlockNum: blocks[i%len(blocks)].Num, } if tx.UserOrigin { n := nextTxsNum @@ -287,7 +287,7 @@ func GenL2Txs( Amount: amount, Fee: fee, Nonce: common.Nonce(i + 1), - EthBlockNum: blocks[i%len(blocks)].EthBlockNum, + EthBlockNum: blocks[i%len(blocks)].Num, Type: randomTxType(i), } if i < nUserTxs { @@ -341,7 +341,7 @@ func GenCoordinators(nCoords int, blocks []common.Block) []common.Coordinator { coords := []common.Coordinator{} for i := 0; i < nCoords; i++ { coords = append(coords, common.Coordinator{ - EthBlockNum: blocks[i%len(blocks)].EthBlockNum, + EthBlockNum: blocks[i%len(blocks)].Num, Forger: ethCommon.BigToAddress(big.NewInt(int64(i))), Bidder: ethCommon.BigToAddress(big.NewInt(int64(i))), URL: "https://foo.bar", @@ -363,7 +363,7 @@ func GenBids(nBids int, blocks []common.Block, coords []common.Coordinator) []co bids = append(bids, common.Bid{ SlotNum: slotNum, BidValue: big.NewInt(int64(i)), - EthBlockNum: blocks[i%len(blocks)].EthBlockNum, + EthBlockNum: blocks[i%len(blocks)].Num, Bidder: coords[i%len(blocks)].Bidder, }) } @@ -397,13 +397,13 @@ func GenExitTree(n int, batches []common.Batch, accounts []common.Account, block Balance: big.NewInt(int64(i) * 1000), } if i%2 == 0 { - instant := int64(blocks[i%len(blocks)].EthBlockNum) + instant := int64(blocks[i%len(blocks)].Num) exitTree[i].InstantWithdrawn = &instant } else if i%3 == 0 { - delayedReq := int64(blocks[i%len(blocks)].EthBlockNum) + delayedReq := int64(blocks[i%len(blocks)].Num) exitTree[i].DelayedWithdrawRequest = &delayedReq if i%9 == 0 { - delayed := int64(blocks[i%len(blocks)].EthBlockNum) + delayed := int64(blocks[i%len(blocks)].Num) exitTree[i].DelayedWithdrawn = &delayed } } diff --git a/test/til/txs.go b/test/til/txs.go index b9a090c..b9cff9f 100644 --- a/test/til/txs.go +++ b/test/til/txs.go @@ -31,7 +31,7 @@ func newBatchData(batchNum int) common.BatchData { func newBlock(blockNum int64) common.BlockData { return common.BlockData{ Block: common.Block{ - EthBlockNum: blockNum, + Num: blockNum, }, Rollup: common.RollupData{ L1UserTxs: []common.L1Tx{}, @@ -711,7 +711,7 @@ func (tc *Context) FillBlocksExtra(blocks []common.BlockData, cfg *ConfigExtra) block := &blocks[i] for j := range block.Rollup.Batches { batch := &block.Rollup.Batches[j] - batch.Batch.EthBlockNum = block.Block.EthBlockNum + batch.Batch.EthBlockNum = block.Block.Num // til doesn't fill the batch forger addr batch.Batch.ForgerAddr = cfg.BootCoordAddr if batch.L1Batch { diff --git a/txselector/txselector_test.go b/txselector/txselector_test.go index 51ac0b0..75d3e83 100644 --- a/txselector/txselector_test.go +++ b/txselector/txselector_test.go @@ -51,7 +51,7 @@ func addTokens(t *testing.T, tokens []common.Token, db *sqlx.DB) { hdb := historydb.NewHistoryDB(db) test.WipeDB(hdb.DB()) assert.Nil(t, hdb.AddBlock(&common.Block{ - EthBlockNum: 1, + Num: 1, })) assert.Nil(t, hdb.AddTokens(tokens)) }