Fix eth events query and sync inconsistent state
- kvdb
  - Fix path in Last when doing `setNew`.
  - Only close if db != nil, and after closing, always set db to nil. This
    avoids a panic in the case where the db is closed but there's an error
    soon after and a future call tries to close again, because
    pebble.Close() will panic if the db is already closed.
  - Avoid calling pebble methods when the Storage interface already
    implements that method (like Close). A sketch of this close guard
    follows this list.
- statedb
  - In tests, avoid calling a KVDB method when the same method is available
    on the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
  - In the *EventsByBlock methods, take blockHash as an input argument and
    use it when querying the event logs. Previously the blockHash was taken
    from the log results *only if* there was at least one log. This caused
    the following issue: if there were no logs, it was not possible to know
    whether the result came from the expected block or from an uncle block!
    By querying logs by blockHash we make sure that even if there are no
    logs, they are from the right block (see the filter sketch below).
  - Note that the function can now be called with either a blockNum or a
    blockHash, but not both at the same time.
- sync
  - If there's an error during the call to Sync, call resetState, which
    internally resets the stateDB to avoid stale checkpoints (and a
    corresponding invalid increase in the StateDB batchNum).
  - During a Sync, after every batch is processed, make sure that the
    StateDB currentBatch corresponds to the batchNum in the smart contract
    log/event.
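Below is a minimal sketch of the kvdb close guard described above, assuming the cockroachdb/pebble package; the `Storage` type and `db` field here are illustrative stand-ins, not hermez-node's actual kvdb code.

```go
package kvdbsketch

import "github.com/cockroachdb/pebble"

// Storage wraps a pebble handle that may already have been closed.
type Storage struct {
	db *pebble.DB
}

// Close is safe to call more than once: pebble.DB.Close panics on a
// second call, so we only close a non-nil handle and always clear it
// afterwards, even when Close itself returns an error.
func (s *Storage) Close() error {
	if s.db == nil {
		return nil // already closed (or never opened): nothing to do
	}
	err := s.db.Close()
	s.db = nil // prevent any future cleanup path from double-closing
	return err
}
```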
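The events-by-blockHash idea can also be illustrated with go-ethereum's public filtering API. This is a hedged sketch: the `eventsByBlock` helper and its parameters are assumptions for illustration, not hermez-node's real *EventsByBlock methods.

```go
package ethevents

import (
	"context"
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum"
	ethCommon "github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
)

// eventsByBlock queries a contract's logs for exactly one block,
// identified either by number or by hash (never both), mirroring the
// blockNum-or-blockHash rule from the commit message.
func eventsByBlock(ctx context.Context, client *ethclient.Client,
	contract ethCommon.Address, blockNum *big.Int,
	blockHash *ethCommon.Hash) ([]types.Log, error) {
	if (blockNum == nil) == (blockHash == nil) {
		return nil, fmt.Errorf("expected exactly one of blockNum or blockHash")
	}
	query := ethereum.FilterQuery{Addresses: []ethCommon.Address{contract}}
	if blockHash != nil {
		// Filtering by hash pins the query to this exact block: an
		// empty result then means "no events in this block" and can
		// never silently come from an uncle at the same height.
		query.BlockHash = blockHash
	} else {
		query.FromBlock = blockNum
		query.ToBlock = blockNum
	}
	return client.FilterLogs(ctx, query)
}
```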
@@ -195,15 +195,16 @@ type Config struct {

 // Synchronizer implements the Synchronizer type
 type Synchronizer struct {
-	ethClient     eth.ClientInterface
-	consts        SCConsts
-	historyDB     *historydb.HistoryDB
-	stateDB       *statedb.StateDB
-	cfg           Config
-	initVars      SCVariables
-	startBlockNum int64
-	vars          SCVariables
-	stats         *StatsHolder
+	ethClient        eth.ClientInterface
+	consts           SCConsts
+	historyDB        *historydb.HistoryDB
+	stateDB          *statedb.StateDB
+	cfg              Config
+	initVars         SCVariables
+	startBlockNum    int64
+	vars             SCVariables
+	stats            *StatsHolder
+	resetStateFailed bool
 }

 // NewSynchronizer creates a new Synchronizer
@@ -445,8 +446,10 @@ func (s *Synchronizer) init() error {
 		lastBlock = lastSavedBlock
 	}
 	if err := s.resetState(lastBlock); err != nil {
+		s.resetStateFailed = true
 		return tracerr.Wrap(err)
 	}
+	s.resetStateFailed = false

 	log.Infow("Sync init block",
 		"syncLastBlock", s.stats.Sync.LastBlock,
@@ -462,16 +465,37 @@ func (s *Synchronizer) init() error {
 	return nil
 }

+func (s *Synchronizer) resetIntermediateState() error {
+	lastBlock, err := s.historyDB.GetLastBlock()
+	if tracerr.Unwrap(err) == sql.ErrNoRows {
+		lastBlock = &common.Block{}
+	} else if err != nil {
+		return tracerr.Wrap(fmt.Errorf("historyDB.GetLastBlock: %w", err))
+	}
+	if err := s.resetState(lastBlock); err != nil {
+		s.resetStateFailed = true
+		return tracerr.Wrap(fmt.Errorf("resetState at block %v: %w", lastBlock.Num, err))
+	}
+	s.resetStateFailed = false
+	return nil
+}
+
 // Sync2 attempts to synchronize an ethereum block starting from lastSavedBlock.
 // If lastSavedBlock is nil, the lastSavedBlock value is obtained from the DB.
 // If a block is synched, it will be returned and also stored in the DB. If a
 // reorg is detected, the number of discarded blocks will be returned and no
 // synchronization will be made.
 // TODO: Be smart about locking: only lock during the read/write operations
-func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block) (*common.BlockData, *int64, error) {
+func (s *Synchronizer) Sync2(ctx context.Context,
+	lastSavedBlock *common.Block) (blockData *common.BlockData, discarded *int64, err error) {
+	if s.resetStateFailed {
+		if err := s.resetIntermediateState(); err != nil {
+			return nil, nil, tracerr.Wrap(err)
+		}
+	}
+
 	var nextBlockNum int64 // next block number to sync
 	if lastSavedBlock == nil {
 		var err error
 		// Get lastSavedBlock from History DB
 		lastSavedBlock, err = s.historyDB.GetLastBlock()
 		if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows {
@@ -527,6 +551,20 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block) (*common.BlockData, *int64, error) {
 		}
 	}

+	defer func() {
+		// If there was an error during sync, reset to the last block
+		// in the historyDB because the historyDB is written last in
+		// the Sync method and is the source of consistency. This
+		// allows resetting the stateDB in the case a batch was
+		// processed but the historyDB block was not committed due to
+		// an error.
+		if err != nil {
+			if err2 := s.resetIntermediateState(); err2 != nil {
+				log.Errorw("sync revert", "err", err2)
+			}
+		}
+	}()
+
 	// Get data from the rollup contract
 	rollupData, err := s.rollupSync(ethBlock)
 	if err != nil {
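A side note on the two hunks above: the deferred revert can only observe Sync2's error because the signature change gave the results names. A tiny standalone Go sketch of that mechanism (not the repo's code) follows.

```go
package main

import "fmt"

// doWork mimics Sync2's shape: because err is a *named* result, the
// deferred function observes the value the caller is about to receive,
// and can trigger a revert exactly when the function fails.
func doWork(fail bool) (err error) {
	defer func() {
		if err != nil {
			fmt.Println("reverting intermediate state:", err)
		}
	}()
	if fail {
		return fmt.Errorf("simulated sync error")
	}
	return nil
}

func main() {
	_ = doWork(true)  // prints the revert message
	_ = doWork(false) // returns silently, no revert
}
```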
@@ -564,14 +602,14 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block) (*common.BlockData, *int64, error) {
 	}

 	// Group all the block data into the structs to save into HistoryDB
-	blockData := common.BlockData{
+	blockData = &common.BlockData{
 		Block:    *ethBlock,
 		Rollup:   *rollupData,
 		Auction:  *auctionData,
 		WDelayer: *wDelayerData,
 	}

-	err = s.historyDB.AddBlockSCData(&blockData)
+	err = s.historyDB.AddBlockSCData(blockData)
 	if err != nil {
 		return nil, nil, tracerr.Wrap(err)
 	}
@@ -613,7 +651,7 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block) (*common.BlockData, *int64, error) {
 		)
 	}

-	return &blockData, nil, nil
+	return blockData, nil, nil
 }

 // reorg manages a reorg, updating History and State DB as needed. Keeps
@@ -645,14 +683,15 @@ func (s *Synchronizer) reorg(uncleBlock *common.Block) (int64, error) {
 	log.Debugw("Discarding blocks", "total", total, "from", uncleBlock.Num, "to", block.Num+1)

 	// Set History DB and State DB to the correct state
-	err := s.historyDB.Reorg(block.Num)
-	if err != nil {
+	if err := s.historyDB.Reorg(block.Num); err != nil {
 		return 0, tracerr.Wrap(err)
 	}

 	if err := s.resetState(block); err != nil {
+		s.resetStateFailed = true
 		return 0, tracerr.Wrap(err)
 	}
+	s.resetStateFailed = false

 	return block.Num, nil
 }
@@ -722,6 +761,11 @@ func (s *Synchronizer) resetState(block *common.Block) error {
 		batchNum = 0
 	}

+	err = s.stateDB.Reset(batchNum)
+	if err != nil {
+		return tracerr.Wrap(fmt.Errorf("stateDB.Reset: %w", err))
+	}
+
 	lastL1BatchBlockNum, err := s.historyDB.GetLastL1BatchBlockNum()
 	if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows {
 		return tracerr.Wrap(fmt.Errorf("historyDB.GetLastL1BatchBlockNum: %w", err))
@@ -739,11 +783,6 @@ func (s *Synchronizer) resetState(block *common.Block) error {
 		lastForgeL1TxsNum = &n
 	}

-	err = s.stateDB.Reset(batchNum)
-	if err != nil {
-		return tracerr.Wrap(fmt.Errorf("stateDB.Reset: %w", err))
-	}
-
 	s.stats.UpdateSync(block, &batchNum, &lastL1BatchBlockNum, lastForgeL1TxsNum)

 	if err := s.updateCurrentNextSlotIfSync(true, nil); err != nil {
@@ -761,19 +800,14 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, error) {

 	// Get rollup events in the block, and make sure the block hash matches
 	// the expected one.
-	rollupEvents, blockHash, err := s.ethClient.RollupEventsByBlock(blockNum)
+	rollupEvents, err := s.ethClient.RollupEventsByBlock(blockNum, &ethBlock.Hash)
 	if err != nil {
 		return nil, tracerr.Wrap(err)
 	}
 	// No events in this block
-	if blockHash == nil {
+	if rollupEvents == nil {
 		return &rollupData, nil
 	}
-	if *blockHash != ethBlock.Hash {
-		log.Errorw("Block hash mismatch in Rollup events", "expected", ethBlock.Hash.String(),
-			"got", blockHash.String())
-		return nil, tracerr.Wrap(eth.ErrBlockHashMismatchEvent)
-	}

 	var nextForgeL1TxsNum int64 // forgeL1TxsNum for the next L1Batch
 	nextForgeL1TxsNumPtr, err := s.historyDB.GetLastL1TxsNum()
@@ -801,7 +835,7 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, error) {
 		forgeBatchArgs, sender, err := s.ethClient.RollupForgeBatchArgs(evtForgeBatch.EthTxHash,
 			evtForgeBatch.L1UserTxsLen)
 		if err != nil {
-			return nil, tracerr.Wrap(err)
+			return nil, tracerr.Wrap(fmt.Errorf("RollupForgeBatchArgs: %w", err))
 		}

 		batchNum := common.BatchNum(evtForgeBatch.BatchNum)
@@ -884,6 +918,10 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, error) {
 		if err != nil {
 			return nil, tracerr.Wrap(err)
 		}
+		if s.stateDB.CurrentBatch() != batchNum {
+			return nil, tracerr.Wrap(fmt.Errorf("stateDB.BatchNum (%v) != evtForgeBatch.BatchNum = (%v)",
+				s.stateDB.CurrentBatch(), batchNum))
+		}

 		// Transform processed PoolL2 txs to L2 and store in BatchData
 		l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a bit ugly, find a better way
@@ -1066,19 +1104,14 @@ func (s *Synchronizer) auctionSync(ethBlock *common.Block) (*common.AuctionData, error) {
 	var auctionData = common.NewAuctionData()

 	// Get auction events in the block
-	auctionEvents, blockHash, err := s.ethClient.AuctionEventsByBlock(blockNum)
+	auctionEvents, err := s.ethClient.AuctionEventsByBlock(blockNum, &ethBlock.Hash)
 	if err != nil {
 		return nil, tracerr.Wrap(err)
 	}
 	// No events in this block
-	if blockHash == nil {
+	if auctionEvents == nil {
 		return &auctionData, nil
 	}
-	if *blockHash != ethBlock.Hash {
-		log.Errorw("Block hash mismatch in Auction events", "expected", ethBlock.Hash.String(),
-			"got", blockHash.String())
-		return nil, tracerr.Wrap(eth.ErrBlockHashMismatchEvent)
-	}

 	// Get bids
 	for _, evt := range auctionEvents.NewBid {
@@ -1168,19 +1201,14 @@ func (s *Synchronizer) wdelayerSync(ethBlock *common.Block) (*common.WDelayerData, error) {
 	wDelayerData := common.NewWDelayerData()

 	// Get wDelayer events in the block
-	wDelayerEvents, blockHash, err := s.ethClient.WDelayerEventsByBlock(blockNum)
+	wDelayerEvents, err := s.ethClient.WDelayerEventsByBlock(blockNum, &ethBlock.Hash)
 	if err != nil {
 		return nil, tracerr.Wrap(err)
 	}
 	// No events in this block
-	if blockHash == nil {
+	if wDelayerEvents == nil {
 		return &wDelayerData, nil
 	}
-	if *blockHash != ethBlock.Hash {
-		log.Errorw("Block hash mismatch in WDelayer events", "expected", ethBlock.Hash.String(),
-			"got", blockHash.String())
-		return nil, tracerr.Wrap(eth.ErrBlockHashMismatchEvent)
-	}

 	for _, evt := range wDelayerEvents.Deposit {
 		wDelayerData.Deposits = append(wDelayerData.Deposits, common.WDelayerTransfer{