This commit is contained in:
Eduard S
2021-03-08 18:17:15 +01:00
parent 5501f30062
commit a5ef822c64
12 changed files with 789 additions and 801 deletions

View File

@@ -53,9 +53,10 @@ const (
// Node is the Hermez Node
type Node struct {
nodeAPI *NodeAPI
debugAPI *debugapi.DebugAPI
priceUpdater *priceupdater.PriceUpdater
nodeAPI *NodeAPI
apiStateUpdater *api.APIStateUpdater
debugAPI *debugapi.DebugAPI
priceUpdater *priceupdater.PriceUpdater
// Coordinator
coord *coordinator.Coordinator
@@ -230,25 +231,33 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
}
initSCVars := sync.SCVars()
scConsts := synchronizer.SCConsts{
scConsts := common.SCConsts{
Rollup: *sync.RollupConstants(),
Auction: *sync.AuctionConstants(),
WDelayer: *sync.WDelayerConstants(),
}
if err := historyDB.SetInitialNodeInfo(
cfg.Coordinator.L2DB.MaxTxs,
cfg.Coordinator.L2DB.MinFeeUSD,
&historydb.Constants{
RollupConstants: scConsts.Rollup,
AuctionConstants: scConsts.Auction,
WDelayerConstants: scConsts.WDelayer,
ChainID: chainIDU16,
HermezAddress: cfg.SmartContracts.Rollup,
},
); err != nil {
hdbNodeCfg := historydb.NodeConfig{
MaxPoolTxs: cfg.Coordinator.L2DB.MaxTxs,
MinFeeUSD: cfg.Coordinator.L2DB.MinFeeUSD,
}
if err := historyDB.SetNodeConfig(&hdbNodeCfg); err != nil {
return nil, tracerr.Wrap(err)
}
hdbConsts := historydb.Constants{
SCConsts: common.SCConsts{
Rollup: scConsts.Rollup,
Auction: scConsts.Auction,
WDelayer: scConsts.WDelayer,
},
ChainID: chainIDU16,
HermezAddress: cfg.SmartContracts.Rollup,
}
if err := historyDB.SetConstants(&hdbConsts); err != nil {
return nil, tracerr.Wrap(err)
}
apiStateUpdater := api.NewAPIStateUpdater(historyDB, &hdbNodeCfg, initSCVars, &hdbConsts)
var coord *coordinator.Coordinator
var l2DB *l2db.L2DB
@@ -379,11 +388,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
serverProofs,
client,
&scConsts,
&synchronizer.SCVariables{
Rollup: *initSCVars.Rollup,
Auction: *initSCVars.Auction,
WDelayer: *initSCVars.WDelayer,
},
initSCVars,
)
if err != nil {
return nil, tracerr.Wrap(err)
@@ -432,18 +437,19 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
}
ctx, cancel := context.WithCancel(context.Background())
return &Node{
nodeAPI: nodeAPI,
debugAPI: debugAPI,
priceUpdater: priceUpdater,
coord: coord,
sync: sync,
cfg: cfg,
mode: mode,
sqlConnRead: dbRead,
sqlConnWrite: dbWrite,
historyDB: historyDB,
ctx: ctx,
cancel: cancel,
apiStateUpdater: apiStateUpdater,
nodeAPI: nodeAPI,
debugAPI: debugAPI,
priceUpdater: priceUpdater,
coord: coord,
sync: sync,
cfg: cfg,
mode: mode,
sqlConnRead: dbRead,
sqlConnWrite: dbWrite,
historyDB: historyDB,
ctx: ctx,
cancel: cancel,
}, nil
}
@@ -616,8 +622,8 @@ func (a *NodeAPI) Run(ctx context.Context) error {
return nil
}
func (n *Node) handleNewBlock(ctx context.Context, stats *synchronizer.Stats, vars synchronizer.SCVariablesPtr,
batches []common.BatchData) {
func (n *Node) handleNewBlock(ctx context.Context, stats *synchronizer.Stats, vars *common.SCVariablesPtr,
batches []common.BatchData) error {
if n.mode == ModeCoordinator {
n.coord.SendMsg(ctx, coordinator.MsgSyncBlock{
Stats: *stats,
@@ -625,47 +631,42 @@ func (n *Node) handleNewBlock(ctx context.Context, stats *synchronizer.Stats, va
Batches: batches,
})
}
if n.nodeAPI != nil {
if vars.Rollup != nil {
n.historyDB.SetRollupVariables(vars.Rollup)
}
if vars.Auction != nil {
n.historyDB.SetAuctionVariables(vars.Auction)
}
if vars.WDelayer != nil {
n.historyDB.SetWDelayerVariables(vars.WDelayer)
}
if stats.Synced() {
if err := n.historyDB.UpdateNetworkInfo(
stats.Eth.LastBlock, stats.Sync.LastBlock,
common.BatchNum(stats.Eth.LastBatchNum),
stats.Sync.Auction.CurrentSlot.SlotNum,
); err != nil {
log.Errorw("API.UpdateNetworkInfo", "err", err)
}
} else {
n.historyDB.UpdateNetworkInfoBlock(
stats.Eth.LastBlock, stats.Sync.LastBlock,
)
n.apiStateUpdater.SetSCVars(vars)
if stats.Synced() {
if err := n.apiStateUpdater.UpdateNetworkInfo(
stats.Eth.LastBlock, stats.Sync.LastBlock,
common.BatchNum(stats.Eth.LastBatchNum),
stats.Sync.Auction.CurrentSlot.SlotNum,
); err != nil {
log.Errorw("ApiStateUpdater.UpdateNetworkInfo", "err", err)
}
} else {
n.apiStateUpdater.UpdateNetworkInfoBlock(
stats.Eth.LastBlock, stats.Sync.LastBlock,
)
}
if err := n.apiStateUpdater.Store(); err != nil {
return tracerr.Wrap(err)
}
return nil
}
func (n *Node) handleReorg(ctx context.Context, stats *synchronizer.Stats, vars synchronizer.SCVariablesPtr) {
func (n *Node) handleReorg(ctx context.Context, stats *synchronizer.Stats,
vars *common.SCVariables) error {
if n.mode == ModeCoordinator {
n.coord.SendMsg(ctx, coordinator.MsgSyncReorg{
Stats: *stats,
Vars: vars,
})
}
vars = n.sync.SCVars()
n.historyDB.SetRollupVariables(vars.Rollup)
n.historyDB.SetAuctionVariables(vars.Auction)
n.historyDB.SetWDelayerVariables(vars.WDelayer)
n.historyDB.UpdateNetworkInfoBlock(
n.apiStateUpdater.SetSCVars(vars.AsPtr())
n.apiStateUpdater.UpdateNetworkInfoBlock(
stats.Eth.LastBlock, stats.Sync.LastBlock,
)
if err := n.apiStateUpdater.Store(); err != nil {
return tracerr.Wrap(err)
}
return nil
}
// TODO(Edu): Consider keeping the `lastBlock` inside synchronizer so that we
@@ -680,16 +681,20 @@ func (n *Node) syncLoopFn(ctx context.Context, lastBlock *common.Block) (*common
// case: reorg
log.Infow("Synchronizer.Sync reorg", "discarded", *discarded)
vars := n.sync.SCVars()
n.handleReorg(ctx, stats, vars)
if err := n.handleReorg(ctx, stats, vars); err != nil {
return nil, time.Duration(0), tracerr.Wrap(err)
}
return nil, time.Duration(0), nil
} else if blockData != nil {
// case: new block
vars := synchronizer.SCVariablesPtr{
vars := common.SCVariablesPtr{
Rollup: blockData.Rollup.Vars,
Auction: blockData.Auction.Vars,
WDelayer: blockData.WDelayer.Vars,
}
n.handleNewBlock(ctx, stats, vars, blockData.Rollup.Batches)
if err := n.handleNewBlock(ctx, stats, &vars, blockData.Rollup.Batches); err != nil {
return nil, time.Duration(0), tracerr.Wrap(err)
}
return &blockData.Block, time.Duration(0), nil
} else {
// case: no block
@@ -708,7 +713,7 @@ func (n *Node) StartSynchronizer() {
// the last synced one) is synchronized
stats := n.sync.Stats()
vars := n.sync.SCVars()
n.handleNewBlock(n.ctx, stats, vars, []common.BatchData{})
n.handleNewBlock(n.ctx, stats, vars.AsPtr(), []common.BatchData{})
n.wg.Add(1)
go func() {
@@ -795,8 +800,11 @@ func (n *Node) StartNodeAPI() {
n.wg.Add(1)
go func() {
// Do an initial update on startup
if err := n.historyDB.UpdateMetrics(); err != nil {
log.Errorw("API.UpdateMetrics", "err", err)
if err := n.apiStateUpdater.UpdateMetrics(); err != nil {
log.Errorw("ApiStateUpdater.UpdateMetrics", "err", err)
}
if err := n.apiStateUpdater.Store(); err != nil {
log.Errorw("ApiStateUpdater.Store", "err", err)
}
for {
select {
@@ -805,8 +813,12 @@ func (n *Node) StartNodeAPI() {
n.wg.Done()
return
case <-time.After(n.cfg.API.UpdateMetricsInterval.Duration):
if err := n.historyDB.UpdateMetrics(); err != nil {
log.Errorw("API.UpdateMetrics", "err", err)
if err := n.apiStateUpdater.UpdateMetrics(); err != nil {
log.Errorw("ApiStateUpdater.UpdateMetrics", "err", err)
continue
}
if err := n.apiStateUpdater.Store(); err != nil {
log.Errorw("ApiStateUpdater.Store", "err", err)
}
}
}