Compare commits

..

20 Commits

Author SHA1 Message Date
Eduard S
b2b86d0069 Add logs 2021-02-11 18:27:01 +01:00
Eduard S
60b7bef8ab WIP6 2021-02-11 18:27:01 +01:00
Eduard S
bea042ea52 Verify stateroot at sync 2021-02-11 18:27:01 +01:00
Eduard S
db46f61d64 WIP5 2021-02-11 18:26:59 +01:00
Eduard S
cdf110b8e7 WIP4 2021-02-11 18:22:54 +01:00
Eduard S
10edd5f2c2 WIP3 2021-02-11 18:22:54 +01:00
Eduard S
124d2e84f2 WIP 2021-02-11 18:22:54 +01:00
Eduard S
aa9367f7af Make TxManager more robust 2021-02-11 18:22:52 +01:00
Eduard S
7e6d820ac1 WIP 2021-02-11 18:21:36 +01:00
Eduard S
7982d9292a Update coordinator to work better under real net
- cli / node
    - Update handler of SIGINT so that after 3 SIGINTs, the process terminates
      unconditionally
- coordinator
    - Store stats without pointer
    - In all functions that send a variable via channel, check for context done
      to avoid a deadlock when the node is stopped (the channel is unbuffered
      and no process may be reading from it); see the sketch after this commit.
    - Abstract `canForge` so that it can be used outside of the `Coordinator`
    - In `canForge` check the blockNumber in current and next slot.
    - Update tests due to smart contract changes in slot handling, and minimum
      bid defaults
    - TxManager
        - Add consts, vars and stats to allow evaluating `canForge`
        - Add `canForge` method (not used yet)
        - Store batch and nonces status (last success and last pending)
        - Track nonces internally instead of relying on the ethereum node (this
          is required to work with ganache when there are pending txs)
        - Handle the (common) case of the receipt not being found after the tx
          is sent.
        - Don't start the main loop until we get an initial message of the stats
          and vars (so that in the loop the stats and vars are set to
          synchronizer values)
- eth / ethereum client
    - Add necessary methods to create the auth object for transactions manually
      so that we can set the nonce, gas price, gas limit, etc manually
    - Update `RollupForgeBatch` to take an auth object as input (so that the
      coordinator can set parameters manually)
- synchronizer
    - In stats, add `NextSlot`
2021-02-11 18:21:36 +01:00
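
A minimal sketch of the channel-send pattern described in the coordinator bullet above, assuming an illustrative Stats type and channel name (not taken from the repository):

package coordinator

import "context"

// Stats is a placeholder for the synchronizer stats passed around the node.
type Stats struct{ LastBlockNum int64 }

// sendStats sends stats over an unbuffered channel but also selects on
// ctx.Done(), so the sender is not left blocked forever when the node is
// stopped and nothing reads from msgCh anymore.
func sendStats(ctx context.Context, msgCh chan<- Stats, stats Stats) {
	select {
	case msgCh <- stats:
	case <-ctx.Done():
	}
}
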
Eduard S
2a77dac9c1 Merge pull request #536 from hermeznetwork/feature/sql-semaphore
Add semaphore for API queries to SQL
2021-02-10 13:47:27 +01:00
Arnau B
ac1fd9acf7 Add semaphore for API queries to SQL 2021-02-10 13:36:17 +01:00
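
The commit above limits concurrent API queries hitting SQL. One common Go approach is a counting semaphore built from a buffered channel; this is only an illustrative sketch under that assumption, not the PR's actual code:

package api

import "database/sql"

// apiSem is a counting semaphore: at most cap(apiSem) API queries can run
// against the SQL database at the same time (the limit of 10 is illustrative).
var apiSem = make(chan struct{}, 10)

// queryWithLimit acquires a semaphore slot before querying and releases it
// when the query returns, blocking callers once the limit is reached.
func queryWithLimit(db *sql.DB, query string, args ...interface{}) (*sql.Rows, error) {
	apiSem <- struct{}{}
	defer func() { <-apiSem }()
	return db.Query(query, args...)
}
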
arnau
1bf29636db Merge pull request #537 from hermeznetwork/fix/l2dbreorg
Fix l2db reorg of forging l2txs
2021-02-10 13:02:21 +01:00
Eduard S
2bf3b843ed Fix l2db reorg of forging l2txs 2021-02-09 17:13:29 +01:00
arnau
277a1bc321 Merge pull request #535 from hermeznetwork/fix/sync
Fix synchronizer, add verifier index config param
2021-02-08 17:52:26 +01:00
Eduard S
3181c8738c Fix synchronizer, add verifier index config param
- eth
    - In EventsByBlock calls ignore blockNum if blockHash != nil.  This fixes
      the issue where both a blockNumber and a blockHash were being passed,
      which the eth events query function doesn't allow, causing the
      synchronizer to fail at every iteration (see the sketch after this
      commit).
- Node/Config
    - Add Coordinator.Debug.RollupVerifierIndex to force choosing a particular
      verifier by index in the Rollup smart contract.
2021-02-08 17:45:33 +01:00
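
For context on the EventsByBlock fix above: go-ethereum's FilterQuery accepts either a BlockHash or a FromBlock/ToBlock range, but not both at once. A minimal sketch of querying logs one way or the other (the function and its parameters are illustrative, not the repository's eth package API):

package eth

import (
	"context"
	"math/big"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
)

// filterByBlock queries event logs by blockHash when it is provided (which
// pins the exact block even if it has no logs), and falls back to a
// single-block number range otherwise. Setting BlockHash together with
// FromBlock/ToBlock is rejected by the node, hence the either/or.
func filterByBlock(ctx context.Context, client *ethclient.Client,
	address common.Address, blockNum int64, blockHash *common.Hash) ([]types.Log, error) {
	query := ethereum.FilterQuery{Addresses: []common.Address{address}}
	if blockHash != nil {
		query.BlockHash = blockHash
	} else {
		query.FromBlock = big.NewInt(blockNum)
		query.ToBlock = big.NewInt(blockNum)
	}
	return client.FilterLogs(ctx, query)
}
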
arnau
62df063ccf Merge pull request #534 from hermeznetwork/fix/statedb2
Pass StateDB constructor parameters as Config type
2021-02-08 13:53:43 +01:00
Eduard S
48a538faa3 Pass StateDB constructor parameters as Config type
- KVDB/StateDB
        - Pass config parameters in a Config type instead of using many
          arguments in the constructor.
        - Add new parameter `NoLast` which disables having an opened DB with a
          checkpoint to the last batchNum for thread-safe reads.  Last will be
          disabled in the StateDB used by the TxSelector and BatchBuilder.
        - Add new parameter `NoGapsCheck` which skips checking gaps in the list
          of checkpoints and returning errors if there are gaps.  Gaps check
          will be disabled in the StateDB used by the TxSelector and
          BatchBuilder, because we expect to have gaps when there are multiple
          coordinators forging (slots not forged by our coordinator will leave
          gaps).  See the sketch after this commit.
2021-02-08 13:46:24 +01:00
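
Based on the commit message above, constructing the TxSelector's StateDB with the new Config type would look roughly like the fragment below; NoLast and NoGapsCheck follow the description above, while the other field values mirror the test diffs further down (this is a sketch, not a line from the diff):

	// Sketch: Config-based constructor with the thread-safe "last" checkpoint
	// and the checkpoint-gap check disabled, as described for the TxSelector
	// and BatchBuilder StateDBs.
	sdb, err := statedb.NewStateDB(statedb.Config{
		Path:        dir,
		Keep:        128,
		Type:        statedb.TypeTxSelector,
		NLevels:     0,
		NoLast:      true,
		NoGapsCheck: true,
	})
	if err != nil {
		return tracerr.Wrap(err)
	}
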
arnau
10a34c8801 Merge pull request #532 from hermeznetwork/fix/statedb1
Fix eth events query and sync inconsistent state
2021-02-08 13:21:24 +01:00
Eduard S
6260dfedad Fix eth events query and sync inconsistent state
- kvdb
	- Fix path in Last when doing `setNew`
	- Only close if db != nil, and after closing, always set db to nil
		- This will avoid a panic in the case where the db is closed but
		  there's an error soon after, and a future call tries to close
		  again.  This is because pebble.Close() will panic if the db is
		  already closed.  See the sketch after this commit.
	- Avoid calling pebble methods when the Storage interface already
	  implements that method (like Close).
- statedb
	- In test, avoid calling KVDB method if the same method is available for
	  the StateDB (like MakeCheckpoint, CurrentBatch).
- eth
	- In *EventByBlock methods, take blockHash as input argument and use it
	  when querying the event logs.  Previously the blockHash was taken from
	  the logs results *only if* there was any log.  This caused the
	  following issue: if there were no logs, it was not possible to know if
	  the result was from the expected block or an uncle block!  By querying
	  logs by blockHash we make sure that even if there are no logs, they
	  are from the right block.
	  	- Note that now the function can either be called with a
		  blockNum or blockHash, but not both at the same time.
- sync
	- If there's an error during a call to Sync, call resetState, which
	  internally resets the stateDB to avoid stale checkpoints (and a
	  corresponding invalid increase in the StateDB batchNum).
	- During a Sync, after every batch processed, make sure that the StateDB
	  currentBatch corresponds to the batchNum in the smart contract
	  log/event.
2021-02-05 16:06:17 +01:00
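
A minimal sketch of the guarded-close pattern described in the kvdb bullet above (the wrapper type is illustrative; the nil check and the reset to nil are what prevent the double-close panic mentioned in the commit):

package kvdb

import "github.com/cockroachdb/pebble"

// store wraps a pebble DB so Close is safe to call more than once.
type store struct {
	db *pebble.DB
}

// Close only closes the underlying DB if it is still open and always sets it
// to nil afterwards, so a later error path that calls Close again does not
// trigger pebble's panic on an already-closed DB.
func (s *store) Close() error {
	if s.db == nil {
		return nil
	}
	err := s.db.Close()
	s.db = nil
	return err
}
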
44 changed files with 1552 additions and 688 deletions

View File

@@ -216,7 +216,7 @@ func TestMain(m *testing.M) {
panic(err)
}
}()
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeTxSelector, 0)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, Type: statedb.TypeTxSelector, NLevels: 0})
if err != nil {
panic(err)
}

View File

@@ -2,6 +2,7 @@ package batchbuilder
import (
"github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/db/kvdb"
"github.com/hermeznetwork/hermez-node/db/statedb"
"github.com/hermeznetwork/hermez-node/txprocessor"
"github.com/hermeznetwork/tracerr"
@@ -28,8 +29,14 @@ type ConfigBatch struct {
// NewBatchBuilder constructs a new BatchBuilder, and executes the bb.Reset
// method
func NewBatchBuilder(dbpath string, synchronizerStateDB *statedb.StateDB, batchNum common.BatchNum, nLevels uint64) (*BatchBuilder, error) {
localStateDB, err := statedb.NewLocalStateDB(dbpath, 128, synchronizerStateDB,
statedb.TypeBatchBuilder, int(nLevels))
localStateDB, err := statedb.NewLocalStateDB(
statedb.Config{
Path: dbpath,
Keep: kvdb.DefaultKeep,
Type: statedb.TypeBatchBuilder,
NLevels: int(nLevels),
},
synchronizerStateDB)
if err != nil {
return nil, tracerr.Wrap(err)
}
@@ -47,7 +54,7 @@ func NewBatchBuilder(dbpath string, synchronizerStateDB *statedb.StateDB, batchN
// copy of the rollup state from the Synchronizer at that `batchNum`, otherwise
// it can just roll back the internal copy.
func (bb *BatchBuilder) Reset(batchNum common.BatchNum, fromSynchronizer bool) error {
return bb.localStateDB.Reset(batchNum, fromSynchronizer)
return tracerr.Wrap(bb.localStateDB.Reset(batchNum, fromSynchronizer))
}
// BuildBatch takes the transactions and returns the common.ZKInputs of the next batch

View File

@@ -15,7 +15,7 @@ func TestBatchBuilder(t *testing.T) {
require.Nil(t, err)
defer assert.Nil(t, os.RemoveAll(dir))
synchDB, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 0)
synchDB, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, Type: statedb.TypeBatchBuilder, NLevels: 0})
assert.Nil(t, err)
bbDir, err := ioutil.TempDir("", "tmpBatchBuilderDB")

View File

@@ -41,12 +41,15 @@ TokenHEZ = "0x5D94e3e7aeC542aB0F9129B9a7BAdeb5B3Ca0f77"
TokenHEZName = "Hermez Network Token"
[Coordinator]
# ForgerAddress = "0x05c23b938a85ab26A36E6314a0D02080E9ca6BeD" # Non-Boot Coordinator
ForgerAddress = "0x05c23b938a85ab26A36E6314a0D02080E9ca6BeD" # Non-Boot Coordinator
# ForgerAddressPrivateKey = "0x30f5fddb34cd4166adb2c6003fa6b18f380fd2341376be42cf1c7937004ac7a3"
ForgerAddress = "0xb4124ceb3451635dacedd11767f004d8a28c6ee7" # Boot Coordinator
# ForgerAddress = "0xb4124ceb3451635dacedd11767f004d8a28c6ee7" # Boot Coordinator
# ForgerAddressPrivateKey = "0xa8a54b2d8197bc0b19bb8a084031be71835580a01e70a45a13babd16c9bc1563"
ConfirmBlocks = 10
L1BatchTimeoutPerc = 0.999
L1BatchTimeoutPerc = 0.6
StartSlotBlocksDelay = 2
ScheduleBatchBlocksAheadCheck = 3
SendBatchBlocksMarginCheck = 1
ProofServerPollInterval = "1s"
ForgeRetryInterval = "500ms"
SyncRetryInterval = "1s"
@@ -85,8 +88,11 @@ ReceiptLoopInterval = "500ms"
CheckLoopInterval = "500ms"
Attempts = 4
AttemptsDelay = "500ms"
TxResendTimeout = "2m"
NoReuseNonce = false
CallGasLimit = 300000
GasPriceDiv = 100
MaxGasPrice = "5000000000"
[Coordinator.EthClient.Keystore]
Path = "/tmp/iden3-test/hermez/ethkeystore"
@@ -98,3 +104,4 @@ Coordinator = true
[Coordinator.Debug]
BatchPath = "/tmp/iden3-test/hermez/batchesdebug"
LightScrypt = true
# RollupVerifierIndex = 0

View File

@@ -27,6 +27,24 @@ type Batch struct {
TotalFeesUSD *float64 `meddler:"total_fees_usd"`
}
// NewEmptyBatch creates a new empty batch
func NewEmptyBatch() *Batch {
return &Batch{
BatchNum: 0,
EthBlockNum: 0,
ForgerAddr: ethCommon.Address{},
CollectedFees: make(map[TokenID]*big.Int),
FeeIdxsCoordinator: make([]Idx, 0),
StateRoot: big.NewInt(0),
NumAccounts: 0,
LastIdx: 0,
ExitRoot: big.NewInt(0),
ForgeL1TxsNum: nil,
SlotNum: 0,
TotalFeesUSD: nil,
}
}
// BatchNum identifies a batch
type BatchNum int64
@@ -75,3 +93,23 @@ func NewBatchData() *BatchData {
Batch: Batch{},
}
}
// BatchSync is a subset of Batch that contains fields needed for the
// synchronizer and coordinator
// type BatchSync struct {
// BatchNum BatchNum `meddler:"batch_num"`
// EthBlockNum int64 `meddler:"eth_block_num"` // Ethereum block in which the batch is forged
// ForgerAddr ethCommon.Address `meddler:"forger_addr"`
// StateRoot *big.Int `meddler:"state_root,bigint"`
// SlotNum int64 `meddler:"slot_num"` // Slot in which the batch is forged
// }
//
// func NewBatchSync() *BatchSync {
// return &BatchSync{
// BatchNum: 0,
// EthBlockNum: 0,
// ForgerAddr: ethCommon.Address,
// StateRoot: big.NewInt(0),
// SlotNum: 0,
// }
// }

View File

@@ -33,7 +33,8 @@ func (c *AuctionConstants) SlotNum(blockNum int64) int64 {
if blockNum >= c.GenesisBlockNum {
return (blockNum - c.GenesisBlockNum) / int64(c.BlocksPerSlot)
}
return -1
// This result will be negative
return (blockNum - c.GenesisBlockNum) / int64(c.BlocksPerSlot)
}
// SlotBlocks returns the first and the last block numbers included in that slot

View File

@@ -3,6 +3,7 @@ package config
import (
"fmt"
"io/ioutil"
"math/big"
"time"
"github.com/BurntSushi/toml"
@@ -51,6 +52,27 @@ type Coordinator struct {
// L1BatchTimeoutPerc is the portion of the range before the L1Batch
// timeout that will trigger a schedule to forge an L1Batch
L1BatchTimeoutPerc float64 `validate:"required"`
// StartSlotBlocksDelay is the number of blocks of delay to wait before
// starting the pipeline when we reach a slot in which we can forge.
StartSlotBlocksDelay int64
// ScheduleBatchBlocksAheadCheck is the number of blocks ahead in which
// the forger address is checked to be allowed to forge (apart from
// checking the next block), used to decide when to stop scheduling new
// batches (by stopping the pipeline).
// For example, if we are at block 10 and ScheduleBatchBlocksAheadCheck
// is 5, even though at block 11 we canForge, the pipeline will be
// stopped if we can't forge at block 15.
// This value should be the expected number of blocks it takes between
// scheduling a batch and having it mined.
ScheduleBatchBlocksAheadCheck int64
// SendBatchBlocksMarginCheck is the number of margin blocks ahead in
// which the coordinator is also checked to be allowed to forge, apart
// from the next block; used to decide when to stop sending batches to
// the smart contract.
// For example, if we are at block 10 and SendBatchBlocksMarginCheck is
// 5, even though at block 11 we canForge, the batch will be discarded
// if we can't forge at block 15.
SendBatchBlocksMarginCheck int64
// ProofServerPollInterval is the waiting interval between polling the
// ProofServer while waiting for a particular status
ProofServerPollInterval Duration `validate:"required"`
@@ -90,7 +112,6 @@ type Coordinator struct {
} `validate:"required"`
ServerProofs []ServerProof `validate:"required"`
Circuit struct {
// VerifierIdx uint8 `validate:"required"`
// MaxTx is the maximum number of txs supported by the circuit
MaxTx int64 `validate:"required"`
// NLevels is the maximum number of merkle tree levels
@@ -102,6 +123,9 @@ type Coordinator struct {
// calls, except for methods where a particular gas limit is
// hardcoded because it's known to be a big value
CallGasLimit uint64 `validate:"required"`
// MaxGasPrice is the maximum gas price allowed for ethereum
// transactions
MaxGasPrice *big.Int `validate:"required"`
// GasPriceDiv is the gas price division
GasPriceDiv uint64 `validate:"required"`
// CheckLoopInterval is the waiting interval between receipt
@@ -113,6 +137,13 @@ type Coordinator struct {
// AttemptsDelay is the delay between attempts to do an eth client
// RPC call
AttemptsDelay Duration `validate:"required"`
// TxResendTimeout is the timeout after which a non-mined
// ethereum transaction will be resent (reusing the nonce) with
// a newly calculated gas price
TxResendTimeout Duration `validate:"required"`
// NoReuseNonce disables reusing nonces of pending transactions for
// new replacement transactions
NoReuseNonce bool
// Keystore is the ethereum keystore where private keys are kept
Keystore struct {
// Path to the keystore
@@ -132,6 +163,10 @@ type Coordinator struct {
// LightScrypt if set, uses light parameters for the ethereum
// keystore encryption algorithm.
LightScrypt bool
// RollupVerifierIndex is the index of the verifier to use in
// the Rollup smart contract. The verifier chosen by index
// must match with the Circuit parameters.
RollupVerifierIndex *int
}
}

View File

@@ -47,6 +47,8 @@ type Debug struct {
MineBlockNum int64
// SendBlockNum is the blockNum when the batch was sent to ethereum
SendBlockNum int64
// ResendNum is the number of times the tx has been resent
ResendNum int
// LastScheduledL1BatchBlockNum is the blockNum when the last L1Batch
// was scheduled
LastScheduledL1BatchBlockNum int64
@@ -64,10 +66,17 @@ type Debug struct {
// StartToSendDelay is the delay between starting a batch and sending
// it to ethereum, in seconds
StartToSendDelay float64
// StartToMineDelay is the delay between starting a batch and having
// it mined in seconds
StartToMineDelay float64
// SendToMineDelay is the delay between sending a batch tx and having
// it mined in seconds
SendToMineDelay float64
}
// BatchInfo contains the Batch information
type BatchInfo struct {
PipelineNum int
BatchNum common.BatchNum
ServerProof prover.Client
ZKInputs *common.ZKInputs
@@ -82,9 +91,15 @@ type BatchInfo struct {
CoordIdxs []common.Idx
ForgeBatchArgs *eth.RollupForgeBatchArgs
// FeesInfo
EthTx *types.Transaction
Receipt *types.Receipt
Debug Debug
EthTx *types.Transaction
// SendTimestamp is the time when the batch was sent to ethereum
SendTimestamp time.Time
Receipt *types.Receipt
// Fail is true if:
// - The receipt status is failed
// - A previous parent batch is failed
Fail bool
Debug Debug
}
// DebugStore is a debug function to store the BatchInfo as a json text file in

View File

@@ -3,8 +3,8 @@ package coordinator
import (
"context"
"fmt"
"math/big"
"os"
"strings"
"sync"
"time"
@@ -42,6 +42,29 @@ type Config struct {
// L1BatchTimeoutPerc is the portion of the range before the L1Batch
// timeout that will trigger a schedule to forge an L1Batch
L1BatchTimeoutPerc float64
// StartSlotBlocksDelay is the number of blocks of delay to wait before
// starting the pipeline when we reach a slot in which we can forge.
StartSlotBlocksDelay int64
// ScheduleBatchBlocksAheadCheck is the number of blocks ahead in which
// the forger address is checked to be allowed to forge (apart from
// checking the next block), used to decide when to stop scheduling new
// batches (by stopping the pipeline).
// For example, if we are at block 10 and ScheduleBatchBlocksAheadCheck
// is 5, even though at block 11 we canForge, the pipeline will be
// stopped if we can't forge at block 15.
// This value should be the expected number of blocks it takes between
// scheduling a batch and having it mined.
ScheduleBatchBlocksAheadCheck int64
// SendBatchBlocksMarginCheck is the number of margin blocks ahead in
// which the coordinator is also checked to be allowed to forge, apart
// from the next block; used to decide when to stop sending batches to
// the smart contract.
// For example, if we are at block 10 and SendBatchBlocksMarginCheck is
// 5, even though at block 11 we canForge, the batch will be discarded
// if we can't forge at block 15.
// This value should be the expected number of blocks it takes between
// sending a batch and having it mined.
SendBatchBlocksMarginCheck int64
// EthClientAttempts is the number of attempts to do an eth client RPC
// call before giving up
EthClientAttempts int
@@ -54,13 +77,25 @@ type Config struct {
// EthClientAttemptsDelay is the delay between attempts to do an eth client
// RPC call
EthClientAttemptsDelay time.Duration
// EthTxResendTimeout is the timeout after which a non-mined ethereum
// transaction will be resent (reusing the nonce) with a newly
// calculated gas price
EthTxResendTimeout time.Duration
// EthNoReuseNonce disables reusing nonces of pending transactions for
// new replacement transactions
EthNoReuseNonce bool
// MaxGasPrice is the maximum gas price allowed for ethereum
// transactions
MaxGasPrice *big.Int
// TxManagerCheckInterval is the waiting interval between receipt
// checks of ethereum transactions in the TxManager
TxManagerCheckInterval time.Duration
// DebugBatchPath if set, specifies the path where batchInfo is stored
// in JSON in every step/update of the pipeline
DebugBatchPath string
Purger PurgerCfg
DebugBatchPath string
Purger PurgerCfg
// VerifierIdx is the index of the verifier contract registered in the
// smart contract
VerifierIdx uint8
TxProcessorConfig txprocessor.Config
}
@@ -74,15 +109,22 @@ func (c *Config) debugBatchStore(batchInfo *BatchInfo) {
}
}
type fromBatch struct {
BatchNum common.BatchNum
ForgerAddr ethCommon.Address
StateRoot *big.Int
}
// Coordinator implements the Coordinator type
type Coordinator struct {
// State
pipelineBatchNum common.BatchNum // batchNum from which we started the pipeline
provers []prover.Client
consts synchronizer.SCConsts
vars synchronizer.SCVariables
stats synchronizer.Stats
started bool
pipelineNum int // Pipeline sequential number. The first pipeline is 1
pipelineFromBatch fromBatch // batch from which we started the pipeline
provers []prover.Client
consts synchronizer.SCConsts
vars synchronizer.SCVariables
stats synchronizer.Stats
started bool
cfg Config
@@ -96,7 +138,8 @@ type Coordinator struct {
wg sync.WaitGroup
cancel context.CancelFunc
pipeline *Pipeline
pipeline *Pipeline
lastNonFailedBatchNum common.BatchNum
purger *Purger
txManager *TxManager
@@ -139,10 +182,15 @@ func NewCoordinator(cfg Config,
ctx, cancel := context.WithCancel(context.Background())
c := Coordinator{
pipelineBatchNum: -1,
provers: serverProofs,
consts: *scConsts,
vars: *initSCVars,
pipelineNum: 0,
pipelineFromBatch: fromBatch{
BatchNum: 0,
ForgerAddr: ethCommon.Address{},
StateRoot: big.NewInt(0),
},
provers: serverProofs,
consts: *scConsts,
vars: *initSCVars,
cfg: cfg,
@@ -183,8 +231,9 @@ func (c *Coordinator) BatchBuilder() *batchbuilder.BatchBuilder {
}
func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) {
return NewPipeline(ctx, c.cfg, c.historyDB, c.l2DB, c.txSelector,
c.batchBuilder, c.purger, c.txManager, c.provers, &c.consts)
c.pipelineNum++
return NewPipeline(ctx, c.cfg, c.pipelineNum, c.historyDB, c.l2DB, c.txSelector,
c.batchBuilder, c.purger, c, c.txManager, c.provers, &c.consts)
}
// MsgSyncBlock indicates an update to the Synchronizer stats
@@ -205,6 +254,9 @@ type MsgSyncReorg struct {
// MsgStopPipeline indicates a signal to reset the pipeline
type MsgStopPipeline struct {
Reason string
// FailedBatchNum indicates the first batchNum that failed in the
// pipeline. If FailedBatchNum is 0, it should be ignored.
FailedBatchNum common.BatchNum
}
// SendMsg is a thread safe method to pass a message to the Coordinator
@@ -215,27 +267,36 @@ func (c *Coordinator) SendMsg(ctx context.Context, msg interface{}) {
}
}
func updateSCVars(vars *synchronizer.SCVariables, update synchronizer.SCVariablesPtr) {
if update.Rollup != nil {
vars.Rollup = *update.Rollup
}
if update.Auction != nil {
vars.Auction = *update.Auction
}
if update.WDelayer != nil {
vars.WDelayer = *update.WDelayer
}
}
func (c *Coordinator) syncSCVars(vars synchronizer.SCVariablesPtr) {
if vars.Rollup != nil {
c.vars.Rollup = *vars.Rollup
}
if vars.Auction != nil {
c.vars.Auction = *vars.Auction
}
if vars.WDelayer != nil {
c.vars.WDelayer = *vars.WDelayer
}
updateSCVars(&c.vars, vars)
}
func canForge(auctionConstants *common.AuctionConstants, auctionVars *common.AuctionVariables,
currentSlot *common.Slot, nextSlot *common.Slot, addr ethCommon.Address, blockNum int64) bool {
if blockNum < auctionConstants.GenesisBlockNum {
log.Infow("canForge: requested blockNum is < genesis", "blockNum", blockNum,
"genesis", auctionConstants.GenesisBlockNum)
return false
}
var slot *common.Slot
if currentSlot.StartBlock <= blockNum && blockNum <= currentSlot.EndBlock {
slot = currentSlot
} else if nextSlot.StartBlock <= blockNum && blockNum <= nextSlot.EndBlock {
slot = nextSlot
} else {
log.Warnw("Coordinator: requested blockNum for canForge is outside slot",
log.Warnw("canForge: requested blockNum is outside current and next slot",
"blockNum", blockNum, "currentSlot", currentSlot,
"nextSlot", nextSlot,
)
@@ -244,16 +305,23 @@ func canForge(auctionConstants *common.AuctionConstants, auctionVars *common.Auc
anyoneForge := false
if !slot.ForgerCommitment &&
auctionConstants.RelativeBlock(blockNum) >= int64(auctionVars.SlotDeadline) {
log.Debugw("Coordinator: anyone can forge in the current slot (slotDeadline passed)",
log.Debugw("canForge: anyone can forge in the current slot (slotDeadline passed)",
"block", blockNum)
anyoneForge = true
}
if slot.Forger == addr || anyoneForge {
return true
}
log.Debugw("canForge: can't forge", "slot.Forger", slot.Forger)
return false
}
func (c *Coordinator) canForgeAt(blockNum int64) bool {
return canForge(&c.consts.Auction, &c.vars.Auction,
&c.stats.Sync.Auction.CurrentSlot, &c.stats.Sync.Auction.NextSlot,
c.cfg.ForgerAddress, blockNum)
}
func (c *Coordinator) canForge() bool {
blockNum := c.stats.Eth.LastBlock.Num + 1
return canForge(&c.consts.Auction, &c.vars.Auction,
@@ -262,12 +330,24 @@ func (c *Coordinator) canForge() bool {
}
func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) error {
canForge := c.canForge()
nextBlock := c.stats.Eth.LastBlock.Num + 1
canForge := c.canForgeAt(nextBlock)
if c.cfg.ScheduleBatchBlocksAheadCheck != 0 && canForge {
canForge = c.canForgeAt(nextBlock + c.cfg.ScheduleBatchBlocksAheadCheck)
}
if c.pipeline == nil {
if canForge {
relativeBlock := c.consts.Auction.RelativeBlock(nextBlock)
if canForge && relativeBlock < c.cfg.StartSlotBlocksDelay {
log.Debugw("Coordinator: delaying pipeline start due to "+
"relativeBlock (%v) < cfg.StartSlotBlocksDelay (%v)",
relativeBlock, c.cfg.StartSlotBlocksDelay)
} else if canForge {
log.Infow("Coordinator: forging state begin", "block",
stats.Eth.LastBlock.Num+1, "batch", stats.Sync.LastBatch)
batchNum := common.BatchNum(stats.Sync.LastBatch)
stats.Eth.LastBlock.Num+1, "batch", stats.Sync.LastBatch.BatchNum)
batchNum := stats.Sync.LastBatch.BatchNum
if c.lastNonFailedBatchNum > batchNum {
batchNum = c.lastNonFailedBatchNum
}
var err error
if c.pipeline, err = c.newPipeline(ctx); err != nil {
return tracerr.Wrap(err)
@@ -276,7 +356,7 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
c.pipeline = nil
return tracerr.Wrap(err)
}
c.pipelineBatchNum = batchNum
// c.pipelineBatchNum = batchNum
}
} else {
if !canForge {
@@ -293,18 +373,17 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
// return err
// }
// }
if c.purger.CanInvalidate(stats.Sync.LastBlock.Num, stats.Sync.LastBatch) {
if err := c.txSelector.Reset(common.BatchNum(stats.Sync.LastBatch)); err != nil {
return tracerr.Wrap(err)
}
}
_, err := c.purger.InvalidateMaybe(c.l2DB, c.txSelector.LocalAccountsDB(),
stats.Sync.LastBlock.Num, stats.Sync.LastBatch)
if err != nil {
// if c.purger.CanInvalidate(stats.Sync.LastBlock.Num, int64(stats.Sync.LastBatch.BatchNum)) {
// if err := c.txSelector.Reset(stats.Sync.LastBatch.BatchNum); err != nil {
// return tracerr.Wrap(err)
// }
// }
if _, err := c.purger.InvalidateMaybe(c.l2DB, c.txSelector.LocalAccountsDB(),
stats.Sync.LastBlock.Num, int64(stats.Sync.LastBatch.BatchNum)); err != nil {
return tracerr.Wrap(err)
}
_, err = c.purger.PurgeMaybe(c.l2DB, stats.Sync.LastBlock.Num, stats.Sync.LastBatch)
if err != nil {
if _, err := c.purger.PurgeMaybe(c.l2DB, stats.Sync.LastBlock.Num,
int64(stats.Sync.LastBatch.BatchNum)); err != nil {
return tracerr.Wrap(err)
}
}
@@ -331,33 +410,42 @@ func (c *Coordinator) handleReorg(ctx context.Context, msg *MsgSyncReorg) error
if c.pipeline != nil {
c.pipeline.SetSyncStatsVars(ctx, &msg.Stats, &msg.Vars)
}
if common.BatchNum(c.stats.Sync.LastBatch) < c.pipelineBatchNum {
// There's been a reorg and the batch from which the pipeline
// was started was in a block that was discarded. The batch
// may not be in the main chain, so we stop the pipeline as a
// precaution (it will be started again once the node is in
// sync).
log.Infow("Coordinator.handleReorg StopPipeline sync.LastBatch < c.pipelineBatchNum",
"sync.LastBatch", c.stats.Sync.LastBatch,
"c.pipelineBatchNum", c.pipelineBatchNum)
if err := c.handleStopPipeline(ctx, "reorg"); err != nil {
if c.stats.Sync.LastBatch.ForgerAddr != c.cfg.ForgerAddress &&
c.stats.Sync.LastBatch.StateRoot.Cmp(c.pipelineFromBatch.StateRoot) != 0 {
// There's been a reorg and the batch state root from which the
// pipeline was started has changed (probably because it was in
// a block that was discarded), and it was sent by a different
// coordinator than us. That batch may never be in the main
// chain, so we stop the pipeline (it will be started again
// once the node is in sync).
log.Infow("Coordinator.handleReorg StopPipeline sync.LastBatch.ForgerAddr != cfg.ForgerAddr "+
"& sync.LastBatch.StateRoot != pipelineFromBatch.StateRoot",
"sync.LastBatch.StateRoot", c.stats.Sync.LastBatch.StateRoot,
"pipelineFromBatch.StateRoot", c.pipelineFromBatch.StateRoot)
c.txManager.DiscardPipeline(ctx, c.pipelineNum)
if err := c.handleStopPipeline(ctx, "reorg", 0); err != nil {
return tracerr.Wrap(err)
}
}
return nil
}
func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string) error {
if err := c.l2DB.Reorg(common.BatchNum(c.stats.Sync.LastBatch)); err != nil {
return tracerr.Wrap(err)
// handleStopPipeline handles stopping the pipeline. If failedBatchNum is 0,
// the next pipeline will start from the last state of the synchronizer,
// otherwise, it will start from failedBatchNum-1.
func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string, failedBatchNum common.BatchNum) error {
batchNum := c.stats.Sync.LastBatch.BatchNum
if failedBatchNum != 0 {
batchNum = failedBatchNum - 1
}
if c.pipeline != nil {
c.pipeline.Stop(c.ctx)
c.pipeline = nil
}
if strings.Contains(reason, common.AuctionErrMsgCannotForge) { //nolint:staticcheck
// TODO: Check that we are in a slot in which we can't forge
if err := c.l2DB.Reorg(batchNum); err != nil {
return tracerr.Wrap(err)
}
c.lastNonFailedBatchNum = batchNum
return nil
}
@@ -373,7 +461,7 @@ func (c *Coordinator) handleMsg(ctx context.Context, msg interface{}) error {
}
case MsgStopPipeline:
log.Infow("Coordinator received MsgStopPipeline", "reason", msg.Reason)
if err := c.handleStopPipeline(ctx, msg.Reason); err != nil {
if err := c.handleStopPipeline(ctx, msg.Reason, msg.FailedBatchNum); err != nil {
return tracerr.Wrap(fmt.Errorf("Coordinator.handleStopPipeline: %w", err))
}
default:

View File

@@ -2,6 +2,7 @@ package coordinator
import (
"context"
"errors"
"fmt"
"io/ioutil"
"math/big"
@@ -11,6 +12,7 @@ import (
"time"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/hermeznetwork/hermez-node/batchbuilder"
"github.com/hermeznetwork/hermez-node/common"
dbUtils "github.com/hermeznetwork/hermez-node/db"
@@ -97,7 +99,8 @@ func newTestModules(t *testing.T) modules {
syncDBPath, err = ioutil.TempDir("", "tmpSyncDB")
require.NoError(t, err)
deleteme = append(deleteme, syncDBPath)
syncStateDB, err := statedb.NewStateDB(syncDBPath, 128, statedb.TypeSynchronizer, 48)
syncStateDB, err := statedb.NewStateDB(statedb.Config{Path: syncDBPath, Keep: 128,
Type: statedb.TypeSynchronizer, NLevels: 48})
assert.NoError(t, err)
pass := os.Getenv("POSTGRES_PASS")
@@ -260,8 +263,8 @@ func TestCoordinatorFlow(t *testing.T) {
var stats synchronizer.Stats
stats.Eth.LastBlock = *ethClient.CtlLastBlock()
stats.Sync.LastBlock = stats.Eth.LastBlock
stats.Eth.LastBatch = ethClient.CtlLastForgedBatch()
stats.Sync.LastBatch = stats.Eth.LastBatch
stats.Eth.LastBatchNum = ethClient.CtlLastForgedBatch()
stats.Sync.LastBatch.BatchNum = common.BatchNum(stats.Eth.LastBatchNum)
canForge, err := ethClient.AuctionCanForge(forger, blockNum+1)
require.NoError(t, err)
var slot common.Slot
@@ -278,7 +281,7 @@ func TestCoordinatorFlow(t *testing.T) {
// Copy stateDB to synchronizer if there was a new batch
source := fmt.Sprintf("%v/BatchNum%v", batchBuilderDBPath, stats.Sync.LastBatch)
dest := fmt.Sprintf("%v/BatchNum%v", syncDBPath, stats.Sync.LastBatch)
if stats.Sync.LastBatch != 0 {
if stats.Sync.LastBatch.BatchNum != 0 {
if _, err := os.Stat(dest); os.IsNotExist(err) {
log.Infow("Making pebble checkpoint for sync",
"source", source, "dest", dest)
@@ -565,3 +568,8 @@ func TestCoordinatorStress(t *testing.T) {
// TODO: Test forgeBatch
// TODO: Test waitServerProof
// TODO: Test handleReorg
func TestFoo(t *testing.T) {
a := tracerr.Wrap(fmt.Errorf("AAA: %w", core.ErrNonceTooLow))
fmt.Println(errors.Is(a, core.ErrNonceTooLow))
}

View File

@@ -2,6 +2,7 @@ package coordinator
import (
"context"
"database/sql"
"fmt"
"math/big"
"sync"
@@ -24,19 +25,30 @@ type statsVars struct {
Vars synchronizer.SCVariablesPtr
}
type state struct {
batchNum common.BatchNum
lastScheduledL1BatchBlockNum int64
lastForgeL1TxsNum int64
}
// Pipeline manages the forging of batches with parallel server proofs
type Pipeline struct {
num int
cfg Config
consts synchronizer.SCConsts
// state
batchNum common.BatchNum
lastScheduledL1BatchBlockNum int64
lastForgeL1TxsNum int64
started bool
state state
// batchNum common.BatchNum
// lastScheduledL1BatchBlockNum int64
// lastForgeL1TxsNum int64
started bool
rw sync.RWMutex
errAtBatchNum common.BatchNum
proversPool *ProversPool
provers []prover.Client
coord *Coordinator
txManager *TxManager
historyDB *historydb.HistoryDB
l2DB *l2db.L2DB
@@ -53,14 +65,28 @@ type Pipeline struct {
cancel context.CancelFunc
}
func (p *Pipeline) setErrAtBatchNum(batchNum common.BatchNum) {
p.rw.Lock()
defer p.rw.Unlock()
p.errAtBatchNum = batchNum
}
func (p *Pipeline) getErrAtBatchNum() common.BatchNum {
p.rw.RLock()
defer p.rw.RUnlock()
return p.errAtBatchNum
}
// NewPipeline creates a new Pipeline
func NewPipeline(ctx context.Context,
cfg Config,
num int, // Pipeline sequential number
historyDB *historydb.HistoryDB,
l2DB *l2db.L2DB,
txSelector *txselector.TxSelector,
batchBuilder *batchbuilder.BatchBuilder,
purger *Purger,
coord *Coordinator,
txManager *TxManager,
provers []prover.Client,
scConsts *synchronizer.SCConsts,
@@ -79,6 +105,7 @@ func NewPipeline(ctx context.Context,
return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool"))
}
return &Pipeline{
num: num,
cfg: cfg,
historyDB: historyDB,
l2DB: l2DB,
@@ -87,6 +114,7 @@ func NewPipeline(ctx context.Context,
provers: provers,
proversPool: proversPool,
purger: purger,
coord: coord,
txManager: txManager,
consts: *scConsts,
statsVarsCh: make(chan statsVars, queueLen),
@@ -104,33 +132,67 @@ func (p *Pipeline) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Sta
// reset pipeline state
func (p *Pipeline) reset(batchNum common.BatchNum,
stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
p.batchNum = batchNum
p.lastForgeL1TxsNum = stats.Sync.LastForgeL1TxsNum
p.state = state{
batchNum: batchNum,
lastForgeL1TxsNum: stats.Sync.LastForgeL1TxsNum,
lastScheduledL1BatchBlockNum: 0,
}
p.stats = *stats
p.vars = *vars
p.lastScheduledL1BatchBlockNum = 0
err := p.txSelector.Reset(p.batchNum)
// Reset the StateDB in TxSelector and BatchBuilder from the
// synchronizer only if the checkpoint we reset from either:
// a. Doesn't exist in the TxSelector/BatchBuilder
// b. The batch has already been synced by the synchronizer and has a
// different MTRoot than the BatchBuilder
// Otherwise, reset from the local checkpoint.
// First attempt to reset from local checkpoint if such checkpoint exists
existsTxSelector, err := p.txSelector.LocalAccountsDB().CheckpointExists(p.state.batchNum)
if err != nil {
return tracerr.Wrap(err)
}
err = p.batchBuilder.Reset(p.batchNum, true)
fromSynchronizerTxSelector := !existsTxSelector
if err := p.txSelector.Reset(p.state.batchNum, fromSynchronizerTxSelector); err != nil {
return tracerr.Wrap(err)
}
existsBatchBuilder, err := p.batchBuilder.LocalStateDB().CheckpointExists(p.state.batchNum)
if err != nil {
return tracerr.Wrap(err)
}
fromSynchronizerBatchBuilder := !existsBatchBuilder
if err := p.batchBuilder.Reset(p.state.batchNum, fromSynchronizerBatchBuilder); err != nil {
return tracerr.Wrap(err)
}
// After reset, check that if the batch exists in the historyDB, the
// stateRoot matches with the local one, if not, force a reset from
// synchronizer
batch, err := p.historyDB.GetBatch(p.state.batchNum)
if tracerr.Unwrap(err) == sql.ErrNoRows {
// nothing to do
} else if err != nil {
return tracerr.Wrap(err)
} else {
localStateRoot := p.batchBuilder.LocalStateDB().MT.Root().BigInt()
if batch.StateRoot.Cmp(localStateRoot) != 0 {
log.Debugw("localStateRoot (%v) != historyDB stateRoot (%v). "+
"Forcing reset from Synchronizer", localStateRoot, batch.StateRoot)
// StateRoot from synchronizer doesn't match StateRoot
// from batchBuilder, force a reset from synchronizer
if err := p.txSelector.Reset(p.state.batchNum, true); err != nil {
return tracerr.Wrap(err)
}
if err := p.batchBuilder.Reset(p.state.batchNum, true); err != nil {
return tracerr.Wrap(err)
}
}
}
return nil
}
func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) {
if vars.Rollup != nil {
p.vars.Rollup = *vars.Rollup
}
if vars.Auction != nil {
p.vars.Auction = *vars.Auction
}
if vars.WDelayer != nil {
p.vars.WDelayer = *vars.WDelayer
}
updateSCVars(&p.vars, vars)
}
// handleForgeBatch calls p.forgeBatch to forge the batch and get the zkInputs,
@@ -143,7 +205,7 @@ func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNu
} else if err != nil {
if tracerr.Unwrap(err) == errLastL1BatchNotSynced {
log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err,
"lastForgeL1TxsNum", p.lastForgeL1TxsNum,
"lastForgeL1TxsNum", p.state.lastForgeL1TxsNum,
"syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum)
} else {
log.Errorw("forgeBatch", "err", err)
@@ -199,16 +261,35 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
p.stats = statsVars.Stats
p.syncSCVars(statsVars.Vars)
case <-time.After(waitDuration):
batchNum = p.batchNum + 1
if batchInfo, err := p.handleForgeBatch(p.ctx, batchNum); err != nil {
waitDuration = p.cfg.SyncRetryInterval
// Once errAtBatchNum != 0, we stop forging
// batches because there's been an error and we
// wait for the pipeline to be stopped.
if p.getErrAtBatchNum() != 0 {
waitDuration = p.cfg.ForgeRetryInterval
continue
} else {
p.batchNum = batchNum
select {
case batchChSentServerProof <- batchInfo:
case <-p.ctx.Done():
}
}
batchNum = p.state.batchNum + 1
batchInfo, err := p.handleForgeBatch(p.ctx, batchNum)
if p.ctx.Err() != nil {
continue
} else if tracerr.Unwrap(err) == errLastL1BatchNotSynced {
waitDuration = p.cfg.ForgeRetryInterval
continue
} else if err != nil {
p.setErrAtBatchNum(batchNum)
waitDuration = p.cfg.ForgeRetryInterval
p.coord.SendMsg(p.ctx, MsgStopPipeline{
Reason: fmt.Sprintf(
"Pipeline.handleForgBatch: %v", err),
FailedBatchNum: batchNum,
})
continue
}
p.state.batchNum = batchNum
select {
case batchChSentServerProof <- batchInfo:
case <-p.ctx.Done():
}
}
}
@@ -223,16 +304,28 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
p.wg.Done()
return
case batchInfo := <-batchChSentServerProof:
// Once errAtBatchNum != 0, we stop forging
// batches because there's been an error and we
// wait for the pipeline to be stopped.
if p.getErrAtBatchNum() != 0 {
continue
}
err := p.waitServerProof(p.ctx, batchInfo)
// We are done with this serverProof, add it back to the pool
p.proversPool.Add(p.ctx, batchInfo.ServerProof)
batchInfo.ServerProof = nil
if p.ctx.Err() != nil {
continue
} else if err != nil {
log.Errorw("waitServerProof", "err", err)
p.setErrAtBatchNum(batchInfo.BatchNum)
p.coord.SendMsg(p.ctx, MsgStopPipeline{
Reason: fmt.Sprintf(
"Pipeline.waitServerProof: %v", err),
FailedBatchNum: batchInfo.BatchNum,
})
continue
}
// We are done with this serverProof, add it back to the pool
p.proversPool.Add(p.ctx, batchInfo.ServerProof)
// batchInfo.ServerProof = nil
p.txManager.AddBatch(p.ctx, batchInfo)
}
}
@@ -282,8 +375,8 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
if err != nil {
return nil, tracerr.Wrap(err)
}
batchInfo = &BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch
// Structure to accumulate data and metadata of the batch
batchInfo = &BatchInfo{PipelineNum: p.num, BatchNum: batchNum}
batchInfo.Debug.StartTimestamp = time.Now()
batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1
@@ -298,22 +391,19 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
var auths [][]byte
var coordIdxs []common.Idx
// TODO: If there are no txs and we are behind the timeout, skip
// forging a batch and return a particular error that can be handleded
// in the loop where handleForgeBatch is called to retry after an
// interval
// 1. Decide if we forge L2Tx or L1+L2Tx
if p.shouldL1L2Batch(batchInfo) {
batchInfo.L1Batch = true
defer func() {
// If there's no error, update the parameters related
// to the last L1Batch forged
if err == nil {
p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
p.lastForgeL1TxsNum++
}
}()
if p.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum {
if p.state.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum {
return nil, tracerr.Wrap(errLastL1BatchNotSynced)
}
// 2a: L1+L2 txs
l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum + 1)
l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.state.lastForgeL1TxsNum + 1)
if err != nil {
return nil, tracerr.Wrap(err)
}
@@ -322,6 +412,9 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
if err != nil {
return nil, tracerr.Wrap(err)
}
p.state.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
p.state.lastForgeL1TxsNum++
} else {
// 2b: only L2 txs
coordIdxs, auths, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
@@ -397,12 +490,12 @@ func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) er
func (p *Pipeline) shouldL1L2Batch(batchInfo *BatchInfo) bool {
// Take the lastL1BatchBlockNum as the biggest between the last
// scheduled one, and the synchronized one.
lastL1BatchBlockNum := p.lastScheduledL1BatchBlockNum
lastL1BatchBlockNum := p.state.lastScheduledL1BatchBlockNum
if p.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum {
lastL1BatchBlockNum = p.stats.Sync.LastL1BatchBlock
}
// Set Debug information
batchInfo.Debug.LastScheduledL1BatchBlockNum = p.lastScheduledL1BatchBlockNum
batchInfo.Debug.LastScheduledL1BatchBlockNum = p.state.lastScheduledL1BatchBlockNum
batchInfo.Debug.LastL1BatchBlock = p.stats.Sync.LastL1BatchBlock
batchInfo.Debug.LastL1BatchBlockDelta = p.stats.Eth.LastBlock.Num + 1 - lastL1BatchBlockNum
batchInfo.Debug.L1BatchBlockScheduleDeadline =

View File

@@ -77,7 +77,7 @@ func TestPipelineShouldL1L2Batch(t *testing.T) {
//
// Scheduled L1Batch
//
pipeline.lastScheduledL1BatchBlockNum = startBlock
pipeline.state.lastScheduledL1BatchBlockNum = startBlock
stats.Sync.LastL1BatchBlock = startBlock - 10
// We are one block before the timeout range * 0.5
@@ -172,7 +172,7 @@ func TestPipelineForgeBatchWithTxs(t *testing.T) {
// users with positive balances
tilCtx := preloadSync(t, ethClient, sync, modules.historyDB, modules.stateDB)
syncStats := sync.Stats()
batchNum := common.BatchNum(syncStats.Sync.LastBatch)
batchNum := syncStats.Sync.LastBatch.BatchNum
syncSCVars := sync.SCVars()
pipeline, err := coord.newPipeline(ctx)

View File

@@ -28,12 +28,14 @@ func newStateDB(t *testing.T) *statedb.LocalStateDB {
syncDBPath, err := ioutil.TempDir("", "tmpSyncDB")
require.NoError(t, err)
deleteme = append(deleteme, syncDBPath)
syncStateDB, err := statedb.NewStateDB(syncDBPath, 128, statedb.TypeSynchronizer, 48)
syncStateDB, err := statedb.NewStateDB(statedb.Config{Path: syncDBPath, Keep: 128,
Type: statedb.TypeSynchronizer, NLevels: 48})
assert.NoError(t, err)
stateDBPath, err := ioutil.TempDir("", "tmpStateDB")
require.NoError(t, err)
deleteme = append(deleteme, stateDBPath)
stateDB, err := statedb.NewLocalStateDB(stateDBPath, 128, syncStateDB, statedb.TypeTxSelector, 0)
stateDB, err := statedb.NewLocalStateDB(statedb.Config{Path: stateDBPath, Keep: 128,
Type: statedb.TypeTxSelector, NLevels: 0}, syncStateDB)
require.NoError(t, err)
return stateDB
}

View File

@@ -2,6 +2,7 @@ package coordinator
import (
"context"
"errors"
"fmt"
"math/big"
"time"
@@ -9,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/db/l2db"
@@ -35,12 +37,22 @@ type TxManager struct {
vars synchronizer.SCVariables
statsVarsCh chan statsVars
queue []*BatchInfo
discardPipelineCh chan int // int refers to the pipelineNum
minPipelineNum int
queue Queue
// lastSuccessBatch stores the last BatchNum whose forge call was confirmed
lastSuccessBatch common.BatchNum
lastPendingBatch common.BatchNum
lastSuccessNonce uint64
lastPendingNonce uint64
// lastPendingBatch common.BatchNum
// accNonce is the account nonce in the last mined block (due to mined txs)
accNonce uint64
// accNextNonce is the nonce that we should use to send the next tx.
// In some cases this will be a reused nonce of an already pending tx.
accNextNonce uint64
// accPendingNonce is the pending nonce of the account due to pending txs
// accPendingNonce uint64
lastSentL1BatchBlockNum int64
}
// NewTxManager creates a new TxManager
@@ -54,26 +66,27 @@ func NewTxManager(ctx context.Context, cfg *Config, ethClient eth.ClientInterfac
if err != nil {
return nil, tracerr.Wrap(err)
}
lastSuccessNonce, err := ethClient.EthNonceAt(ctx, *address, nil)
accNonce, err := ethClient.EthNonceAt(ctx, *address, nil)
if err != nil {
return nil, err
}
lastPendingNonce, err := ethClient.EthPendingNonceAt(ctx, *address)
if err != nil {
return nil, err
}
if lastSuccessNonce != lastPendingNonce {
return nil, tracerr.Wrap(fmt.Errorf("lastSuccessNonce (%v) != lastPendingNonce (%v)",
lastSuccessNonce, lastPendingNonce))
}
log.Infow("TxManager started", "nonce", lastSuccessNonce)
// accPendingNonce, err := ethClient.EthPendingNonceAt(ctx, *address)
// if err != nil {
// return nil, err
// }
// if accNonce != accPendingNonce {
// return nil, tracerr.Wrap(fmt.Errorf("currentNonce (%v) != accPendingNonce (%v)",
// accNonce, accPendingNonce))
// }
log.Infow("TxManager started", "nonce", accNonce)
return &TxManager{
cfg: *cfg,
ethClient: ethClient,
l2DB: l2DB,
coord: coord,
batchCh: make(chan *BatchInfo, queueLen),
statsVarsCh: make(chan statsVars, queueLen),
cfg: *cfg,
ethClient: ethClient,
l2DB: l2DB,
coord: coord,
batchCh: make(chan *BatchInfo, queueLen),
statsVarsCh: make(chan statsVars, queueLen),
discardPipelineCh: make(chan int, queueLen),
account: accounts.Account{
Address: *address,
},
@@ -82,8 +95,11 @@ func NewTxManager(ctx context.Context, cfg *Config, ethClient eth.ClientInterfac
vars: *initSCVars,
lastSuccessNonce: lastSuccessNonce,
lastPendingNonce: lastPendingNonce,
minPipelineNum: 0,
queue: NewQueue(),
accNonce: accNonce,
accNextNonce: accNonce,
// accPendingNonce: accPendingNonce,
}, nil
}
@@ -104,16 +120,17 @@ func (t *TxManager) SetSyncStatsVars(ctx context.Context, stats *synchronizer.St
}
}
// DiscardPipeline is a thread safe method to notify about a discarded pipeline
// due to a reorg
func (t *TxManager) DiscardPipeline(ctx context.Context, pipelineNum int) {
select {
case t.discardPipelineCh <- pipelineNum:
case <-ctx.Done():
}
}
func (t *TxManager) syncSCVars(vars synchronizer.SCVariablesPtr) {
if vars.Rollup != nil {
t.vars.Rollup = *vars.Rollup
}
if vars.Auction != nil {
t.vars.Auction = *vars.Auction
}
if vars.WDelayer != nil {
t.vars.WDelayer = *vars.WDelayer
}
updateSCVars(&t.vars, vars)
}
// NewAuth generates a new auth object for an ethereum transaction
@@ -123,6 +140,7 @@ func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) {
return nil, tracerr.Wrap(err)
}
inc := new(big.Int).Set(gasPrice)
// TODO: Replace this by a value of percentage
const gasPriceDiv = 100
inc.Div(inc, new(big.Int).SetUint64(gasPriceDiv))
gasPrice.Add(gasPrice, inc)
@@ -141,29 +159,75 @@ func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) {
return auth, nil
}
func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) error {
// TODO: Check if we can forge in the next blockNum, abort if we can't
batchInfo.Debug.Status = StatusSent
batchInfo.Debug.SendBlockNum = t.stats.Eth.LastBlock.Num + 1
batchInfo.Debug.SendTimestamp = time.Now()
batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub(
batchInfo.Debug.StartTimestamp).Seconds()
func (t *TxManager) shouldSendRollupForgeBatch(batchInfo *BatchInfo) error {
nextBlock := t.stats.Eth.LastBlock.Num + 1
if !t.canForgeAt(nextBlock) {
return tracerr.Wrap(fmt.Errorf("can't forge in the next block: %v", nextBlock))
}
if t.mustL1L2Batch(nextBlock) && !batchInfo.L1Batch {
return tracerr.Wrap(fmt.Errorf("can't forge non-L1Batch in the next block: %v", nextBlock))
}
margin := t.cfg.SendBatchBlocksMarginCheck
if margin != 0 {
if !t.canForgeAt(nextBlock + margin) {
return tracerr.Wrap(fmt.Errorf("can't forge after %v blocks: %v",
margin, nextBlock))
}
if t.mustL1L2Batch(nextBlock+margin) && !batchInfo.L1Batch {
return tracerr.Wrap(fmt.Errorf("can't forge non-L1Batch after %v blocks: %v",
margin, nextBlock))
}
}
return nil
}
func addPerc(v *big.Int, p int64) *big.Int {
r := new(big.Int).Set(v)
r.Mul(r, big.NewInt(p))
// nolint reason: to calculate percentages we divide by 100
r.Div(r, big.NewInt(100)) //nolint:gomnd
return r.Add(v, r)
}
func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo, resend bool) error {
var ethTx *types.Transaction
var err error
auth, err := t.NewAuth(ctx)
if err != nil {
return tracerr.Wrap(err)
}
auth.Nonce = big.NewInt(int64(t.lastPendingNonce))
t.lastPendingNonce++
auth.Nonce = big.NewInt(int64(t.accNextNonce))
if resend {
auth.Nonce = big.NewInt(int64(batchInfo.EthTx.Nonce()))
}
for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
if auth.GasPrice.Cmp(t.cfg.MaxGasPrice) > 0 {
return tracerr.Wrap(fmt.Errorf("calculated gasPrice (%v) > maxGasPrice (%v)",
auth.GasPrice, t.cfg.MaxGasPrice))
}
// RollupForgeBatch() calls ethclient.SendTransaction()
ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs, auth)
if err != nil {
// if strings.Contains(err.Error(), common.AuctionErrMsgCannotForge) {
// log.Errorw("TxManager ethClient.RollupForgeBatch", "err", err,
// "block", t.stats.Eth.LastBlock.Num+1)
// return tracerr.Wrap(err)
// }
if errors.Is(err, core.ErrNonceTooLow) {
log.Warnw("TxManager ethClient.RollupForgeBatch incrementing nonce",
"err", err, "nonce", auth.Nonce, "batchNum", batchInfo.BatchNum)
auth.Nonce.Add(auth.Nonce, big.NewInt(1))
attempt--
} else if errors.Is(err, core.ErrNonceTooHigh) {
log.Warnw("TxManager ethClient.RollupForgeBatch decrementing nonce",
"err", err, "nonce", auth.Nonce, "batchNum", batchInfo.BatchNum)
auth.Nonce.Sub(auth.Nonce, big.NewInt(1))
attempt--
} else if errors.Is(err, core.ErrUnderpriced) {
log.Warnw("TxManager ethClient.RollupForgeBatch incrementing gasPrice",
"err", err, "gasPrice", auth.GasPrice, "batchNum", batchInfo.BatchNum)
auth.GasPrice = addPerc(auth.GasPrice, 10)
attempt--
} else if errors.Is(err, core.ErrReplaceUnderpriced) {
log.Warnw("TxManager ethClient.RollupForgeBatch incrementing gasPrice",
"err", err, "gasPrice", auth.GasPrice, "batchNum", batchInfo.BatchNum)
auth.GasPrice = addPerc(auth.GasPrice, 10)
attempt--
} else if err != nil {
log.Errorw("TxManager ethClient.RollupForgeBatch",
"attempt", attempt, "err", err, "block", t.stats.Eth.LastBlock.Num+1,
"batchNum", batchInfo.BatchNum)
@@ -179,10 +243,30 @@ func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchIn
if err != nil {
return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err))
}
if !resend {
t.accNextNonce = auth.Nonce.Uint64() + 1
}
batchInfo.EthTx = ethTx
log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex())
log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash())
now := time.Now()
batchInfo.SendTimestamp = now
if resend {
batchInfo.Debug.ResendNum++
}
batchInfo.Debug.Status = StatusSent
batchInfo.Debug.SendBlockNum = t.stats.Eth.LastBlock.Num + 1
batchInfo.Debug.SendTimestamp = batchInfo.SendTimestamp
batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub(
batchInfo.Debug.StartTimestamp).Seconds()
t.cfg.debugBatchStore(batchInfo)
t.lastPendingBatch = batchInfo.BatchNum
// t.lastPendingBatch = batchInfo.BatchNum
if !resend {
if batchInfo.L1Batch {
t.lastSentL1BatchBlockNum = t.stats.Eth.LastBlock.Num + 1
}
}
if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil {
return tracerr.Wrap(err)
}
@@ -225,13 +309,19 @@ func (t *TxManager) checkEthTransactionReceipt(ctx context.Context, batchInfo *B
func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*int64, error) {
receipt := batchInfo.Receipt
if receipt != nil {
if batchInfo.EthTx.Nonce()+1 > t.accNonce {
t.accNonce = batchInfo.EthTx.Nonce() + 1
}
if receipt.Status == types.ReceiptStatusFailed {
batchInfo.Debug.Status = StatusFailed
t.cfg.debugBatchStore(batchInfo)
_, err := t.ethClient.EthCall(ctx, batchInfo.EthTx, receipt.BlockNumber)
log.Warnw("TxManager receipt status is failed", "tx", receipt.TxHash.Hex(),
log.Warnw("TxManager receipt status is failed", "tx", receipt.TxHash,
"batch", batchInfo.BatchNum, "block", receipt.BlockNumber.Int64(),
"err", err)
if batchInfo.BatchNum <= t.lastSuccessBatch {
t.lastSuccessBatch = batchInfo.BatchNum - 1
}
return nil, tracerr.Wrap(fmt.Errorf(
"ethereum transaction receipt status is failed: %w", err))
} else if receipt.Status == types.ReceiptStatusSuccessful {
@@ -239,6 +329,17 @@ func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*i
batchInfo.Debug.MineBlockNum = receipt.BlockNumber.Int64()
batchInfo.Debug.StartToMineBlocksDelay = batchInfo.Debug.MineBlockNum -
batchInfo.Debug.StartBlockNum
if batchInfo.Debug.StartToMineDelay == 0 {
if block, err := t.ethClient.EthBlockByNumber(ctx,
receipt.BlockNumber.Int64()); err != nil {
log.Warnw("TxManager: ethClient.EthBlockByNumber", "err", err)
} else {
batchInfo.Debug.SendToMineDelay = block.Timestamp.Sub(
batchInfo.Debug.SendTimestamp).Seconds()
batchInfo.Debug.StartToMineDelay = block.Timestamp.Sub(
batchInfo.Debug.StartTimestamp).Seconds()
}
}
t.cfg.debugBatchStore(batchInfo)
if batchInfo.BatchNum > t.lastSuccessBatch {
t.lastSuccessBatch = batchInfo.BatchNum
@@ -250,9 +351,72 @@ func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*i
return nil, nil
}
// TODO:
// - After sending a message: CancelPipeline, stop all consecutive pending Batches (transactions)
// Queue of BatchInfos
type Queue struct {
list []*BatchInfo
// nonceByBatchNum map[common.BatchNum]uint64
next int
}
// NewQueue returns a new queue
func NewQueue() Queue {
return Queue{
list: make([]*BatchInfo, 0),
// nonceByBatchNum: make(map[common.BatchNum]uint64),
next: 0,
}
}
// Len is the length of the queue
func (q *Queue) Len() int {
return len(q.list)
}
// At returns the BatchInfo at position (or nil if position is out of bounds)
func (q *Queue) At(position int) *BatchInfo {
if position >= len(q.list) {
return nil
}
return q.list[position]
}
// Next returns the next BatchInfo (or nil if queue is empty)
func (q *Queue) Next() (int, *BatchInfo) {
if len(q.list) == 0 {
return 0, nil
}
defer func() { q.next = (q.next + 1) % len(q.list) }()
return q.next, q.list[q.next]
}
// Remove removes the BatchInfo at position
func (q *Queue) Remove(position int) {
// batchInfo := q.list[position]
// delete(q.nonceByBatchNum, batchInfo.BatchNum)
q.list = append(q.list[:position], q.list[position+1:]...)
if len(q.list) == 0 {
q.next = 0
} else {
q.next = position % len(q.list)
}
}
// Push adds a new BatchInfo
func (q *Queue) Push(batchInfo *BatchInfo) {
q.list = append(q.list, batchInfo)
// q.nonceByBatchNum[batchInfo.BatchNum] = batchInfo.EthTx.Nonce()
}
// func (q *Queue) NonceByBatchNum(batchNum common.BatchNum) (uint64, bool) {
// nonce, ok := q.nonceByBatchNum[batchNum]
// return nonce, ok
// }
// Run the TxManager
func (t *TxManager) Run(ctx context.Context) {
next := 0
waitDuration := longWaitDuration
var statsVars statsVars
@@ -263,7 +427,7 @@ func (t *TxManager) Run(ctx context.Context) {
t.stats = statsVars.Stats
t.syncSCVars(statsVars.Vars)
log.Infow("TxManager: received initial statsVars",
"block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatch)
"block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatchNum)
for {
select {
@@ -273,8 +437,27 @@ func (t *TxManager) Run(ctx context.Context) {
case statsVars := <-t.statsVarsCh:
t.stats = statsVars.Stats
t.syncSCVars(statsVars.Vars)
case pipelineNum := <-t.discardPipelineCh:
t.minPipelineNum = pipelineNum + 1
if err := t.removeBadBatchInfos(ctx); ctx.Err() != nil {
continue
} else if err != nil {
log.Errorw("TxManager: removeBadBatchInfos", "err", err)
continue
}
case batchInfo := <-t.batchCh:
if err := t.sendRollupForgeBatch(ctx, batchInfo); ctx.Err() != nil {
if batchInfo.PipelineNum < t.minPipelineNum {
log.Warnw("TxManager: batchInfo received pipelineNum < minPipelineNum",
"num", batchInfo.PipelineNum, "minNum", t.minPipelineNum)
}
if err := t.shouldSendRollupForgeBatch(batchInfo); err != nil {
log.Warnw("TxManager: shouldSend", "err", err,
"batch", batchInfo.BatchNum)
t.coord.SendMsg(ctx, MsgStopPipeline{
Reason: fmt.Sprintf("forgeBatch shouldSend: %v", err)})
continue
}
if err := t.sendRollupForgeBatch(ctx, batchInfo, false); ctx.Err() != nil {
continue
} else if err != nil {
// If we reach here it's because our ethNode has
@@ -282,19 +465,20 @@ func (t *TxManager) Run(ctx context.Context) {
// ethereum. This could be due to the ethNode
// failure, or an invalid transaction (that
// can't be mined)
t.coord.SendMsg(ctx, MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch send: %v", err)})
log.Warnw("TxManager: forgeBatch send failed", "err", err,
"batch", batchInfo.BatchNum)
t.coord.SendMsg(ctx, MsgStopPipeline{
Reason: fmt.Sprintf("forgeBatch send: %v", err)})
continue
}
t.queue = append(t.queue, batchInfo)
t.queue.Push(batchInfo)
waitDuration = t.cfg.TxManagerCheckInterval
case <-time.After(waitDuration):
if len(t.queue) == 0 {
queuePosition, batchInfo := t.queue.Next()
if batchInfo == nil {
waitDuration = longWaitDuration
continue
}
current := next
next = (current + 1) % len(t.queue)
batchInfo := t.queue[current]
if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
continue
} else if err != nil { //nolint:staticcheck
@@ -304,7 +488,8 @@ func (t *TxManager) Run(ctx context.Context) {
// if it was not mined, mined and successful or
// mined and failed. This could be due to the
// ethNode failure.
t.coord.SendMsg(ctx, MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch receipt: %v", err)})
t.coord.SendMsg(ctx, MsgStopPipeline{
Reason: fmt.Sprintf("forgeBatch receipt: %v", err)})
}
confirm, err := t.handleReceipt(ctx, batchInfo)
@@ -312,32 +497,108 @@ func (t *TxManager) Run(ctx context.Context) {
continue
} else if err != nil { //nolint:staticcheck
// Transaction was rejected
t.queue = append(t.queue[:current], t.queue[current+1:]...)
if len(t.queue) == 0 {
next = 0
} else {
next = current % len(t.queue)
if err := t.removeBadBatchInfos(ctx); ctx.Err() != nil {
continue
} else if err != nil {
log.Errorw("TxManager: removeBadBatchInfos", "err", err)
continue
}
t.coord.SendMsg(ctx, MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch reject: %v", err)})
t.coord.SendMsg(ctx, MsgStopPipeline{
Reason: fmt.Sprintf("forgeBatch reject: %v", err)})
continue
}
if confirm != nil && *confirm >= t.cfg.ConfirmBlocks {
log.Debugw("TxManager tx for RollupForgeBatch confirmed",
"batch", batchInfo.BatchNum)
t.queue = append(t.queue[:current], t.queue[current+1:]...)
if len(t.queue) == 0 {
next = 0
} else {
next = current % len(t.queue)
now := time.Now()
if !t.cfg.EthNoReuseNonce && confirm == nil &&
now.Sub(batchInfo.SendTimestamp) > t.cfg.EthTxResendTimeout {
log.Infow("TxManager: forgeBatch tx not been mined timeout, resending",
"tx", batchInfo.EthTx.Hash(), "batch", batchInfo.BatchNum)
if err := t.sendRollupForgeBatch(ctx, batchInfo, true); ctx.Err() != nil {
continue
} else if err != nil {
// If we reach here it's because our ethNode has
// been unable to send the transaction to
// ethereum. This could be due to the ethNode
// failure, or an invalid transaction (that
// can't be mined)
log.Warnw("TxManager: forgeBatch resend failed", "err", err,
"batch", batchInfo.BatchNum)
t.coord.SendMsg(ctx, MsgStopPipeline{
Reason: fmt.Sprintf("forgeBatch resend: %v", err)})
continue
}
}
if confirm != nil && *confirm >= t.cfg.ConfirmBlocks {
log.Debugw("TxManager: forgeBatch tx confirmed",
"tx", batchInfo.EthTx.Hash(), "batch", batchInfo.BatchNum)
t.queue.Remove(queuePosition)
}
}
}
}
// nolint reason: this function will be used in the future
//nolint:unused
func (t *TxManager) canForge(stats *synchronizer.Stats, blockNum int64) bool {
func (t *TxManager) removeBadBatchInfos(ctx context.Context) error {
next := 0
// batchNum := 0
for {
batchInfo := t.queue.At(next)
if batchInfo == nil {
break
}
if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
return nil
} else if err != nil {
// Our ethNode is giving an error different
// than "not found" when getting the receipt
// for the transaction, so we can't figure out
// if it was not mined, mined and successful or
// mined and failed. This could be due to the
// ethNode failure.
next++
continue
}
confirm, err := t.handleReceipt(ctx, batchInfo)
if ctx.Err() != nil {
return nil
} else if err != nil {
// Transaction was rejected
if t.minPipelineNum <= batchInfo.PipelineNum {
t.minPipelineNum = batchInfo.PipelineNum + 1
}
t.queue.Remove(next)
continue
}
// If tx is pending but is from a cancelled pipeline, remove it
// from the queue
if confirm == nil {
if batchInfo.PipelineNum < t.minPipelineNum {
// batchNum++
t.queue.Remove(next)
continue
}
}
next++
}
accNonce, err := t.ethClient.EthNonceAt(ctx, t.account.Address, nil)
if err != nil {
return err
}
if !t.cfg.EthNoReuseNonce {
t.accNextNonce = accNonce
}
return nil
}
func (t *TxManager) canForgeAt(blockNum int64) bool {
return canForge(&t.consts.Auction, &t.vars.Auction,
&stats.Sync.Auction.CurrentSlot, &stats.Sync.Auction.NextSlot,
&t.stats.Sync.Auction.CurrentSlot, &t.stats.Sync.Auction.NextSlot,
t.cfg.ForgerAddress, blockNum)
}
func (t *TxManager) mustL1L2Batch(blockNum int64) bool {
lastL1BatchBlockNum := t.lastSentL1BatchBlockNum
if t.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum {
lastL1BatchBlockNum = t.stats.Sync.LastL1BatchBlock
}
return blockNum-lastL1BatchBlockNum >= t.vars.Rollup.ForgeL1L2BatchTimeout-1
}
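As a concrete illustration (not part of the diff): if vars.Rollup.ForgeL1L2BatchTimeout is 10 and the last L1 batch block is 100 (whichever is more recent between the batch the synchronizer saw and the one this coordinator last sent), mustL1L2Batch returns true from block 109 onward, i.e. the condition 109 - 100 >= 10 - 1 holds, leaving one block of margin before the contract-enforced timeout.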

View File

@@ -0,0 +1,15 @@
package coordinator
import (
"math/big"
"testing"
"github.com/stretchr/testify/assert"
)
func TestAddPerc(t *testing.T) {
assert.Equal(t, "110", addPerc(big.NewInt(100), 10).String())
assert.Equal(t, "101", addPerc(big.NewInt(100), 1).String())
assert.Equal(t, "12", addPerc(big.NewInt(10), 20).String())
assert.Equal(t, "1500", addPerc(big.NewInt(1000), 50).String())
}
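For reference, a sketch of an implementation consistent with the assertions above; the real addPerc in the coordinator package may differ (for example in its exact signature or rounding), so this is only illustrative:
// Sketch (not part of the diff): add p percent to v using integer big.Int math.
func addPercSketch(v *big.Int, p int64) *big.Int {
	x := new(big.Int).Mul(v, big.NewInt(p)) // v * p
	x.Div(x, big.NewInt(100))               // truncated (v * p) / 100
	return x.Add(x, v)                      // v + v*p/100
}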

View File

@@ -164,6 +164,19 @@ func (hdb *HistoryDB) addBatches(d meddler.DB, batches []common.Batch) error {
return nil
}
// GetBatch returns the batch with the given batchNum
func (hdb *HistoryDB) GetBatch(batchNum common.BatchNum) (*common.Batch, error) {
var batch common.Batch
err := meddler.QueryRow(
hdb.db, &batch, `SELECT batch.batch_num, batch.eth_block_num, batch.forger_addr,
batch.fees_collected, batch.fee_idxs_coordinator, batch.state_root,
batch.num_accounts, batch.last_idx, batch.exit_root, batch.forge_l1_txs_num,
batch.slot_num, batch.total_fees_usd FROM batch WHERE batch_num = $1;`,
batchNum,
)
return &batch, err
}
// GetAllBatches retrieves all batches from the DB
func (hdb *HistoryDB) GetAllBatches() ([]common.Batch, error) {
var batches []*common.Batch
@@ -208,6 +221,18 @@ func (hdb *HistoryDB) GetLastBatchNum() (common.BatchNum, error) {
return batchNum, tracerr.Wrap(row.Scan(&batchNum))
}
// GetLastBatch returns the last forged batch
func (hdb *HistoryDB) GetLastBatch() (*common.Batch, error) {
var batch common.Batch
err := meddler.QueryRow(
hdb.db, &batch, `SELECT batch.batch_num, batch.eth_block_num, batch.forger_addr,
batch.fees_collected, batch.fee_idxs_coordinator, batch.state_root,
batch.num_accounts, batch.last_idx, batch.exit_root, batch.forge_l1_txs_num,
batch.slot_num, batch.total_fees_usd FROM batch ORDER BY batch_num DESC LIMIT 1;`,
)
return &batch, err
}
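A usage sketch for the two new getters (not part of the diff): hdb is a *HistoryDB, and a missing batch surfaces as sql.ErrNoRows wrapped by tracerr, as the test below exercises; the database/sql import is assumed.
// Sketch (not part of the diff): fetch the last forged batch and the one before it.
func exampleGetBatch(hdb *HistoryDB) error {
	last, err := hdb.GetLastBatch()
	if err != nil {
		return err
	}
	prev, err := hdb.GetBatch(last.BatchNum - 1)
	if tracerr.Unwrap(err) == sql.ErrNoRows {
		// no such batch (e.g. last.BatchNum == 1)
		return nil
	} else if err != nil {
		return err
	}
	_ = prev
	return nil
}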
// GetLastL1BatchBlockNum returns the blockNum of the latest forged l1Batch
func (hdb *HistoryDB) GetLastL1BatchBlockNum() (int64, error) {
row := hdb.db.QueryRow(`SELECT eth_block_num FROM batch

View File

@@ -203,6 +203,10 @@ func TestBatches(t *testing.T) {
fetchedLastBatchNum, err := historyDB.GetLastBatchNum()
assert.NoError(t, err)
assert.Equal(t, batches[len(batches)-1].BatchNum, fetchedLastBatchNum)
// Test GetLastBatch
fetchedLastBatch, err := historyDB.GetLastBatch()
assert.NoError(t, err)
assert.Equal(t, &batches[len(batches)-1], fetchedLastBatch)
// Test GetLastL1TxsNum
fetchedLastL1TxsNum, err := historyDB.GetLastL1TxsNum()
assert.NoError(t, err)
@@ -211,6 +215,12 @@ func TestBatches(t *testing.T) {
fetchedLastL1BatchBlockNum, err := historyDB.GetLastL1BatchBlockNum()
assert.NoError(t, err)
assert.Equal(t, lastL1BatchBlockNum, fetchedLastL1BatchBlockNum)
// Test GetBatch
fetchedBatch, err := historyDB.GetBatch(1)
require.NoError(t, err)
assert.Equal(t, &batches[0], fetchedBatch)
_, err = historyDB.GetBatch(common.BatchNum(len(batches) + 1))
assert.Equal(t, sql.ErrNoRows, tracerr.Unwrap(err))
}
func TestBids(t *testing.T) {

View File

@@ -27,6 +27,8 @@ const (
// PathLast defines the subpath of the last Batch in the subpath
// of the StateDB
PathLast = "last"
// DefaultKeep is the default value for the Keep parameter
DefaultKeep = 128
)
var (
@@ -34,16 +36,18 @@ var (
KeyCurrentBatch = []byte("k:currentbatch")
// keyCurrentIdx is used as key in the db to store the CurrentIdx
keyCurrentIdx = []byte("k:idx")
// ErrNoLast is returned when the KVDB has been configured to not have
// a Last checkpoint but a Last method is used
ErrNoLast = fmt.Errorf("no last checkpoint")
)
// KVDB represents the Key-Value DB object
type KVDB struct {
path string
db *pebble.Storage
cfg Config
db *pebble.Storage
// CurrentIdx holds the current Idx that the BatchBuilder is using
CurrentIdx common.Idx
CurrentBatch common.BatchNum
keep int
m sync.Mutex
last *Last
}
@@ -61,13 +65,13 @@ func (k *Last) setNew() error {
defer k.rw.Unlock()
if k.db != nil {
k.db.Close()
k.db = nil
}
lastPath := path.Join(k.path, PathLast)
err := os.RemoveAll(lastPath)
if err != nil {
if err := os.RemoveAll(lastPath); err != nil {
return tracerr.Wrap(err)
}
db, err := pebble.NewPebbleStorage(path.Join(k.path, lastPath), false)
db, err := pebble.NewPebbleStorage(lastPath, false)
if err != nil {
return tracerr.Wrap(err)
}
@@ -80,6 +84,7 @@ func (k *Last) set(kvdb *KVDB, batchNum common.BatchNum) error {
defer k.rw.Unlock()
if k.db != nil {
k.db.Close()
k.db = nil
}
lastPath := path.Join(k.path, PathLast)
if err := kvdb.MakeCheckpointFromTo(batchNum, lastPath); err != nil {
@@ -96,26 +101,48 @@ func (k *Last) set(kvdb *KVDB, batchNum common.BatchNum) error {
func (k *Last) close() {
k.rw.Lock()
defer k.rw.Unlock()
k.db.Close()
if k.db != nil {
k.db.Close()
k.db = nil
}
}
// Config of the KVDB
type Config struct {
// Path where the checkpoints will be stored
Path string
// Keep is the number of old checkpoints to keep. If 0, all
// checkpoints are kept.
Keep int
// At every checkpoint, check that there are no gaps between the
// checkpoints
NoGapsCheck bool
// NoLast skips having an opened DB with a checkpoint to the last
// batchNum for thread-safe reads.
NoLast bool
}
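A quick sketch of how the two Config shapes behave (not part of the diff; it assumes the kvdb package and placeholder paths): a default instance keeps the "last" checkpoint for thread-safe reads, while a NoLast instance gets ErrNoLast from LastRead.
// Sketch (not part of the diff): constructing a KVDB with the new Config.
func exampleKVDBConfig() error {
	kv, err := NewKVDB(Config{Path: "/tmp/kvdb-example", Keep: DefaultKeep})
	if err != nil {
		return err
	}
	defer kv.Close()
	// A throwaway instance without the "last" checkpoint and without gap checks.
	local, err := NewKVDB(Config{Path: "/tmp/kvdb-example-local", Keep: 16, NoLast: true, NoGapsCheck: true})
	if err != nil {
		return err
	}
	defer local.Close()
	// LastRead is unavailable when NoLast is set and returns ErrNoLast.
	return local.LastRead(func(db *pebble.Storage) error { return nil })
}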
// NewKVDB creates a new KVDB, allowing to use an in-memory or in-disk storage.
// Checkpoints older than the value defined by `keep` will be deleted.
func NewKVDB(pathDB string, keep int) (*KVDB, error) {
// func NewKVDB(pathDB string, keep int) (*KVDB, error) {
func NewKVDB(cfg Config) (*KVDB, error) {
var sto *pebble.Storage
var err error
sto, err = pebble.NewPebbleStorage(path.Join(pathDB, PathCurrent), false)
sto, err = pebble.NewPebbleStorage(path.Join(cfg.Path, PathCurrent), false)
if err != nil {
return nil, tracerr.Wrap(err)
}
var last *Last
if !cfg.NoLast {
last = &Last{
path: cfg.Path,
}
}
kvdb := &KVDB{
path: pathDB,
cfg: cfg,
db: sto,
keep: keep,
last: &Last{
path: pathDB,
},
last: last,
}
// load currentBatch
kvdb.CurrentBatch, err = kvdb.GetCurrentBatch()
@@ -133,29 +160,32 @@ func NewKVDB(pathDB string, keep int) (*KVDB, error) {
}
// LastRead is a thread-safe method to query the last KVDB
func (kvdb *KVDB) LastRead(fn func(db *pebble.Storage) error) error {
kvdb.last.rw.RLock()
defer kvdb.last.rw.RUnlock()
return fn(kvdb.last.db)
func (k *KVDB) LastRead(fn func(db *pebble.Storage) error) error {
if k.last == nil {
return tracerr.Wrap(ErrNoLast)
}
k.last.rw.RLock()
defer k.last.rw.RUnlock()
return fn(k.last.db)
}
// DB returns the *pebble.Storage from the KVDB
func (kvdb *KVDB) DB() *pebble.Storage {
return kvdb.db
func (k *KVDB) DB() *pebble.Storage {
return k.db
}
// StorageWithPrefix returns the db.Storage with the given prefix from the
// current KVDB
func (kvdb *KVDB) StorageWithPrefix(prefix []byte) db.Storage {
return kvdb.db.WithPrefix(prefix)
func (k *KVDB) StorageWithPrefix(prefix []byte) db.Storage {
return k.db.WithPrefix(prefix)
}
// Reset resets the KVDB to the checkpoint at the given batchNum. Reset does
// not delete the checkpoints between old current and the new current, those
// checkpoints will remain in the storage, and eventually will be deleted when
// MakeCheckpoint overwrites them.
func (kvdb *KVDB) Reset(batchNum common.BatchNum) error {
return kvdb.reset(batchNum, true)
func (k *KVDB) Reset(batchNum common.BatchNum) error {
return k.reset(batchNum, true)
}
// reset resets the KVDB to the checkpoint at the given batchNum. Reset does
@@ -163,21 +193,19 @@ func (kvdb *KVDB) Reset(batchNum common.BatchNum) error {
// checkpoints will remain in the storage, and eventually will be deleted when
// MakeCheckpoint overwrites them. `closeCurrent` will close the currently
// opened db before doing the reset.
func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
currentPath := path.Join(kvdb.path, PathCurrent)
func (k *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
currentPath := path.Join(k.cfg.Path, PathCurrent)
if closeCurrent {
if err := kvdb.db.Pebble().Close(); err != nil {
return tracerr.Wrap(err)
}
if closeCurrent && k.db != nil {
k.db.Close()
k.db = nil
}
// remove 'current'
err := os.RemoveAll(currentPath)
if err != nil {
if err := os.RemoveAll(currentPath); err != nil {
return tracerr.Wrap(err)
}
// remove all checkpoints > batchNum
list, err := kvdb.ListCheckpoints()
list, err := k.ListCheckpoints()
if err != nil {
return tracerr.Wrap(err)
}
@@ -190,7 +218,7 @@ func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
}
}
for _, bn := range list[start:] {
if err := kvdb.DeleteCheckpoint(common.BatchNum(bn)); err != nil {
if err := k.DeleteCheckpoint(common.BatchNum(bn)); err != nil {
return tracerr.Wrap(err)
}
}
@@ -201,23 +229,27 @@ func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
if err != nil {
return tracerr.Wrap(err)
}
kvdb.db = sto
kvdb.CurrentIdx = common.RollupConstReservedIDx // 255
kvdb.CurrentBatch = 0
if err := kvdb.last.setNew(); err != nil {
return tracerr.Wrap(err)
k.db = sto
k.CurrentIdx = common.RollupConstReservedIDx // 255
k.CurrentBatch = 0
if k.last != nil {
if err := k.last.setNew(); err != nil {
return tracerr.Wrap(err)
}
}
return nil
}
// copy 'batchNum' to 'current'
if err := kvdb.MakeCheckpointFromTo(batchNum, currentPath); err != nil {
if err := k.MakeCheckpointFromTo(batchNum, currentPath); err != nil {
return tracerr.Wrap(err)
}
// copy 'batchNum' to 'last'
if err := kvdb.last.set(kvdb, batchNum); err != nil {
return tracerr.Wrap(err)
if k.last != nil {
if err := k.last.set(k, batchNum); err != nil {
return tracerr.Wrap(err)
}
}
// open the new 'current'
@@ -225,15 +257,15 @@ func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
if err != nil {
return tracerr.Wrap(err)
}
kvdb.db = sto
k.db = sto
// get currentBatch num
kvdb.CurrentBatch, err = kvdb.GetCurrentBatch()
k.CurrentBatch, err = k.GetCurrentBatch()
if err != nil {
return tracerr.Wrap(err)
}
// idx is obtained from the statedb reset
kvdb.CurrentIdx, err = kvdb.GetCurrentIdx()
k.CurrentIdx, err = k.GetCurrentIdx()
if err != nil {
return tracerr.Wrap(err)
}
@@ -243,28 +275,28 @@ func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
// ResetFromSynchronizer performs a reset in the KVDB getting the state from
// synchronizerKVDB for the given batchNum.
func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKVDB *KVDB) error {
func (k *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKVDB *KVDB) error {
if synchronizerKVDB == nil {
return tracerr.Wrap(fmt.Errorf("synchronizerKVDB can not be nil"))
}
currentPath := path.Join(kvdb.path, PathCurrent)
if err := kvdb.db.Pebble().Close(); err != nil {
return tracerr.Wrap(err)
currentPath := path.Join(k.cfg.Path, PathCurrent)
if k.db != nil {
k.db.Close()
k.db = nil
}
// remove 'current'
err := os.RemoveAll(currentPath)
if err != nil {
if err := os.RemoveAll(currentPath); err != nil {
return tracerr.Wrap(err)
}
// remove all checkpoints
list, err := kvdb.ListCheckpoints()
list, err := k.ListCheckpoints()
if err != nil {
return tracerr.Wrap(err)
}
for _, bn := range list {
if err := kvdb.DeleteCheckpoint(common.BatchNum(bn)); err != nil {
if err := k.DeleteCheckpoint(common.BatchNum(bn)); err != nil {
return tracerr.Wrap(err)
}
}
@@ -275,14 +307,14 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV
if err != nil {
return tracerr.Wrap(err)
}
kvdb.db = sto
kvdb.CurrentIdx = common.RollupConstReservedIDx // 255
kvdb.CurrentBatch = 0
k.db = sto
k.CurrentIdx = common.RollupConstReservedIDx // 255
k.CurrentBatch = 0
return nil
}
checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
// copy synchronizer'BatchNumX' to 'BatchNumX'
if err := synchronizerKVDB.MakeCheckpointFromTo(batchNum, checkpointPath); err != nil {
@@ -290,7 +322,7 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV
}
// copy 'BatchNumX' to 'current'
err = kvdb.MakeCheckpointFromTo(batchNum, currentPath)
err = k.MakeCheckpointFromTo(batchNum, currentPath)
if err != nil {
return tracerr.Wrap(err)
}
@@ -300,15 +332,15 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV
if err != nil {
return tracerr.Wrap(err)
}
kvdb.db = sto
k.db = sto
// get currentBatch num
kvdb.CurrentBatch, err = kvdb.GetCurrentBatch()
k.CurrentBatch, err = k.GetCurrentBatch()
if err != nil {
return tracerr.Wrap(err)
}
// get currentIdx
kvdb.CurrentIdx, err = kvdb.GetCurrentIdx()
k.CurrentIdx, err = k.GetCurrentIdx()
if err != nil {
return tracerr.Wrap(err)
}
@@ -317,8 +349,8 @@ func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKV
}
// GetCurrentBatch returns the current BatchNum stored in the KVDB
func (kvdb *KVDB) GetCurrentBatch() (common.BatchNum, error) {
cbBytes, err := kvdb.db.Get(KeyCurrentBatch)
func (k *KVDB) GetCurrentBatch() (common.BatchNum, error) {
cbBytes, err := k.db.Get(KeyCurrentBatch)
if tracerr.Unwrap(err) == db.ErrNotFound {
return 0, nil
}
@@ -329,12 +361,12 @@ func (kvdb *KVDB) GetCurrentBatch() (common.BatchNum, error) {
}
// setCurrentBatch stores the current BatchNum in the KVDB
func (kvdb *KVDB) setCurrentBatch() error {
tx, err := kvdb.db.NewTx()
func (k *KVDB) setCurrentBatch() error {
tx, err := k.db.NewTx()
if err != nil {
return tracerr.Wrap(err)
}
err = tx.Put(KeyCurrentBatch, kvdb.CurrentBatch.Bytes())
err = tx.Put(KeyCurrentBatch, k.CurrentBatch.Bytes())
if err != nil {
return tracerr.Wrap(err)
}
@@ -345,9 +377,9 @@ func (kvdb *KVDB) setCurrentBatch() error {
}
// GetCurrentIdx returns the stored Idx from the KVDB, which is the last Idx
// used for an Account in the KVDB.
func (kvdb *KVDB) GetCurrentIdx() (common.Idx, error) {
idxBytes, err := kvdb.db.Get(keyCurrentIdx)
// used for an Account in the KVDB.
func (k *KVDB) GetCurrentIdx() (common.Idx, error) {
idxBytes, err := k.db.Get(keyCurrentIdx)
if tracerr.Unwrap(err) == db.ErrNotFound {
return common.RollupConstReservedIDx, nil // 255, nil
}
@@ -358,10 +390,10 @@ func (kvdb *KVDB) GetCurrentIdx() (common.Idx, error) {
}
// SetCurrentIdx stores Idx in the KVDB
func (kvdb *KVDB) SetCurrentIdx(idx common.Idx) error {
kvdb.CurrentIdx = idx
func (k *KVDB) SetCurrentIdx(idx common.Idx) error {
k.CurrentIdx = idx
tx, err := kvdb.db.NewTx()
tx, err := k.db.NewTx()
if err != nil {
return tracerr.Wrap(err)
}
@@ -381,49 +413,64 @@ func (kvdb *KVDB) SetCurrentIdx(idx common.Idx) error {
// MakeCheckpoint does a checkpoint at the given batchNum in the defined path.
// Internally this advances & stores the current BatchNum, and then stores a
// Checkpoint of the current state of the KVDB.
func (kvdb *KVDB) MakeCheckpoint() error {
// Checkpoint of the current state of the KVDB.
func (k *KVDB) MakeCheckpoint() error {
// advance currentBatch
kvdb.CurrentBatch++
k.CurrentBatch++
checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, kvdb.CurrentBatch))
checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, k.CurrentBatch))
if err := kvdb.setCurrentBatch(); err != nil {
if err := k.setCurrentBatch(); err != nil {
return tracerr.Wrap(err)
}
// if a checkpoint for BatchNum already exists on disk, delete it
if _, err := os.Stat(checkpointPath); !os.IsNotExist(err) {
err := os.RemoveAll(checkpointPath)
if err != nil {
if _, err := os.Stat(checkpointPath); os.IsNotExist(err) {
} else if err != nil {
return tracerr.Wrap(err)
} else {
if err := os.RemoveAll(checkpointPath); err != nil {
return tracerr.Wrap(err)
}
} else if err != nil && !os.IsNotExist(err) {
return tracerr.Wrap(err)
}
// execute Checkpoint
if err := kvdb.db.Pebble().Checkpoint(checkpointPath); err != nil {
if err := k.db.Pebble().Checkpoint(checkpointPath); err != nil {
return tracerr.Wrap(err)
}
// copy 'CurrentBatch' to 'last'
if err := kvdb.last.set(kvdb, kvdb.CurrentBatch); err != nil {
return tracerr.Wrap(err)
if k.last != nil {
if err := k.last.set(k, k.CurrentBatch); err != nil {
return tracerr.Wrap(err)
}
}
// delete old checkpoints
if err := kvdb.deleteOldCheckpoints(); err != nil {
if err := k.deleteOldCheckpoints(); err != nil {
return tracerr.Wrap(err)
}
return nil
}
// CheckpointExists returns true if the checkpoint exists
func (k *KVDB) CheckpointExists(batchNum common.BatchNum) (bool, error) {
source := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
if _, err := os.Stat(source); os.IsNotExist(err) {
return false, nil
} else if err != nil {
return false, err
}
return true, nil
}
// DeleteCheckpoint removes the checkpoint of the given batchNum, if it exists
func (kvdb *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error {
checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
func (k *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error {
checkpointPath := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
if _, err := os.Stat(checkpointPath); os.IsNotExist(err) {
return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum))
} else if err != nil {
return tracerr.Wrap(err)
}
return os.RemoveAll(checkpointPath)
@@ -431,8 +478,8 @@ func (kvdb *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error {
// ListCheckpoints returns the list of batchNums of the checkpoints, sorted.
// If there's a gap in the list of checkpoints, an error is returned.
func (kvdb *KVDB) ListCheckpoints() ([]int, error) {
files, err := ioutil.ReadDir(kvdb.path)
func (k *KVDB) ListCheckpoints() ([]int, error) {
files, err := ioutil.ReadDir(k.cfg.Path)
if err != nil {
return nil, tracerr.Wrap(err)
}
@@ -449,12 +496,12 @@ func (kvdb *KVDB) ListCheckpoints() ([]int, error) {
}
}
sort.Ints(checkpoints)
if len(checkpoints) > 0 {
if !k.cfg.NoGapsCheck && len(checkpoints) > 0 {
first := checkpoints[0]
for _, checkpoint := range checkpoints[1:] {
first++
if checkpoint != first {
log.Errorw("GAP", "checkpoints", checkpoints)
log.Errorw("gap between checkpoints", "checkpoints", checkpoints)
return nil, tracerr.Wrap(fmt.Errorf("checkpoint gap at %v", checkpoint))
}
}
@@ -464,14 +511,14 @@ func (kvdb *KVDB) ListCheckpoints() ([]int, error) {
// deleteOldCheckpoints deletes old checkpoints when there are more than
// `s.keep` checkpoints
func (kvdb *KVDB) deleteOldCheckpoints() error {
list, err := kvdb.ListCheckpoints()
func (k *KVDB) deleteOldCheckpoints() error {
list, err := k.ListCheckpoints()
if err != nil {
return tracerr.Wrap(err)
}
if len(list) > kvdb.keep {
for _, checkpoint := range list[:len(list)-kvdb.keep] {
if err := kvdb.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil {
if k.cfg.Keep > 0 && len(list) > k.cfg.Keep {
for _, checkpoint := range list[:len(list)-k.cfg.Keep] {
if err := k.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil {
return tracerr.Wrap(err)
}
}
@@ -482,43 +529,40 @@ func (kvdb *KVDB) deleteOldCheckpoints() error {
// MakeCheckpointFromTo makes a checkpoint from the current db at fromBatchNum
// to the dest folder. This method is locking, so it can be called from
// multiple places at the same time.
func (kvdb *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) error {
source := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, fromBatchNum))
func (k *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) error {
source := path.Join(k.cfg.Path, fmt.Sprintf("%s%d", PathBatchNum, fromBatchNum))
if _, err := os.Stat(source); os.IsNotExist(err) {
// if kvdb does not have checkpoint at batchNum, return err
return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" does not exist", source))
} else if err != nil {
return tracerr.Wrap(err)
}
// By locking we allow calling MakeCheckpointFromTo from multiple
// places at the same time for the same stateDB. This allows the
// synchronizer to do a reset to a batchNum at the same time as the
// pipeline is doing a txSelector.Reset and batchBuilder.Reset from
// synchronizer to the same batchNum
kvdb.m.Lock()
defer kvdb.m.Unlock()
k.m.Lock()
defer k.m.Unlock()
return pebbleMakeCheckpoint(source, dest)
}
func pebbleMakeCheckpoint(source, dest string) error {
// Remove dest folder (if it exists) before doing the checkpoint
if _, err := os.Stat(dest); !os.IsNotExist(err) {
err := os.RemoveAll(dest)
if err != nil {
if _, err := os.Stat(dest); os.IsNotExist(err) {
} else if err != nil {
return tracerr.Wrap(err)
} else {
if err := os.RemoveAll(dest); err != nil {
return tracerr.Wrap(err)
}
} else if err != nil && !os.IsNotExist(err) {
return tracerr.Wrap(err)
}
sto, err := pebble.NewPebbleStorage(source, false)
if err != nil {
return tracerr.Wrap(err)
}
defer func() {
errClose := sto.Pebble().Close()
if errClose != nil {
log.Errorw("Pebble.Close", "err", errClose)
}
}()
defer sto.Close()
// execute Checkpoint
err = sto.Pebble().Checkpoint(dest)
@@ -530,7 +574,12 @@ func pebbleMakeCheckpoint(source, dest string) error {
}
// Close the DB
func (kvdb *KVDB) Close() {
kvdb.db.Close()
kvdb.last.close()
func (k *KVDB) Close() {
if k.db != nil {
k.db.Close()
k.db = nil
}
if k.last != nil {
k.last.close()
}
}

View File

@@ -37,7 +37,7 @@ func TestCheckpoints(t *testing.T) {
require.NoError(t, err)
defer require.NoError(t, os.RemoveAll(dir))
db, err := NewKVDB(dir, 128)
db, err := NewKVDB(Config{Path: dir, Keep: 128})
require.NoError(t, err)
// add test key-values
@@ -72,7 +72,7 @@ func TestCheckpoints(t *testing.T) {
err = db.Reset(3)
require.NoError(t, err)
printCheckpoints(t, db.path)
printCheckpoints(t, db.cfg.Path)
// check that currentBatch is as expected after Reset
cb, err = db.GetCurrentBatch()
@@ -99,7 +99,7 @@ func TestCheckpoints(t *testing.T) {
dirLocal, err := ioutil.TempDir("", "ldb")
require.NoError(t, err)
defer require.NoError(t, os.RemoveAll(dirLocal))
ldb, err := NewKVDB(dirLocal, 128)
ldb, err := NewKVDB(Config{Path: dirLocal, Keep: 128})
require.NoError(t, err)
// get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB)
@@ -120,7 +120,7 @@ func TestCheckpoints(t *testing.T) {
dirLocal2, err := ioutil.TempDir("", "ldb2")
require.NoError(t, err)
defer require.NoError(t, os.RemoveAll(dirLocal2))
ldb2, err := NewKVDB(dirLocal2, 128)
ldb2, err := NewKVDB(Config{Path: dirLocal2, Keep: 128})
require.NoError(t, err)
// get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB)
@@ -139,9 +139,9 @@ func TestCheckpoints(t *testing.T) {
debug := false
if debug {
printCheckpoints(t, db.path)
printCheckpoints(t, ldb.path)
printCheckpoints(t, ldb2.path)
printCheckpoints(t, db.cfg.Path)
printCheckpoints(t, ldb.cfg.Path)
printCheckpoints(t, ldb2.cfg.Path)
}
}
@@ -150,7 +150,7 @@ func TestListCheckpoints(t *testing.T) {
require.NoError(t, err)
defer require.NoError(t, os.RemoveAll(dir))
db, err := NewKVDB(dir, 128)
db, err := NewKVDB(Config{Path: dir, Keep: 128})
require.NoError(t, err)
numCheckpoints := 16
@@ -181,7 +181,7 @@ func TestDeleteOldCheckpoints(t *testing.T) {
defer require.NoError(t, os.RemoveAll(dir))
keep := 16
db, err := NewKVDB(dir, keep)
db, err := NewKVDB(Config{Path: dir, Keep: keep})
require.NoError(t, err)
numCheckpoints := 32
@@ -202,7 +202,7 @@ func TestGetCurrentIdx(t *testing.T) {
defer require.NoError(t, os.RemoveAll(dir))
keep := 16
db, err := NewKVDB(dir, keep)
db, err := NewKVDB(Config{Path: dir, Keep: keep})
require.NoError(t, err)
idx, err := db.GetCurrentIdx()
@@ -211,7 +211,7 @@ func TestGetCurrentIdx(t *testing.T) {
db.Close()
db, err = NewKVDB(dir, keep)
db, err = NewKVDB(Config{Path: dir, Keep: keep})
require.NoError(t, err)
idx, err = db.GetCurrentIdx()
@@ -227,7 +227,7 @@ func TestGetCurrentIdx(t *testing.T) {
db.Close()
db, err = NewKVDB(dir, keep)
db, err = NewKVDB(Config{Path: dir, Keep: keep})
require.NoError(t, err)
idx, err = db.GetCurrentIdx()

View File

@@ -323,9 +323,10 @@ func (l2db *L2DB) InvalidateOldNonces(updatedAccounts []common.IdxNonce, batchNu
// The state of the affected txs can change from Forged -> Pending or from Invalid -> Pending
func (l2db *L2DB) Reorg(lastValidBatch common.BatchNum) error {
_, err := l2db.db.Exec(
`UPDATE tx_pool SET batch_num = NULL, state = $1
WHERE (state = $2 OR state = $3) AND batch_num > $4`,
`UPDATE tx_pool SET batch_num = NULL, state = $1
WHERE (state = $2 OR state = $3 OR state = $4) AND batch_num > $5`,
common.PoolL2TxStatePending,
common.PoolL2TxStateForging,
common.PoolL2TxStateForged,
common.PoolL2TxStateInvalid,
lastValidBatch,

View File

@@ -52,19 +52,40 @@ const (
// TypeBatchBuilder defines a StateDB used by the BatchBuilder, that
// generates the ExitTree and the ZKInput when processing the txs
TypeBatchBuilder = "batchbuilder"
// MaxNLevels is the maximum value of NLevels for the merkle tree,
// which comes from the fact that AccountIdx has 48 bits.
MaxNLevels = 48
)
// TypeStateDB determines the type of StateDB
type TypeStateDB string
// Config of the StateDB
type Config struct {
// Path where the checkpoints will be stored
Path string
// Keep is the number of old checkpoints to keep. If 0, all
// checkpoints are kept.
Keep int
// NoLast skips having an opened DB with a checkpoint to the last
// batchNum for thread-safe reads.
NoLast bool
// Type of StateDB
Type TypeStateDB
// NLevels is the number of merkle tree levels in case the Type uses a
// merkle tree. If the Type doesn't use a merkle tree, NLevels should
// be 0.
NLevels int
// At every checkpoint, check that there are no gaps between the
// checkpoints
noGapsCheck bool
}
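For orientation (not part of the diff): the unexported noGapsCheck field can only be set from within the package, which NewLocalStateDB does further below together with NoLast, so external callers only configure the exported fields. A minimal sketch of the two common shapes, assuming the statedb package and a placeholder directory dir:
// Sketch (not part of the diff): typical StateDB configurations.
func exampleStateDBConfig(dir string) error {
	// Synchronizer/BatchBuilder StateDBs open a merkle tree with NLevels levels.
	sdb, err := NewStateDB(Config{Path: dir + "/sync", Keep: 128, Type: TypeSynchronizer, NLevels: 32})
	if err != nil {
		return err
	}
	defer sdb.Close()
	// TxSelector StateDBs carry no merkle tree, so NLevels must be 0.
	txsel, err := NewStateDB(Config{Path: dir + "/txsel", Keep: 128, Type: TypeTxSelector, NLevels: 0})
	if err != nil {
		return err
	}
	defer txsel.Close()
	return nil
}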
// StateDB represents the StateDB object
type StateDB struct {
path string
Typ TypeStateDB
db *kvdb.KVDB
nLevels int
MT *merkletree.MerkleTree
keep int
cfg Config
db *kvdb.KVDB
MT *merkletree.MerkleTree
}
// Last offers a subset of view methods of the StateDB that can be
@@ -104,36 +125,40 @@ func (s *Last) GetAccounts() ([]common.Account, error) {
// NewStateDB creates a new StateDB, allowing to use an in-memory or in-disk
// storage. Checkpoints older than the value defined by `keep` will be
// deleted.
func NewStateDB(pathDB string, keep int, typ TypeStateDB, nLevels int) (*StateDB, error) {
// func NewStateDB(pathDB string, keep int, typ TypeStateDB, nLevels int) (*StateDB, error) {
func NewStateDB(cfg Config) (*StateDB, error) {
var kv *kvdb.KVDB
var err error
kv, err = kvdb.NewKVDB(pathDB, keep)
kv, err = kvdb.NewKVDB(kvdb.Config{Path: cfg.Path, Keep: cfg.Keep,
NoGapsCheck: cfg.noGapsCheck, NoLast: cfg.NoLast})
if err != nil {
return nil, tracerr.Wrap(err)
}
var mt *merkletree.MerkleTree = nil
if typ == TypeSynchronizer || typ == TypeBatchBuilder {
mt, err = merkletree.NewMerkleTree(kv.StorageWithPrefix(PrefixKeyMT), nLevels)
if cfg.Type == TypeSynchronizer || cfg.Type == TypeBatchBuilder {
mt, err = merkletree.NewMerkleTree(kv.StorageWithPrefix(PrefixKeyMT), cfg.NLevels)
if err != nil {
return nil, tracerr.Wrap(err)
}
}
if typ == TypeTxSelector && nLevels != 0 {
if cfg.Type == TypeTxSelector && cfg.NLevels != 0 {
return nil, tracerr.Wrap(fmt.Errorf("invalid StateDB parameters: StateDB type==TypeStateDB can not have nLevels!=0"))
}
return &StateDB{
path: pathDB,
db: kv,
nLevels: nLevels,
MT: mt,
Typ: typ,
keep: keep,
cfg: cfg,
db: kv,
MT: mt,
}, nil
}
// Type returns the StateDB configured Type
func (s *StateDB) Type() TypeStateDB {
return s.cfg.Type
}
// LastRead is a thread-safe method to query the last checkpoint of the StateDB
// via the Last type methods
func (s *StateDB) LastRead(fn func(sdbLast *Last) error) error {
@@ -179,7 +204,7 @@ func (s *StateDB) LastGetCurrentBatch() (common.BatchNum, error) {
func (s *StateDB) LastMTGetRoot() (*big.Int, error) {
var root *big.Int
if err := s.LastRead(func(sdb *Last) error {
mt, err := merkletree.NewMerkleTree(sdb.DB().WithPrefix(PrefixKeyMT), s.nLevels)
mt, err := merkletree.NewMerkleTree(sdb.DB().WithPrefix(PrefixKeyMT), s.cfg.NLevels)
if err != nil {
return tracerr.Wrap(err)
}
@@ -195,7 +220,7 @@ func (s *StateDB) LastMTGetRoot() (*big.Int, error) {
// Internally this advances & stores the current BatchNum, and then stores a
// Checkpoint of the current state of the StateDB.
func (s *StateDB) MakeCheckpoint() error {
log.Debugw("Making StateDB checkpoint", "batch", s.CurrentBatch()+1, "type", s.Typ)
log.Debugw("Making StateDB checkpoint", "batch", s.CurrentBatch()+1, "type", s.cfg.Type)
return s.db.MakeCheckpoint()
}
@@ -230,8 +255,8 @@ func (s *StateDB) SetCurrentIdx(idx common.Idx) error {
// those checkpoints will remain in the storage, and eventually will be
// deleted when MakeCheckpoint overwrites them.
func (s *StateDB) Reset(batchNum common.BatchNum) error {
err := s.db.Reset(batchNum)
if err != nil {
log.Debugw("Making StateDB Reset", "batch", batchNum, "type", s.cfg.Type)
if err := s.db.Reset(batchNum); err != nil {
return tracerr.Wrap(err)
}
if s.MT != nil {
@@ -242,7 +267,6 @@ func (s *StateDB) Reset(batchNum common.BatchNum) error {
}
s.MT = mt
}
log.Debugw("Making StateDB Reset", "batch", batchNum)
return nil
}
@@ -461,9 +485,10 @@ type LocalStateDB struct {
// NewLocalStateDB returns a new LocalStateDB connected to the given
// synchronizerDB. Checkpoints older than the value defined by `keep` will be
// deleted.
func NewLocalStateDB(path string, keep int, synchronizerDB *StateDB, typ TypeStateDB,
nLevels int) (*LocalStateDB, error) {
s, err := NewStateDB(path, keep, typ, nLevels)
func NewLocalStateDB(cfg Config, synchronizerDB *StateDB) (*LocalStateDB, error) {
cfg.noGapsCheck = true
cfg.NoLast = true
s, err := NewStateDB(cfg)
if err != nil {
return nil, tracerr.Wrap(err)
}
@@ -473,18 +498,24 @@ func NewLocalStateDB(path string, keep int, synchronizerDB *StateDB, typ TypeSta
}, nil
}
// CheckpointExists returns true if the checkpoint exists
func (l *LocalStateDB) CheckpointExists(batchNum common.BatchNum) (bool, error) {
return l.db.CheckpointExists(batchNum)
}
// Reset performs a reset in the LocalStateDB. If fromSynchronizer is true, it
// gets the state from LocalStateDB.synchronizerStateDB for the given batchNum.
// If fromSynchronizer is false, get the state from LocalStateDB checkpoints.
func (l *LocalStateDB) Reset(batchNum common.BatchNum, fromSynchronizer bool) error {
if fromSynchronizer {
err := l.db.ResetFromSynchronizer(batchNum, l.synchronizerStateDB.db)
if err != nil {
log.Debugw("Making StateDB ResetFromSynchronizer", "batch", batchNum, "type", l.cfg.Type)
if err := l.db.ResetFromSynchronizer(batchNum, l.synchronizerStateDB.db); err != nil {
return tracerr.Wrap(err)
}
// open the MT for the current s.db
if l.MT != nil {
mt, err := merkletree.NewMerkleTree(l.db.StorageWithPrefix(PrefixKeyMT), l.MT.MaxLevels())
mt, err := merkletree.NewMerkleTree(l.db.StorageWithPrefix(PrefixKeyMT),
l.MT.MaxLevels())
if err != nil {
return tracerr.Wrap(err)
}

View File

@@ -45,7 +45,7 @@ func TestNewStateDBIntermediateState(t *testing.T) {
require.NoError(t, err)
defer require.NoError(t, os.RemoveAll(dir))
sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0)
sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeTxSelector, NLevels: 0})
require.NoError(t, err)
// test values
@@ -78,7 +78,7 @@ func TestNewStateDBIntermediateState(t *testing.T) {
// call NewStateDB which should get the db at the last checkpoint state
// executing a Reset (discarding the last 'testkey0'&'testvalue0' data)
sdb, err = NewStateDB(dir, 128, TypeTxSelector, 0)
sdb, err = NewStateDB(Config{Path: dir, Keep: 128, Type: TypeTxSelector, NLevels: 0})
require.NoError(t, err)
v, err = sdb.db.DB().Get(k0)
assert.NotNil(t, err)
@@ -116,7 +116,7 @@ func TestNewStateDBIntermediateState(t *testing.T) {
bn, err := sdb.getCurrentBatch()
require.NoError(t, err)
assert.Equal(t, common.BatchNum(0), bn)
err = sdb.db.MakeCheckpoint()
err = sdb.MakeCheckpoint()
require.NoError(t, err)
bn, err = sdb.getCurrentBatch()
require.NoError(t, err)
@@ -158,7 +158,7 @@ func TestNewStateDBIntermediateState(t *testing.T) {
// call NewStateDB which should get the db at the last checkpoint state
// executing a Reset (discarding the last 'testkey1'&'testvalue1' data)
sdb, err = NewStateDB(dir, 128, TypeTxSelector, 0)
sdb, err = NewStateDB(Config{Path: dir, Keep: 128, Type: TypeTxSelector, NLevels: 0})
require.NoError(t, err)
bn, err = sdb.getCurrentBatch()
@@ -182,7 +182,7 @@ func TestStateDBWithoutMT(t *testing.T) {
require.NoError(t, err)
defer require.NoError(t, os.RemoveAll(dir))
sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0)
sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeTxSelector, NLevels: 0})
require.NoError(t, err)
// create test accounts
@@ -236,7 +236,7 @@ func TestStateDBWithMT(t *testing.T) {
require.NoError(t, err)
defer require.NoError(t, os.RemoveAll(dir))
sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32)
sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeSynchronizer, NLevels: 32})
require.NoError(t, err)
// create test accounts
@@ -290,7 +290,7 @@ func TestCheckpoints(t *testing.T) {
require.NoError(t, err)
defer require.NoError(t, os.RemoveAll(dir))
sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32)
sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeSynchronizer, NLevels: 32})
require.NoError(t, err)
err = sdb.Reset(0)
@@ -335,7 +335,7 @@ func TestCheckpoints(t *testing.T) {
assert.Equal(t, common.BatchNum(i+1), cb)
}
// printCheckpoints(t, sdb.path)
// printCheckpoints(t, sdb.cfg.Path)
// reset checkpoint
err = sdb.Reset(3)
@@ -371,7 +371,7 @@ func TestCheckpoints(t *testing.T) {
dirLocal, err := ioutil.TempDir("", "ldb")
require.NoError(t, err)
defer require.NoError(t, os.RemoveAll(dirLocal))
ldb, err := NewLocalStateDB(dirLocal, 128, sdb, TypeBatchBuilder, 32)
ldb, err := NewLocalStateDB(Config{Path: dirLocal, Keep: 128, Type: TypeBatchBuilder, NLevels: 32}, sdb)
require.NoError(t, err)
// get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB)
@@ -392,28 +392,26 @@ func TestCheckpoints(t *testing.T) {
dirLocal2, err := ioutil.TempDir("", "ldb2")
require.NoError(t, err)
defer require.NoError(t, os.RemoveAll(dirLocal2))
ldb2, err := NewLocalStateDB(dirLocal2, 128, sdb, TypeBatchBuilder, 32)
ldb2, err := NewLocalStateDB(Config{Path: dirLocal2, Keep: 128, Type: TypeBatchBuilder, NLevels: 32}, sdb)
require.NoError(t, err)
// get checkpoint 4 from sdb (StateDB) to ldb (LocalStateDB)
err = ldb2.Reset(4, true)
require.NoError(t, err)
// check that currentBatch is 4 after the Reset
cb, err = ldb2.db.GetCurrentBatch()
require.NoError(t, err)
cb = ldb2.CurrentBatch()
assert.Equal(t, common.BatchNum(4), cb)
// advance one checkpoint in ldb2
err = ldb2.db.MakeCheckpoint()
require.NoError(t, err)
cb, err = ldb2.db.GetCurrentBatch()
err = ldb2.MakeCheckpoint()
require.NoError(t, err)
cb = ldb2.CurrentBatch()
assert.Equal(t, common.BatchNum(5), cb)
debug := false
if debug {
printCheckpoints(t, sdb.path)
printCheckpoints(t, ldb.path)
printCheckpoints(t, ldb2.path)
printCheckpoints(t, sdb.cfg.Path)
printCheckpoints(t, ldb.cfg.Path)
printCheckpoints(t, ldb2.cfg.Path)
}
}
@@ -421,7 +419,7 @@ func TestStateDBGetAccounts(t *testing.T) {
dir, err := ioutil.TempDir("", "tmpdb")
require.NoError(t, err)
sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0)
sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeTxSelector, NLevels: 0})
require.NoError(t, err)
// create test accounts
@@ -468,7 +466,7 @@ func TestCheckAccountsTreeTestVectors(t *testing.T) {
require.NoError(t, err)
defer require.NoError(t, os.RemoveAll(dir))
sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32)
sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeSynchronizer, NLevels: 32})
require.NoError(t, err)
ay0 := new(big.Int).Sub(new(big.Int).Exp(big.NewInt(2), big.NewInt(253), nil), big.NewInt(1))
@@ -542,7 +540,7 @@ func TestListCheckpoints(t *testing.T) {
require.NoError(t, err)
defer require.NoError(t, os.RemoveAll(dir))
sdb, err := NewStateDB(dir, 128, TypeSynchronizer, 32)
sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeSynchronizer, NLevels: 32})
require.NoError(t, err)
numCheckpoints := 16
@@ -575,7 +573,7 @@ func TestDeleteOldCheckpoints(t *testing.T) {
defer require.NoError(t, os.RemoveAll(dir))
keep := 16
sdb, err := NewStateDB(dir, keep, TypeSynchronizer, 32)
sdb, err := NewStateDB(Config{Path: dir, Keep: keep, Type: TypeSynchronizer, NLevels: 32})
require.NoError(t, err)
numCheckpoints := 32
@@ -596,7 +594,7 @@ func TestCurrentIdx(t *testing.T) {
defer require.NoError(t, os.RemoveAll(dir))
keep := 16
sdb, err := NewStateDB(dir, keep, TypeSynchronizer, 32)
sdb, err := NewStateDB(Config{Path: dir, Keep: keep, Type: TypeSynchronizer, NLevels: 32})
require.NoError(t, err)
idx := sdb.CurrentIdx()
@@ -604,7 +602,7 @@ func TestCurrentIdx(t *testing.T) {
sdb.Close()
sdb, err = NewStateDB(dir, keep, TypeSynchronizer, 32)
sdb, err = NewStateDB(Config{Path: dir, Keep: keep, Type: TypeSynchronizer, NLevels: 32})
require.NoError(t, err)
idx = sdb.CurrentIdx()
@@ -618,9 +616,30 @@ func TestCurrentIdx(t *testing.T) {
sdb.Close()
sdb, err = NewStateDB(dir, keep, TypeSynchronizer, 32)
sdb, err = NewStateDB(Config{Path: dir, Keep: keep, Type: TypeSynchronizer, NLevels: 32})
require.NoError(t, err)
idx = sdb.CurrentIdx()
assert.Equal(t, common.Idx(255), idx)
}
func TestResetFromBadCheckpoint(t *testing.T) {
dir, err := ioutil.TempDir("", "tmpdb")
require.NoError(t, err)
defer require.NoError(t, os.RemoveAll(dir))
keep := 16
sdb, err := NewStateDB(Config{Path: dir, Keep: keep, Type: TypeSynchronizer, NLevels: 32})
require.NoError(t, err)
err = sdb.MakeCheckpoint()
require.NoError(t, err)
err = sdb.MakeCheckpoint()
require.NoError(t, err)
err = sdb.MakeCheckpoint()
require.NoError(t, err)
// reset from a checkpoint that doesn't exist
err = sdb.Reset(10)
require.Error(t, err)
}

View File

@@ -18,7 +18,7 @@ func TestGetIdx(t *testing.T) {
require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dir))
sdb, err := NewStateDB(dir, 128, TypeTxSelector, 0)
sdb, err := NewStateDB(Config{Path: dir, Keep: 128, Type: TypeTxSelector, NLevels: 0})
assert.NoError(t, err)
var sk babyjub.PrivateKey

View File

@@ -254,7 +254,7 @@ type AuctionInterface interface {
//
AuctionConstants() (*common.AuctionConstants, error)
AuctionEventsByBlock(blockNum int64) (*AuctionEvents, *ethCommon.Hash, error)
AuctionEventsByBlock(blockNum int64, blockHash *ethCommon.Hash) (*AuctionEvents, error)
AuctionEventInit() (*AuctionEventInitialize, int64, error)
}
@@ -797,15 +797,22 @@ func (c *AuctionClient) AuctionEventInit() (*AuctionEventInitialize, int64, erro
}
// AuctionEventsByBlock returns the events in a block that happened in the
// Auction Smart Contract and the blockHash where the events happened. If there
// are no events in that block, blockHash is nil.
func (c *AuctionClient) AuctionEventsByBlock(blockNum int64) (*AuctionEvents, *ethCommon.Hash, error) {
// Auction Smart Contract.
// To query by blockNum, set blockNum >= 0 and blockHash == nil.
// To query by blockHash set blockHash != nil, and blockNum will be ignored.
// If there are no events in that block the result is nil.
func (c *AuctionClient) AuctionEventsByBlock(blockNum int64,
blockHash *ethCommon.Hash) (*AuctionEvents, error) {
var auctionEvents AuctionEvents
var blockHash *ethCommon.Hash
var blockNumBigInt *big.Int
if blockHash == nil {
blockNumBigInt = big.NewInt(blockNum)
}
query := ethereum.FilterQuery{
FromBlock: big.NewInt(blockNum),
ToBlock: big.NewInt(blockNum),
BlockHash: blockHash,
FromBlock: blockNumBigInt,
ToBlock: blockNumBigInt,
Addresses: []ethCommon.Address{
c.address,
},
@@ -814,15 +821,16 @@ func (c *AuctionClient) AuctionEventsByBlock(blockNum int64) (*AuctionEvents, *e
logs, err := c.client.client.FilterLogs(context.TODO(), query)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
if len(logs) > 0 {
blockHash = &logs[0].BlockHash
if len(logs) == 0 {
return nil, nil
}
for _, vLog := range logs {
if vLog.BlockHash != *blockHash {
if blockHash != nil && vLog.BlockHash != *blockHash {
log.Errorw("Block hash mismatch", "expected", blockHash.String(), "got", vLog.BlockHash.String())
return nil, nil, tracerr.Wrap(ErrBlockHashMismatchEvent)
return nil, tracerr.Wrap(ErrBlockHashMismatchEvent)
}
switch vLog.Topics[0] {
case logAuctionNewBid:
@@ -833,7 +841,7 @@ func (c *AuctionClient) AuctionEventsByBlock(blockNum int64) (*AuctionEvents, *e
}
var newBid AuctionEventNewBid
if err := c.contractAbi.UnpackIntoInterface(&auxNewBid, "NewBid", vLog.Data); err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
newBid.BidAmount = auxNewBid.BidAmount
newBid.Slot = new(big.Int).SetBytes(vLog.Topics[1][:]).Int64()
@@ -842,19 +850,19 @@ func (c *AuctionClient) AuctionEventsByBlock(blockNum int64) (*AuctionEvents, *e
case logAuctionNewSlotDeadline:
var newSlotDeadline AuctionEventNewSlotDeadline
if err := c.contractAbi.UnpackIntoInterface(&newSlotDeadline, "NewSlotDeadline", vLog.Data); err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
auctionEvents.NewSlotDeadline = append(auctionEvents.NewSlotDeadline, newSlotDeadline)
case logAuctionNewClosedAuctionSlots:
var newClosedAuctionSlots AuctionEventNewClosedAuctionSlots
if err := c.contractAbi.UnpackIntoInterface(&newClosedAuctionSlots, "NewClosedAuctionSlots", vLog.Data); err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
auctionEvents.NewClosedAuctionSlots = append(auctionEvents.NewClosedAuctionSlots, newClosedAuctionSlots)
case logAuctionNewOutbidding:
var newOutbidding AuctionEventNewOutbidding
if err := c.contractAbi.UnpackIntoInterface(&newOutbidding, "NewOutbidding", vLog.Data); err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
auctionEvents.NewOutbidding = append(auctionEvents.NewOutbidding, newOutbidding)
case logAuctionNewDonationAddress:
@@ -864,26 +872,26 @@ func (c *AuctionClient) AuctionEventsByBlock(blockNum int64) (*AuctionEvents, *e
case logAuctionNewBootCoordinator:
var newBootCoordinator AuctionEventNewBootCoordinator
if err := c.contractAbi.UnpackIntoInterface(&newBootCoordinator, "NewBootCoordinator", vLog.Data); err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
newBootCoordinator.NewBootCoordinator = ethCommon.BytesToAddress(vLog.Topics[1].Bytes())
auctionEvents.NewBootCoordinator = append(auctionEvents.NewBootCoordinator, newBootCoordinator)
case logAuctionNewOpenAuctionSlots:
var newOpenAuctionSlots AuctionEventNewOpenAuctionSlots
if err := c.contractAbi.UnpackIntoInterface(&newOpenAuctionSlots, "NewOpenAuctionSlots", vLog.Data); err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
auctionEvents.NewOpenAuctionSlots = append(auctionEvents.NewOpenAuctionSlots, newOpenAuctionSlots)
case logAuctionNewAllocationRatio:
var newAllocationRatio AuctionEventNewAllocationRatio
if err := c.contractAbi.UnpackIntoInterface(&newAllocationRatio, "NewAllocationRatio", vLog.Data); err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
auctionEvents.NewAllocationRatio = append(auctionEvents.NewAllocationRatio, newAllocationRatio)
case logAuctionSetCoordinator:
var setCoordinator AuctionEventSetCoordinator
if err := c.contractAbi.UnpackIntoInterface(&setCoordinator, "SetCoordinator", vLog.Data); err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
setCoordinator.BidderAddress = ethCommon.BytesToAddress(vLog.Topics[1].Bytes())
setCoordinator.ForgerAddress = ethCommon.BytesToAddress(vLog.Topics[2].Bytes())
@@ -891,7 +899,7 @@ func (c *AuctionClient) AuctionEventsByBlock(blockNum int64) (*AuctionEvents, *e
case logAuctionNewForgeAllocated:
var newForgeAllocated AuctionEventNewForgeAllocated
if err := c.contractAbi.UnpackIntoInterface(&newForgeAllocated, "NewForgeAllocated", vLog.Data); err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
newForgeAllocated.Bidder = ethCommon.BytesToAddress(vLog.Topics[1].Bytes())
newForgeAllocated.Forger = ethCommon.BytesToAddress(vLog.Topics[2].Bytes())
@@ -904,7 +912,7 @@ func (c *AuctionClient) AuctionEventsByBlock(blockNum int64) (*AuctionEvents, *e
}
var newDefaultSlotSetBid AuctionEventNewDefaultSlotSetBid
if err := c.contractAbi.UnpackIntoInterface(&auxNewDefaultSlotSetBid, "NewDefaultSlotSetBid", vLog.Data); err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
newDefaultSlotSetBid.NewInitialMinBid = auxNewDefaultSlotSetBid.NewInitialMinBid
newDefaultSlotSetBid.SlotSet = auxNewDefaultSlotSetBid.SlotSet.Int64()
@@ -917,11 +925,11 @@ func (c *AuctionClient) AuctionEventsByBlock(blockNum int64) (*AuctionEvents, *e
case logAuctionHEZClaimed:
var HEZClaimed AuctionEventHEZClaimed
if err := c.contractAbi.UnpackIntoInterface(&HEZClaimed, "HEZClaimed", vLog.Data); err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
HEZClaimed.Owner = ethCommon.BytesToAddress(vLog.Topics[1].Bytes())
auctionEvents.HEZClaimed = append(auctionEvents.HEZClaimed, HEZClaimed)
}
}
return &auctionEvents, blockHash, nil
return &auctionEvents, nil
}
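A usage sketch for the new signature (not part of the diff; the client, block number and block hash are assumed to be in scope): querying by number passes a nil hash, querying by hash makes the number irrelevant, and a nil, nil result simply means the block emitted no Auction events.
// Sketch (not part of the diff): the two query modes of AuctionEventsByBlock.
func exampleAuctionEventsByBlock(c *AuctionClient, blockNum int64, blockHash ethCommon.Hash) error {
	// By number: pass a nil hash.
	byNum, err := c.AuctionEventsByBlock(blockNum, nil)
	if err != nil {
		return err
	}
	if byNum == nil {
		// the block emitted no Auction events
	}
	// By hash: blockNum is ignored, so any value can be passed.
	byHash, err := c.AuctionEventsByBlock(0, &blockHash)
	if err != nil {
		return err
	}
	_ = byHash
	return nil
}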

View File

@@ -88,7 +88,7 @@ func TestAuctionSetSlotDeadline(t *testing.T) {
assert.Equal(t, newSlotDeadline, slotDeadline)
currentBlockNum, err := auctionClientTest.client.EthLastBlock()
require.Nil(t, err)
auctionEvents, _, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum)
auctionEvents, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
assert.Equal(t, newSlotDeadline, auctionEvents.NewSlotDeadline[0].NewSlotDeadline)
}
@@ -109,7 +109,7 @@ func TestAuctionSetOpenAuctionSlots(t *testing.T) {
assert.Equal(t, newOpenAuctionSlots, openAuctionSlots)
currentBlockNum, err := auctionClientTest.client.EthLastBlock()
require.Nil(t, err)
auctionEvents, _, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum)
auctionEvents, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
assert.Equal(t, newOpenAuctionSlots, auctionEvents.NewOpenAuctionSlots[0].NewOpenAuctionSlots)
}
@@ -130,7 +130,7 @@ func TestAuctionSetClosedAuctionSlots(t *testing.T) {
assert.Equal(t, newClosedAuctionSlots, closedAuctionSlots)
currentBlockNum, err := auctionClientTest.client.EthLastBlock()
require.Nil(t, err)
auctionEvents, _, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum)
auctionEvents, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
assert.Equal(t, newClosedAuctionSlots, auctionEvents.NewClosedAuctionSlots[0].NewClosedAuctionSlots)
_, err = auctionClientTest.AuctionSetClosedAuctionSlots(closedAuctionSlots)
@@ -153,7 +153,7 @@ func TestAuctionSetOutbidding(t *testing.T) {
assert.Equal(t, newOutbidding, outbidding)
currentBlockNum, err := auctionClientTest.client.EthLastBlock()
require.Nil(t, err)
auctionEvents, _, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum)
auctionEvents, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
assert.Equal(t, newOutbidding, auctionEvents.NewOutbidding[0].NewOutbidding)
_, err = auctionClientTest.AuctionSetOutbidding(outbiddingConst)
@@ -176,7 +176,7 @@ func TestAuctionSetAllocationRatio(t *testing.T) {
assert.Equal(t, newAllocationRatio, allocationRatio)
currentBlockNum, err := auctionClientTest.client.EthLastBlock()
require.Nil(t, err)
auctionEvents, _, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum)
auctionEvents, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
assert.Equal(t, newAllocationRatio, auctionEvents.NewAllocationRatio[0].NewAllocationRatio)
_, err = auctionClientTest.AuctionSetAllocationRatio(allocationRatioConst)
@@ -205,7 +205,7 @@ func TestAuctionSetDonationAddress(t *testing.T) {
assert.Equal(t, &newDonationAddress, donationAddress)
currentBlockNum, err := auctionClientTest.client.EthLastBlock()
require.Nil(t, err)
auctionEvents, _, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum)
auctionEvents, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
assert.Equal(t, newDonationAddress, auctionEvents.NewDonationAddress[0].NewDonationAddress)
_, err = auctionClientTest.AuctionSetDonationAddress(donationAddressConst)
@@ -224,7 +224,7 @@ func TestAuctionSetBootCoordinator(t *testing.T) {
assert.Equal(t, &newBootCoordinator, bootCoordinator)
currentBlockNum, err := auctionClientTest.client.EthLastBlock()
require.Nil(t, err)
auctionEvents, _, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum)
auctionEvents, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
assert.Equal(t, newBootCoordinator, auctionEvents.NewBootCoordinator[0].NewBootCoordinator)
assert.Equal(t, newBootCoordinatorURL, auctionEvents.NewBootCoordinator[0].NewBootCoordinatorURL)
@@ -261,7 +261,7 @@ func TestAuctionChangeDefaultSlotSetBid(t *testing.T) {
assert.Equal(t, minBid, newInitialMinBid)
currentBlockNum, err := auctionClientTest.client.EthLastBlock()
require.Nil(t, err)
auctionEvents, _, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum)
auctionEvents, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
assert.Equal(t, slotSet, auctionEvents.NewDefaultSlotSetBid[0].SlotSet)
assert.Equal(t, newInitialMinBid, auctionEvents.NewDefaultSlotSetBid[0].NewInitialMinBid)
@@ -287,7 +287,7 @@ func TestAuctionRegisterCoordinator(t *testing.T) {
require.Nil(t, err)
currentBlockNum, err := auctionClientTest.client.EthLastBlock()
require.Nil(t, err)
auctionEvents, _, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum)
auctionEvents, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
assert.Equal(t, forgerAddress, auctionEvents.SetCoordinator[0].ForgerAddress)
assert.Equal(t, bidderAddress, auctionEvents.SetCoordinator[0].BidderAddress)
@@ -306,7 +306,7 @@ func TestAuctionBid(t *testing.T) {
require.Nil(t, err)
currentBlockNum, err := auctionClientTest.client.EthLastBlock()
require.Nil(t, err)
auctionEvents, _, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum)
auctionEvents, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
assert.Equal(t, bidAmount, auctionEvents.NewBid[0].BidAmount)
assert.Equal(t, bidderAddress, auctionEvents.NewBid[0].Bidder)
@@ -346,7 +346,7 @@ func TestAuctionMultiBid(t *testing.T) {
require.Nil(t, err)
currentBlockNum, err := auctionClientTest.client.EthLastBlock()
require.Nil(t, err)
auctionEvents, _, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum)
auctionEvents, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
assert.Equal(t, bidderAddress, auctionEvents.NewBid[0].Bidder)
assert.Equal(t, currentSlot+4, auctionEvents.NewBid[0].Slot)
@@ -376,7 +376,7 @@ func TestAuctionClaimHEZ(t *testing.T) {
require.Nil(t, err)
currentBlockNum, err := auctionClientTest.client.EthLastBlock()
require.Nil(t, err)
auctionEvents, _, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum)
auctionEvents, err := auctionClientTest.AuctionEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
assert.Equal(t, amount, auctionEvents.HEZClaimed[0].Amount)
assert.Equal(t, governanceAddressConst, auctionEvents.HEZClaimed[0].Owner)

View File

@@ -264,7 +264,7 @@ type RollupInterface interface {
//
RollupConstants() (*common.RollupConstants, error)
RollupEventsByBlock(blockNum int64) (*RollupEvents, *ethCommon.Hash, error)
RollupEventsByBlock(blockNum int64, blockHash *ethCommon.Hash) (*RollupEvents, error)
RollupForgeBatchArgs(ethCommon.Hash, uint16) (*RollupForgeBatchArgs, *ethCommon.Address, error)
RollupEventInit() (*RollupEventInitialize, int64, error)
}
@@ -735,31 +735,40 @@ func (c *RollupClient) RollupEventInit() (*RollupEventInitialize, int64, error)
return &rollupInit, int64(vLog.BlockNumber), tracerr.Wrap(err)
}
// RollupEventsByBlock returns the events in a block that happened in the Rollup Smart Contract
func (c *RollupClient) RollupEventsByBlock(blockNum int64) (*RollupEvents, *ethCommon.Hash, error) {
// RollupEventsByBlock returns the events in a block that happened in the
// Rollup Smart Contract.
// To query by blockNum, set blockNum >= 0 and blockHash == nil.
// To query by blockHash set blockHash != nil, and blockNum will be ignored.
// If there are no events in that block the result is nil.
func (c *RollupClient) RollupEventsByBlock(blockNum int64,
blockHash *ethCommon.Hash) (*RollupEvents, error) {
var rollupEvents RollupEvents
var blockHash *ethCommon.Hash
var blockNumBigInt *big.Int
if blockHash == nil {
blockNumBigInt = big.NewInt(blockNum)
}
query := ethereum.FilterQuery{
FromBlock: big.NewInt(blockNum),
ToBlock: big.NewInt(blockNum),
BlockHash: blockHash,
FromBlock: blockNumBigInt,
ToBlock: blockNumBigInt,
Addresses: []ethCommon.Address{
c.address,
},
BlockHash: nil,
Topics: [][]ethCommon.Hash{},
Topics: [][]ethCommon.Hash{},
}
logs, err := c.client.client.FilterLogs(context.Background(), query)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
if len(logs) > 0 {
blockHash = &logs[0].BlockHash
if len(logs) == 0 {
return nil, nil
}
for _, vLog := range logs {
if vLog.BlockHash != *blockHash {
if blockHash != nil && vLog.BlockHash != *blockHash {
log.Errorw("Block hash mismatch", "expected", blockHash.String(), "got", vLog.BlockHash.String())
return nil, nil, tracerr.Wrap(ErrBlockHashMismatchEvent)
return nil, tracerr.Wrap(ErrBlockHashMismatchEvent)
}
switch vLog.Topics[0] {
case logHermezL1UserTxEvent:
@@ -767,11 +776,11 @@ func (c *RollupClient) RollupEventsByBlock(blockNum int64) (*RollupEvents, *ethC
var L1UserTx RollupEventL1UserTx
err := c.contractAbi.UnpackIntoInterface(&L1UserTxAux, "L1UserTxEvent", vLog.Data)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
L1Tx, err := common.L1UserTxFromBytes(L1UserTxAux.L1UserTx)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
toForgeL1TxsNum := new(big.Int).SetBytes(vLog.Topics[1][:]).Int64()
L1Tx.ToForgeL1TxsNum = &toForgeL1TxsNum
@@ -783,7 +792,7 @@ func (c *RollupClient) RollupEventsByBlock(blockNum int64) (*RollupEvents, *ethC
var addToken RollupEventAddToken
err := c.contractAbi.UnpackIntoInterface(&addToken, "AddToken", vLog.Data)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
addToken.TokenAddress = ethCommon.BytesToAddress(vLog.Topics[1].Bytes())
rollupEvents.AddToken = append(rollupEvents.AddToken, addToken)
@@ -791,7 +800,7 @@ func (c *RollupClient) RollupEventsByBlock(blockNum int64) (*RollupEvents, *ethC
var forgeBatch RollupEventForgeBatch
err := c.contractAbi.UnpackIntoInterface(&forgeBatch, "ForgeBatch", vLog.Data)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
forgeBatch.BatchNum = new(big.Int).SetBytes(vLog.Topics[1][:]).Int64()
forgeBatch.EthTxHash = vLog.TxHash
@@ -803,7 +812,7 @@ func (c *RollupClient) RollupEventsByBlock(blockNum int64) (*RollupEvents, *ethC
}
err := c.contractAbi.UnpackIntoInterface(&updateForgeL1L2BatchTimeout, "UpdateForgeL1L2BatchTimeout", vLog.Data)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
rollupEvents.UpdateForgeL1L2BatchTimeout = append(rollupEvents.UpdateForgeL1L2BatchTimeout,
RollupEventUpdateForgeL1L2BatchTimeout{
@@ -813,7 +822,7 @@ func (c *RollupClient) RollupEventsByBlock(blockNum int64) (*RollupEvents, *ethC
var updateFeeAddToken RollupEventUpdateFeeAddToken
err := c.contractAbi.UnpackIntoInterface(&updateFeeAddToken, "UpdateFeeAddToken", vLog.Data)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
rollupEvents.UpdateFeeAddToken = append(rollupEvents.UpdateFeeAddToken, updateFeeAddToken)
case logHermezWithdrawEvent:
@@ -831,7 +840,7 @@ func (c *RollupClient) RollupEventsByBlock(blockNum int64) (*RollupEvents, *ethC
var updateBucketWithdraw RollupEventUpdateBucketWithdraw
err := c.contractAbi.UnpackIntoInterface(&updateBucketWithdrawAux, "UpdateBucketWithdraw", vLog.Data)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
updateBucketWithdraw.Withdrawals = updateBucketWithdrawAux.Withdrawals
updateBucketWithdraw.NumBucket = int(new(big.Int).SetBytes(vLog.Topics[1][:]).Int64())
@@ -842,7 +851,7 @@ func (c *RollupClient) RollupEventsByBlock(blockNum int64) (*RollupEvents, *ethC
var withdrawalDelay RollupEventUpdateWithdrawalDelay
err := c.contractAbi.UnpackIntoInterface(&withdrawalDelay, "UpdateWithdrawalDelay", vLog.Data)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
rollupEvents.UpdateWithdrawalDelay = append(rollupEvents.UpdateWithdrawalDelay, withdrawalDelay)
case logHermezUpdateBucketsParameters:
@@ -850,7 +859,7 @@ func (c *RollupClient) RollupEventsByBlock(blockNum int64) (*RollupEvents, *ethC
var bucketsParameters RollupEventUpdateBucketsParameters
err := c.contractAbi.UnpackIntoInterface(&bucketsParametersAux, "UpdateBucketsParameters", vLog.Data)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
for i, bucket := range bucketsParametersAux.ArrayBuckets {
bucketsParameters.ArrayBuckets[i].CeilUSD = bucket[0]
@@ -863,7 +872,7 @@ func (c *RollupClient) RollupEventsByBlock(blockNum int64) (*RollupEvents, *ethC
var tokensExchange RollupEventUpdateTokenExchange
err := c.contractAbi.UnpackIntoInterface(&tokensExchange, "UpdateTokenExchange", vLog.Data)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
rollupEvents.UpdateTokenExchange = append(rollupEvents.UpdateTokenExchange, tokensExchange)
case logHermezSafeMode:
@@ -885,7 +894,7 @@ func (c *RollupClient) RollupEventsByBlock(blockNum int64) (*RollupEvents, *ethC
bucketsParameters)
}
}
return &rollupEvents, blockHash, nil
return &rollupEvents, nil
}
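
A minimal caller-side sketch of the new signature (hypothetical helper, not part of this diff; it assumes it lives in the same eth package as RollupClient):

func rollupEventsSketch(c *RollupClient, blockNum int64, hash *ethCommon.Hash) (*RollupEvents, error) {
	// Pass the expected block hash when it is known (as the synchronizer now
	// does); pass nil to query purely by block number.
	events, err := c.RollupEventsByBlock(blockNum, hash)
	if err != nil {
		return nil, err
	}
	if events == nil {
		// The block contains no Rollup events.
		return &RollupEvents{}, nil
	}
	return events, nil
}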
// RollupForgeBatchArgs returns the arguments used in a ForgeBatch call in the
@@ -893,7 +902,7 @@ func (c *RollupClient) RollupEventsByBlock(blockNum int64) (*RollupEvents, *ethC
func (c *RollupClient) RollupForgeBatchArgs(ethTxHash ethCommon.Hash, l1UserTxsLen uint16) (*RollupForgeBatchArgs, *ethCommon.Address, error) {
tx, _, err := c.client.client.TransactionByHash(context.Background(), ethTxHash)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, nil, tracerr.Wrap(fmt.Errorf("TransactionByHash: %w", err))
}
txData := tx.Data()


@@ -91,7 +91,7 @@ func TestRollupAddToken(t *testing.T) {
require.NoError(t, err)
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, tokenHEZAddressConst, rollupEvents.AddToken[0].TokenAddress)
@@ -174,7 +174,7 @@ func TestRollupForgeBatch(t *testing.T) {
currentBlockNum, err = rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, int64(1), rollupEvents.ForgeBatch[0].BatchNum)
@@ -203,7 +203,7 @@ func TestRollupUpdateForgeL1L2BatchTimeout(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, newForgeL1L2BatchTimeout, rollupEvents.UpdateForgeL1L2BatchTimeout[0].NewForgeL1L2BatchTimeout)
@@ -216,7 +216,7 @@ func TestRollupUpdateFeeAddToken(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, newFeeAddToken, rollupEvents.UpdateFeeAddToken[0].NewFeeAddToken)
@@ -235,7 +235,7 @@ func TestRollupUpdateBucketsParameters(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
blockStampBucket = currentBlockNum
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, bucketsParameters, rollupEvents.UpdateBucketsParameters[0].ArrayBuckets)
}
@@ -246,7 +246,7 @@ func TestRollupUpdateWithdrawalDelay(t *testing.T) {
require.NoError(t, err)
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, newWithdrawalDelay, int64(rollupEvents.UpdateWithdrawalDelay[0].NewWithdrawalDelay))
}
@@ -263,7 +263,7 @@ func TestRollupUpdateTokenExchange(t *testing.T) {
require.NoError(t, err)
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, addressArray, rollupEvents.UpdateTokenExchange[0].AddressArray)
assert.Equal(t, valueArray, rollupEvents.UpdateTokenExchange[0].ValueArray)
@@ -292,7 +292,7 @@ func TestRollupL1UserTxETHCreateAccountDeposit(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, l1Tx.FromBJJ, rollupEvents.L1UserTx[0].L1UserTx.FromBJJ)
assert.Equal(t, l1Tx.ToIdx, rollupEvents.L1UserTx[0].L1UserTx.ToIdx)
@@ -324,7 +324,7 @@ func TestRollupL1UserTxERC20CreateAccountDeposit(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, l1Tx.FromBJJ, rollupEvents.L1UserTx[0].L1UserTx.FromBJJ)
assert.Equal(t, l1Tx.ToIdx, rollupEvents.L1UserTx[0].L1UserTx.ToIdx)
@@ -356,7 +356,7 @@ func TestRollupL1UserTxERC20PermitCreateAccountDeposit(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, l1Tx.FromBJJ, rollupEvents.L1UserTx[0].L1UserTx.FromBJJ)
assert.Equal(t, l1Tx.ToIdx, rollupEvents.L1UserTx[0].L1UserTx.ToIdx)
@@ -388,7 +388,7 @@ func TestRollupL1UserTxETHDeposit(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, l1Tx.ToIdx, rollupEvents.L1UserTx[0].L1UserTx.ToIdx)
assert.Equal(t, l1Tx.DepositAmount, rollupEvents.L1UserTx[0].L1UserTx.DepositAmount)
@@ -418,7 +418,7 @@ func TestRollupL1UserTxERC20Deposit(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, l1Tx.ToIdx, rollupEvents.L1UserTx[0].L1UserTx.ToIdx)
assert.Equal(t, l1Tx.DepositAmount, rollupEvents.L1UserTx[0].L1UserTx.DepositAmount)
@@ -447,7 +447,7 @@ func TestRollupL1UserTxERC20PermitDeposit(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, l1Tx.ToIdx, rollupEvents.L1UserTx[0].L1UserTx.ToIdx)
assert.Equal(t, l1Tx.DepositAmount, rollupEvents.L1UserTx[0].L1UserTx.DepositAmount)
@@ -478,7 +478,7 @@ func TestRollupL1UserTxETHDepositTransfer(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, l1Tx.ToIdx, rollupEvents.L1UserTx[0].L1UserTx.ToIdx)
assert.Equal(t, l1Tx.DepositAmount, rollupEvents.L1UserTx[0].L1UserTx.DepositAmount)
@@ -508,7 +508,7 @@ func TestRollupL1UserTxERC20DepositTransfer(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, l1Tx.ToIdx, rollupEvents.L1UserTx[0].L1UserTx.ToIdx)
assert.Equal(t, l1Tx.DepositAmount, rollupEvents.L1UserTx[0].L1UserTx.DepositAmount)
@@ -538,7 +538,7 @@ func TestRollupL1UserTxERC20PermitDepositTransfer(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, l1Tx.ToIdx, rollupEvents.L1UserTx[0].L1UserTx.ToIdx)
assert.Equal(t, l1Tx.DepositAmount, rollupEvents.L1UserTx[0].L1UserTx.DepositAmount)
@@ -569,7 +569,7 @@ func TestRollupL1UserTxETHCreateAccountDepositTransfer(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, l1Tx.ToIdx, rollupEvents.L1UserTx[0].L1UserTx.ToIdx)
assert.Equal(t, l1Tx.DepositAmount, rollupEvents.L1UserTx[0].L1UserTx.DepositAmount)
@@ -599,7 +599,7 @@ func TestRollupL1UserTxERC20CreateAccountDepositTransfer(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, l1Tx.ToIdx, rollupEvents.L1UserTx[0].L1UserTx.ToIdx)
assert.Equal(t, l1Tx.DepositAmount, rollupEvents.L1UserTx[0].L1UserTx.DepositAmount)
@@ -629,7 +629,7 @@ func TestRollupL1UserTxERC20PermitCreateAccountDepositTransfer(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, l1Tx.ToIdx, rollupEvents.L1UserTx[0].L1UserTx.ToIdx)
assert.Equal(t, l1Tx.DepositAmount, rollupEvents.L1UserTx[0].L1UserTx.DepositAmount)
@@ -659,7 +659,7 @@ func TestRollupL1UserTxETHForceTransfer(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, l1Tx.ToIdx, rollupEvents.L1UserTx[0].L1UserTx.ToIdx)
assert.Equal(t, l1Tx.DepositAmount, rollupEvents.L1UserTx[0].L1UserTx.DepositAmount)
@@ -688,7 +688,7 @@ func TestRollupL1UserTxERC20ForceTransfer(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, l1Tx.ToIdx, rollupEvents.L1UserTx[0].L1UserTx.ToIdx)
assert.Equal(t, l1Tx.DepositAmount, rollupEvents.L1UserTx[0].L1UserTx.DepositAmount)
@@ -717,7 +717,7 @@ func TestRollupL1UserTxERC20PermitForceTransfer(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, l1Tx.ToIdx, rollupEvents.L1UserTx[0].L1UserTx.ToIdx)
assert.Equal(t, l1Tx.DepositAmount, rollupEvents.L1UserTx[0].L1UserTx.DepositAmount)
@@ -747,7 +747,7 @@ func TestRollupL1UserTxETHForceExit(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, l1Tx.ToIdx, rollupEvents.L1UserTx[0].L1UserTx.ToIdx)
assert.Equal(t, l1Tx.DepositAmount, rollupEvents.L1UserTx[0].L1UserTx.DepositAmount)
@@ -776,7 +776,7 @@ func TestRollupL1UserTxERC20ForceExit(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, l1Tx.ToIdx, rollupEvents.L1UserTx[0].L1UserTx.ToIdx)
assert.Equal(t, l1Tx.DepositAmount, rollupEvents.L1UserTx[0].L1UserTx.DepositAmount)
@@ -807,7 +807,7 @@ func TestRollupL1UserTxERC20PermitForceExit(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, l1Tx.ToIdx, rollupEvents.L1UserTx[0].L1UserTx.ToIdx)
assert.Equal(t, l1Tx.DepositAmount, rollupEvents.L1UserTx[0].L1UserTx.DepositAmount)
@@ -822,7 +822,7 @@ func TestRollupForgeBatch2(t *testing.T) {
require.NoError(t, err)
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, int64(2), rollupEvents.ForgeBatch[0].BatchNum)
@@ -876,7 +876,7 @@ func TestRollupForgeBatch2(t *testing.T) {
currentBlockNum, err = rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err = rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err = rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, int64(3), rollupEvents.ForgeBatch[0].BatchNum)
@@ -928,7 +928,7 @@ func TestRollupWithdrawMerkleProof(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
assert.Equal(t, uint64(fromIdx), rollupEvents.Withdraw[0].Idx)
@@ -951,7 +951,7 @@ func TestRollupSafeMode(t *testing.T) {
currentBlockNum, err := rollupClient.client.EthLastBlock()
require.NoError(t, err)
rollupEvents, _, err := rollupClient.RollupEventsByBlock(currentBlockNum)
rollupEvents, err := rollupClient.RollupEventsByBlock(currentBlockNum, nil)
require.NoError(t, err)
auxEvent := new(RollupEventSafeMode)
assert.Equal(t, auxEvent, &rollupEvents.SafeMode[0])


@@ -134,7 +134,7 @@ type WDelayerInterface interface {
WDelayerWithdrawal(owner, token ethCommon.Address) (*types.Transaction, error)
WDelayerEscapeHatchWithdrawal(to, token ethCommon.Address, amount *big.Int) (*types.Transaction, error)
WDelayerEventsByBlock(blockNum int64) (*WDelayerEvents, *ethCommon.Hash, error)
WDelayerEventsByBlock(blockNum int64, blockHash *ethCommon.Hash) (*WDelayerEvents, error)
WDelayerConstants() (*common.WDelayerConstants, error)
WDelayerEventInit() (*WDelayerEventInitialize, int64, error)
}
@@ -424,40 +424,47 @@ func (c *WDelayerClient) WDelayerEventInit() (*WDelayerEventInitialize, int64, e
}
// WDelayerEventsByBlock returns the events in a block that happened in the
// WDelayer Smart Contract and the blockHash where the events happened. If
// there are no events in that block, blockHash is nil.
func (c *WDelayerClient) WDelayerEventsByBlock(blockNum int64) (*WDelayerEvents, *ethCommon.Hash, error) {
// WDelayer Smart Contract.
// To query by blockNum, set blockNum >= 0 and blockHash == nil.
// To query by blockHash, set blockHash != nil; blockNum will be ignored.
// If there are no events in that block, the result is nil.
func (c *WDelayerClient) WDelayerEventsByBlock(blockNum int64,
blockHash *ethCommon.Hash) (*WDelayerEvents, error) {
var wdelayerEvents WDelayerEvents
var blockHash *ethCommon.Hash
var blockNumBigInt *big.Int
if blockHash == nil {
blockNumBigInt = big.NewInt(blockNum)
}
query := ethereum.FilterQuery{
FromBlock: big.NewInt(blockNum),
ToBlock: big.NewInt(blockNum),
BlockHash: blockHash,
FromBlock: blockNumBigInt,
ToBlock: blockNumBigInt,
Addresses: []ethCommon.Address{
c.address,
},
BlockHash: nil,
Topics: [][]ethCommon.Hash{},
Topics: [][]ethCommon.Hash{},
}
logs, err := c.client.client.FilterLogs(context.Background(), query)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
if len(logs) > 0 {
blockHash = &logs[0].BlockHash
if len(logs) == 0 {
return nil, nil
}
for _, vLog := range logs {
if vLog.BlockHash != *blockHash {
if blockHash != nil && vLog.BlockHash != *blockHash {
log.Errorw("Block hash mismatch", "expected", blockHash.String(), "got", vLog.BlockHash.String())
return nil, nil, tracerr.Wrap(ErrBlockHashMismatchEvent)
return nil, tracerr.Wrap(ErrBlockHashMismatchEvent)
}
switch vLog.Topics[0] {
case logWDelayerDeposit:
var deposit WDelayerEventDeposit
err := c.contractAbi.UnpackIntoInterface(&deposit, "Deposit", vLog.Data)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
deposit.Owner = ethCommon.BytesToAddress(vLog.Topics[1].Bytes())
deposit.Token = ethCommon.BytesToAddress(vLog.Topics[2].Bytes())
@@ -468,7 +475,7 @@ func (c *WDelayerClient) WDelayerEventsByBlock(blockNum int64) (*WDelayerEvents,
var withdraw WDelayerEventWithdraw
err := c.contractAbi.UnpackIntoInterface(&withdraw, "Withdraw", vLog.Data)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
withdraw.Token = ethCommon.BytesToAddress(vLog.Topics[1].Bytes())
withdraw.Owner = ethCommon.BytesToAddress(vLog.Topics[2].Bytes())
@@ -482,7 +489,7 @@ func (c *WDelayerClient) WDelayerEventsByBlock(blockNum int64) (*WDelayerEvents,
var withdrawalDelay WDelayerEventNewWithdrawalDelay
err := c.contractAbi.UnpackIntoInterface(&withdrawalDelay, "NewWithdrawalDelay", vLog.Data)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
wdelayerEvents.NewWithdrawalDelay = append(wdelayerEvents.NewWithdrawalDelay, withdrawalDelay)
@@ -490,7 +497,7 @@ func (c *WDelayerClient) WDelayerEventsByBlock(blockNum int64) (*WDelayerEvents,
var escapeHatchWithdrawal WDelayerEventEscapeHatchWithdrawal
err := c.contractAbi.UnpackIntoInterface(&escapeHatchWithdrawal, "EscapeHatchWithdrawal", vLog.Data)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
escapeHatchWithdrawal.Who = ethCommon.BytesToAddress(vLog.Topics[1].Bytes())
escapeHatchWithdrawal.To = ethCommon.BytesToAddress(vLog.Topics[2].Bytes())
@@ -501,7 +508,7 @@ func (c *WDelayerClient) WDelayerEventsByBlock(blockNum int64) (*WDelayerEvents,
var emergencyCouncil WDelayerEventNewEmergencyCouncil
err := c.contractAbi.UnpackIntoInterface(&emergencyCouncil, "NewEmergencyCouncil", vLog.Data)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
wdelayerEvents.NewEmergencyCouncil = append(wdelayerEvents.NewEmergencyCouncil, emergencyCouncil)
@@ -509,10 +516,10 @@ func (c *WDelayerClient) WDelayerEventsByBlock(blockNum int64) (*WDelayerEvents,
var governanceAddress WDelayerEventNewHermezGovernanceAddress
err := c.contractAbi.UnpackIntoInterface(&governanceAddress, "NewHermezGovernanceAddress", vLog.Data)
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(err)
}
wdelayerEvents.NewHermezGovernanceAddress = append(wdelayerEvents.NewHermezGovernanceAddress, governanceAddress)
}
}
return &wdelayerEvents, blockHash, nil
return &wdelayerEvents, nil
}
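
A short sketch of how the blockHash parameter is meant to be used (hypothetical helper in the same eth package, standard errors package assumed imported; only WDelayerEventsByBlock, ErrBlockHashMismatchEvent and common.Block come from the existing code): the caller passes the hash of the block it believes it is syncing, and a mismatch is reported as an error it can treat as a reorg.

func wdelayerEventsForBlock(c *WDelayerClient, block *common.Block) (*WDelayerEvents, error) {
	events, err := c.WDelayerEventsByBlock(block.Num, &block.Hash)
	if errors.Is(err, ErrBlockHashMismatchEvent) {
		// The returned logs do not belong to the expected block; the caller
		// can treat this as a reorg and resync from an earlier block.
		return nil, err
	} else if err != nil {
		return nil, err
	}
	// events is nil when the block contains no WDelayer events.
	return events, nil
}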


@@ -52,7 +52,7 @@ func TestWDelayerSetHermezGovernanceAddress(t *testing.T) {
assert.Equal(t, &auxAddressConst, auxAddress)
currentBlockNum, err := wdelayerClientTest.client.EthLastBlock()
require.Nil(t, err)
wdelayerEvents, _, err := wdelayerClientTest.WDelayerEventsByBlock(currentBlockNum)
wdelayerEvents, err := wdelayerClientTest.WDelayerEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
assert.Equal(t, auxAddressConst, wdelayerEvents.NewHermezGovernanceAddress[0].NewHermezGovernanceAddress)
_, err = wdelayerClientAux.WDelayerTransferGovernance(governanceAddressConst)
@@ -81,7 +81,7 @@ func TestWDelayerSetEmergencyCouncil(t *testing.T) {
assert.Equal(t, &auxAddressConst, auxAddress)
currentBlockNum, err := wdelayerClientTest.client.EthLastBlock()
require.Nil(t, err)
wdelayerEvents, _, err := wdelayerClientTest.WDelayerEventsByBlock(currentBlockNum)
wdelayerEvents, err := wdelayerClientTest.WDelayerEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
assert.Equal(t, auxAddressConst, wdelayerEvents.NewEmergencyCouncil[0].NewEmergencyCouncil)
_, err = wdelayerClientAux.WDelayerTransferEmergencyCouncil(emergencyCouncilAddressConst)
@@ -110,7 +110,7 @@ func TestWDelayerChangeWithdrawalDelay(t *testing.T) {
assert.Equal(t, newWithdrawalDelay, withdrawalDelay)
currentBlockNum, err := wdelayerClientTest.client.EthLastBlock()
require.Nil(t, err)
wdelayerEvents, _, err := wdelayerClientTest.WDelayerEventsByBlock(currentBlockNum)
wdelayerEvents, err := wdelayerClientTest.WDelayerEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
assert.Equal(t, uint64(newWithdrawalDelay), wdelayerEvents.NewWithdrawalDelay[0].WithdrawalDelay)
}
@@ -124,7 +124,7 @@ func TestWDelayerDeposit(t *testing.T) {
require.Nil(t, err)
currentBlockNum, err := wdelayerClientTest.client.EthLastBlock()
require.Nil(t, err)
wdelayerEvents, _, err := wdelayerClientTest.WDelayerEventsByBlock(currentBlockNum)
wdelayerEvents, err := wdelayerClientTest.WDelayerEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
assert.Equal(t, amount, wdelayerEvents.Deposit[0].Amount)
assert.Equal(t, auxAddressConst, wdelayerEvents.Deposit[0].Owner)
@@ -150,7 +150,7 @@ func TestWDelayerWithdrawal(t *testing.T) {
require.Nil(t, err)
currentBlockNum, err := wdelayerClientTest.client.EthLastBlock()
require.Nil(t, err)
wdelayerEvents, _, err := wdelayerClientTest.WDelayerEventsByBlock(currentBlockNum)
wdelayerEvents, err := wdelayerClientTest.WDelayerEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
assert.Equal(t, amount, wdelayerEvents.Withdraw[0].Amount)
assert.Equal(t, auxAddressConst, wdelayerEvents.Withdraw[0].Owner)
@@ -166,7 +166,7 @@ func TestWDelayerSecondDeposit(t *testing.T) {
require.Nil(t, err)
currentBlockNum, err := wdelayerClientTest.client.EthLastBlock()
require.Nil(t, err)
wdelayerEvents, _, err := wdelayerClientTest.WDelayerEventsByBlock(currentBlockNum)
wdelayerEvents, err := wdelayerClientTest.WDelayerEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
assert.Equal(t, amount, wdelayerEvents.Deposit[0].Amount)
assert.Equal(t, auxAddressConst, wdelayerEvents.Deposit[0].Owner)
@@ -181,7 +181,7 @@ func TestWDelayerEnableEmergencyMode(t *testing.T) {
assert.Equal(t, true, emergencyMode)
currentBlockNum, err := wdelayerClientTest.client.EthLastBlock()
require.Nil(t, err)
wdelayerEvents, _, err := wdelayerClientTest.WDelayerEventsByBlock(currentBlockNum)
wdelayerEvents, err := wdelayerClientTest.WDelayerEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
auxEvent := new(WDelayerEventEmergencyModeEnabled)
assert.Equal(t, auxEvent, &wdelayerEvents.EmergencyModeEnabled[0])
@@ -210,7 +210,7 @@ func TestWDelayerEscapeHatchWithdrawal(t *testing.T) {
require.Nil(t, err)
currentBlockNum, err := wdelayerClientTest.client.EthLastBlock()
require.Nil(t, err)
wdelayerEvents, _, err := wdelayerClientTest.WDelayerEventsByBlock(currentBlockNum)
wdelayerEvents, err := wdelayerClientTest.WDelayerEventsByBlock(currentBlockNum, nil)
require.Nil(t, err)
assert.Equal(t, tokenHEZAddressConst, wdelayerEvents.EscapeHatchWithdrawal[0].Token)
assert.Equal(t, governanceAddressConst, wdelayerEvents.EscapeHatchWithdrawal[0].To)

go.mod

@@ -6,6 +6,7 @@ require (
github.com/BurntSushi/toml v0.3.1
github.com/dghubble/sling v1.3.0
github.com/ethereum/go-ethereum v1.9.25
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 // indirect
github.com/getkin/kin-openapi v0.22.0
github.com/gin-contrib/cors v1.3.1
github.com/gin-gonic/gin v1.5.0
@@ -24,6 +25,8 @@ require (
github.com/mitchellh/mapstructure v1.3.0
github.com/rubenv/sql-migrate v0.0.0-20200616145509-8d140a17f351
github.com/russross/meddler v1.0.0
github.com/sirupsen/logrus v1.5.0 // indirect
github.com/status-im/keycard-go v0.0.0-20190424133014-d95853db0f48 // indirect
github.com/stretchr/testify v1.6.1
github.com/urfave/cli/v2 v2.2.0
go.uber.org/zap v1.16.0

go.sum

@@ -24,6 +24,8 @@ github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/uf
github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY=
github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/OneOfOne/xxhash v1.2.5/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0=
@@ -84,6 +86,8 @@ github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QH
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/cp v0.1.0 h1:SE+dxFebS7Iik5LK0tsi1k9ZCxEaFX4AjQmoyA+1dJk=
github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s=
github.com/cespare/cp v1.1.1 h1:nCb6ZLdB7NRaqsm91JtQTAme2SKJzXVsdPIPkyJr1MU=
github.com/cespare/cp v1.1.1/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.0.1-0.20190104013014-3767db7a7e18/go.mod h1:HD5P3vAIAh+Y2GAxg0PrPN1P8WkepXGpjbUPDHJqqKM=
@@ -169,6 +173,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fjl/memsize v0.0.0-20180418122429-ca190fb6ffbc h1:jtW8jbpkO4YirRSyepBOH8E+2HEw6/hKkBvFPwhUN8c=
github.com/fjl/memsize v0.0.0-20180418122429-ca190fb6ffbc/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0=
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 h1:FtmdgXiUlNeRsoNMFlKLDt+S+6hbjVMEW6RGQ7aUf7c=
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0=
github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
@@ -599,6 +605,8 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.5.0 h1:1N5EYkVAPEywqZRJd7cwnRtCb6xJx7NH3T3WUTF980Q=
github.com/sirupsen/logrus v1.5.0/go.mod h1:+F7Ogzej0PZc/94MaYx/nvG9jOFMD2osvC3s+Squfpo=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
@@ -617,6 +625,8 @@ github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DM
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4 h1:Gb2Tyox57NRNuZ2d3rmvB3pcmbu7O1RS3m8WRx7ilrg=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=
github.com/status-im/keycard-go v0.0.0-20190424133014-d95853db0f48 h1:ju5UTwk5Odtm4trrY+4Ca4RMj5OyXbmVeDAVad2T0Jw=
github.com/status-im/keycard-go v0.0.0-20190424133014-d95853db0f48/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=
github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 h1:gIlAHnH1vJb5vwEjIp5kBj/eu99p/bl0Ay2goiPe5xE=
github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570/go.mod h1:8OR4w3TdeIHIh1g6EMY5p0gVNOovcWC+1vpc7naMuAw=
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 h1:njlZPzLwU639dk2kqnCPPv+wNjq7Xb6EfUxe/oX0/NM=


@@ -2,6 +2,7 @@ package node
import (
"context"
"errors"
"fmt"
"net/http"
"sync"
@@ -171,8 +172,12 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
return nil, tracerr.Wrap(fmt.Errorf("cfg.StateDB.Keep = %v < %v, which is unsafe",
cfg.StateDB.Keep, safeStateDBKeep))
}
stateDB, err := statedb.NewStateDB(cfg.StateDB.Path, cfg.StateDB.Keep,
statedb.TypeSynchronizer, 32)
stateDB, err := statedb.NewStateDB(statedb.Config{
Path: cfg.StateDB.Path,
Keep: cfg.StateDB.Keep,
Type: statedb.TypeSynchronizer,
NLevels: statedb.MaxNLevels,
})
if err != nil {
return nil, tracerr.Wrap(err)
}
@@ -256,14 +261,37 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
MaxFeeTx: common.RollupConstMaxFeeIdxCoordinator,
MaxL1Tx: common.RollupConstMaxL1Tx,
}
verifierIdx, err := scConsts.Rollup.FindVerifierIdx(
cfg.Coordinator.Circuit.MaxTx,
cfg.Coordinator.Circuit.NLevels,
)
if err != nil {
return nil, tracerr.Wrap(err)
var verifierIdx int
if cfg.Coordinator.Debug.RollupVerifierIndex == nil {
verifierIdx, err = scConsts.Rollup.FindVerifierIdx(
cfg.Coordinator.Circuit.MaxTx,
cfg.Coordinator.Circuit.NLevels,
)
if err != nil {
return nil, tracerr.Wrap(err)
}
log.Infow("Found verifier that matches circuit config", "verifierIdx", verifierIdx)
} else {
verifierIdx = *cfg.Coordinator.Debug.RollupVerifierIndex
log.Infow("Using debug verifier index from config", "verifierIdx", verifierIdx)
if verifierIdx >= len(scConsts.Rollup.Verifiers) {
return nil, tracerr.Wrap(
fmt.Errorf("verifierIdx (%v) >= "+
"len(scConsts.Rollup.Verifiers) (%v)",
verifierIdx, len(scConsts.Rollup.Verifiers)))
}
verifier := scConsts.Rollup.Verifiers[verifierIdx]
if verifier.MaxTx != cfg.Coordinator.Circuit.MaxTx ||
verifier.NLevels != cfg.Coordinator.Circuit.NLevels {
return nil, tracerr.Wrap(
fmt.Errorf("Circuit config and verifier params don't match. "+
"circuit.MaxTx = %v, circuit.NLevels = %v, "+
"verifier.MaxTx = %v, verifier.NLevels = %v",
cfg.Coordinator.Circuit.MaxTx, cfg.Coordinator.Circuit.NLevels,
verifier.MaxTx, verifier.NLevels,
))
}
}
log.Infow("Found verifier that matches circuit config", "verifierIdx", verifierIdx)
coord, err = coordinator.NewCoordinator(
coordinator.Config{
@@ -274,6 +302,9 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
SyncRetryInterval: cfg.Coordinator.SyncRetryInterval.Duration,
EthClientAttempts: cfg.Coordinator.EthClient.Attempts,
EthClientAttemptsDelay: cfg.Coordinator.EthClient.AttemptsDelay.Duration,
EthNoReuseNonce: cfg.Coordinator.EthClient.NoReuseNonce,
EthTxResendTimeout: cfg.Coordinator.EthClient.TxResendTimeout.Duration,
MaxGasPrice: cfg.Coordinator.EthClient.MaxGasPrice,
TxManagerCheckInterval: cfg.Coordinator.EthClient.CheckLoopInterval.Duration,
DebugBatchPath: cfg.Coordinator.Debug.BatchPath,
Purger: coordinator.PurgerCfg{
@@ -460,11 +491,15 @@ func (n *Node) handleNewBlock(ctx context.Context, stats *synchronizer.Stats, va
if stats.Synced() {
if err := n.nodeAPI.api.UpdateNetworkInfo(
stats.Eth.LastBlock, stats.Sync.LastBlock,
common.BatchNum(stats.Eth.LastBatch),
common.BatchNum(stats.Eth.LastBatchNum),
stats.Sync.Auction.CurrentSlot.SlotNum,
); err != nil {
log.Errorw("API.UpdateNetworkInfo", "err", err)
}
} else {
n.nodeAPI.api.UpdateNetworkInfoBlock(
stats.Eth.LastBlock, stats.Sync.LastBlock,
)
}
}
}
@@ -546,7 +581,11 @@ func (n *Node) StartSynchronizer() {
if n.ctx.Err() != nil {
continue
}
log.Errorw("Synchronizer.Sync", "err", err)
if errors.Is(err, eth.ErrBlockHashMismatchEvent) {
log.Warnw("Synchronizer.Sync", "err", err)
} else {
log.Errorw("Synchronizer.Sync", "err", err)
}
}
}
}


@@ -25,12 +25,12 @@ type Stats struct {
Updated time.Time
FirstBlockNum int64
LastBlock common.Block
LastBatch int64
LastBatchNum int64
}
Sync struct {
Updated time.Time
LastBlock common.Block
LastBatch int64
LastBatch common.Batch
// LastL1BatchBlock is the last ethereum block in which an
// l1Batch was forged
LastL1BatchBlock int64
@@ -77,13 +77,13 @@ func (s *StatsHolder) UpdateCurrentNextSlot(current *common.Slot, next *common.S
}
// UpdateSync updates the synchronizer stats
func (s *StatsHolder) UpdateSync(lastBlock *common.Block, lastBatch *common.BatchNum,
func (s *StatsHolder) UpdateSync(lastBlock *common.Block, lastBatch *common.Batch,
lastL1BatchBlock *int64, lastForgeL1TxsNum *int64) {
now := time.Now()
s.rw.Lock()
s.Sync.LastBlock = *lastBlock
if lastBatch != nil {
s.Sync.LastBatch = int64(*lastBatch)
s.Sync.LastBatch = *lastBatch
}
if lastL1BatchBlock != nil {
s.Sync.LastL1BatchBlock = *lastL1BatchBlock
@@ -105,16 +105,16 @@ func (s *StatsHolder) UpdateEth(ethClient eth.ClientInterface) error {
lastBlock, err := ethClient.EthBlockByNumber(context.TODO(), -1)
if err != nil {
return tracerr.Wrap(err)
return tracerr.Wrap(fmt.Errorf("EthBlockByNumber: %w", err))
}
lastBatch, err := ethClient.RollupLastForgedBatch()
lastBatchNum, err := ethClient.RollupLastForgedBatch()
if err != nil {
return tracerr.Wrap(err)
return tracerr.Wrap(fmt.Errorf("RollupLastForgedBatch: %w", err))
}
s.rw.Lock()
s.Eth.Updated = now
s.Eth.LastBlock = *lastBlock
s.Eth.LastBatch = lastBatch
s.Eth.LastBatchNum = lastBatchNum
s.rw.Unlock()
return nil
}
@@ -139,6 +139,10 @@ func (s *StatsHolder) CopyStats() *Stats {
sCopy.Sync.Auction.NextSlot.DefaultSlotBid =
common.CopyBigInt(s.Sync.Auction.NextSlot.DefaultSlotBid)
}
if s.Sync.LastBatch.StateRoot != nil {
sCopy.Sync.LastBatch.StateRoot =
common.CopyBigInt(s.Sync.LastBatch.StateRoot)
}
s.rw.RUnlock()
return &sCopy
}
@@ -152,9 +156,9 @@ func (s *StatsHolder) blocksPerc() float64 {
float64(s.Eth.LastBlock.Num-(s.Eth.FirstBlockNum-1))
}
func (s *StatsHolder) batchesPerc(batchNum int64) float64 {
func (s *StatsHolder) batchesPerc(batchNum common.BatchNum) float64 {
return float64(batchNum) * 100.0 /
float64(s.Eth.LastBatch)
float64(s.Eth.LastBatchNum)
}
// StartBlockNums sets the first block used to start tracking the smart
@@ -195,15 +199,16 @@ type Config struct {
// Synchronizer implements the Synchronizer type
type Synchronizer struct {
ethClient eth.ClientInterface
consts SCConsts
historyDB *historydb.HistoryDB
stateDB *statedb.StateDB
cfg Config
initVars SCVariables
startBlockNum int64
vars SCVariables
stats *StatsHolder
ethClient eth.ClientInterface
consts SCConsts
historyDB *historydb.HistoryDB
stateDB *statedb.StateDB
cfg Config
initVars SCVariables
startBlockNum int64
vars SCVariables
stats *StatsHolder
resetStateFailed bool
}
// NewSynchronizer creates a new Synchronizer
@@ -328,23 +333,25 @@ func (s *Synchronizer) setSlotCoordinator(slot *common.Slot) error {
return nil
}
// firstBatchBlockNum is the blockNum of first batch in that block, if any
func (s *Synchronizer) getCurrentSlot(reset bool, firstBatchBlockNum *int64) (*common.Slot, error) {
slot := common.Slot{
SlotNum: s.stats.Sync.Auction.CurrentSlot.SlotNum,
ForgerCommitment: s.stats.Sync.Auction.CurrentSlot.ForgerCommitment,
}
// updateCurrentSlot updates the slot with information of the current slot.
// The information about which coordinator is allowed to forge is only updated
// when we are Synced.
// hasBatch is true when the last synced block contained at least one batch.
func (s *Synchronizer) updateCurrentSlot(slot *common.Slot, reset bool, hasBatch bool) error {
// We want the next block because the current one is already mined
blockNum := s.stats.Sync.LastBlock.Num + 1
slotNum := s.consts.Auction.SlotNum(blockNum)
firstBatchBlockNum := s.stats.Sync.LastBlock.Num
if reset {
// Using this query only to know if there is a batch in this slot
dbFirstBatchBlockNum, err := s.historyDB.GetFirstBatchBlockNumBySlot(slotNum)
if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows {
return nil, tracerr.Wrap(fmt.Errorf("historyDB.GetFirstBatchBySlot: %w", err))
return tracerr.Wrap(fmt.Errorf("historyDB.GetFirstBatchBySlot: %w", err))
} else if tracerr.Unwrap(err) == sql.ErrNoRows {
firstBatchBlockNum = nil
hasBatch = false
} else {
firstBatchBlockNum = &dbFirstBatchBlockNum
hasBatch = true
firstBatchBlockNum = dbFirstBatchBlockNum
}
slot.ForgerCommitment = false
} else if slotNum > slot.SlotNum {
@@ -355,11 +362,11 @@ func (s *Synchronizer) getCurrentSlot(reset bool, firstBatchBlockNum *int64) (*c
slot.StartBlock, slot.EndBlock = s.consts.Auction.SlotBlocks(slot.SlotNum)
// If Synced, update the current coordinator
if s.stats.Synced() && blockNum >= s.consts.Auction.GenesisBlockNum {
if err := s.setSlotCoordinator(&slot); err != nil {
return nil, tracerr.Wrap(err)
if err := s.setSlotCoordinator(slot); err != nil {
return tracerr.Wrap(err)
}
if firstBatchBlockNum != nil &&
s.consts.Auction.RelativeBlock(*firstBatchBlockNum) <
if hasBatch &&
s.consts.Auction.RelativeBlock(firstBatchBlockNum) <
int64(s.vars.Auction.SlotDeadline) {
slot.ForgerCommitment = true
}
@@ -368,57 +375,61 @@ func (s *Synchronizer) getCurrentSlot(reset bool, firstBatchBlockNum *int64) (*c
// BEGIN SANITY CHECK
canForge, err := s.ethClient.AuctionCanForge(slot.Forger, blockNum)
if err != nil {
return nil, tracerr.Wrap(err)
return tracerr.Wrap(fmt.Errorf("AuctionCanForge: %w", err))
}
if !canForge {
return nil, tracerr.Wrap(fmt.Errorf("Synchronized value of forger address for closed slot "+
return tracerr.Wrap(fmt.Errorf("Synchronized value of forger address for closed slot "+
"differs from smart contract: %+v", slot))
}
// END SANITY CHECK
}
return &slot, nil
return nil
}
func (s *Synchronizer) getNextSlot() (*common.Slot, error) {
// updateNextSlot updates the slot with information of the next slot.
// The information about which coordinator is allowed to forge is only updated
// when we are Synced.
func (s *Synchronizer) updateNextSlot(slot *common.Slot) error {
// We want the next block because the current one is already mined
blockNum := s.stats.Sync.LastBlock.Num + 1
slotNum := s.consts.Auction.SlotNum(blockNum) + 1
slot := common.Slot{
SlotNum: slotNum,
ForgerCommitment: false,
}
slot.SlotNum = slotNum
slot.ForgerCommitment = false
slot.StartBlock, slot.EndBlock = s.consts.Auction.SlotBlocks(slot.SlotNum)
// If Synced, update the current coordinator
if s.stats.Synced() && blockNum >= s.consts.Auction.GenesisBlockNum {
if err := s.setSlotCoordinator(&slot); err != nil {
return nil, tracerr.Wrap(err)
if err := s.setSlotCoordinator(slot); err != nil {
return tracerr.Wrap(err)
}
// TODO: Remove this SANITY CHECK once this code is tested enough
// BEGIN SANITY CHECK
canForge, err := s.ethClient.AuctionCanForge(slot.Forger, slot.StartBlock)
if err != nil {
return nil, tracerr.Wrap(err)
return tracerr.Wrap(fmt.Errorf("AuctionCanForge: %w", err))
}
if !canForge {
return nil, tracerr.Wrap(fmt.Errorf("Synchronized value of forger address for closed slot "+
return tracerr.Wrap(fmt.Errorf("Synchronized value of forger address for closed slot "+
"differs from smart contract: %+v", slot))
}
// END SANITY CHECK
}
return &slot, nil
return nil
}
func (s *Synchronizer) updateCurrentNextSlotIfSync(reset bool, firstBatchBlockNum *int64) error {
current, err := s.getCurrentSlot(reset, firstBatchBlockNum)
if err != nil {
// updateCurrentNextSlotIfSync updates the current and next slot. Information
// about forger address that is allowed to forge is only updated if we are
// Synced.
func (s *Synchronizer) updateCurrentNextSlotIfSync(reset bool, hasBatch bool) error {
current := s.stats.Sync.Auction.CurrentSlot
next := s.stats.Sync.Auction.NextSlot
if err := s.updateCurrentSlot(&current, reset, hasBatch); err != nil {
return tracerr.Wrap(err)
}
next, err := s.getNextSlot()
if err != nil {
if err := s.updateNextSlot(&next); err != nil {
return tracerr.Wrap(err)
}
s.stats.UpdateCurrentNextSlot(current, next)
s.stats.UpdateCurrentNextSlot(&current, &next)
return nil
}
@@ -445,8 +456,10 @@ func (s *Synchronizer) init() error {
lastBlock = lastSavedBlock
}
if err := s.resetState(lastBlock); err != nil {
s.resetStateFailed = true
return tracerr.Wrap(err)
}
s.resetStateFailed = false
log.Infow("Sync init block",
"syncLastBlock", s.stats.Sync.LastBlock,
@@ -455,23 +468,44 @@ func (s *Synchronizer) init() error {
"ethLastBlock", s.stats.Eth.LastBlock,
)
log.Infow("Sync init batch",
"syncLastBatch", s.stats.Sync.LastBatch,
"syncBatchesPerc", s.stats.batchesPerc(s.stats.Sync.LastBatch),
"ethLastBatch", s.stats.Eth.LastBatch,
"syncLastBatch", s.stats.Sync.LastBatch.BatchNum,
"syncBatchesPerc", s.stats.batchesPerc(s.stats.Sync.LastBatch.BatchNum),
"ethLastBatch", s.stats.Eth.LastBatchNum,
)
return nil
}
func (s *Synchronizer) resetIntermediateState() error {
lastBlock, err := s.historyDB.GetLastBlock()
if tracerr.Unwrap(err) == sql.ErrNoRows {
lastBlock = &common.Block{}
} else if err != nil {
return tracerr.Wrap(fmt.Errorf("historyDB.GetLastBlock: %w", err))
}
if err := s.resetState(lastBlock); err != nil {
s.resetStateFailed = true
return tracerr.Wrap(fmt.Errorf("resetState at block %v: %w", lastBlock.Num, err))
}
s.resetStateFailed = false
return nil
}
// Sync2 attempts to synchronize an ethereum block starting from lastSavedBlock.
// If lastSavedBlock is nil, the lastSavedBlock value is obtained from the DB.
// If a block is synced, it will be returned and also stored in the DB. If a
// reorg is detected, the number of discarded blocks will be returned and no
// synchronization will be made.
// TODO: Be smart about locking: only lock during the read/write operations
func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block) (*common.BlockData, *int64, error) {
func (s *Synchronizer) Sync2(ctx context.Context,
lastSavedBlock *common.Block) (blockData *common.BlockData, discarded *int64, err error) {
if s.resetStateFailed {
if err := s.resetIntermediateState(); err != nil {
return nil, nil, tracerr.Wrap(err)
}
}
var nextBlockNum int64 // next block number to sync
if lastSavedBlock == nil {
var err error
// Get lastSavedBlock from History DB
lastSavedBlock, err = s.historyDB.GetLastBlock()
if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows {
@@ -497,7 +531,7 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block)
if tracerr.Unwrap(err) == ethereum.NotFound {
return nil, nil, nil
} else if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, nil, tracerr.Wrap(fmt.Errorf("EthBlockByNumber: %w", err))
}
log.Debugf("ethBlock: num: %v, parent: %v, hash: %v",
ethBlock.Num, ethBlock.ParentHash.String(), ethBlock.Hash.String())
@@ -527,6 +561,20 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block)
}
}
defer func() {
// If there was an error during sync, reset to the last block
// in the historyDB because the historyDB is written last in
// the Sync method and is the source of consistency. This
// allows resetting the stateDB in the case where a batch was
// processed but the historyDB block was not committed due to an
// error.
if err != nil {
if err2 := s.resetIntermediateState(); err2 != nil {
log.Errorw("sync revert", "err", err2)
}
}
}()
// Get data from the rollup contract
rollupData, err := s.rollupSync(ethBlock)
if err != nil {
@@ -564,14 +612,14 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block)
}
// Group all the block data into the structs to save into HistoryDB
blockData := common.BlockData{
blockData = &common.BlockData{
Block: *ethBlock,
Rollup: *rollupData,
Auction: *auctionData,
WDelayer: *wDelayerData,
}
err = s.historyDB.AddBlockSCData(&blockData)
err = s.historyDB.AddBlockSCData(blockData)
if err != nil {
return nil, nil, tracerr.Wrap(err)
}
@@ -589,14 +637,14 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block)
}
}
s.stats.UpdateSync(ethBlock,
&rollupData.Batches[batchesLen-1].Batch.BatchNum,
&rollupData.Batches[batchesLen-1].Batch,
lastL1BatchBlock, lastForgeL1TxsNum)
}
var firstBatchBlockNum *int64
hasBatch := false
if len(rollupData.Batches) > 0 {
firstBatchBlockNum = &rollupData.Batches[0].Batch.EthBlockNum
hasBatch = true
}
if err := s.updateCurrentNextSlotIfSync(false, firstBatchBlockNum); err != nil {
if err := s.updateCurrentNextSlotIfSync(false, hasBatch); err != nil {
return nil, nil, tracerr.Wrap(err)
}
@@ -608,12 +656,12 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block)
for _, batchData := range rollupData.Batches {
log.Debugw("Synced batch",
"syncLastBatch", batchData.Batch.BatchNum,
"syncBatchesPerc", s.stats.batchesPerc(int64(batchData.Batch.BatchNum)),
"ethLastBatch", s.stats.Eth.LastBatch,
"syncBatchesPerc", s.stats.batchesPerc(batchData.Batch.BatchNum),
"ethLastBatch", s.stats.Eth.LastBatchNum,
)
}
return &blockData, nil, nil
return blockData, nil, nil
}
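
For context, a minimal driver sketch of the new Sync2 contract (hypothetical helper; only Sync2 and its nil/discarded return conventions come from the code above):

func syncUntilCaughtUp(ctx context.Context, s *Synchronizer) error {
	for {
		// Passing nil makes Sync2 read the last saved block from the HistoryDB.
		blockData, discarded, err := s.Sync2(ctx, nil)
		if err != nil {
			return err
		}
		if discarded != nil {
			// A reorg was detected: *discarded blocks were dropped and nothing
			// new was synchronized in this iteration; loop to resync.
			continue
		}
		if blockData == nil {
			// No new block available yet; we are caught up.
			return nil
		}
		// blockData was synced and stored in the HistoryDB; keep going.
	}
}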
// reorg manages a reorg, updating History and State DB as needed. Keeps
@@ -645,14 +693,15 @@ func (s *Synchronizer) reorg(uncleBlock *common.Block) (int64, error) {
log.Debugw("Discarding blocks", "total", total, "from", uncleBlock.Num, "to", block.Num+1)
// Set History DB and State DB to the correct state
err := s.historyDB.Reorg(block.Num)
if err != nil {
if err := s.historyDB.Reorg(block.Num); err != nil {
return 0, tracerr.Wrap(err)
}
if err := s.resetState(block); err != nil {
s.resetStateFailed = true
return 0, tracerr.Wrap(err)
}
s.resetStateFailed = false
return block.Num, nil
}
@@ -661,15 +710,15 @@ func getInitialVariables(ethClient eth.ClientInterface,
consts *SCConsts) (*SCVariables, *StartBlockNums, error) {
rollupInit, rollupInitBlock, err := ethClient.RollupEventInit()
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, nil, tracerr.Wrap(fmt.Errorf("RollupEventInit: %w", err))
}
auctionInit, auctionInitBlock, err := ethClient.AuctionEventInit()
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, nil, tracerr.Wrap(fmt.Errorf("AuctionEventInit: %w", err))
}
wDelayerInit, wDelayerInitBlock, err := ethClient.WDelayerEventInit()
if err != nil {
return nil, nil, tracerr.Wrap(err)
return nil, nil, tracerr.Wrap(fmt.Errorf("WDelayerEventInit: %w", err))
}
rollupVars := rollupInit.RollupVariables()
auctionVars := auctionInit.AuctionVariables(consts.Auction.InitialMinimalBidding)
@@ -714,12 +763,17 @@ func (s *Synchronizer) resetState(block *common.Block) error {
s.vars.WDelayer = *wDelayer
}
batchNum, err := s.historyDB.GetLastBatchNum()
batch, err := s.historyDB.GetLastBatch()
if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows {
return tracerr.Wrap(fmt.Errorf("historyDB.GetLastBatchNum: %w", err))
}
if tracerr.Unwrap(err) == sql.ErrNoRows {
batchNum = 0
batch = &common.Batch{}
}
err = s.stateDB.Reset(batch.BatchNum)
if err != nil {
return tracerr.Wrap(fmt.Errorf("stateDB.Reset: %w", err))
}
lastL1BatchBlockNum, err := s.historyDB.GetLastL1BatchBlockNum()
@@ -739,14 +793,9 @@ func (s *Synchronizer) resetState(block *common.Block) error {
lastForgeL1TxsNum = &n
}
err = s.stateDB.Reset(batchNum)
if err != nil {
return tracerr.Wrap(fmt.Errorf("stateDB.Reset: %w", err))
}
s.stats.UpdateSync(block, batch, &lastL1BatchBlockNum, lastForgeL1TxsNum)
s.stats.UpdateSync(block, &batchNum, &lastL1BatchBlockNum, lastForgeL1TxsNum)
if err := s.updateCurrentNextSlotIfSync(true, nil); err != nil {
if err := s.updateCurrentNextSlotIfSync(true, false); err != nil {
return tracerr.Wrap(err)
}
return nil
@@ -761,19 +810,14 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e
// Get rollup events in the block, and make sure the block hash matches
// the expected one.
rollupEvents, blockHash, err := s.ethClient.RollupEventsByBlock(blockNum)
rollupEvents, err := s.ethClient.RollupEventsByBlock(blockNum, &ethBlock.Hash)
if err != nil {
return nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(fmt.Errorf("RollupEventsByBlock: %w", err))
}
// No events in this block
if blockHash == nil {
if rollupEvents == nil {
return &rollupData, nil
}
if *blockHash != ethBlock.Hash {
log.Errorw("Block hash mismatch in Rollup events", "expected", ethBlock.Hash.String(),
"got", blockHash.String())
return nil, tracerr.Wrap(eth.ErrBlockHashMismatchEvent)
}
var nextForgeL1TxsNum int64 // forgeL1TxsNum for the next L1Batch
nextForgeL1TxsNumPtr, err := s.historyDB.GetLastL1TxsNum()
@@ -801,7 +845,7 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e
forgeBatchArgs, sender, err := s.ethClient.RollupForgeBatchArgs(evtForgeBatch.EthTxHash,
evtForgeBatch.L1UserTxsLen)
if err != nil {
return nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(fmt.Errorf("RollupForgeBatchArgs: %w", err))
}
batchNum := common.BatchNum(evtForgeBatch.BatchNum)
@@ -884,6 +928,16 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e
if err != nil {
return nil, tracerr.Wrap(err)
}
if s.stateDB.CurrentBatch() != batchNum {
return nil, tracerr.Wrap(fmt.Errorf("stateDB.BatchNum (%v) != "+
"evtForgeBatch.BatchNum = (%v)",
s.stateDB.CurrentBatch(), batchNum))
}
if s.stateDB.MT.Root().BigInt().Cmp(forgeBatchArgs.NewStRoot) != 0 {
return nil, tracerr.Wrap(fmt.Errorf("stateDB.MTRoot (%v) != "+
"forgeBatchArgs.NewStRoot (%v)",
s.stateDB.MT.Root().BigInt(), forgeBatchArgs.NewStRoot))
}
// Transform processed PoolL2 txs to L2 and store in BatchData
l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a bit ugly, find a better way
@@ -1066,19 +1120,14 @@ func (s *Synchronizer) auctionSync(ethBlock *common.Block) (*common.AuctionData,
var auctionData = common.NewAuctionData()
// Get auction events in the block
auctionEvents, blockHash, err := s.ethClient.AuctionEventsByBlock(blockNum)
auctionEvents, err := s.ethClient.AuctionEventsByBlock(blockNum, &ethBlock.Hash)
if err != nil {
return nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(fmt.Errorf("AuctionEventsByBlock: %w", err))
}
// No events in this block
if blockHash == nil {
if auctionEvents == nil {
return &auctionData, nil
}
if *blockHash != ethBlock.Hash {
log.Errorw("Block hash mismatch in Auction events", "expected", ethBlock.Hash.String(),
"got", blockHash.String())
return nil, tracerr.Wrap(eth.ErrBlockHashMismatchEvent)
}
// Get bids
for _, evt := range auctionEvents.NewBid {
@@ -1168,19 +1217,14 @@ func (s *Synchronizer) wdelayerSync(ethBlock *common.Block) (*common.WDelayerDat
wDelayerData := common.NewWDelayerData()
// Get wDelayer events in the block
wDelayerEvents, blockHash, err := s.ethClient.WDelayerEventsByBlock(blockNum)
wDelayerEvents, err := s.ethClient.WDelayerEventsByBlock(blockNum, &ethBlock.Hash)
if err != nil {
return nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(fmt.Errorf("WDelayerEventsByBlock: %w", err))
}
// No events in this block
if blockHash == nil {
if wDelayerEvents == nil {
return &wDelayerData, nil
}
if *blockHash != ethBlock.Hash {
log.Errorw("Block hash mismatch in WDelayer events", "expected", ethBlock.Hash.String(),
"got", blockHash.String())
return nil, tracerr.Wrap(eth.ErrBlockHashMismatchEvent)
}
for _, evt := range wDelayerEvents.Deposit {
wDelayerData.Deposits = append(wDelayerData.Deposits, common.WDelayerTransfer{


@@ -307,7 +307,7 @@ func newTestModules(t *testing.T) (*statedb.StateDB, *historydb.HistoryDB) {
require.NoError(t, err)
deleteme = append(deleteme, dir)
stateDB, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32)
stateDB, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, Type: statedb.TypeSynchronizer, NLevels: 32})
require.NoError(t, err)
// Init History DB
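The positional NewStateDB arguments (path, keep, type, nLevels) are replaced throughout the tests by a statedb.Config literal, which keeps the call sites self-describing. A short sketch of the old versus new form; the field meanings in the comments are inferred from the call sites in this diff, not from the Config definition itself:

	dir, err := ioutil.TempDir("", "tmpdb")
	require.NoError(t, err)
	// Old form: statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32)
	stateDB, err := statedb.NewStateDB(statedb.Config{
		Path:    dir,                      // on-disk location of the DB
		Keep:    128,                      // presumably checkpoints kept before pruning
		Type:    statedb.TypeSynchronizer, // Synchronizer / BatchBuilder / TxSelector
		NLevels: 32,                       // merkle tree depth
	})
	require.NoError(t, err)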


@@ -44,7 +44,7 @@ func TestDebugAPI(t *testing.T) {
dir, err := ioutil.TempDir("", "tmpdb")
require.Nil(t, err)
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, Type: statedb.TypeSynchronizer, NLevels: 32})
require.Nil(t, err)
err = sdb.MakeCheckpoint() // Make a checkpoint to increment the batchNum
require.Nil(t, err)


@@ -1116,15 +1116,20 @@ func (c *Client) RollupConstants() (*common.RollupConstants, error) {
}
// RollupEventsByBlock returns the events in a block that happened in the Rollup Smart Contract
func (c *Client) RollupEventsByBlock(blockNum int64) (*eth.RollupEvents, *ethCommon.Hash, error) {
func (c *Client) RollupEventsByBlock(blockNum int64,
blockHash *ethCommon.Hash) (*eth.RollupEvents, error) {
c.rw.RLock()
defer c.rw.RUnlock()
block, ok := c.blocks[blockNum]
if !ok {
return nil, nil, tracerr.Wrap(fmt.Errorf("Block %v doesn't exist", blockNum))
return nil, tracerr.Wrap(fmt.Errorf("Block %v doesn't exist", blockNum))
}
return &block.Rollup.Events, &block.Eth.Hash, nil
if blockHash != nil && *blockHash != block.Eth.Hash {
return nil, tracerr.Wrap(fmt.Errorf("Hash mismatch, requested %v got %v",
blockHash, block.Eth.Hash))
}
return &block.Rollup.Events, nil
}
// RollupEventInit returns the initialize event with its corresponding block number
@@ -1573,15 +1578,20 @@ func (c *Client) AuctionConstants() (*common.AuctionConstants, error) {
}
// AuctionEventsByBlock returns the events in a block that happened in the Auction Smart Contract
func (c *Client) AuctionEventsByBlock(blockNum int64) (*eth.AuctionEvents, *ethCommon.Hash, error) {
func (c *Client) AuctionEventsByBlock(blockNum int64,
blockHash *ethCommon.Hash) (*eth.AuctionEvents, error) {
c.rw.RLock()
defer c.rw.RUnlock()
block, ok := c.blocks[blockNum]
if !ok {
return nil, nil, tracerr.Wrap(fmt.Errorf("Block %v doesn't exist", blockNum))
return nil, tracerr.Wrap(fmt.Errorf("Block %v doesn't exist", blockNum))
}
return &block.Auction.Events, &block.Eth.Hash, nil
if blockHash != nil && *blockHash != block.Eth.Hash {
return nil, tracerr.Wrap(fmt.Errorf("Hash mismatch, requested %v got %v",
blockHash, block.Eth.Hash))
}
return &block.Auction.Events, nil
}
// AuctionEventInit returns the initialize event with its corresponding block number
@@ -1789,15 +1799,20 @@ func (c *Client) WDelayerEscapeHatchWithdrawal(to, token ethCommon.Address, amou
}
// WDelayerEventsByBlock returns the events in a block that happened in the WDelayer Contract
func (c *Client) WDelayerEventsByBlock(blockNum int64) (*eth.WDelayerEvents, *ethCommon.Hash, error) {
func (c *Client) WDelayerEventsByBlock(blockNum int64,
blockHash *ethCommon.Hash) (*eth.WDelayerEvents, error) {
c.rw.RLock()
defer c.rw.RUnlock()
block, ok := c.blocks[blockNum]
if !ok {
return nil, nil, tracerr.Wrap(fmt.Errorf("Block %v doesn't exist", blockNum))
return nil, tracerr.Wrap(fmt.Errorf("Block %v doesn't exist", blockNum))
}
return &block.WDelayer.Events, &block.Eth.Hash, nil
if blockHash != nil && *blockHash != block.Eth.Hash {
return nil, tracerr.Wrap(fmt.Errorf("Hash mismatch, requested %v got %v",
blockHash, block.Eth.Hash))
}
return &block.WDelayer.Events, nil
}
// WDelayerConstants returns the Constants of the WDelayer Contract
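All three mock methods apply the same rule: a nil blockHash keeps the old behaviour, while a non-nil hash that does not match the stored block is rejected. A hedged, test-style sketch of exercising the check (client setup elided; the stale hash is just an arbitrary non-matching value):

	blockNum, err := c.EthLastBlock()
	require.Nil(t, err)

	// nil hash: no check, return whatever events the block holds.
	_, err = c.RollupEventsByBlock(blockNum, nil)
	require.Nil(t, err)

	// A hash that doesn't match the stored block must produce an error; this
	// is what lets the synchronizer notice it is looking at a reorged block.
	staleHash := ethCommon.HexToHash("0x01")
	_, err = c.RollupEventsByBlock(blockNum, &staleHash)
	require.Error(t, err)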


@@ -130,7 +130,7 @@ func TestClientAuction(t *testing.T) {
blockNum, err := c.EthLastBlock()
require.Nil(t, err)
auctionEvents, _, err := c.AuctionEventsByBlock(blockNum)
auctionEvents, err := c.AuctionEventsByBlock(blockNum, nil)
require.Nil(t, err)
assert.Equal(t, 2, len(auctionEvents.NewBid))
}
@@ -171,7 +171,7 @@ func TestClientRollup(t *testing.T) {
blockNum, err := c.EthLastBlock()
require.Nil(t, err)
rollupEvents, _, err := c.RollupEventsByBlock(blockNum)
rollupEvents, err := c.RollupEventsByBlock(blockNum, nil)
require.Nil(t, err)
assert.Equal(t, N, len(rollupEvents.L1UserTx))
assert.Equal(t, 1, len(rollupEvents.AddToken))
@@ -192,7 +192,7 @@ func TestClientRollup(t *testing.T) {
blockNumA, err := c.EthLastBlock()
require.Nil(t, err)
rollupEvents, hashA, err := c.RollupEventsByBlock(blockNumA)
rollupEvents, err = c.RollupEventsByBlock(blockNumA, nil)
require.Nil(t, err)
assert.Equal(t, 0, len(rollupEvents.L1UserTx))
assert.Equal(t, 0, len(rollupEvents.AddToken))
@@ -205,14 +205,14 @@ func TestClientRollup(t *testing.T) {
blockNumB, err := c.EthLastBlock()
require.Nil(t, err)
rollupEvents, hashB, err := c.RollupEventsByBlock(blockNumA)
rollupEventsB, err := c.RollupEventsByBlock(blockNumA, nil)
require.Nil(t, err)
assert.Equal(t, 0, len(rollupEvents.L1UserTx))
assert.Equal(t, 0, len(rollupEvents.AddToken))
assert.Equal(t, 0, len(rollupEvents.ForgeBatch))
assert.Equal(t, 0, len(rollupEventsB.L1UserTx))
assert.Equal(t, 0, len(rollupEventsB.AddToken))
assert.Equal(t, 0, len(rollupEventsB.ForgeBatch))
assert.Equal(t, blockNumA, blockNumB)
assert.NotEqual(t, hashA, hashB)
assert.NotEqual(t, rollupEvents, rollupEventsB)
// Forge again
rollupForgeBatchArgs0 := &eth.RollupForgeBatchArgs{
@@ -232,7 +232,7 @@ func TestClientRollup(t *testing.T) {
blockNum, err = c.EthLastBlock()
require.Nil(t, err)
rollupEvents, _, err = c.RollupEventsByBlock(blockNum)
rollupEvents, err = c.RollupEventsByBlock(blockNum, nil)
require.Nil(t, err)
rollupForgeBatchArgs1, sender, err := c.RollupForgeBatchArgs(rollupEvents.ForgeBatch[0].EthTxHash,


@@ -80,7 +80,8 @@ func initTxSelector(t *testing.T, chainID uint16, hermezContractAddr ethCommon.A
dir, err := ioutil.TempDir("", "tmpSyncDB")
require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dir))
syncStateDB, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 0)
syncStateDB, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeSynchronizer, NLevels: 0})
require.NoError(t, err)
txselDir, err := ioutil.TempDir("", "tmpTxSelDB")


@@ -50,7 +50,7 @@ func initStateDB(t *testing.T, typ statedb.TypeStateDB) *statedb.StateDB {
require.NoError(t, err)
defer assert.Nil(t, os.RemoveAll(dir))
sdb, err := statedb.NewStateDB(dir, 128, typ, NLevels)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128, Type: typ, NLevels: NLevels})
require.NoError(t, err)
return sdb
}


@@ -127,7 +127,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat
exits := make([]processedExit, nTx)
if tp.s.Typ == statedb.TypeBatchBuilder {
if tp.s.Type() == statedb.TypeBatchBuilder {
tp.zki = common.NewZKInputs(tp.config.ChainID, tp.config.MaxTx, tp.config.MaxL1Tx,
tp.config.MaxFeeTx, tp.config.NLevels, (tp.s.CurrentBatch() + 1).BigInt())
tp.zki.OldLastIdx = tp.s.CurrentIdx().BigInt()
@@ -137,7 +137,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat
// TBD if ExitTree is kept only in memory or stored on disk; for the moment
// it is only needed in memory
if tp.s.Typ == statedb.TypeSynchronizer || tp.s.Typ == statedb.TypeBatchBuilder {
if tp.s.Type() == statedb.TypeSynchronizer || tp.s.Type() == statedb.TypeBatchBuilder {
tmpDir, err := ioutil.TempDir("", "hermez-statedb-exittree")
if err != nil {
return nil, tracerr.Wrap(err)
@@ -166,7 +166,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat
if err != nil {
return nil, tracerr.Wrap(err)
}
if tp.s.Typ == statedb.TypeSynchronizer {
if tp.s.Type() == statedb.TypeSynchronizer {
if createdAccount != nil {
createdAccounts = append(createdAccounts, *createdAccount)
l1usertxs[i].EffectiveFromIdx = createdAccount.Idx
@@ -195,7 +195,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat
tp.zki.ISExitRoot[tp.i] = exitTree.Root().BigInt()
}
}
if tp.s.Typ == statedb.TypeSynchronizer || tp.s.Typ == statedb.TypeBatchBuilder {
if tp.s.Type() == statedb.TypeSynchronizer || tp.s.Type() == statedb.TypeBatchBuilder {
if exitIdx != nil && exitTree != nil {
exits[tp.i] = processedExit{
exit: true,
@@ -217,7 +217,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat
if exitIdx != nil {
log.Error("Unexpected Exit in L1CoordinatorTx")
}
if tp.s.Typ == statedb.TypeSynchronizer {
if tp.s.Type() == statedb.TypeSynchronizer {
if createdAccount != nil {
createdAccounts = append(createdAccounts, *createdAccount)
l1coordinatortxs[i].EffectiveFromIdx = createdAccount.Idx
@@ -276,7 +276,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat
// collectedFees will contain the amount of fee collected for each
// TokenID
var collectedFees map[common.TokenID]*big.Int
if tp.s.Typ == statedb.TypeSynchronizer || tp.s.Typ == statedb.TypeBatchBuilder {
if tp.s.Type() == statedb.TypeSynchronizer || tp.s.Type() == statedb.TypeBatchBuilder {
collectedFees = make(map[common.TokenID]*big.Int)
for tokenID := range coordIdxsMap {
collectedFees[tokenID] = big.NewInt(0)
@@ -317,7 +317,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat
}
}
}
if tp.s.Typ == statedb.TypeSynchronizer || tp.s.Typ == statedb.TypeBatchBuilder {
if tp.s.Type() == statedb.TypeSynchronizer || tp.s.Type() == statedb.TypeBatchBuilder {
if exitIdx != nil && exitTree != nil {
exits[tp.i] = processedExit{
exit: true,
@@ -401,7 +401,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat
}
}
if tp.s.Typ == statedb.TypeTxSelector {
if tp.s.Type() == statedb.TypeTxSelector {
return nil, nil
}
@@ -436,8 +436,8 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat
}
}
if tp.s.Typ == statedb.TypeSynchronizer {
// return exitInfos, createdAccounts and collectedFees, so Synchronizer will
if tp.s.Type() == statedb.TypeSynchronizer {
// return exitInfos, createdAccounts and collectedFees, so Synchronizer will
// be able to store it into HistoryDB for the concrete BatchNum
return &ProcessTxOutput{
ZKInputs: nil,
@@ -588,7 +588,7 @@ func (tp *TxProcessor) ProcessL1Tx(exitTree *merkletree.MerkleTree, tx *common.L
}
var createdAccount *common.Account
if tp.s.Typ == statedb.TypeSynchronizer &&
if tp.s.Type() == statedb.TypeSynchronizer &&
(tx.Type == common.TxTypeCreateAccountDeposit ||
tx.Type == common.TxTypeCreateAccountDepositTransfer) {
var err error
@@ -612,8 +612,8 @@ func (tp *TxProcessor) ProcessL2Tx(coordIdxsMap map[common.TokenID]common.Idx,
var err error
// if tx.ToIdx==0, get toIdx by ToEthAddr or ToBJJ
if tx.ToIdx == common.Idx(0) && tx.AuxToIdx == common.Idx(0) {
if tp.s.Typ == statedb.TypeSynchronizer {
// this should never be reached
if tp.s.Type() == statedb.TypeSynchronizer {
// this should never be reached
log.Error("WARNING: In StateDB with Synchronizer mode L2.ToIdx can't be 0")
return nil, nil, false, tracerr.Wrap(fmt.Errorf("In StateDB with Synchronizer mode L2.ToIdx can't be 0"))
}
@@ -676,8 +676,8 @@ func (tp *TxProcessor) ProcessL2Tx(coordIdxsMap map[common.TokenID]common.Idx,
}
// if StateDB type==TypeSynchronizer, will need to add Nonce
if tp.s.Typ == statedb.TypeSynchronizer {
// as type==TypeSynchronizer, always tx.ToIdx!=0
if tp.s.Type() == statedb.TypeSynchronizer {
// as type==TypeSynchronizer, always tx.ToIdx!=0
acc, err := tp.s.GetAccount(tx.FromIdx)
if err != nil {
log.Errorw("GetAccount", "fromIdx", tx.FromIdx, "err", err)
@@ -889,8 +889,8 @@ func (tp *TxProcessor) applyTransfer(coordIdxsMap map[common.TokenID]common.Idx,
accumulated := tp.AccumulatedFees[accCoord.Idx]
accumulated.Add(accumulated, fee)
if tp.s.Typ == statedb.TypeSynchronizer ||
tp.s.Typ == statedb.TypeBatchBuilder {
if tp.s.Type() == statedb.TypeSynchronizer ||
tp.s.Type() == statedb.TypeBatchBuilder {
collected := collectedFees[accCoord.TokenID]
collected.Add(collected, fee)
}
@@ -1094,8 +1094,8 @@ func (tp *TxProcessor) applyExit(coordIdxsMap map[common.TokenID]common.Idx,
accumulated := tp.AccumulatedFees[accCoord.Idx]
accumulated.Add(accumulated, fee)
if tp.s.Typ == statedb.TypeSynchronizer ||
tp.s.Typ == statedb.TypeBatchBuilder {
if tp.s.Type() == statedb.TypeSynchronizer ||
tp.s.Type() == statedb.TypeBatchBuilder {
collected := collectedFees[accCoord.TokenID]
collected.Add(collected, fee)
}
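Every tp.s.Typ field access in this file becomes a tp.s.Type() call. The accessor itself is not part of the hunks shown here; presumably it is a plain getter over the configured type, roughly:

	// Sketch only: the real accessor may read a Config field or an unexported
	// member, but the call sites above only require this signature.
	func (s *StateDB) Type() TypeStateDB {
		return s.cfg.Type
	}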


@@ -36,7 +36,8 @@ func TestComputeEffectiveAmounts(t *testing.T) {
require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dir))
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeSynchronizer, NLevels: 32})
assert.NoError(t, err)
set := `
@@ -212,7 +213,8 @@ func TestProcessTxsBalances(t *testing.T) {
require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dir))
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeSynchronizer, NLevels: 32})
assert.NoError(t, err)
chainID := uint16(0)
@@ -358,7 +360,8 @@ func TestProcessTxsSynchronizer(t *testing.T) {
require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dir))
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeSynchronizer, NLevels: 32})
assert.NoError(t, err)
chainID := uint16(0)
@@ -489,7 +492,8 @@ func TestProcessTxsBatchBuilder(t *testing.T) {
require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dir))
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 32)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeBatchBuilder, NLevels: 32})
assert.NoError(t, err)
chainID := uint16(0)
@@ -580,7 +584,8 @@ func TestProcessTxsRootTestVectors(t *testing.T) {
require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dir))
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 32)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeBatchBuilder, NLevels: 32})
assert.NoError(t, err)
// same values as in the js test
@@ -631,7 +636,8 @@ func TestCreateAccountDepositMaxValue(t *testing.T) {
defer assert.NoError(t, os.RemoveAll(dir))
nLevels := 16
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeBatchBuilder, NLevels: nLevels})
assert.NoError(t, err)
users := txsets.GenerateJsUsers(t)
@@ -700,7 +706,8 @@ func initTestMultipleCoordIdxForTokenID(t *testing.T) (*TxProcessor, *til.Contex
require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dir))
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 32)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeBatchBuilder, NLevels: 32})
assert.NoError(t, err)
chainID := uint16(1)
@@ -798,7 +805,8 @@ func TestTwoExits(t *testing.T) {
require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dir))
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeSynchronizer, 32)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeSynchronizer, NLevels: 32})
assert.NoError(t, err)
chainID := uint16(1)
@@ -865,7 +873,8 @@ func TestTwoExits(t *testing.T) {
require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dir2))
sdb2, err := statedb.NewStateDB(dir2, 128, statedb.TypeSynchronizer, 32)
sdb2, err := statedb.NewStateDB(statedb.Config{Path: dir2, Keep: 128,
Type: statedb.TypeSynchronizer, NLevels: 32})
assert.NoError(t, err)
tc = til.NewContext(chainID, common.RollupConstMaxL1UserTx)


@@ -41,7 +41,8 @@ func TestZKInputsHashTestVector0(t *testing.T) {
require.NoError(t, err)
defer assert.Nil(t, os.RemoveAll(dir))
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 32)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeBatchBuilder, NLevels: 32})
require.NoError(t, err)
chainID := uint16(0)
@@ -90,7 +91,8 @@ func TestZKInputsHashTestVector1(t *testing.T) {
require.NoError(t, err)
defer assert.Nil(t, os.RemoveAll(dir))
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, 32)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeBatchBuilder, NLevels: 32})
require.NoError(t, err)
chainID := uint16(0)
@@ -149,7 +151,8 @@ func TestZKInputsEmpty(t *testing.T) {
nLevels := 16
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeBatchBuilder, NLevels: nLevels})
require.NoError(t, err)
chainID := uint16(0)
@@ -266,7 +269,8 @@ func TestZKInputs0(t *testing.T) {
defer assert.Nil(t, os.RemoveAll(dir))
nLevels := 16
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeBatchBuilder, NLevels: nLevels})
require.NoError(t, err)
chainID := uint16(0)
@@ -322,7 +326,8 @@ func TestZKInputs1(t *testing.T) {
defer assert.Nil(t, os.RemoveAll(dir))
nLevels := 16
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeBatchBuilder, NLevels: nLevels})
require.NoError(t, err)
chainID := uint16(0)
@@ -385,7 +390,8 @@ func TestZKInputs2(t *testing.T) {
defer assert.Nil(t, os.RemoveAll(dir))
nLevels := 16
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeBatchBuilder, NLevels: nLevels})
require.NoError(t, err)
chainID := uint16(0)
@@ -456,7 +462,8 @@ func TestZKInputs3(t *testing.T) {
defer assert.Nil(t, os.RemoveAll(dir))
nLevels := 16
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeBatchBuilder, NLevels: nLevels})
require.NoError(t, err)
chainID := uint16(0)
@@ -527,7 +534,8 @@ func TestZKInputs4(t *testing.T) {
defer assert.Nil(t, os.RemoveAll(dir))
nLevels := 16
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeBatchBuilder, NLevels: nLevels})
require.NoError(t, err)
chainID := uint16(0)
@@ -598,7 +606,8 @@ func TestZKInputs5(t *testing.T) {
defer assert.Nil(t, os.RemoveAll(dir))
nLevels := 16
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeBatchBuilder, NLevels: nLevels})
require.NoError(t, err)
chainID := uint16(0)
@@ -661,7 +670,8 @@ func TestZKInputs6(t *testing.T) {
defer assert.Nil(t, os.RemoveAll(dir))
nLevels := 16
sdb, err := statedb.NewStateDB(dir, 128, statedb.TypeBatchBuilder, nLevels)
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeBatchBuilder, NLevels: nLevels})
require.NoError(t, err)
chainID := uint16(0)


@@ -10,6 +10,7 @@ import (
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/db/kvdb"
"github.com/hermeznetwork/hermez-node/db/l2db"
"github.com/hermeznetwork/hermez-node/db/statedb"
"github.com/hermeznetwork/hermez-node/log"
@@ -62,8 +63,14 @@ type TxSelector struct {
// NewTxSelector returns a *TxSelector
func NewTxSelector(coordAccount *CoordAccount, dbpath string,
synchronizerStateDB *statedb.StateDB, l2 *l2db.L2DB) (*TxSelector, error) {
localAccountsDB, err := statedb.NewLocalStateDB(dbpath, 128,
synchronizerStateDB, statedb.TypeTxSelector, 0) // without merkletree
localAccountsDB, err := statedb.NewLocalStateDB(
statedb.Config{
Path: dbpath,
Keep: kvdb.DefaultKeep,
Type: statedb.TypeTxSelector,
NLevels: 0,
},
synchronizerStateDB) // without merkletree
if err != nil {
return nil, tracerr.Wrap(err)
}
@@ -82,12 +89,8 @@ func (txsel *TxSelector) LocalAccountsDB() *statedb.LocalStateDB {
// Reset tells the TxSelector to get its internal AccountsDB
// from the required `batchNum`
func (txsel *TxSelector) Reset(batchNum common.BatchNum) error {
err := txsel.localAccountsDB.Reset(batchNum, true)
if err != nil {
return tracerr.Wrap(err)
}
return nil
func (txsel *TxSelector) Reset(batchNum common.BatchNum, fromSynchronizer bool) error {
return tracerr.Wrap(txsel.localAccountsDB.Reset(batchNum, fromSynchronizer))
}
func (txsel *TxSelector) getCoordIdx(tokenID common.TokenID) (common.Idx, error) {
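Two interface changes land in the TxSelector: the local accounts DB is now built from a statedb.Config (using kvdb.DefaultKeep instead of the hard-coded 128), and Reset takes an explicit fromSynchronizer flag that is passed straight through to the LocalStateDB. A hedged sketch of a caller; the surrounding names are illustrative, not taken from this diff:

	// After a reorg the coordinator rebuilds the local accounts DB from the
	// synchronizer's StateDB; when only discarding its own work it could pass
	// false and keep its local checkpoints.
	if err := txsel.Reset(batchNum, true); err != nil {
		return tracerr.Wrap(err)
	}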


@@ -34,7 +34,8 @@ func initTest(t *testing.T, chainID uint16, hermezContractAddr ethCommon.Address
dir, err := ioutil.TempDir("", "tmpdb")
require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dir))
syncStateDB, err := statedb.NewStateDB(dir, 128, statedb.TypeTxSelector, 0)
syncStateDB, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeTxSelector, NLevels: 0})
require.NoError(t, err)
txselDir, err := ioutil.TempDir("", "tmpTxSelDB")