From 622e01e9af7e1b74f89a6c7a71b0c6fa1a4b640a Mon Sep 17 00:00:00 2001 From: Arnau B Date: Mon, 4 Jan 2021 13:16:20 +0100 Subject: [PATCH] Add max txs limit to tx pool --- api/api_test.go | 2 +- db/l2db/l2db.go | 55 +++++++++++++++++++++----------------------- db/l2db/l2db_test.go | 35 +++++++++++++--------------- 3 files changed, 43 insertions(+), 49 deletions(-) diff --git a/api/api_test.go b/api/api_test.go index 6a2c7c0..80641e1 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -217,7 +217,7 @@ func TestMain(m *testing.M) { panic(err) } // L2DB - l2DB := l2db.NewL2DB(database, 10, 100, 24*time.Hour) + l2DB := l2db.NewL2DB(database, 10, 1000, 24*time.Hour) test.WipeDB(l2DB.DB()) // this will clean HistoryDB and L2DB // Config (smart contract constants) chainID := uint16(0) diff --git a/db/l2db/l2db.go b/db/l2db/l2db.go index 20d0785..6b71754 100644 --- a/db/l2db/l2db.go +++ b/db/l2db/l2db.go @@ -24,7 +24,7 @@ type L2DB struct { db *sqlx.DB safetyPeriod common.BatchNum ttl time.Duration - maxTxs uint32 + maxTxs uint32 // limit of txs that are accepted in the pool } // NewL2DB creates a L2DB. @@ -78,6 +78,19 @@ func (l2db *L2DB) GetAccountCreationAuthAPI(addr ethCommon.Address) (*AccountCre // AddTx inserts a tx to the pool func (l2db *L2DB) AddTx(tx *PoolL2TxWrite) error { + row := l2db.db.QueryRow( + "SELECT COUNT(*) FROM tx_pool WHERE state = $1;", + common.PoolL2TxStatePending, + ) + var totalTxs uint32 + if err := row.Scan(&totalTxs); err != nil { + return tracerr.Wrap(err) + } + if totalTxs >= l2db.maxTxs { + return tracerr.New( + "The pool is at full capacity. More transactions are not accepted currently", + ) + } return tracerr.Wrap(meddler.Insert(l2db.db, "tx_pool", tx)) } @@ -313,38 +326,22 @@ func (l2db *L2DB) Reorg(lastValidBatch common.BatchNum) error { } // Purge deletes transactions that have been forged or marked as invalid for longer than the safety period -// it also deletes txs that has been in the L2DB for longer than the ttl if maxTxs has been exceeded +// it also deletes pending txs that have been in the L2DB for longer than the ttl if maxTxs has been exceeded func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) (err error) { - txn, err := l2db.db.Beginx() - if err != nil { - return tracerr.Wrap(err) - } - defer func() { - // Rollback the transaction if there was an error. - if err != nil { - db.Rollback(txn) - } - }() - // Delete pending txs that have been in the pool after the TTL if maxTxs is reached now := time.Now().UTC().Unix() - _, err = txn.Exec( - `DELETE FROM tx_pool WHERE (SELECT count(*) FROM tx_pool) > $1 AND timestamp < $2`, - l2db.maxTxs, - time.Unix(now-int64(l2db.ttl.Seconds()), 0), - ) - if err != nil { - return tracerr.Wrap(err) - } - // Delete txs that have been marked as forged / invalid after the safety period - _, err = txn.Exec( - `DELETE FROM tx_pool - WHERE batch_num < $1 AND (state = $2 OR state = $3)`, + _, err = l2db.db.Exec( + `DELETE FROM tx_pool WHERE ( + batch_num < $1 AND (state = $2 OR state = $3) + ) OR ( + (SELECT count(*) FROM tx_pool WHERE state = $4) > $5 + AND timestamp < $6 AND state = $4 + );`, currentBatchNum-l2db.safetyPeriod, common.PoolL2TxStateForged, common.PoolL2TxStateInvalid, + common.PoolL2TxStatePending, + l2db.maxTxs, + time.Unix(now-int64(l2db.ttl.Seconds()), 0), ) - if err != nil { - return tracerr.Wrap(err) - } - return tracerr.Wrap(txn.Commit()) + return tracerr.Wrap(err) } diff --git a/db/l2db/l2db_test.go b/db/l2db/l2db_test.go index 155d08f..7ea366b 100644 --- a/db/l2db/l2db_test.go +++ b/db/l2db/l2db_test.go @@ -33,7 +33,7 @@ func TestMain(m *testing.M) { if err != nil { panic(err) } - l2DB = NewL2DB(db, 10, 100, 24*time.Hour) + l2DB = NewL2DB(db, 10, 1000, 24*time.Hour) test.WipeDB(l2DB.DB()) historyDB = historydb.NewHistoryDB(db) // Run tests @@ -529,8 +529,8 @@ func TestPurge(t *testing.T) { if err != nil { log.Error("Error prepare historyDB", err) } - // generatePoolL2Txs generate 10 txs - generateTx := int(l2DB.maxTxs/10 + 1) + // generatePoolL2Txs + generateTx := int(l2DB.maxTxs/8 + 1) var poolL2Tx []common.PoolL2Tx for i := 0; i < generateTx; i++ { poolL2TxAux, err := generatePoolL2Txs() @@ -538,24 +538,27 @@ func TestPurge(t *testing.T) { poolL2Tx = append(poolL2Tx, poolL2TxAux...) } - deletedIDs := []common.TxID{} + afterTTLIDs := []common.TxID{} keepedIDs := []common.TxID{} + var deletedIDs []common.TxID var invalidTxIDs []common.TxID var doneForgingTxIDs []common.TxID const toDeleteBatchNum common.BatchNum = 30 safeBatchNum := toDeleteBatchNum + l2DB.safetyPeriod + 1 // Add txs to the DB - for i := 0; i < int(l2DB.maxTxs); i++ { + for i := 0; i < len(poolL2Tx); i++ { tx := poolL2Tx[i] if i%2 == 0 { // keep tx keepedIDs = append(keepedIDs, tx.TxID) } else { // delete after safety period if i%3 == 0 { doneForgingTxIDs = append(doneForgingTxIDs, tx.TxID) - } else { + } else if i%5 == 0 { invalidTxIDs = append(invalidTxIDs, tx.TxID) + } else { + afterTTLIDs = append(afterTTLIDs, tx.TxID) } - deletedIDs = append(deletedIDs, tx.TxID) + deletedIDs = append(deletedIDs, poolL2Tx[i].TxID) } err := l2DB.AddTxTest(&tx) assert.NoError(t, err) @@ -577,16 +580,13 @@ func TestPurge(t *testing.T) { // Invalidate txs and set batchNum err = l2DB.InvalidateTxs(invalidTxIDs, toDeleteBatchNum) assert.NoError(t, err) - for i := int(l2DB.maxTxs); i < len(poolL2Tx); i++ { - // Delete after TTL - deletedIDs = append(deletedIDs, poolL2Tx[i].TxID) - err := l2DB.AddTxTest(&poolL2Tx[i]) - assert.NoError(t, err) + // Update timestamp of afterTTL txs + deleteTimestamp := time.Unix(time.Now().UTC().Unix()-int64(l2DB.ttl.Seconds()+float64(4*time.Second)), 0) + for _, id := range afterTTLIDs { // Set timestamp - deleteTimestamp := time.Unix(time.Now().UTC().Unix()-int64(l2DB.ttl.Seconds()+float64(4*time.Second)), 0) _, err = l2DB.db.Exec( - "UPDATE tx_pool SET timestamp = $1 WHERE tx_id = $2;", - deleteTimestamp, poolL2Tx[i].TxID, + "UPDATE tx_pool SET timestamp = $1, state = $2 WHERE tx_id = $3;", + deleteTimestamp, common.PoolL2TxStatePending, id, ) assert.NoError(t, err) } @@ -596,10 +596,7 @@ func TestPurge(t *testing.T) { assert.NoError(t, err) // Check results for _, id := range deletedIDs { - tx, err := l2DB.GetTx(id) - if err == nil { - log.Debug(tx) - } + _, err := l2DB.GetTx(id) assert.Error(t, err) } for _, id := range keepedIDs {