Mirror of https://github.com/arnaucube/hermez-node.git (synced 2026-02-07 03:16:45 +01:00)
Merge pull request #557 from hermeznetwork/feature/poolexternaldelete
Delete pending txs by external mark, store tx IP
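Summary of the change, as reflected in the hunks below: the tx_pool table gains an external_delete flag and a client_ip column; the API stores the submitter's IP when a pool tx is posted; and the coordinator runs a periodic PurgeByExternalDelete pass, driven by the new Coordinator.PurgeByExtDelInterval setting. A new mutex makes that purge mutually exclusive with the pipeline's batch forging, so a pending tx cannot be deleted right after the TxProcessor has selected it.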
@@ -27,6 +27,7 @@ func (a *API) postPoolTx(c *gin.Context) {
         retBadReq(err, c)
         return
     }
+    writeTx.ClientIP = c.ClientIP()
     // Insert to DB
     if err := a.l2.AddTxAPI(writeTx); err != nil {
         retSQLErr(err, c)
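The handler records the caller's address with gin's ClientIP() helper; the PoolL2TxWrite.ClientIP field it fills and the client_ip column it is written through (via the meddler tag) appear in the views and migration hunks further down.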
cli/node/.gitignore (vendored): 1 addition
@@ -1,2 +1,3 @@
 cfg.example.secret.toml
 cfg.toml
+node
@@ -32,6 +32,7 @@ URL = "http://localhost:8545"
 [Synchronizer]
 SyncLoopInterval = "1s"
 StatsRefreshPeriod = "1s"
+StoreAccountUpdates = true
 
 [SmartContracts]
 Rollup = "0x8EEaea23686c319133a7cC110b840d1591d9AeE0"
@@ -55,6 +56,7 @@ ForgeRetryInterval = "500ms"
 SyncRetryInterval = "1s"
 ForgeDelay = "10s"
 ForgeNoTxsDelay = "0s"
+PurgeByExtDelInterval = "1m"
 
 [Coordinator.FeeAccount]
 Address = "0x56232B1c5B10038125Bc7345664B4AFD745bcF8E"
@@ -91,6 +91,10 @@ type Coordinator struct {
     // SyncRetryInterval is the waiting interval between calls to the main
     // handler of a synced block after an error
     SyncRetryInterval Duration `validate:"required"`
+    // PurgeByExtDelInterval is the waiting interval between calls
+    // to the PurgeByExternalDelete function of the l2db which deletes
+    // pending txs externally marked by the column `external_delete`
+    PurgeByExtDelInterval Duration `validate:"required"`
     // L2DB is the DB that holds the pool of L2Txs
     L2DB struct {
         // SafetyPeriod is the number of batches after which
@@ -85,6 +85,10 @@ type Config struct {
     // SyncRetryInterval is the waiting interval between calls to the main
    // handler of a synced block after an error
     SyncRetryInterval time.Duration
+    // PurgeByExtDelInterval is the waiting interval between calls
+    // to the PurgeByExternalDelete function of the l2db which deletes
+    // pending txs externally marked by the column `external_delete`
+    PurgeByExtDelInterval time.Duration
     // EthClientAttemptsDelay is delay between attempts do do an eth client
     // RPC call
     EthClientAttemptsDelay time.Duration
@@ -153,6 +157,15 @@ type Coordinator struct {
     wg     sync.WaitGroup
     cancel context.CancelFunc
 
+    // mutexL2DBUpdateDelete protects updates to the L2DB so that
+    // these two processes always happen exclusively:
+    // - Pipeline taking pending txs, running through the TxProcessor and
+    //   marking selected txs as forging
+    // - Coordinator deleting pending txs that have been marked with
+    //   `external_delete`.
+    // Without this mutex, the coordinator could delete a pending txs that
+    // has just been selected by the TxProcessor in the pipeline.
+    mutexL2DBUpdateDelete sync.Mutex
     pipeline *Pipeline
     lastNonFailedBatchNum common.BatchNum
 
@@ -248,7 +261,8 @@ func (c *Coordinator) BatchBuilder() *batchbuilder.BatchBuilder {
 func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) {
     c.pipelineNum++
     return NewPipeline(ctx, c.cfg, c.pipelineNum, c.historyDB, c.l2DB, c.txSelector,
-        c.batchBuilder, c.purger, c, c.txManager, c.provers, &c.consts)
+        c.batchBuilder, &c.mutexL2DBUpdateDelete, c.purger, c, c.txManager,
+        c.provers, &c.consts)
 }
 
 // MsgSyncBlock indicates an update to the Synchronizer stats
@@ -527,6 +541,24 @@ func (c *Coordinator) Start() {
             }
         }
     }()
+
+    c.wg.Add(1)
+    go func() {
+        for {
+            select {
+            case <-c.ctx.Done():
+                log.Info("Coordinator L2DB.PurgeByExternalDelete loop done")
+                c.wg.Done()
+                return
+            case <-time.After(c.cfg.PurgeByExtDelInterval):
+                c.mutexL2DBUpdateDelete.Lock()
+                if err := c.l2DB.PurgeByExternalDelete(); err != nil {
+                    log.Errorw("L2DB.PurgeByExternalDelete", "err", err)
+                }
+                c.mutexL2DBUpdateDelete.Unlock()
+            }
+        }
+    }()
 }
 
 const stopCtxTimeout = 200 * time.Millisecond
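Design note: the purge runs in its own goroutine inside Start(), waking every PurgeByExtDelInterval and exiting when the coordinator context is cancelled. It takes mutexL2DBUpdateDelete around the delete, the same mutex the pipeline holds across forgeBatch (see the pipeline hunks below), which is what guarantees that a tx selected for forging is never deleted out from under the TxProcessor.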
@@ -53,6 +53,7 @@ type Pipeline struct {
     l2DB         *l2db.L2DB
     txSelector   *txselector.TxSelector
     batchBuilder *batchbuilder.BatchBuilder
+    mutexL2DBUpdateDelete *sync.Mutex
     purger       *Purger
 
     stats synchronizer.Stats
@@ -84,6 +85,7 @@ func NewPipeline(ctx context.Context,
     l2DB *l2db.L2DB,
     txSelector *txselector.TxSelector,
     batchBuilder *batchbuilder.BatchBuilder,
+    mutexL2DBUpdateDelete *sync.Mutex,
     purger *Purger,
     coord *Coordinator,
     txManager *TxManager,
@@ -112,6 +114,7 @@ func NewPipeline(ctx context.Context,
         batchBuilder: batchBuilder,
         provers:      provers,
         proversPool:  proversPool,
+        mutexL2DBUpdateDelete: mutexL2DBUpdateDelete,
         purger:       purger,
         coord:        coord,
         txManager:    txManager,
@@ -199,7 +202,9 @@ func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) {
 // and then waits for an available proof server and sends the zkInputs to it so
 // that the proof computation begins.
 func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
+    p.mutexL2DBUpdateDelete.Lock()
     batchInfo, err := p.forgeBatch(batchNum)
+    p.mutexL2DBUpdateDelete.Unlock()
     if ctx.Err() != nil {
         return nil, ctx.Err()
     } else if err != nil {
@@ -73,24 +73,6 @@ func (l2db *L2DB) GetAccountCreationAuth(addr ethCommon.Address) (*common.Accoun
     ))
 }
 
-// AddTx inserts a tx to the pool
-func (l2db *L2DB) AddTx(tx *PoolL2TxWrite) error {
-    row := l2db.db.QueryRow(
-        "SELECT COUNT(*) FROM tx_pool WHERE state = $1;",
-        common.PoolL2TxStatePending,
-    )
-    var totalTxs uint32
-    if err := row.Scan(&totalTxs); err != nil {
-        return tracerr.Wrap(err)
-    }
-    if totalTxs >= l2db.maxTxs {
-        return tracerr.New(
-            "The pool is at full capacity. More transactions are not accepted currently",
-        )
-    }
-    return tracerr.Wrap(meddler.Insert(l2db.db, "tx_pool", tx))
-}
-
 // UpdateTxsInfo updates the parameter Info of the pool transactions
 func (l2db *L2DB) UpdateTxsInfo(txs []common.PoolL2Tx) error {
     if len(txs) == 0 {
@@ -354,3 +336,14 @@ func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) (err error) {
     )
     return tracerr.Wrap(err)
 }
+
+// PurgeByExternalDelete deletes all pending transactions marked with true in
+// the `external_delete` column. An external process can set this column to
+// true to instruct the coordinator to delete the tx when possible.
+func (l2db *L2DB) PurgeByExternalDelete() error {
+    _, err := l2db.db.Exec(
+        `DELETE from tx_pool WHERE (external_delete = true AND state = $1);`,
+        common.PoolL2TxStatePending,
+    )
+    return tracerr.Wrap(err)
+}
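As the comment above notes, the coordinator never deletes rows on its own; an external process flags them first and the purge loop removes them on its next tick. Below is a minimal sketch of such an external marker, assuming direct access to the same PostgreSQL database, the lib/pq driver, a tx_id stored as raw bytes, and placeholder names (HEZ_PG_DSN, markForExternalDelete, the example TxID) that are not part of this commit.

package main

import (
    "database/sql"
    "log"
    "os"

    _ "github.com/lib/pq" // assumption: plain Postgres driver for the node's DB
)

// markForExternalDelete flags one pool tx; the coordinator's purge loop will
// delete it on its next PurgeByExtDelInterval tick, but only while the tx is
// still pending (PurgeByExternalDelete ignores forging/forged rows).
func markForExternalDelete(db *sql.DB, txID []byte) error {
    _, err := db.Exec(
        `UPDATE tx_pool SET external_delete = true WHERE tx_id = $1;`,
        txID,
    )
    return err
}

func main() {
    // The DSN env var and the TxID bytes below are placeholders for illustration.
    db, err := sql.Open("postgres", os.Getenv("HEZ_PG_DSN"))
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()
    if err := markForExternalDelete(db, []byte{0x02 /* ... raw TxID bytes ... */}); err != nil {
        log.Fatal(err)
    }
}

This is the same pattern the new TestPurgeByExternalDelete uses: it marks a mix of pending and forging txs and then checks that only the pending, flagged ones disappear.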
@@ -1,6 +1,7 @@
 package l2db
 
 import (
+    "database/sql"
     "math"
     "math/big"
     "os"
@@ -701,3 +702,56 @@ func TestAddGet(t *testing.T) {
         assert.Equal(t, txs[i], *dbTx)
     }
 }
+
+func TestPurgeByExternalDelete(t *testing.T) {
+    err := prepareHistoryDB(historyDB)
+    if err != nil {
+        log.Error("Error prepare historyDB", err)
+    }
+    txs, err := generatePoolL2Txs()
+    assert.NoError(t, err)
+
+    // We will work with 8 txs
+    require.GreaterOrEqual(t, len(txs), 8)
+    txs = txs[:8]
+    for i := range txs {
+        require.NoError(t, l2DB.AddTxTest(&txs[i]))
+    }
+
+    // We will recreate this scenario:
+    // tx index, status , external_delete
+    // 0       , pending, false
+    // 1       , pending, false
+    // 2       , pending, true // will be deleted
+    // 3       , pending, true // will be deleted
+    // 4       , fging  , false
+    // 5       , fging  , false
+    // 6       , fging  , true
+    // 7       , fging  , true
+
+    require.NoError(t, l2DB.StartForging(
+        []common.TxID{txs[4].TxID, txs[5].TxID, txs[6].TxID, txs[7].TxID},
+        1))
+    _, err = l2DB.db.Exec(
+        `UPDATE tx_pool SET external_delete = true WHERE
+        tx_id IN ($1, $2, $3, $4)
+        ;`,
+        txs[2].TxID, txs[3].TxID, txs[6].TxID, txs[7].TxID,
+    )
+    require.NoError(t, err)
+    require.NoError(t, l2DB.PurgeByExternalDelete())
+
+    // Query txs that have not been deleted
+    for _, i := range []int{0, 1, 4, 5, 6, 7} {
+        txID := txs[i].TxID
+        _, err := l2DB.GetTx(txID)
+        require.NoError(t, err)
+    }
+
+    // Query txs that have been deleted
+    for _, i := range []int{2, 3} {
+        txID := txs[i].TxID
+        _, err := l2DB.GetTx(txID)
+        require.Equal(t, sql.ErrNoRows, tracerr.Unwrap(err))
+    }
+}
@@ -34,6 +34,7 @@ type PoolL2TxWrite struct {
     RqFee     *common.FeeSelector `meddler:"rq_fee"`
     RqNonce   *common.Nonce       `meddler:"rq_nonce"`
     Type      common.TxType       `meddler:"tx_type"`
+    ClientIP  string              `meddler:"client_ip"`
 }
 
 // PoolTxAPI represents a L2 Tx pool with extra metadata used by the API
@@ -627,7 +627,9 @@ CREATE TABLE tx_pool (
     rq_amount BYTEA,
     rq_fee SMALLINT,
     rq_nonce BIGINT,
-    tx_type VARCHAR(40) NOT NULL
+    tx_type VARCHAR(40) NOT NULL,
+    client_ip VARCHAR,
+    external_delete BOOLEAN NOT NULL DEFAULT false
 );
 
 -- +migrate StatementBegin
@@ -303,6 +303,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
         ForgeDelay:             cfg.Coordinator.ForgeDelay.Duration,
         ForgeNoTxsDelay:        cfg.Coordinator.ForgeNoTxsDelay.Duration,
         SyncRetryInterval:      cfg.Coordinator.SyncRetryInterval.Duration,
+        PurgeByExtDelInterval:  cfg.Coordinator.PurgeByExtDelInterval.Duration,
         EthClientAttempts:      cfg.Coordinator.EthClient.Attempts,
         EthClientAttemptsDelay: cfg.Coordinator.EthClient.AttemptsDelay.Duration,
         EthNoReuseNonce:        cfg.Coordinator.EthClient.NoReuseNonce,