* Synchronizer * mend Synchronizer main loop & reorg * mend Synchronizer main loop & reorg * mend Synchronizer main loop & reorg * Update PR and apply small changes Update PR and apply small changes: - Remove arbitrary line jumps (for example after an `err:=` there are cases with extra line before error check, and there are cases without the extra jumpline. Another example is the empty lines between a comment and the line of comment that is explained) - Fix some typo - Fix value printing of `lastSavedBlock` instead of `latestBlockNum` - Uncomment parameters of structs and use linter syntax to avoid unused checks * Update Synchr after master-pull to last types to compile Co-authored-by: Toni Ramírez <toni@iden3.com> Co-authored-by: arnaucube <root@arnaucube.com>feature/sql-semaphore1
@ -0,0 +1,272 @@ |
|||||
|
package synchronizer |
||||
|
|
||||
|
import ( |
||||
|
"context" |
||||
|
"database/sql" |
||||
|
"errors" |
||||
|
"math/big" |
||||
|
"sync" |
||||
|
|
||||
|
ethCommon "github.com/ethereum/go-ethereum/common" |
||||
|
"github.com/hermeznetwork/hermez-node/common" |
||||
|
"github.com/hermeznetwork/hermez-node/db/historydb" |
||||
|
"github.com/hermeznetwork/hermez-node/db/statedb" |
||||
|
"github.com/hermeznetwork/hermez-node/eth" |
||||
|
"github.com/hermeznetwork/hermez-node/log" |
||||
|
) |
||||
|
|
||||
|
const ( |
||||
|
blocksToSync = 20 // TODO: This will be deleted once we can get the firstSavedBlock from the ethClient
|
||||
|
) |
||||
|
|
||||
|
var ( |
||||
|
// ErrNotAbleToSync is used when there is not possible to find a valid block to sync
|
||||
|
ErrNotAbleToSync = errors.New("it has not been possible to synchronize any block") |
||||
|
) |
||||
|
|
||||
|
// BatchData contains information about Batches from the contracts
|
||||
|
//nolint:structcheck,unused
|
||||
|
type BatchData struct { |
||||
|
l1txs []common.L1Tx |
||||
|
l2txs []common.L2Tx |
||||
|
registeredAccounts []common.Account |
||||
|
exitTree []common.ExitInfo |
||||
|
} |
||||
|
|
||||
|
// BlockData contains information about Blocks from the contracts
|
||||
|
//nolint:structcheck,unused
|
||||
|
type BlockData struct { |
||||
|
block *common.Block |
||||
|
// Rollup
|
||||
|
batches []BatchData |
||||
|
withdrawals []common.ExitInfo |
||||
|
registeredTokens []common.Token |
||||
|
rollupVars *common.RollupVars |
||||
|
// Auction
|
||||
|
bids []common.Bid |
||||
|
coordinators []common.Coordinator |
||||
|
auctionVars *common.AuctionVars |
||||
|
} |
||||
|
|
||||
|
// Status is returned by the Status method
|
||||
|
type Status struct { |
||||
|
CurrentBlock uint64 |
||||
|
CurrentBatch common.BatchNum |
||||
|
CurrentForgerAddr ethCommon.Address |
||||
|
NextForgerAddr ethCommon.Address |
||||
|
Synchronized bool |
||||
|
} |
||||
|
|
||||
|
// Synchronizer implements the Synchronizer type
|
||||
|
type Synchronizer struct { |
||||
|
ethClient *eth.Client |
||||
|
historyDB *historydb.HistoryDB |
||||
|
stateDB *statedb.StateDB |
||||
|
firstSavedBlock *common.Block |
||||
|
mux sync.Mutex |
||||
|
} |
||||
|
|
||||
|
// NewSynchronizer creates a new Synchronizer
|
||||
|
func NewSynchronizer(ethClient *eth.Client, historyDB *historydb.HistoryDB, stateDB *statedb.StateDB) *Synchronizer { |
||||
|
s := &Synchronizer{ |
||||
|
ethClient: ethClient, |
||||
|
historyDB: historyDB, |
||||
|
stateDB: stateDB, |
||||
|
} |
||||
|
return s |
||||
|
} |
||||
|
|
||||
|
// Sync updates History and State DB with information from the blockchain
|
||||
|
func (s *Synchronizer) Sync() error { |
||||
|
// Avoid new sync while performing one
|
||||
|
s.mux.Lock() |
||||
|
defer s.mux.Unlock() |
||||
|
|
||||
|
var lastStoredForgeL1TxsNum uint64 |
||||
|
|
||||
|
// TODO: Get this information from ethClient once it's implemented
|
||||
|
// for the moment we will get the latestblock - 20 as firstSavedBlock
|
||||
|
latestBlock, err := s.ethClient.BlockByNumber(context.Background(), nil) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
s.firstSavedBlock, err = s.ethClient.BlockByNumber(context.Background(), big.NewInt(int64(latestBlock.EthBlockNum-blocksToSync))) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
// Get lastSavedBlock from History DB
|
||||
|
lastSavedBlock, err := s.historyDB.GetLastBlock() |
||||
|
if err != nil && err != sql.ErrNoRows { |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
// Check if we got a block or nil
|
||||
|
// In case of nil we must do a full sync
|
||||
|
if lastSavedBlock == nil || lastSavedBlock.EthBlockNum == 0 { |
||||
|
lastSavedBlock = s.firstSavedBlock |
||||
|
} else { |
||||
|
// Get the latest block we have in History DB from blockchain to detect a reorg
|
||||
|
ethBlock, err := s.ethClient.BlockByNumber(context.Background(), big.NewInt(int64(lastSavedBlock.EthBlockNum))) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
if ethBlock.Hash != lastSavedBlock.Hash { |
||||
|
// Reorg detected
|
||||
|
log.Debugf("Reorg Detected...") |
||||
|
err := s.reorg(lastSavedBlock) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
lastSavedBlock, err = s.historyDB.GetLastBlock() |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
log.Debugf("Syncing...") |
||||
|
|
||||
|
// Get latest blockNum in blockchain
|
||||
|
latestBlockNum, err := s.ethClient.CurrentBlock() |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
log.Debugf("Blocks to sync: %v (lastSavedBlock: %v, latestBlock: %v)", latestBlockNum.Uint64()-lastSavedBlock.EthBlockNum, lastSavedBlock.EthBlockNum, latestBlockNum) |
||||
|
|
||||
|
for lastSavedBlock.EthBlockNum < latestBlockNum.Uint64() { |
||||
|
ethBlock, err := s.ethClient.BlockByNumber(context.Background(), big.NewInt(int64(lastSavedBlock.EthBlockNum+1))) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
// Get data from the rollup contract
|
||||
|
blockData, batchData, err := s.rollupSync(ethBlock, lastStoredForgeL1TxsNum) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
// Get data from the auction contract
|
||||
|
err = s.auctionSync(blockData, batchData) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
// Add rollupData and auctionData once the method is updated
|
||||
|
err = s.historyDB.AddBlock(ethBlock) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
// We get the block on every iteration
|
||||
|
lastSavedBlock, err = s.historyDB.GetLastBlock() |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
// reorg manages a reorg, updating History and State DB as needed
|
||||
|
func (s *Synchronizer) reorg(uncleBlock *common.Block) error { |
||||
|
var block *common.Block |
||||
|
blockNum := uncleBlock.EthBlockNum |
||||
|
found := false |
||||
|
|
||||
|
log.Debugf("Reorg first uncle block: %v", blockNum) |
||||
|
|
||||
|
// Iterate History DB and the blokchain looking for the latest valid block
|
||||
|
for !found && blockNum > s.firstSavedBlock.EthBlockNum { |
||||
|
header, err := s.ethClient.HeaderByNumber(context.Background(), big.NewInt(int64(blockNum))) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
block, err = s.historyDB.GetBlock(blockNum) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
if block.Hash == header.Hash() { |
||||
|
found = true |
||||
|
log.Debugf("Found valid block: %v", blockNum) |
||||
|
} else { |
||||
|
log.Debugf("Discarding block: %v", blockNum) |
||||
|
} |
||||
|
|
||||
|
blockNum-- |
||||
|
} |
||||
|
|
||||
|
if found { |
||||
|
// Set History DB and State DB to the correct state
|
||||
|
err := s.historyDB.Reorg(block.EthBlockNum) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
|
||||
|
batchNum, err := s.historyDB.GetLastBatchNum() |
||||
|
if err != nil && err != sql.ErrNoRows { |
||||
|
return err |
||||
|
} |
||||
|
if batchNum != 0 { |
||||
|
err = s.stateDB.Reset(batchNum) |
||||
|
if err != nil { |
||||
|
return err |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
return nil |
||||
|
} |
||||
|
|
||||
|
return ErrNotAbleToSync |
||||
|
} |
||||
|
|
||||
|
// Status returns current status values from the Synchronizer
|
||||
|
func (s *Synchronizer) Status() (*Status, error) { |
||||
|
// Avoid possible inconsistencies
|
||||
|
s.mux.Lock() |
||||
|
defer s.mux.Unlock() |
||||
|
|
||||
|
var status *Status |
||||
|
|
||||
|
// Get latest block in History DB
|
||||
|
lastSavedBlock, err := s.historyDB.GetLastBlock() |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
status.CurrentBlock = lastSavedBlock.EthBlockNum |
||||
|
|
||||
|
// Get latest batch in History DB
|
||||
|
lastSavedBatch, err := s.historyDB.GetLastBatchNum() |
||||
|
if err != nil && err != sql.ErrNoRows { |
||||
|
return nil, err |
||||
|
} |
||||
|
status.CurrentBatch = lastSavedBatch |
||||
|
|
||||
|
// Get latest blockNum in blockchain
|
||||
|
latestBlockNum, err := s.ethClient.CurrentBlock() |
||||
|
if err != nil { |
||||
|
return nil, err |
||||
|
} |
||||
|
|
||||
|
// TODO: Get CurrentForgerAddr & NextForgerAddr
|
||||
|
|
||||
|
// Check if Synchronizer is synchronized
|
||||
|
status.Synchronized = status.CurrentBlock == latestBlockNum.Uint64() |
||||
|
return status, nil |
||||
|
} |
||||
|
|
||||
|
// rollupSync gets information from the Rollup Contract
|
||||
|
func (s *Synchronizer) rollupSync(block *common.Block, lastStoredForgeL1TxsNum uint64) (*BlockData, []*BatchData, error) { |
||||
|
// To be implemented
|
||||
|
return nil, nil, nil |
||||
|
} |
||||
|
|
||||
|
// auctionSync gets information from the Auction Contract
|
||||
|
func (s *Synchronizer) auctionSync(blockData *BlockData, batchData []*BatchData) error { |
||||
|
// To be implemented
|
||||
|
return nil |
||||
|
} |
@ -0,0 +1,72 @@ |
|||||
|
package synchronizer |
||||
|
|
||||
|
import ( |
||||
|
"fmt" |
||||
|
"io/ioutil" |
||||
|
"os" |
||||
|
"testing" |
||||
|
|
||||
|
"github.com/ethereum/go-ethereum/ethclient" |
||||
|
"github.com/hermeznetwork/hermez-node/db/historydb" |
||||
|
"github.com/hermeznetwork/hermez-node/db/statedb" |
||||
|
"github.com/hermeznetwork/hermez-node/eth" |
||||
|
"github.com/stretchr/testify/assert" |
||||
|
"github.com/stretchr/testify/require" |
||||
|
) |
||||
|
|
||||
|
func Test(t *testing.T) { |
||||
|
// Int State DB
|
||||
|
dir, err := ioutil.TempDir("", "tmpdb") |
||||
|
require.Nil(t, err) |
||||
|
|
||||
|
sdb, err := statedb.NewStateDB(dir, true, 32) |
||||
|
assert.Nil(t, err) |
||||
|
|
||||
|
// Init History DB
|
||||
|
pass := os.Getenv("POSTGRES_PASS") |
||||
|
historyDB, err := historydb.NewHistoryDB(5432, "localhost", "hermez", pass, "history") |
||||
|
require.Nil(t, err) |
||||
|
err = historyDB.Reorg(0) |
||||
|
assert.Nil(t, err) |
||||
|
|
||||
|
// Init eth client
|
||||
|
ehtClientDialURL := os.Getenv("ETHCLIENT_DIAL_URL") |
||||
|
ethClient, err := ethclient.Dial(ehtClientDialURL) |
||||
|
require.Nil(t, err) |
||||
|
|
||||
|
client := eth.NewClient(ethClient, nil, nil, nil) |
||||
|
|
||||
|
// Create Synchronizer
|
||||
|
s := NewSynchronizer(client, historyDB, sdb) |
||||
|
|
||||
|
// Test Sync
|
||||
|
err = s.Sync() |
||||
|
require.Nil(t, err) |
||||
|
|
||||
|
// TODO: Reorg will be properly tested once we have the mock ethClient implemented
|
||||
|
/* |
||||
|
// Force a Reorg
|
||||
|
lastSavedBlock, err := historyDB.GetLastBlock() |
||||
|
require.Nil(t, err) |
||||
|
|
||||
|
lastSavedBlock.EthBlockNum++ |
||||
|
err = historyDB.AddBlock(lastSavedBlock) |
||||
|
require.Nil(t, err) |
||||
|
|
||||
|
lastSavedBlock.EthBlockNum++ |
||||
|
err = historyDB.AddBlock(lastSavedBlock) |
||||
|
require.Nil(t, err) |
||||
|
|
||||
|
log.Debugf("Wait for the blockchain to generate some blocks...") |
||||
|
time.Sleep(40 * time.Second) |
||||
|
|
||||
|
|
||||
|
err = s.Sync() |
||||
|
require.Nil(t, err) |
||||
|
*/ |
||||
|
|
||||
|
// Close History DB
|
||||
|
if err := historyDB.Close(); err != nil { |
||||
|
fmt.Println("Error closing the history DB:", err) |
||||
|
} |
||||
|
} |