From 6c0d48f4a227331fc6dfe7499712905e9a8e8a84 Mon Sep 17 00:00:00 2001 From: Eduard S Date: Wed, 11 Nov 2020 18:15:25 +0100 Subject: [PATCH] Add Sync stats, and report them in DebugAPI --- cli/node/cfg.buidler.toml | 1 + common/ethauction.go | 9 ++ common/ethrollup.go | 6 ++ common/ethwdelayer.go | 6 ++ config/config.go | 7 +- eth/rollup.go | 22 ++-- node/node.go | 8 +- synchronizer/synchronizer.go | 182 +++++++++++++++++++++++++++++++-- test/debugapi/debugapi.go | 14 ++- test/debugapi/debugapi_test.go | 3 +- test/ethclient.go | 10 ++ 11 files changed, 244 insertions(+), 24 deletions(-) diff --git a/cli/node/cfg.buidler.toml b/cli/node/cfg.buidler.toml index ba0bd5c..f31994f 100644 --- a/cli/node/cfg.buidler.toml +++ b/cli/node/cfg.buidler.toml @@ -21,6 +21,7 @@ URL = "http://localhost:8545" [Synchronizer] SyncLoopInterval = "1s" +StatsRefreshPeriod = "1s" [Synchronizer.StartBlockNum] Rollup = 1 diff --git a/common/ethauction.go b/common/ethauction.go index f9171a9..edfd2d1 100644 --- a/common/ethauction.go +++ b/common/ethauction.go @@ -43,3 +43,12 @@ type AuctionVariables struct { // Number of blocks at the end of a slot in which any coordinator can forge if the winner has not forged one before SlotDeadline uint8 `json:"slotDeadline" meddler:"slot_deadline" validate:"required"` } + +// Copy returns a deep copy of the Variables +func (v *AuctionVariables) Copy() *AuctionVariables { + vCpy := *v + for i := range v.DefaultSlotSetBid { + vCpy.DefaultSlotSetBid[i] = new(big.Int).SetBytes(v.DefaultSlotSetBid[i].Bytes()) + } + return &vCpy +} diff --git a/common/ethrollup.go b/common/ethrollup.go index 3b84f3a..76abdff 100644 --- a/common/ethrollup.go +++ b/common/ethrollup.go @@ -163,3 +163,9 @@ type RollupVariables struct { WithdrawalDelay uint64 `json:"withdrawalDelay" meddler:"withdrawal_delay" validate:"required"` Buckets [RollupConstNumBuckets]Bucket `json:"buckets" meddler:"buckets,json"` } + +// Copy returns a deep copy of the Variables +func (v *RollupVariables) Copy() *RollupVariables { + vCpy := *v + return &vCpy +} diff --git a/common/ethwdelayer.go b/common/ethwdelayer.go index b07348b..82eaafd 100644 --- a/common/ethwdelayer.go +++ b/common/ethwdelayer.go @@ -23,3 +23,9 @@ type WDelayerVariables struct { EmergencyModeStartingTime uint64 `json:"emergencyModeStartingTime" meddler:"emergency_start_time"` EmergencyMode bool `json:"emergencyMode" meddler:"emergency_mode"` } + +// Copy returns a deep copy of the Variables +func (v *WDelayerVariables) Copy() *WDelayerVariables { + vCpy := *v + return &vCpy +} diff --git a/config/config.go b/config/config.go index b9b08e3..8a420ce 100644 --- a/config/config.go +++ b/config/config.go @@ -66,9 +66,10 @@ type Node struct { URL string `validate:"required"` } `validate:"required"` Synchronizer struct { - SyncLoopInterval Duration `validate:"required"` - StartBlockNum synchronizer.ConfigStartBlockNum `validate:"required"` - InitialVariables synchronizer.SCVariables `validate:"required"` + SyncLoopInterval Duration `validate:"required"` + StatsRefreshPeriod Duration `validate:"required"` + StartBlockNum synchronizer.ConfigStartBlockNum `validate:"required"` + InitialVariables synchronizer.SCVariables `validate:"required"` } `validate:"required"` SmartContracts struct { Rollup ethCommon.Address `validate:"required"` diff --git a/eth/rollup.go b/eth/rollup.go index bbe9739..ba1be96 100644 --- a/eth/rollup.go +++ b/eth/rollup.go @@ -172,6 +172,7 @@ type RollupInterface interface { // Viewers RollupRegisterTokensCount() (*big.Int, error) + RollupLastForgedBatch() (int64, error) // // Smart Contract Status @@ -400,14 +401,9 @@ func (c *RollupClient) RollupL1UserTxERC20Permit(fromBJJ *babyjub.PublicKey, fro } // RollupRegisterTokensCount is the interface to call the smart contract function -func (c *RollupClient) RollupRegisterTokensCount() (*big.Int, error) { - var registerTokensCount *big.Int +func (c *RollupClient) RollupRegisterTokensCount() (registerTokensCount *big.Int, err error) { if err := c.client.Call(func(ec *ethclient.Client) error { - hermez, err := Hermez.NewHermez(c.address, ec) - if err != nil { - return err - } - registerTokensCount, err = hermez.RegisterTokensCount(nil) + registerTokensCount, err = c.hermez.RegisterTokensCount(nil) return err }); err != nil { return nil, err @@ -415,6 +411,18 @@ func (c *RollupClient) RollupRegisterTokensCount() (*big.Int, error) { return registerTokensCount, nil } +// RollupLastForgedBatch is the interface to call the smart contract function +func (c *RollupClient) RollupLastForgedBatch() (lastForgedBatch int64, err error) { + if err := c.client.Call(func(ec *ethclient.Client) error { + _lastForgedBatch, err := c.hermez.LastForgedBatch(nil) + lastForgedBatch = int64(_lastForgedBatch) + return err + }); err != nil { + return 0, err + } + return lastForgedBatch, nil +} + // RollupUpdateForgeL1L2BatchTimeout is the interface to call the smart contract function func (c *RollupClient) RollupUpdateForgeL1L2BatchTimeout(newForgeL1L2BatchTimeout int64) (tx *types.Transaction, err error) { if tx, err = c.client.CallAuth( diff --git a/node/node.go b/node/node.go index 8a1fd15..652effc 100644 --- a/node/node.go +++ b/node/node.go @@ -114,8 +114,9 @@ func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node, } sync, err := synchronizer.NewSynchronizer(client, historyDB, stateDB, synchronizer.Config{ - StartBlockNum: cfg.Synchronizer.StartBlockNum, - InitialVariables: cfg.Synchronizer.InitialVariables, + StartBlockNum: cfg.Synchronizer.StartBlockNum, + InitialVariables: cfg.Synchronizer.InitialVariables, + StatsRefreshPeriod: cfg.Synchronizer.StatsRefreshPeriod.Duration, }) if err != nil { return nil, err @@ -159,9 +160,8 @@ func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node, ) } var debugAPI *debugapi.DebugAPI - println("apiaddr", cfg.Debug.APIAddress) if cfg.Debug.APIAddress != "" { - debugAPI = debugapi.NewDebugAPI(cfg.Debug.APIAddress, stateDB) + debugAPI = debugapi.NewDebugAPI(cfg.Debug.APIAddress, stateDB, sync) } ctx, cancel := context.WithCancel(context.Background()) return &Node{ diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index b32d1cc..9b62d23 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -5,6 +5,8 @@ import ( "database/sql" "fmt" "math/big" + "sync" + "time" "github.com/ethereum/go-ethereum" "github.com/hermeznetwork/hermez-node/common" @@ -39,6 +41,93 @@ var ( // Synchronized bool // } +// Stats of the syncrhonizer +type Stats struct { + Eth struct { + RefreshPeriod time.Duration + Updated time.Time + FirstBlock int64 + LastBlock int64 + LastBatch int64 + } + Sync struct { + Updated time.Time + LastBlock int64 + LastBatch int64 + } +} + +// StatsHolder stores stats and that allows reading and writing them +// concurrently +type StatsHolder struct { + stats Stats + rw sync.RWMutex +} + +// NewStatsHolder creates a new StatsHolder +func NewStatsHolder(firstBlock int64, refreshPeriod time.Duration) *StatsHolder { + stats := Stats{} + stats.Eth.RefreshPeriod = refreshPeriod + stats.Eth.FirstBlock = firstBlock + return &StatsHolder{stats: stats} +} + +// UpdateSync updates the synchronizer stats +func (s *StatsHolder) UpdateSync(lastBlock int64, lastBatch *common.BatchNum) { + now := time.Now() + s.rw.Lock() + s.stats.Sync.LastBlock = lastBlock + if lastBatch != nil { + s.stats.Sync.LastBatch = int64(*lastBatch) + } + s.stats.Sync.Updated = now + s.rw.Unlock() +} + +// UpdateEth updates the ethereum stats, only if the previous stats expired +func (s *StatsHolder) UpdateEth(ethClient eth.ClientInterface) error { + now := time.Now() + s.rw.RLock() + elapsed := now.Sub(s.stats.Eth.Updated) + s.rw.RUnlock() + if elapsed < s.stats.Eth.RefreshPeriod { + return nil + } + + lastBlock, err := ethClient.EthCurrentBlock() + if err != nil { + return err + } + lastBatch, err := ethClient.RollupLastForgedBatch() + if err != nil { + return err + } + s.rw.Lock() + s.stats.Eth.Updated = now + s.stats.Eth.LastBlock = lastBlock + s.stats.Eth.LastBatch = lastBatch + s.rw.Unlock() + return nil +} + +// CopyStats returns a copy of the inner Stats +func (s *StatsHolder) CopyStats() *Stats { + s.rw.RLock() + sCopy := s.stats + s.rw.RUnlock() + return &sCopy +} + +func (s *StatsHolder) blocksPerc() float64 { + return float64(s.stats.Sync.LastBlock-s.stats.Eth.FirstBlock) * 100.0 / + float64(s.stats.Eth.LastBlock-s.stats.Eth.FirstBlock) +} + +func (s *StatsHolder) batchesPerc(batchNum int64) float64 { + return float64(batchNum) * 100.0 / + float64(s.stats.Eth.LastBatch) +} + // ConfigStartBlockNum sets the first block used to start tracking the smart // contracts type ConfigStartBlockNum struct { @@ -56,8 +145,9 @@ type SCVariables struct { // Config is the Synchronizer configuration type Config struct { - StartBlockNum ConfigStartBlockNum - InitialVariables SCVariables + StartBlockNum ConfigStartBlockNum + InitialVariables SCVariables + StatsRefreshPeriod time.Duration } // Synchronizer implements the Synchronizer type @@ -71,6 +161,7 @@ type Synchronizer struct { cfg Config startBlockNum int64 vars SCVariables + stats *StatsHolder // firstSavedBlock *common.Block // mux sync.Mutex } @@ -103,6 +194,7 @@ func NewSynchronizer(ethClient eth.ClientInterface, historyDB *historydb.History if startBlockNum < cfg.StartBlockNum.WDelayer { startBlockNum = cfg.StartBlockNum.WDelayer } + stats := NewStatsHolder(startBlockNum, cfg.StatsRefreshPeriod) s := &Synchronizer{ ethClient: ethClient, auctionConstants: *auctionConstants, @@ -112,10 +204,16 @@ func NewSynchronizer(ethClient eth.ClientInterface, historyDB *historydb.History stateDB: stateDB, cfg: cfg, startBlockNum: startBlockNum, + stats: stats, } return s, s.init() } +// Stats returns a copy of the Synchronizer Stats +func (s *Synchronizer) Stats() *Stats { + return s.stats.CopyStats() +} + // AuctionConstants returns the AuctionConstants read from the smart contract func (s *Synchronizer) AuctionConstants() *common.AuctionConstants { return &s.auctionConstants @@ -133,11 +231,13 @@ func (s *Synchronizer) WDelayerConstants() *common.WDelayerConstants { func (s *Synchronizer) init() error { rollup, auction, wDelayer, err := s.historyDB.GetSCVars() + // If SCVars are not in the HistoryDB, this is probably the first run + // of the Synchronizer: store the initial vars taken from config if err == sql.ErrNoRows { rollup = &s.cfg.InitialVariables.Rollup auction = &s.cfg.InitialVariables.Auction wDelayer = &s.cfg.InitialVariables.WDelayer - log.Debug("Setting initial SCVars in HistoryDB") + log.Info("Setting initial SCVars in HistoryDB") if err = s.historyDB.SetInitialSCVars(rollup, auction, wDelayer); err != nil { return err } @@ -145,6 +245,47 @@ func (s *Synchronizer) init() error { s.vars.Rollup = *rollup s.vars.Auction = *auction s.vars.WDelayer = *wDelayer + + // Update stats parameters so that they have valid values before the + // first Sync call + if err := s.stats.UpdateEth(s.ethClient); err != nil { + return err + } + var lastBlockNum int64 + lastSavedBlock, err := s.historyDB.GetLastBlock() + if err != nil && err != sql.ErrNoRows { + return err + } + // If there's no block in the DB (or we only have the default block 0), + // make sure that the stateDB is clean + if err == sql.ErrNoRows || lastSavedBlock.EthBlockNum == 0 { + if err := s.stateDB.Reset(0); err != nil { + return err + } + } else { + lastBlockNum = lastSavedBlock.EthBlockNum + } + lastBatchNum, err := s.historyDB.GetLastBatchNum() + if err != nil && err != sql.ErrNoRows { + return err + } + if err == sql.ErrNoRows { + lastBatchNum = 0 + } + + s.stats.UpdateSync(lastBlockNum, &lastBatchNum) + + log.Infow("Sync init block", + "syncLastBlock", s.stats.stats.Sync.LastBlock, + "syncBlocksPerc", s.stats.blocksPerc(), + "ethFirstBlock", s.stats.stats.Eth.FirstBlock, + "ethLastBlock", s.stats.stats.Eth.LastBlock, + ) + log.Infow("Sync init batch", + "syncLastBatch", s.stats.stats.Sync.LastBatch, + "syncBatchesPerc", s.stats.batchesPerc(s.stats.stats.Sync.LastBatch), + "ethLastBatch", s.stats.stats.Eth.LastBatch, + ) return nil } @@ -182,7 +323,14 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block) } log.Debugf("ethBlock: num: %v, parent: %v, hash: %v", ethBlock.EthBlockNum, ethBlock.ParentHash.String(), ethBlock.Hash.String()) - log.Debugw("Syncing...", "block", nextBlockNum) + if err := s.stats.UpdateEth(s.ethClient); err != nil { + return nil, nil, err + } + + log.Debugw("Syncing...", + "block", nextBlockNum, + "ethLastBlock", s.stats.stats.Eth.LastBlock, + ) // Check that the obtianed ethBlock.ParentHash == prevEthBlock.Hash; if not, reorg! if lastSavedBlock != nil { @@ -249,6 +397,26 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block) return nil, nil, err } + batchesLen := len(rollupData.Batches) + if batchesLen == 0 { + s.stats.UpdateSync(ethBlock.EthBlockNum, nil) + } else { + s.stats.UpdateSync(ethBlock.EthBlockNum, + &rollupData.Batches[batchesLen-1].Batch.BatchNum) + } + log.Debugw("Synced block", + "syncLastBlock", s.stats.stats.Sync.LastBlock, + "syncBlocksPerc", s.stats.blocksPerc(), + "ethLastBlock", s.stats.stats.Eth.LastBlock, + ) + for _, batchData := range rollupData.Batches { + log.Debugw("Synced batch", + "syncLastBatch", batchData.Batch.BatchNum, + "syncBatchesPerc", s.stats.batchesPerc(int64(batchData.Batch.BatchNum)), + "ethLastBatch", s.stats.stats.Eth.LastBatch, + ) + } + return &blockData, nil, nil } @@ -566,7 +734,7 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e if varsUpdate { s.vars.Rollup.EthBlockNum = blockNum - rollupData.Vars = &s.vars.Rollup + rollupData.Vars = s.vars.Rollup.Copy() } return &rollupData, nil @@ -665,7 +833,7 @@ func (s *Synchronizer) auctionSync(ethBlock *common.Block) (*common.AuctionData, if varsUpdate { s.vars.Auction.EthBlockNum = blockNum - auctionData.Vars = &s.vars.Auction + auctionData.Vars = s.vars.Auction.Copy() } return &auctionData, nil @@ -734,7 +902,7 @@ func (s *Synchronizer) wdelayerSync(ethBlock *common.Block) (*common.WDelayerDat if varsUpdate { s.vars.WDelayer.EthBlockNum = blockNum - wDelayerData.Vars = &s.vars.WDelayer + wDelayerData.Vars = s.vars.WDelayer.Copy() } return &wDelayerData, nil diff --git a/test/debugapi/debugapi.go b/test/debugapi/debugapi.go index 7fcf10e..a3c1b76 100644 --- a/test/debugapi/debugapi.go +++ b/test/debugapi/debugapi.go @@ -10,6 +10,7 @@ import ( "github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/db/statedb" "github.com/hermeznetwork/hermez-node/log" + "github.com/hermeznetwork/hermez-node/synchronizer" ) func handleNoRoute(c *gin.Context) { @@ -33,13 +34,15 @@ func badReq(err error, c *gin.Context) { type DebugAPI struct { addr string stateDB *statedb.StateDB // synchronizer statedb + sync *synchronizer.Synchronizer } // NewDebugAPI creates a new DebugAPI -func NewDebugAPI(addr string, stateDB *statedb.StateDB) *DebugAPI { +func NewDebugAPI(addr string, stateDB *statedb.StateDB, sync *synchronizer.Synchronizer) *DebugAPI { return &DebugAPI{ - stateDB: stateDB, addr: addr, + stateDB: stateDB, + sync: sync, } } @@ -82,6 +85,11 @@ func (a *DebugAPI) handleMTRoot(c *gin.Context) { c.JSON(http.StatusOK, root) } +func (a *DebugAPI) handleSyncStats(c *gin.Context) { + stats := a.sync.Stats() + c.JSON(http.StatusOK, stats) +} + // Run starts the http server of the DebugAPI. To stop it, pass a context with // cancelation (see `debugapi_test.go` for an example). func (a *DebugAPI) Run(ctx context.Context) error { @@ -98,6 +106,8 @@ func (a *DebugAPI) Run(ctx context.Context) error { debugAPI.GET("sdb/accounts", a.handleAccounts) debugAPI.GET("sdb/accounts/:Idx", a.handleAccount) + debugAPI.GET("sync/stats", a.handleSyncStats) + debugAPIServer := &http.Server{ Addr: a.addr, Handler: api, diff --git a/test/debugapi/debugapi_test.go b/test/debugapi/debugapi_test.go index 2748893..92033b5 100644 --- a/test/debugapi/debugapi_test.go +++ b/test/debugapi/debugapi_test.go @@ -50,7 +50,8 @@ func TestDebugAPI(t *testing.T) { require.Nil(t, err) addr := "localhost:12345" - debugAPI := NewDebugAPI(addr, sdb) + // We won't test the sync/stats endpoint, so we can se the Syncrhonizer to nil + debugAPI := NewDebugAPI(addr, sdb, nil) ctx, cancel := context.WithCancel(context.Background()) go func() { diff --git a/test/ethclient.go b/test/ethclient.go index b0a7b41..ad5f5bf 100644 --- a/test/ethclient.go +++ b/test/ethclient.go @@ -762,6 +762,16 @@ func (c *Client) RollupRegisterTokensCount() (*big.Int, error) { return nil, errTODO } +// RollupLastForgedBatch is the interface to call the smart contract function +func (c *Client) RollupLastForgedBatch() (int64, error) { + c.rw.RLock() + defer c.rw.RUnlock() + + currentBlock := c.currentBlock() + e := currentBlock.Rollup + return int64(len(e.State.ExitRoots)) - 1, nil +} + // RollupWithdrawCircuit is the interface to call the smart contract function func (c *Client) RollupWithdrawCircuit(proofA, proofC [2]*big.Int, proofB [2][2]*big.Int, tokenID uint32, numExitRoot, idx int64, amount *big.Int, instantWithdraw bool) (*types.Transaction, error) { log.Error("TODO")