Merge pull request #631 from hermeznetwork/feature/serveapicli2

Allow serving API only via new cli command
This commit is contained in:
arnau
2021-03-15 15:41:36 +01:00
committed by GitHub
25 changed files with 1436 additions and 869 deletions

View File

@@ -27,6 +27,7 @@ import (
"github.com/hermeznetwork/hermez-node/log"
"github.com/hermeznetwork/hermez-node/priceupdater"
"github.com/hermeznetwork/hermez-node/prover"
"github.com/hermeznetwork/hermez-node/stateapiupdater"
"github.com/hermeznetwork/hermez-node/synchronizer"
"github.com/hermeznetwork/hermez-node/test/debugapi"
"github.com/hermeznetwork/hermez-node/txprocessor"
@@ -53,9 +54,10 @@ const (
// Node is the Hermez Node
type Node struct {
nodeAPI *NodeAPI
debugAPI *debugapi.DebugAPI
priceUpdater *priceupdater.PriceUpdater
nodeAPI *NodeAPI
stateAPIUpdater *stateapiupdater.Updater
debugAPI *debugapi.DebugAPI
priceUpdater *priceupdater.PriceUpdater
// Coordinator
coord *coordinator.Coordinator
@@ -67,6 +69,7 @@ type Node struct {
mode Mode
sqlConnRead *sqlx.DB
sqlConnWrite *sqlx.DB
historyDB *historydb.HistoryDB
ctx context.Context
wg sync.WaitGroup
cancel context.CancelFunc
@@ -241,12 +244,35 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
}
initSCVars := sync.SCVars()
scConsts := synchronizer.SCConsts{
scConsts := common.SCConsts{
Rollup: *sync.RollupConstants(),
Auction: *sync.AuctionConstants(),
WDelayer: *sync.WDelayerConstants(),
}
hdbNodeCfg := historydb.NodeConfig{
MaxPoolTxs: cfg.Coordinator.L2DB.MaxTxs,
MinFeeUSD: cfg.Coordinator.L2DB.MinFeeUSD,
ForgeDelay: cfg.Coordinator.ForgeDelay.Duration.Seconds(),
}
if err := historyDB.SetNodeConfig(&hdbNodeCfg); err != nil {
return nil, tracerr.Wrap(err)
}
hdbConsts := historydb.Constants{
SCConsts: common.SCConsts{
Rollup: scConsts.Rollup,
Auction: scConsts.Auction,
WDelayer: scConsts.WDelayer,
},
ChainID: chainIDU16,
HermezAddress: cfg.SmartContracts.Rollup,
}
if err := historyDB.SetConstants(&hdbConsts); err != nil {
return nil, tracerr.Wrap(err)
}
stateAPIUpdater := stateapiupdater.NewUpdater(historyDB, &hdbNodeCfg, initSCVars, &hdbConsts)
var coord *coordinator.Coordinator
if mode == ModeCoordinator {
// Unlock FeeAccount EthAddr in the keystore to generate the
@@ -369,11 +395,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
serverProofs,
client,
&scConsts,
&synchronizer.SCVariables{
Rollup: *initSCVars.Rollup,
Auction: *initSCVars.Auction,
WDelayer: *initSCVars.WDelayer,
},
initSCVars,
)
if err != nil {
return nil, tracerr.Wrap(err)
@@ -405,23 +427,11 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
coord, cfg.API.Explorer,
server,
historyDB,
stateDB,
l2DB,
&api.Config{
RollupConstants: scConsts.Rollup,
AuctionConstants: scConsts.Auction,
WDelayerConstants: scConsts.WDelayer,
ChainID: chainIDU16,
HermezAddress: cfg.SmartContracts.Rollup,
},
cfg.Coordinator.ForgeDelay.Duration,
)
if err != nil {
return nil, tracerr.Wrap(err)
}
nodeAPI.api.SetRollupVariables(*initSCVars.Rollup)
nodeAPI.api.SetAuctionVariables(*initSCVars.Auction)
nodeAPI.api.SetWDelayerVariables(*initSCVars.WDelayer)
}
var debugAPI *debugapi.DebugAPI
if cfg.Debug.APIAddress != "" {
@@ -439,20 +449,138 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
}
ctx, cancel := context.WithCancel(context.Background())
return &Node{
nodeAPI: nodeAPI,
debugAPI: debugAPI,
priceUpdater: priceUpdater,
coord: coord,
sync: sync,
cfg: cfg,
mode: mode,
sqlConnRead: dbRead,
sqlConnWrite: dbWrite,
ctx: ctx,
cancel: cancel,
stateAPIUpdater: stateAPIUpdater,
nodeAPI: nodeAPI,
debugAPI: debugAPI,
priceUpdater: priceUpdater,
coord: coord,
sync: sync,
cfg: cfg,
mode: mode,
sqlConnRead: dbRead,
sqlConnWrite: dbWrite,
historyDB: historyDB,
ctx: ctx,
cancel: cancel,
}, nil
}
// APIServer is a server that only runs the API
type APIServer struct {
nodeAPI *NodeAPI
mode Mode
ctx context.Context
wg sync.WaitGroup
cancel context.CancelFunc
}
// NewAPIServer creates a new APIServer
func NewAPIServer(mode Mode, cfg *config.APIServer) (*APIServer, error) {
meddler.Debug = cfg.Debug.MeddlerLogs
// Stablish DB connection
dbWrite, err := dbUtils.InitSQLDB(
cfg.PostgreSQL.PortWrite,
cfg.PostgreSQL.HostWrite,
cfg.PostgreSQL.UserWrite,
cfg.PostgreSQL.PasswordWrite,
cfg.PostgreSQL.NameWrite,
)
if err != nil {
return nil, tracerr.Wrap(fmt.Errorf("dbUtils.InitSQLDB: %w", err))
}
var dbRead *sqlx.DB
if cfg.PostgreSQL.HostRead == "" {
dbRead = dbWrite
} else if cfg.PostgreSQL.HostRead == cfg.PostgreSQL.HostWrite {
return nil, tracerr.Wrap(fmt.Errorf(
"PostgreSQL.HostRead and PostgreSQL.HostWrite must be different",
))
} else {
dbRead, err = dbUtils.InitSQLDB(
cfg.PostgreSQL.PortRead,
cfg.PostgreSQL.HostRead,
cfg.PostgreSQL.UserRead,
cfg.PostgreSQL.PasswordRead,
cfg.PostgreSQL.NameRead,
)
if err != nil {
return nil, tracerr.Wrap(fmt.Errorf("dbUtils.InitSQLDB: %w", err))
}
}
apiConnCon := dbUtils.NewAPIConnectionController(
cfg.API.MaxSQLConnections,
cfg.API.SQLConnectionTimeout.Duration,
)
historyDB := historydb.NewHistoryDB(dbRead, dbWrite, apiConnCon)
var l2DB *l2db.L2DB
if mode == ModeCoordinator {
l2DB = l2db.NewL2DB(
dbRead, dbWrite,
0,
cfg.Coordinator.L2DB.MaxTxs,
cfg.Coordinator.L2DB.MinFeeUSD,
0,
apiConnCon,
)
}
if cfg.Debug.GinDebugMode {
gin.SetMode(gin.DebugMode)
} else {
gin.SetMode(gin.ReleaseMode)
}
server := gin.Default()
coord := false
if mode == ModeCoordinator {
coord = cfg.Coordinator.API.Coordinator
}
nodeAPI, err := NewNodeAPI(
cfg.API.Address,
coord, cfg.API.Explorer,
server,
historyDB,
l2DB,
)
if err != nil {
return nil, tracerr.Wrap(err)
}
ctx, cancel := context.WithCancel(context.Background())
return &APIServer{
nodeAPI: nodeAPI,
mode: mode,
ctx: ctx,
cancel: cancel,
}, nil
}
// Start the APIServer
func (s *APIServer) Start() {
log.Infow("Starting api server...", "mode", s.mode)
log.Info("Starting NodeAPI...")
s.wg.Add(1)
go func() {
defer func() {
log.Info("NodeAPI routine stopped")
s.wg.Done()
}()
if err := s.nodeAPI.Run(s.ctx); err != nil {
if s.ctx.Err() != nil {
return
}
log.Fatalw("NodeAPI.Run", "err", err)
}
}()
}
// Stop the APIServer
func (s *APIServer) Stop() {
log.Infow("Stopping NodeAPI...")
s.cancel()
s.wg.Wait()
}
// NodeAPI holds the node http API
type NodeAPI struct { //nolint:golint
api *api.API
@@ -472,10 +600,7 @@ func NewNodeAPI(
coordinatorEndpoints, explorerEndpoints bool,
server *gin.Engine,
hdb *historydb.HistoryDB,
sdb *statedb.StateDB,
l2db *l2db.L2DB,
config *api.Config,
forgeDelay time.Duration,
) (*NodeAPI, error) {
engine := gin.Default()
engine.NoRoute(handleNoRoute)
@@ -485,10 +610,6 @@ func NewNodeAPI(
engine,
hdb,
l2db,
config,
&api.NodeConfig{
ForgeDelay: forgeDelay.Seconds(),
},
)
if err != nil {
return nil, tracerr.Wrap(err)
@@ -534,58 +655,50 @@ func (a *NodeAPI) Run(ctx context.Context) error {
}
func (n *Node) handleNewBlock(ctx context.Context, stats *synchronizer.Stats,
vars synchronizer.SCVariablesPtr, batches []common.BatchData) {
vars *common.SCVariablesPtr, batches []common.BatchData) error {
if n.mode == ModeCoordinator {
n.coord.SendMsg(ctx, coordinator.MsgSyncBlock{
Stats: *stats,
Vars: vars,
Vars: *vars,
Batches: batches,
})
}
if n.nodeAPI != nil {
if vars.Rollup != nil {
n.nodeAPI.api.SetRollupVariables(*vars.Rollup)
n.stateAPIUpdater.SetSCVars(vars)
if stats.Synced() {
if err := n.stateAPIUpdater.UpdateNetworkInfo(
stats.Eth.LastBlock, stats.Sync.LastBlock,
common.BatchNum(stats.Eth.LastBatchNum),
stats.Sync.Auction.CurrentSlot.SlotNum,
); err != nil {
log.Errorw("ApiStateUpdater.UpdateNetworkInfo", "err", err)
}
if vars.Auction != nil {
n.nodeAPI.api.SetAuctionVariables(*vars.Auction)
}
if vars.WDelayer != nil {
n.nodeAPI.api.SetWDelayerVariables(*vars.WDelayer)
}
if stats.Synced() {
if err := n.nodeAPI.api.UpdateNetworkInfo(
stats.Eth.LastBlock, stats.Sync.LastBlock,
common.BatchNum(stats.Eth.LastBatchNum),
stats.Sync.Auction.CurrentSlot.SlotNum,
); err != nil {
log.Errorw("API.UpdateNetworkInfo", "err", err)
}
} else {
n.nodeAPI.api.UpdateNetworkInfoBlock(
stats.Eth.LastBlock, stats.Sync.LastBlock,
)
}
}
}
func (n *Node) handleReorg(ctx context.Context, stats *synchronizer.Stats,
vars synchronizer.SCVariablesPtr) {
if n.mode == ModeCoordinator {
n.coord.SendMsg(ctx, coordinator.MsgSyncReorg{
Stats: *stats,
Vars: vars,
})
}
if n.nodeAPI != nil {
vars := n.sync.SCVars()
n.nodeAPI.api.SetRollupVariables(*vars.Rollup)
n.nodeAPI.api.SetAuctionVariables(*vars.Auction)
n.nodeAPI.api.SetWDelayerVariables(*vars.WDelayer)
n.nodeAPI.api.UpdateNetworkInfoBlock(
} else {
n.stateAPIUpdater.UpdateNetworkInfoBlock(
stats.Eth.LastBlock, stats.Sync.LastBlock,
)
}
if err := n.stateAPIUpdater.Store(); err != nil {
return tracerr.Wrap(err)
}
return nil
}
func (n *Node) handleReorg(ctx context.Context, stats *synchronizer.Stats,
vars *common.SCVariables) error {
if n.mode == ModeCoordinator {
n.coord.SendMsg(ctx, coordinator.MsgSyncReorg{
Stats: *stats,
Vars: *vars.AsPtr(),
})
}
n.stateAPIUpdater.SetSCVars(vars.AsPtr())
n.stateAPIUpdater.UpdateNetworkInfoBlock(
stats.Eth.LastBlock, stats.Sync.LastBlock,
)
if err := n.stateAPIUpdater.Store(); err != nil {
return tracerr.Wrap(err)
}
return nil
}
// TODO(Edu): Consider keeping the `lastBlock` inside synchronizer so that we
@@ -601,16 +714,20 @@ func (n *Node) syncLoopFn(ctx context.Context, lastBlock *common.Block) (*common
// case: reorg
log.Infow("Synchronizer.Sync reorg", "discarded", *discarded)
vars := n.sync.SCVars()
n.handleReorg(ctx, stats, vars)
if err := n.handleReorg(ctx, stats, vars); err != nil {
return nil, time.Duration(0), tracerr.Wrap(err)
}
return nil, time.Duration(0), nil
} else if blockData != nil {
// case: new block
vars := synchronizer.SCVariablesPtr{
vars := common.SCVariablesPtr{
Rollup: blockData.Rollup.Vars,
Auction: blockData.Auction.Vars,
WDelayer: blockData.WDelayer.Vars,
}
n.handleNewBlock(ctx, stats, vars, blockData.Rollup.Batches)
if err := n.handleNewBlock(ctx, stats, &vars, blockData.Rollup.Batches); err != nil {
return nil, time.Duration(0), tracerr.Wrap(err)
}
return &blockData.Block, time.Duration(0), nil
} else {
// case: no block
@@ -629,7 +746,9 @@ func (n *Node) StartSynchronizer() {
// the last synced one) is synchronized
stats := n.sync.Stats()
vars := n.sync.SCVars()
n.handleNewBlock(n.ctx, stats, vars, []common.BatchData{})
if err := n.handleNewBlock(n.ctx, stats, vars.AsPtr(), []common.BatchData{}); err != nil {
log.Fatalw("Node.handleNewBlock", "err", err)
}
n.wg.Add(1)
go func() {
@@ -716,18 +835,25 @@ func (n *Node) StartNodeAPI() {
n.wg.Add(1)
go func() {
// Do an initial update on startup
if err := n.nodeAPI.api.UpdateMetrics(); err != nil {
log.Errorw("API.UpdateMetrics", "err", err)
if err := n.stateAPIUpdater.UpdateMetrics(); err != nil {
log.Errorw("ApiStateUpdater.UpdateMetrics", "err", err)
}
if err := n.stateAPIUpdater.Store(); err != nil {
log.Errorw("ApiStateUpdater.Store", "err", err)
}
for {
select {
case <-n.ctx.Done():
log.Info("API.UpdateMetrics loop done")
log.Info("ApiStateUpdater.UpdateMetrics loop done")
n.wg.Done()
return
case <-time.After(n.cfg.API.UpdateMetricsInterval.Duration):
if err := n.nodeAPI.api.UpdateMetrics(); err != nil {
log.Errorw("API.UpdateMetrics", "err", err)
if err := n.stateAPIUpdater.UpdateMetrics(); err != nil {
log.Errorw("ApiStateUpdater.UpdateMetrics", "err", err)
continue
}
if err := n.stateAPIUpdater.Store(); err != nil {
log.Errorw("ApiStateUpdater.Store", "err", err)
}
}
}
@@ -736,18 +862,25 @@ func (n *Node) StartNodeAPI() {
n.wg.Add(1)
go func() {
// Do an initial update on startup
if err := n.nodeAPI.api.UpdateRecommendedFee(); err != nil {
log.Errorw("API.UpdateRecommendedFee", "err", err)
if err := n.stateAPIUpdater.UpdateRecommendedFee(); err != nil {
log.Errorw("ApiStateUpdater.UpdateRecommendedFee", "err", err)
}
if err := n.stateAPIUpdater.Store(); err != nil {
log.Errorw("ApiStateUpdater.Store", "err", err)
}
for {
select {
case <-n.ctx.Done():
log.Info("API.UpdateRecommendedFee loop done")
log.Info("ApiStateUpdaterAPI.UpdateRecommendedFee loop done")
n.wg.Done()
return
case <-time.After(n.cfg.API.UpdateRecommendedFeeInterval.Duration):
if err := n.nodeAPI.api.UpdateRecommendedFee(); err != nil {
log.Errorw("API.UpdateRecommendedFee", "err", err)
if err := n.stateAPIUpdater.UpdateRecommendedFee(); err != nil {
log.Errorw("ApiStateUpdaterAPI.UpdateRecommendedFee", "err", err)
continue
}
if err := n.stateAPIUpdater.Store(); err != nil {
log.Errorw("ApiStateUpdater.Store", "err", err)
}
}
}