mirror of
https://github.com/arnaucube/hermez-node.git
synced 2026-02-07 03:16:45 +01:00
Merge pull request #332 from hermeznetwork/feature/integration21
Integrate purger to node
This commit is contained in:
@@ -257,3 +257,9 @@ func AccountFromBytes(b [32 * NLeafElems]byte) (*Account, error) {
|
|||||||
}
|
}
|
||||||
return &a, nil
|
return &a, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IdxNonce is a pair of Idx and Nonce representing an account
|
||||||
|
type IdxNonce struct {
|
||||||
|
Idx Idx `db:"idx"`
|
||||||
|
Nonce Nonce `db:"nonce"`
|
||||||
|
}
|
||||||
|
|||||||
@@ -35,13 +35,27 @@ type ServerProof struct {
|
|||||||
|
|
||||||
// Coordinator is the coordinator specific configuration.
|
// Coordinator is the coordinator specific configuration.
|
||||||
type Coordinator struct {
|
type Coordinator struct {
|
||||||
|
// ForgerAddress is the address under which this coordinator is forging
|
||||||
ForgerAddress ethCommon.Address `validate:"required"`
|
ForgerAddress ethCommon.Address `validate:"required"`
|
||||||
ForgeLoopInterval Duration `validate:"required"`
|
ForgeLoopInterval Duration `validate:"required"`
|
||||||
ConfirmBlocks int64 `validate:"required"`
|
// ConfirmBlocks is the number of confirmation blocks to wait for sent
|
||||||
L2DB struct {
|
// ethereum transactions before forgetting about them
|
||||||
|
ConfirmBlocks int64 `validate:"required"`
|
||||||
|
// L1BatchTimeoutPerc is the portion of the range before the L1Batch
|
||||||
|
// timeout that will trigger a schedule to forge an L1Batch
|
||||||
|
L1BatchTimeoutPerc float64 `validate:"required"`
|
||||||
|
L2DB struct {
|
||||||
SafetyPeriod common.BatchNum `validate:"required"`
|
SafetyPeriod common.BatchNum `validate:"required"`
|
||||||
MaxTxs uint32 `validate:"required"`
|
MaxTxs uint32 `validate:"required"`
|
||||||
TTL Duration `validate:"required"`
|
TTL Duration `validate:"required"`
|
||||||
|
// PurgeBatchDelay is the delay between batches to purge outdated transactions
|
||||||
|
PurgeBatchDelay int64 `validate:"required"`
|
||||||
|
// InvalidateBatchDelay is the delay between batches to mark invalid transactions
|
||||||
|
InvalidateBatchDelay int64 `validate:"required"`
|
||||||
|
// PurgeBlockDelay is the delay between blocks to purge outdated transactions
|
||||||
|
PurgeBlockDelay int64 `validate:"required"`
|
||||||
|
// InvalidateBlockDelay is the delay between blocks to mark invalid transactions
|
||||||
|
InvalidateBlockDelay int64 `validate:"required"`
|
||||||
} `validate:"required"`
|
} `validate:"required"`
|
||||||
TxSelector struct {
|
TxSelector struct {
|
||||||
Path string `validate:"required"`
|
Path string `validate:"required"`
|
||||||
@@ -56,10 +70,24 @@ type Coordinator struct {
|
|||||||
GasPriceDiv uint64 `validate:"required"`
|
GasPriceDiv uint64 `validate:"required"`
|
||||||
ReceiptTimeout Duration `validate:"required"`
|
ReceiptTimeout Duration `validate:"required"`
|
||||||
IntervalReceiptLoop Duration `validate:"required"`
|
IntervalReceiptLoop Duration `validate:"required"`
|
||||||
|
// IntervalCheckLoop is the waiting interval between receipt
|
||||||
|
// checks of ethereum transactions in the TxManager
|
||||||
|
IntervalCheckLoop Duration `validate:"required"`
|
||||||
|
// Attempts is the number of attempts to do an eth client RPC
|
||||||
|
// call before giving up
|
||||||
|
Attempts int `validate:"required"`
|
||||||
|
// AttemptsDelay is delay between attempts do do an eth client
|
||||||
|
// RPC call
|
||||||
|
AttemptsDelay Duration `validate:"required"`
|
||||||
} `validate:"required"`
|
} `validate:"required"`
|
||||||
API struct {
|
API struct {
|
||||||
Coordinator bool
|
Coordinator bool
|
||||||
} `validate:"required"`
|
} `validate:"required"`
|
||||||
|
Debug struct {
|
||||||
|
// BatchPath if set, specifies the path where batchInfo is stored
|
||||||
|
// in JSON in every step/update of the pipeline
|
||||||
|
BatchPath string
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Node is the hermez node configuration.
|
// Node is the hermez node configuration.
|
||||||
|
|||||||
@@ -45,6 +45,7 @@ type Config struct {
|
|||||||
// DebugBatchPath if set, specifies the path where batchInfo is stored
|
// DebugBatchPath if set, specifies the path where batchInfo is stored
|
||||||
// in JSON in every step/update of the pipeline
|
// in JSON in every step/update of the pipeline
|
||||||
DebugBatchPath string
|
DebugBatchPath string
|
||||||
|
Purger PurgerCfg
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Config) debugBatchStore(batchInfo *BatchInfo) {
|
func (c *Config) debugBatchStore(batchInfo *BatchInfo) {
|
||||||
@@ -79,6 +80,7 @@ type Coordinator struct {
|
|||||||
|
|
||||||
pipeline *Pipeline
|
pipeline *Pipeline
|
||||||
|
|
||||||
|
purger *Purger
|
||||||
txManager *TxManager
|
txManager *TxManager
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -103,6 +105,14 @@ func NewCoordinator(cfg Config,
|
|||||||
cfg.EthClientAttempts))
|
cfg.EthClientAttempts))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
purger := Purger{
|
||||||
|
cfg: cfg.Purger,
|
||||||
|
lastPurgeBlock: 0,
|
||||||
|
lastPurgeBatch: 0,
|
||||||
|
lastInvalidateBlock: 0,
|
||||||
|
lastInvalidateBatch: 0,
|
||||||
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
c := Coordinator{
|
c := Coordinator{
|
||||||
pipelineBatchNum: -1,
|
pipelineBatchNum: -1,
|
||||||
@@ -117,6 +127,8 @@ func NewCoordinator(cfg Config,
|
|||||||
txSelector: txSelector,
|
txSelector: txSelector,
|
||||||
batchBuilder: batchBuilder,
|
batchBuilder: batchBuilder,
|
||||||
|
|
||||||
|
purger: &purger,
|
||||||
|
|
||||||
// ethClient: ethClient,
|
// ethClient: ethClient,
|
||||||
|
|
||||||
msgCh: make(chan interface{}),
|
msgCh: make(chan interface{}),
|
||||||
@@ -130,16 +142,18 @@ func NewCoordinator(cfg Config,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) {
|
func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) {
|
||||||
return NewPipeline(ctx, c.cfg, c.historyDB, c.l2DB,
|
return NewPipeline(ctx, c.cfg, c.historyDB, c.l2DB, c.txSelector,
|
||||||
c.txSelector, c.batchBuilder, c.txManager, c.provers, &c.consts)
|
c.batchBuilder, c.purger, c.txManager, c.provers, &c.consts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MsgSyncStats indicates an update to the Synchronizer stats
|
// MsgSyncBlock indicates an update to the Synchronizer stats
|
||||||
type MsgSyncStats struct {
|
type MsgSyncBlock struct {
|
||||||
Stats synchronizer.Stats
|
Stats synchronizer.Stats
|
||||||
|
Batches []common.BatchData
|
||||||
}
|
}
|
||||||
|
|
||||||
// MsgSyncSCVars indicates an update to Smart Contract Vars
|
// MsgSyncSCVars indicates an update to Smart Contract Vars
|
||||||
|
// TODO: Move this to MsgSyncBlock and remove MsgSyncSCVars
|
||||||
type MsgSyncSCVars struct {
|
type MsgSyncSCVars struct {
|
||||||
Rollup *common.RollupVariables
|
Rollup *common.RollupVariables
|
||||||
Auction *common.AuctionVariables
|
Auction *common.AuctionVariables
|
||||||
@@ -186,7 +200,9 @@ func (c *Coordinator) canForge(stats *synchronizer.Stats) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Coordinator) handleMsgSyncStats(ctx context.Context, stats *synchronizer.Stats) error {
|
func (c *Coordinator) handleMsgSyncBlock(ctx context.Context, msg *MsgSyncBlock) error {
|
||||||
|
stats := &msg.Stats
|
||||||
|
// batches := msg.Batches
|
||||||
if !stats.Synced() {
|
if !stats.Synced() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -218,6 +234,29 @@ func (c *Coordinator) handleMsgSyncStats(ctx context.Context, stats *synchronize
|
|||||||
c.pipeline = nil
|
c.pipeline = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if c.pipeline == nil {
|
||||||
|
// Mark invalid in Pool due to forged L2Txs
|
||||||
|
// for _, batch := range batches {
|
||||||
|
// if err := poolMarkInvalidOldNoncesFromL2Txs(c.l2DB,
|
||||||
|
// idxsNonceFromL2Txs(batch.L2Txs), batch.Batch.BatchNum); err != nil {
|
||||||
|
// return err
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
if c.purger.CanInvalidate(stats.Sync.LastBlock.Num, stats.Sync.LastBatch) {
|
||||||
|
if err := c.txSelector.Reset(common.BatchNum(stats.Sync.LastBatch)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_, err := c.purger.InvalidateMaybe(c.l2DB, c.txSelector.LocalAccountsDB(),
|
||||||
|
stats.Sync.LastBlock.Num, stats.Sync.LastBatch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = c.purger.PurgeMaybe(c.l2DB, stats.Sync.LastBlock.Num, stats.Sync.LastBatch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -254,12 +293,11 @@ func (c *Coordinator) Start() {
|
|||||||
return
|
return
|
||||||
case msg := <-c.msgCh:
|
case msg := <-c.msgCh:
|
||||||
switch msg := msg.(type) {
|
switch msg := msg.(type) {
|
||||||
case MsgSyncStats:
|
case MsgSyncBlock:
|
||||||
stats := msg.Stats
|
if err := c.handleMsgSyncBlock(c.ctx, &msg); common.IsErrDone(err) {
|
||||||
if err := c.handleMsgSyncStats(c.ctx, &stats); common.IsErrDone(err) {
|
|
||||||
continue
|
continue
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
log.Errorw("Coordinator.handleMsgSyncStats error", "err", err)
|
log.Errorw("Coordinator.handleMsgSyncBlock error", "err", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
case MsgSyncReorg:
|
case MsgSyncReorg:
|
||||||
@@ -522,6 +560,7 @@ type Pipeline struct {
|
|||||||
l2DB *l2db.L2DB
|
l2DB *l2db.L2DB
|
||||||
txSelector *txselector.TxSelector
|
txSelector *txselector.TxSelector
|
||||||
batchBuilder *batchbuilder.BatchBuilder
|
batchBuilder *batchbuilder.BatchBuilder
|
||||||
|
purger *Purger
|
||||||
|
|
||||||
stats synchronizer.Stats
|
stats synchronizer.Stats
|
||||||
statsCh chan synchronizer.Stats
|
statsCh chan synchronizer.Stats
|
||||||
@@ -538,6 +577,7 @@ func NewPipeline(ctx context.Context,
|
|||||||
l2DB *l2db.L2DB,
|
l2DB *l2db.L2DB,
|
||||||
txSelector *txselector.TxSelector,
|
txSelector *txselector.TxSelector,
|
||||||
batchBuilder *batchbuilder.BatchBuilder,
|
batchBuilder *batchbuilder.BatchBuilder,
|
||||||
|
purger *Purger,
|
||||||
txManager *TxManager,
|
txManager *TxManager,
|
||||||
provers []prover.Client,
|
provers []prover.Client,
|
||||||
scConsts *synchronizer.SCConsts,
|
scConsts *synchronizer.SCConsts,
|
||||||
@@ -563,6 +603,7 @@ func NewPipeline(ctx context.Context,
|
|||||||
batchBuilder: batchBuilder,
|
batchBuilder: batchBuilder,
|
||||||
provers: provers,
|
provers: provers,
|
||||||
proversPool: proversPool,
|
proversPool: proversPool,
|
||||||
|
purger: purger,
|
||||||
txManager: txManager,
|
txManager: txManager,
|
||||||
consts: *scConsts,
|
consts: *scConsts,
|
||||||
statsCh: make(chan synchronizer.Stats, queueLen),
|
statsCh: make(chan synchronizer.Stats, queueLen),
|
||||||
@@ -679,7 +720,12 @@ func l2TxsIDs(txs []common.PoolL2Tx) []common.TxID {
|
|||||||
// circuit inputs to the proof server.
|
// circuit inputs to the proof server.
|
||||||
func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
|
func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
|
||||||
// remove transactions from the pool that have been there for too long
|
// remove transactions from the pool that have been there for too long
|
||||||
err := p.l2DB.Purge(common.BatchNum(p.stats.Sync.LastBatch))
|
_, err := p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
|
||||||
|
p.stats.Sync.LastBlock.Num, int64(batchNum))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
_, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, tracerr.Wrap(err)
|
return nil, tracerr.Wrap(err)
|
||||||
}
|
}
|
||||||
@@ -721,11 +767,11 @@ func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.Bat
|
|||||||
return nil, tracerr.Wrap(err)
|
return nil, tracerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run purger to invalidate transactions that become invalid beause of
|
// Invalidate transactions that become invalid beause of
|
||||||
// the poolL2Txs selected. Will mark as invalid the txs that have a
|
// the poolL2Txs selected. Will mark as invalid the txs that have a
|
||||||
// (fromIdx, nonce) which already appears in the selected txs (includes
|
// (fromIdx, nonce) which already appears in the selected txs (includes
|
||||||
// all the nonces smaller than the current one)
|
// all the nonces smaller than the current one)
|
||||||
err = p.purgeInvalidDueToL2TxsSelection(poolL2Txs)
|
err = poolMarkInvalidOldNoncesFromL2Txs(p.l2DB, idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, tracerr.Wrap(err)
|
return nil, tracerr.Wrap(err)
|
||||||
}
|
}
|
||||||
@@ -784,10 +830,6 @@ func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) er
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pipeline) purgeInvalidDueToL2TxsSelection(l2Txs []common.PoolL2Tx) error {
|
|
||||||
return nil // TODO
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Pipeline) shouldL1L2Batch() bool {
|
func (p *Pipeline) shouldL1L2Batch() bool {
|
||||||
// Take the lastL1BatchBlockNum as the biggest between the last
|
// Take the lastL1BatchBlockNum as the biggest between the last
|
||||||
// scheduled one, and the synchronized one.
|
// scheduled one, and the synchronized one.
|
||||||
|
|||||||
@@ -79,28 +79,29 @@ func newTestModules(t *testing.T) (*historydb.HistoryDB, *l2db.L2DB,
|
|||||||
|
|
||||||
var err error
|
var err error
|
||||||
syncDBPath, err = ioutil.TempDir("", "tmpSyncDB")
|
syncDBPath, err = ioutil.TempDir("", "tmpSyncDB")
|
||||||
require.Nil(t, err)
|
require.NoError(t, err)
|
||||||
deleteme = append(deleteme, syncDBPath)
|
deleteme = append(deleteme, syncDBPath)
|
||||||
syncSdb, err := statedb.NewStateDB(syncDBPath, statedb.TypeSynchronizer, nLevels)
|
syncSdb, err := statedb.NewStateDB(syncDBPath, statedb.TypeSynchronizer, nLevels)
|
||||||
assert.Nil(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
pass := os.Getenv("POSTGRES_PASS")
|
pass := os.Getenv("POSTGRES_PASS")
|
||||||
db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez")
|
db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez")
|
||||||
require.Nil(t, err)
|
require.NoError(t, err)
|
||||||
|
test.WipeDB(db)
|
||||||
l2DB := l2db.NewL2DB(db, 10, 100, 24*time.Hour)
|
l2DB := l2db.NewL2DB(db, 10, 100, 24*time.Hour)
|
||||||
historyDB := historydb.NewHistoryDB(db)
|
historyDB := historydb.NewHistoryDB(db)
|
||||||
|
|
||||||
txSelDBPath, err = ioutil.TempDir("", "tmpTxSelDB")
|
txSelDBPath, err = ioutil.TempDir("", "tmpTxSelDB")
|
||||||
require.Nil(t, err)
|
require.NoError(t, err)
|
||||||
deleteme = append(deleteme, txSelDBPath)
|
deleteme = append(deleteme, txSelDBPath)
|
||||||
txsel, err := txselector.NewTxSelector(txSelDBPath, syncSdb, l2DB, 10, 10, 10)
|
txsel, err := txselector.NewTxSelector(txSelDBPath, syncSdb, l2DB, 10, 10, 10)
|
||||||
assert.Nil(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
batchBuilderDBPath, err = ioutil.TempDir("", "tmpBatchBuilderDB")
|
batchBuilderDBPath, err = ioutil.TempDir("", "tmpBatchBuilderDB")
|
||||||
require.Nil(t, err)
|
require.NoError(t, err)
|
||||||
deleteme = append(deleteme, batchBuilderDBPath)
|
deleteme = append(deleteme, batchBuilderDBPath)
|
||||||
bb, err := batchbuilder.NewBatchBuilder(batchBuilderDBPath, syncSdb, nil, 0, uint64(nLevels))
|
bb, err := batchbuilder.NewBatchBuilder(batchBuilderDBPath, syncSdb, nil, 0, uint64(nLevels))
|
||||||
assert.Nil(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// l1Txs, coordinatorL1Txs, poolL2Txs := test.GenerateTestTxsFromSet(t, test.SetTest0)
|
// l1Txs, coordinatorL1Txs, poolL2Txs := test.GenerateTestTxsFromSet(t, test.SetTest0)
|
||||||
|
|
||||||
@@ -124,7 +125,7 @@ func newTestCoordinator(t *testing.T, forgerAddr ethCommon.Address, ethClient *t
|
|||||||
historyDB, l2DB, txsel, bb := newTestModules(t)
|
historyDB, l2DB, txsel, bb := newTestModules(t)
|
||||||
|
|
||||||
debugBatchPath, err := ioutil.TempDir("", "tmpDebugBatch")
|
debugBatchPath, err := ioutil.TempDir("", "tmpDebugBatch")
|
||||||
require.Nil(t, err)
|
require.NoError(t, err)
|
||||||
deleteme = append(deleteme, debugBatchPath)
|
deleteme = append(deleteme, debugBatchPath)
|
||||||
|
|
||||||
conf := Config{
|
conf := Config{
|
||||||
@@ -150,7 +151,7 @@ func newTestCoordinator(t *testing.T, forgerAddr ethCommon.Address, ethClient *t
|
|||||||
}
|
}
|
||||||
coord, err := NewCoordinator(conf, historyDB, l2DB, txsel, bb, serverProofs,
|
coord, err := NewCoordinator(conf, historyDB, l2DB, txsel, bb, serverProofs,
|
||||||
ethClient, scConsts, initSCVars)
|
ethClient, scConsts, initSCVars)
|
||||||
require.Nil(t, err)
|
require.NoError(t, err)
|
||||||
return coord
|
return coord
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -165,11 +166,11 @@ func TestCoordinatorFlow(t *testing.T) {
|
|||||||
|
|
||||||
// Bid for slot 2 and 4
|
// Bid for slot 2 and 4
|
||||||
_, err := ethClient.AuctionSetCoordinator(forger, "https://foo.bar")
|
_, err := ethClient.AuctionSetCoordinator(forger, "https://foo.bar")
|
||||||
require.Nil(t, err)
|
require.NoError(t, err)
|
||||||
_, err = ethClient.AuctionBidSimple(2, big.NewInt(9999))
|
_, err = ethClient.AuctionBidSimple(2, big.NewInt(9999))
|
||||||
require.Nil(t, err)
|
require.NoError(t, err)
|
||||||
_, err = ethClient.AuctionBidSimple(4, big.NewInt(9999))
|
_, err = ethClient.AuctionBidSimple(4, big.NewInt(9999))
|
||||||
require.Nil(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
coord.Start()
|
coord.Start()
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
@@ -177,9 +178,9 @@ func TestCoordinatorFlow(t *testing.T) {
|
|||||||
waitForSlot := func(slot int64) {
|
waitForSlot := func(slot int64) {
|
||||||
for {
|
for {
|
||||||
blockNum, err := ethClient.EthLastBlock()
|
blockNum, err := ethClient.EthLastBlock()
|
||||||
require.Nil(t, err)
|
require.NoError(t, err)
|
||||||
nextBlockSlot, err := ethClient.AuctionGetSlotNumber(blockNum + 1)
|
nextBlockSlot, err := ethClient.AuctionGetSlotNumber(blockNum + 1)
|
||||||
require.Nil(t, err)
|
require.NoError(t, err)
|
||||||
if nextBlockSlot == slot {
|
if nextBlockSlot == slot {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@@ -191,7 +192,7 @@ func TestCoordinatorFlow(t *testing.T) {
|
|||||||
stats.Eth.LastBatch = ethClient.CtlLastForgedBatch()
|
stats.Eth.LastBatch = ethClient.CtlLastForgedBatch()
|
||||||
stats.Sync.LastBatch = stats.Eth.LastBatch
|
stats.Sync.LastBatch = stats.Eth.LastBatch
|
||||||
canForge, err := ethClient.AuctionCanForge(forger, blockNum+1)
|
canForge, err := ethClient.AuctionCanForge(forger, blockNum+1)
|
||||||
require.Nil(t, err)
|
require.NoError(t, err)
|
||||||
if canForge {
|
if canForge {
|
||||||
// fmt.Println("DBG canForge")
|
// fmt.Println("DBG canForge")
|
||||||
stats.Sync.Auction.CurrentSlot.Forger = forger
|
stats.Sync.Auction.CurrentSlot.Forger = forger
|
||||||
@@ -207,7 +208,7 @@ func TestCoordinatorFlow(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
coord.SendMsg(MsgSyncStats{
|
coord.SendMsg(MsgSyncBlock{
|
||||||
Stats: stats,
|
Stats: stats,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -254,16 +255,16 @@ func TestCoordCanForge(t *testing.T) {
|
|||||||
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
|
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
|
||||||
coord := newTestCoordinator(t, forger, ethClient, ethClientSetup)
|
coord := newTestCoordinator(t, forger, ethClient, ethClientSetup)
|
||||||
_, err := ethClient.AuctionSetCoordinator(forger, "https://foo.bar")
|
_, err := ethClient.AuctionSetCoordinator(forger, "https://foo.bar")
|
||||||
require.Nil(t, err)
|
require.NoError(t, err)
|
||||||
_, err = ethClient.AuctionBidSimple(2, big.NewInt(9999))
|
_, err = ethClient.AuctionBidSimple(2, big.NewInt(9999))
|
||||||
require.Nil(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
bootCoord := newTestCoordinator(t, bootForger, ethClient, ethClientSetup)
|
bootCoord := newTestCoordinator(t, bootForger, ethClient, ethClientSetup)
|
||||||
|
|
||||||
assert.Equal(t, forger, coord.cfg.ForgerAddress)
|
assert.Equal(t, forger, coord.cfg.ForgerAddress)
|
||||||
assert.Equal(t, bootForger, bootCoord.cfg.ForgerAddress)
|
assert.Equal(t, bootForger, bootCoord.cfg.ForgerAddress)
|
||||||
ethBootCoord, err := ethClient.AuctionGetBootCoordinator()
|
ethBootCoord, err := ethClient.AuctionGetBootCoordinator()
|
||||||
require.Nil(t, err)
|
require.NoError(t, err)
|
||||||
assert.Equal(t, &bootForger, ethBootCoord)
|
assert.Equal(t, &bootForger, ethBootCoord)
|
||||||
|
|
||||||
var stats synchronizer.Stats
|
var stats synchronizer.Stats
|
||||||
@@ -300,11 +301,12 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
|
|||||||
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
|
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
|
||||||
coord := newTestCoordinator(t, forger, ethClient, ethClientSetup)
|
coord := newTestCoordinator(t, forger, ethClient, ethClientSetup)
|
||||||
_, err := ethClient.AuctionSetCoordinator(forger, "https://foo.bar")
|
_, err := ethClient.AuctionSetCoordinator(forger, "https://foo.bar")
|
||||||
require.Nil(t, err)
|
require.NoError(t, err)
|
||||||
_, err = ethClient.AuctionBidSimple(2, big.NewInt(9999))
|
_, err = ethClient.AuctionBidSimple(2, big.NewInt(9999))
|
||||||
require.Nil(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var stats synchronizer.Stats
|
var msg MsgSyncBlock
|
||||||
|
stats := &msg.Stats
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// Slot 0. No bid, so the winner is the boot coordinator
|
// Slot 0. No bid, so the winner is the boot coordinator
|
||||||
@@ -312,8 +314,8 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
|
|||||||
stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum
|
stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum
|
||||||
stats.Sync.LastBlock = stats.Eth.LastBlock
|
stats.Sync.LastBlock = stats.Eth.LastBlock
|
||||||
stats.Sync.Auction.CurrentSlot.Forger = bootForger
|
stats.Sync.Auction.CurrentSlot.Forger = bootForger
|
||||||
assert.Equal(t, false, coord.canForge(&stats))
|
assert.Equal(t, false, coord.canForge(stats))
|
||||||
require.Nil(t, coord.handleMsgSyncStats(ctx, &stats))
|
require.NoError(t, coord.handleMsgSyncBlock(ctx, &msg))
|
||||||
assert.Nil(t, coord.pipeline)
|
assert.Nil(t, coord.pipeline)
|
||||||
|
|
||||||
// Slot 0. No bid, and we reach the deadline, so anyone can forge
|
// Slot 0. No bid, and we reach the deadline, so anyone can forge
|
||||||
@@ -322,8 +324,8 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
|
|||||||
int64(ethClientSetup.AuctionVariables.SlotDeadline)
|
int64(ethClientSetup.AuctionVariables.SlotDeadline)
|
||||||
stats.Sync.LastBlock = stats.Eth.LastBlock
|
stats.Sync.LastBlock = stats.Eth.LastBlock
|
||||||
stats.Sync.Auction.CurrentSlot.Forger = bootForger
|
stats.Sync.Auction.CurrentSlot.Forger = bootForger
|
||||||
assert.Equal(t, true, coord.canForge(&stats))
|
assert.Equal(t, true, coord.canForge(stats))
|
||||||
require.Nil(t, coord.handleMsgSyncStats(ctx, &stats))
|
require.NoError(t, coord.handleMsgSyncBlock(ctx, &msg))
|
||||||
assert.NotNil(t, coord.pipeline)
|
assert.NotNil(t, coord.pipeline)
|
||||||
|
|
||||||
// Slot 0. No bid, and we reach the deadline, so anyone can forge
|
// Slot 0. No bid, and we reach the deadline, so anyone can forge
|
||||||
@@ -332,8 +334,8 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
|
|||||||
int64(ethClientSetup.AuctionVariables.SlotDeadline) + 1
|
int64(ethClientSetup.AuctionVariables.SlotDeadline) + 1
|
||||||
stats.Sync.LastBlock = stats.Eth.LastBlock
|
stats.Sync.LastBlock = stats.Eth.LastBlock
|
||||||
stats.Sync.Auction.CurrentSlot.Forger = bootForger
|
stats.Sync.Auction.CurrentSlot.Forger = bootForger
|
||||||
assert.Equal(t, true, coord.canForge(&stats))
|
assert.Equal(t, true, coord.canForge(stats))
|
||||||
require.Nil(t, coord.handleMsgSyncStats(ctx, &stats))
|
require.NoError(t, coord.handleMsgSyncBlock(ctx, &msg))
|
||||||
assert.NotNil(t, coord.pipeline)
|
assert.NotNil(t, coord.pipeline)
|
||||||
|
|
||||||
// Slot 0. No bid, so the winner is the boot coordinator
|
// Slot 0. No bid, so the winner is the boot coordinator
|
||||||
@@ -342,8 +344,8 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
|
|||||||
1*int64(ethClientSetup.AuctionConstants.BlocksPerSlot)
|
1*int64(ethClientSetup.AuctionConstants.BlocksPerSlot)
|
||||||
stats.Sync.LastBlock = stats.Eth.LastBlock
|
stats.Sync.LastBlock = stats.Eth.LastBlock
|
||||||
stats.Sync.Auction.CurrentSlot.Forger = bootForger
|
stats.Sync.Auction.CurrentSlot.Forger = bootForger
|
||||||
assert.Equal(t, false, coord.canForge(&stats))
|
assert.Equal(t, false, coord.canForge(stats))
|
||||||
require.Nil(t, coord.handleMsgSyncStats(ctx, &stats))
|
require.NoError(t, coord.handleMsgSyncBlock(ctx, &msg))
|
||||||
assert.Nil(t, coord.pipeline)
|
assert.Nil(t, coord.pipeline)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
152
coordinator/purger.go
Normal file
152
coordinator/purger.go
Normal file
@@ -0,0 +1,152 @@
|
|||||||
|
package coordinator
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/hermeznetwork/hermez-node/common"
|
||||||
|
"github.com/hermeznetwork/hermez-node/db/l2db"
|
||||||
|
"github.com/hermeznetwork/hermez-node/db/statedb"
|
||||||
|
"github.com/hermeznetwork/hermez-node/log"
|
||||||
|
"github.com/hermeznetwork/tracerr"
|
||||||
|
"github.com/iden3/go-merkletree/db"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PurgerCfg is the purger configuration
|
||||||
|
type PurgerCfg struct {
|
||||||
|
// PurgeBatchDelay is the delay between batches to purge outdated transactions
|
||||||
|
PurgeBatchDelay int64
|
||||||
|
// InvalidateBatchDelay is the delay between batches to mark invalid transactions
|
||||||
|
InvalidateBatchDelay int64
|
||||||
|
// PurgeBlockDelay is the delay between blocks to purge outdated transactions
|
||||||
|
PurgeBlockDelay int64
|
||||||
|
// InvalidateBlockDelay is the delay between blocks to mark invalid transactions
|
||||||
|
InvalidateBlockDelay int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// Purger manages cleanup of transactions in the pool
|
||||||
|
type Purger struct {
|
||||||
|
cfg PurgerCfg
|
||||||
|
lastPurgeBlock int64
|
||||||
|
lastPurgeBatch int64
|
||||||
|
lastInvalidateBlock int64
|
||||||
|
lastInvalidateBatch int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// CanPurge returns true if it's a good time to purge according to the
|
||||||
|
// configuration
|
||||||
|
func (p *Purger) CanPurge(blockNum, batchNum int64) bool {
|
||||||
|
if blockNum > p.lastPurgeBlock+p.cfg.PurgeBlockDelay {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if batchNum > p.lastPurgeBatch+p.cfg.PurgeBatchDelay {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// CanInvalidate returns true if it's a good time to invalidate according to
|
||||||
|
// the configuration
|
||||||
|
func (p *Purger) CanInvalidate(blockNum, batchNum int64) bool {
|
||||||
|
if blockNum > p.lastInvalidateBlock+p.cfg.InvalidateBlockDelay {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if batchNum > p.lastInvalidateBatch+p.cfg.InvalidateBatchDelay {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// PurgeMaybe purges txs if it's a good time to do so
|
||||||
|
func (p *Purger) PurgeMaybe(l2DB *l2db.L2DB, blockNum, batchNum int64) (bool, error) {
|
||||||
|
if !p.CanPurge(blockNum, batchNum) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
p.lastPurgeBlock = blockNum
|
||||||
|
p.lastPurgeBatch = batchNum
|
||||||
|
log.Debugw("Purger: purging l2txs in pool", "block", blockNum, "batch", batchNum)
|
||||||
|
err := l2DB.Purge(common.BatchNum(batchNum))
|
||||||
|
return true, tracerr.Wrap(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// InvalidateMaybe invalidates txs if it's a good time to do so
|
||||||
|
func (p *Purger) InvalidateMaybe(l2DB *l2db.L2DB, stateDB *statedb.LocalStateDB,
|
||||||
|
blockNum, batchNum int64) (bool, error) {
|
||||||
|
if !p.CanInvalidate(blockNum, batchNum) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
p.lastInvalidateBlock = blockNum
|
||||||
|
p.lastInvalidateBatch = batchNum
|
||||||
|
log.Debugw("Purger: invalidating l2txs in pool", "block", blockNum, "batch", batchNum)
|
||||||
|
err := poolMarkInvalidOldNonces(l2DB, stateDB, common.BatchNum(batchNum))
|
||||||
|
return true, err
|
||||||
|
}
|
||||||
|
|
||||||
|
//nolint:unused,deadcode
|
||||||
|
func idxsNonceFromL2Txs(txs []common.L2Tx) []common.IdxNonce {
|
||||||
|
idxNonceMap := map[common.Idx]common.Nonce{}
|
||||||
|
for _, tx := range txs {
|
||||||
|
if nonce, ok := idxNonceMap[tx.FromIdx]; !ok {
|
||||||
|
idxNonceMap[tx.FromIdx] = tx.Nonce
|
||||||
|
} else if tx.Nonce > nonce {
|
||||||
|
idxNonceMap[tx.FromIdx] = tx.Nonce
|
||||||
|
}
|
||||||
|
}
|
||||||
|
idxsNonce := make([]common.IdxNonce, 0, len(idxNonceMap))
|
||||||
|
for idx, nonce := range idxNonceMap {
|
||||||
|
idxsNonce = append(idxsNonce, common.IdxNonce{Idx: idx, Nonce: nonce})
|
||||||
|
}
|
||||||
|
return idxsNonce
|
||||||
|
}
|
||||||
|
|
||||||
|
func idxsNonceFromPoolL2Txs(txs []common.PoolL2Tx) []common.IdxNonce {
|
||||||
|
idxNonceMap := map[common.Idx]common.Nonce{}
|
||||||
|
for _, tx := range txs {
|
||||||
|
if nonce, ok := idxNonceMap[tx.FromIdx]; !ok {
|
||||||
|
idxNonceMap[tx.FromIdx] = tx.Nonce
|
||||||
|
} else if tx.Nonce > nonce {
|
||||||
|
idxNonceMap[tx.FromIdx] = tx.Nonce
|
||||||
|
}
|
||||||
|
}
|
||||||
|
idxsNonce := make([]common.IdxNonce, 0, len(idxNonceMap))
|
||||||
|
for idx, nonce := range idxNonceMap {
|
||||||
|
idxsNonce = append(idxsNonce, common.IdxNonce{Idx: idx, Nonce: nonce})
|
||||||
|
}
|
||||||
|
return idxsNonce
|
||||||
|
}
|
||||||
|
|
||||||
|
// poolMarkInvalidOldNoncesFromL2Txs marks as invalid the txs in the pool that
|
||||||
|
// contain nonces equal or older to the highest nonce used in a forged l2Tx for
|
||||||
|
// the
|
||||||
|
// corresponding sender account
|
||||||
|
func poolMarkInvalidOldNoncesFromL2Txs(l2DB *l2db.L2DB,
|
||||||
|
idxsNonce []common.IdxNonce, batchNum common.BatchNum) error {
|
||||||
|
return l2DB.CheckNonces(idxsNonce, batchNum)
|
||||||
|
}
|
||||||
|
|
||||||
|
// poolMarkInvalidOldNonces marks as invalid txs in the pool that contain
|
||||||
|
// nonces equal or older to the nonce of the corresponding sender account
|
||||||
|
func poolMarkInvalidOldNonces(l2DB *l2db.L2DB, stateDB *statedb.LocalStateDB,
|
||||||
|
batchNum common.BatchNum) error {
|
||||||
|
idxs, err := l2DB.GetPendingUniqueFromIdxs()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
idxsNonce := make([]common.IdxNonce, len(idxs))
|
||||||
|
lastIdx, err := stateDB.GetIdx()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for i, idx := range idxs {
|
||||||
|
acc, err := stateDB.GetAccount(idx)
|
||||||
|
if err != nil {
|
||||||
|
if tracerr.Unwrap(err) != db.ErrNotFound {
|
||||||
|
return err
|
||||||
|
} else if idx <= lastIdx {
|
||||||
|
return fmt.Errorf("account with idx %v not found: %w", idx, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
idxsNonce[i].Idx = idx
|
||||||
|
idxsNonce[i].Nonce = acc.Nonce
|
||||||
|
}
|
||||||
|
return l2DB.CheckNonces(idxsNonce, batchNum)
|
||||||
|
}
|
||||||
3
coordinator/purger_test.go
Normal file
3
coordinator/purger_test.go
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
package coordinator
|
||||||
|
|
||||||
|
// TODO: Test purger functions
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
package l2db
|
package l2db
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -242,38 +243,52 @@ func (l2db *L2DB) InvalidateTxs(txIDs []common.TxID, batchNum common.BatchNum) e
|
|||||||
return tracerr.Wrap(err)
|
return tracerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CheckNonces invalidate txs with nonces that are smaller or equal than their respective accounts nonces.
|
// GetPendingUniqueFromIdxs returns from all the pending transactions, the set
|
||||||
// The state of the affected txs will be changed from Pending -> Invalid
|
// of unique FromIdx
|
||||||
func (l2db *L2DB) CheckNonces(updatedAccounts []common.Account, batchNum common.BatchNum) (err error) {
|
func (l2db *L2DB) GetPendingUniqueFromIdxs() ([]common.Idx, error) {
|
||||||
|
var idxs []common.Idx
|
||||||
|
rows, err := l2db.db.Query(`SELECT DISTINCT from_idx FROM tx_pool
|
||||||
|
WHERE state = $1;`, common.PoolL2TxStatePending)
|
||||||
|
if err != nil {
|
||||||
|
return nil, tracerr.Wrap(err)
|
||||||
|
}
|
||||||
|
var idx common.Idx
|
||||||
|
for rows.Next() {
|
||||||
|
err = rows.Scan(&idx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, tracerr.Wrap(err)
|
||||||
|
}
|
||||||
|
idxs = append(idxs, idx)
|
||||||
|
}
|
||||||
|
return idxs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var checkNoncesQuery = fmt.Sprintf(`
|
||||||
|
UPDATE tx_pool SET
|
||||||
|
state = '%s',
|
||||||
|
batch_num = %%d
|
||||||
|
FROM (VALUES
|
||||||
|
(NULL::::BIGINT, NULL::::BIGINT),
|
||||||
|
(:idx, :nonce)
|
||||||
|
) as updated_acc (idx, nonce)
|
||||||
|
WHERE tx_pool.from_idx = updated_acc.idx AND tx_pool.nonce <= updated_acc.nonce;
|
||||||
|
`, common.PoolL2TxStateInvalid)
|
||||||
|
|
||||||
|
// CheckNonces invalidate txs with nonces that are smaller or equal than their
|
||||||
|
// respective accounts nonces. The state of the affected txs will be changed
|
||||||
|
// from Pending to Invalid
|
||||||
|
func (l2db *L2DB) CheckNonces(updatedAccounts []common.IdxNonce, batchNum common.BatchNum) (err error) {
|
||||||
if len(updatedAccounts) == 0 {
|
if len(updatedAccounts) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
txn, err := l2db.db.Beginx()
|
// Fill the batch_num in the query with Sprintf because we are using a
|
||||||
if err != nil {
|
// named query which works with slices, and doens't handle an extra
|
||||||
|
// individual argument.
|
||||||
|
query := fmt.Sprintf(checkNoncesQuery, batchNum)
|
||||||
|
if _, err := sqlx.NamedQuery(l2db.db, query, updatedAccounts); err != nil {
|
||||||
return tracerr.Wrap(err)
|
return tracerr.Wrap(err)
|
||||||
}
|
}
|
||||||
defer func() {
|
return nil
|
||||||
// Rollback the transaction if there was an error.
|
|
||||||
if err != nil {
|
|
||||||
db.Rollback(txn)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
for i := 0; i < len(updatedAccounts); i++ {
|
|
||||||
_, err = txn.Exec(
|
|
||||||
`UPDATE tx_pool
|
|
||||||
SET state = $1, batch_num = $2
|
|
||||||
WHERE state = $3 AND from_idx = $4 AND nonce <= $5;`,
|
|
||||||
common.PoolL2TxStateInvalid,
|
|
||||||
batchNum,
|
|
||||||
common.PoolL2TxStatePending,
|
|
||||||
updatedAccounts[i].Idx,
|
|
||||||
updatedAccounts[i].Nonce,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return tracerr.Wrap(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return tracerr.Wrap(txn.Commit())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reorg updates the state of txs that were updated in a batch that has been discarted due to a blockchain reorg.
|
// Reorg updates the state of txs that were updated in a batch that has been discarted due to a blockchain reorg.
|
||||||
|
|||||||
@@ -334,12 +334,13 @@ func TestCheckNonces(t *testing.T) {
|
|||||||
poolL2Txs, err := generatePoolL2Txs()
|
poolL2Txs, err := generatePoolL2Txs()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
// Update Accounts currentNonce
|
// Update Accounts currentNonce
|
||||||
var updateAccounts []common.Account
|
var updateAccounts []common.IdxNonce
|
||||||
const currentNonce = common.Nonce(1)
|
const currentNonce = common.Nonce(1)
|
||||||
for i := range accs {
|
for i := range accs {
|
||||||
account := accs[i]
|
updateAccounts = append(updateAccounts, common.IdxNonce{
|
||||||
account.Nonce = common.Nonce(currentNonce)
|
Idx: accs[i].Idx,
|
||||||
updateAccounts = append(updateAccounts, account)
|
Nonce: common.Nonce(currentNonce),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
// Add txs to DB
|
// Add txs to DB
|
||||||
var invalidTxIDs []common.TxID
|
var invalidTxIDs []common.TxID
|
||||||
|
|||||||
@@ -303,7 +303,7 @@ func (s *StateDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
|
|||||||
return tracerr.Wrap(err)
|
return tracerr.Wrap(err)
|
||||||
}
|
}
|
||||||
// idx is obtained from the statedb reset
|
// idx is obtained from the statedb reset
|
||||||
s.idx, err = s.getIdx()
|
s.idx, err = s.GetIdx()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return tracerr.Wrap(err)
|
return tracerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1118,9 +1118,9 @@ func (s *StateDB) computeEffectiveAmounts(tx *common.L1Tx) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getIdx returns the stored Idx from the localStateDB, which is the last Idx
|
// GetIdx returns the stored Idx from the localStateDB, which is the last Idx
|
||||||
// used for an Account in the localStateDB.
|
// used for an Account in the localStateDB.
|
||||||
func (s *StateDB) getIdx() (common.Idx, error) {
|
func (s *StateDB) GetIdx() (common.Idx, error) {
|
||||||
idxBytes, err := s.DB().Get(keyidx)
|
idxBytes, err := s.DB().Get(keyidx)
|
||||||
if tracerr.Unwrap(err) == db.ErrNotFound {
|
if tracerr.Unwrap(err) == db.ErrNotFound {
|
||||||
return 0, nil
|
return 0, nil
|
||||||
|
|||||||
@@ -65,8 +65,8 @@ type EthereumConfig struct {
|
|||||||
CallGasLimit uint64
|
CallGasLimit uint64
|
||||||
DeployGasLimit uint64
|
DeployGasLimit uint64
|
||||||
GasPriceDiv uint64
|
GasPriceDiv uint64
|
||||||
ReceiptTimeout time.Duration // in seconds
|
ReceiptTimeout time.Duration
|
||||||
IntervalReceiptLoop time.Duration // in milliseconds
|
IntervalReceiptLoop time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// EthereumClient is an ethereum client to call Smart Contract methods and check blockchain information.
|
// EthereumClient is an ethereum client to call Smart Contract methods and check blockchain information.
|
||||||
|
|||||||
24
node/node.go
24
node/node.go
@@ -170,8 +170,19 @@ func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node,
|
|||||||
|
|
||||||
coord, err = coordinator.NewCoordinator(
|
coord, err = coordinator.NewCoordinator(
|
||||||
coordinator.Config{
|
coordinator.Config{
|
||||||
ForgerAddress: coordCfg.ForgerAddress,
|
ForgerAddress: coordCfg.ForgerAddress,
|
||||||
ConfirmBlocks: coordCfg.ConfirmBlocks,
|
ConfirmBlocks: coordCfg.ConfirmBlocks,
|
||||||
|
L1BatchTimeoutPerc: coordCfg.L1BatchTimeoutPerc,
|
||||||
|
EthClientAttempts: coordCfg.EthClient.Attempts,
|
||||||
|
EthClientAttemptsDelay: coordCfg.EthClient.AttemptsDelay.Duration,
|
||||||
|
TxManagerCheckInterval: coordCfg.EthClient.IntervalCheckLoop.Duration,
|
||||||
|
DebugBatchPath: coordCfg.Debug.BatchPath,
|
||||||
|
Purger: coordinator.PurgerCfg{
|
||||||
|
PurgeBatchDelay: coordCfg.L2DB.PurgeBatchDelay,
|
||||||
|
InvalidateBatchDelay: coordCfg.L2DB.InvalidateBatchDelay,
|
||||||
|
PurgeBlockDelay: coordCfg.L2DB.PurgeBlockDelay,
|
||||||
|
InvalidateBlockDelay: coordCfg.L2DB.InvalidateBlockDelay,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
historyDB,
|
historyDB,
|
||||||
l2DB,
|
l2DB,
|
||||||
@@ -327,7 +338,9 @@ func (n *Node) syncLoopFn(lastBlock *common.Block) (*common.Block, time.Duration
|
|||||||
// case: reorg
|
// case: reorg
|
||||||
log.Infow("Synchronizer.Sync reorg", "discarded", *discarded)
|
log.Infow("Synchronizer.Sync reorg", "discarded", *discarded)
|
||||||
if n.mode == ModeCoordinator {
|
if n.mode == ModeCoordinator {
|
||||||
n.coord.SendMsg(coordinator.MsgSyncReorg{})
|
n.coord.SendMsg(coordinator.MsgSyncReorg{
|
||||||
|
Stats: *stats,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
if n.nodeAPI != nil {
|
if n.nodeAPI != nil {
|
||||||
rollup, auction, wDelayer := n.sync.SCVars()
|
rollup, auction, wDelayer := n.sync.SCVars()
|
||||||
@@ -351,8 +364,9 @@ func (n *Node) syncLoopFn(lastBlock *common.Block) (*common.Block, time.Duration
|
|||||||
WDelayer: blockData.WDelayer.Vars,
|
WDelayer: blockData.WDelayer.Vars,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
n.coord.SendMsg(coordinator.MsgSyncStats{
|
n.coord.SendMsg(coordinator.MsgSyncBlock{
|
||||||
Stats: *stats,
|
Stats: *stats,
|
||||||
|
Batches: blockData.Rollup.Batches,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
if n.nodeAPI != nil {
|
if n.nodeAPI != nil {
|
||||||
|
|||||||
@@ -801,11 +801,16 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e
|
|||||||
MaxTx: 512,
|
MaxTx: 512,
|
||||||
MaxL1Tx: 64,
|
MaxL1Tx: 64,
|
||||||
}
|
}
|
||||||
processTxsOut, err := s.stateDB.ProcessTxs(ptc, forgeBatchArgs.FeeIdxCoordinator, l1UserTxs,
|
processTxsOut, err := s.stateDB.ProcessTxs(ptc, forgeBatchArgs.FeeIdxCoordinator,
|
||||||
batchData.L1CoordinatorTxs, poolL2Txs)
|
l1UserTxs, batchData.L1CoordinatorTxs, poolL2Txs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, tracerr.Wrap(err)
|
return nil, tracerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
// Set the BatchNum in the forged L1UserTxs
|
||||||
|
for i := range l1UserTxs {
|
||||||
|
l1UserTxs[i].BatchNum = &batchNum
|
||||||
|
}
|
||||||
|
batchData.L1UserTxs = l1UserTxs
|
||||||
|
|
||||||
// Set batchNum in exits
|
// Set batchNum in exits
|
||||||
for i := range processTxsOut.ExitInfos {
|
for i := range processTxsOut.ExitInfos {
|
||||||
|
|||||||
@@ -76,7 +76,7 @@ func checkSyncBlock(t *testing.T, s *Synchronizer, blockNum int, block, syncBloc
|
|||||||
assert.Equal(t, tokenCpy, dbToken)
|
assert.Equal(t, tokenCpy, dbToken)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check L1UserTxs
|
// Check submitted L1UserTxs
|
||||||
assert.Equal(t, len(block.Rollup.L1UserTxs), len(syncBlock.Rollup.L1UserTxs))
|
assert.Equal(t, len(block.Rollup.L1UserTxs), len(syncBlock.Rollup.L1UserTxs))
|
||||||
dbL1UserTxs, err := s.historyDB.GetAllL1UserTxs()
|
dbL1UserTxs, err := s.historyDB.GetAllL1UserTxs()
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
@@ -85,9 +85,8 @@ func checkSyncBlock(t *testing.T, s *Synchronizer, blockNum int, block, syncBloc
|
|||||||
// because this value is set by StateDB.ProcessTxs.
|
// because this value is set by StateDB.ProcessTxs.
|
||||||
for i := range syncBlock.Rollup.L1UserTxs {
|
for i := range syncBlock.Rollup.L1UserTxs {
|
||||||
syncBlock.Rollup.L1UserTxs[i].BatchNum = block.Rollup.L1UserTxs[i].BatchNum
|
syncBlock.Rollup.L1UserTxs[i].BatchNum = block.Rollup.L1UserTxs[i].BatchNum
|
||||||
syncBlock.Rollup.L1UserTxs[i].EffectiveAmount = block.Rollup.L1UserTxs[i].EffectiveAmount
|
assert.Nil(t, syncBlock.Rollup.L1UserTxs[i].EffectiveDepositAmount)
|
||||||
syncBlock.Rollup.L1UserTxs[i].EffectiveDepositAmount =
|
assert.Nil(t, syncBlock.Rollup.L1UserTxs[i].EffectiveAmount)
|
||||||
block.Rollup.L1UserTxs[i].EffectiveDepositAmount
|
|
||||||
}
|
}
|
||||||
assert.Equal(t, block.Rollup.L1UserTxs, syncBlock.Rollup.L1UserTxs)
|
assert.Equal(t, block.Rollup.L1UserTxs, syncBlock.Rollup.L1UserTxs)
|
||||||
for _, tx := range block.Rollup.L1UserTxs {
|
for _, tx := range block.Rollup.L1UserTxs {
|
||||||
@@ -101,8 +100,13 @@ func checkSyncBlock(t *testing.T, s *Synchronizer, blockNum int, block, syncBloc
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tx.EffectiveAmount = tx.Amount
|
// If the tx has been forged in this block, this will be
|
||||||
tx.EffectiveDepositAmount = tx.DepositAmount
|
// reflected in the DB, and so the Effective values will be
|
||||||
|
// already set
|
||||||
|
if dbTx.BatchNum != nil {
|
||||||
|
tx.EffectiveAmount = tx.Amount
|
||||||
|
tx.EffectiveDepositAmount = tx.DepositAmount
|
||||||
|
}
|
||||||
assert.Equal(t, &tx, dbTx) //nolint:gosec
|
assert.Equal(t, &tx, dbTx) //nolint:gosec
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -137,6 +141,7 @@ func checkSyncBlock(t *testing.T, s *Synchronizer, blockNum int, block, syncBloc
|
|||||||
batch.Batch.NumAccounts = len(batch.CreatedAccounts)
|
batch.Batch.NumAccounts = len(batch.CreatedAccounts)
|
||||||
|
|
||||||
// Test field by field to facilitate debugging of errors
|
// Test field by field to facilitate debugging of errors
|
||||||
|
assert.Equal(t, batch.L1UserTxs, syncBatch.L1UserTxs)
|
||||||
assert.Equal(t, batch.L1CoordinatorTxs, syncBatch.L1CoordinatorTxs)
|
assert.Equal(t, batch.L1CoordinatorTxs, syncBatch.L1CoordinatorTxs)
|
||||||
assert.Equal(t, batch.L2Txs, syncBatch.L2Txs)
|
assert.Equal(t, batch.L2Txs, syncBatch.L2Txs)
|
||||||
// In exit tree, we only check AccountIdx and Balance, because
|
// In exit tree, we only check AccountIdx and Balance, because
|
||||||
@@ -152,6 +157,26 @@ func checkSyncBlock(t *testing.T, s *Synchronizer, blockNum int, block, syncBloc
|
|||||||
assert.Equal(t, batch, syncBatch)
|
assert.Equal(t, batch, syncBatch)
|
||||||
assert.Equal(t, &batch.Batch, dbBatch) //nolint:gosec
|
assert.Equal(t, &batch.Batch, dbBatch) //nolint:gosec
|
||||||
|
|
||||||
|
// Check forged L1UserTxs from DB, and check effective values
|
||||||
|
// in sync output
|
||||||
|
for j, tx := range batch.L1UserTxs {
|
||||||
|
var dbTx *common.L1Tx
|
||||||
|
// Find tx in DB output
|
||||||
|
for _, _dbTx := range dbL1UserTxs {
|
||||||
|
if *tx.BatchNum == *_dbTx.BatchNum &&
|
||||||
|
tx.Position == _dbTx.Position {
|
||||||
|
dbTx = new(common.L1Tx)
|
||||||
|
*dbTx = _dbTx
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert.Equal(t, &tx, dbTx) //nolint:gosec
|
||||||
|
|
||||||
|
syncTx := &syncBlock.Rollup.Batches[i].L1UserTxs[j]
|
||||||
|
assert.Equal(t, syncTx.DepositAmount, syncTx.EffectiveDepositAmount)
|
||||||
|
assert.Equal(t, syncTx.Amount, syncTx.EffectiveAmount)
|
||||||
|
}
|
||||||
|
|
||||||
// Check L1CoordinatorTxs from DB
|
// Check L1CoordinatorTxs from DB
|
||||||
for _, tx := range batch.L1CoordinatorTxs {
|
for _, tx := range batch.L1CoordinatorTxs {
|
||||||
var dbTx *common.L1Tx
|
var dbTx *common.L1Tx
|
||||||
@@ -295,7 +320,7 @@ func TestSync(t *testing.T) {
|
|||||||
defer assert.Nil(t, os.RemoveAll(dir))
|
defer assert.Nil(t, os.RemoveAll(dir))
|
||||||
|
|
||||||
stateDB, err := statedb.NewStateDB(dir, statedb.TypeSynchronizer, 32)
|
stateDB, err := statedb.NewStateDB(dir, statedb.TypeSynchronizer, 32)
|
||||||
assert.Nil(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// Init History DB
|
// Init History DB
|
||||||
pass := os.Getenv("POSTGRES_PASS")
|
pass := os.Getenv("POSTGRES_PASS")
|
||||||
@@ -432,8 +457,10 @@ func TestSync(t *testing.T) {
|
|||||||
ethAddTokens(blocks, client)
|
ethAddTokens(blocks, client)
|
||||||
|
|
||||||
err = tc.FillBlocksExtra(blocks, &tilCfgExtra)
|
err = tc.FillBlocksExtra(blocks, &tilCfgExtra)
|
||||||
assert.Nil(t, err)
|
assert.NoError(t, err)
|
||||||
tc.FillBlocksL1UserTxsBatchNum(blocks)
|
tc.FillBlocksL1UserTxsBatchNum(blocks)
|
||||||
|
err = tc.FillBlocksForgedL1UserTxs(blocks)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
// Add block data to the smart contracts
|
// Add block data to the smart contracts
|
||||||
ethAddBlocks(t, blocks, client, clientSetup)
|
ethAddBlocks(t, blocks, client, clientSetup)
|
||||||
@@ -459,6 +486,7 @@ func TestSync(t *testing.T) {
|
|||||||
assert.Equal(t, int64(2), stats.Sync.LastBlock.Num)
|
assert.Equal(t, int64(2), stats.Sync.LastBlock.Num)
|
||||||
|
|
||||||
checkSyncBlock(t, s, 2, &blocks[0], syncBlock)
|
checkSyncBlock(t, s, 2, &blocks[0], syncBlock)
|
||||||
|
|
||||||
// Block 3
|
// Block 3
|
||||||
|
|
||||||
syncBlock, discards, err = s.Sync2(ctx, nil)
|
syncBlock, discards, err = s.Sync2(ctx, nil)
|
||||||
@@ -606,7 +634,7 @@ func TestSync(t *testing.T) {
|
|||||||
ethAddTokens(blocks, client)
|
ethAddTokens(blocks, client)
|
||||||
|
|
||||||
err = tc.FillBlocksExtra(blocks, &tilCfgExtra)
|
err = tc.FillBlocksExtra(blocks, &tilCfgExtra)
|
||||||
assert.Nil(t, err)
|
assert.NoError(t, err)
|
||||||
tc.FillBlocksL1UserTxsBatchNum(blocks)
|
tc.FillBlocksL1UserTxsBatchNum(blocks)
|
||||||
|
|
||||||
// Add block data to the smart contracts
|
// Add block data to the smart contracts
|
||||||
|
|||||||
@@ -58,6 +58,11 @@ func NewTxSelector(dbpath string, synchronizerStateDB *statedb.StateDB, l2 *l2db
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LocalAccountsDB returns the LocalStateDB of the TxSelector
|
||||||
|
func (txsel *TxSelector) LocalAccountsDB() *statedb.LocalStateDB {
|
||||||
|
return txsel.localAccountsDB
|
||||||
|
}
|
||||||
|
|
||||||
// Reset tells the TxSelector to get it's internal AccountsDB
|
// Reset tells the TxSelector to get it's internal AccountsDB
|
||||||
// from the required `batchNum`
|
// from the required `batchNum`
|
||||||
func (txsel *TxSelector) Reset(batchNum common.BatchNum) error {
|
func (txsel *TxSelector) Reset(batchNum common.BatchNum) error {
|
||||||
|
|||||||
Reference in New Issue
Block a user