Redo coordinator structure, connect API to node

- API:
	- Modify the constructor so that hardcoded rollup constants don't need
	  to be passed (introduce a `Config` and use `configAPI` internally)
- Common:
	- Update rollup constants with proper *big.Int when required
	- Add BidCoordinator and Slot structs used by the HistoryDB and
	  Synchronizer.
	- Add helper methods to AuctionConstants
	- AuctionVariables: Add column `DefaultSlotSetBidSlotNum` (in the SQL
	  table: `default_slot_set_bid_slot_num`), which indicates at which
	  slotNum does the `DefaultSlotSetBid` specified starts applying.
- Config:
	- Move coordinator exclusive configuration from the node config to the
	  coordinator config
- Coordinator:
	- Reorganize the code towards having the goroutines started and stopped
	  from the coordinator itself instead of the node.
	- Remove all stop and stopped channels, and use context.Context and
	  sync.WaitGroup instead.
	- Remove BatchInfo setters and assing variables directly
	- In ServerProof and ServerProofPool use context instead stop channel.
	- Use message passing to notify the coordinator about sync updates and
	  reorgs
	- Introduce the Pipeline, which can be started and stopped by the
	  Coordinator
	- Introduce the TxManager, which manages ethereum transactions (the
	  TxManager is also in charge of making the forge call to the rollup
	  smart contract).  The TxManager keeps ethereum transactions and:
	  	1. Waits for the transaction to be accepted
		2. Waits for the transaction to be confirmed for N blocks
	- In forge logic, first prepare a batch and then wait for an available
	  server proof to have all work ready once the proof server is ready.
	- Remove the `isForgeSequence` method which was querying the smart
	  contract, and instead use notifications sent by the Synchronizer to
	  figure out if it's forging time.
	- Update test (which is a minimal test to manually see if the
	  coordinator starts)
- HistoryDB:
	- Add method to get the number of batches in a slot (used to detect when
	  a slot has passed the bid winner forging deadline)
	- Add method to get the best bid and associated coordinator of a slot
	  (used to detect the forgerAddress that can forge the slot)
- General:
	- Rename some instances of `currentBlock` to `lastBlock` to be more
	  clear.
- Node:
	- Connect the API to the node and call the methods to update cached
	  state when the sync advances blocks.
	- Call methods to update Coordinator state when the sync advances blocks
	  and finds reorgs.
- Synchronizer:
	- Add Auction field in the Stats, which contain the current slot with
	  info about highest bidder and other related info required to know who
	  can forge in the current block.
	- Better organization of cached state:
		- On Sync, update the internal cached state
		- On Init or Reorg, load the state from HistoryDB into the
		  internal cached state.
This commit is contained in:
Eduard S
2020-11-13 18:11:58 +01:00
parent bf88eb60b8
commit 3b99953007
31 changed files with 1195 additions and 716 deletions

View File

