Browse Source

WIP

feature/serveapicli1
Eduard S 3 years ago
parent
commit
b330889570
19 changed files with 431 additions and 365 deletions
  1. +6
    -6
      api/api.go
  2. +26
    -7
      api/api_test.go
  3. +7
    -7
      api/slots_test.go
  4. +53
    -15
      api/state.go
  5. +53
    -43
      api/state_test.go
  6. +63
    -43
      cli/node/main.go
  7. +2
    -0
      common/eth.go
  8. +58
    -54
      config/config.go
  9. +6
    -6
      coordinator/coordinator.go
  10. +3
    -3
      coordinator/coordinator_test.go
  11. +8
    -8
      coordinator/pipeline.go
  12. +1
    -5
      coordinator/pipeline_test.go
  13. +5
    -5
      coordinator/txmanager.go
  14. +7
    -4
      db/historydb/apiqueries.go
  15. +1
    -1
      db/historydb/historydb.go
  16. +3
    -12
      db/historydb/historydb_test.go
  17. +22
    -68
      db/historydb/nodeinfo.go
  18. +95
    -66
      node/node.go
  19. +12
    -12
      synchronizer/synchronizer_test.go

+ 6
- 6
api/api.go

@ -34,20 +34,20 @@ func NewAPI(
if explorerEndpoints && hdb == nil {
return nil, tracerr.Wrap(errors.New("cannot serve Explorer endpoints without HistoryDB"))
}
ni, err := hdb.GetNodeInfo()
consts, err := hdb.GetConstants()
if err != nil {
return nil, err
}
a := &API{
h: hdb,
cg: &configAPI{
RollupConstants: *newRollupConstants(ni.Constants.RollupConstants),
AuctionConstants: ni.Constants.AuctionConstants,
WDelayerConstants: ni.Constants.WDelayerConstants,
RollupConstants: *newRollupConstants(consts.Rollup),
AuctionConstants: consts.Auction,
WDelayerConstants: consts.WDelayer,
},
l2: l2db,
chainID: ni.Constants.ChainID,
hermezAddress: ni.Constants.HermezAddress,
chainID: consts.ChainID,
hermezAddress: consts.HermezAddress,
}
// Add coordinator endpoints

+ 26
- 7
api/api_test.go

@ -186,6 +186,7 @@ type testCommon struct {
var tc testCommon
var config configAPI
var api *API
var stateAPIUpdater *StateAPIUpdater
// TestMain initializes the API server, and fill HistoryDB and StateDB with fake data,
// emulating the task of the synchronizer in order to have data to be returned
@ -222,15 +223,27 @@ func TestMain(m *testing.M) {
apiGin := gin.Default()
// Reset DB
test.WipeDB(hdb.DB())
if err := hdb.SetInitialNodeInfo(10, 0.0, &historydb.Constants{
RollupConstants: _config.RollupConstants,
AuctionConstants: _config.AuctionConstants,
WDelayerConstants: _config.WDelayerConstants,
ChainID: chainID,
HermezAddress: _config.HermezAddress,
}); err != nil {
constants := &historydb.Constants{
SCConsts: common.SCConsts{
Rollup: _config.RollupConstants,
Auction: _config.AuctionConstants,
WDelayer: _config.WDelayerConstants,
},
ChainID: chainID,
HermezAddress: _config.HermezAddress,
}
if err := hdb.SetConstants(constants); err != nil {
panic(err)
}
nodeConfig := &historydb.NodeConfig{
MaxPoolTxs: 10,
MinFeeUSD: 0,
}
if err := hdb.SetNodeConfig(nodeConfig); err != nil {
panic(err)
}
api, err = NewAPI(
true,
true,
@ -507,6 +520,12 @@ func TestMain(m *testing.M) {
WithdrawalDelay: uint64(3000),
}
stateAPIUpdater = NewStateAPIUpdater(hdb, nodeConfig, &common.SCVariables{
Rollup: rollupVars,
Auction: auctionVars,
WDelayer: wdelayerVars,
}, constants)
// Generate test data, as expected to be received/sended from/to the API
testCoords := genTestCoordinators(commonCoords)
testBids := genTestBids(commonBlocks, testCoords, bids)

+ 7
- 7
api/slots_test.go

@ -99,14 +99,14 @@ func TestGetSlot(t *testing.T) {
nil, &fetchedSlot,
),
)
ni, err := api.h.GetNodeInfoAPI()
assert.NoError(t, err)
emptySlot := api.getEmptyTestSlot(slotNum, ni.APIState.Network.LastSyncBlock, tc.auctionVars)
// ni, err := api.h.GetNodeInfoAPI()
// assert.NoError(t, err)
emptySlot := api.getEmptyTestSlot(slotNum, 0, tc.auctionVars)
assertSlot(t, emptySlot, fetchedSlot)
// Invalid slotNum
path := endpoint + strconv.Itoa(-2)
err = doBadReq("GET", path, nil, 400)
err := doBadReq("GET", path, nil, 400)
assert.NoError(t, err)
}
@ -129,10 +129,10 @@ func TestGetSlots(t *testing.T) {
err := doGoodReqPaginated(path, historydb.OrderAsc, &testSlotsResponse{}, appendIter)
assert.NoError(t, err)
allSlots := tc.slots
ni, err := api.h.GetNodeInfoAPI()
assert.NoError(t, err)
// ni, err := api.h.GetNodeInfoAPI()
// assert.NoError(t, err)
for i := tc.slots[len(tc.slots)-1].SlotNum; i < maxSlotNum; i++ {
emptySlot := api.getEmptyTestSlot(i+1, ni.APIState.Network.LastSyncBlock, tc.auctionVars)
emptySlot := api.getEmptyTestSlot(i+1, 0, tc.auctionVars)
allSlots = append(allSlots, emptySlot)
}
assertSlots(t, allSlots, fetchedSlots)

+ 53
- 15
api/state.go

@ -3,6 +3,7 @@ package api
import (
"database/sql"
"net/http"
"sync"
"github.com/gin-gonic/gin"
"github.com/hermeznetwork/hermez-node/common"
@ -19,30 +20,39 @@ func (a *API) getState(c *gin.Context) {
c.JSON(http.StatusOK, stateAPI)
}
type APIStateUpdater struct {
// StateAPIUpdater is an utility object to facilitate updating the StateAPI
type StateAPIUpdater struct {
hdb *historydb.HistoryDB
state historydb.StateAPI
config historydb.NodeConfig
vars common.SCVariablesPtr
consts historydb.Constants
rw sync.RWMutex
}
func NewAPIStateUpdater(hdb *historydb.HistoryDB, config *historydb.NodeConfig, vars *common.SCVariables,
consts *historydb.Constants) *APIStateUpdater {
u := APIStateUpdater{
// NewStateAPIUpdater creates a new StateAPIUpdater
func NewStateAPIUpdater(hdb *historydb.HistoryDB, config *historydb.NodeConfig, vars *common.SCVariables,
consts *historydb.Constants) *StateAPIUpdater {
u := StateAPIUpdater{
hdb: hdb,
config: *config,
consts: *consts,
}
u.SetSCVars(&common.SCVariablesPtr{&vars.Rollup, &vars.Auction, &vars.WDelayer})
u.SetSCVars(vars.AsPtr())
return &u
}
func (u *APIStateUpdater) Store() error {
return tracerr.Wrap(u.hdb.SetAPIState(&u.state))
// Store the State in the HistoryDB
func (u *StateAPIUpdater) Store() error {
u.rw.RLock()
defer u.rw.RUnlock()
return tracerr.Wrap(u.hdb.SetStateInternalAPI(&u.state))
}
func (u *APIStateUpdater) SetSCVars(vars *common.SCVariablesPtr) {
// SetSCVars sets the smart contract vars (ony updates those that are not nil)
func (u *StateAPIUpdater) SetSCVars(vars *common.SCVariablesPtr) {
u.rw.Lock()
defer u.rw.Unlock()
if vars.Rollup != nil {
u.vars.Rollup = vars.Rollup
rollupVars := historydb.NewRollupVariablesAPI(u.vars.Rollup)
@ -59,25 +69,47 @@ func (u *APIStateUpdater) SetSCVars(vars *common.SCVariablesPtr) {
}
}
func (u *APIStateUpdater) UpdateMetrics() error {
if u.state.Network.LastBatch == nil {
// UpdateRecommendedFee update Status.RecommendedFee information
func (u *StateAPIUpdater) UpdateRecommendedFee() error {
recommendedFee, err := u.hdb.GetRecommendedFee(u.config.MinFeeUSD)
if err != nil {
return tracerr.Wrap(err)
}
u.rw.Lock()
u.state.RecommendedFee = *recommendedFee
u.rw.Unlock()
return nil
}
// UpdateMetrics update Status.Metrics information
func (u *StateAPIUpdater) UpdateMetrics() error {
u.rw.RLock()
lastBatch := u.state.Network.LastBatch
u.rw.RUnlock()
if lastBatch == nil {
return nil
}
lastBatchNum := u.state.Network.LastBatch.BatchNum
lastBatchNum := lastBatch.BatchNum
metrics, err := u.hdb.GetMetricsInternalAPI(lastBatchNum)
if err != nil {
return tracerr.Wrap(err)
}
u.rw.Lock()
u.state.Metrics = *metrics
u.rw.Unlock()
return nil
}
func (u *APIStateUpdater) UpdateNetworkInfoBlock(lastEthBlock, lastSyncBlock common.Block) {
// UpdateNetworkInfoBlock update Status.Network block related information
func (u *StateAPIUpdater) UpdateNetworkInfoBlock(lastEthBlock, lastSyncBlock common.Block) {
u.rw.Lock()
u.state.Network.LastSyncBlock = lastSyncBlock.Num
u.state.Network.LastEthBlock = lastEthBlock.Num
u.rw.Unlock()
}
func (u *APIStateUpdater) UpdateNetworkInfo(
// UpdateNetworkInfo update Status.Network information
func (u *StateAPIUpdater) UpdateNetworkInfo(
lastEthBlock, lastSyncBlock common.Block,
lastBatchNum common.BatchNum, currentSlot int64,
) error {
@ -88,9 +120,12 @@ func (u *APIStateUpdater) UpdateNetworkInfo(
} else if err != nil {
return tracerr.Wrap(err)
}
u.rw.RLock()
auctionVars := u.vars.Auction
u.rw.RUnlock()
// Get next forgers
lastClosedSlot := currentSlot + int64(u.state.Auction.ClosedAuctionSlots)
nextForgers, err := u.hdb.GetNextForgersInternalAPI(u.vars.Auction, &u.consts.Auction,
lastClosedSlot := currentSlot + int64(auctionVars.ClosedAuctionSlots)
nextForgers, err := u.hdb.GetNextForgersInternalAPI(auctionVars, &u.consts.Auction,
lastSyncBlock, currentSlot, lastClosedSlot)
if tracerr.Unwrap(err) == sql.ErrNoRows {
nextForgers = nil
@ -104,6 +139,8 @@ func (u *APIStateUpdater) UpdateNetworkInfo(
} else if err != nil {
return tracerr.Wrap(err)
}
u.rw.Lock()
// Update NodeInfo struct
for i, bucketParams := range u.state.Rollup.Buckets {
for _, bucketUpdate := range bucketUpdates {
@ -119,5 +156,6 @@ func (u *APIStateUpdater) UpdateNetworkInfo(
u.state.Network.LastBatch = lastBatch
u.state.Network.CurrentSlot = currentSlot
u.state.Network.NextForgers = nextForgers
u.rw.Unlock()
return nil
}

+ 53
- 43
api/state_test.go

@ -29,10 +29,11 @@ type testNetwork struct {
}
func TestSetRollupVariables(t *testing.T) {
api.h.SetRollupVariables(&tc.rollupVars)
stateAPIUpdater.SetSCVars(&common.SCVariablesPtr{Rollup: &tc.rollupVars})
require.NoError(t, stateAPIUpdater.Store())
ni, err := api.h.GetNodeInfoAPI()
assert.NoError(t, err)
assertEqualRollupVariables(t, tc.rollupVars, ni.APIState.Rollup, true)
require.NoError(t, err)
assertEqualRollupVariables(t, tc.rollupVars, ni.StateAPI.Rollup, true)
}
func assertEqualRollupVariables(t *testing.T, rollupVariables common.RollupVariables, apiVariables historydb.RollupVariablesAPI, checkBuckets bool) {
@ -51,17 +52,19 @@ func assertEqualRollupVariables(t *testing.T, rollupVariables common.RollupVaria
}
func TestSetWDelayerVariables(t *testing.T) {
api.h.SetWDelayerVariables(&tc.wdelayerVars)
stateAPIUpdater.SetSCVars(&common.SCVariablesPtr{WDelayer: &tc.wdelayerVars})
require.NoError(t, stateAPIUpdater.Store())
ni, err := api.h.GetNodeInfoAPI()
assert.NoError(t, err)
assert.Equal(t, tc.wdelayerVars, ni.APIState.WithdrawalDelayer)
require.NoError(t, err)
assert.Equal(t, tc.wdelayerVars, ni.StateAPI.WithdrawalDelayer)
}
func TestSetAuctionVariables(t *testing.T) {
api.h.SetAuctionVariables(&tc.auctionVars)
stateAPIUpdater.SetSCVars(&common.SCVariablesPtr{Auction: &tc.auctionVars})
require.NoError(t, stateAPIUpdater.Store())
ni, err := api.h.GetNodeInfoAPI()
assert.NoError(t, err)
assertEqualAuctionVariables(t, tc.auctionVars, ni.APIState.Auction)
require.NoError(t, err)
assertEqualAuctionVariables(t, tc.auctionVars, ni.StateAPI.Auction)
}
func assertEqualAuctionVariables(t *testing.T, auctionVariables common.AuctionVariables, apiVariables historydb.AuctionVariablesAPI) {
@ -113,16 +116,18 @@ func TestUpdateNetworkInfo(t *testing.T) {
err := api.h.AddBucketUpdatesTest(api.h.DB(), bucketUpdates)
require.NoError(t, err)
err = api.h.UpdateNetworkInfo(lastBlock, lastBlock, lastBatchNum, currentSlotNum)
assert.NoError(t, err)
// stateAPIUpdater := NewStateAPIUpdater(hdb)
err = stateAPIUpdater.UpdateNetworkInfo(lastBlock, lastBlock, lastBatchNum, currentSlotNum)
require.NoError(t, err)
require.NoError(t, stateAPIUpdater.Store())
ni, err := api.h.GetNodeInfoAPI()
assert.NoError(t, err)
assert.Equal(t, lastBlock.Num, ni.APIState.Network.LastSyncBlock)
assert.Equal(t, lastBatchNum, ni.APIState.Network.LastBatch.BatchNum)
assert.Equal(t, currentSlotNum, ni.APIState.Network.CurrentSlot)
assert.Equal(t, int(ni.APIState.Auction.ClosedAuctionSlots)+1, len(ni.APIState.Network.NextForgers))
assert.Equal(t, ni.APIState.Rollup.Buckets[0].Withdrawals, apitypes.NewBigIntStr(big.NewInt(123)))
assert.Equal(t, ni.APIState.Rollup.Buckets[2].Withdrawals, apitypes.NewBigIntStr(big.NewInt(43)))
require.NoError(t, err)
assert.Equal(t, lastBlock.Num, ni.StateAPI.Network.LastSyncBlock)
assert.Equal(t, lastBatchNum, ni.StateAPI.Network.LastBatch.BatchNum)
assert.Equal(t, currentSlotNum, ni.StateAPI.Network.CurrentSlot)
assert.Equal(t, int(ni.StateAPI.Auction.ClosedAuctionSlots)+1, len(ni.StateAPI.Network.NextForgers))
assert.Equal(t, ni.StateAPI.Rollup.Buckets[0].Withdrawals, apitypes.NewBigIntStr(big.NewInt(123)))
assert.Equal(t, ni.StateAPI.Rollup.Buckets[2].Withdrawals, apitypes.NewBigIntStr(big.NewInt(43)))
}
func TestUpdateMetrics(t *testing.T) {
@ -130,31 +135,33 @@ func TestUpdateMetrics(t *testing.T) {
lastBlock := tc.blocks[3]
lastBatchNum := common.BatchNum(12)
currentSlotNum := int64(1)
err := api.h.UpdateNetworkInfo(lastBlock, lastBlock, lastBatchNum, currentSlotNum)
assert.NoError(t, err)
err := stateAPIUpdater.UpdateNetworkInfo(lastBlock, lastBlock, lastBatchNum, currentSlotNum)
require.NoError(t, err)
err = api.h.UpdateMetrics()
assert.NoError(t, err)
err = stateAPIUpdater.UpdateMetrics()
require.NoError(t, err)
require.NoError(t, stateAPIUpdater.Store())
ni, err := api.h.GetNodeInfoAPI()
assert.NoError(t, err)
assert.Greater(t, ni.APIState.Metrics.TransactionsPerBatch, float64(0))
assert.Greater(t, ni.APIState.Metrics.BatchFrequency, float64(0))
assert.Greater(t, ni.APIState.Metrics.TransactionsPerSecond, float64(0))
assert.Greater(t, ni.APIState.Metrics.TotalAccounts, int64(0))
assert.Greater(t, ni.APIState.Metrics.TotalBJJs, int64(0))
assert.Greater(t, ni.APIState.Metrics.AvgTransactionFee, float64(0))
require.NoError(t, err)
assert.Greater(t, ni.StateAPI.Metrics.TransactionsPerBatch, float64(0))
assert.Greater(t, ni.StateAPI.Metrics.BatchFrequency, float64(0))
assert.Greater(t, ni.StateAPI.Metrics.TransactionsPerSecond, float64(0))
assert.Greater(t, ni.StateAPI.Metrics.TotalAccounts, int64(0))
assert.Greater(t, ni.StateAPI.Metrics.TotalBJJs, int64(0))
assert.Greater(t, ni.StateAPI.Metrics.AvgTransactionFee, float64(0))
}
func TestUpdateRecommendedFee(t *testing.T) {
err := api.h.UpdateRecommendedFee()
assert.NoError(t, err)
err := stateAPIUpdater.UpdateRecommendedFee()
require.NoError(t, err)
require.NoError(t, stateAPIUpdater.Store())
var minFeeUSD float64
if api.l2 != nil {
minFeeUSD = api.l2.MinFeeUSD()
}
ni, err := api.h.GetNodeInfoAPI()
assert.NoError(t, err)
assert.Greater(t, ni.APIState.RecommendedFee.ExistingAccount, minFeeUSD)
require.NoError(t, err)
assert.Greater(t, ni.StateAPI.RecommendedFee.ExistingAccount, minFeeUSD)
// assert.Equal(t, ni.StateAPI.RecommendedFee.CreatesAccount,
// ni.StateAPI.RecommendedFee.ExistingAccount*createAccountExtraFeePercentage)
// assert.Equal(t, ni.StateAPI.RecommendedFee.CreatesAccountAndRegister,
@ -165,20 +172,23 @@ func TestGetState(t *testing.T) {
lastBlock := tc.blocks[3]
lastBatchNum := common.BatchNum(12)
currentSlotNum := int64(1)
api.h.SetRollupVariables(&tc.rollupVars)
api.h.SetWDelayerVariables(&tc.wdelayerVars)
api.h.SetAuctionVariables(&tc.auctionVars)
err := api.h.UpdateNetworkInfo(lastBlock, lastBlock, lastBatchNum, currentSlotNum)
assert.NoError(t, err)
err = api.h.UpdateMetrics()
assert.NoError(t, err)
err = api.h.UpdateRecommendedFee()
assert.NoError(t, err)
stateAPIUpdater.SetSCVars(&common.SCVariablesPtr{
Rollup: &tc.rollupVars,
Auction: &tc.auctionVars,
WDelayer: &tc.wdelayerVars,
})
err := stateAPIUpdater.UpdateNetworkInfo(lastBlock, lastBlock, lastBatchNum, currentSlotNum)
require.NoError(t, err)
err = stateAPIUpdater.UpdateMetrics()
require.NoError(t, err)
err = stateAPIUpdater.UpdateRecommendedFee()
require.NoError(t, err)
require.NoError(t, stateAPIUpdater.Store())
endpoint := apiURL + "state"
var status testStatus
assert.NoError(t, doGoodReq("GET", endpoint, nil, &status))
require.NoError(t, doGoodReq("GET", endpoint, nil, &status))
// SC vars
// UpdateNetworkInfo will overwrite buckets withdrawal values

+ 63
- 43
cli/node/main.go

@ -107,17 +107,7 @@ func cmdWipeSQL(c *cli.Context) error {
return nil
}
func cmdRun(c *cli.Context) error {
cfg, err := parseCli(c)
if err != nil {
return tracerr.Wrap(fmt.Errorf("error parsing flags and config: %w", err))
}
node, err := node.NewNode(cfg.mode, cfg.node)
if err != nil {
return tracerr.Wrap(fmt.Errorf("error starting node: %w", err))
}
node.Start()
func waitSigInt() {
stopCh := make(chan interface{})
// catch ^C to send the stop signal
@ -138,48 +128,36 @@ func cmdRun(c *cli.Context) error {
}
}()
<-stopCh
node.Stop()
return nil
}
func cmdServeAPI(c *cli.Context) error {
cfgPath := c.String(flagCfg)
cfg, err := config.LoadAPIServer(cfgPath)
func cmdRun(c *cli.Context) error {
cfg, err := parseCli(c)
if err != nil {
if err := cli.ShowAppHelp(c); err != nil {
panic(err)
}
return tracerr.Wrap(fmt.Errorf("error parsing flags and config: %w", err))
}
node, err := node.NewNode(cfg.mode, cfg.node)
if err != nil {
return tracerr.Wrap(fmt.Errorf("error starting node: %w", err))
}
node.Start()
waitSigInt()
node.Stop()
stopCh := make(chan interface{})
return nil
}
// catch ^C to send the stop signal
ossig := make(chan os.Signal, 1)
signal.Notify(ossig, os.Interrupt)
const forceStopCount = 3
go func() {
n := 0
for sig := range ossig {
if sig == os.Interrupt {
log.Info("Received Interrupt Signal")
stopCh <- nil
n++
if n == forceStopCount {
log.Fatalf("Received %v Interrupt Signals", forceStopCount)
}
}
}
}()
<-stopCh
node.Stop()
func cmdServeAPI(c *cli.Context) error {
cfg, err := parseCliAPIServer(c)
if err != nil {
return tracerr.Wrap(fmt.Errorf("error parsing flags and config: %w", err))
}
srv, err := node.NewAPIServer(cfg.mode, cfg.server)
if err != nil {
return tracerr.Wrap(fmt.Errorf("error starting api server: %w", err))
}
srv.Start()
waitSigInt()
srv.Stop()
return nil
}
@ -270,13 +248,55 @@ func getConfig(c *cli.Context) (*Config, error) {
switch mode {
case modeSync:
cfg.mode = node.ModeSynchronizer
cfg.node, err = config.LoadNode(nodeCfgPath)
cfg.node, err = config.LoadNode(nodeCfgPath, false)
if err != nil {
return nil, tracerr.Wrap(err)
}
case modeCoord:
cfg.mode = node.ModeCoordinator
cfg.node, err = config.LoadNode(nodeCfgPath, true)
if err != nil {
return nil, tracerr.Wrap(err)
}
default:
return nil, tracerr.Wrap(fmt.Errorf("invalid mode \"%v\"", mode))
}
return &cfg, nil
}
// ConfigAPIServer is the configuration of the api server execution
type ConfigAPIServer struct {
mode node.Mode
server *config.APIServer
}
func parseCliAPIServer(c *cli.Context) (*ConfigAPIServer, error) {
cfg, err := getConfigAPIServer(c)
if err != nil {
if err := cli.ShowAppHelp(c); err != nil {
panic(err)
}
return nil, tracerr.Wrap(err)
}
return cfg, nil
}
func getConfigAPIServer(c *cli.Context) (*ConfigAPIServer, error) {
var cfg ConfigAPIServer
mode := c.String(flagMode)
nodeCfgPath := c.String(flagCfg)
var err error
switch mode {
case modeSync:
cfg.mode = node.ModeSynchronizer
cfg.server, err = config.LoadAPIServer(nodeCfgPath, false)
if err != nil {
return nil, tracerr.Wrap(err)
}
case modeCoord:
cfg.mode = node.ModeCoordinator
cfg.node, err = config.LoadCoordinator(nodeCfgPath)
cfg.server, err = config.LoadAPIServer(nodeCfgPath, true)
if err != nil {
return nil, tracerr.Wrap(err)
}

+ 2
- 0
common/eth.go

@ -7,6 +7,8 @@ type SCVariables struct {
WDelayer WDelayerVariables `validate:"required"`
}
// AsPtr returns the SCVariables as a SCVariablesPtr using pointers to the
// original SCVariables
func (v *SCVariables) AsPtr() *SCVariablesPtr {
return &SCVariablesPtr{
Rollup: &v.Rollup,

+ 58
- 54
config/config.go

@ -219,28 +219,9 @@ type Coordinator struct {
}
}
// NodeAPI specifies the configuration parameters of the API
type NodeAPI struct {
// Address where the API will listen if set
Address string
// Explorer enables the Explorer API endpoints
Explorer bool
// UpdateMetricsInterval is the interval between updates of the
// API metrics
UpdateMetricsInterval Duration
// UpdateRecommendedFeeInterval is the interval between updates of the
// recommended fees
UpdateRecommendedFeeInterval Duration
// Maximum concurrent connections allowed between API and SQL
MaxSQLConnections int `validate:"required"`
// SQLConnectionTimeout is the maximum amount of time that an API request
// can wait to stablish a SQL connection
SQLConnectionTimeout Duration
}
// It's possible to use diferentiated SQL connections for read/write.
// If the read configuration is not provided, the write one it's going to be used
// for both reads and writes
// PostgreSQL is the postgreSQL configuration parameters. It's possible to use
// diferentiated SQL connections for read/write. If the read configuration is
// not provided, the write one it's going to be used for both reads and writes
type PostgreSQL struct {
// Port of the PostgreSQL write server
PortWrite int `validate:"required"`
@ -324,31 +305,60 @@ type Node struct {
// TokenHEZ address
TokenHEZName string `validate:"required"`
} `validate:"required"`
API NodeAPI `validate:"required"`
// API specifies the configuration parameters of the API
API struct {
// Address where the API will listen if set
Address string
// Explorer enables the Explorer API endpoints
Explorer bool
// UpdateMetricsInterval is the interval between updates of the
// API metrics
UpdateMetricsInterval Duration
// UpdateRecommendedFeeInterval is the interval between updates of the
// recommended fees
UpdateRecommendedFeeInterval Duration
// Maximum concurrent connections allowed between API and SQL
MaxSQLConnections int `validate:"required"`
// SQLConnectionTimeout is the maximum amount of time that an API request
// can wait to stablish a SQL connection
SQLConnectionTimeout Duration
} `validate:"required"`
Debug NodeDebug `validate:"required"`
Coordinator Coordinator `validate:"-"`
}
// APIServer is the api server configuration parameters
type APIServer struct {
API NodeAPI `validate:"required"`
// NodeAPI specifies the configuration parameters of the API
API struct {
// Address where the API will listen if set
Address string `validate:"required"`
// Explorer enables the Explorer API endpoints
Explorer bool
// Maximum concurrent connections allowed between API and SQL
MaxSQLConnections int `validate:"required"`
// SQLConnectionTimeout is the maximum amount of time that an API request
// can wait to stablish a SQL connection
SQLConnectionTimeout Duration
} `validate:"required"`
PostgreSQL PostgreSQL `validate:"required"`
Coordinator struct {
API struct {
// Coordinator enables the coordinator API endpoints
Coordinator bool
} `validate:"required"`
} `validate:"required"`
L2DB struct {
// MaxTxs is the maximum number of pending L2Txs that can be
// stored in the pool. Once this number of pending L2Txs is
// reached, inserts to the pool will be denied until some of
// the pending txs are forged.
MaxTxs uint32 `validate:"required"`
// MinFeeUSD is the minimum fee in USD that a tx must pay in
// order to be accepted into the pool. Txs with lower than
// minimum fee will be rejected at the API level.
MinFeeUSD float64
} `validate:"required"`
L2DB struct {
// MaxTxs is the maximum number of pending L2Txs that can be
// stored in the pool. Once this number of pending L2Txs is
// reached, inserts to the pool will be denied until some of
// the pending txs are forged.
MaxTxs uint32 `validate:"required"`
// MinFeeUSD is the minimum fee in USD that a tx must pay in
// order to be accepted into the pool. Txs with lower than
// minimum fee will be rejected at the API level.
MinFeeUSD float64
} `validate:"required"`
}
Debug NodeDebug `validate:"required"`
}
@ -365,24 +375,8 @@ func Load(path string, cfg interface{}) error {
return nil
}
// LoadCoordinator loads the Coordinator configuration from path.
func LoadCoordinator(path string) (*Node, error) {
var cfg Node
if err := Load(path, &cfg); err != nil {
return nil, tracerr.Wrap(fmt.Errorf("error loading node configuration file: %w", err))
}
validate := validator.New()
if err := validate.Struct(cfg); err != nil {
return nil, tracerr.Wrap(fmt.Errorf("error validating configuration file: %w", err))
}
if err := validate.Struct(cfg.Coordinator); err != nil {
return nil, tracerr.Wrap(fmt.Errorf("error validating configuration file: %w", err))
}
return &cfg, nil
}
// LoadNode loads the Node configuration from path.
func LoadNode(path string) (*Node, error) {
func LoadNode(path string, coordinator bool) (*Node, error) {
var cfg Node
if err := Load(path, &cfg); err != nil {
return nil, tracerr.Wrap(fmt.Errorf("error loading node configuration file: %w", err))
@ -391,11 +385,16 @@ func LoadNode(path string) (*Node, error) {
if err := validate.Struct(cfg); err != nil {
return nil, tracerr.Wrap(fmt.Errorf("error validating configuration file: %w", err))
}
if coordinator {
if err := validate.Struct(cfg.Coordinator); err != nil {
return nil, tracerr.Wrap(fmt.Errorf("error validating configuration file: %w", err))
}
}
return &cfg, nil
}
// LoadAPIServer loads the APIServer configuration from path.
func LoadAPIServer(path string) (*APIServer, error) {
func LoadAPIServer(path string, coordinator bool) (*APIServer, error) {
var cfg APIServer
if err := Load(path, &cfg); err != nil {
return nil, tracerr.Wrap(fmt.Errorf("error loading apiServer configuration file: %w", err))
@ -404,5 +403,10 @@ func LoadAPIServer(path string) (*APIServer, error) {
if err := validate.Struct(cfg); err != nil {
return nil, tracerr.Wrap(fmt.Errorf("error validating configuration file: %w", err))
}
if coordinator {
if err := validate.Struct(cfg.Coordinator); err != nil {
return nil, tracerr.Wrap(fmt.Errorf("error validating configuration file: %w", err))
}
}
return &cfg, nil
}

+ 6
- 6
coordinator/coordinator.go

@ -144,8 +144,8 @@ type Coordinator struct {
pipelineNum int // Pipeline sequential number. The first pipeline is 1
pipelineFromBatch fromBatch // batch from which we started the pipeline
provers []prover.Client
consts synchronizer.SCConsts
vars synchronizer.SCVariables
consts common.SCConsts
vars common.SCVariables
stats synchronizer.Stats
started bool
@ -275,13 +275,13 @@ type MsgSyncBlock struct {
Batches []common.BatchData
// Vars contains each Smart Contract variables if they are updated, or
// nil if they haven't changed.
Vars synchronizer.SCVariablesPtr
Vars common.SCVariablesPtr
}
// MsgSyncReorg indicates a reorg
type MsgSyncReorg struct {
Stats synchronizer.Stats
Vars synchronizer.SCVariablesPtr
Vars common.SCVariablesPtr
}
// MsgStopPipeline indicates a signal to reset the pipeline
@ -300,7 +300,7 @@ func (c *Coordinator) SendMsg(ctx context.Context, msg interface{}) {
}
}
func updateSCVars(vars *synchronizer.SCVariables, update synchronizer.SCVariablesPtr) {
func updateSCVars(vars *common.SCVariables, update common.SCVariablesPtr) {
if update.Rollup != nil {
vars.Rollup = *update.Rollup
}
@ -312,7 +312,7 @@ func updateSCVars(vars *synchronizer.SCVariables, update synchronizer.SCVariable
}
}
func (c *Coordinator) syncSCVars(vars synchronizer.SCVariablesPtr) {
func (c *Coordinator) syncSCVars(vars common.SCVariablesPtr) {
updateSCVars(&c.vars, vars)
}

+ 3
- 3
coordinator/coordinator_test.go

@ -187,12 +187,12 @@ func newTestCoordinator(t *testing.T, forgerAddr ethCommon.Address, ethClient *t
&prover.MockClient{Delay: 400 * time.Millisecond},
}
scConsts := &synchronizer.SCConsts{
scConsts := &common.SCConsts{
Rollup: *ethClientSetup.RollupConstants,
Auction: *ethClientSetup.AuctionConstants,
WDelayer: *ethClientSetup.WDelayerConstants,
}
initSCVars := &synchronizer.SCVariables{
initSCVars := &common.SCVariables{
Rollup: *ethClientSetup.RollupVariables,
Auction: *ethClientSetup.AuctionVariables,
WDelayer: *ethClientSetup.WDelayerVariables,
@ -528,7 +528,7 @@ func TestCoordinatorStress(t *testing.T) {
coord.SendMsg(ctx, MsgSyncBlock{
Stats: *stats,
Batches: blockData.Rollup.Batches,
Vars: synchronizer.SCVariablesPtr{
Vars: common.SCVariablesPtr{
Rollup: blockData.Rollup.Vars,
Auction: blockData.Auction.Vars,
WDelayer: blockData.WDelayer.Vars,

+ 8
- 8
coordinator/pipeline.go

@ -22,7 +22,7 @@ import (
type statsVars struct {
Stats synchronizer.Stats
Vars synchronizer.SCVariablesPtr
Vars common.SCVariablesPtr
}
type state struct {
@ -36,7 +36,7 @@ type state struct {
type Pipeline struct {
num int
cfg Config
consts synchronizer.SCConsts
consts common.SCConsts
// state
state state
@ -57,7 +57,7 @@ type Pipeline struct {
purger *Purger
stats synchronizer.Stats
vars synchronizer.SCVariables
vars common.SCVariables
statsVarsCh chan statsVars
ctx context.Context
@ -90,7 +90,7 @@ func NewPipeline(ctx context.Context,
coord *Coordinator,
txManager *TxManager,
provers []prover.Client,
scConsts *synchronizer.SCConsts,
scConsts *common.SCConsts,
) (*Pipeline, error) {
proversPool := NewProversPool(len(provers))
proversPoolSize := 0
@ -124,7 +124,7 @@ func NewPipeline(ctx context.Context,
}
// SetSyncStatsVars is a thread safe method to sets the synchronizer Stats
func (p *Pipeline) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) {
func (p *Pipeline) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats, vars *common.SCVariablesPtr) {
select {
case p.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars}:
case <-ctx.Done():
@ -133,7 +133,7 @@ func (p *Pipeline) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Sta
// reset pipeline state
func (p *Pipeline) reset(batchNum common.BatchNum,
stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
stats *synchronizer.Stats, vars *common.SCVariables) error {
p.state = state{
batchNum: batchNum,
lastForgeL1TxsNum: stats.Sync.LastForgeL1TxsNum,
@ -194,7 +194,7 @@ func (p *Pipeline) reset(batchNum common.BatchNum,
return nil
}
func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) {
func (p *Pipeline) syncSCVars(vars common.SCVariablesPtr) {
updateSCVars(&p.vars, vars)
}
@ -255,7 +255,7 @@ func (p *Pipeline) handleForgeBatch(ctx context.Context,
// Start the forging pipeline
func (p *Pipeline) Start(batchNum common.BatchNum,
stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
stats *synchronizer.Stats, vars *common.SCVariables) error {
if p.started {
log.Fatal("Pipeline already started")
}

+ 1
- 5
coordinator/pipeline_test.go

@ -206,11 +206,7 @@ PoolTransfer(0) User2-User3: 300 (126)
require.NoError(t, err)
}
err = pipeline.reset(batchNum, syncStats, &synchronizer.SCVariables{
Rollup: *syncSCVars.Rollup,
Auction: *syncSCVars.Auction,
WDelayer: *syncSCVars.WDelayer,
})
err = pipeline.reset(batchNum, syncStats, syncSCVars)
require.NoError(t, err)
// Sanity check
sdbAccounts, err := pipeline.txSelector.LocalAccountsDB().TestGetAccounts()

+ 5
- 5
coordinator/txmanager.go

@ -31,10 +31,10 @@ type TxManager struct {
batchCh chan *BatchInfo
chainID *big.Int
account accounts.Account
consts synchronizer.SCConsts
consts common.SCConsts
stats synchronizer.Stats
vars synchronizer.SCVariables
vars common.SCVariables
statsVarsCh chan statsVars
discardPipelineCh chan int // int refers to the pipelineNum
@ -55,7 +55,7 @@ type TxManager struct {
// NewTxManager creates a new TxManager
func NewTxManager(ctx context.Context, cfg *Config, ethClient eth.ClientInterface, l2DB *l2db.L2DB,
coord *Coordinator, scConsts *synchronizer.SCConsts, initSCVars *synchronizer.SCVariables) (*TxManager, error) {
coord *Coordinator, scConsts *common.SCConsts, initSCVars *common.SCVariables) (*TxManager, error) {
chainID, err := ethClient.EthChainID()
if err != nil {
return nil, tracerr.Wrap(err)
@ -102,7 +102,7 @@ func (t *TxManager) AddBatch(ctx context.Context, batchInfo *BatchInfo) {
}
// SetSyncStatsVars is a thread safe method to sets the synchronizer Stats
func (t *TxManager) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) {
func (t *TxManager) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats, vars *common.SCVariablesPtr) {
select {
case t.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars}:
case <-ctx.Done():
@ -118,7 +118,7 @@ func (t *TxManager) DiscardPipeline(ctx context.Context, pipelineNum int) {
}
}
func (t *TxManager) syncSCVars(vars synchronizer.SCVariablesPtr) {
func (t *TxManager) syncSCVars(vars common.SCVariablesPtr) {
updateSCVars(&t.vars, vars)
}

+ 7
- 4
db/historydb/apiqueries.go

@ -38,7 +38,7 @@ func (hdb *HistoryDB) GetBatchAPI(batchNum common.BatchNum) (*BatchAPI, error) {
return hdb.getBatchAPI(hdb.dbRead, batchNum)
}
// GetBatchAPI return the batch with the given batchNum
// GetBatchInternalAPI return the batch with the given batchNum
func (hdb *HistoryDB) GetBatchInternalAPI(batchNum common.BatchNum) (*BatchAPI, error) {
return hdb.getBatchAPI(hdb.dbRead, batchNum)
}
@ -944,6 +944,7 @@ func (hdb *HistoryDB) GetCoordinatorAPI(bidderAddr ethCommon.Address) (*Coordina
defer hdb.apiConnCon.Release()
return hdb.getCoordinatorAPI(hdb.dbRead, bidderAddr)
}
func (hdb *HistoryDB) getCoordinatorAPI(d meddler.DB, bidderAddr ethCommon.Address) (*CoordinatorAPI, error) {
coordinator := &CoordinatorAPI{}
err := meddler.QueryRow(
@ -954,6 +955,7 @@ func (hdb *HistoryDB) getCoordinatorAPI(d meddler.DB, bidderAddr ethCommon.Addre
return coordinator, tracerr.Wrap(err)
}
// GetNodeInfoAPI retusnt he NodeInfo
func (hdb *HistoryDB) GetNodeInfoAPI() (*NodeInfo, error) {
cancel, err := hdb.apiConnCon.Acquire()
defer cancel()
@ -964,9 +966,9 @@ func (hdb *HistoryDB) GetNodeInfoAPI() (*NodeInfo, error) {
return hdb.GetNodeInfo()
}
// GetBucketUpdatesInternalAPI returns the latest bucket updates
func (hdb *HistoryDB) GetBucketUpdatesInternalAPI() ([]BucketUpdateAPI, error) {
var bucketUpdates []*BucketUpdateAPI
// var bucketUpdates []*common.BucketUpdate
err := meddler.QueryAll(
hdb.dbRead, &bucketUpdates,
`SELECT num_bucket, withdrawals FROM bucket_update
@ -977,7 +979,7 @@ func (hdb *HistoryDB) GetBucketUpdatesInternalAPI() ([]BucketUpdateAPI, error) {
return db.SlicePtrsToSlice(bucketUpdates).([]BucketUpdateAPI), tracerr.Wrap(err)
}
// getNextForgers returns next forgers
// GetNextForgersInternalAPI returns next forgers
func (hdb *HistoryDB) GetNextForgersInternalAPI(auctionVars *common.AuctionVariables,
auctionConsts *common.AuctionConstants,
lastBlock common.Block, currentSlot, lastClosedSlot int64) ([]NextForgerAPI, error) {
@ -1071,7 +1073,7 @@ func (hdb *HistoryDB) GetNextForgersInternalAPI(auctionVars *common.AuctionVaria
return nextForgers, nil
}
// UpdateMetrics update Status.Metrics information
// GetMetricsInternalAPI returns the MetricsAPI
func (hdb *HistoryDB) GetMetricsInternalAPI(lastBatchNum common.BatchNum) (*MetricsAPI, error) {
var metrics MetricsAPI
// Get the first and last batch of the last 24h and their timestamps
@ -1171,6 +1173,7 @@ func (hdb *HistoryDB) GetMetricsInternalAPI(lastBatchNum common.BatchNum) (*Metr
return &metrics, nil
}
// GetStateAPI returns the StateAPI
func (hdb *HistoryDB) GetStateAPI() (*StateAPI, error) {
cancel, err := hdb.apiConnCon.Acquire()
defer cancel()

+ 1
- 1
db/historydb/historydb.go

@ -1171,7 +1171,7 @@ func (hdb *HistoryDB) GetTokensTest() ([]TokenWithUSD, error) {
return db.SlicePtrsToSlice(tokens).([]TokenWithUSD), nil
}
// UpdateRecommendedFee update Status.RecommendedFee information
// GetRecommendedFee returns the RecommendedFee information
func (hdb *HistoryDB) GetRecommendedFee(minFeeUSD float64) (*common.RecommendedFee, error) {
var recommendedFee common.RecommendedFee
// Get total txs and the batch of the first selected tx of the last hour

+ 3
- 12
db/historydb/historydb_test.go

@ -1172,15 +1172,6 @@ func TestGetMetricsAPI(t *testing.T) {
assert.NoError(t, err)
}
// clientSetupExample := test.NewClientSetupExample()
// apiStateUpdater := NewAPIStateUpdater(historyDB, &NodeConfig{1000, 0.5},
// &Constants{
// RollupConstants: *clientSetupExample.RollupConstants,
// AuctionConstants: *clientSetupExample.AuctionConstants,
// WDelayerConstants: *clientSetupExample.WDelayerConstants,
// ChainID: uint16(clientSetupExample.ChainID.Int64()),
// HermezAddress: clientSetupExample.AuctionConstants.HermezRollup,
// })
res, err := historyDB.GetMetricsInternalAPI(common.BatchNum(numBatches))
assert.NoError(t, err)
@ -1467,7 +1458,7 @@ func setTestBlocks(from, to int64) []common.Block {
func TestNodeInfo(t *testing.T) {
test.WipeDB(historyDB.DB())
err := historyDB.SetAPIState(&StateAPI{})
err := historyDB.SetStateInternalAPI(&StateAPI{})
require.NoError(t, err)
clientSetup := test.NewClientSetupExample()
@ -1503,7 +1494,7 @@ func TestNodeInfo(t *testing.T) {
ExistingAccount: 0.15,
},
}
err = historyDB.SetAPIState(stateAPI)
err = historyDB.SetStateInternalAPI(stateAPI)
require.NoError(t, err)
nodeConfig := &NodeConfig{
@ -1521,7 +1512,7 @@ func TestNodeInfo(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, nodeConfig, dbNodeConfig)
dbStateAPI, err := historyDB.GetStateAPI()
dbStateAPI, err := historyDB.getStateAPI(historyDB.dbRead)
require.NoError(t, err)
assert.Equal(t, stateAPI, dbStateAPI)
}

+ 22
- 68
db/historydb/nodeinfo.go

@ -14,6 +14,7 @@ const (
createAccountInternalExtraFeePercentage float64 = 2.5
)
// Period represents a time period in ethereum
type Period struct {
SlotNum int64 `json:"slotNum"`
FromBlock int64 `json:"fromBlock"`
@ -22,11 +23,13 @@ type Period struct {
ToTimestamp time.Time `json:"toTimestamp"`
}
// NextForgerAPI represents the next forger exposed via the API
type NextForgerAPI struct {
Coordinator CoordinatorAPI `json:"coordinator"`
Period Period `json:"period"`
}
// NetworkAPI is the network state exposed via the API
type NetworkAPI struct {
LastEthBlock int64 `json:"lastEthereumBlock"`
LastSyncBlock int64 `json:"lastSynchedBlock"`
@ -41,6 +44,7 @@ type NodePublicConfig struct {
ForgeDelay float64 `json:"forgeDelay"`
}
// StateAPI is an object representing the node and network state exposed via the API
type StateAPI struct {
// NodePublicConfig is the configuration of the node that is exposed via API
NodePublicConfig NodePublicConfig `json:"nodeConfig"`
@ -52,27 +56,28 @@ type StateAPI struct {
RecommendedFee common.RecommendedFee `json:"recommendedFee"`
}
// Constants contains network constants
type Constants struct {
// RollupConstants common.RollupConstants
// AuctionConstants common.AuctionConstants
// WDelayerConstants common.WDelayerConstants
common.SCConsts
ChainID uint16
HermezAddress ethCommon.Address
}
// NodeConfig contains the node config exposed in the API
type NodeConfig struct {
MaxPoolTxs uint32 `meddler:"max_pool_txs"`
MinFeeUSD float64 `meddler:"min_fee"`
}
// NodeInfo contains information about he node used when serving the API
type NodeInfo struct {
ItemID int `meddler:"item_id,pk"`
APIState *StateAPI `meddler:"state,json"`
StateAPI *StateAPI `meddler:"state,json"`
NodeConfig *NodeConfig `meddler:"config,json"`
Constants *Constants `meddler:"constants,json"`
}
// GetNodeInfo returns the NodeInfo
func (hdb *HistoryDB) GetNodeInfo() (*NodeInfo, error) {
ni := &NodeInfo{}
err := meddler.QueryRow(
@ -81,6 +86,7 @@ func (hdb *HistoryDB) GetNodeInfo() (*NodeInfo, error) {
return ni, tracerr.Wrap(err)
}
// GetConstants returns the Constats
func (hdb *HistoryDB) GetConstants() (*Constants, error) {
var nodeInfo NodeInfo
err := meddler.QueryRow(
@ -90,6 +96,7 @@ func (hdb *HistoryDB) GetConstants() (*Constants, error) {
return nodeInfo.Constants, tracerr.Wrap(err)
}
// SetConstants sets the Constants
func (hdb *HistoryDB) SetConstants(constants *Constants) error {
_constants := struct {
Constants *Constants `meddler:"constants,json"`
@ -105,6 +112,7 @@ func (hdb *HistoryDB) SetConstants(constants *Constants) error {
return tracerr.Wrap(err)
}
// GetStateInternalAPI returns the StateAPI
func (hdb *HistoryDB) GetStateInternalAPI() (*StateAPI, error) {
return hdb.getStateAPI(hdb.dbRead)
}
@ -115,14 +123,15 @@ func (hdb *HistoryDB) getStateAPI(d meddler.DB) (*StateAPI, error) {
d, &nodeInfo,
"SELECT state FROM node_info WHERE item_id = 1;",
)
return nodeInfo.APIState, tracerr.Wrap(err)
return nodeInfo.StateAPI, tracerr.Wrap(err)
}
func (hdb *HistoryDB) SetAPIState(apiState *StateAPI) error {
_apiState := struct {
APIState *StateAPI `meddler:"state,json"`
}{apiState}
values, err := meddler.Default.Values(&_apiState, false)
// SetStateInternalAPI sets the StateAPI
func (hdb *HistoryDB) SetStateInternalAPI(stateAPI *StateAPI) error {
_stateAPI := struct {
StateAPI *StateAPI `meddler:"state,json"`
}{stateAPI}
values, err := meddler.Default.Values(&_stateAPI, false)
if err != nil {
return tracerr.Wrap(err)
}
@ -133,6 +142,7 @@ func (hdb *HistoryDB) SetAPIState(apiState *StateAPI) error {
return tracerr.Wrap(err)
}
// GetNodeConfig returns the NodeConfig
func (hdb *HistoryDB) GetNodeConfig() (*NodeConfig, error) {
var nodeInfo NodeInfo
err := meddler.QueryRow(
@ -142,6 +152,7 @@ func (hdb *HistoryDB) GetNodeConfig() (*NodeConfig, error) {
return nodeInfo.NodeConfig, tracerr.Wrap(err)
}
// SetNodeConfig sets the NodeConfig
func (hdb *HistoryDB) SetNodeConfig(nodeConfig *NodeConfig) error {
_nodeConfig := struct {
NodeConfig *NodeConfig `meddler:"config,json"`
@ -151,65 +162,8 @@ func (hdb *HistoryDB) SetNodeConfig(nodeConfig *NodeConfig) error {
return tracerr.Wrap(err)
}
_, err = hdb.dbWrite.Exec(
"UPDATE config SET state = $1 WHERE item_id = 1;",
"UPDATE node_info SET config = $1 WHERE item_id = 1;",
values[0],
)
return tracerr.Wrap(err)
}
// func (hdb *HistoryDB) SetInitialNodeInfo(maxPoolTxs uint32, minFeeUSD float64, constants *Constants) error {
// ni := &NodeInfo{
// MaxPoolTxs: &maxPoolTxs,
// MinFeeUSD: &minFeeUSD,
// Constants: constants,
// }
// return tracerr.Wrap(meddler.Insert(hdb.dbWrite, "node_info", ni))
// }
// apiSlotToBigInts converts from [6]*apitypes.BigIntStr to [6]*big.Int
// func apiSlotToBigInts(defaultSlotSetBid [6]*apitypes.BigIntStr) ([6]*big.Int, error) {
// var slots [6]*big.Int
//
// for i, slot := range defaultSlotSetBid {
// bigInt, ok := new(big.Int).SetString(string(*slot), 10)
// if !ok {
// return slots, tracerr.Wrap(fmt.Errorf("can't convert %T into big.Int", slot))
// }
// slots[i] = bigInt
// }
//
// return slots, nil
// }
// func (hdb *HistoryDB) updateNodeInfo(setUpdatedNodeInfo func(*sqlx.Tx, *NodeInfo) error) error {
// // Create a SQL transaction or read and update atomicaly
// txn, err := hdb.dbWrite.Beginx()
// if err != nil {
// return tracerr.Wrap(err)
// }
// defer func() {
// if err != nil {
// db.Rollback(txn)
// }
// }()
// // Read current node info
// ni := &NodeInfo{}
// if err := meddler.QueryRow(
// txn, ni, "SELECT * FROM node_info;",
// ); err != nil {
// return tracerr.Wrap(err)
// }
// // Update NodeInfo struct
// if err := setUpdatedNodeInfo(txn, ni); err != nil {
// return tracerr.Wrap(err)
// }
// // Update NodeInfo at DB
// if _, err := txn.Exec("DELETE FROM node_info;"); err != nil {
// return tracerr.Wrap(err)
// }
// if err := meddler.Insert(txn, "node_info", ni); err != nil {
// return tracerr.Wrap(err)
// }
// // Commit NodeInfo update
// return tracerr.Wrap(txn.Commit())
// }

+ 95
- 66
node/node.go

@ -54,7 +54,7 @@ const (
// Node is the Hermez Node
type Node struct {
nodeAPI *NodeAPI
apiStateUpdater *api.APIStateUpdater
stateAPIUpdater *api.StateAPIUpdater
debugAPI *debugapi.DebugAPI
priceUpdater *priceupdater.PriceUpdater
// Coordinator
@ -257,7 +257,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
return nil, tracerr.Wrap(err)
}
apiStateUpdater := api.NewAPIStateUpdater(historyDB, &hdbNodeCfg, initSCVars, &hdbConsts)
stateAPIUpdater := api.NewStateAPIUpdater(historyDB, &hdbNodeCfg, initSCVars, &hdbConsts)
var coord *coordinator.Coordinator
var l2DB *l2db.L2DB
@ -437,7 +437,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
}
ctx, cancel := context.WithCancel(context.Background())
return &Node{
apiStateUpdater: apiStateUpdater,
stateAPIUpdater: stateAPIUpdater,
nodeAPI: nodeAPI,
debugAPI: debugAPI,
priceUpdater: priceUpdater,
@ -456,11 +456,14 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
// APIServer is a server that only runs the API
type APIServer struct {
nodeAPI *NodeAPI
mode Mode
ctx context.Context
wg sync.WaitGroup
cancel context.CancelFunc
}
// NewAPIServer creates a new APIServer
func NewAPIServer(mode Mode, cfg *config.APIServer) (*APIServer, error) {
// NOTE: I just copied some parts of NewNode related to starting the
// API, but it still cotains many parameters that are not available
meddler.Debug = cfg.Debug.MeddlerLogs
// Stablish DB connection
dbWrite, err := dbUtils.InitSQLDB(
@ -492,13 +495,10 @@ func NewAPIServer(mode Mode, cfg *config.APIServer) (*APIServer, error) {
return nil, tracerr.Wrap(fmt.Errorf("dbUtils.InitSQLDB: %w", err))
}
}
var apiConnCon *dbUtils.APIConnectionController
if cfg.API.Explorer || mode == ModeCoordinator {
apiConnCon = dbUtils.NewAPICnnectionController(
cfg.API.MaxSQLConnections,
cfg.API.SQLConnectionTimeout.Duration,
)
}
apiConnCon := dbUtils.NewAPICnnectionController(
cfg.API.MaxSQLConnections,
cfg.API.SQLConnectionTimeout.Duration,
)
historyDB := historydb.NewHistoryDB(dbRead, dbWrite, apiConnCon)
@ -506,47 +506,67 @@ func NewAPIServer(mode Mode, cfg *config.APIServer) (*APIServer, error) {
if mode == ModeCoordinator {
l2DB = l2db.NewL2DB(
dbRead, dbWrite,
cfg.Coordinator.L2DB.SafetyPeriod,
0,
cfg.Coordinator.L2DB.MaxTxs,
cfg.Coordinator.L2DB.MinFeeUSD,
cfg.Coordinator.L2DB.TTL.Duration,
0,
apiConnCon,
)
}
var nodeAPI *NodeAPI
if cfg.API.Address != "" {
if cfg.Debug.GinDebugMode {
gin.SetMode(gin.DebugMode)
} else {
gin.SetMode(gin.ReleaseMode)
}
if cfg.API.UpdateMetricsInterval.Duration == 0 {
return nil, tracerr.Wrap(fmt.Errorf("invalid cfg.API.UpdateMetricsInterval: %v",
cfg.API.UpdateMetricsInterval.Duration))
}
if cfg.API.UpdateRecommendedFeeInterval.Duration == 0 {
return nil, tracerr.Wrap(fmt.Errorf("invalid cfg.API.UpdateRecommendedFeeInterval: %v",
cfg.API.UpdateRecommendedFeeInterval.Duration))
}
server := gin.Default()
coord := false
if mode == ModeCoordinator {
coord = cfg.Coordinator.API.Coordinator
}
var err error
nodeAPI, err = NewNodeAPI(
cfg.API.Address,
coord, cfg.API.Explorer,
server,
historyDB,
l2DB,
)
if err != nil {
return nil, tracerr.Wrap(err)
}
if cfg.Debug.GinDebugMode {
gin.SetMode(gin.DebugMode)
} else {
gin.SetMode(gin.ReleaseMode)
}
// ETC...
server := gin.Default()
coord := false
if mode == ModeCoordinator {
coord = cfg.Coordinator.API.Coordinator
}
nodeAPI, err := NewNodeAPI(
cfg.API.Address,
coord, cfg.API.Explorer,
server,
historyDB,
l2DB,
)
if err != nil {
return nil, tracerr.Wrap(err)
}
ctx, cancel := context.WithCancel(context.Background())
return &APIServer{
nodeAPI: nodeAPI,
mode: mode,
ctx: ctx,
cancel: cancel,
}, nil
}
// Start the APIServer
func (s *APIServer) Start() {
log.Infow("Starting api server...", "mode", s.mode)
log.Info("Starting NodeAPI...")
s.wg.Add(1)
go func() {
defer func() {
log.Info("NodeAPI routine stopped")
s.wg.Done()
}()
if err := s.nodeAPI.Run(s.ctx); err != nil {
if s.ctx.Err() != nil {
return
}
log.Fatalw("NodeAPI.Run", "err", err)
}
}()
}
// Stop the APIServer
func (s *APIServer) Stop() {
log.Infow("Stopping NodeAPI...")
s.cancel()
s.wg.Wait()
}
// NodeAPI holds the node http API
@ -627,13 +647,13 @@ func (n *Node) handleNewBlock(ctx context.Context, stats *synchronizer.Stats, va
if n.mode == ModeCoordinator {
n.coord.SendMsg(ctx, coordinator.MsgSyncBlock{
Stats: *stats,
Vars: vars,
Vars: *vars,
Batches: batches,
})
}
n.apiStateUpdater.SetSCVars(vars)
n.stateAPIUpdater.SetSCVars(vars)
if stats.Synced() {
if err := n.apiStateUpdater.UpdateNetworkInfo(
if err := n.stateAPIUpdater.UpdateNetworkInfo(
stats.Eth.LastBlock, stats.Sync.LastBlock,
common.BatchNum(stats.Eth.LastBatchNum),
stats.Sync.Auction.CurrentSlot.SlotNum,
@ -641,11 +661,11 @@ func (n *Node) handleNewBlock(ctx context.Context, stats *synchronizer.Stats, va
log.Errorw("ApiStateUpdater.UpdateNetworkInfo", "err", err)
}
} else {
n.apiStateUpdater.UpdateNetworkInfoBlock(
n.stateAPIUpdater.UpdateNetworkInfoBlock(
stats.Eth.LastBlock, stats.Sync.LastBlock,
)
}
if err := n.apiStateUpdater.Store(); err != nil {
if err := n.stateAPIUpdater.Store(); err != nil {
return tracerr.Wrap(err)
}
return nil
@ -656,14 +676,14 @@ func (n *Node) handleReorg(ctx context.Context, stats *synchronizer.Stats,
if n.mode == ModeCoordinator {
n.coord.SendMsg(ctx, coordinator.MsgSyncReorg{
Stats: *stats,
Vars: vars,
Vars: *vars.AsPtr(),
})
}
n.apiStateUpdater.SetSCVars(vars.AsPtr())
n.apiStateUpdater.UpdateNetworkInfoBlock(
n.stateAPIUpdater.SetSCVars(vars.AsPtr())
n.stateAPIUpdater.UpdateNetworkInfoBlock(
stats.Eth.LastBlock, stats.Sync.LastBlock,
)
if err := n.apiStateUpdater.Store(); err != nil {
if err := n.stateAPIUpdater.Store(); err != nil {
return tracerr.Wrap(err)
}
return nil
@ -713,7 +733,9 @@ func (n *Node) StartSynchronizer() {
// the last synced one) is synchronized
stats := n.sync.Stats()
vars := n.sync.SCVars()
n.handleNewBlock(n.ctx, stats, vars.AsPtr(), []common.BatchData{})
if err := n.handleNewBlock(n.ctx, stats, vars.AsPtr(), []common.BatchData{}); err != nil {
log.Fatalw("Node.handleNewBlock", "err", err)
}
n.wg.Add(1)
go func() {
@ -800,24 +822,24 @@ func (n *Node) StartNodeAPI() {
n.wg.Add(1)
go func() {
// Do an initial update on startup
if err := n.apiStateUpdater.UpdateMetrics(); err != nil {
if err := n.stateAPIUpdater.UpdateMetrics(); err != nil {
log.Errorw("ApiStateUpdater.UpdateMetrics", "err", err)
}
if err := n.apiStateUpdater.Store(); err != nil {
if err := n.stateAPIUpdater.Store(); err != nil {
log.Errorw("ApiStateUpdater.Store", "err", err)
}
for {
select {
case <-n.ctx.Done():
log.Info("API.UpdateMetrics loop done")
log.Info("ApiStateUpdater.UpdateMetrics loop done")
n.wg.Done()
return
case <-time.After(n.cfg.API.UpdateMetricsInterval.Duration):
if err := n.apiStateUpdater.UpdateMetrics(); err != nil {
if err := n.stateAPIUpdater.UpdateMetrics(); err != nil {
log.Errorw("ApiStateUpdater.UpdateMetrics", "err", err)
continue
}
if err := n.apiStateUpdater.Store(); err != nil {
if err := n.stateAPIUpdater.Store(); err != nil {
log.Errorw("ApiStateUpdater.Store", "err", err)
}
}
@ -827,18 +849,25 @@ func (n *Node) StartNodeAPI() {
n.wg.Add(1)
go func() {
// Do an initial update on startup
if err := n.historyDB.UpdateRecommendedFee(); err != nil {
log.Errorw("API.UpdateRecommendedFee", "err", err)
if err := n.stateAPIUpdater.UpdateRecommendedFee(); err != nil {
log.Errorw("ApiStateUpdater.UpdateRecommendedFee", "err", err)
}
if err := n.stateAPIUpdater.Store(); err != nil {
log.Errorw("ApiStateUpdater.Store", "err", err)
}
for {
select {
case <-n.ctx.Done():
log.Info("API.UpdateRecommendedFee loop done")
log.Info("ApiStateUpdaterAPI.UpdateRecommendedFee loop done")
n.wg.Done()
return
case <-time.After(n.cfg.API.UpdateRecommendedFeeInterval.Duration):
if err := n.historyDB.UpdateRecommendedFee(); err != nil {
log.Errorw("API.UpdateRecommendedFee", "err", err)
if err := n.stateAPIUpdater.UpdateRecommendedFee(); err != nil {
log.Errorw("ApiStateUpdaterAPI.UpdateRecommendedFee", "err", err)
continue
}
if err := n.stateAPIUpdater.Store(); err != nil {
log.Errorw("ApiStateUpdater.Store", "err", err)
}
}
}

+ 12
- 12
synchronizer/synchronizer_test.go

@ -372,9 +372,9 @@ func TestSyncGeneral(t *testing.T) {
assert.Equal(t, int64(1), stats.Eth.LastBlock.Num)
assert.Equal(t, int64(1), stats.Sync.LastBlock.Num)
vars := s.SCVars()
assert.Equal(t, clientSetup.RollupVariables, vars.Rollup)
assert.Equal(t, clientSetup.AuctionVariables, vars.Auction)
assert.Equal(t, clientSetup.WDelayerVariables, vars.WDelayer)
assert.Equal(t, *clientSetup.RollupVariables, vars.Rollup)
assert.Equal(t, *clientSetup.AuctionVariables, vars.Auction)
assert.Equal(t, *clientSetup.WDelayerVariables, vars.WDelayer)
dbBlocks, err := s.historyDB.GetAllBlocks()
require.NoError(t, err)
@ -533,9 +533,9 @@ func TestSyncGeneral(t *testing.T) {
assert.Equal(t, int64(4), stats.Eth.LastBlock.Num)
assert.Equal(t, int64(4), stats.Sync.LastBlock.Num)
vars = s.SCVars()
assert.Equal(t, clientSetup.RollupVariables, vars.Rollup)
assert.Equal(t, clientSetup.AuctionVariables, vars.Auction)
assert.Equal(t, clientSetup.WDelayerVariables, vars.WDelayer)
assert.Equal(t, *clientSetup.RollupVariables, vars.Rollup)
assert.Equal(t, *clientSetup.AuctionVariables, vars.Auction)
assert.Equal(t, *clientSetup.WDelayerVariables, vars.WDelayer)
dbExits, err := s.historyDB.GetAllExits()
require.NoError(t, err)
@ -665,9 +665,9 @@ func TestSyncGeneral(t *testing.T) {
assert.Equal(t, false, stats.Synced())
assert.Equal(t, int64(6), stats.Eth.LastBlock.Num)
vars = s.SCVars()
assert.Equal(t, clientSetup.RollupVariables, vars.Rollup)
assert.Equal(t, clientSetup.AuctionVariables, vars.Auction)
assert.Equal(t, clientSetup.WDelayerVariables, vars.WDelayer)
assert.Equal(t, *clientSetup.RollupVariables, vars.Rollup)
assert.Equal(t, *clientSetup.AuctionVariables, vars.Auction)
assert.Equal(t, *clientSetup.WDelayerVariables, vars.WDelayer)
// At this point, the DB only has data up to block 1
dbBlock, err := s.historyDB.GetLastBlock()
@ -704,9 +704,9 @@ func TestSyncGeneral(t *testing.T) {
}
vars = s.SCVars()
assert.Equal(t, clientSetup.RollupVariables, vars.Rollup)
assert.Equal(t, clientSetup.AuctionVariables, vars.Auction)
assert.Equal(t, clientSetup.WDelayerVariables, vars.WDelayer)
assert.Equal(t, *clientSetup.RollupVariables, vars.Rollup)
assert.Equal(t, *clientSetup.AuctionVariables, vars.Auction)
assert.Equal(t, *clientSetup.WDelayerVariables, vars.WDelayer)
}
dbBlock, err = s.historyDB.GetLastBlock()

Loading…
Cancel
Save