mirror of
https://github.com/arnaucube/hermez-node.git
synced 2026-02-07 03:16:45 +01:00
Merge pull request #536 from hermeznetwork/feature/sql-semaphore
Add semaphore for API queries to SQL
This commit is contained in:
85
db/l2db/apiqueries.go
Normal file
85
db/l2db/apiqueries.go
Normal file
@@ -0,0 +1,85 @@
|
||||
package l2db
|
||||
|
||||
import (
|
||||
ethCommon "github.com/ethereum/go-ethereum/common"
|
||||
"github.com/hermeznetwork/hermez-node/common"
|
||||
"github.com/hermeznetwork/tracerr"
|
||||
"github.com/russross/meddler"
|
||||
)
|
||||
|
||||
// AddAccountCreationAuthAPI inserts an account creation authorization into the DB
|
||||
func (l2db *L2DB) AddAccountCreationAuthAPI(auth *common.AccountCreationAuth) error {
|
||||
cancel, err := l2db.apiConnCon.Acquire()
|
||||
defer cancel()
|
||||
if err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
defer l2db.apiConnCon.Release()
|
||||
return l2db.AddAccountCreationAuth(auth)
|
||||
}
|
||||
|
||||
// GetAccountCreationAuthAPI returns an account creation authorization from the DB
|
||||
func (l2db *L2DB) GetAccountCreationAuthAPI(addr ethCommon.Address) (*AccountCreationAuthAPI, error) {
|
||||
cancel, err := l2db.apiConnCon.Acquire()
|
||||
defer cancel()
|
||||
if err != nil {
|
||||
return nil, tracerr.Wrap(err)
|
||||
}
|
||||
defer l2db.apiConnCon.Release()
|
||||
auth := new(AccountCreationAuthAPI)
|
||||
return auth, tracerr.Wrap(meddler.QueryRow(
|
||||
l2db.db, auth,
|
||||
"SELECT * FROM account_creation_auth WHERE eth_addr = $1;",
|
||||
addr,
|
||||
))
|
||||
}
|
||||
|
||||
// AddTxAPI inserts a tx to the pool
|
||||
func (l2db *L2DB) AddTxAPI(tx *PoolL2TxWrite) error {
|
||||
cancel, err := l2db.apiConnCon.Acquire()
|
||||
defer cancel()
|
||||
if err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
defer l2db.apiConnCon.Release()
|
||||
row := l2db.db.QueryRow(
|
||||
"SELECT COUNT(*) FROM tx_pool WHERE state = $1;",
|
||||
common.PoolL2TxStatePending,
|
||||
)
|
||||
var totalTxs uint32
|
||||
if err := row.Scan(&totalTxs); err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
if totalTxs >= l2db.maxTxs {
|
||||
return tracerr.New(
|
||||
"The pool is at full capacity. More transactions are not accepted currently",
|
||||
)
|
||||
}
|
||||
return tracerr.Wrap(meddler.Insert(l2db.db, "tx_pool", tx))
|
||||
}
|
||||
|
||||
// selectPoolTxAPI select part of queries to get PoolL2TxRead
|
||||
const selectPoolTxAPI = `SELECT tx_pool.tx_id, hez_idx(tx_pool.from_idx, token.symbol) AS from_idx, tx_pool.effective_from_eth_addr,
|
||||
tx_pool.effective_from_bjj, hez_idx(tx_pool.to_idx, token.symbol) AS to_idx, tx_pool.effective_to_eth_addr,
|
||||
tx_pool.effective_to_bjj, tx_pool.token_id, tx_pool.amount, tx_pool.fee, tx_pool.nonce,
|
||||
tx_pool.state, tx_pool.info, tx_pool.signature, tx_pool.timestamp, tx_pool.batch_num, hez_idx(tx_pool.rq_from_idx, token.symbol) AS rq_from_idx,
|
||||
hez_idx(tx_pool.rq_to_idx, token.symbol) AS rq_to_idx, tx_pool.rq_to_eth_addr, tx_pool.rq_to_bjj, tx_pool.rq_token_id, tx_pool.rq_amount,
|
||||
tx_pool.rq_fee, tx_pool.rq_nonce, tx_pool.tx_type,
|
||||
token.item_id AS token_item_id, token.eth_block_num, token.eth_addr, token.name, token.symbol, token.decimals, token.usd, token.usd_update
|
||||
FROM tx_pool INNER JOIN token ON tx_pool.token_id = token.token_id `
|
||||
|
||||
// GetTxAPI return the specified Tx in PoolTxAPI format
|
||||
func (l2db *L2DB) GetTxAPI(txID common.TxID) (*PoolTxAPI, error) {
|
||||
cancel, err := l2db.apiConnCon.Acquire()
|
||||
defer cancel()
|
||||
if err != nil {
|
||||
return nil, tracerr.Wrap(err)
|
||||
}
|
||||
defer l2db.apiConnCon.Release()
|
||||
tx := new(PoolTxAPI)
|
||||
return tx, tracerr.Wrap(meddler.QueryRow(
|
||||
l2db.db, tx,
|
||||
selectPoolTxAPI+"WHERE tx_id = $1;",
|
||||
txID,
|
||||
))
|
||||
}
|
||||
@@ -25,17 +25,25 @@ type L2DB struct {
|
||||
safetyPeriod common.BatchNum
|
||||
ttl time.Duration
|
||||
maxTxs uint32 // limit of txs that are accepted in the pool
|
||||
apiConnCon *db.APIConnectionController
|
||||
}
|
||||
|
||||
// NewL2DB creates a L2DB.
|
||||
// To create it, it's needed db connection, safety period expressed in batches,
|
||||
// maxTxs that the DB should have and TTL (time to live) for pending txs.
|
||||
func NewL2DB(db *sqlx.DB, safetyPeriod common.BatchNum, maxTxs uint32, TTL time.Duration) *L2DB {
|
||||
func NewL2DB(
|
||||
db *sqlx.DB,
|
||||
safetyPeriod common.BatchNum,
|
||||
maxTxs uint32,
|
||||
TTL time.Duration,
|
||||
apiConnCon *db.APIConnectionController,
|
||||
) *L2DB {
|
||||
return &L2DB{
|
||||
db: db,
|
||||
safetyPeriod: safetyPeriod,
|
||||
ttl: TTL,
|
||||
maxTxs: maxTxs,
|
||||
apiConnCon: apiConnCon,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,7 +55,6 @@ func (l2db *L2DB) DB() *sqlx.DB {
|
||||
|
||||
// AddAccountCreationAuth inserts an account creation authorization into the DB
|
||||
func (l2db *L2DB) AddAccountCreationAuth(auth *common.AccountCreationAuth) error {
|
||||
// return meddler.Insert(l2db.db, "account_creation_auth", auth)
|
||||
_, err := l2db.db.Exec(
|
||||
`INSERT INTO account_creation_auth (eth_addr, bjj, signature)
|
||||
VALUES ($1, $2, $3);`,
|
||||
@@ -66,16 +73,6 @@ func (l2db *L2DB) GetAccountCreationAuth(addr ethCommon.Address) (*common.Accoun
|
||||
))
|
||||
}
|
||||
|
||||
// GetAccountCreationAuthAPI returns an account creation authorization from the DB
|
||||
func (l2db *L2DB) GetAccountCreationAuthAPI(addr ethCommon.Address) (*AccountCreationAuthAPI, error) {
|
||||
auth := new(AccountCreationAuthAPI)
|
||||
return auth, tracerr.Wrap(meddler.QueryRow(
|
||||
l2db.db, auth,
|
||||
"SELECT * FROM account_creation_auth WHERE eth_addr = $1;",
|
||||
addr,
|
||||
))
|
||||
}
|
||||
|
||||
// AddTx inserts a tx to the pool
|
||||
func (l2db *L2DB) AddTx(tx *PoolL2TxWrite) error {
|
||||
row := l2db.db.QueryRow(
|
||||
@@ -173,16 +170,6 @@ func (l2db *L2DB) AddTxTest(tx *common.PoolL2Tx) error {
|
||||
return tracerr.Wrap(meddler.Insert(l2db.db, "tx_pool", insertTx))
|
||||
}
|
||||
|
||||
// selectPoolTxAPI select part of queries to get PoolL2TxRead
|
||||
const selectPoolTxAPI = `SELECT tx_pool.tx_id, hez_idx(tx_pool.from_idx, token.symbol) AS from_idx, tx_pool.effective_from_eth_addr,
|
||||
tx_pool.effective_from_bjj, hez_idx(tx_pool.to_idx, token.symbol) AS to_idx, tx_pool.effective_to_eth_addr,
|
||||
tx_pool.effective_to_bjj, tx_pool.token_id, tx_pool.amount, tx_pool.fee, tx_pool.nonce,
|
||||
tx_pool.state, tx_pool.info, tx_pool.signature, tx_pool.timestamp, tx_pool.batch_num, hez_idx(tx_pool.rq_from_idx, token.symbol) AS rq_from_idx,
|
||||
hez_idx(tx_pool.rq_to_idx, token.symbol) AS rq_to_idx, tx_pool.rq_to_eth_addr, tx_pool.rq_to_bjj, tx_pool.rq_token_id, tx_pool.rq_amount,
|
||||
tx_pool.rq_fee, tx_pool.rq_nonce, tx_pool.tx_type,
|
||||
token.item_id AS token_item_id, token.eth_block_num, token.eth_addr, token.name, token.symbol, token.decimals, token.usd, token.usd_update
|
||||
FROM tx_pool INNER JOIN token ON tx_pool.token_id = token.token_id `
|
||||
|
||||
// selectPoolTxCommon select part of queries to get common.PoolL2Tx
|
||||
const selectPoolTxCommon = `SELECT tx_pool.tx_id, from_idx, to_idx, tx_pool.to_eth_addr,
|
||||
tx_pool.to_bjj, tx_pool.token_id, tx_pool.amount, tx_pool.fee, tx_pool.nonce,
|
||||
@@ -202,16 +189,6 @@ func (l2db *L2DB) GetTx(txID common.TxID) (*common.PoolL2Tx, error) {
|
||||
))
|
||||
}
|
||||
|
||||
// GetTxAPI return the specified Tx in PoolTxAPI format
|
||||
func (l2db *L2DB) GetTxAPI(txID common.TxID) (*PoolTxAPI, error) {
|
||||
tx := new(PoolTxAPI)
|
||||
return tx, tracerr.Wrap(meddler.QueryRow(
|
||||
l2db.db, tx,
|
||||
selectPoolTxAPI+"WHERE tx_id = $1;",
|
||||
txID,
|
||||
))
|
||||
}
|
||||
|
||||
// GetPendingTxs return all the pending txs of the L2DB, that have a non NULL AbsoluteFee
|
||||
func (l2db *L2DB) GetPendingTxs() ([]common.PoolL2Tx, error) {
|
||||
var txs []*common.PoolL2Tx
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
)
|
||||
|
||||
var l2DB *L2DB
|
||||
var l2DBWithACC *L2DB
|
||||
var historyDB *historydb.HistoryDB
|
||||
var tc *til.Context
|
||||
var tokens map[common.TokenID]historydb.TokenWithUSD
|
||||
@@ -34,9 +35,11 @@ func TestMain(m *testing.M) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
l2DB = NewL2DB(db, 10, 1000, 24*time.Hour)
|
||||
l2DB = NewL2DB(db, 10, 1000, 24*time.Hour, nil)
|
||||
apiConnCon := dbUtils.NewAPICnnectionController(1, time.Second)
|
||||
l2DBWithACC = NewL2DB(db, 10, 1000, 24*time.Hour, apiConnCon)
|
||||
test.WipeDB(l2DB.DB())
|
||||
historyDB = historydb.NewHistoryDB(db)
|
||||
historyDB = historydb.NewHistoryDB(db, nil)
|
||||
// Run tests
|
||||
result := m.Run()
|
||||
// Close DB
|
||||
@@ -267,7 +270,7 @@ func TestStartForging(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
// Fetch txs and check that they've been updated correctly
|
||||
for _, id := range startForgingTxIDs {
|
||||
fetchedTx, err := l2DB.GetTxAPI(id)
|
||||
fetchedTx, err := l2DBWithACC.GetTxAPI(id)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, common.PoolL2TxStateForging, fetchedTx.State)
|
||||
assert.Equal(t, &fakeBatchNum, fetchedTx.BatchNum)
|
||||
@@ -312,7 +315,7 @@ func TestDoneForging(t *testing.T) {
|
||||
|
||||
// Fetch txs and check that they've been updated correctly
|
||||
for _, id := range doneForgingTxIDs {
|
||||
fetchedTx, err := l2DB.GetTxAPI(id)
|
||||
fetchedTx, err := l2DBWithACC.GetTxAPI(id)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, common.PoolL2TxStateForged, fetchedTx.State)
|
||||
assert.Equal(t, &fakeBatchNum, fetchedTx.BatchNum)
|
||||
@@ -344,7 +347,7 @@ func TestInvalidate(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
// Fetch txs and check that they've been updated correctly
|
||||
for _, id := range invalidTxIDs {
|
||||
fetchedTx, err := l2DB.GetTxAPI(id)
|
||||
fetchedTx, err := l2DBWithACC.GetTxAPI(id)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, common.PoolL2TxStateInvalid, fetchedTx.State)
|
||||
assert.Equal(t, &fakeBatchNum, fetchedTx.BatchNum)
|
||||
@@ -385,7 +388,7 @@ func TestInvalidateOldNonces(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
// Fetch txs and check that they've been updated correctly
|
||||
for _, id := range invalidTxIDs {
|
||||
fetchedTx, err := l2DB.GetTxAPI(id)
|
||||
fetchedTx, err := l2DBWithACC.GetTxAPI(id)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, common.PoolL2TxStateInvalid, fetchedTx.State)
|
||||
assert.Equal(t, &fakeBatchNum, fetchedTx.BatchNum)
|
||||
@@ -460,13 +463,13 @@ func TestReorg(t *testing.T) {
|
||||
err = l2DB.Reorg(lastValidBatch)
|
||||
assert.NoError(t, err)
|
||||
for _, id := range reorgedTxIDs {
|
||||
tx, err := l2DB.GetTxAPI(id)
|
||||
tx, err := l2DBWithACC.GetTxAPI(id)
|
||||
assert.NoError(t, err)
|
||||
assert.Nil(t, tx.BatchNum)
|
||||
assert.Equal(t, common.PoolL2TxStatePending, tx.State)
|
||||
}
|
||||
for _, id := range nonReorgedTxIDs {
|
||||
fetchedTx, err := l2DB.GetTxAPI(id)
|
||||
fetchedTx, err := l2DBWithACC.GetTxAPI(id)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, lastValidBatch, *fetchedTx.BatchNum)
|
||||
}
|
||||
@@ -537,13 +540,13 @@ func TestReorg2(t *testing.T) {
|
||||
err = l2DB.Reorg(lastValidBatch)
|
||||
assert.NoError(t, err)
|
||||
for _, id := range reorgedTxIDs {
|
||||
tx, err := l2DB.GetTxAPI(id)
|
||||
tx, err := l2DBWithACC.GetTxAPI(id)
|
||||
assert.NoError(t, err)
|
||||
assert.Nil(t, tx.BatchNum)
|
||||
assert.Equal(t, common.PoolL2TxStatePending, tx.State)
|
||||
}
|
||||
for _, id := range nonReorgedTxIDs {
|
||||
fetchedTx, err := l2DB.GetTxAPI(id)
|
||||
fetchedTx, err := l2DBWithACC.GetTxAPI(id)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, lastValidBatch, *fetchedTx.BatchNum)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user