From b330889570e0df6acc549c7ecbc778c137e7f764 Mon Sep 17 00:00:00 2001 From: Eduard S Date: Tue, 9 Mar 2021 14:32:23 +0100 Subject: [PATCH] WIP --- api/api.go | 12 +-- api/api_test.go | 33 ++++-- api/slots_test.go | 14 +-- api/state.go | 68 ++++++++++--- api/state_test.go | 96 ++++++++++-------- cli/node/main.go | 106 ++++++++++++-------- common/eth.go | 2 + config/config.go | 112 +++++++++++---------- coordinator/coordinator.go | 12 +-- coordinator/coordinator_test.go | 6 +- coordinator/pipeline.go | 16 +-- coordinator/pipeline_test.go | 6 +- coordinator/txmanager.go | 10 +- db/historydb/apiqueries.go | 11 +- db/historydb/historydb.go | 2 +- db/historydb/historydb_test.go | 15 +-- db/historydb/nodeinfo.go | 90 ++++------------- node/node.go | 161 ++++++++++++++++++------------ synchronizer/synchronizer_test.go | 24 ++--- 19 files changed, 431 insertions(+), 365 deletions(-) diff --git a/api/api.go b/api/api.go index 5afa8a8..90e147b 100644 --- a/api/api.go +++ b/api/api.go @@ -34,20 +34,20 @@ func NewAPI( if explorerEndpoints && hdb == nil { return nil, tracerr.Wrap(errors.New("cannot serve Explorer endpoints without HistoryDB")) } - ni, err := hdb.GetNodeInfo() + consts, err := hdb.GetConstants() if err != nil { return nil, err } a := &API{ h: hdb, cg: &configAPI{ - RollupConstants: *newRollupConstants(ni.Constants.RollupConstants), - AuctionConstants: ni.Constants.AuctionConstants, - WDelayerConstants: ni.Constants.WDelayerConstants, + RollupConstants: *newRollupConstants(consts.Rollup), + AuctionConstants: consts.Auction, + WDelayerConstants: consts.WDelayer, }, l2: l2db, - chainID: ni.Constants.ChainID, - hermezAddress: ni.Constants.HermezAddress, + chainID: consts.ChainID, + hermezAddress: consts.HermezAddress, } // Add coordinator endpoints diff --git a/api/api_test.go b/api/api_test.go index 7db94e1..50889c0 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -186,6 +186,7 @@ type testCommon struct { var tc testCommon var config configAPI var api *API +var stateAPIUpdater *StateAPIUpdater // TestMain initializes the API server, and fill HistoryDB and StateDB with fake data, // emulating the task of the synchronizer in order to have data to be returned @@ -222,15 +223,27 @@ func TestMain(m *testing.M) { apiGin := gin.Default() // Reset DB test.WipeDB(hdb.DB()) - if err := hdb.SetInitialNodeInfo(10, 0.0, &historydb.Constants{ - RollupConstants: _config.RollupConstants, - AuctionConstants: _config.AuctionConstants, - WDelayerConstants: _config.WDelayerConstants, - ChainID: chainID, - HermezAddress: _config.HermezAddress, - }); err != nil { + + constants := &historydb.Constants{ + SCConsts: common.SCConsts{ + Rollup: _config.RollupConstants, + Auction: _config.AuctionConstants, + WDelayer: _config.WDelayerConstants, + }, + ChainID: chainID, + HermezAddress: _config.HermezAddress, + } + if err := hdb.SetConstants(constants); err != nil { panic(err) } + nodeConfig := &historydb.NodeConfig{ + MaxPoolTxs: 10, + MinFeeUSD: 0, + } + if err := hdb.SetNodeConfig(nodeConfig); err != nil { + panic(err) + } + api, err = NewAPI( true, true, @@ -507,6 +520,12 @@ func TestMain(m *testing.M) { WithdrawalDelay: uint64(3000), } + stateAPIUpdater = NewStateAPIUpdater(hdb, nodeConfig, &common.SCVariables{ + Rollup: rollupVars, + Auction: auctionVars, + WDelayer: wdelayerVars, + }, constants) + // Generate test data, as expected to be received/sended from/to the API testCoords := genTestCoordinators(commonCoords) testBids := genTestBids(commonBlocks, testCoords, bids) diff --git a/api/slots_test.go b/api/slots_test.go index 11ac0d3..ae5dae1 100644 --- a/api/slots_test.go +++ b/api/slots_test.go @@ -99,14 +99,14 @@ func TestGetSlot(t *testing.T) { nil, &fetchedSlot, ), ) - ni, err := api.h.GetNodeInfoAPI() - assert.NoError(t, err) - emptySlot := api.getEmptyTestSlot(slotNum, ni.APIState.Network.LastSyncBlock, tc.auctionVars) + // ni, err := api.h.GetNodeInfoAPI() + // assert.NoError(t, err) + emptySlot := api.getEmptyTestSlot(slotNum, 0, tc.auctionVars) assertSlot(t, emptySlot, fetchedSlot) // Invalid slotNum path := endpoint + strconv.Itoa(-2) - err = doBadReq("GET", path, nil, 400) + err := doBadReq("GET", path, nil, 400) assert.NoError(t, err) } @@ -129,10 +129,10 @@ func TestGetSlots(t *testing.T) { err := doGoodReqPaginated(path, historydb.OrderAsc, &testSlotsResponse{}, appendIter) assert.NoError(t, err) allSlots := tc.slots - ni, err := api.h.GetNodeInfoAPI() - assert.NoError(t, err) + // ni, err := api.h.GetNodeInfoAPI() + // assert.NoError(t, err) for i := tc.slots[len(tc.slots)-1].SlotNum; i < maxSlotNum; i++ { - emptySlot := api.getEmptyTestSlot(i+1, ni.APIState.Network.LastSyncBlock, tc.auctionVars) + emptySlot := api.getEmptyTestSlot(i+1, 0, tc.auctionVars) allSlots = append(allSlots, emptySlot) } assertSlots(t, allSlots, fetchedSlots) diff --git a/api/state.go b/api/state.go index 6e515e2..4d380a5 100644 --- a/api/state.go +++ b/api/state.go @@ -3,6 +3,7 @@ package api import ( "database/sql" "net/http" + "sync" "github.com/gin-gonic/gin" "github.com/hermeznetwork/hermez-node/common" @@ -19,30 +20,39 @@ func (a *API) getState(c *gin.Context) { c.JSON(http.StatusOK, stateAPI) } -type APIStateUpdater struct { +// StateAPIUpdater is an utility object to facilitate updating the StateAPI +type StateAPIUpdater struct { hdb *historydb.HistoryDB state historydb.StateAPI config historydb.NodeConfig vars common.SCVariablesPtr consts historydb.Constants + rw sync.RWMutex } -func NewAPIStateUpdater(hdb *historydb.HistoryDB, config *historydb.NodeConfig, vars *common.SCVariables, - consts *historydb.Constants) *APIStateUpdater { - u := APIStateUpdater{ +// NewStateAPIUpdater creates a new StateAPIUpdater +func NewStateAPIUpdater(hdb *historydb.HistoryDB, config *historydb.NodeConfig, vars *common.SCVariables, + consts *historydb.Constants) *StateAPIUpdater { + u := StateAPIUpdater{ hdb: hdb, config: *config, consts: *consts, } - u.SetSCVars(&common.SCVariablesPtr{&vars.Rollup, &vars.Auction, &vars.WDelayer}) + u.SetSCVars(vars.AsPtr()) return &u } -func (u *APIStateUpdater) Store() error { - return tracerr.Wrap(u.hdb.SetAPIState(&u.state)) +// Store the State in the HistoryDB +func (u *StateAPIUpdater) Store() error { + u.rw.RLock() + defer u.rw.RUnlock() + return tracerr.Wrap(u.hdb.SetStateInternalAPI(&u.state)) } -func (u *APIStateUpdater) SetSCVars(vars *common.SCVariablesPtr) { +// SetSCVars sets the smart contract vars (ony updates those that are not nil) +func (u *StateAPIUpdater) SetSCVars(vars *common.SCVariablesPtr) { + u.rw.Lock() + defer u.rw.Unlock() if vars.Rollup != nil { u.vars.Rollup = vars.Rollup rollupVars := historydb.NewRollupVariablesAPI(u.vars.Rollup) @@ -59,25 +69,47 @@ func (u *APIStateUpdater) SetSCVars(vars *common.SCVariablesPtr) { } } -func (u *APIStateUpdater) UpdateMetrics() error { - if u.state.Network.LastBatch == nil { +// UpdateRecommendedFee update Status.RecommendedFee information +func (u *StateAPIUpdater) UpdateRecommendedFee() error { + recommendedFee, err := u.hdb.GetRecommendedFee(u.config.MinFeeUSD) + if err != nil { + return tracerr.Wrap(err) + } + u.rw.Lock() + u.state.RecommendedFee = *recommendedFee + u.rw.Unlock() + return nil +} + +// UpdateMetrics update Status.Metrics information +func (u *StateAPIUpdater) UpdateMetrics() error { + u.rw.RLock() + lastBatch := u.state.Network.LastBatch + u.rw.RUnlock() + if lastBatch == nil { return nil } - lastBatchNum := u.state.Network.LastBatch.BatchNum + lastBatchNum := lastBatch.BatchNum metrics, err := u.hdb.GetMetricsInternalAPI(lastBatchNum) if err != nil { return tracerr.Wrap(err) } + u.rw.Lock() u.state.Metrics = *metrics + u.rw.Unlock() return nil } -func (u *APIStateUpdater) UpdateNetworkInfoBlock(lastEthBlock, lastSyncBlock common.Block) { +// UpdateNetworkInfoBlock update Status.Network block related information +func (u *StateAPIUpdater) UpdateNetworkInfoBlock(lastEthBlock, lastSyncBlock common.Block) { + u.rw.Lock() u.state.Network.LastSyncBlock = lastSyncBlock.Num u.state.Network.LastEthBlock = lastEthBlock.Num + u.rw.Unlock() } -func (u *APIStateUpdater) UpdateNetworkInfo( +// UpdateNetworkInfo update Status.Network information +func (u *StateAPIUpdater) UpdateNetworkInfo( lastEthBlock, lastSyncBlock common.Block, lastBatchNum common.BatchNum, currentSlot int64, ) error { @@ -88,9 +120,12 @@ func (u *APIStateUpdater) UpdateNetworkInfo( } else if err != nil { return tracerr.Wrap(err) } + u.rw.RLock() + auctionVars := u.vars.Auction + u.rw.RUnlock() // Get next forgers - lastClosedSlot := currentSlot + int64(u.state.Auction.ClosedAuctionSlots) - nextForgers, err := u.hdb.GetNextForgersInternalAPI(u.vars.Auction, &u.consts.Auction, + lastClosedSlot := currentSlot + int64(auctionVars.ClosedAuctionSlots) + nextForgers, err := u.hdb.GetNextForgersInternalAPI(auctionVars, &u.consts.Auction, lastSyncBlock, currentSlot, lastClosedSlot) if tracerr.Unwrap(err) == sql.ErrNoRows { nextForgers = nil @@ -104,6 +139,8 @@ func (u *APIStateUpdater) UpdateNetworkInfo( } else if err != nil { return tracerr.Wrap(err) } + + u.rw.Lock() // Update NodeInfo struct for i, bucketParams := range u.state.Rollup.Buckets { for _, bucketUpdate := range bucketUpdates { @@ -119,5 +156,6 @@ func (u *APIStateUpdater) UpdateNetworkInfo( u.state.Network.LastBatch = lastBatch u.state.Network.CurrentSlot = currentSlot u.state.Network.NextForgers = nextForgers + u.rw.Unlock() return nil } diff --git a/api/state_test.go b/api/state_test.go index f3aa39b..ddd6a69 100644 --- a/api/state_test.go +++ b/api/state_test.go @@ -29,10 +29,11 @@ type testNetwork struct { } func TestSetRollupVariables(t *testing.T) { - api.h.SetRollupVariables(&tc.rollupVars) + stateAPIUpdater.SetSCVars(&common.SCVariablesPtr{Rollup: &tc.rollupVars}) + require.NoError(t, stateAPIUpdater.Store()) ni, err := api.h.GetNodeInfoAPI() - assert.NoError(t, err) - assertEqualRollupVariables(t, tc.rollupVars, ni.APIState.Rollup, true) + require.NoError(t, err) + assertEqualRollupVariables(t, tc.rollupVars, ni.StateAPI.Rollup, true) } func assertEqualRollupVariables(t *testing.T, rollupVariables common.RollupVariables, apiVariables historydb.RollupVariablesAPI, checkBuckets bool) { @@ -51,17 +52,19 @@ func assertEqualRollupVariables(t *testing.T, rollupVariables common.RollupVaria } func TestSetWDelayerVariables(t *testing.T) { - api.h.SetWDelayerVariables(&tc.wdelayerVars) + stateAPIUpdater.SetSCVars(&common.SCVariablesPtr{WDelayer: &tc.wdelayerVars}) + require.NoError(t, stateAPIUpdater.Store()) ni, err := api.h.GetNodeInfoAPI() - assert.NoError(t, err) - assert.Equal(t, tc.wdelayerVars, ni.APIState.WithdrawalDelayer) + require.NoError(t, err) + assert.Equal(t, tc.wdelayerVars, ni.StateAPI.WithdrawalDelayer) } func TestSetAuctionVariables(t *testing.T) { - api.h.SetAuctionVariables(&tc.auctionVars) + stateAPIUpdater.SetSCVars(&common.SCVariablesPtr{Auction: &tc.auctionVars}) + require.NoError(t, stateAPIUpdater.Store()) ni, err := api.h.GetNodeInfoAPI() - assert.NoError(t, err) - assertEqualAuctionVariables(t, tc.auctionVars, ni.APIState.Auction) + require.NoError(t, err) + assertEqualAuctionVariables(t, tc.auctionVars, ni.StateAPI.Auction) } func assertEqualAuctionVariables(t *testing.T, auctionVariables common.AuctionVariables, apiVariables historydb.AuctionVariablesAPI) { @@ -113,16 +116,18 @@ func TestUpdateNetworkInfo(t *testing.T) { err := api.h.AddBucketUpdatesTest(api.h.DB(), bucketUpdates) require.NoError(t, err) - err = api.h.UpdateNetworkInfo(lastBlock, lastBlock, lastBatchNum, currentSlotNum) - assert.NoError(t, err) + // stateAPIUpdater := NewStateAPIUpdater(hdb) + err = stateAPIUpdater.UpdateNetworkInfo(lastBlock, lastBlock, lastBatchNum, currentSlotNum) + require.NoError(t, err) + require.NoError(t, stateAPIUpdater.Store()) ni, err := api.h.GetNodeInfoAPI() - assert.NoError(t, err) - assert.Equal(t, lastBlock.Num, ni.APIState.Network.LastSyncBlock) - assert.Equal(t, lastBatchNum, ni.APIState.Network.LastBatch.BatchNum) - assert.Equal(t, currentSlotNum, ni.APIState.Network.CurrentSlot) - assert.Equal(t, int(ni.APIState.Auction.ClosedAuctionSlots)+1, len(ni.APIState.Network.NextForgers)) - assert.Equal(t, ni.APIState.Rollup.Buckets[0].Withdrawals, apitypes.NewBigIntStr(big.NewInt(123))) - assert.Equal(t, ni.APIState.Rollup.Buckets[2].Withdrawals, apitypes.NewBigIntStr(big.NewInt(43))) + require.NoError(t, err) + assert.Equal(t, lastBlock.Num, ni.StateAPI.Network.LastSyncBlock) + assert.Equal(t, lastBatchNum, ni.StateAPI.Network.LastBatch.BatchNum) + assert.Equal(t, currentSlotNum, ni.StateAPI.Network.CurrentSlot) + assert.Equal(t, int(ni.StateAPI.Auction.ClosedAuctionSlots)+1, len(ni.StateAPI.Network.NextForgers)) + assert.Equal(t, ni.StateAPI.Rollup.Buckets[0].Withdrawals, apitypes.NewBigIntStr(big.NewInt(123))) + assert.Equal(t, ni.StateAPI.Rollup.Buckets[2].Withdrawals, apitypes.NewBigIntStr(big.NewInt(43))) } func TestUpdateMetrics(t *testing.T) { @@ -130,31 +135,33 @@ func TestUpdateMetrics(t *testing.T) { lastBlock := tc.blocks[3] lastBatchNum := common.BatchNum(12) currentSlotNum := int64(1) - err := api.h.UpdateNetworkInfo(lastBlock, lastBlock, lastBatchNum, currentSlotNum) - assert.NoError(t, err) + err := stateAPIUpdater.UpdateNetworkInfo(lastBlock, lastBlock, lastBatchNum, currentSlotNum) + require.NoError(t, err) - err = api.h.UpdateMetrics() - assert.NoError(t, err) + err = stateAPIUpdater.UpdateMetrics() + require.NoError(t, err) + require.NoError(t, stateAPIUpdater.Store()) ni, err := api.h.GetNodeInfoAPI() - assert.NoError(t, err) - assert.Greater(t, ni.APIState.Metrics.TransactionsPerBatch, float64(0)) - assert.Greater(t, ni.APIState.Metrics.BatchFrequency, float64(0)) - assert.Greater(t, ni.APIState.Metrics.TransactionsPerSecond, float64(0)) - assert.Greater(t, ni.APIState.Metrics.TotalAccounts, int64(0)) - assert.Greater(t, ni.APIState.Metrics.TotalBJJs, int64(0)) - assert.Greater(t, ni.APIState.Metrics.AvgTransactionFee, float64(0)) + require.NoError(t, err) + assert.Greater(t, ni.StateAPI.Metrics.TransactionsPerBatch, float64(0)) + assert.Greater(t, ni.StateAPI.Metrics.BatchFrequency, float64(0)) + assert.Greater(t, ni.StateAPI.Metrics.TransactionsPerSecond, float64(0)) + assert.Greater(t, ni.StateAPI.Metrics.TotalAccounts, int64(0)) + assert.Greater(t, ni.StateAPI.Metrics.TotalBJJs, int64(0)) + assert.Greater(t, ni.StateAPI.Metrics.AvgTransactionFee, float64(0)) } func TestUpdateRecommendedFee(t *testing.T) { - err := api.h.UpdateRecommendedFee() - assert.NoError(t, err) + err := stateAPIUpdater.UpdateRecommendedFee() + require.NoError(t, err) + require.NoError(t, stateAPIUpdater.Store()) var minFeeUSD float64 if api.l2 != nil { minFeeUSD = api.l2.MinFeeUSD() } ni, err := api.h.GetNodeInfoAPI() - assert.NoError(t, err) - assert.Greater(t, ni.APIState.RecommendedFee.ExistingAccount, minFeeUSD) + require.NoError(t, err) + assert.Greater(t, ni.StateAPI.RecommendedFee.ExistingAccount, minFeeUSD) // assert.Equal(t, ni.StateAPI.RecommendedFee.CreatesAccount, // ni.StateAPI.RecommendedFee.ExistingAccount*createAccountExtraFeePercentage) // assert.Equal(t, ni.StateAPI.RecommendedFee.CreatesAccountAndRegister, @@ -165,20 +172,23 @@ func TestGetState(t *testing.T) { lastBlock := tc.blocks[3] lastBatchNum := common.BatchNum(12) currentSlotNum := int64(1) - api.h.SetRollupVariables(&tc.rollupVars) - api.h.SetWDelayerVariables(&tc.wdelayerVars) - api.h.SetAuctionVariables(&tc.auctionVars) - err := api.h.UpdateNetworkInfo(lastBlock, lastBlock, lastBatchNum, currentSlotNum) - assert.NoError(t, err) - err = api.h.UpdateMetrics() - assert.NoError(t, err) - err = api.h.UpdateRecommendedFee() - assert.NoError(t, err) + stateAPIUpdater.SetSCVars(&common.SCVariablesPtr{ + Rollup: &tc.rollupVars, + Auction: &tc.auctionVars, + WDelayer: &tc.wdelayerVars, + }) + err := stateAPIUpdater.UpdateNetworkInfo(lastBlock, lastBlock, lastBatchNum, currentSlotNum) + require.NoError(t, err) + err = stateAPIUpdater.UpdateMetrics() + require.NoError(t, err) + err = stateAPIUpdater.UpdateRecommendedFee() + require.NoError(t, err) + require.NoError(t, stateAPIUpdater.Store()) endpoint := apiURL + "state" var status testStatus - assert.NoError(t, doGoodReq("GET", endpoint, nil, &status)) + require.NoError(t, doGoodReq("GET", endpoint, nil, &status)) // SC vars // UpdateNetworkInfo will overwrite buckets withdrawal values diff --git a/cli/node/main.go b/cli/node/main.go index d7bf69d..a724451 100644 --- a/cli/node/main.go +++ b/cli/node/main.go @@ -107,17 +107,7 @@ func cmdWipeSQL(c *cli.Context) error { return nil } -func cmdRun(c *cli.Context) error { - cfg, err := parseCli(c) - if err != nil { - return tracerr.Wrap(fmt.Errorf("error parsing flags and config: %w", err)) - } - node, err := node.NewNode(cfg.mode, cfg.node) - if err != nil { - return tracerr.Wrap(fmt.Errorf("error starting node: %w", err)) - } - node.Start() - +func waitSigInt() { stopCh := make(chan interface{}) // catch ^C to send the stop signal @@ -138,48 +128,36 @@ func cmdRun(c *cli.Context) error { } }() <-stopCh - node.Stop() - - return nil } -func cmdServeAPI(c *cli.Context) error { - cfgPath := c.String(flagCfg) - cfg, err := config.LoadAPIServer(cfgPath) +func cmdRun(c *cli.Context) error { + cfg, err := parseCli(c) if err != nil { - if err := cli.ShowAppHelp(c); err != nil { - panic(err) - } return tracerr.Wrap(fmt.Errorf("error parsing flags and config: %w", err)) } - node, err := node.NewNode(cfg.mode, cfg.node) if err != nil { return tracerr.Wrap(fmt.Errorf("error starting node: %w", err)) } node.Start() + waitSigInt() + node.Stop() - stopCh := make(chan interface{}) + return nil +} - // catch ^C to send the stop signal - ossig := make(chan os.Signal, 1) - signal.Notify(ossig, os.Interrupt) - const forceStopCount = 3 - go func() { - n := 0 - for sig := range ossig { - if sig == os.Interrupt { - log.Info("Received Interrupt Signal") - stopCh <- nil - n++ - if n == forceStopCount { - log.Fatalf("Received %v Interrupt Signals", forceStopCount) - } - } - } - }() - <-stopCh - node.Stop() +func cmdServeAPI(c *cli.Context) error { + cfg, err := parseCliAPIServer(c) + if err != nil { + return tracerr.Wrap(fmt.Errorf("error parsing flags and config: %w", err)) + } + srv, err := node.NewAPIServer(cfg.mode, cfg.server) + if err != nil { + return tracerr.Wrap(fmt.Errorf("error starting api server: %w", err)) + } + srv.Start() + waitSigInt() + srv.Stop() return nil } @@ -270,13 +248,55 @@ func getConfig(c *cli.Context) (*Config, error) { switch mode { case modeSync: cfg.mode = node.ModeSynchronizer - cfg.node, err = config.LoadNode(nodeCfgPath) + cfg.node, err = config.LoadNode(nodeCfgPath, false) + if err != nil { + return nil, tracerr.Wrap(err) + } + case modeCoord: + cfg.mode = node.ModeCoordinator + cfg.node, err = config.LoadNode(nodeCfgPath, true) + if err != nil { + return nil, tracerr.Wrap(err) + } + default: + return nil, tracerr.Wrap(fmt.Errorf("invalid mode \"%v\"", mode)) + } + + return &cfg, nil +} + +// ConfigAPIServer is the configuration of the api server execution +type ConfigAPIServer struct { + mode node.Mode + server *config.APIServer +} + +func parseCliAPIServer(c *cli.Context) (*ConfigAPIServer, error) { + cfg, err := getConfigAPIServer(c) + if err != nil { + if err := cli.ShowAppHelp(c); err != nil { + panic(err) + } + return nil, tracerr.Wrap(err) + } + return cfg, nil +} + +func getConfigAPIServer(c *cli.Context) (*ConfigAPIServer, error) { + var cfg ConfigAPIServer + mode := c.String(flagMode) + nodeCfgPath := c.String(flagCfg) + var err error + switch mode { + case modeSync: + cfg.mode = node.ModeSynchronizer + cfg.server, err = config.LoadAPIServer(nodeCfgPath, false) if err != nil { return nil, tracerr.Wrap(err) } case modeCoord: cfg.mode = node.ModeCoordinator - cfg.node, err = config.LoadCoordinator(nodeCfgPath) + cfg.server, err = config.LoadAPIServer(nodeCfgPath, true) if err != nil { return nil, tracerr.Wrap(err) } diff --git a/common/eth.go b/common/eth.go index 51ca39b..b46122e 100644 --- a/common/eth.go +++ b/common/eth.go @@ -7,6 +7,8 @@ type SCVariables struct { WDelayer WDelayerVariables `validate:"required"` } +// AsPtr returns the SCVariables as a SCVariablesPtr using pointers to the +// original SCVariables func (v *SCVariables) AsPtr() *SCVariablesPtr { return &SCVariablesPtr{ Rollup: &v.Rollup, diff --git a/config/config.go b/config/config.go index f517497..64e0b97 100644 --- a/config/config.go +++ b/config/config.go @@ -219,28 +219,9 @@ type Coordinator struct { } } -// NodeAPI specifies the configuration parameters of the API -type NodeAPI struct { - // Address where the API will listen if set - Address string - // Explorer enables the Explorer API endpoints - Explorer bool - // UpdateMetricsInterval is the interval between updates of the - // API metrics - UpdateMetricsInterval Duration - // UpdateRecommendedFeeInterval is the interval between updates of the - // recommended fees - UpdateRecommendedFeeInterval Duration - // Maximum concurrent connections allowed between API and SQL - MaxSQLConnections int `validate:"required"` - // SQLConnectionTimeout is the maximum amount of time that an API request - // can wait to stablish a SQL connection - SQLConnectionTimeout Duration -} - -// It's possible to use diferentiated SQL connections for read/write. -// If the read configuration is not provided, the write one it's going to be used -// for both reads and writes +// PostgreSQL is the postgreSQL configuration parameters. It's possible to use +// diferentiated SQL connections for read/write. If the read configuration is +// not provided, the write one it's going to be used for both reads and writes type PostgreSQL struct { // Port of the PostgreSQL write server PortWrite int `validate:"required"` @@ -324,31 +305,60 @@ type Node struct { // TokenHEZ address TokenHEZName string `validate:"required"` } `validate:"required"` - API NodeAPI `validate:"required"` + // API specifies the configuration parameters of the API + API struct { + // Address where the API will listen if set + Address string + // Explorer enables the Explorer API endpoints + Explorer bool + // UpdateMetricsInterval is the interval between updates of the + // API metrics + UpdateMetricsInterval Duration + // UpdateRecommendedFeeInterval is the interval between updates of the + // recommended fees + UpdateRecommendedFeeInterval Duration + // Maximum concurrent connections allowed between API and SQL + MaxSQLConnections int `validate:"required"` + // SQLConnectionTimeout is the maximum amount of time that an API request + // can wait to stablish a SQL connection + SQLConnectionTimeout Duration + } `validate:"required"` Debug NodeDebug `validate:"required"` Coordinator Coordinator `validate:"-"` } +// APIServer is the api server configuration parameters type APIServer struct { - API NodeAPI `validate:"required"` + // NodeAPI specifies the configuration parameters of the API + API struct { + // Address where the API will listen if set + Address string `validate:"required"` + // Explorer enables the Explorer API endpoints + Explorer bool + // Maximum concurrent connections allowed between API and SQL + MaxSQLConnections int `validate:"required"` + // SQLConnectionTimeout is the maximum amount of time that an API request + // can wait to stablish a SQL connection + SQLConnectionTimeout Duration + } `validate:"required"` PostgreSQL PostgreSQL `validate:"required"` Coordinator struct { API struct { // Coordinator enables the coordinator API endpoints Coordinator bool } `validate:"required"` - } `validate:"required"` - L2DB struct { - // MaxTxs is the maximum number of pending L2Txs that can be - // stored in the pool. Once this number of pending L2Txs is - // reached, inserts to the pool will be denied until some of - // the pending txs are forged. - MaxTxs uint32 `validate:"required"` - // MinFeeUSD is the minimum fee in USD that a tx must pay in - // order to be accepted into the pool. Txs with lower than - // minimum fee will be rejected at the API level. - MinFeeUSD float64 - } `validate:"required"` + L2DB struct { + // MaxTxs is the maximum number of pending L2Txs that can be + // stored in the pool. Once this number of pending L2Txs is + // reached, inserts to the pool will be denied until some of + // the pending txs are forged. + MaxTxs uint32 `validate:"required"` + // MinFeeUSD is the minimum fee in USD that a tx must pay in + // order to be accepted into the pool. Txs with lower than + // minimum fee will be rejected at the API level. + MinFeeUSD float64 + } `validate:"required"` + } Debug NodeDebug `validate:"required"` } @@ -365,24 +375,8 @@ func Load(path string, cfg interface{}) error { return nil } -// LoadCoordinator loads the Coordinator configuration from path. -func LoadCoordinator(path string) (*Node, error) { - var cfg Node - if err := Load(path, &cfg); err != nil { - return nil, tracerr.Wrap(fmt.Errorf("error loading node configuration file: %w", err)) - } - validate := validator.New() - if err := validate.Struct(cfg); err != nil { - return nil, tracerr.Wrap(fmt.Errorf("error validating configuration file: %w", err)) - } - if err := validate.Struct(cfg.Coordinator); err != nil { - return nil, tracerr.Wrap(fmt.Errorf("error validating configuration file: %w", err)) - } - return &cfg, nil -} - // LoadNode loads the Node configuration from path. -func LoadNode(path string) (*Node, error) { +func LoadNode(path string, coordinator bool) (*Node, error) { var cfg Node if err := Load(path, &cfg); err != nil { return nil, tracerr.Wrap(fmt.Errorf("error loading node configuration file: %w", err)) @@ -391,11 +385,16 @@ func LoadNode(path string) (*Node, error) { if err := validate.Struct(cfg); err != nil { return nil, tracerr.Wrap(fmt.Errorf("error validating configuration file: %w", err)) } + if coordinator { + if err := validate.Struct(cfg.Coordinator); err != nil { + return nil, tracerr.Wrap(fmt.Errorf("error validating configuration file: %w", err)) + } + } return &cfg, nil } // LoadAPIServer loads the APIServer configuration from path. -func LoadAPIServer(path string) (*APIServer, error) { +func LoadAPIServer(path string, coordinator bool) (*APIServer, error) { var cfg APIServer if err := Load(path, &cfg); err != nil { return nil, tracerr.Wrap(fmt.Errorf("error loading apiServer configuration file: %w", err)) @@ -404,5 +403,10 @@ func LoadAPIServer(path string) (*APIServer, error) { if err := validate.Struct(cfg); err != nil { return nil, tracerr.Wrap(fmt.Errorf("error validating configuration file: %w", err)) } + if coordinator { + if err := validate.Struct(cfg.Coordinator); err != nil { + return nil, tracerr.Wrap(fmt.Errorf("error validating configuration file: %w", err)) + } + } return &cfg, nil } diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index c07538a..a846df3 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -144,8 +144,8 @@ type Coordinator struct { pipelineNum int // Pipeline sequential number. The first pipeline is 1 pipelineFromBatch fromBatch // batch from which we started the pipeline provers []prover.Client - consts synchronizer.SCConsts - vars synchronizer.SCVariables + consts common.SCConsts + vars common.SCVariables stats synchronizer.Stats started bool @@ -275,13 +275,13 @@ type MsgSyncBlock struct { Batches []common.BatchData // Vars contains each Smart Contract variables if they are updated, or // nil if they haven't changed. - Vars synchronizer.SCVariablesPtr + Vars common.SCVariablesPtr } // MsgSyncReorg indicates a reorg type MsgSyncReorg struct { Stats synchronizer.Stats - Vars synchronizer.SCVariablesPtr + Vars common.SCVariablesPtr } // MsgStopPipeline indicates a signal to reset the pipeline @@ -300,7 +300,7 @@ func (c *Coordinator) SendMsg(ctx context.Context, msg interface{}) { } } -func updateSCVars(vars *synchronizer.SCVariables, update synchronizer.SCVariablesPtr) { +func updateSCVars(vars *common.SCVariables, update common.SCVariablesPtr) { if update.Rollup != nil { vars.Rollup = *update.Rollup } @@ -312,7 +312,7 @@ func updateSCVars(vars *synchronizer.SCVariables, update synchronizer.SCVariable } } -func (c *Coordinator) syncSCVars(vars synchronizer.SCVariablesPtr) { +func (c *Coordinator) syncSCVars(vars common.SCVariablesPtr) { updateSCVars(&c.vars, vars) } diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index eb68e94..182bc66 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -187,12 +187,12 @@ func newTestCoordinator(t *testing.T, forgerAddr ethCommon.Address, ethClient *t &prover.MockClient{Delay: 400 * time.Millisecond}, } - scConsts := &synchronizer.SCConsts{ + scConsts := &common.SCConsts{ Rollup: *ethClientSetup.RollupConstants, Auction: *ethClientSetup.AuctionConstants, WDelayer: *ethClientSetup.WDelayerConstants, } - initSCVars := &synchronizer.SCVariables{ + initSCVars := &common.SCVariables{ Rollup: *ethClientSetup.RollupVariables, Auction: *ethClientSetup.AuctionVariables, WDelayer: *ethClientSetup.WDelayerVariables, @@ -528,7 +528,7 @@ func TestCoordinatorStress(t *testing.T) { coord.SendMsg(ctx, MsgSyncBlock{ Stats: *stats, Batches: blockData.Rollup.Batches, - Vars: synchronizer.SCVariablesPtr{ + Vars: common.SCVariablesPtr{ Rollup: blockData.Rollup.Vars, Auction: blockData.Auction.Vars, WDelayer: blockData.WDelayer.Vars, diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go index b7e0b7e..2c19dfe 100644 --- a/coordinator/pipeline.go +++ b/coordinator/pipeline.go @@ -22,7 +22,7 @@ import ( type statsVars struct { Stats synchronizer.Stats - Vars synchronizer.SCVariablesPtr + Vars common.SCVariablesPtr } type state struct { @@ -36,7 +36,7 @@ type state struct { type Pipeline struct { num int cfg Config - consts synchronizer.SCConsts + consts common.SCConsts // state state state @@ -57,7 +57,7 @@ type Pipeline struct { purger *Purger stats synchronizer.Stats - vars synchronizer.SCVariables + vars common.SCVariables statsVarsCh chan statsVars ctx context.Context @@ -90,7 +90,7 @@ func NewPipeline(ctx context.Context, coord *Coordinator, txManager *TxManager, provers []prover.Client, - scConsts *synchronizer.SCConsts, + scConsts *common.SCConsts, ) (*Pipeline, error) { proversPool := NewProversPool(len(provers)) proversPoolSize := 0 @@ -124,7 +124,7 @@ func NewPipeline(ctx context.Context, } // SetSyncStatsVars is a thread safe method to sets the synchronizer Stats -func (p *Pipeline) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) { +func (p *Pipeline) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats, vars *common.SCVariablesPtr) { select { case p.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars}: case <-ctx.Done(): @@ -133,7 +133,7 @@ func (p *Pipeline) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Sta // reset pipeline state func (p *Pipeline) reset(batchNum common.BatchNum, - stats *synchronizer.Stats, vars *synchronizer.SCVariables) error { + stats *synchronizer.Stats, vars *common.SCVariables) error { p.state = state{ batchNum: batchNum, lastForgeL1TxsNum: stats.Sync.LastForgeL1TxsNum, @@ -194,7 +194,7 @@ func (p *Pipeline) reset(batchNum common.BatchNum, return nil } -func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) { +func (p *Pipeline) syncSCVars(vars common.SCVariablesPtr) { updateSCVars(&p.vars, vars) } @@ -255,7 +255,7 @@ func (p *Pipeline) handleForgeBatch(ctx context.Context, // Start the forging pipeline func (p *Pipeline) Start(batchNum common.BatchNum, - stats *synchronizer.Stats, vars *synchronizer.SCVariables) error { + stats *synchronizer.Stats, vars *common.SCVariables) error { if p.started { log.Fatal("Pipeline already started") } diff --git a/coordinator/pipeline_test.go b/coordinator/pipeline_test.go index 8bb228b..ec83126 100644 --- a/coordinator/pipeline_test.go +++ b/coordinator/pipeline_test.go @@ -206,11 +206,7 @@ PoolTransfer(0) User2-User3: 300 (126) require.NoError(t, err) } - err = pipeline.reset(batchNum, syncStats, &synchronizer.SCVariables{ - Rollup: *syncSCVars.Rollup, - Auction: *syncSCVars.Auction, - WDelayer: *syncSCVars.WDelayer, - }) + err = pipeline.reset(batchNum, syncStats, syncSCVars) require.NoError(t, err) // Sanity check sdbAccounts, err := pipeline.txSelector.LocalAccountsDB().TestGetAccounts() diff --git a/coordinator/txmanager.go b/coordinator/txmanager.go index 1ac43db..d594018 100644 --- a/coordinator/txmanager.go +++ b/coordinator/txmanager.go @@ -31,10 +31,10 @@ type TxManager struct { batchCh chan *BatchInfo chainID *big.Int account accounts.Account - consts synchronizer.SCConsts + consts common.SCConsts stats synchronizer.Stats - vars synchronizer.SCVariables + vars common.SCVariables statsVarsCh chan statsVars discardPipelineCh chan int // int refers to the pipelineNum @@ -55,7 +55,7 @@ type TxManager struct { // NewTxManager creates a new TxManager func NewTxManager(ctx context.Context, cfg *Config, ethClient eth.ClientInterface, l2DB *l2db.L2DB, - coord *Coordinator, scConsts *synchronizer.SCConsts, initSCVars *synchronizer.SCVariables) (*TxManager, error) { + coord *Coordinator, scConsts *common.SCConsts, initSCVars *common.SCVariables) (*TxManager, error) { chainID, err := ethClient.EthChainID() if err != nil { return nil, tracerr.Wrap(err) @@ -102,7 +102,7 @@ func (t *TxManager) AddBatch(ctx context.Context, batchInfo *BatchInfo) { } // SetSyncStatsVars is a thread safe method to sets the synchronizer Stats -func (t *TxManager) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) { +func (t *TxManager) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats, vars *common.SCVariablesPtr) { select { case t.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars}: case <-ctx.Done(): @@ -118,7 +118,7 @@ func (t *TxManager) DiscardPipeline(ctx context.Context, pipelineNum int) { } } -func (t *TxManager) syncSCVars(vars synchronizer.SCVariablesPtr) { +func (t *TxManager) syncSCVars(vars common.SCVariablesPtr) { updateSCVars(&t.vars, vars) } diff --git a/db/historydb/apiqueries.go b/db/historydb/apiqueries.go index 84dc0bc..9fcd0f9 100644 --- a/db/historydb/apiqueries.go +++ b/db/historydb/apiqueries.go @@ -38,7 +38,7 @@ func (hdb *HistoryDB) GetBatchAPI(batchNum common.BatchNum) (*BatchAPI, error) { return hdb.getBatchAPI(hdb.dbRead, batchNum) } -// GetBatchAPI return the batch with the given batchNum +// GetBatchInternalAPI return the batch with the given batchNum func (hdb *HistoryDB) GetBatchInternalAPI(batchNum common.BatchNum) (*BatchAPI, error) { return hdb.getBatchAPI(hdb.dbRead, batchNum) } @@ -944,6 +944,7 @@ func (hdb *HistoryDB) GetCoordinatorAPI(bidderAddr ethCommon.Address) (*Coordina defer hdb.apiConnCon.Release() return hdb.getCoordinatorAPI(hdb.dbRead, bidderAddr) } + func (hdb *HistoryDB) getCoordinatorAPI(d meddler.DB, bidderAddr ethCommon.Address) (*CoordinatorAPI, error) { coordinator := &CoordinatorAPI{} err := meddler.QueryRow( @@ -954,6 +955,7 @@ func (hdb *HistoryDB) getCoordinatorAPI(d meddler.DB, bidderAddr ethCommon.Addre return coordinator, tracerr.Wrap(err) } +// GetNodeInfoAPI retusnt he NodeInfo func (hdb *HistoryDB) GetNodeInfoAPI() (*NodeInfo, error) { cancel, err := hdb.apiConnCon.Acquire() defer cancel() @@ -964,9 +966,9 @@ func (hdb *HistoryDB) GetNodeInfoAPI() (*NodeInfo, error) { return hdb.GetNodeInfo() } +// GetBucketUpdatesInternalAPI returns the latest bucket updates func (hdb *HistoryDB) GetBucketUpdatesInternalAPI() ([]BucketUpdateAPI, error) { var bucketUpdates []*BucketUpdateAPI - // var bucketUpdates []*common.BucketUpdate err := meddler.QueryAll( hdb.dbRead, &bucketUpdates, `SELECT num_bucket, withdrawals FROM bucket_update @@ -977,7 +979,7 @@ func (hdb *HistoryDB) GetBucketUpdatesInternalAPI() ([]BucketUpdateAPI, error) { return db.SlicePtrsToSlice(bucketUpdates).([]BucketUpdateAPI), tracerr.Wrap(err) } -// getNextForgers returns next forgers +// GetNextForgersInternalAPI returns next forgers func (hdb *HistoryDB) GetNextForgersInternalAPI(auctionVars *common.AuctionVariables, auctionConsts *common.AuctionConstants, lastBlock common.Block, currentSlot, lastClosedSlot int64) ([]NextForgerAPI, error) { @@ -1071,7 +1073,7 @@ func (hdb *HistoryDB) GetNextForgersInternalAPI(auctionVars *common.AuctionVaria return nextForgers, nil } -// UpdateMetrics update Status.Metrics information +// GetMetricsInternalAPI returns the MetricsAPI func (hdb *HistoryDB) GetMetricsInternalAPI(lastBatchNum common.BatchNum) (*MetricsAPI, error) { var metrics MetricsAPI // Get the first and last batch of the last 24h and their timestamps @@ -1171,6 +1173,7 @@ func (hdb *HistoryDB) GetMetricsInternalAPI(lastBatchNum common.BatchNum) (*Metr return &metrics, nil } +// GetStateAPI returns the StateAPI func (hdb *HistoryDB) GetStateAPI() (*StateAPI, error) { cancel, err := hdb.apiConnCon.Acquire() defer cancel() diff --git a/db/historydb/historydb.go b/db/historydb/historydb.go index 332f96a..47a4945 100644 --- a/db/historydb/historydb.go +++ b/db/historydb/historydb.go @@ -1171,7 +1171,7 @@ func (hdb *HistoryDB) GetTokensTest() ([]TokenWithUSD, error) { return db.SlicePtrsToSlice(tokens).([]TokenWithUSD), nil } -// UpdateRecommendedFee update Status.RecommendedFee information +// GetRecommendedFee returns the RecommendedFee information func (hdb *HistoryDB) GetRecommendedFee(minFeeUSD float64) (*common.RecommendedFee, error) { var recommendedFee common.RecommendedFee // Get total txs and the batch of the first selected tx of the last hour diff --git a/db/historydb/historydb_test.go b/db/historydb/historydb_test.go index 7f64405..e01bf77 100644 --- a/db/historydb/historydb_test.go +++ b/db/historydb/historydb_test.go @@ -1172,15 +1172,6 @@ func TestGetMetricsAPI(t *testing.T) { assert.NoError(t, err) } - // clientSetupExample := test.NewClientSetupExample() - // apiStateUpdater := NewAPIStateUpdater(historyDB, &NodeConfig{1000, 0.5}, - // &Constants{ - // RollupConstants: *clientSetupExample.RollupConstants, - // AuctionConstants: *clientSetupExample.AuctionConstants, - // WDelayerConstants: *clientSetupExample.WDelayerConstants, - // ChainID: uint16(clientSetupExample.ChainID.Int64()), - // HermezAddress: clientSetupExample.AuctionConstants.HermezRollup, - // }) res, err := historyDB.GetMetricsInternalAPI(common.BatchNum(numBatches)) assert.NoError(t, err) @@ -1467,7 +1458,7 @@ func setTestBlocks(from, to int64) []common.Block { func TestNodeInfo(t *testing.T) { test.WipeDB(historyDB.DB()) - err := historyDB.SetAPIState(&StateAPI{}) + err := historyDB.SetStateInternalAPI(&StateAPI{}) require.NoError(t, err) clientSetup := test.NewClientSetupExample() @@ -1503,7 +1494,7 @@ func TestNodeInfo(t *testing.T) { ExistingAccount: 0.15, }, } - err = historyDB.SetAPIState(stateAPI) + err = historyDB.SetStateInternalAPI(stateAPI) require.NoError(t, err) nodeConfig := &NodeConfig{ @@ -1521,7 +1512,7 @@ func TestNodeInfo(t *testing.T) { require.NoError(t, err) assert.Equal(t, nodeConfig, dbNodeConfig) - dbStateAPI, err := historyDB.GetStateAPI() + dbStateAPI, err := historyDB.getStateAPI(historyDB.dbRead) require.NoError(t, err) assert.Equal(t, stateAPI, dbStateAPI) } diff --git a/db/historydb/nodeinfo.go b/db/historydb/nodeinfo.go index 7d26eeb..1d58af3 100644 --- a/db/historydb/nodeinfo.go +++ b/db/historydb/nodeinfo.go @@ -14,6 +14,7 @@ const ( createAccountInternalExtraFeePercentage float64 = 2.5 ) +// Period represents a time period in ethereum type Period struct { SlotNum int64 `json:"slotNum"` FromBlock int64 `json:"fromBlock"` @@ -22,11 +23,13 @@ type Period struct { ToTimestamp time.Time `json:"toTimestamp"` } +// NextForgerAPI represents the next forger exposed via the API type NextForgerAPI struct { Coordinator CoordinatorAPI `json:"coordinator"` Period Period `json:"period"` } +// NetworkAPI is the network state exposed via the API type NetworkAPI struct { LastEthBlock int64 `json:"lastEthereumBlock"` LastSyncBlock int64 `json:"lastSynchedBlock"` @@ -41,6 +44,7 @@ type NodePublicConfig struct { ForgeDelay float64 `json:"forgeDelay"` } +// StateAPI is an object representing the node and network state exposed via the API type StateAPI struct { // NodePublicConfig is the configuration of the node that is exposed via API NodePublicConfig NodePublicConfig `json:"nodeConfig"` @@ -52,27 +56,28 @@ type StateAPI struct { RecommendedFee common.RecommendedFee `json:"recommendedFee"` } +// Constants contains network constants type Constants struct { - // RollupConstants common.RollupConstants - // AuctionConstants common.AuctionConstants - // WDelayerConstants common.WDelayerConstants common.SCConsts ChainID uint16 HermezAddress ethCommon.Address } +// NodeConfig contains the node config exposed in the API type NodeConfig struct { MaxPoolTxs uint32 `meddler:"max_pool_txs"` MinFeeUSD float64 `meddler:"min_fee"` } +// NodeInfo contains information about he node used when serving the API type NodeInfo struct { ItemID int `meddler:"item_id,pk"` - APIState *StateAPI `meddler:"state,json"` + StateAPI *StateAPI `meddler:"state,json"` NodeConfig *NodeConfig `meddler:"config,json"` Constants *Constants `meddler:"constants,json"` } +// GetNodeInfo returns the NodeInfo func (hdb *HistoryDB) GetNodeInfo() (*NodeInfo, error) { ni := &NodeInfo{} err := meddler.QueryRow( @@ -81,6 +86,7 @@ func (hdb *HistoryDB) GetNodeInfo() (*NodeInfo, error) { return ni, tracerr.Wrap(err) } +// GetConstants returns the Constats func (hdb *HistoryDB) GetConstants() (*Constants, error) { var nodeInfo NodeInfo err := meddler.QueryRow( @@ -90,6 +96,7 @@ func (hdb *HistoryDB) GetConstants() (*Constants, error) { return nodeInfo.Constants, tracerr.Wrap(err) } +// SetConstants sets the Constants func (hdb *HistoryDB) SetConstants(constants *Constants) error { _constants := struct { Constants *Constants `meddler:"constants,json"` @@ -105,6 +112,7 @@ func (hdb *HistoryDB) SetConstants(constants *Constants) error { return tracerr.Wrap(err) } +// GetStateInternalAPI returns the StateAPI func (hdb *HistoryDB) GetStateInternalAPI() (*StateAPI, error) { return hdb.getStateAPI(hdb.dbRead) } @@ -115,14 +123,15 @@ func (hdb *HistoryDB) getStateAPI(d meddler.DB) (*StateAPI, error) { d, &nodeInfo, "SELECT state FROM node_info WHERE item_id = 1;", ) - return nodeInfo.APIState, tracerr.Wrap(err) + return nodeInfo.StateAPI, tracerr.Wrap(err) } -func (hdb *HistoryDB) SetAPIState(apiState *StateAPI) error { - _apiState := struct { - APIState *StateAPI `meddler:"state,json"` - }{apiState} - values, err := meddler.Default.Values(&_apiState, false) +// SetStateInternalAPI sets the StateAPI +func (hdb *HistoryDB) SetStateInternalAPI(stateAPI *StateAPI) error { + _stateAPI := struct { + StateAPI *StateAPI `meddler:"state,json"` + }{stateAPI} + values, err := meddler.Default.Values(&_stateAPI, false) if err != nil { return tracerr.Wrap(err) } @@ -133,6 +142,7 @@ func (hdb *HistoryDB) SetAPIState(apiState *StateAPI) error { return tracerr.Wrap(err) } +// GetNodeConfig returns the NodeConfig func (hdb *HistoryDB) GetNodeConfig() (*NodeConfig, error) { var nodeInfo NodeInfo err := meddler.QueryRow( @@ -142,6 +152,7 @@ func (hdb *HistoryDB) GetNodeConfig() (*NodeConfig, error) { return nodeInfo.NodeConfig, tracerr.Wrap(err) } +// SetNodeConfig sets the NodeConfig func (hdb *HistoryDB) SetNodeConfig(nodeConfig *NodeConfig) error { _nodeConfig := struct { NodeConfig *NodeConfig `meddler:"config,json"` @@ -151,65 +162,8 @@ func (hdb *HistoryDB) SetNodeConfig(nodeConfig *NodeConfig) error { return tracerr.Wrap(err) } _, err = hdb.dbWrite.Exec( - "UPDATE config SET state = $1 WHERE item_id = 1;", + "UPDATE node_info SET config = $1 WHERE item_id = 1;", values[0], ) return tracerr.Wrap(err) } - -// func (hdb *HistoryDB) SetInitialNodeInfo(maxPoolTxs uint32, minFeeUSD float64, constants *Constants) error { -// ni := &NodeInfo{ -// MaxPoolTxs: &maxPoolTxs, -// MinFeeUSD: &minFeeUSD, -// Constants: constants, -// } -// return tracerr.Wrap(meddler.Insert(hdb.dbWrite, "node_info", ni)) -// } - -// apiSlotToBigInts converts from [6]*apitypes.BigIntStr to [6]*big.Int -// func apiSlotToBigInts(defaultSlotSetBid [6]*apitypes.BigIntStr) ([6]*big.Int, error) { -// var slots [6]*big.Int -// -// for i, slot := range defaultSlotSetBid { -// bigInt, ok := new(big.Int).SetString(string(*slot), 10) -// if !ok { -// return slots, tracerr.Wrap(fmt.Errorf("can't convert %T into big.Int", slot)) -// } -// slots[i] = bigInt -// } -// -// return slots, nil -// } - -// func (hdb *HistoryDB) updateNodeInfo(setUpdatedNodeInfo func(*sqlx.Tx, *NodeInfo) error) error { -// // Create a SQL transaction or read and update atomicaly -// txn, err := hdb.dbWrite.Beginx() -// if err != nil { -// return tracerr.Wrap(err) -// } -// defer func() { -// if err != nil { -// db.Rollback(txn) -// } -// }() -// // Read current node info -// ni := &NodeInfo{} -// if err := meddler.QueryRow( -// txn, ni, "SELECT * FROM node_info;", -// ); err != nil { -// return tracerr.Wrap(err) -// } -// // Update NodeInfo struct -// if err := setUpdatedNodeInfo(txn, ni); err != nil { -// return tracerr.Wrap(err) -// } -// // Update NodeInfo at DB -// if _, err := txn.Exec("DELETE FROM node_info;"); err != nil { -// return tracerr.Wrap(err) -// } -// if err := meddler.Insert(txn, "node_info", ni); err != nil { -// return tracerr.Wrap(err) -// } -// // Commit NodeInfo update -// return tracerr.Wrap(txn.Commit()) -// } diff --git a/node/node.go b/node/node.go index 68681c5..ce0291b 100644 --- a/node/node.go +++ b/node/node.go @@ -54,7 +54,7 @@ const ( // Node is the Hermez Node type Node struct { nodeAPI *NodeAPI - apiStateUpdater *api.APIStateUpdater + stateAPIUpdater *api.StateAPIUpdater debugAPI *debugapi.DebugAPI priceUpdater *priceupdater.PriceUpdater // Coordinator @@ -257,7 +257,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { return nil, tracerr.Wrap(err) } - apiStateUpdater := api.NewAPIStateUpdater(historyDB, &hdbNodeCfg, initSCVars, &hdbConsts) + stateAPIUpdater := api.NewStateAPIUpdater(historyDB, &hdbNodeCfg, initSCVars, &hdbConsts) var coord *coordinator.Coordinator var l2DB *l2db.L2DB @@ -437,7 +437,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { } ctx, cancel := context.WithCancel(context.Background()) return &Node{ - apiStateUpdater: apiStateUpdater, + stateAPIUpdater: stateAPIUpdater, nodeAPI: nodeAPI, debugAPI: debugAPI, priceUpdater: priceUpdater, @@ -456,11 +456,14 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { // APIServer is a server that only runs the API type APIServer struct { nodeAPI *NodeAPI + mode Mode + ctx context.Context + wg sync.WaitGroup + cancel context.CancelFunc } +// NewAPIServer creates a new APIServer func NewAPIServer(mode Mode, cfg *config.APIServer) (*APIServer, error) { - // NOTE: I just copied some parts of NewNode related to starting the - // API, but it still cotains many parameters that are not available meddler.Debug = cfg.Debug.MeddlerLogs // Stablish DB connection dbWrite, err := dbUtils.InitSQLDB( @@ -492,13 +495,10 @@ func NewAPIServer(mode Mode, cfg *config.APIServer) (*APIServer, error) { return nil, tracerr.Wrap(fmt.Errorf("dbUtils.InitSQLDB: %w", err)) } } - var apiConnCon *dbUtils.APIConnectionController - if cfg.API.Explorer || mode == ModeCoordinator { - apiConnCon = dbUtils.NewAPICnnectionController( - cfg.API.MaxSQLConnections, - cfg.API.SQLConnectionTimeout.Duration, - ) - } + apiConnCon := dbUtils.NewAPICnnectionController( + cfg.API.MaxSQLConnections, + cfg.API.SQLConnectionTimeout.Duration, + ) historyDB := historydb.NewHistoryDB(dbRead, dbWrite, apiConnCon) @@ -506,47 +506,67 @@ func NewAPIServer(mode Mode, cfg *config.APIServer) (*APIServer, error) { if mode == ModeCoordinator { l2DB = l2db.NewL2DB( dbRead, dbWrite, - cfg.Coordinator.L2DB.SafetyPeriod, + 0, cfg.Coordinator.L2DB.MaxTxs, cfg.Coordinator.L2DB.MinFeeUSD, - cfg.Coordinator.L2DB.TTL.Duration, + 0, apiConnCon, ) } - var nodeAPI *NodeAPI - if cfg.API.Address != "" { - if cfg.Debug.GinDebugMode { - gin.SetMode(gin.DebugMode) - } else { - gin.SetMode(gin.ReleaseMode) - } - if cfg.API.UpdateMetricsInterval.Duration == 0 { - return nil, tracerr.Wrap(fmt.Errorf("invalid cfg.API.UpdateMetricsInterval: %v", - cfg.API.UpdateMetricsInterval.Duration)) - } - if cfg.API.UpdateRecommendedFeeInterval.Duration == 0 { - return nil, tracerr.Wrap(fmt.Errorf("invalid cfg.API.UpdateRecommendedFeeInterval: %v", - cfg.API.UpdateRecommendedFeeInterval.Duration)) - } - server := gin.Default() - coord := false - if mode == ModeCoordinator { - coord = cfg.Coordinator.API.Coordinator - } - var err error - nodeAPI, err = NewNodeAPI( - cfg.API.Address, - coord, cfg.API.Explorer, - server, - historyDB, - l2DB, - ) - if err != nil { - return nil, tracerr.Wrap(err) - } + if cfg.Debug.GinDebugMode { + gin.SetMode(gin.DebugMode) + } else { + gin.SetMode(gin.ReleaseMode) } - // ETC... + server := gin.Default() + coord := false + if mode == ModeCoordinator { + coord = cfg.Coordinator.API.Coordinator + } + nodeAPI, err := NewNodeAPI( + cfg.API.Address, + coord, cfg.API.Explorer, + server, + historyDB, + l2DB, + ) + if err != nil { + return nil, tracerr.Wrap(err) + } + ctx, cancel := context.WithCancel(context.Background()) + return &APIServer{ + nodeAPI: nodeAPI, + mode: mode, + ctx: ctx, + cancel: cancel, + }, nil +} + +// Start the APIServer +func (s *APIServer) Start() { + log.Infow("Starting api server...", "mode", s.mode) + log.Info("Starting NodeAPI...") + s.wg.Add(1) + go func() { + defer func() { + log.Info("NodeAPI routine stopped") + s.wg.Done() + }() + if err := s.nodeAPI.Run(s.ctx); err != nil { + if s.ctx.Err() != nil { + return + } + log.Fatalw("NodeAPI.Run", "err", err) + } + }() +} + +// Stop the APIServer +func (s *APIServer) Stop() { + log.Infow("Stopping NodeAPI...") + s.cancel() + s.wg.Wait() } // NodeAPI holds the node http API @@ -627,13 +647,13 @@ func (n *Node) handleNewBlock(ctx context.Context, stats *synchronizer.Stats, va if n.mode == ModeCoordinator { n.coord.SendMsg(ctx, coordinator.MsgSyncBlock{ Stats: *stats, - Vars: vars, + Vars: *vars, Batches: batches, }) } - n.apiStateUpdater.SetSCVars(vars) + n.stateAPIUpdater.SetSCVars(vars) if stats.Synced() { - if err := n.apiStateUpdater.UpdateNetworkInfo( + if err := n.stateAPIUpdater.UpdateNetworkInfo( stats.Eth.LastBlock, stats.Sync.LastBlock, common.BatchNum(stats.Eth.LastBatchNum), stats.Sync.Auction.CurrentSlot.SlotNum, @@ -641,11 +661,11 @@ func (n *Node) handleNewBlock(ctx context.Context, stats *synchronizer.Stats, va log.Errorw("ApiStateUpdater.UpdateNetworkInfo", "err", err) } } else { - n.apiStateUpdater.UpdateNetworkInfoBlock( + n.stateAPIUpdater.UpdateNetworkInfoBlock( stats.Eth.LastBlock, stats.Sync.LastBlock, ) } - if err := n.apiStateUpdater.Store(); err != nil { + if err := n.stateAPIUpdater.Store(); err != nil { return tracerr.Wrap(err) } return nil @@ -656,14 +676,14 @@ func (n *Node) handleReorg(ctx context.Context, stats *synchronizer.Stats, if n.mode == ModeCoordinator { n.coord.SendMsg(ctx, coordinator.MsgSyncReorg{ Stats: *stats, - Vars: vars, + Vars: *vars.AsPtr(), }) } - n.apiStateUpdater.SetSCVars(vars.AsPtr()) - n.apiStateUpdater.UpdateNetworkInfoBlock( + n.stateAPIUpdater.SetSCVars(vars.AsPtr()) + n.stateAPIUpdater.UpdateNetworkInfoBlock( stats.Eth.LastBlock, stats.Sync.LastBlock, ) - if err := n.apiStateUpdater.Store(); err != nil { + if err := n.stateAPIUpdater.Store(); err != nil { return tracerr.Wrap(err) } return nil @@ -713,7 +733,9 @@ func (n *Node) StartSynchronizer() { // the last synced one) is synchronized stats := n.sync.Stats() vars := n.sync.SCVars() - n.handleNewBlock(n.ctx, stats, vars.AsPtr(), []common.BatchData{}) + if err := n.handleNewBlock(n.ctx, stats, vars.AsPtr(), []common.BatchData{}); err != nil { + log.Fatalw("Node.handleNewBlock", "err", err) + } n.wg.Add(1) go func() { @@ -800,24 +822,24 @@ func (n *Node) StartNodeAPI() { n.wg.Add(1) go func() { // Do an initial update on startup - if err := n.apiStateUpdater.UpdateMetrics(); err != nil { + if err := n.stateAPIUpdater.UpdateMetrics(); err != nil { log.Errorw("ApiStateUpdater.UpdateMetrics", "err", err) } - if err := n.apiStateUpdater.Store(); err != nil { + if err := n.stateAPIUpdater.Store(); err != nil { log.Errorw("ApiStateUpdater.Store", "err", err) } for { select { case <-n.ctx.Done(): - log.Info("API.UpdateMetrics loop done") + log.Info("ApiStateUpdater.UpdateMetrics loop done") n.wg.Done() return case <-time.After(n.cfg.API.UpdateMetricsInterval.Duration): - if err := n.apiStateUpdater.UpdateMetrics(); err != nil { + if err := n.stateAPIUpdater.UpdateMetrics(); err != nil { log.Errorw("ApiStateUpdater.UpdateMetrics", "err", err) continue } - if err := n.apiStateUpdater.Store(); err != nil { + if err := n.stateAPIUpdater.Store(); err != nil { log.Errorw("ApiStateUpdater.Store", "err", err) } } @@ -827,18 +849,25 @@ func (n *Node) StartNodeAPI() { n.wg.Add(1) go func() { // Do an initial update on startup - if err := n.historyDB.UpdateRecommendedFee(); err != nil { - log.Errorw("API.UpdateRecommendedFee", "err", err) + if err := n.stateAPIUpdater.UpdateRecommendedFee(); err != nil { + log.Errorw("ApiStateUpdater.UpdateRecommendedFee", "err", err) + } + if err := n.stateAPIUpdater.Store(); err != nil { + log.Errorw("ApiStateUpdater.Store", "err", err) } for { select { case <-n.ctx.Done(): - log.Info("API.UpdateRecommendedFee loop done") + log.Info("ApiStateUpdaterAPI.UpdateRecommendedFee loop done") n.wg.Done() return case <-time.After(n.cfg.API.UpdateRecommendedFeeInterval.Duration): - if err := n.historyDB.UpdateRecommendedFee(); err != nil { - log.Errorw("API.UpdateRecommendedFee", "err", err) + if err := n.stateAPIUpdater.UpdateRecommendedFee(); err != nil { + log.Errorw("ApiStateUpdaterAPI.UpdateRecommendedFee", "err", err) + continue + } + if err := n.stateAPIUpdater.Store(); err != nil { + log.Errorw("ApiStateUpdater.Store", "err", err) } } } diff --git a/synchronizer/synchronizer_test.go b/synchronizer/synchronizer_test.go index ed8ce7b..c56fefa 100644 --- a/synchronizer/synchronizer_test.go +++ b/synchronizer/synchronizer_test.go @@ -372,9 +372,9 @@ func TestSyncGeneral(t *testing.T) { assert.Equal(t, int64(1), stats.Eth.LastBlock.Num) assert.Equal(t, int64(1), stats.Sync.LastBlock.Num) vars := s.SCVars() - assert.Equal(t, clientSetup.RollupVariables, vars.Rollup) - assert.Equal(t, clientSetup.AuctionVariables, vars.Auction) - assert.Equal(t, clientSetup.WDelayerVariables, vars.WDelayer) + assert.Equal(t, *clientSetup.RollupVariables, vars.Rollup) + assert.Equal(t, *clientSetup.AuctionVariables, vars.Auction) + assert.Equal(t, *clientSetup.WDelayerVariables, vars.WDelayer) dbBlocks, err := s.historyDB.GetAllBlocks() require.NoError(t, err) @@ -533,9 +533,9 @@ func TestSyncGeneral(t *testing.T) { assert.Equal(t, int64(4), stats.Eth.LastBlock.Num) assert.Equal(t, int64(4), stats.Sync.LastBlock.Num) vars = s.SCVars() - assert.Equal(t, clientSetup.RollupVariables, vars.Rollup) - assert.Equal(t, clientSetup.AuctionVariables, vars.Auction) - assert.Equal(t, clientSetup.WDelayerVariables, vars.WDelayer) + assert.Equal(t, *clientSetup.RollupVariables, vars.Rollup) + assert.Equal(t, *clientSetup.AuctionVariables, vars.Auction) + assert.Equal(t, *clientSetup.WDelayerVariables, vars.WDelayer) dbExits, err := s.historyDB.GetAllExits() require.NoError(t, err) @@ -665,9 +665,9 @@ func TestSyncGeneral(t *testing.T) { assert.Equal(t, false, stats.Synced()) assert.Equal(t, int64(6), stats.Eth.LastBlock.Num) vars = s.SCVars() - assert.Equal(t, clientSetup.RollupVariables, vars.Rollup) - assert.Equal(t, clientSetup.AuctionVariables, vars.Auction) - assert.Equal(t, clientSetup.WDelayerVariables, vars.WDelayer) + assert.Equal(t, *clientSetup.RollupVariables, vars.Rollup) + assert.Equal(t, *clientSetup.AuctionVariables, vars.Auction) + assert.Equal(t, *clientSetup.WDelayerVariables, vars.WDelayer) // At this point, the DB only has data up to block 1 dbBlock, err := s.historyDB.GetLastBlock() @@ -704,9 +704,9 @@ func TestSyncGeneral(t *testing.T) { } vars = s.SCVars() - assert.Equal(t, clientSetup.RollupVariables, vars.Rollup) - assert.Equal(t, clientSetup.AuctionVariables, vars.Auction) - assert.Equal(t, clientSetup.WDelayerVariables, vars.WDelayer) + assert.Equal(t, *clientSetup.RollupVariables, vars.Rollup) + assert.Equal(t, *clientSetup.AuctionVariables, vars.Auction) + assert.Equal(t, *clientSetup.WDelayerVariables, vars.WDelayer) } dbBlock, err = s.historyDB.GetLastBlock()