Update coordinator to work better under real net

- cli / node
    - Update the SIGINT handler so that after 3 SIGINTs, the process
      terminates unconditionally (see the handleInterrupts sketch after this
      list)
- coordinator
    - Store stats without pointer
    - In all functions that send a variable via a channel, check for context
      cancellation to avoid a deadlock when the node is stopped and nothing
      reads from the unbuffered channel anymore (see the sendOrDone sketch
      after this list)
    - Abstract `canForge` so that it can be used outside of the `Coordinator`
    - In `canForge`, check the blockNumber against both the current and the
      next slot (see the canForge sketch after this list)
    - Update tests due to smart contract changes in slot handling, and minimum
      bid defaults
    - TxManager
        - Add consts, vars and stats to allow evaluating `canForge`
        - Add `canForge` method (not used yet)
        - Store batch and nonces status (last success and last pending)
        - Track nonces internally instead of relying on the ethereum node (this
          is required to work with ganache when there are pending txs)
        - Handle the (common) case of the receipt not being found right
          after the tx is sent (see the receiptOrPending sketch after this
          list)
        - Don't start the main loop until we get an initial message with the
          stats and vars (so that in the loop the stats and vars are set to
          synchronizer values)
        - When a tx fails, check and discard all the failed transactions
          before sending the message to stop the pipeline.  This avoids
          sending consecutive stop-the-pipeline messages when multiple txs
          are detected to have failed consecutively.  Also, txs of the same
          pipeline that come after a discarded tx are discarded, and their
          nonces reused.
        - Robust handling of nonces (see the adjustAfterSendError sketch
          after this list):
            - If geth returns "nonce too low", increase it
            - If geth returns "nonce too high", decrease it
            - If geth returns "underpriced", increase the gas price
            - If geth returns "replacement underpriced", increase the gas
              price
        - Add support for resending transactions after a timeout (see the
          resendIfStale sketch after this list)
        - Store `BatchInfos` in a queue
    - Pipeline
        - When an error is found, stop forging batches and send a message to
          the coordinator to stop the pipeline with the number of the failed
          batch, so that on a restart, non-failed batches are not repeated.
        - When doing a reset of the stateDB, if possible reset from the local
          checkpoint instead of resetting from the synchronizer.  This allows
          resetting from a batch that is valid but not yet sent / synced.
    - Every time a pipeline is started, assign it a number from a counter.  This
      allows the TxManager to ignore batches from stopped pipelines, via a
      message sent by the coordinator.
    - Avoid forging when we haven't reached the rollup genesis block number.
    - Add config parameter `StartSlotBlocksDelay`: the number of blocks of
      delay to wait before starting the pipeline when we reach a slot in
      which we can forge (the new config parameters are gathered in the
      CoordinatorConfig sketch after this list).
    - When detecting a reorg, only reset the pipeline if the batch from which
      the pipeline started changed and wasn't sent by us.
    - Add config parameter `ScheduleBatchBlocksAheadCheck`: the number of
      blocks ahead in which the forger address is checked to be allowed to
      forge (apart from checking the next block), used to decide when to
      stop scheduling new batches (by stopping the pipeline).  For example,
      if we are at block 10 and ScheduleBatchBlocksAheadCheck is 5, even
      though we can forge at block 11, the pipeline will be stopped if we
      can't forge at block 15.  This value should be the expected number of
      blocks it takes between scheduling a batch and having it mined.
    - Add config parameter `SendBatchBlocksMarginCheck`: the number of
      margin blocks ahead in which the coordinator is also checked to be
      allowed to forge, apart from the next block; used to decide when to
      stop sending batches to the smart contract.  For example, if we are at
      block 10 and SendBatchBlocksMarginCheck is 5, even though we can forge
      at block 11, the batch will be discarded if we can't forge at block
      15.
    - Add config parameter `TxResendTimeout`: the timeout after which a
      non-mined ethereum transaction will be resent (reusing the nonce) with
      a newly calculated gas price.
    - Add config parameter `MaxGasPrice`: the maximum gas price allowed for
      ethereum transactions.
    - Add config parameter `NoReuseNonce`: disables reusing nonces of
      pending transactions for new replacement transactions.  This is useful
      for testing with Ganache.
    - Extend BatchInfo with more useful information for debugging

- eth / ethereum client
    - Add the necessary methods to create the auth object for transactions
      manually, so that we can set the nonce, gas price, gas limit, etc.
      manually
    - Update `RollupForgeBatch` to take an auth object as input (so that the
      coordinator can set parameters manually)
- synchronizer
    - In stats, add `NextSlot`
    - In stats, store full last batch instead of just last batch number
    - Instead of calculating a nextSlot from scratch every time, update the
      current struct (only updating the forger info if we are Synced)
    - After every processed batch, check that the calculated StateDB MTRoot
      matches the StateRoot found in the forgeBatch event.
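
The sketches referenced above follow.  They are illustrative only: helper
names, signatures and packages are assumptions made for this write-up, not
the node's actual code.

handleInterrupts: a minimal sketch of the cli/node SIGINT policy, where the
first SIGINT requests a graceful stop and the third terminates the process
unconditionally.

    package main

    import (
        "fmt"
        "os"
        "os/signal"
        "syscall"
    )

    // handleInterrupts asks the node to stop cleanly on the first SIGINT
    // and terminates unconditionally once 3 SIGINTs have been received.
    func handleInterrupts(stopNode func()) {
        ch := make(chan os.Signal, 3)
        signal.Notify(ch, syscall.SIGINT)
        go func() {
            for n := 1; ; n++ {
                <-ch
                if n == 1 {
                    go stopNode() // graceful shutdown path
                }
                if n >= 3 {
                    fmt.Println("forced exit after 3 SIGINTs")
                    os.Exit(1)
                }
            }
        }()
    }

    func main() {
        done := make(chan struct{})
        handleInterrupts(func() { close(done) })
        <-done // block until the graceful stop completes
    }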
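
sendOrDone: the context-done pattern used for channel sends, assuming an
unbuffered channel whose reader goes away when the node stops.

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // sendOrDone pairs a channel send with a ctx.Done() check so a sender
    // blocked on an unbuffered channel with no reader returns when the node
    // is stopped instead of deadlocking.
    func sendOrDone(ctx context.Context, ch chan<- int, v int) error {
        select {
        case ch <- v:
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        ch := make(chan int) // unbuffered and never read: a send blocks forever
        go func() { time.Sleep(10 * time.Millisecond); cancel() }()
        fmt.Println(sendOrDone(ctx, ch, 42)) // prints "context canceled"
    }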
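
canForge: a simplified sketch of the slot check.  The Slot fields mirror the
ones used by the synchronizer stats in this commit (StartBlock, EndBlock,
Forger); the rule that anyone may forge once the slot deadline passes
without a forged batch is omitted here.

    package main

    import "fmt"

    // Addr stands in for ethCommon.Address in this sketch.
    type Addr string

    type Slot struct {
        StartBlock, EndBlock int64
        Forger               Addr
    }

    // canForge reports whether forger may forge at blockNum, checking the
    // block number against both the current and the next slot.
    func canForge(current, next Slot, forger Addr, blockNum int64) bool {
        for _, slot := range []Slot{current, next} {
            if blockNum >= slot.StartBlock && blockNum <= slot.EndBlock &&
                slot.Forger == forger {
                return true
            }
        }
        return false
    }

    func main() {
        cur := Slot{StartBlock: 100, EndBlock: 139, Forger: "0xCoord"}
        next := Slot{StartBlock: 140, EndBlock: 179, Forger: "0xOther"}
        fmt.Println(canForge(cur, next, "0xCoord", 110)) // true: our slot
        // A ScheduleBatchBlocksAheadCheck-style look-ahead: we lose the next
        // slot, so the pipeline should be stopped before block 144.
        fmt.Println(canForge(cur, next, "0xCoord", 144)) // false
    }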
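
receiptOrPending: handling the receipt-not-found case with go-ethereum's
ethclient.  Treating ethereum.NotFound as "still pending" is the point; the
helper itself is hypothetical.

    package txmgr

    import (
        "context"
        "errors"

        "github.com/ethereum/go-ethereum"
        ethCommon "github.com/ethereum/go-ethereum/common"
        "github.com/ethereum/go-ethereum/core/types"
        "github.com/ethereum/go-ethereum/ethclient"
    )

    // receiptOrPending returns (nil, nil) while the receipt is not yet
    // available, so the caller retries later instead of failing the batch.
    func receiptOrPending(ctx context.Context, c *ethclient.Client,
        hash ethCommon.Hash) (*types.Receipt, error) {
        receipt, err := c.TransactionReceipt(ctx, hash)
        if errors.Is(err, ethereum.NotFound) {
            return nil, nil // tx sent but not yet mined (or node lagging)
        }
        return receipt, err
    }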
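
adjustAfterSendError: the nonce and gas-price rules.  Since the error text
crosses the RPC boundary as a string, substrings of geth's messages are
matched; both the matching and the 10% bump are assumptions.

    package txmgr

    import (
        "math/big"
        "strings"
    )

    // adjustAfterSendError mutates the next attempt's nonce or gas price
    // according to the send error and reports whether a retry makes sense.
    func adjustAfterSendError(err error, nonce *uint64, gasPrice *big.Int) bool {
        msg := err.Error()
        switch {
        case strings.Contains(msg, "nonce too low"):
            *nonce++ // the nonce was already used: move forward
            return true
        case strings.Contains(msg, "nonce too high"):
            *nonce-- // we ran ahead of the network: move back
            return true
        case strings.Contains(msg, "underpriced"):
            // covers "transaction underpriced" and "replacement transaction
            // underpriced": bump the gas price by 10%
            gasPrice.Add(gasPrice, new(big.Int).Div(gasPrice, big.NewInt(10)))
            return true
        }
        return false
    }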
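
resendIfStale: TxResendTimeout combined with MaxGasPrice.  The nonce is
reused so the new tx replaces the pending one; forge stands in for the
actual batch-sending call.

    package txmgr

    import (
        "context"
        "math/big"
        "time"

        "github.com/ethereum/go-ethereum/accounts/abi/bind"
        "github.com/ethereum/go-ethereum/ethclient"
    )

    // resendIfStale re-issues forge(auth) when the previous attempt is
    // older than timeout; auth.Nonce is left untouched so the nonce is
    // reused and the new tx replaces the pending one.
    func resendIfStale(ctx context.Context, c *ethclient.Client,
        auth *bind.TransactOpts, sentAt time.Time, timeout time.Duration,
        maxGasPrice *big.Int, forge func(*bind.TransactOpts) error) error {
        if time.Since(sentAt) < timeout {
            return nil // still within the resend window
        }
        gasPrice, err := c.SuggestGasPrice(ctx) // newly calculated gas price
        if err != nil {
            return err
        }
        if gasPrice.Cmp(maxGasPrice) > 0 {
            gasPrice = maxGasPrice // respect the configured ceiling
        }
        auth.GasPrice = gasPrice
        return forge(auth)
    }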
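
CoordinatorConfig: the new configuration parameters gathered in one struct.
The shape is a sketch (the real node reads them from its config file); the
field meanings are the ones described above.

    package config

    import (
        "math/big"
        "time"
    )

    type CoordinatorConfig struct {
        // Blocks to wait after reaching a forgeable slot before starting
        // the pipeline.
        StartSlotBlocksDelay int64
        // Blocks ahead at which the forger address is re-checked to decide
        // when to stop scheduling new batches (stopping the pipeline).
        ScheduleBatchBlocksAheadCheck int64
        // Margin blocks ahead at which the coordinator is re-checked to
        // decide when to stop sending batches to the smart contract.
        SendBatchBlocksMarginCheck int64
        // Timeout after which a non-mined ethereum tx is resent (reusing
        // the nonce) with a newly calculated gas price.
        TxResendTimeout time.Duration
        // Maximum gas price allowed for ethereum transactions.
        MaxGasPrice *big.Int
        // Disable reusing nonces of pending txs for replacement txs
        // (useful for testing with Ganache).
        NoReuseNonce bool
    }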
Author: Eduard S
Date:   2021-02-16 14:22:51 +01:00
parent 26fbeb5c68
commit f0e79f3d55
22 changed files with 935 additions and 285 deletions

@@ -25,12 +25,12 @@ type Stats struct {
         Updated       time.Time
         FirstBlockNum int64
         LastBlock     common.Block
-        LastBatch     int64
+        LastBatchNum  int64
     }
     Sync struct {
         Updated   time.Time
         LastBlock common.Block
-        LastBatch int64
+        LastBatch common.Batch
         // LastL1BatchBlock is the last ethereum block in which an
         // l1Batch was forged
         LastL1BatchBlock int64
@@ -77,13 +77,13 @@ func (s *StatsHolder) UpdateCurrentNextSlot(current *common.Slot, next *common.S
 }

 // UpdateSync updates the synchronizer stats
-func (s *StatsHolder) UpdateSync(lastBlock *common.Block, lastBatch *common.BatchNum,
+func (s *StatsHolder) UpdateSync(lastBlock *common.Block, lastBatch *common.Batch,
     lastL1BatchBlock *int64, lastForgeL1TxsNum *int64) {
     now := time.Now()
     s.rw.Lock()
     s.Sync.LastBlock = *lastBlock
     if lastBatch != nil {
-        s.Sync.LastBatch = int64(*lastBatch)
+        s.Sync.LastBatch = *lastBatch
     }
     if lastL1BatchBlock != nil {
         s.Sync.LastL1BatchBlock = *lastL1BatchBlock
@@ -105,16 +105,16 @@ func (s *StatsHolder) UpdateEth(ethClient eth.ClientInterface) error {
     lastBlock, err := ethClient.EthBlockByNumber(context.TODO(), -1)
     if err != nil {
-        return tracerr.Wrap(err)
+        return tracerr.Wrap(fmt.Errorf("EthBlockByNumber: %w", err))
     }
-    lastBatch, err := ethClient.RollupLastForgedBatch()
+    lastBatchNum, err := ethClient.RollupLastForgedBatch()
     if err != nil {
-        return tracerr.Wrap(err)
+        return tracerr.Wrap(fmt.Errorf("RollupLastForgedBatch: %w", err))
     }

     s.rw.Lock()
     s.Eth.Updated = now
     s.Eth.LastBlock = *lastBlock
-    s.Eth.LastBatch = lastBatch
+    s.Eth.LastBatchNum = lastBatchNum
     s.rw.Unlock()
     return nil
 }
@@ -139,6 +139,10 @@ func (s *StatsHolder) CopyStats() *Stats {
         sCopy.Sync.Auction.NextSlot.DefaultSlotBid =
             common.CopyBigInt(s.Sync.Auction.NextSlot.DefaultSlotBid)
     }
+    if s.Sync.LastBatch.StateRoot != nil {
+        sCopy.Sync.LastBatch.StateRoot =
+            common.CopyBigInt(s.Sync.LastBatch.StateRoot)
+    }
     s.rw.RUnlock()
     return &sCopy
 }
@@ -152,9 +156,9 @@ func (s *StatsHolder) blocksPerc() float64 {
         float64(s.Eth.LastBlock.Num-(s.Eth.FirstBlockNum-1))
 }

-func (s *StatsHolder) batchesPerc(batchNum int64) float64 {
+func (s *StatsHolder) batchesPerc(batchNum common.BatchNum) float64 {
     return float64(batchNum) * 100.0 /
-        float64(s.Eth.LastBatch)
+        float64(s.Eth.LastBatchNum)
 }

 // StartBlockNums sets the first block used to start tracking the smart
@@ -329,23 +333,25 @@ func (s *Synchronizer) setSlotCoordinator(slot *common.Slot) error {
     return nil
 }

-// firstBatchBlockNum is the blockNum of first batch in that block, if any
-func (s *Synchronizer) getCurrentSlot(reset bool, firstBatchBlockNum *int64) (*common.Slot, error) {
-    slot := common.Slot{
-        SlotNum:          s.stats.Sync.Auction.CurrentSlot.SlotNum,
-        ForgerCommitment: s.stats.Sync.Auction.CurrentSlot.ForgerCommitment,
-    }
+// updateCurrentSlot updates the slot with information of the current slot.
+// The information about which coordinator is allowed to forge is only updated
+// when we are Synced.
+// hasBatch is true when the last synced block contained at least one batch.
+func (s *Synchronizer) updateCurrentSlot(slot *common.Slot, reset bool, hasBatch bool) error {
     // We want the next block because the current one is already mined
     blockNum := s.stats.Sync.LastBlock.Num + 1
     slotNum := s.consts.Auction.SlotNum(blockNum)
+    firstBatchBlockNum := s.stats.Sync.LastBlock.Num
     if reset {
         // Using this query only to know if there
         dbFirstBatchBlockNum, err := s.historyDB.GetFirstBatchBlockNumBySlot(slotNum)
         if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows {
-            return nil, tracerr.Wrap(fmt.Errorf("historyDB.GetFirstBatchBySlot: %w", err))
+            return tracerr.Wrap(fmt.Errorf("historyDB.GetFirstBatchBySlot: %w", err))
         } else if tracerr.Unwrap(err) == sql.ErrNoRows {
-            firstBatchBlockNum = nil
+            hasBatch = false
         } else {
-            firstBatchBlockNum = &dbFirstBatchBlockNum
+            hasBatch = true
+            firstBatchBlockNum = dbFirstBatchBlockNum
         }
         slot.ForgerCommitment = false
     } else if slotNum > slot.SlotNum {
@@ -356,11 +362,11 @@ func (s *Synchronizer) getCurrentSlot(reset bool, firstBatchBlockNum *int64) (*c
     slot.StartBlock, slot.EndBlock = s.consts.Auction.SlotBlocks(slot.SlotNum)
     // If Synced, update the current coordinator
     if s.stats.Synced() && blockNum >= s.consts.Auction.GenesisBlockNum {
-        if err := s.setSlotCoordinator(&slot); err != nil {
-            return nil, tracerr.Wrap(err)
+        if err := s.setSlotCoordinator(slot); err != nil {
+            return tracerr.Wrap(err)
         }
-        if firstBatchBlockNum != nil &&
-            s.consts.Auction.RelativeBlock(*firstBatchBlockNum) <
+        if hasBatch &&
+            s.consts.Auction.RelativeBlock(firstBatchBlockNum) <
             int64(s.vars.Auction.SlotDeadline) {
             slot.ForgerCommitment = true
         }
@@ -369,57 +375,61 @@ func (s *Synchronizer) getCurrentSlot(reset bool, firstBatchBlockNum *int64) (*c
         // BEGIN SANITY CHECK
         canForge, err := s.ethClient.AuctionCanForge(slot.Forger, blockNum)
         if err != nil {
-            return nil, tracerr.Wrap(err)
+            return tracerr.Wrap(fmt.Errorf("AuctionCanForge: %w", err))
         }
         if !canForge {
-            return nil, tracerr.Wrap(fmt.Errorf("Synchronized value of forger address for closed slot "+
+            return tracerr.Wrap(fmt.Errorf("Synchronized value of forger address for closed slot "+
                 "differs from smart contract: %+v", slot))
         }
         // END SANITY CHECK
     }
-    return &slot, nil
+    return nil
 }

-func (s *Synchronizer) getNextSlot() (*common.Slot, error) {
+// updateNextSlot updates the slot with information of the next slot.
+// The information about which coordinator is allowed to forge is only updated
+// when we are Synced.
+func (s *Synchronizer) updateNextSlot(slot *common.Slot) error {
     // We want the next block because the current one is already mined
     blockNum := s.stats.Sync.LastBlock.Num + 1
     slotNum := s.consts.Auction.SlotNum(blockNum) + 1
-    slot := common.Slot{
-        SlotNum:          slotNum,
-        ForgerCommitment: false,
-    }
+    slot.SlotNum = slotNum
+    slot.ForgerCommitment = false
     slot.StartBlock, slot.EndBlock = s.consts.Auction.SlotBlocks(slot.SlotNum)
     // If Synced, update the current coordinator
     if s.stats.Synced() && blockNum >= s.consts.Auction.GenesisBlockNum {
-        if err := s.setSlotCoordinator(&slot); err != nil {
-            return nil, tracerr.Wrap(err)
+        if err := s.setSlotCoordinator(slot); err != nil {
+            return tracerr.Wrap(err)
         }
         // TODO: Remove this SANITY CHECK once this code is tested enough
         // BEGIN SANITY CHECK
         canForge, err := s.ethClient.AuctionCanForge(slot.Forger, slot.StartBlock)
         if err != nil {
-            return nil, tracerr.Wrap(err)
+            return tracerr.Wrap(fmt.Errorf("AuctionCanForge: %w", err))
         }
         if !canForge {
-            return nil, tracerr.Wrap(fmt.Errorf("Synchronized value of forger address for closed slot "+
+            return tracerr.Wrap(fmt.Errorf("Synchronized value of forger address for closed slot "+
                 "differs from smart contract: %+v", slot))
         }
         // END SANITY CHECK
     }
-    return &slot, nil
+    return nil
 }

-func (s *Synchronizer) updateCurrentNextSlotIfSync(reset bool, firstBatchBlockNum *int64) error {
-    current, err := s.getCurrentSlot(reset, firstBatchBlockNum)
-    if err != nil {
+// updateCurrentNextSlotIfSync updates the current and next slot. Information
+// about forger address that is allowed to forge is only updated if we are
+// Synced.
+func (s *Synchronizer) updateCurrentNextSlotIfSync(reset bool, hasBatch bool) error {
+    current := s.stats.Sync.Auction.CurrentSlot
+    next := s.stats.Sync.Auction.NextSlot
+    if err := s.updateCurrentSlot(&current, reset, hasBatch); err != nil {
         return tracerr.Wrap(err)
     }
-    next, err := s.getNextSlot()
-    if err != nil {
+    if err := s.updateNextSlot(&next); err != nil {
         return tracerr.Wrap(err)
     }
-    s.stats.UpdateCurrentNextSlot(current, next)
+    s.stats.UpdateCurrentNextSlot(&current, &next)
     return nil
 }
@@ -458,9 +468,9 @@ func (s *Synchronizer) init() error {
         "ethLastBlock", s.stats.Eth.LastBlock,
     )
     log.Infow("Sync init batch",
-        "syncLastBatch", s.stats.Sync.LastBatch,
-        "syncBatchesPerc", s.stats.batchesPerc(s.stats.Sync.LastBatch),
-        "ethLastBatch", s.stats.Eth.LastBatch,
+        "syncLastBatch", s.stats.Sync.LastBatch.BatchNum,
+        "syncBatchesPerc", s.stats.batchesPerc(s.stats.Sync.LastBatch.BatchNum),
+        "ethLastBatch", s.stats.Eth.LastBatchNum,
     )
     return nil
 }
@@ -521,7 +531,7 @@ func (s *Synchronizer) Sync2(ctx context.Context,
     if tracerr.Unwrap(err) == ethereum.NotFound {
         return nil, nil, nil
     } else if err != nil {
-        return nil, nil, tracerr.Wrap(err)
+        return nil, nil, tracerr.Wrap(fmt.Errorf("EthBlockByNumber: %w", err))
     }
     log.Debugf("ethBlock: num: %v, parent: %v, hash: %v",
         ethBlock.Num, ethBlock.ParentHash.String(), ethBlock.Hash.String())
@@ -627,14 +637,14 @@ func (s *Synchronizer) Sync2(ctx context.Context,
             }
         }
         s.stats.UpdateSync(ethBlock,
-            &rollupData.Batches[batchesLen-1].Batch.BatchNum,
+            &rollupData.Batches[batchesLen-1].Batch,
             lastL1BatchBlock, lastForgeL1TxsNum)
     }
-    var firstBatchBlockNum *int64
+    hasBatch := false
     if len(rollupData.Batches) > 0 {
-        firstBatchBlockNum = &rollupData.Batches[0].Batch.EthBlockNum
+        hasBatch = true
     }
-    if err := s.updateCurrentNextSlotIfSync(false, firstBatchBlockNum); err != nil {
+    if err := s.updateCurrentNextSlotIfSync(false, hasBatch); err != nil {
         return nil, nil, tracerr.Wrap(err)
     }
@@ -646,8 +656,8 @@ func (s *Synchronizer) Sync2(ctx context.Context,
     for _, batchData := range rollupData.Batches {
         log.Debugw("Synced batch",
             "syncLastBatch", batchData.Batch.BatchNum,
-            "syncBatchesPerc", s.stats.batchesPerc(int64(batchData.Batch.BatchNum)),
-            "ethLastBatch", s.stats.Eth.LastBatch,
+            "syncBatchesPerc", s.stats.batchesPerc(batchData.Batch.BatchNum),
+            "ethLastBatch", s.stats.Eth.LastBatchNum,
         )
     }
@@ -700,15 +710,15 @@ func getInitialVariables(ethClient eth.ClientInterface,
     consts *SCConsts) (*SCVariables, *StartBlockNums, error) {
     rollupInit, rollupInitBlock, err := ethClient.RollupEventInit()
     if err != nil {
-        return nil, nil, tracerr.Wrap(err)
+        return nil, nil, tracerr.Wrap(fmt.Errorf("RollupEventInit: %w", err))
     }
     auctionInit, auctionInitBlock, err := ethClient.AuctionEventInit()
     if err != nil {
-        return nil, nil, tracerr.Wrap(err)
+        return nil, nil, tracerr.Wrap(fmt.Errorf("AuctionEventInit: %w", err))
    }
     wDelayerInit, wDelayerInitBlock, err := ethClient.WDelayerEventInit()
     if err != nil {
-        return nil, nil, tracerr.Wrap(err)
+        return nil, nil, tracerr.Wrap(fmt.Errorf("WDelayerEventInit: %w", err))
     }
     rollupVars := rollupInit.RollupVariables()
     auctionVars := auctionInit.AuctionVariables(consts.Auction.InitialMinimalBidding)
@@ -753,15 +763,15 @@ func (s *Synchronizer) resetState(block *common.Block) error {
         s.vars.WDelayer = *wDelayer
     }

-    batchNum, err := s.historyDB.GetLastBatchNum()
+    batch, err := s.historyDB.GetLastBatch()
     if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows {
         return tracerr.Wrap(fmt.Errorf("historyDB.GetLastBatchNum: %w", err))
     }
     if tracerr.Unwrap(err) == sql.ErrNoRows {
-        batchNum = 0
+        batch = &common.Batch{}
     }

-    err = s.stateDB.Reset(batchNum)
+    err = s.stateDB.Reset(batch.BatchNum)
     if err != nil {
         return tracerr.Wrap(fmt.Errorf("stateDB.Reset: %w", err))
     }
@@ -783,9 +793,9 @@ func (s *Synchronizer) resetState(block *common.Block) error {
         lastForgeL1TxsNum = &n
     }

-    s.stats.UpdateSync(block, &batchNum, &lastL1BatchBlockNum, lastForgeL1TxsNum)
+    s.stats.UpdateSync(block, batch, &lastL1BatchBlockNum, lastForgeL1TxsNum)

-    if err := s.updateCurrentNextSlotIfSync(true, nil); err != nil {
+    if err := s.updateCurrentNextSlotIfSync(true, false); err != nil {
         return tracerr.Wrap(err)
     }
     return nil
@@ -802,7 +812,7 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e
     // the expected one.
     rollupEvents, err := s.ethClient.RollupEventsByBlock(blockNum, &ethBlock.Hash)
     if err != nil {
-        return nil, tracerr.Wrap(err)
+        return nil, tracerr.Wrap(fmt.Errorf("RollupEventsByBlock: %w", err))
     }
     // No events in this block
     if rollupEvents == nil {
@@ -919,9 +929,15 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e
             return nil, tracerr.Wrap(err)
         }
         if s.stateDB.CurrentBatch() != batchNum {
-            return nil, tracerr.Wrap(fmt.Errorf("stateDB.BatchNum (%v) != evtForgeBatch.BatchNum = (%v)",
+            return nil, tracerr.Wrap(fmt.Errorf("stateDB.BatchNum (%v) != "+
+                "evtForgeBatch.BatchNum = (%v)",
                 s.stateDB.CurrentBatch(), batchNum))
         }
+        if s.stateDB.MT.Root().BigInt().Cmp(forgeBatchArgs.NewStRoot) != 0 {
+            return nil, tracerr.Wrap(fmt.Errorf("stateDB.MTRoot (%v) != "+
+                "forgeBatchArgs.NewStRoot (%v)",
+                s.stateDB.MT.Root().BigInt(), forgeBatchArgs.NewStRoot))
+        }

         // Transform processed PoolL2 txs to L2 and store in BatchData
         l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a bit ugly, find a better way
@@ -1106,7 +1122,7 @@ func (s *Synchronizer) auctionSync(ethBlock *common.Block) (*common.AuctionData,
     // Get auction events in the block
     auctionEvents, err := s.ethClient.AuctionEventsByBlock(blockNum, &ethBlock.Hash)
     if err != nil {
-        return nil, tracerr.Wrap(err)
+        return nil, tracerr.Wrap(fmt.Errorf("AuctionEventsByBlock: %w", err))
     }
     // No events in this block
     if auctionEvents == nil {
@@ -1203,7 +1219,7 @@ func (s *Synchronizer) wdelayerSync(ethBlock *common.Block) (*common.WDelayerDat
     // Get wDelayer events in the block
     wDelayerEvents, err := s.ethClient.WDelayerEventsByBlock(blockNum, &ethBlock.Hash)
     if err != nil {
-        return nil, tracerr.Wrap(err)
+        return nil, tracerr.Wrap(fmt.Errorf("WDelayerEventsByBlock: %w", err))
     }
     // No events in this block
     if wDelayerEvents == nil {

@@ -17,7 +17,6 @@ import (
     "github.com/hermeznetwork/hermez-node/db/historydb"
     "github.com/hermeznetwork/hermez-node/db/statedb"
     "github.com/hermeznetwork/hermez-node/eth"
-    "github.com/hermeznetwork/hermez-node/log"
     "github.com/hermeznetwork/hermez-node/test"
     "github.com/hermeznetwork/hermez-node/test/til"
     "github.com/jinzhu/copier"
@@ -321,6 +320,14 @@ func newTestModules(t *testing.T) (*statedb.StateDB, *historydb.HistoryDB) {
     return stateDB, historyDB
 }

+func newBigInt(s string) *big.Int {
+    v, ok := new(big.Int).SetString(s, 10)
+    if !ok {
+        panic(fmt.Errorf("Can't set big.Int from %s", s))
+    }
+    return v
+}
+
 func TestSyncGeneral(t *testing.T) {
     //
     // Setup
@@ -339,7 +346,6 @@ func TestSyncGeneral(t *testing.T) {
     s, err := NewSynchronizer(client, historyDB, stateDB, Config{
         StatsRefreshPeriod: 0 * time.Second,
     })
-    log.Error(err)
     require.NoError(t, err)

     ctx := context.Background()
@@ -434,12 +440,22 @@ func TestSyncGeneral(t *testing.T) {
     require.Equal(t, 5, len(blocks[i].Rollup.L1UserTxs))
     require.Equal(t, 2, len(blocks[i].Rollup.Batches))
     require.Equal(t, 2, len(blocks[i].Rollup.Batches[0].L1CoordinatorTxs))
+    // Set StateRoots for batches manually (til doesn't set it)
+    blocks[i].Rollup.Batches[0].Batch.StateRoot =
+        newBigInt("18906357591508007884273218035694076596537737437965299189312069102730480717391")
+    blocks[i].Rollup.Batches[1].Batch.StateRoot =
+        newBigInt("9513185123401321669660637227182204000277156839501731093239187625486561933297")
     // blocks 1 (blockNum=3)
     i = 1
     require.Equal(t, 3, int(blocks[i].Block.Num))
     require.Equal(t, 5, len(blocks[i].Rollup.L1UserTxs))
     require.Equal(t, 2, len(blocks[i].Rollup.Batches))
     require.Equal(t, 3, len(blocks[i].Rollup.Batches[0].L2Txs))
+    // Set StateRoots for batches manually (til doesn't set it)
+    blocks[i].Rollup.Batches[0].Batch.StateRoot =
+        newBigInt("13060270878200012606074130020925677466793317216609491464427188889005039616594")
+    blocks[i].Rollup.Batches[1].Batch.StateRoot =
+        newBigInt("21427104994652624302859637783375978708867165042357535792408500519060088086054")

     // Generate extra required data
     ethAddTokens(blocks, client)
@@ -614,6 +630,12 @@ func TestSyncGeneral(t *testing.T) {
     blocks, err = tc.GenerateBlocks(set2)
     require.NoError(t, err)
+    // Set StateRoots for batches manually (til doesn't set it)
+    blocks[0].Rollup.Batches[0].Batch.StateRoot =
+        newBigInt("11218510534825843475100588932060366395781087435899915642332104464234485046683")
+    blocks[0].Rollup.Batches[1].Batch.StateRoot =
+        newBigInt("20283020730369146334077598087403837297563965802277806438205710455191646998983")
+
     for i := 0; i < 4; i++ {
         client.CtlRollback()
     }