Browse Source

Add max txs limit to tx pool

feature/sql-semaphore1
Arnau B 3 years ago
parent
commit
622e01e9af
3 changed files with 43 additions and 49 deletions
  1. +1
    -1
      api/api_test.go
  2. +26
    -29
      db/l2db/l2db.go
  3. +16
    -19
      db/l2db/l2db_test.go

+ 1
- 1
api/api_test.go

@ -217,7 +217,7 @@ func TestMain(m *testing.M) {
panic(err)
}
// L2DB
l2DB := l2db.NewL2DB(database, 10, 100, 24*time.Hour)
l2DB := l2db.NewL2DB(database, 10, 1000, 24*time.Hour)
test.WipeDB(l2DB.DB()) // this will clean HistoryDB and L2DB
// Config (smart contract constants)
chainID := uint16(0)

+ 26
- 29
db/l2db/l2db.go

@ -24,7 +24,7 @@ type L2DB struct {
db *sqlx.DB
safetyPeriod common.BatchNum
ttl time.Duration
maxTxs uint32
maxTxs uint32 // limit of txs that are accepted in the pool
}
// NewL2DB creates a L2DB.
@ -78,6 +78,19 @@ func (l2db *L2DB) GetAccountCreationAuthAPI(addr ethCommon.Address) (*AccountCre
// AddTx inserts a tx to the pool
func (l2db *L2DB) AddTx(tx *PoolL2TxWrite) error {
row := l2db.db.QueryRow(
"SELECT COUNT(*) FROM tx_pool WHERE state = $1;",
common.PoolL2TxStatePending,
)
var totalTxs uint32
if err := row.Scan(&totalTxs); err != nil {
return tracerr.Wrap(err)
}
if totalTxs >= l2db.maxTxs {
return tracerr.New(
"The pool is at full capacity. More transactions are not accepted currently",
)
}
return tracerr.Wrap(meddler.Insert(l2db.db, "tx_pool", tx))
}
@ -313,38 +326,22 @@ func (l2db *L2DB) Reorg(lastValidBatch common.BatchNum) error {
}
// Purge deletes transactions that have been forged or marked as invalid for longer than the safety period
// it also deletes txs that has been in the L2DB for longer than the ttl if maxTxs has been exceeded
// it also deletes pending txs that have been in the L2DB for longer than the ttl if maxTxs has been exceeded
func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) (err error) {
txn, err := l2db.db.Beginx()
if err != nil {
return tracerr.Wrap(err)
}
defer func() {
// Rollback the transaction if there was an error.
if err != nil {
db.Rollback(txn)
}
}()
// Delete pending txs that have been in the pool after the TTL if maxTxs is reached
now := time.Now().UTC().Unix()
_, err = txn.Exec(
`DELETE FROM tx_pool WHERE (SELECT count(*) FROM tx_pool) > $1 AND timestamp < $2`,
l2db.maxTxs,
time.Unix(now-int64(l2db.ttl.Seconds()), 0),
)
if err != nil {
return tracerr.Wrap(err)
}
// Delete txs that have been marked as forged / invalid after the safety period
_, err = txn.Exec(
`DELETE FROM tx_pool
WHERE batch_num < $1 AND (state = $2 OR state = $3)`,
_, err = l2db.db.Exec(
`DELETE FROM tx_pool WHERE (
batch_num < $1 AND (state = $2 OR state = $3)
) OR (
(SELECT count(*) FROM tx_pool WHERE state = $4) > $5
AND timestamp < $6 AND state = $4
);`,
currentBatchNum-l2db.safetyPeriod,
common.PoolL2TxStateForged,
common.PoolL2TxStateInvalid,
common.PoolL2TxStatePending,
l2db.maxTxs,
time.Unix(now-int64(l2db.ttl.Seconds()), 0),
)
if err != nil {
return tracerr.Wrap(err)
}
return tracerr.Wrap(txn.Commit())
return tracerr.Wrap(err)
}

+ 16
- 19
db/l2db/l2db_test.go

@ -33,7 +33,7 @@ func TestMain(m *testing.M) {
if err != nil {
panic(err)
}
l2DB = NewL2DB(db, 10, 100, 24*time.Hour)
l2DB = NewL2DB(db, 10, 1000, 24*time.Hour)
test.WipeDB(l2DB.DB())
historyDB = historydb.NewHistoryDB(db)
// Run tests
@ -529,8 +529,8 @@ func TestPurge(t *testing.T) {
if err != nil {
log.Error("Error prepare historyDB", err)
}
// generatePoolL2Txs generate 10 txs
generateTx := int(l2DB.maxTxs/10 + 1)
// generatePoolL2Txs
generateTx := int(l2DB.maxTxs/8 + 1)
var poolL2Tx []common.PoolL2Tx
for i := 0; i < generateTx; i++ {
poolL2TxAux, err := generatePoolL2Txs()
@ -538,24 +538,27 @@ func TestPurge(t *testing.T) {
poolL2Tx = append(poolL2Tx, poolL2TxAux...)
}
deletedIDs := []common.TxID{}
afterTTLIDs := []common.TxID{}
keepedIDs := []common.TxID{}
var deletedIDs []common.TxID
var invalidTxIDs []common.TxID
var doneForgingTxIDs []common.TxID
const toDeleteBatchNum common.BatchNum = 30
safeBatchNum := toDeleteBatchNum + l2DB.safetyPeriod + 1
// Add txs to the DB
for i := 0; i < int(l2DB.maxTxs); i++ {
for i := 0; i < len(poolL2Tx); i++ {
tx := poolL2Tx[i]
if i%2 == 0 { // keep tx
keepedIDs = append(keepedIDs, tx.TxID)
} else { // delete after safety period
if i%3 == 0 {
doneForgingTxIDs = append(doneForgingTxIDs, tx.TxID)
} else {
} else if i%5 == 0 {
invalidTxIDs = append(invalidTxIDs, tx.TxID)
} else {
afterTTLIDs = append(afterTTLIDs, tx.TxID)
}
deletedIDs = append(deletedIDs, tx.TxID)
deletedIDs = append(deletedIDs, poolL2Tx[i].TxID)
}
err := l2DB.AddTxTest(&tx)
assert.NoError(t, err)
@ -577,16 +580,13 @@ func TestPurge(t *testing.T) {
// Invalidate txs and set batchNum
err = l2DB.InvalidateTxs(invalidTxIDs, toDeleteBatchNum)
assert.NoError(t, err)
for i := int(l2DB.maxTxs); i < len(poolL2Tx); i++ {
// Delete after TTL
deletedIDs = append(deletedIDs, poolL2Tx[i].TxID)
err := l2DB.AddTxTest(&poolL2Tx[i])
assert.NoError(t, err)
// Update timestamp of afterTTL txs
deleteTimestamp := time.Unix(time.Now().UTC().Unix()-int64(l2DB.ttl.Seconds()+float64(4*time.Second)), 0)
for _, id := range afterTTLIDs {
// Set timestamp
deleteTimestamp := time.Unix(time.Now().UTC().Unix()-int64(l2DB.ttl.Seconds()+float64(4*time.Second)), 0)
_, err = l2DB.db.Exec(
"UPDATE tx_pool SET timestamp = $1 WHERE tx_id = $2;",
deleteTimestamp, poolL2Tx[i].TxID,
"UPDATE tx_pool SET timestamp = $1, state = $2 WHERE tx_id = $3;",
deleteTimestamp, common.PoolL2TxStatePending, id,
)
assert.NoError(t, err)
}
@ -596,10 +596,7 @@ func TestPurge(t *testing.T) {
assert.NoError(t, err)
// Check results
for _, id := range deletedIDs {
tx, err := l2DB.GetTx(id)
if err == nil {
log.Debug(tx)
}
_, err := l2DB.GetTx(id)
assert.Error(t, err)
}
for _, id := range keepedIDs {

Loading…
Cancel
Save