Browse Source

Add Sync stats, and report them in DebugAPI

feature/sql-semaphore1
Eduard S 4 years ago
parent
commit
6c0d48f4a2
11 changed files with 244 additions and 24 deletions
  1. +1
    -0
      cli/node/cfg.buidler.toml
  2. +9
    -0
      common/ethauction.go
  3. +6
    -0
      common/ethrollup.go
  4. +6
    -0
      common/ethwdelayer.go
  5. +4
    -3
      config/config.go
  6. +15
    -7
      eth/rollup.go
  7. +4
    -4
      node/node.go
  8. +175
    -7
      synchronizer/synchronizer.go
  9. +12
    -2
      test/debugapi/debugapi.go
  10. +2
    -1
      test/debugapi/debugapi_test.go
  11. +10
    -0
      test/ethclient.go

+ 1
- 0
cli/node/cfg.buidler.toml

@ -21,6 +21,7 @@ URL = "http://localhost:8545"
[Synchronizer] [Synchronizer]
SyncLoopInterval = "1s" SyncLoopInterval = "1s"
StatsRefreshPeriod = "1s"
[Synchronizer.StartBlockNum] [Synchronizer.StartBlockNum]
Rollup = 1 Rollup = 1

+ 9
- 0
common/ethauction.go

@ -43,3 +43,12 @@ type AuctionVariables struct {
// Number of blocks at the end of a slot in which any coordinator can forge if the winner has not forged one before // Number of blocks at the end of a slot in which any coordinator can forge if the winner has not forged one before
SlotDeadline uint8 `json:"slotDeadline" meddler:"slot_deadline" validate:"required"` SlotDeadline uint8 `json:"slotDeadline" meddler:"slot_deadline" validate:"required"`
} }
// Copy returns a deep copy of the Variables
func (v *AuctionVariables) Copy() *AuctionVariables {
vCpy := *v
for i := range v.DefaultSlotSetBid {
vCpy.DefaultSlotSetBid[i] = new(big.Int).SetBytes(v.DefaultSlotSetBid[i].Bytes())
}
return &vCpy
}

+ 6
- 0
common/ethrollup.go

@ -163,3 +163,9 @@ type RollupVariables struct {
WithdrawalDelay uint64 `json:"withdrawalDelay" meddler:"withdrawal_delay" validate:"required"` WithdrawalDelay uint64 `json:"withdrawalDelay" meddler:"withdrawal_delay" validate:"required"`
Buckets [RollupConstNumBuckets]Bucket `json:"buckets" meddler:"buckets,json"` Buckets [RollupConstNumBuckets]Bucket `json:"buckets" meddler:"buckets,json"`
} }
// Copy returns a deep copy of the Variables
func (v *RollupVariables) Copy() *RollupVariables {
vCpy := *v
return &vCpy
}

+ 6
- 0
common/ethwdelayer.go

@ -23,3 +23,9 @@ type WDelayerVariables struct {
EmergencyModeStartingTime uint64 `json:"emergencyModeStartingTime" meddler:"emergency_start_time"` EmergencyModeStartingTime uint64 `json:"emergencyModeStartingTime" meddler:"emergency_start_time"`
EmergencyMode bool `json:"emergencyMode" meddler:"emergency_mode"` EmergencyMode bool `json:"emergencyMode" meddler:"emergency_mode"`
} }
// Copy returns a deep copy of the Variables
func (v *WDelayerVariables) Copy() *WDelayerVariables {
vCpy := *v
return &vCpy
}

+ 4
- 3
config/config.go

