mirror of
https://github.com/arnaucube/hermez-node.git
synced 2026-02-06 19:06:42 +01:00
Delete pending txs by external mark, store tx IP
- In tx_pool, add a column called `external_delete` that can be set to true externally. Regularly, the coordinator will delete all pending txs with this column set to true. The interval for this action is set via the new config parameter `Coordinator.PurgeByExtDelInterval`. - In tx_pool, add a column for the client ip that sent the transaction. The api fills this value using the ClientIP method from gin.Context, which should work even under a reverse-proxy.
This commit is contained in:
@@ -27,6 +27,7 @@ func (a *API) postPoolTx(c *gin.Context) {
|
||||
retBadReq(err, c)
|
||||
return
|
||||
}
|
||||
writeTx.ClientIP = c.ClientIP()
|
||||
// Insert to DB
|
||||
if err := a.l2.AddTxAPI(writeTx); err != nil {
|
||||
retSQLErr(err, c)
|
||||
|
||||
1
cli/node/.gitignore
vendored
1
cli/node/.gitignore
vendored
@@ -1,2 +1,3 @@
|
||||
cfg.example.secret.toml
|
||||
cfg.toml
|
||||
node
|
||||
|
||||
@@ -32,6 +32,7 @@ URL = "http://localhost:8545"
|
||||
[Synchronizer]
|
||||
SyncLoopInterval = "1s"
|
||||
StatsRefreshPeriod = "1s"
|
||||
StoreAccountUpdates = true
|
||||
|
||||
[SmartContracts]
|
||||
Rollup = "0x8EEaea23686c319133a7cC110b840d1591d9AeE0"
|
||||
@@ -55,6 +56,7 @@ ForgeRetryInterval = "500ms"
|
||||
SyncRetryInterval = "1s"
|
||||
ForgeDelay = "10s"
|
||||
ForgeNoTxsDelay = "0s"
|
||||
PurgeByExtDelInterval = "1m"
|
||||
|
||||
[Coordinator.FeeAccount]
|
||||
Address = "0x56232B1c5B10038125Bc7345664B4AFD745bcF8E"
|
||||
|
||||
@@ -91,6 +91,10 @@ type Coordinator struct {
|
||||
// SyncRetryInterval is the waiting interval between calls to the main
|
||||
// handler of a synced block after an error
|
||||
SyncRetryInterval Duration `validate:"required"`
|
||||
// PurgeByExtDelInterval is the waiting interval between calls
|
||||
// to the PurgeByExternalDelete function of the l2db which deletes
|
||||
// pending txs externally marked by the column `external_delete`
|
||||
PurgeByExtDelInterval Duration `validate:"required"`
|
||||
// L2DB is the DB that holds the pool of L2Txs
|
||||
L2DB struct {
|
||||
// SafetyPeriod is the number of batches after which
|
||||
|
||||
@@ -85,6 +85,10 @@ type Config struct {
|
||||
// SyncRetryInterval is the waiting interval between calls to the main
|
||||
// handler of a synced block after an error
|
||||
SyncRetryInterval time.Duration
|
||||
// PurgeByExtDelInterval is the waiting interval between calls
|
||||
// to the PurgeByExternalDelete function of the l2db which deletes
|
||||
// pending txs externally marked by the column `external_delete`
|
||||
PurgeByExtDelInterval time.Duration
|
||||
// EthClientAttemptsDelay is delay between attempts do do an eth client
|
||||
// RPC call
|
||||
EthClientAttemptsDelay time.Duration
|
||||
@@ -153,6 +157,15 @@ type Coordinator struct {
|
||||
wg sync.WaitGroup
|
||||
cancel context.CancelFunc
|
||||
|
||||
// mutexL2DBUpdateDelete protects updates to the L2DB so that
|
||||
// these two processes always happen exclusively:
|
||||
// - Pipeline taking pending txs, running through the TxProcessor and
|
||||
// marking selected txs as forging
|
||||
// - Coordinator deleting pending txs that have been marked with
|
||||
// `external_delete`.
|
||||
// Without this mutex, the coordinator could delete a pending txs that
|
||||
// has just been selected by the TxProcessor in the pipeline.
|
||||
mutexL2DBUpdateDelete sync.Mutex
|
||||
pipeline *Pipeline
|
||||
lastNonFailedBatchNum common.BatchNum
|
||||
|
||||
@@ -248,7 +261,8 @@ func (c *Coordinator) BatchBuilder() *batchbuilder.BatchBuilder {
|
||||
func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) {
|
||||
c.pipelineNum++
|
||||
return NewPipeline(ctx, c.cfg, c.pipelineNum, c.historyDB, c.l2DB, c.txSelector,
|
||||
c.batchBuilder, c.purger, c, c.txManager, c.provers, &c.consts)
|
||||
c.batchBuilder, &c.mutexL2DBUpdateDelete, c.purger, c, c.txManager,
|
||||
c.provers, &c.consts)
|
||||
}
|
||||
|
||||
// MsgSyncBlock indicates an update to the Synchronizer stats
|
||||
@@ -527,6 +541,24 @@ func (c *Coordinator) Start() {
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
log.Info("Coordinator L2DB.PurgeByExternalDelete loop done")
|
||||
c.wg.Done()
|
||||
return
|
||||
case <-time.After(c.cfg.PurgeByExtDelInterval):
|
||||
c.mutexL2DBUpdateDelete.Lock()
|
||||
if err := c.l2DB.PurgeByExternalDelete(); err != nil {
|
||||
log.Errorw("L2DB.PurgeByExternalDelete", "err", err)
|
||||
}
|
||||
c.mutexL2DBUpdateDelete.Unlock()
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
const stopCtxTimeout = 200 * time.Millisecond
|
||||
|
||||
@@ -45,15 +45,16 @@ type Pipeline struct {
|
||||
errAtBatchNum common.BatchNum
|
||||
lastForgeTime time.Time
|
||||
|
||||
proversPool *ProversPool
|
||||
provers []prover.Client
|
||||
coord *Coordinator
|
||||
txManager *TxManager
|
||||
historyDB *historydb.HistoryDB
|
||||
l2DB *l2db.L2DB
|
||||
txSelector *txselector.TxSelector
|
||||
batchBuilder *batchbuilder.BatchBuilder
|
||||
purger *Purger
|
||||
proversPool *ProversPool
|
||||
provers []prover.Client
|
||||
coord *Coordinator
|
||||
txManager *TxManager
|
||||
historyDB *historydb.HistoryDB
|
||||
l2DB *l2db.L2DB
|
||||
txSelector *txselector.TxSelector
|
||||
batchBuilder *batchbuilder.BatchBuilder
|
||||
mutexL2DBUpdateDelete *sync.Mutex
|
||||
purger *Purger
|
||||
|
||||
stats synchronizer.Stats
|
||||
vars synchronizer.SCVariables
|
||||
@@ -84,6 +85,7 @@ func NewPipeline(ctx context.Context,
|
||||
l2DB *l2db.L2DB,
|
||||
txSelector *txselector.TxSelector,
|
||||
batchBuilder *batchbuilder.BatchBuilder,
|
||||
mutexL2DBUpdateDelete *sync.Mutex,
|
||||
purger *Purger,
|
||||
coord *Coordinator,
|
||||
txManager *TxManager,
|
||||
@@ -104,19 +106,20 @@ func NewPipeline(ctx context.Context,
|
||||
return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool"))
|
||||
}
|
||||
return &Pipeline{
|
||||
num: num,
|
||||
cfg: cfg,
|
||||
historyDB: historyDB,
|
||||
l2DB: l2DB,
|
||||
txSelector: txSelector,
|
||||
batchBuilder: batchBuilder,
|
||||
provers: provers,
|
||||
proversPool: proversPool,
|
||||
purger: purger,
|
||||
coord: coord,
|
||||
txManager: txManager,
|
||||
consts: *scConsts,
|
||||
statsVarsCh: make(chan statsVars, queueLen),
|
||||
num: num,
|
||||
cfg: cfg,
|
||||
historyDB: historyDB,
|
||||
l2DB: l2DB,
|
||||
txSelector: txSelector,
|
||||
batchBuilder: batchBuilder,
|
||||
provers: provers,
|
||||
proversPool: proversPool,
|
||||
mutexL2DBUpdateDelete: mutexL2DBUpdateDelete,
|
||||
purger: purger,
|
||||
coord: coord,
|
||||
txManager: txManager,
|
||||
consts: *scConsts,
|
||||
statsVarsCh: make(chan statsVars, queueLen),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -199,7 +202,9 @@ func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) {
|
||||
// and then waits for an available proof server and sends the zkInputs to it so
|
||||
// that the proof computation begins.
|
||||
func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
|
||||
p.mutexL2DBUpdateDelete.Lock()
|
||||
batchInfo, err := p.forgeBatch(batchNum)
|
||||
p.mutexL2DBUpdateDelete.Unlock()
|
||||
if ctx.Err() != nil {
|
||||
return nil, ctx.Err()
|
||||
} else if err != nil {
|
||||
|
||||
@@ -73,24 +73,6 @@ func (l2db *L2DB) GetAccountCreationAuth(addr ethCommon.Address) (*common.Accoun
|
||||
))
|
||||
}
|
||||
|
||||
// AddTx inserts a tx to the pool
|
||||
func (l2db *L2DB) AddTx(tx *PoolL2TxWrite) error {
|
||||
row := l2db.db.QueryRow(
|
||||
"SELECT COUNT(*) FROM tx_pool WHERE state = $1;",
|
||||
common.PoolL2TxStatePending,
|
||||
)
|
||||
var totalTxs uint32
|
||||
if err := row.Scan(&totalTxs); err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
if totalTxs >= l2db.maxTxs {
|
||||
return tracerr.New(
|
||||
"The pool is at full capacity. More transactions are not accepted currently",
|
||||
)
|
||||
}
|
||||
return tracerr.Wrap(meddler.Insert(l2db.db, "tx_pool", tx))
|
||||
}
|
||||
|
||||
// UpdateTxsInfo updates the parameter Info of the pool transactions
|
||||
func (l2db *L2DB) UpdateTxsInfo(txs []common.PoolL2Tx) error {
|
||||
if len(txs) == 0 {
|
||||
@@ -354,3 +336,14 @@ func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) (err error) {
|
||||
)
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
|
||||
// PurgeByExternalDelete deletes all pending transactions marked with true in
|
||||
// the `external_delete` column. An external process can set this column to
|
||||
// true to instruct the coordinator to delete the tx when possible.
|
||||
func (l2db *L2DB) PurgeByExternalDelete() error {
|
||||
_, err := l2db.db.Exec(
|
||||
`DELETE from tx_pool WHERE (external_delete = true AND state = $1);`,
|
||||
common.PoolL2TxStatePending,
|
||||
)
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package l2db
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"math"
|
||||
"math/big"
|
||||
"os"
|
||||
@@ -701,3 +702,56 @@ func TestAddGet(t *testing.T) {
|
||||
assert.Equal(t, txs[i], *dbTx)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPurgeByExternalDelete(t *testing.T) {
|
||||
err := prepareHistoryDB(historyDB)
|
||||
if err != nil {
|
||||
log.Error("Error prepare historyDB", err)
|
||||
}
|
||||
txs, err := generatePoolL2Txs()
|
||||
assert.NoError(t, err)
|
||||
|
||||
// We will work with 8 txs
|
||||
require.GreaterOrEqual(t, len(txs), 8)
|
||||
txs = txs[:8]
|
||||
for i := range txs {
|
||||
require.NoError(t, l2DB.AddTxTest(&txs[i]))
|
||||
}
|
||||
|
||||
// We will recreate this scenario:
|
||||
// tx index, status , external_delete
|
||||
// 0 , pending, false
|
||||
// 1 , pending, false
|
||||
// 2 , pending, true // will be deleted
|
||||
// 3 , pending, true // will be deleted
|
||||
// 4 , fging , false
|
||||
// 5 , fging , false
|
||||
// 6 , fging , true
|
||||
// 7 , fging , true
|
||||
|
||||
require.NoError(t, l2DB.StartForging(
|
||||
[]common.TxID{txs[4].TxID, txs[5].TxID, txs[6].TxID, txs[7].TxID},
|
||||
1))
|
||||
_, err = l2DB.db.Exec(
|
||||
`UPDATE tx_pool SET external_delete = true WHERE
|
||||
tx_id IN ($1, $2, $3, $4)
|
||||
;`,
|
||||
txs[2].TxID, txs[3].TxID, txs[6].TxID, txs[7].TxID,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, l2DB.PurgeByExternalDelete())
|
||||
|
||||
// Query txs that are have been not deleted
|
||||
for _, i := range []int{0, 1, 4, 5, 6, 7} {
|
||||
txID := txs[i].TxID
|
||||
_, err := l2DB.GetTx(txID)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Query txs that have been deleted
|
||||
for _, i := range []int{2, 3} {
|
||||
txID := txs[i].TxID
|
||||
_, err := l2DB.GetTx(txID)
|
||||
require.Equal(t, sql.ErrNoRows, tracerr.Unwrap(err))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ type PoolL2TxWrite struct {
|
||||
RqFee *common.FeeSelector `meddler:"rq_fee"`
|
||||
RqNonce *common.Nonce `meddler:"rq_nonce"`
|
||||
Type common.TxType `meddler:"tx_type"`
|
||||
ClientIP string `meddler:"client_ip"`
|
||||
}
|
||||
|
||||
// PoolTxAPI represents a L2 Tx pool with extra metadata used by the API
|
||||
|
||||
@@ -627,7 +627,9 @@ CREATE TABLE tx_pool (
|
||||
rq_amount BYTEA,
|
||||
rq_fee SMALLINT,
|
||||
rq_nonce BIGINT,
|
||||
tx_type VARCHAR(40) NOT NULL
|
||||
tx_type VARCHAR(40) NOT NULL,
|
||||
client_ip VARCHAR,
|
||||
external_delete BOOLEAN NOT NULL DEFAULT false
|
||||
);
|
||||
|
||||
-- +migrate StatementBegin
|
||||
|
||||
@@ -303,6 +303,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
|
||||
ForgeDelay: cfg.Coordinator.ForgeDelay.Duration,
|
||||
ForgeNoTxsDelay: cfg.Coordinator.ForgeNoTxsDelay.Duration,
|
||||
SyncRetryInterval: cfg.Coordinator.SyncRetryInterval.Duration,
|
||||
PurgeByExtDelInterval: cfg.Coordinator.PurgeByExtDelInterval.Duration,
|
||||
EthClientAttempts: cfg.Coordinator.EthClient.Attempts,
|
||||
EthClientAttemptsDelay: cfg.Coordinator.EthClient.AttemptsDelay.Duration,
|
||||
EthNoReuseNonce: cfg.Coordinator.EthClient.NoReuseNonce,
|
||||
|
||||
Reference in New Issue
Block a user