Update coordinator, call all api update functions

- Common:
	- Rename Block.EthBlockNum to Block.Num to avoid unneeded repetition
- API:
	- Add UpdateNetworkInfoBlock to update just block information, to be
	  used when the node is not yet synchronized
- Node:
	- Call API.UpdateMetrics and UpdateRecommendedFee in a loop, with
	  configurable time intervals
- Synchronizer:
	- When mapping events by TxHash, use an array to support the possibility
	  of multiple calls of the same function happening in the same
	  transaction (for example, a smart contract in a single transaction
	  could call withdraw with delay twice, which would generate 2 withdraw
	  events, and 2 deposit events).
	- In Stats, keep entire LastBlock instead of just the blockNum
	- In Stats, add lastL1BatchBlock
	- Test Stats and SCVars
- Coordinator:
	- Enable writing the BatchInfo in every step of the pipeline to disk
	  (with JSON text files) for debugging purposes.
	- Move the Pipeline functionality from the Coordinator to its own struct
	  (Pipeline)
	- Implement shouldL1lL2Batch
	- In TxManager, implement logic to perform several attempts when doing
	  ethereum node RPC calls before considering the error. (Both for calls
	  to forgeBatch and transaction receipt)
	- In TxManager, reorganize the flow and note the specific points in
	  which actions are made when err != nil
- HistoryDB:
	- Implement GetLastL1BatchBlockNum: returns the blockNum of the latest
	  forged l1Batch, to help the coordinator decide when to forge an
	  L1Batch.
- EthereumClient and test.Client:
	- Update EthBlockByNumber to return the last block when the passed
	  number is -1.
This commit is contained in:
Eduard S
2020-11-20 14:46:01 +01:00
parent 58c9be3644
commit 8f1cf2f145
26 changed files with 1039 additions and 402 deletions

View File

@@ -2,6 +2,7 @@ package node
import (
"context"
"fmt"
"net/http"
"sync"
"time"
@@ -165,7 +166,7 @@ func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node,
serverProofs[i] = coordinator.NewServerProof(serverProofCfg.URL)
}
coord = coordinator.NewCoordinator(
coord, err = coordinator.NewCoordinator(
coordinator.Config{
ForgerAddress: coordCfg.ForgerAddress,
ConfirmBlocks: coordCfg.ConfirmBlocks,
@@ -178,9 +179,20 @@ func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node,
&scConsts,
&initSCVars,
)
if err != nil {
return nil, err
}
}
var nodeAPI *NodeAPI
if cfg.API.Address != "" {
if cfg.API.UpdateMetricsInterval.Duration == 0 {
return nil, fmt.Errorf("invalid cfg.API.UpdateMetricsInterval: %v",
cfg.API.UpdateMetricsInterval.Duration)
}
if cfg.API.UpdateRecommendedFeeInterval.Duration == 0 {
return nil, fmt.Errorf("invalid cfg.API.UpdateRecommendedFeeInterval: %v",
cfg.API.UpdateRecommendedFeeInterval.Duration)
}
server := gin.Default()
coord := false
if mode == ModeCoordinator {
@@ -303,9 +315,11 @@ func (a *NodeAPI) Run(ctx context.Context) error {
// TODO(Edu): Consider keeping the `lastBlock` inside synchronizer so that we
// don't have to pass it around.
func (n *Node) syncLoopFn(lastBlock *common.Block) (*common.Block, time.Duration) {
if blockData, discarded, err := n.sync.Sync2(n.ctx, lastBlock); err != nil {
blockData, discarded, err := n.sync.Sync2(n.ctx, lastBlock)
stats := n.sync.Stats()
if err != nil {
// case: error
log.Errorw("Synchronizer.Sync", "error", err)
log.Errorw("Synchronizer.Sync", "err", err)
return nil, n.cfg.Synchronizer.SyncLoopInterval.Duration
} else if discarded != nil {
// case: reorg
@@ -318,13 +332,13 @@ func (n *Node) syncLoopFn(lastBlock *common.Block) (*common.Block, time.Duration
n.nodeAPI.api.SetRollupVariables(*rollup)
n.nodeAPI.api.SetAuctionVariables(*auction)
n.nodeAPI.api.SetWDelayerVariables(*wDelayer)
// TODO: n.nodeAPI.api.UpdateNetworkInfo()
n.nodeAPI.api.UpdateNetworkInfoBlock(
stats.Eth.LastBlock, stats.Sync.LastBlock,
)
}
return nil, time.Duration(0)
} else if blockData != nil {
// case: new block
stats := n.sync.Stats()
if n.mode == ModeCoordinator {
if stats.Synced() && (blockData.Rollup.Vars != nil ||
blockData.Auction.Vars != nil ||
@@ -350,7 +364,15 @@ func (n *Node) syncLoopFn(lastBlock *common.Block) (*common.Block, time.Duration
n.nodeAPI.api.SetWDelayerVariables(*blockData.WDelayer.Vars)
}
// TODO: n.nodeAPI.api.UpdateNetworkInfo()
if stats.Synced() {
if err := n.nodeAPI.api.UpdateNetworkInfo(
stats.Eth.LastBlock, stats.Sync.LastBlock,
common.BatchNum(stats.Eth.LastBatch),
stats.Sync.Auction.CurrentSlot.SlotNum,
); err != nil {
log.Errorw("API.UpdateNetworkInfo", "err", err)
}
}
}
return &blockData.Block, time.Duration(0)
} else {
@@ -408,6 +430,38 @@ func (n *Node) StartNodeAPI() {
log.Fatalw("NodeAPI.Run", "err", err)
}
}()
n.wg.Add(1)
go func() {
for {
select {
case <-n.ctx.Done():
log.Info("API.UpdateMetrics loop done")
n.wg.Done()
return
case <-time.After(n.cfg.API.UpdateMetricsInterval.Duration):
if err := n.nodeAPI.api.UpdateMetrics(); err != nil {
log.Errorw("API.UpdateMetrics", "err", err)
}
}
}
}()
n.wg.Add(1)
go func() {
for {
select {
case <-n.ctx.Done():
log.Info("API.UpdateRecommendedFee loop done")
n.wg.Done()
return
case <-time.After(n.cfg.API.UpdateRecommendedFeeInterval.Duration):
if err := n.nodeAPI.api.UpdateRecommendedFee(); err != nil {
log.Errorw("API.UpdateRecommendedFee", "err", err)
}
}
}
}()
}
// Start the node