@@ -54,14 +54,27 @@ type Stats struct {
Updated time.Time
LastBlock int64
LastBatch int64
Auction struct {
CurrentSlot common.Slot
}
}
}
// Synced returns true if the Synchronizer is up to date with the last ethereum block
func (s *Stats) Synced() bool {
return s.Eth.LastBlock == s.Sync.LastBlock
}
// TODO(Edu): Consider removing all the mutexes from StatsHolder, make
// Synchronizer.Stats not thread-safe, don't pass the synchronizer to the
// debugAPI, and have a copy of the Stats in the DebugAPI that the node passes
// when the Sync updates.
// StatsHolder stores stats and that allows reading and writing them
// concurrently
type StatsHolder struct {
stats Stats
rw sync.RWMutex
Stats
rw sync.RWMutex
}
// NewStatsHolder creates a new StatsHolder
@@ -69,18 +82,25 @@ func NewStatsHolder(firstBlock int64, refreshPeriod time.Duration) *StatsHolder
stats := Stats{}
stats.Eth.RefreshPeriod = refreshPeriod
stats.Eth.FirstBlock = firstBlock
return &StatsHolder{stats: stats}
return &StatsHolder{Stats: stats}
}
// UpdateCurrentSlot updates the auction stats
func (s *StatsHolder) UpdateCurrentSlot(slot common.Slot) {
s.rw.Lock()
s.Sync.Auction.CurrentSlot = slot
s.rw.Unlock()
}
// UpdateSync updates the synchronizer stats
func (s *StatsHolder) UpdateSync(lastBlock int64, lastBatch *common.BatchNum) {
now := time.Now()
s.rw.Lock()
s.stats.Sync.LastBlock = lastBlock
s.Sync.LastBlock = lastBlock
if lastBatch != nil {
s.stats.Sync.LastBatch = int64(*lastBatch)
s.Sync.LastBatch = int64(*lastBatch)
}
s.stats.Sync.Updated = now
s.Sync.Updated = now
s.rw.Unlock()
}
@@ -88,13 +108,13 @@ func (s *StatsHolder) UpdateSync(lastBlock int64, lastBatch *common.BatchNum) {
func (s *StatsHolder) UpdateEth(ethClient eth.ClientInterface) error {
now := time.Now()
s.rw.RLock()
elapsed := now.Sub(s.stats.Eth.Updated)
elapsed := now.Sub(s.Eth.Updated)
s.rw.RUnlock()
if elapsed < s.stats.Eth.RefreshPeriod {
if elapsed < s.Eth.RefreshPeriod {
return nil
}
lastBlock, err := ethClient.EthCurrentBlock()
lastBlock, err := ethClient.EthLastBlock()
if err != nil {
return err
}
@@ -103,9 +123,9 @@ func (s *StatsHolder) UpdateEth(ethClient eth.ClientInterface) error {
return err
}
s.rw.Lock()
s.stats.Eth.Updated = now
s.stats.Eth.LastBlock = lastBlock
s.stats.Eth.LastBatch = lastBatch
s.Eth.Updated = now
s.Eth.LastBlock = lastBlock
s.Eth.LastBatch = lastBatch
s.rw.Unlock()
return nil
}
@@ -113,19 +133,27 @@ func (s *StatsHolder) UpdateEth(ethClient eth.ClientInterface) error {
// CopyStats returns a copy of the inner Stats
func (s *StatsHolder) CopyStats() *Stats {
s.rw.RLock()
sCopy := s.stats
sCopy := s.Stats
if s.Sync.Auction.CurrentSlot.BidValue != nil {
sCopy.Sync.Auction.CurrentSlot.BidValue =
common.CopyBigInt(s.Sync.Auction.CurrentSlot.BidValue)
}
s.rw.RUnlock()
return &sCopy
}
func (s *StatsHolder) blocksPerc() float64 {
return float64(s.stats.Sync.LastBlock-s.stats.Eth.FirstBlock) * 100.0 /
float64(s.stats.Eth.LastBlock-s.stats.Eth.FirstBlock)
syncLastBlock := s.Sync.LastBlock
if s.Sync.LastBlock == 0 {
syncLastBlock = s.Eth.FirstBlock - 1
}
return float64(syncLastBlock-(s.Eth.FirstBlock-1)) * 100.0 /
float64(s.Eth.LastBlock-(s.Eth.FirstBlock-1))
}
func (s *StatsHolder) batchesPerc(batchNum int64) float64 {
return float64(batchNum) * 100.0 /
float64(s.stats.Eth.LastBatch)
float64(s.Eth.LastBatch)
}
// ConfigStartBlockNum sets the first block used to start tracking the smart
@@ -143,6 +171,13 @@ type SCVariables struct {
WDelayer common.WDelayerVariables `validate:"required"`
}
// SCConsts joins all the smart contract constants in a single struct
type SCConsts struct {
Rollup common.RollupConstants
Auction common.AuctionConstants
WDelayer common.WDelayerConstants
}
// Config is the Synchronizer configuration
type Config struct {
StartBlockNum ConfigStartBlockNum
@@ -152,16 +187,17 @@ type Config struct {
// Synchronizer implements the Synchronizer type
type Synchronizer struct {
ethClient eth.ClientInterface
auctionConstants common.AuctionConstants
rollupConstants common.RollupConstants
wDelayerConstants common.WDelayerConstants
historyDB *historydb.HistoryDB
stateDB *statedb.StateDB
cfg Config
startBlockNum int64
vars SCVariables
stats *StatsHolder
ethClient eth.ClientInterface
// auctionConstants common.AuctionConstants
// rollupConstants common.RollupConstants
// wDelayerConstants common.WDelayerConstants
consts SCConsts
historyDB *historydb.HistoryDB
stateDB *statedb.StateDB
cfg Config
startBlockNum int64
vars SCVariables
stats *StatsHolder
// firstSavedBlock *common.Block
// mux sync.Mutex
}
@@ -196,56 +232,113 @@ func NewSynchronizer(ethClient eth.ClientInterface, historyDB *historydb.History
}
stats := NewStatsHolder(startBlockNum, cfg.StatsRefreshPeriod)
s := &Synchronizer{
ethClient: ethClient,
auctionConstants: *auctionConstants,
rollupConstants: *rollupConstants,
wDelayerConstants: *wDelayerConstants,
historyDB: historyDB,
stateDB: stateDB,
cfg: cfg,
startBlockNum: startBlockNum,
stats: stats,
ethClient: ethClient,
consts: SCConsts{
Rollup: *rollupConstants,
Auction: *auctionConstants,
WDelayer: *wDelayerConstants,
},
historyDB: historyDB,
stateDB: stateDB,
cfg: cfg,
startBlockNum: startBlockNum,
stats: stats,
}
return s, s.init()
}
// Stats returns a copy of the Synchronizer Stats
// Stats returns a copy of the Synchronizer Stats. It is safe to call Stats()
// during a Sync call
func (s *Synchronizer) Stats() *Stats {
return s.stats.CopyStats()
}
// AuctionConstants returns the AuctionConstants read from the smart contract
func (s *Synchronizer) AuctionConstants() *common.AuctionConstants {
return &s.auctionConstants
return &s.consts.Auction
}
// RollupConstants returns the RollupConstants read from the smart contract
func (s *Synchronizer) RollupConstants() *common.RollupConstants {
return &s.rollupConstants
return &s.consts.Rollup
}
// WDelayerConstants returns the WDelayerConstants read from the smart contract
func (s *Synchronizer) WDelayerConstants() *common.WDelayerConstants {
return &s.wDelayerConstants
return &s.consts.WDelayer
}
// SCVars returns a copy of the Smart Contract Variables
func (s *Synchronizer) SCVars() (*common.RollupVariables, *common.AuctionVariables, *common.WDelayerVariables) {
return s.vars.Rollup.Copy(), s.vars.Auction.Copy(), s.vars.WDelayer.Copy()
}
func (s *Synchronizer) updateCurrentSlotIfSync(batchesLen int) error {
slot := common.Slot{
SlotNum: s.stats.Sync.Auction.CurrentSlot.SlotNum,
BatchesLen: int(s.stats.Sync.Auction.CurrentSlot.BatchesLen),
}
// We want the next block because the current one is already mined
blockNum := s.stats.Sync.LastBlock + 1
slotNum := s.consts.Auction.SlotNum(blockNum)
if batchesLen == -1 {
dbBatchesLen, err := s.historyDB.GetBatchesLen(slotNum)
// fmt.Printf("DBG -1 from: %v, to: %v, len: %v\n", from, to, dbBatchesLen)
if err != nil {
log.Errorw("historyDB.GetBatchesLen", "err", err)
return err
}
slot.BatchesLen = dbBatchesLen
} else if slotNum > slot.SlotNum {
// fmt.Printf("DBG batchesLen Reset len: %v (%v %v)\n", batchesLen, slotNum, slot.SlotNum)
slot.BatchesLen = batchesLen
} else {
// fmt.Printf("DBG batchesLen add len: %v: %v\n", batchesLen, slot.BatchesLen+batchesLen)
slot.BatchesLen += batchesLen
}
slot.SlotNum = slotNum
slot.StartBlock, slot.EndBlock = s.consts.Auction.SlotBlocks(slot.SlotNum)
// If Synced, update the current coordinator
if s.stats.Synced() {
bidCoord, err := s.historyDB.GetBestBidCoordinator(slot.SlotNum)
if err != nil && err != sql.ErrNoRows {
return err
}
if err == sql.ErrNoRows {
slot.BootCoord = true
slot.Forger = s.vars.Auction.BootCoordinator
slot.URL = "???"
} else if err == nil {
slot.BidValue = bidCoord.BidValue
defaultSlotBid := bidCoord.DefaultSlotSetBid[slot.SlotNum%6]
if slot.BidValue.Cmp(defaultSlotBid) >= 0 {
slot.Bidder = bidCoord.Bidder
slot.Forger = bidCoord.Forger
slot.URL = bidCoord.URL
} else {
slot.BootCoord = true
slot.Forger = s.vars.Auction.BootCoordinator
slot.URL = "???"
}
}
// TODO: Remove this SANITY CHECK once this code is tested enough
// BEGIN SANITY CHECK
canForge, err := s.ethClient.AuctionCanForge(slot.Forger, blockNum)
if err != nil {
return err
}
if !canForge {
return fmt.Errorf("Synchronized value of forger address for closed slot "+
"differs from smart contract: %+v", slot)
}
// END SANITY CHECK
}
s.stats.UpdateCurrentSlot(slot)
return nil
}
func (s *Synchronizer) init() error {
rollup, auction, wDelayer, err := s.historyDB.GetSCVars()
// If SCVars are not in the HistoryDB, this is probably the first run
// of the Synchronizer: store the initial vars taken from config
if err == sql.ErrNoRows {
rollup = &s.cfg.InitialVariables.Rollup
auction = &s.cfg.InitialVariables.Auction
wDelayer = &s.cfg.InitialVariables.WDelayer
log.Info("Setting initial SCVars in HistoryDB")
if err = s.historyDB.SetInitialSCVars(rollup, auction, wDelayer); err != nil {
return err
}
}
s.vars.Rollup = *rollup
s.vars.Auction = *auction
s.vars.WDelayer = *wDelayer
// Update stats parameters so that they have valid values before the
// first Sync call
if err := s.stats.UpdateEth(s.ethClient); err != nil {
@@ -265,26 +358,20 @@ func (s *Synchronizer) init() error {
} else {
lastBlockNum = lastSavedBlock.EthBlockNum
}
lastBatchNum, err := s.historyDB.GetLastBatchNum()
if err != nil && err != sql.ErrNoRows {
if err := s.resetState(lastBlockNum); err != nil {
return err
}
if err == sql.ErrNoRows {
lastBatchNum = 0
}
s.stats.UpdateSync(lastBlockNum, &lastBatchNum)
log.Infow("Sync init block",
"syncLastBlock", s.stats.stats.Sync.LastBlock,
"syncLastBlock", s.stats.Sync.LastBlock,
"syncBlocksPerc", s.stats.blocksPerc(),
"ethFirstBlock", s.stats.stats.Eth.FirstBlock,
"ethLastBlock", s.stats.stats.Eth.LastBlock,
"ethFirstBlock", s.stats.Eth.FirstBlock,
"ethLastBlock", s.stats.Eth.LastBlock,
)
log.Infow("Sync init batch",
"syncLastBatch", s.stats.stats.Sync.LastBatch,
"syncBatchesPerc", s.stats.batchesPerc(s.stats.stats.Sync.LastBatch),
"ethLastBatch", s.stats.stats.Eth.LastBatch,
"syncLastBatch", s.stats.Sync.LastBatch,
"syncBatchesPerc", s.stats.batchesPerc(s.stats.Sync.LastBatch),
"ethLastBatch", s.stats.Eth.LastBatch,
)
return nil
}
@@ -329,7 +416,7 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block)
log.Debugw("Syncing...",
"block", nextBlockNum,
"ethLastBlock", s.stats.stats.Eth.LastBlock,
"ethLastBlock", s.stats.Eth.LastBlock,
)
// Check that the obtianed ethBlock.ParentHash == prevEthBlock.Hash; if not, reorg!
@@ -404,16 +491,19 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block)
s.stats.UpdateSync(ethBlock.EthBlockNum,
&rollupData.Batches[batchesLen-1].Batch.BatchNum)
}
if err := s.updateCurrentSlotIfSync(len(rollupData.Batches)); err != nil {
return nil, nil, err
}
log.Debugw("Synced block",
"syncLastBlock", s.stats.stats.Sync.LastBlock,
"syncLastBlock", s.stats.Sync.LastBlock,
"syncBlocksPerc", s.stats.blocksPerc(),
"ethLastBlock", s.stats.stats.Eth.LastBlock,
"ethLastBlock", s.stats.Eth.LastBlock,
)
for _, batchData := range rollupData.Batches {
log.Debugw("Synced batch",
"syncLastBatch", batchData.Batch.BatchNum,
"syncBatchesPerc", s.stats.batchesPerc(int64(batchData.Batch.BatchNum)),
"ethLastBatch", s.stats.stats.Eth.LastBatch,
"ethLastBatch", s.stats.Eth.LastBatch,
)
}
@@ -431,11 +521,13 @@ func (s *Synchronizer) reorg(uncleBlock *common.Block) (int64, error) {
for blockNum >= s.startBlockNum {
ethBlock, err := s.ethClient.EthBlockByNumber(context.Background(), blockNum)
if err != nil {
log.Errorw("ethClient.EthBlockByNumber", "err", err)
return 0, err
}
block, err := s.historyDB.GetBlock(blockNum)
if err != nil {
log.Errorw("historyDB.GetBlock", "err", err)
return 0, err
}
if block.Hash == ethBlock.Hash {
@@ -453,19 +545,51 @@ func (s *Synchronizer) reorg(uncleBlock *common.Block) (int64, error) {
return 0, err
}
if err := s.resetState(blockNum); err != nil {
return 0, err
}
return blockNum, nil
}
func (s *Synchronizer) resetState(blockNum int64) error {
rollup, auction, wDelayer, err := s.historyDB.GetSCVars()
// If SCVars are not in the HistoryDB, this is probably the first run
// of the Synchronizer: store the initial vars taken from config
if err == sql.ErrNoRows {
rollup = &s.cfg.InitialVariables.Rollup
auction = &s.cfg.InitialVariables.Auction
wDelayer = &s.cfg.InitialVariables.WDelayer
log.Info("Setting initial SCVars in HistoryDB")
if err = s.historyDB.SetInitialSCVars(rollup, auction, wDelayer); err != nil {
log.Errorw("historyDB.SetInitialSCVars", "err", err)
return err
}
}
s.vars.Rollup = *rollup
s.vars.Auction = *auction
s.vars.WDelayer = *wDelayer
batchNum, err := s.historyDB.GetLastBatchNum()
if err != nil && err != sql.ErrNoRows {
return 0, err
log.Errorw("historyDB.GetLastBatchNum", "err", err)
return err
}
if err == sql.ErrNoRows {
batchNum = 0
}
err = s.stateDB.Reset(batchNum)
if err != nil {
return 0, err
log.Errorw("stateDB.Reset", "err", err)
return err
}
return blockNum, nil
s.stats.UpdateSync(blockNum, &batchNum)
if err := s.updateCurrentSlotIfSync(-1); err != nil {
return err
}
return nil
}
// TODO: Figure out who will use the Status output, and only return what's strictly need
@@ -500,7 +624,7 @@ func (s *Synchronizer) Status() (*common.SyncStatus, error) {
status.CurrentBatch = lastSavedBatch
// Get latest blockNum in blockchain
latestBlockNum, err := s.ethClient.EthCurrentBlock()
latestBlockNum, err := s.ethClient.EthLastBlock()
if err != nil {
return nil, err
}
@@ -664,9 +788,9 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e
batchData.CreatedAccounts = processTxsOut.CreatedAccounts
slotNum := int64(0)
if ethBlock.EthBlockNum >= s.auctionConstants.GenesisBlockNum {
slotNum = (ethBlock.EthBlockNum - s.auctionConstants.GenesisBlockNum) /
int64(s.auctionConstants.BlocksPerSlot)
if ethBlock.EthBlockNum >= s.consts.Auction.GenesisBlockNum {
slotNum = (ethBlock.EthBlockNum - s.consts.Auction.GenesisBlockNum) /
int64(s.consts.Auction.BlocksPerSlot)
}
// Get Batch information
@@ -831,6 +955,7 @@ func (s *Synchronizer) auctionSync(ethBlock *common.Block) (*common.AuctionData,
"auctionEvents.NewDefaultSlotSetBid: %v", evt.SlotSet)
}
s.vars.Auction.DefaultSlotSetBid[evt.SlotSet] = evt.NewInitialMinBid
s.vars.Auction.DefaultSlotSetBidSlotNum = s.consts.Auction.SlotNum(blockNum) + int64(s.vars.Auction.ClosedAuctionSlots) + 1
varsUpdate = true
}

View File

@@ -535,7 +535,7 @@ func TestSync(t *testing.T) {
for i := 0; i < 4; i++ {
client.CtlRollback()
}
blockNum := client.CtlCurrentBlock()
blockNum := client.CtlLastBlock()
require.Equal(t, int64(1), blockNum)
// Generate extra required data