From 900d1fb6ce390efb4f1fab27c3306c26263285c5 Mon Sep 17 00:00:00 2001
From: Eduard S
Date: Thu, 3 Dec 2020 14:58:26 +0100
Subject: [PATCH] Integrate purger to node

- Common:
  - Add `IdxNonce` type used to track nonces in accounts to invalidate l2txs
    in the pool
- Config:
  - Update coordinator config with all the new configuration parameters used
    in the coordinator
- Coordinator:
  - Introduce the `Purger` to track how often to purge and do the job when
    needed according to a configuration.
  - Implement the methods to invalidate pool l2txs due to the l2txs selected
    in batches. For now these functions are not used in favour of the
    `Purger` methods, which check ALL the l2txs in the pool.
  - Call the invalidation and purging methods of the purger both when the
    node is forging (in the pipeline when starting a new batch) and when the
    node is not forging (in the coordinator when being notified about a new
    synced block)
- L2DB:
  - Implement `GetPendingUniqueFromIdxs` to get all the unique idxs from
    pending transactions (used to get their nonces and then invalidate txs)
  - Redo `CheckNonces` with a single SQL query and using `common.IdxNonce`
    instead of `common.Account`
- StateDB:
  - Expose `GetIdx` to check errors when invalidating pool txs
- Synchronizer:
  - Test forged L1UserTxs processed by the TxProcessor
  - Improve checks of Effective values
- TxSelector:
  - Expose the internal LocalStateDB in order to check account nonces in the
    coordinator when not forging.
---
 common/account.go                 |   6 ++
 config/config.go                  |  32 ++++++-
 coordinator/coordinator.go        |  76 +++++++++++----
 coordinator/coordinator_test.go   |  62 ++++++------
 coordinator/purger.go             | 152 ++++++++++++++++++++++++++++++
 coordinator/purger_test.go        |   3 +
 db/l2db/l2db.go                   |  69 ++++++++------
 db/l2db/l2db_test.go              |   9 +-
 db/statedb/statedb.go             |   2 +-
 db/statedb/txprocessors.go        |   4 +-
 eth/ethereum.go                   |   4 +-
 node/node.go                      |  24 ++++-
 synchronizer/synchronizer.go      |   9 +-
 synchronizer/synchronizer_test.go |  46 +++++++--
 txselector/txselector.go          |   5 +
 15 files changed, 402 insertions(+), 101 deletions(-)
 create mode 100644 coordinator/purger.go
 create mode 100644 coordinator/purger_test.go

diff --git a/common/account.go b/common/account.go
index 3526320..fde0ec3 100644
--- a/common/account.go
+++ b/common/account.go
@@ -257,3 +257,9 @@ func AccountFromBytes(b [32 * NLeafElems]byte) (*Account, error) {
 	}
 	return &a, nil
 }
+
+// IdxNonce is a pair of Idx and Nonce representing an account
+type IdxNonce struct {
+	Idx   Idx   `db:"idx"`
+	Nonce Nonce `db:"nonce"`
+}
diff --git a/config/config.go b/config/config.go
index 34da119..71489a8 100644
--- a/config/config.go
+++ b/config/config.go
@@ -35,13 +35,27 @@ type ServerProof struct {
 
 // Coordinator is the coordinator specific configuration.
 type Coordinator struct {
+	// ForgerAddress is the address under which this coordinator is forging
 	ForgerAddress     ethCommon.Address `validate:"required"`
 	ForgeLoopInterval Duration          `validate:"required"`
-	ConfirmBlocks int64 `validate:"required"`
-	L2DB          struct {
+	// ConfirmBlocks is the number of confirmation blocks to wait for sent
+	// ethereum transactions before forgetting about them
+	ConfirmBlocks int64 `validate:"required"`
+	// L1BatchTimeoutPerc is the portion of the L1Batch timeout range at
+	// which forging an L1Batch will be scheduled
+	L1BatchTimeoutPerc float64 `validate:"required"`
+	L2DB               struct {
 		SafetyPeriod common.BatchNum `validate:"required"`
 		MaxTxs       uint32          `validate:"required"`
 		TTL          Duration        `validate:"required"`
+		// PurgeBatchDelay is the delay between batches to purge outdated transactions
+		PurgeBatchDelay int64 `validate:"required"`
+		// InvalidateBatchDelay is the delay between batches to mark invalid transactions
+		InvalidateBatchDelay int64 `validate:"required"`
+		// PurgeBlockDelay is the delay between blocks to purge outdated transactions
+		PurgeBlockDelay int64 `validate:"required"`
+		// InvalidateBlockDelay is the delay between blocks to mark invalid transactions
+		InvalidateBlockDelay int64 `validate:"required"`
 	} `validate:"required"`
 	TxSelector struct {
 		Path string `validate:"required"`
@@ -56,10 +70,24 @@ type Coordinator struct {
 		GasPriceDiv         uint64   `validate:"required"`
 		ReceiptTimeout      Duration `validate:"required"`
 		IntervalReceiptLoop Duration `validate:"required"`
+		// IntervalCheckLoop is the waiting interval between receipt
+		// checks of ethereum transactions in the TxManager
+		IntervalCheckLoop Duration `validate:"required"`
+		// Attempts is the number of attempts to do an eth client RPC
+		// call before giving up
+		Attempts int `validate:"required"`
+		// AttemptsDelay is the delay between attempts to do an eth
+		// client RPC call
+		AttemptsDelay Duration `validate:"required"`
 	} `validate:"required"`
 	API struct {
 		Coordinator bool
 	} `validate:"required"`
+	Debug struct {
+		// BatchPath, if set, specifies the path where batchInfo is
+		// stored in JSON in every step/update of the pipeline
+		BatchPath string
+	}
 }
 
 // Node is the hermez node configuration.
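The four new L2DB delay parameters bound how often the coordinator performs each cleanup action: an action runs again only once the configured number of blocks or the configured number of batches has elapsed since its last run. A minimal standalone sketch of that trigger rule (illustration only, not part of the patch; all values are hypothetical), matching the `Purger` logic introduced further below:

    package main

    import "fmt"

    // canPurge mirrors the Purger trigger rule from this patch: purge again
    // once enough blocks OR enough batches have passed since the last run.
    func canPurge(blockNum, batchNum, lastBlock, lastBatch, blockDelay, batchDelay int64) bool {
        return blockNum > lastBlock+blockDelay || batchNum > lastBatch+batchDelay
    }

    func main() {
        // Hypothetical state: last purge at block 500 / batch 100, with
        // PurgeBlockDelay = 50 and PurgeBatchDelay = 10.
        fmt.Println(canPurge(540, 105, 500, 100, 50, 10)) // false: neither delay elapsed
        fmt.Println(canPurge(551, 105, 500, 100, 50, 10)) // true: block delay elapsed
        fmt.Println(canPurge(540, 111, 500, 100, 50, 10)) // true: batch delay elapsed
    }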
diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go
index 8b1ef76..50e6ee5 100644
--- a/coordinator/coordinator.go
+++ b/coordinator/coordinator.go
@@ -45,6 +45,7 @@ type Config struct {
 	// DebugBatchPath if set, specifies the path where batchInfo is stored
 	// in JSON in every step/update of the pipeline
 	DebugBatchPath string
+	Purger         PurgerCfg
 }
 
 func (c *Config) debugBatchStore(batchInfo *BatchInfo) {
@@ -79,6 +80,7 @@ type Coordinator struct {
 
 	pipeline *Pipeline
 
+	purger    *Purger
 	txManager *TxManager
 }
 
@@ -103,6 +105,14 @@ func NewCoordinator(cfg Config,
 			cfg.EthClientAttempts))
 	}
 
+	purger := Purger{
+		cfg:                 cfg.Purger,
+		lastPurgeBlock:      0,
+		lastPurgeBatch:      0,
+		lastInvalidateBlock: 0,
+		lastInvalidateBatch: 0,
+	}
+
 	ctx, cancel := context.WithCancel(context.Background())
 	c := Coordinator{
 		pipelineBatchNum: -1,
@@ -117,6 +127,8 @@ func NewCoordinator(cfg Config,
 		txSelector:   txSelector,
 		batchBuilder: batchBuilder,
 
+		purger: &purger,
+
 		// ethClient: ethClient,
 
 		msgCh: make(chan interface{}),
@@ -130,16 +142,18 @@ func NewCoordinator(cfg Config,
 }
 
 func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) {
-	return NewPipeline(ctx, c.cfg, c.historyDB, c.l2DB,
-		c.txSelector, c.batchBuilder, c.txManager, c.provers, &c.consts)
+	return NewPipeline(ctx, c.cfg, c.historyDB, c.l2DB, c.txSelector,
+		c.batchBuilder, c.purger, c.txManager, c.provers, &c.consts)
 }
 
-// MsgSyncStats indicates an update to the Synchronizer stats
-type MsgSyncStats struct {
-	Stats synchronizer.Stats
+// MsgSyncBlock indicates a new synchronized block with the updated sync stats
+type MsgSyncBlock struct {
+	Stats   synchronizer.Stats
+	Batches []common.BatchData
 }
 
 // MsgSyncSCVars indicates an update to Smart Contract Vars
+// TODO: Move this to MsgSyncBlock and remove MsgSyncSCVars
 type MsgSyncSCVars struct {
 	Rollup  *common.RollupVariables
 	Auction *common.AuctionVariables
@@ -186,7 +200,9 @@ func (c *Coordinator) canForge(stats *synchronizer.Stats) bool {
 	return false
 }
 
-func (c *Coordinator) handleMsgSyncStats(ctx context.Context, stats *synchronizer.Stats) error {
+func (c *Coordinator) handleMsgSyncBlock(ctx context.Context, msg *MsgSyncBlock) error {
+	stats := &msg.Stats
+	// batches := msg.Batches
 	if !stats.Synced() {
 		return nil
 	}
@@ -218,6 +234,29 @@ func (c *Coordinator) handleMsgSyncStats(ctx context.Context, stats *synchronize
 			c.pipeline = nil
 		}
 	}
+	if c.pipeline == nil {
+		// Mark invalid in Pool due to forged L2Txs
+		// for _, batch := range batches {
+		// 	if err := poolMarkInvalidOldNoncesFromL2Txs(c.l2DB,
+		// 		idxsNonceFromL2Txs(batch.L2Txs), batch.Batch.BatchNum); err != nil {
+		// 		return err
+		// 	}
+		// }
+		if c.purger.CanInvalidate(stats.Sync.LastBlock.Num, stats.Sync.LastBatch) {
+			if err := c.txSelector.Reset(common.BatchNum(stats.Sync.LastBatch)); err != nil {
+				return err
+			}
+		}
+		_, err := c.purger.InvalidateMaybe(c.l2DB, c.txSelector.LocalAccountsDB(),
+			stats.Sync.LastBlock.Num, stats.Sync.LastBatch)
+		if err != nil {
+			return err
+		}
+		_, err = c.purger.PurgeMaybe(c.l2DB, stats.Sync.LastBlock.Num, stats.Sync.LastBatch)
+		if err != nil {
+			return err
+		}
+	}
 	return nil
 }
 
@@ -254,12 +293,11 @@ func (c *Coordinator) Start() {
 				return
 			case msg := <-c.msgCh:
 				switch msg := msg.(type) {
-				case MsgSyncStats:
-					stats := msg.Stats
-					if err := c.handleMsgSyncStats(c.ctx, &stats); common.IsErrDone(err) {
+				case MsgSyncBlock:
+					if err := c.handleMsgSyncBlock(c.ctx, &msg); common.IsErrDone(err) {
 						continue
 					} else if err != nil {
-						log.Errorw("Coordinator.handleMsgSyncStats error", "err", err)
log.Errorw("Coordinator.handleMsgSyncBlock error", "err", err) continue } case MsgSyncReorg: @@ -522,6 +560,7 @@ type Pipeline struct { l2DB *l2db.L2DB txSelector *txselector.TxSelector batchBuilder *batchbuilder.BatchBuilder + purger *Purger stats synchronizer.Stats statsCh chan synchronizer.Stats @@ -538,6 +577,7 @@ func NewPipeline(ctx context.Context, l2DB *l2db.L2DB, txSelector *txselector.TxSelector, batchBuilder *batchbuilder.BatchBuilder, + purger *Purger, txManager *TxManager, provers []prover.Client, scConsts *synchronizer.SCConsts, @@ -563,6 +603,7 @@ func NewPipeline(ctx context.Context, batchBuilder: batchBuilder, provers: provers, proversPool: proversPool, + purger: purger, txManager: txManager, consts: *scConsts, statsCh: make(chan synchronizer.Stats, queueLen), @@ -679,7 +720,12 @@ func l2TxsIDs(txs []common.PoolL2Tx) []common.TxID { // circuit inputs to the proof server. func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) { // remove transactions from the pool that have been there for too long - err := p.l2DB.Purge(common.BatchNum(p.stats.Sync.LastBatch)) + _, err := p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(), + p.stats.Sync.LastBlock.Num, int64(batchNum)) + if err != nil { + return nil, err + } + _, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum)) if err != nil { return nil, tracerr.Wrap(err) } @@ -721,11 +767,11 @@ func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.Bat return nil, tracerr.Wrap(err) } - // Run purger to invalidate transactions that become invalid beause of + // Invalidate transactions that become invalid beause of // the poolL2Txs selected. Will mark as invalid the txs that have a // (fromIdx, nonce) which already appears in the selected txs (includes // all the nonces smaller than the current one) - err = p.purgeInvalidDueToL2TxsSelection(poolL2Txs) + err = poolMarkInvalidOldNoncesFromL2Txs(p.l2DB, idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum) if err != nil { return nil, tracerr.Wrap(err) } @@ -784,10 +830,6 @@ func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) er return nil } -func (p *Pipeline) purgeInvalidDueToL2TxsSelection(l2Txs []common.PoolL2Tx) error { - return nil // TODO -} - func (p *Pipeline) shouldL1L2Batch() bool { // Take the lastL1BatchBlockNum as the biggest between the last // scheduled one, and the synchronized one. 
diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go
index 1e6413d..c5d4fd0 100644
--- a/coordinator/coordinator_test.go
+++ b/coordinator/coordinator_test.go
@@ -79,28 +79,29 @@ func newTestModules(t *testing.T) (*historydb.HistoryDB, *l2db.L2DB,
 	var err error
 	syncDBPath, err = ioutil.TempDir("", "tmpSyncDB")
-	require.Nil(t, err)
+	require.NoError(t, err)
 	deleteme = append(deleteme, syncDBPath)
 	syncSdb, err := statedb.NewStateDB(syncDBPath, statedb.TypeSynchronizer, nLevels)
-	assert.Nil(t, err)
+	assert.NoError(t, err)
 
 	pass := os.Getenv("POSTGRES_PASS")
 	db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez")
-	require.Nil(t, err)
+	require.NoError(t, err)
+	test.WipeDB(db)
 	l2DB := l2db.NewL2DB(db, 10, 100, 24*time.Hour)
 	historyDB := historydb.NewHistoryDB(db)
 
 	txSelDBPath, err = ioutil.TempDir("", "tmpTxSelDB")
-	require.Nil(t, err)
+	require.NoError(t, err)
 	deleteme = append(deleteme, txSelDBPath)
 	txsel, err := txselector.NewTxSelector(txSelDBPath, syncSdb, l2DB, 10, 10, 10)
-	assert.Nil(t, err)
+	assert.NoError(t, err)
 
 	batchBuilderDBPath, err = ioutil.TempDir("", "tmpBatchBuilderDB")
-	require.Nil(t, err)
+	require.NoError(t, err)
 	deleteme = append(deleteme, batchBuilderDBPath)
 	bb, err := batchbuilder.NewBatchBuilder(batchBuilderDBPath, syncSdb, nil, 0, uint64(nLevels))
-	assert.Nil(t, err)
+	assert.NoError(t, err)
 
 	// l1Txs, coordinatorL1Txs, poolL2Txs := test.GenerateTestTxsFromSet(t, test.SetTest0)
 
@@ -124,7 +125,7 @@ func newTestCoordinator(t *testing.T, forgerAddr ethCommon.Address, ethClient *t
 	historyDB, l2DB, txsel, bb := newTestModules(t)
 
 	debugBatchPath, err := ioutil.TempDir("", "tmpDebugBatch")
-	require.Nil(t, err)
+	require.NoError(t, err)
 	deleteme = append(deleteme, debugBatchPath)
 
 	conf := Config{
@@ -150,7 +151,7 @@ func newTestCoordinator(t *testing.T, forgerAddr ethCommon.Address, ethClient *t
 	}
 	coord, err := NewCoordinator(conf, historyDB, l2DB, txsel, bb, serverProofs,
 		ethClient, scConsts, initSCVars)
-	require.Nil(t, err)
+	require.NoError(t, err)
 	return coord
 }
 
@@ -165,11 +166,11 @@ func TestCoordinatorFlow(t *testing.T) {
 
 	// Bid for slot 2 and 4
 	_, err := ethClient.AuctionSetCoordinator(forger, "https://foo.bar")
-	require.Nil(t, err)
+	require.NoError(t, err)
 	_, err = ethClient.AuctionBidSimple(2, big.NewInt(9999))
-	require.Nil(t, err)
+	require.NoError(t, err)
 	_, err = ethClient.AuctionBidSimple(4, big.NewInt(9999))
-	require.Nil(t, err)
+	require.NoError(t, err)
 
 	coord.Start()
 	time.Sleep(1 * time.Second)
@@ -177,9 +178,9 @@ func TestCoordinatorFlow(t *testing.T) {
 	waitForSlot := func(slot int64) {
 		for {
 			blockNum, err := ethClient.EthLastBlock()
-			require.Nil(t, err)
+			require.NoError(t, err)
 			nextBlockSlot, err := ethClient.AuctionGetSlotNumber(blockNum + 1)
-			require.Nil(t, err)
+			require.NoError(t, err)
 			if nextBlockSlot == slot {
 				break
 			}
@@ -191,7 +192,7 @@ func TestCoordinatorFlow(t *testing.T) {
 			stats.Eth.LastBatch = ethClient.CtlLastForgedBatch()
 			stats.Sync.LastBatch = stats.Eth.LastBatch
 			canForge, err := ethClient.AuctionCanForge(forger, blockNum+1)
-			require.Nil(t, err)
+			require.NoError(t, err)
 			if canForge {
 				// fmt.Println("DBG canForge")
 				stats.Sync.Auction.CurrentSlot.Forger = forger
@@ -207,7 +208,7 @@ func TestCoordinatorFlow(t *testing.T) {
 					require.NoError(t, err)
 				}
 			}
-			coord.SendMsg(MsgSyncStats{
+			coord.SendMsg(MsgSyncBlock{
 				Stats: stats,
 			})
 		}
@@ -254,16 +255,16 @@ func TestCoordCanForge(t *testing.T) {
 	ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
 	coord := newTestCoordinator(t, forger, ethClient, ethClientSetup)
 	_, err := ethClient.AuctionSetCoordinator(forger, "https://foo.bar")
-	require.Nil(t, err)
+	require.NoError(t, err)
 	_, err = ethClient.AuctionBidSimple(2, big.NewInt(9999))
-	require.Nil(t, err)
+	require.NoError(t, err)
 
 	bootCoord := newTestCoordinator(t, bootForger, ethClient, ethClientSetup)
 
 	assert.Equal(t, forger, coord.cfg.ForgerAddress)
 	assert.Equal(t, bootForger, bootCoord.cfg.ForgerAddress)
 	ethBootCoord, err := ethClient.AuctionGetBootCoordinator()
-	require.Nil(t, err)
+	require.NoError(t, err)
 	assert.Equal(t, &bootForger, ethBootCoord)
 
 	var stats synchronizer.Stats
@@ -300,11 +301,12 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
 	ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
 	coord := newTestCoordinator(t, forger, ethClient, ethClientSetup)
 	_, err := ethClient.AuctionSetCoordinator(forger, "https://foo.bar")
-	require.Nil(t, err)
+	require.NoError(t, err)
 	_, err = ethClient.AuctionBidSimple(2, big.NewInt(9999))
-	require.Nil(t, err)
+	require.NoError(t, err)
 
-	var stats synchronizer.Stats
+	var msg MsgSyncBlock
+	stats := &msg.Stats
 	ctx := context.Background()
 
 	// Slot 0. No bid, so the winner is the boot coordinator
@@ -312,8 +314,8 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
 	stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum
 	stats.Sync.LastBlock = stats.Eth.LastBlock
 	stats.Sync.Auction.CurrentSlot.Forger = bootForger
-	assert.Equal(t, false, coord.canForge(&stats))
-	require.Nil(t, coord.handleMsgSyncStats(ctx, &stats))
+	assert.Equal(t, false, coord.canForge(stats))
+	require.NoError(t, coord.handleMsgSyncBlock(ctx, &msg))
 	assert.Nil(t, coord.pipeline)
 
 	// Slot 0. No bid, and we reach the deadline, so anyone can forge
@@ -322,8 +324,8 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
 	stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum +
 		int64(ethClientSetup.AuctionVariables.SlotDeadline)
 	stats.Sync.LastBlock = stats.Eth.LastBlock
 	stats.Sync.Auction.CurrentSlot.Forger = bootForger
-	assert.Equal(t, true, coord.canForge(&stats))
-	require.Nil(t, coord.handleMsgSyncStats(ctx, &stats))
+	assert.Equal(t, true, coord.canForge(stats))
+	require.NoError(t, coord.handleMsgSyncBlock(ctx, &msg))
 	assert.NotNil(t, coord.pipeline)
 
 	// Slot 0. No bid, and we reach the deadline, so anyone can forge
@@ -332,8 +334,8 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
 	stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum +
 		int64(ethClientSetup.AuctionVariables.SlotDeadline) + 1
 	stats.Sync.LastBlock = stats.Eth.LastBlock
 	stats.Sync.Auction.CurrentSlot.Forger = bootForger
-	assert.Equal(t, true, coord.canForge(&stats))
-	require.Nil(t, coord.handleMsgSyncStats(ctx, &stats))
+	assert.Equal(t, true, coord.canForge(stats))
+	require.NoError(t, coord.handleMsgSyncBlock(ctx, &msg))
 	assert.NotNil(t, coord.pipeline)
 
 	// Slot 0. No bid, so the winner is the boot coordinator
@@ -342,8 +344,8 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
 		1*int64(ethClientSetup.AuctionConstants.BlocksPerSlot)
 	stats.Sync.LastBlock = stats.Eth.LastBlock
 	stats.Sync.Auction.CurrentSlot.Forger = bootForger
-	assert.Equal(t, false, coord.canForge(&stats))
-	require.Nil(t, coord.handleMsgSyncStats(ctx, &stats))
+	assert.Equal(t, false, coord.canForge(stats))
+	require.NoError(t, coord.handleMsgSyncBlock(ctx, &msg))
 	assert.Nil(t, coord.pipeline)
 }
diff --git a/coordinator/purger.go b/coordinator/purger.go
new file mode 100644
index 0000000..e900581
--- /dev/null
+++ b/coordinator/purger.go
@@ -0,0 +1,152 @@
+package coordinator
+
+import (
+	"fmt"
+
+	"github.com/hermeznetwork/hermez-node/common"
+	"github.com/hermeznetwork/hermez-node/db/l2db"
+	"github.com/hermeznetwork/hermez-node/db/statedb"
+	"github.com/hermeznetwork/hermez-node/log"
+	"github.com/hermeznetwork/tracerr"
+	"github.com/iden3/go-merkletree/db"
+)
+
+// PurgerCfg is the purger configuration
+type PurgerCfg struct {
+	// PurgeBatchDelay is the delay between batches to purge outdated transactions
+	PurgeBatchDelay int64
+	// InvalidateBatchDelay is the delay between batches to mark invalid transactions
+	InvalidateBatchDelay int64
+	// PurgeBlockDelay is the delay between blocks to purge outdated transactions
+	PurgeBlockDelay int64
+	// InvalidateBlockDelay is the delay between blocks to mark invalid transactions
+	InvalidateBlockDelay int64
+}
+
+// Purger manages cleanup of transactions in the pool
+type Purger struct {
+	cfg                 PurgerCfg
+	lastPurgeBlock      int64
+	lastPurgeBatch      int64
+	lastInvalidateBlock int64
+	lastInvalidateBatch int64
+}
+
+// CanPurge returns true if it's a good time to purge according to the
+// configuration
+func (p *Purger) CanPurge(blockNum, batchNum int64) bool {
+	if blockNum > p.lastPurgeBlock+p.cfg.PurgeBlockDelay {
+		return true
+	}
+	if batchNum > p.lastPurgeBatch+p.cfg.PurgeBatchDelay {
+		return true
+	}
+	return false
+}
+
+// CanInvalidate returns true if it's a good time to invalidate according to
+// the configuration
+func (p *Purger) CanInvalidate(blockNum, batchNum int64) bool {
+	if blockNum > p.lastInvalidateBlock+p.cfg.InvalidateBlockDelay {
+		return true
+	}
+	if batchNum > p.lastInvalidateBatch+p.cfg.InvalidateBatchDelay {
+		return true
+	}
+	return false
+}
+
+// PurgeMaybe purges txs if it's a good time to do so
+func (p *Purger) PurgeMaybe(l2DB *l2db.L2DB, blockNum, batchNum int64) (bool, error) {
+	if !p.CanPurge(blockNum, batchNum) {
+		return false, nil
+	}
+	p.lastPurgeBlock = blockNum
+	p.lastPurgeBatch = batchNum
+	log.Debugw("Purger: purging l2txs in pool", "block", blockNum, "batch", batchNum)
+	err := l2DB.Purge(common.BatchNum(batchNum))
+	return true, tracerr.Wrap(err)
+}
+
+// InvalidateMaybe invalidates txs if it's a good time to do so
+func (p *Purger) InvalidateMaybe(l2DB *l2db.L2DB, stateDB *statedb.LocalStateDB,
+	blockNum, batchNum int64) (bool, error) {
+	if !p.CanInvalidate(blockNum, batchNum) {
+		return false, nil
+	}
+	p.lastInvalidateBlock = blockNum
+	p.lastInvalidateBatch = batchNum
+	log.Debugw("Purger: invalidating l2txs in pool", "block", blockNum, "batch", batchNum)
+	err := poolMarkInvalidOldNonces(l2DB, stateDB, common.BatchNum(batchNum))
+	return true, err
+}
+
+//nolint:unused,deadcode
+func idxsNonceFromL2Txs(txs []common.L2Tx) []common.IdxNonce {
+	idxNonceMap := map[common.Idx]common.Nonce{}
+	for _, tx := range txs {
+		if nonce, ok := idxNonceMap[tx.FromIdx]; !ok {
+			idxNonceMap[tx.FromIdx] = tx.Nonce
+		} else if tx.Nonce > nonce {
+			idxNonceMap[tx.FromIdx] = tx.Nonce
+		}
+	}
+	idxsNonce := make([]common.IdxNonce, 0, len(idxNonceMap))
+	for idx, nonce := range idxNonceMap {
+		idxsNonce = append(idxsNonce, common.IdxNonce{Idx: idx, Nonce: nonce})
+	}
+	return idxsNonce
+}
+
+func idxsNonceFromPoolL2Txs(txs []common.PoolL2Tx) []common.IdxNonce {
+	idxNonceMap := map[common.Idx]common.Nonce{}
+	for _, tx := range txs {
+		if nonce, ok := idxNonceMap[tx.FromIdx]; !ok {
+			idxNonceMap[tx.FromIdx] = tx.Nonce
+		} else if tx.Nonce > nonce {
+			idxNonceMap[tx.FromIdx] = tx.Nonce
+		}
+	}
+	idxsNonce := make([]common.IdxNonce, 0, len(idxNonceMap))
+	for idx, nonce := range idxNonceMap {
+		idxsNonce = append(idxsNonce, common.IdxNonce{Idx: idx, Nonce: nonce})
+	}
+	return idxsNonce
+}
+
+// poolMarkInvalidOldNoncesFromL2Txs marks as invalid the txs in the pool that
+// contain a nonce equal to or older than the highest nonce used in a forged
+// l2Tx of the corresponding sender account
+func poolMarkInvalidOldNoncesFromL2Txs(l2DB *l2db.L2DB,
+	idxsNonce []common.IdxNonce, batchNum common.BatchNum) error {
+	return l2DB.CheckNonces(idxsNonce, batchNum)
+}
+
+// poolMarkInvalidOldNonces marks as invalid the txs in the pool that contain
+// a nonce equal to or older than the nonce of the corresponding sender account
+func poolMarkInvalidOldNonces(l2DB *l2db.L2DB, stateDB *statedb.LocalStateDB,
+	batchNum common.BatchNum) error {
+	idxs, err := l2DB.GetPendingUniqueFromIdxs()
+	if err != nil {
+		return err
+	}
+	idxsNonce := make([]common.IdxNonce, len(idxs))
+	lastIdx, err := stateDB.GetIdx()
+	if err != nil {
+		return err
+	}
+	for i, idx := range idxs {
+		acc, err := stateDB.GetAccount(idx)
+		if err != nil {
+			if tracerr.Unwrap(err) != db.ErrNotFound {
+				return err
+			} else if idx <= lastIdx {
+				return fmt.Errorf("account with idx %v not found: %w", idx, err)
+			}
+		}
+		idxsNonce[i].Idx = idx
+		idxsNonce[i].Nonce = acc.Nonce
+	}
+	return l2DB.CheckNonces(idxsNonce, batchNum)
+}
diff --git a/coordinator/purger_test.go b/coordinator/purger_test.go
new file mode 100644
index 0000000..6849688
--- /dev/null
+++ b/coordinator/purger_test.go
@@ -0,0 +1,3 @@
+package coordinator
+
+// TODO: Test purger functions
diff --git a/db/l2db/l2db.go b/db/l2db/l2db.go
index b4b8b90..6a9c919 100644
--- a/db/l2db/l2db.go
+++ b/db/l2db/l2db.go
@@ -1,6 +1,7 @@
 package l2db
 
 import (
+	"fmt"
 	"math/big"
 	"time"
 
@@ -242,38 +243,52 @@ func (l2db *L2DB) InvalidateTxs(txIDs []common.TxID, batchNum common.BatchNum) e
 	return tracerr.Wrap(err)
 }
 
-// CheckNonces invalidate txs with nonces that are smaller or equal than their respective accounts nonces.
-// The state of the affected txs will be changed from Pending -> Invalid
-func (l2db *L2DB) CheckNonces(updatedAccounts []common.Account, batchNum common.BatchNum) (err error) {
-	if len(updatedAccounts) == 0 {
-		return nil
-	}
-	txn, err := l2db.db.Beginx()
+// GetPendingUniqueFromIdxs returns, from all the pending transactions, the
+// set of unique FromIdx
+func (l2db *L2DB) GetPendingUniqueFromIdxs() ([]common.Idx, error) {
+	var idxs []common.Idx
+	rows, err := l2db.db.Query(`SELECT DISTINCT from_idx FROM tx_pool
+		WHERE state = $1;`, common.PoolL2TxStatePending)
 	if err != nil {
-		return tracerr.Wrap(err)
+		return nil, tracerr.Wrap(err)
 	}
-	defer func() {
-		// Rollback the transaction if there was an error.
-		if err != nil {
-			db.Rollback(txn)
-		}
-	}()
-	for i := 0; i < len(updatedAccounts); i++ {
-		_, err = txn.Exec(
-			`UPDATE tx_pool
-			SET state = $1, batch_num = $2
-			WHERE state = $3 AND from_idx = $4 AND nonce <= $5;`,
-			common.PoolL2TxStateInvalid,
-			batchNum,
-			common.PoolL2TxStatePending,
-			updatedAccounts[i].Idx,
-			updatedAccounts[i].Nonce,
-		)
+	var idx common.Idx
+	for rows.Next() {
+		err = rows.Scan(&idx)
 		if err != nil {
-			return tracerr.Wrap(err)
+			return nil, tracerr.Wrap(err)
 		}
+		idxs = append(idxs, idx)
 	}
-	return tracerr.Wrap(txn.Commit())
+	return idxs, nil
+}
+
+var checkNoncesQuery = fmt.Sprintf(`
+		UPDATE tx_pool SET
+			state = '%s',
+			batch_num = %%d
+		FROM (VALUES
+			(NULL::::BIGINT, NULL::::BIGINT),
+			(:idx, :nonce)
+		) as updated_acc (idx, nonce)
+		WHERE tx_pool.from_idx = updated_acc.idx AND tx_pool.nonce <= updated_acc.nonce;
+	`, common.PoolL2TxStateInvalid)
+
+// CheckNonces invalidates txs with nonces that are smaller than or equal to
+// their respective account nonces. The state of the affected txs will be
+// changed from Pending to Invalid
+func (l2db *L2DB) CheckNonces(updatedAccounts []common.IdxNonce, batchNum common.BatchNum) (err error) {
+	if len(updatedAccounts) == 0 {
+		return nil
+	}
+	// Fill the batch_num in the query with Sprintf because we are using a
+	// named query, which works with slices but doesn't handle an extra
+	// individual argument.
+	query := fmt.Sprintf(checkNoncesQuery, batchNum)
+	if _, err := sqlx.NamedQuery(l2db.db, query, updatedAccounts); err != nil {
+		return tracerr.Wrap(err)
+	}
+	return nil
 }
 
 // Reorg updates the state of txs that were updated in a batch that has been discarted due to a blockchain reorg.
diff --git a/db/l2db/l2db_test.go b/db/l2db/l2db_test.go
index 69f4022..499164a 100644
--- a/db/l2db/l2db_test.go
+++ b/db/l2db/l2db_test.go
@@ -334,12 +334,13 @@ func TestCheckNonces(t *testing.T) {
 	poolL2Txs, err := generatePoolL2Txs()
 	assert.NoError(t, err)
 	// Update Accounts currentNonce
-	var updateAccounts []common.Account
+	var updateAccounts []common.IdxNonce
 	const currentNonce = common.Nonce(1)
 	for i := range accs {
-		account := accs[i]
-		account.Nonce = common.Nonce(currentNonce)
-		updateAccounts = append(updateAccounts, account)
+		updateAccounts = append(updateAccounts, common.IdxNonce{
+			Idx:   accs[i].Idx,
+			Nonce: common.Nonce(currentNonce),
+		})
 	}
 	// Add txs to DB
 	var invalidTxIDs []common.TxID
diff --git a/db/statedb/statedb.go b/db/statedb/statedb.go
index 166a1ec..21d1c28 100644
--- a/db/statedb/statedb.go
+++ b/db/statedb/statedb.go
@@ -303,7 +303,7 @@ func (s *StateDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
 		return tracerr.Wrap(err)
 	}
 	// idx is obtained from the statedb reset
-	s.idx, err = s.getIdx()
+	s.idx, err = s.GetIdx()
 	if err != nil {
 		return tracerr.Wrap(err)
 	}
diff --git a/db/statedb/txprocessors.go b/db/statedb/txprocessors.go
index c96e5fd..ddf6cb4 100644
--- a/db/statedb/txprocessors.go
+++ b/db/statedb/txprocessors.go
@@ -1118,9 +1118,9 @@ func (s *StateDB) computeEffectiveAmounts(tx *common.L1Tx) {
 	}
 }
 
-// getIdx returns the stored Idx from the localStateDB, which is the last Idx
+// GetIdx returns the stored Idx from the localStateDB, which is the last Idx
 // used for an Account in the localStateDB.
-func (s *StateDB) getIdx() (common.Idx, error) {
+func (s *StateDB) GetIdx() (common.Idx, error) {
 	idxBytes, err := s.DB().Get(keyidx)
 	if tracerr.Unwrap(err) == db.ErrNotFound {
 		return 0, nil
diff --git a/eth/ethereum.go b/eth/ethereum.go
index 4cbc9c0..1e91373 100644
--- a/eth/ethereum.go
+++ b/eth/ethereum.go
@@ -65,8 +65,8 @@ type EthereumConfig struct {
 	CallGasLimit        uint64
 	DeployGasLimit      uint64
 	GasPriceDiv         uint64
-	ReceiptTimeout      time.Duration // in seconds
-	IntervalReceiptLoop time.Duration // in milliseconds
+	ReceiptTimeout      time.Duration
+	IntervalReceiptLoop time.Duration
 }
 
 // EthereumClient is an ethereum client to call Smart Contract methods and check blockchain information.
diff --git a/node/node.go b/node/node.go
index 7f85c5e..435bc9b 100644
--- a/node/node.go
+++ b/node/node.go
@@ -170,8 +170,19 @@ func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node,
 		coord, err = coordinator.NewCoordinator(
 			coordinator.Config{
-				ForgerAddress: coordCfg.ForgerAddress,
-				ConfirmBlocks: coordCfg.ConfirmBlocks,
+				ForgerAddress:          coordCfg.ForgerAddress,
+				ConfirmBlocks:          coordCfg.ConfirmBlocks,
+				L1BatchTimeoutPerc:     coordCfg.L1BatchTimeoutPerc,
+				EthClientAttempts:      coordCfg.EthClient.Attempts,
+				EthClientAttemptsDelay: coordCfg.EthClient.AttemptsDelay.Duration,
+				TxManagerCheckInterval: coordCfg.EthClient.IntervalCheckLoop.Duration,
+				DebugBatchPath:         coordCfg.Debug.BatchPath,
+				Purger: coordinator.PurgerCfg{
+					PurgeBatchDelay:      coordCfg.L2DB.PurgeBatchDelay,
+					InvalidateBatchDelay: coordCfg.L2DB.InvalidateBatchDelay,
+					PurgeBlockDelay:      coordCfg.L2DB.PurgeBlockDelay,
+					InvalidateBlockDelay: coordCfg.L2DB.InvalidateBlockDelay,
+				},
 			},
 			historyDB,
 			l2DB,
@@ -327,7 +338,9 @@ func (n *Node) syncLoopFn(lastBlock *common.Block) (*common.Block, time.Duration
 		// case: reorg
 		log.Infow("Synchronizer.Sync reorg", "discarded", *discarded)
 		if n.mode == ModeCoordinator {
-			n.coord.SendMsg(coordinator.MsgSyncReorg{})
+			n.coord.SendMsg(coordinator.MsgSyncReorg{
+				Stats: *stats,
+			})
 		}
 		if n.nodeAPI != nil {
 			rollup, auction, wDelayer := n.sync.SCVars()
@@ -351,8 +364,9 @@ func (n *Node) syncLoopFn(lastBlock *common.Block) (*common.Block, time.Duration
 				WDelayer: blockData.WDelayer.Vars,
 			})
 		}
-		n.coord.SendMsg(coordinator.MsgSyncStats{
-			Stats: *stats,
+		n.coord.SendMsg(coordinator.MsgSyncBlock{
+			Stats:   *stats,
+			Batches: blockData.Rollup.Batches,
 		})
 	}
 	if n.nodeAPI != nil {
diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go
index 9738394..1498267 100644
--- a/synchronizer/synchronizer.go
+++ b/synchronizer/synchronizer.go
@@ -801,11 +801,16 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e
 			MaxTx:   512,
 			MaxL1Tx: 64,
 		}
-		processTxsOut, err := s.stateDB.ProcessTxs(ptc, forgeBatchArgs.FeeIdxCoordinator, l1UserTxs,
-			batchData.L1CoordinatorTxs, poolL2Txs)
+		processTxsOut, err := s.stateDB.ProcessTxs(ptc, forgeBatchArgs.FeeIdxCoordinator,
+			l1UserTxs, batchData.L1CoordinatorTxs, poolL2Txs)
 		if err != nil {
 			return nil, tracerr.Wrap(err)
 		}
+		// Set the BatchNum in the forged L1UserTxs
+		for i := range l1UserTxs {
+			l1UserTxs[i].BatchNum = &batchNum
+		}
+		batchData.L1UserTxs = l1UserTxs
 
 		// Set batchNum in exits
 		for i := range processTxsOut.ExitInfos {
diff --git a/synchronizer/synchronizer_test.go b/synchronizer/synchronizer_test.go
index 92ea43e..e158421 100644
--- a/synchronizer/synchronizer_test.go
+++ b/synchronizer/synchronizer_test.go
@@ -76,7 +76,7 @@ func checkSyncBlock(t *testing.T, s *Synchronizer, blockNum int, block, syncBloc
 		assert.Equal(t, tokenCpy, dbToken)
 	}
 
-	// Check L1UserTxs
+	// Check submitted L1UserTxs
 	assert.Equal(t, len(block.Rollup.L1UserTxs), len(syncBlock.Rollup.L1UserTxs))
 	dbL1UserTxs, err := s.historyDB.GetAllL1UserTxs()
 	require.Nil(t, err)
@@ -85,9 +85,8 @@ func checkSyncBlock(t *testing.T, s *Synchronizer, blockNum int, block, syncBloc
 	// because this value is set by StateDB.ProcessTxs.
 	for i := range syncBlock.Rollup.L1UserTxs {
 		syncBlock.Rollup.L1UserTxs[i].BatchNum = block.Rollup.L1UserTxs[i].BatchNum
-		syncBlock.Rollup.L1UserTxs[i].EffectiveAmount = block.Rollup.L1UserTxs[i].EffectiveAmount
-		syncBlock.Rollup.L1UserTxs[i].EffectiveDepositAmount =
-			block.Rollup.L1UserTxs[i].EffectiveDepositAmount
+		assert.Nil(t, syncBlock.Rollup.L1UserTxs[i].EffectiveDepositAmount)
+		assert.Nil(t, syncBlock.Rollup.L1UserTxs[i].EffectiveAmount)
 	}
 	assert.Equal(t, block.Rollup.L1UserTxs, syncBlock.Rollup.L1UserTxs)
 	for _, tx := range block.Rollup.L1UserTxs {
@@ -101,8 +100,13 @@ func checkSyncBlock(t *testing.T, s *Synchronizer, blockNum int, block, syncBloc
 				break
 			}
 		}
-		tx.EffectiveAmount = tx.Amount
-		tx.EffectiveDepositAmount = tx.DepositAmount
+		// If the tx has been forged in this block, this will be
+		// reflected in the DB, and so the Effective values will
+		// already be set
+		if dbTx.BatchNum != nil {
+			tx.EffectiveAmount = tx.Amount
+			tx.EffectiveDepositAmount = tx.DepositAmount
+		}
 		assert.Equal(t, &tx, dbTx) //nolint:gosec
 	}
 
@@ -137,6 +141,7 @@ func checkSyncBlock(t *testing.T, s *Synchronizer, blockNum int, block, syncBloc
 		batch.Batch.NumAccounts = len(batch.CreatedAccounts)
 
 		// Test field by field to facilitate debugging of errors
+		assert.Equal(t, batch.L1UserTxs, syncBatch.L1UserTxs)
 		assert.Equal(t, batch.L1CoordinatorTxs, syncBatch.L1CoordinatorTxs)
 		assert.Equal(t, batch.L2Txs, syncBatch.L2Txs)
 		// In exit tree, we only check AccountIdx and Balance, because
@@ -152,6 +157,26 @@ func checkSyncBlock(t *testing.T, s *Synchronizer, blockNum int, block, syncBloc
 		assert.Equal(t, batch, syncBatch)
 		assert.Equal(t, &batch.Batch, dbBatch) //nolint:gosec
 
+		// Check forged L1UserTxs from DB, and check effective values
+		// in sync output
+		for j, tx := range batch.L1UserTxs {
+			var dbTx *common.L1Tx
+			// Find tx in DB output
+			for _, _dbTx := range dbL1UserTxs {
+				if *tx.BatchNum == *_dbTx.BatchNum &&
+					tx.Position == _dbTx.Position {
+					dbTx = new(common.L1Tx)
+					*dbTx = _dbTx
+					break
+				}
+			}
+			assert.Equal(t, &tx, dbTx) //nolint:gosec
+
+			syncTx := &syncBlock.Rollup.Batches[i].L1UserTxs[j]
+			assert.Equal(t, syncTx.DepositAmount, syncTx.EffectiveDepositAmount)
+			assert.Equal(t, syncTx.Amount, syncTx.EffectiveAmount)
+		}
+
 		// Check L1CoordinatorTxs from DB
 		for _, tx := range batch.L1CoordinatorTxs {
 			var dbTx *common.L1Tx
@@ -295,7 +320,7 @@ func TestSync(t *testing.T) {
 	defer assert.Nil(t, os.RemoveAll(dir))
 
 	stateDB, err := statedb.NewStateDB(dir, statedb.TypeSynchronizer, 32)
-	assert.Nil(t, err)
+	assert.NoError(t, err)
 
 	// Init History DB
 	pass := os.Getenv("POSTGRES_PASS")
@@ -432,8 +457,10 @@ func TestSync(t *testing.T) {
 	ethAddTokens(blocks, client)
 
 	err = tc.FillBlocksExtra(blocks, &tilCfgExtra)
-	assert.Nil(t, err)
+	assert.NoError(t, err)
 	tc.FillBlocksL1UserTxsBatchNum(blocks)
+	err = tc.FillBlocksForgedL1UserTxs(blocks)
+	assert.NoError(t, err)
 
 	// Add block data to the smart contracts
 	ethAddBlocks(t, blocks, client, clientSetup)
@@ -459,6 +486,7 @@ func TestSync(t *testing.T) {
 	assert.Equal(t, int64(2), stats.Sync.LastBlock.Num)
 	checkSyncBlock(t, s, 2, &blocks[0], syncBlock)
 
+	// Block 3
 	syncBlock, discards, err = s.Sync2(ctx, nil)
@@ -606,7 +634,7 @@ func TestSync(t *testing.T) {
 	ethAddTokens(blocks, client)
 
 	err = tc.FillBlocksExtra(blocks, &tilCfgExtra)
-	assert.Nil(t, err)
+	assert.NoError(t, err)
 	tc.FillBlocksL1UserTxsBatchNum(blocks)
 
 	// Add block data to the smart contracts
diff --git a/txselector/txselector.go b/txselector/txselector.go
index 34d30b2..2694b32 100644
--- a/txselector/txselector.go
+++ b/txselector/txselector.go
@@ -58,6 +58,11 @@ func NewTxSelector(dbpath string, synchronizerStateDB *statedb.StateDB, l2 *l2db
 	}, nil
 }
 
+// LocalAccountsDB returns the LocalStateDB of the TxSelector
+func (txsel *TxSelector) LocalAccountsDB() *statedb.LocalStateDB {
+	return txsel.localAccountsDB
+}
+
 // Reset tells the TxSelector to get it's internal AccountsDB
 // from the required `batchNum`
 func (txsel *TxSelector) Reset(batchNum common.BatchNum) error {
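For reference, the single-query `CheckNonces` added in db/l2db/l2db.go relies on sqlx expanding the named `(:idx, :nonce)` tuple into one VALUES row per element of the `updatedAccounts` slice. The leading `(NULL::BIGINT, NULL::BIGINT)` row pins the column types of the VALUES list and never matches the WHERE clause, since NULL comparisons are never true; the quadrupled colons in the Go string appear to be colon escapes so that the query reaches Postgres as a `::BIGINT` cast. A standalone sketch (illustration only; the literal 'invl' state is an assumption standing in for the value of `common.PoolL2TxStateInvalid`) of the SQL this effectively produces:

    package main

    import (
        "fmt"
        "strings"
    )

    type idxNonce struct{ Idx, Nonce int64 }

    // buildCheckNoncesSQL renders the same UPDATE ... FROM (VALUES ...) shape
    // that CheckNonces sends to Postgres after sqlx expands the named tuple.
    func buildCheckNoncesSQL(pairs []idxNonce, batchNum int64) string {
        rows := make([]string, 0, len(pairs)+1)
        // Sentinel row: pins the column types of the VALUES list and is
        // never matched by the WHERE clause.
        rows = append(rows, "(NULL::BIGINT, NULL::BIGINT)")
        for _, p := range pairs {
            rows = append(rows, fmt.Sprintf("(%d, %d)", p.Idx, p.Nonce))
        }
        return fmt.Sprintf(`UPDATE tx_pool SET
        state = 'invl',
        batch_num = %d
    FROM (VALUES
        %s
    ) as updated_acc (idx, nonce)
    WHERE tx_pool.from_idx = updated_acc.idx AND tx_pool.nonce <= updated_acc.nonce;`,
            batchNum, strings.Join(rows, ",\n    "))
    }

    func main() {
        // Two accounts whose pending txs with old nonces get invalidated
        // in a single statement.
        fmt.Println(buildCheckNoncesSQL([]idxNonce{{256, 4}, {257, 1}}, 1234))
    }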