mirror of
https://github.com/arnaucube/hermez-node.git
synced 2026-02-07 03:16:45 +01:00
Merge pull request #450 from hermeznetwork/feature/l2db-maxtxs
Add max txs limit to tx pool
This commit is contained in:
@@ -24,7 +24,7 @@ type L2DB struct {
|
||||
db *sqlx.DB
|
||||
safetyPeriod common.BatchNum
|
||||
ttl time.Duration
|
||||
maxTxs uint32
|
||||
maxTxs uint32 // limit of txs that are accepted in the pool
|
||||
}
|
||||
|
||||
// NewL2DB creates a L2DB.
|
||||
@@ -78,6 +78,19 @@ func (l2db *L2DB) GetAccountCreationAuthAPI(addr ethCommon.Address) (*AccountCre
|
||||
|
||||
// AddTx inserts a tx to the pool
|
||||
func (l2db *L2DB) AddTx(tx *PoolL2TxWrite) error {
|
||||
row := l2db.db.QueryRow(
|
||||
"SELECT COUNT(*) FROM tx_pool WHERE state = $1;",
|
||||
common.PoolL2TxStatePending,
|
||||
)
|
||||
var totalTxs uint32
|
||||
if err := row.Scan(&totalTxs); err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
if totalTxs >= l2db.maxTxs {
|
||||
return tracerr.New(
|
||||
"The pool is at full capacity. More transactions are not accepted currently",
|
||||
)
|
||||
}
|
||||
return tracerr.Wrap(meddler.Insert(l2db.db, "tx_pool", tx))
|
||||
}
|
||||
|
||||
@@ -313,38 +326,22 @@ func (l2db *L2DB) Reorg(lastValidBatch common.BatchNum) error {
|
||||
}
|
||||
|
||||
// Purge deletes transactions that have been forged or marked as invalid for longer than the safety period
|
||||
// it also deletes txs that has been in the L2DB for longer than the ttl if maxTxs has been exceeded
|
||||
// it also deletes pending txs that have been in the L2DB for longer than the ttl if maxTxs has been exceeded
|
||||
func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) (err error) {
|
||||
txn, err := l2db.db.Beginx()
|
||||
if err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
defer func() {
|
||||
// Rollback the transaction if there was an error.
|
||||
if err != nil {
|
||||
db.Rollback(txn)
|
||||
}
|
||||
}()
|
||||
// Delete pending txs that have been in the pool after the TTL if maxTxs is reached
|
||||
now := time.Now().UTC().Unix()
|
||||
_, err = txn.Exec(
|
||||
`DELETE FROM tx_pool WHERE (SELECT count(*) FROM tx_pool) > $1 AND timestamp < $2`,
|
||||
l2db.maxTxs,
|
||||
time.Unix(now-int64(l2db.ttl.Seconds()), 0),
|
||||
)
|
||||
if err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
// Delete txs that have been marked as forged / invalid after the safety period
|
||||
_, err = txn.Exec(
|
||||
`DELETE FROM tx_pool
|
||||
WHERE batch_num < $1 AND (state = $2 OR state = $3)`,
|
||||
_, err = l2db.db.Exec(
|
||||
`DELETE FROM tx_pool WHERE (
|
||||
batch_num < $1 AND (state = $2 OR state = $3)
|
||||
) OR (
|
||||
(SELECT count(*) FROM tx_pool WHERE state = $4) > $5
|
||||
AND timestamp < $6 AND state = $4
|
||||
);`,
|
||||
currentBatchNum-l2db.safetyPeriod,
|
||||
common.PoolL2TxStateForged,
|
||||
common.PoolL2TxStateInvalid,
|
||||
common.PoolL2TxStatePending,
|
||||
l2db.maxTxs,
|
||||
time.Unix(now-int64(l2db.ttl.Seconds()), 0),
|
||||
)
|
||||
if err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
return tracerr.Wrap(txn.Commit())
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@ func TestMain(m *testing.M) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
l2DB = NewL2DB(db, 10, 100, 24*time.Hour)
|
||||
l2DB = NewL2DB(db, 10, 1000, 24*time.Hour)
|
||||
test.WipeDB(l2DB.DB())
|
||||
historyDB = historydb.NewHistoryDB(db)
|
||||
// Run tests
|
||||
@@ -529,8 +529,8 @@ func TestPurge(t *testing.T) {
|
||||
if err != nil {
|
||||
log.Error("Error prepare historyDB", err)
|
||||
}
|
||||
// generatePoolL2Txs generate 10 txs
|
||||
generateTx := int(l2DB.maxTxs/10 + 1)
|
||||
// generatePoolL2Txs
|
||||
generateTx := int(l2DB.maxTxs/8 + 1)
|
||||
var poolL2Tx []common.PoolL2Tx
|
||||
for i := 0; i < generateTx; i++ {
|
||||
poolL2TxAux, err := generatePoolL2Txs()
|
||||
@@ -538,24 +538,27 @@ func TestPurge(t *testing.T) {
|
||||
poolL2Tx = append(poolL2Tx, poolL2TxAux...)
|
||||
}
|
||||
|
||||
deletedIDs := []common.TxID{}
|
||||
afterTTLIDs := []common.TxID{}
|
||||
keepedIDs := []common.TxID{}
|
||||
var deletedIDs []common.TxID
|
||||
var invalidTxIDs []common.TxID
|
||||
var doneForgingTxIDs []common.TxID
|
||||
const toDeleteBatchNum common.BatchNum = 30
|
||||
safeBatchNum := toDeleteBatchNum + l2DB.safetyPeriod + 1
|
||||
// Add txs to the DB
|
||||
for i := 0; i < int(l2DB.maxTxs); i++ {
|
||||
for i := 0; i < len(poolL2Tx); i++ {
|
||||
tx := poolL2Tx[i]
|
||||
if i%2 == 0 { // keep tx
|
||||
keepedIDs = append(keepedIDs, tx.TxID)
|
||||
} else { // delete after safety period
|
||||
if i%3 == 0 {
|
||||
doneForgingTxIDs = append(doneForgingTxIDs, tx.TxID)
|
||||
} else {
|
||||
} else if i%5 == 0 {
|
||||
invalidTxIDs = append(invalidTxIDs, tx.TxID)
|
||||
} else {
|
||||
afterTTLIDs = append(afterTTLIDs, tx.TxID)
|
||||
}
|
||||
deletedIDs = append(deletedIDs, tx.TxID)
|
||||
deletedIDs = append(deletedIDs, poolL2Tx[i].TxID)
|
||||
}
|
||||
err := l2DB.AddTxTest(&tx)
|
||||
assert.NoError(t, err)
|
||||
@@ -577,16 +580,13 @@ func TestPurge(t *testing.T) {
|
||||
// Invalidate txs and set batchNum
|
||||
err = l2DB.InvalidateTxs(invalidTxIDs, toDeleteBatchNum)
|
||||
assert.NoError(t, err)
|
||||
for i := int(l2DB.maxTxs); i < len(poolL2Tx); i++ {
|
||||
// Delete after TTL
|
||||
deletedIDs = append(deletedIDs, poolL2Tx[i].TxID)
|
||||
err := l2DB.AddTxTest(&poolL2Tx[i])
|
||||
assert.NoError(t, err)
|
||||
// Update timestamp of afterTTL txs
|
||||
deleteTimestamp := time.Unix(time.Now().UTC().Unix()-int64(l2DB.ttl.Seconds()+float64(4*time.Second)), 0)
|
||||
for _, id := range afterTTLIDs {
|
||||
// Set timestamp
|
||||
deleteTimestamp := time.Unix(time.Now().UTC().Unix()-int64(l2DB.ttl.Seconds()+float64(4*time.Second)), 0)
|
||||
_, err = l2DB.db.Exec(
|
||||
"UPDATE tx_pool SET timestamp = $1 WHERE tx_id = $2;",
|
||||
deleteTimestamp, poolL2Tx[i].TxID,
|
||||
"UPDATE tx_pool SET timestamp = $1, state = $2 WHERE tx_id = $3;",
|
||||
deleteTimestamp, common.PoolL2TxStatePending, id,
|
||||
)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
@@ -596,10 +596,7 @@ func TestPurge(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
// Check results
|
||||
for _, id := range deletedIDs {
|
||||
tx, err := l2DB.GetTx(id)
|
||||
if err == nil {
|
||||
log.Debug(tx)
|
||||
}
|
||||
_, err := l2DB.GetTx(id)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
for _, id := range keepedIDs {
|
||||
|
||||
Reference in New Issue
Block a user