@ -66,9 +66,10 @@ type Node struct {
URL string `validate:"required"` URL string `validate:"required"`
} `validate:"required"` } `validate:"required"`
Synchronizer struct { Synchronizer struct {
SyncLoopInterval Duration `validate:"required"`
StartBlockNum synchronizer.ConfigStartBlockNum `validate:"required"`
InitialVariables synchronizer.SCVariables `validate:"required"`
SyncLoopInterval Duration `validate:"required"`
StatsRefreshPeriod Duration `validate:"required"`
StartBlockNum synchronizer.ConfigStartBlockNum `validate:"required"`
InitialVariables synchronizer.SCVariables `validate:"required"`
} `validate:"required"` } `validate:"required"`
SmartContracts struct { SmartContracts struct {
Rollup ethCommon.Address `validate:"required"` Rollup ethCommon.Address `validate:"required"`

+ 15
- 7
eth/rollup.go

@ -172,6 +172,7 @@ type RollupInterface interface {
// Viewers // Viewers
RollupRegisterTokensCount() (*big.Int, error) RollupRegisterTokensCount() (*big.Int, error)
RollupLastForgedBatch() (int64, error)
// //
// Smart Contract Status // Smart Contract Status
@ -400,14 +401,9 @@ func (c *RollupClient) RollupL1UserTxERC20Permit(fromBJJ *babyjub.PublicKey, fro
} }
// RollupRegisterTokensCount is the interface to call the smart contract function // RollupRegisterTokensCount is the interface to call the smart contract function
func (c *RollupClient) RollupRegisterTokensCount() (*big.Int, error) {
var registerTokensCount *big.Int
func (c *RollupClient) RollupRegisterTokensCount() (registerTokensCount *big.Int, err error) {
if err := c.client.Call(func(ec *ethclient.Client) error { if err := c.client.Call(func(ec *ethclient.Client) error {
hermez, err := Hermez.NewHermez(c.address, ec)
if err != nil {
return err
}
registerTokensCount, err = hermez.RegisterTokensCount(nil)
registerTokensCount, err = c.hermez.RegisterTokensCount(nil)
return err return err
}); err != nil { }); err != nil {
return nil, err return nil, err
@ -415,6 +411,18 @@ func (c *RollupClient) RollupRegisterTokensCount() (*big.Int, error) {
return registerTokensCount, nil return registerTokensCount, nil
} }
// RollupLastForgedBatch is the interface to call the smart contract function
func (c *RollupClient) RollupLastForgedBatch() (lastForgedBatch int64, err error) {
if err := c.client.Call(func(ec *ethclient.Client) error {
_lastForgedBatch, err := c.hermez.LastForgedBatch(nil)
lastForgedBatch = int64(_lastForgedBatch)
return err
}); err != nil {
return 0, err
}
return lastForgedBatch, nil
}
// RollupUpdateForgeL1L2BatchTimeout is the interface to call the smart contract function // RollupUpdateForgeL1L2BatchTimeout is the interface to call the smart contract function
func (c *RollupClient) RollupUpdateForgeL1L2BatchTimeout(newForgeL1L2BatchTimeout int64) (tx *types.Transaction, err error) { func (c *RollupClient) RollupUpdateForgeL1L2BatchTimeout(newForgeL1L2BatchTimeout int64) (tx *types.Transaction, err error) {
if tx, err = c.client.CallAuth( if tx, err = c.client.CallAuth(

+ 4
- 4
node/node.go

@ -114,8 +114,9 @@ func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node,
} }
sync, err := synchronizer.NewSynchronizer(client, historyDB, stateDB, synchronizer.Config{ sync, err := synchronizer.NewSynchronizer(client, historyDB, stateDB, synchronizer.Config{
StartBlockNum: cfg.Synchronizer.StartBlockNum,
InitialVariables: cfg.Synchronizer.InitialVariables,
StartBlockNum: cfg.Synchronizer.StartBlockNum,
InitialVariables: cfg.Synchronizer.InitialVariables,
StatsRefreshPeriod: cfg.Synchronizer.StatsRefreshPeriod.Duration,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -159,9 +160,8 @@ func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node,
) )
} }
var debugAPI *debugapi.DebugAPI var debugAPI *debugapi.DebugAPI
println("apiaddr", cfg.Debug.APIAddress)
if cfg.Debug.APIAddress != "" { if cfg.Debug.APIAddress != "" {
debugAPI = debugapi.NewDebugAPI(cfg.Debug.APIAddress, stateDB)
debugAPI = debugapi.NewDebugAPI(cfg.Debug.APIAddress, stateDB, sync)
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
return &Node{ return &Node{

+ 175
- 7
synchronizer/synchronizer.go

@ -5,6 +5,8 @@ import (
"database/sql" "database/sql"
"fmt" "fmt"
"math/big" "math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/common"
@ -39,6 +41,93 @@ var (
// Synchronized bool // Synchronized bool
// } // }
// Stats of the syncrhonizer
type Stats struct {
Eth struct {
RefreshPeriod time.Duration
Updated time.Time
FirstBlock int64
LastBlock int64
LastBatch int64
}
Sync struct {
Updated time.Time
LastBlock int64
LastBatch int64
}
}
// StatsHolder stores stats and that allows reading and writing them
// concurrently
type StatsHolder struct {
stats Stats
rw sync.RWMutex
}
// NewStatsHolder creates a new StatsHolder
func NewStatsHolder(firstBlock int64, refreshPeriod time.Duration) *StatsHolder {
stats := Stats{}
stats.Eth.RefreshPeriod = refreshPeriod
stats.Eth.FirstBlock = firstBlock
return &StatsHolder{stats: stats}
}
// UpdateSync updates the synchronizer stats
func (s *StatsHolder) UpdateSync(lastBlock int64, lastBatch *common.BatchNum) {
now := time.Now()
s.rw.Lock()
s.stats.Sync.LastBlock = lastBlock
if lastBatch != nil {
s.stats.Sync.LastBatch = int64(*lastBatch)
}
s.stats.Sync.Updated = now
s.rw.Unlock()
}
// UpdateEth updates the ethereum stats, only if the previous stats expired
func (s *StatsHolder) UpdateEth(ethClient eth.ClientInterface) error {
now := time.Now()
s.rw.RLock()
elapsed := now.Sub(s.stats.Eth.Updated)
s.rw.RUnlock()
if elapsed < s.stats.Eth.RefreshPeriod {
return nil
}
lastBlock, err := ethClient.EthCurrentBlock()
if err != nil {
return err
}
lastBatch, err := ethClient.RollupLastForgedBatch()
if err != nil {
return err
}
s.rw.Lock()
s.stats.Eth.Updated = now
s.stats.Eth.LastBlock = lastBlock
s.stats.Eth.LastBatch = lastBatch
s.rw.Unlock()
return nil
}
// CopyStats returns a copy of the inner Stats
func (s *StatsHolder) CopyStats() *Stats {
s.rw.RLock()
sCopy := s.stats
s.rw.RUnlock()
return &sCopy
}
func (s *StatsHolder) blocksPerc() float64 {
return float64(s.stats.Sync.LastBlock-s.stats.Eth.FirstBlock) * 100.0 /
float64(s.stats.Eth.LastBlock-s.stats.Eth.FirstBlock)
}
func (s *StatsHolder) batchesPerc(batchNum int64) float64 {
return float64(batchNum) * 100.0 /
float64(s.stats.Eth.LastBatch)
}
// ConfigStartBlockNum sets the first block used to start tracking the smart // ConfigStartBlockNum sets the first block used to start tracking the smart
// contracts // contracts
type ConfigStartBlockNum struct { type ConfigStartBlockNum struct {
@ -56,8 +145,9 @@ type SCVariables struct {
// Config is the Synchronizer configuration // Config is the Synchronizer configuration
type Config struct { type Config struct {
StartBlockNum ConfigStartBlockNum
InitialVariables SCVariables
StartBlockNum ConfigStartBlockNum
InitialVariables SCVariables
StatsRefreshPeriod time.Duration
} }
// Synchronizer implements the Synchronizer type // Synchronizer implements the Synchronizer type
@ -71,6 +161,7 @@ type Synchronizer struct {
cfg Config cfg Config
startBlockNum int64 startBlockNum int64
vars SCVariables vars SCVariables
stats *StatsHolder
// firstSavedBlock *common.Block // firstSavedBlock *common.Block
// mux sync.Mutex // mux sync.Mutex
} }
@ -103,6 +194,7 @@ func NewSynchronizer(ethClient eth.ClientInterface, historyDB *historydb.History
if startBlockNum < cfg.StartBlockNum.WDelayer { if startBlockNum < cfg.StartBlockNum.WDelayer {
startBlockNum = cfg.StartBlockNum.WDelayer startBlockNum = cfg.StartBlockNum.WDelayer
} }
stats := NewStatsHolder(startBlockNum, cfg.StatsRefreshPeriod)
s := &Synchronizer{ s := &Synchronizer{
ethClient: ethClient, ethClient: ethClient,
auctionConstants: *auctionConstants, auctionConstants: *auctionConstants,
@ -112,10 +204,16 @@ func NewSynchronizer(ethClient eth.ClientInterface, historyDB *historydb.History
stateDB: stateDB, stateDB: stateDB,
cfg: cfg, cfg: cfg,
startBlockNum: startBlockNum, startBlockNum: startBlockNum,
stats: stats,
} }
return s, s.init() return s, s.init()
} }
// Stats returns a copy of the Synchronizer Stats
func (s *Synchronizer) Stats() *Stats {
return s.stats.CopyStats()
}
// AuctionConstants returns the AuctionConstants read from the smart contract // AuctionConstants returns the AuctionConstants read from the smart contract
func (s *Synchronizer) AuctionConstants() *common.AuctionConstants { func (s *Synchronizer) AuctionConstants() *common.AuctionConstants {
return &s.auctionConstants return &s.auctionConstants
@ -133,11 +231,13 @@ func (s *Synchronizer) WDelayerConstants() *common.WDelayerConstants {
func (s *Synchronizer) init() error { func (s *Synchronizer) init() error {
rollup, auction, wDelayer, err := s.historyDB.GetSCVars() rollup, auction, wDelayer, err := s.historyDB.GetSCVars()
// If SCVars are not in the HistoryDB, this is probably the first run
// of the Synchronizer: store the initial vars taken from config
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
rollup = &s.cfg.InitialVariables.Rollup rollup = &s.cfg.InitialVariables.Rollup
auction = &s.cfg.InitialVariables.Auction auction = &s.cfg.InitialVariables.Auction
wDelayer = &s.cfg.InitialVariables.WDelayer wDelayer = &s.cfg.InitialVariables.WDelayer
log.Debug("Setting initial SCVars in HistoryDB")
log.Info("Setting initial SCVars in HistoryDB")
if err = s.historyDB.SetInitialSCVars(rollup, auction, wDelayer); err != nil { if err = s.historyDB.SetInitialSCVars(rollup, auction, wDelayer); err != nil {
return err return err
} }
@ -145,6 +245,47 @@ func (s *Synchronizer) init() error {
s.vars.Rollup = *rollup s.vars.Rollup = *rollup
s.vars.Auction = *auction s.vars.Auction = *auction
s.vars.WDelayer = *wDelayer s.vars.WDelayer = *wDelayer
// Update stats parameters so that they have valid values before the
// first Sync call
if err := s.stats.UpdateEth(s.ethClient); err != nil {
return err
}
var lastBlockNum int64
lastSavedBlock, err := s.historyDB.GetLastBlock()
if err != nil && err != sql.ErrNoRows {
return err
}
// If there's no block in the DB (or we only have the default block 0),
// make sure that the stateDB is clean
if err == sql.ErrNoRows || lastSavedBlock.EthBlockNum == 0 {
if err := s.stateDB.Reset(0); err != nil {
return err
}
} else {
lastBlockNum = lastSavedBlock.EthBlockNum
}
lastBatchNum, err := s.historyDB.GetLastBatchNum()
if err != nil && err != sql.ErrNoRows {
return err
}
if err == sql.ErrNoRows {
lastBatchNum = 0
}
s.stats.UpdateSync(lastBlockNum, &lastBatchNum)
log.Infow("Sync init block",
"syncLastBlock", s.stats.stats.Sync.LastBlock,
"syncBlocksPerc", s.stats.blocksPerc(),
"ethFirstBlock", s.stats.stats.Eth.FirstBlock,
"ethLastBlock", s.stats.stats.Eth.LastBlock,
)
log.Infow("Sync init batch",
"syncLastBatch", s.stats.stats.Sync.LastBatch,
"syncBatchesPerc", s.stats.batchesPerc(s.stats.stats.Sync.LastBatch),
"ethLastBatch", s.stats.stats.Eth.LastBatch,
)
return nil return nil
} }
@ -182,7 +323,14 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block)
} }
log.Debugf("ethBlock: num: %v, parent: %v, hash: %v", ethBlock.EthBlockNum, ethBlock.ParentHash.String(), ethBlock.Hash.String()) log.Debugf("ethBlock: num: %v, parent: %v, hash: %v", ethBlock.EthBlockNum, ethBlock.ParentHash.String(), ethBlock.Hash.String())
log.Debugw("Syncing...", "block", nextBlockNum)
if err := s.stats.UpdateEth(s.ethClient); err != nil {
return nil, nil, err
}
log.Debugw("Syncing...",
"block", nextBlockNum,
"ethLastBlock", s.stats.stats.Eth.LastBlock,
)
// Check that the obtianed ethBlock.ParentHash == prevEthBlock.Hash; if not, reorg! // Check that the obtianed ethBlock.ParentHash == prevEthBlock.Hash; if not, reorg!
if lastSavedBlock != nil { if lastSavedBlock != nil {
@ -249,6 +397,26 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block)
return nil, nil, err return nil, nil, err
} }
batchesLen := len(rollupData.Batches)
if batchesLen == 0 {
s.stats.UpdateSync(ethBlock.EthBlockNum, nil)
} else {
s.stats.UpdateSync(ethBlock.EthBlockNum,
&rollupData.Batches[batchesLen-1].Batch.BatchNum)
}
log.Debugw("Synced block",
"syncLastBlock", s.stats.stats.Sync.LastBlock,
"syncBlocksPerc", s.stats.blocksPerc(),
"ethLastBlock", s.stats.stats.Eth.LastBlock,
)
for _, batchData := range rollupData.Batches {
log.Debugw("Synced batch",
"syncLastBatch", batchData.Batch.BatchNum,
"syncBatchesPerc", s.stats.batchesPerc(int64(batchData.Batch.BatchNum)),
"ethLastBatch", s.stats.stats.Eth.LastBatch,
)
}
return &blockData, nil, nil return &blockData, nil, nil
} }
@ -566,7 +734,7 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e
if varsUpdate { if varsUpdate {
s.vars.Rollup.EthBlockNum = blockNum s.vars.Rollup.EthBlockNum = blockNum
rollupData.Vars = &s.vars.Rollup
rollupData.Vars = s.vars.Rollup.Copy()
} }
return &rollupData, nil return &rollupData, nil
@ -665,7 +833,7 @@ func (s *Synchronizer) auctionSync(ethBlock *common.Block) (*common.AuctionData,
if varsUpdate { if varsUpdate {
s.vars.Auction.EthBlockNum = blockNum s.vars.Auction.EthBlockNum = blockNum
auctionData.Vars = &s.vars.Auction
auctionData.Vars = s.vars.Auction.Copy()
} }
return &auctionData, nil return &auctionData, nil
@ -734,7 +902,7 @@ func (s *Synchronizer) wdelayerSync(ethBlock *common.Block) (*common.WDelayerDat
if varsUpdate { if varsUpdate {
s.vars.WDelayer.EthBlockNum = blockNum s.vars.WDelayer.EthBlockNum = blockNum
wDelayerData.Vars = &s.vars.WDelayer
wDelayerData.Vars = s.vars.WDelayer.Copy()
} }
return &wDelayerData, nil return &wDelayerData, nil

+ 12
- 2
test/debugapi/debugapi.go

@ -10,6 +10,7 @@ import (
"github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/db/statedb" "github.com/hermeznetwork/hermez-node/db/statedb"
"github.com/hermeznetwork/hermez-node/log" "github.com/hermeznetwork/hermez-node/log"
"github.com/hermeznetwork/hermez-node/synchronizer"
) )
func handleNoRoute(c *gin.Context) { func handleNoRoute(c *gin.Context) {
@ -33,13 +34,15 @@ func badReq(err error, c *gin.Context) {
type DebugAPI struct { type DebugAPI struct {
addr string addr string
stateDB *statedb.StateDB // synchronizer statedb stateDB *statedb.StateDB // synchronizer statedb
sync *synchronizer.Synchronizer
} }
// NewDebugAPI creates a new DebugAPI // NewDebugAPI creates a new DebugAPI
func NewDebugAPI(addr string, stateDB *statedb.StateDB) *DebugAPI {
func NewDebugAPI(addr string, stateDB *statedb.StateDB, sync *synchronizer.Synchronizer) *DebugAPI {
return &DebugAPI{ return &DebugAPI{
stateDB: stateDB,
addr: addr, addr: addr,
stateDB: stateDB,
sync: sync,
} }
} }
@ -82,6 +85,11 @@ func (a *DebugAPI) handleMTRoot(c *gin.Context) {
c.JSON(http.StatusOK, root) c.JSON(http.StatusOK, root)
} }
func (a *DebugAPI) handleSyncStats(c *gin.Context) {
stats := a.sync.Stats()
c.JSON(http.StatusOK, stats)
}
// Run starts the http server of the DebugAPI. To stop it, pass a context with // Run starts the http server of the DebugAPI. To stop it, pass a context with
// cancelation (see `debugapi_test.go` for an example). // cancelation (see `debugapi_test.go` for an example).
func (a *DebugAPI) Run(ctx context.Context) error { func (a *DebugAPI) Run(ctx context.Context) error {
@ -98,6 +106,8 @@ func (a *DebugAPI) Run(ctx context.Context) error {
debugAPI.GET("sdb/accounts", a.handleAccounts) debugAPI.GET("sdb/accounts", a.handleAccounts)
debugAPI.GET("sdb/accounts/:Idx", a.handleAccount) debugAPI.GET("sdb/accounts/:Idx", a.handleAccount)
debugAPI.GET("sync/stats", a.handleSyncStats)
debugAPIServer := &http.Server{ debugAPIServer := &http.Server{
Addr: a.addr, Addr: a.addr,
Handler: api, Handler: api,

+ 2
- 1
test/debugapi/debugapi_test.go

@ -50,7 +50,8 @@ func TestDebugAPI(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
addr := "localhost:12345" addr := "localhost:12345"
debugAPI := NewDebugAPI(addr, sdb)
// We won't test the sync/stats endpoint, so we can se the Syncrhonizer to nil
debugAPI := NewDebugAPI(addr, sdb, nil)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go func() { go func() {

+ 10
- 0
test/ethclient.go

@ -762,6 +762,16 @@ func (c *Client) RollupRegisterTokensCount() (*big.Int, error) {
return nil, errTODO return nil, errTODO
} }
// RollupLastForgedBatch is the interface to call the smart contract function
func (c *Client) RollupLastForgedBatch() (int64, error) {
c.rw.RLock()
defer c.rw.RUnlock()
currentBlock := c.currentBlock()
e := currentBlock.Rollup
return int64(len(e.State.ExitRoots)) - 1, nil
}
// RollupWithdrawCircuit is the interface to call the smart contract function // RollupWithdrawCircuit is the interface to call the smart contract function
func (c *Client) RollupWithdrawCircuit(proofA, proofC [2]*big.Int, proofB [2][2]*big.Int, tokenID uint32, numExitRoot, idx int64, amount *big.Int, instantWithdraw bool) (*types.Transaction, error) { func (c *Client) RollupWithdrawCircuit(proofA, proofC [2]*big.Int, proofB [2][2]*big.Int, tokenID uint32, numExitRoot, idx int64, amount *big.Int, instantWithdraw bool) (*types.Transaction, error) {
log.Error("TODO") log.Error("TODO")

Loading…
Cancel
Save