From 67b2b7da4b9dc7e7f1df200cd515ecc2ad1db924 Mon Sep 17 00:00:00 2001 From: Eduard S Date: Fri, 19 Feb 2021 13:54:56 +0100 Subject: [PATCH] Delete pending txs by external mark, store tx IP - In tx_pool, add a column called `external_delete` that can be set to true externally. Regularly, the coordinator will delete all pending txs with this column set to true. The interval for this action is set via the new config parameter `Coordinator.PurgeByExtDelInterval`. - In tx_pool, add a column for the client ip that sent the transaction. The api fills this value using the ClientIP method from gin.Context, which should work even under a reverse-proxy. --- api/txspool.go | 1 + cli/node/.gitignore | 1 + cli/node/cfg.buidler.toml | 2 ++ config/config.go | 4 +++ coordinator/coordinator.go | 34 +++++++++++++++++++++++- coordinator/pipeline.go | 49 ++++++++++++++++++---------------- db/l2db/l2db.go | 29 ++++++++------------ db/l2db/l2db_test.go | 54 ++++++++++++++++++++++++++++++++++++++ db/l2db/views.go | 1 + db/migrations/0001.sql | 4 ++- node/node.go | 1 + 11 files changed, 138 insertions(+), 42 deletions(-) diff --git a/api/txspool.go b/api/txspool.go index 207e843..b990818 100644 --- a/api/txspool.go +++ b/api/txspool.go @@ -27,6 +27,7 @@ func (a *API) postPoolTx(c *gin.Context) { retBadReq(err, c) return } + writeTx.ClientIP = c.ClientIP() // Insert to DB if err := a.l2.AddTxAPI(writeTx); err != nil { retSQLErr(err, c) diff --git a/cli/node/.gitignore b/cli/node/.gitignore index 47a3b92..c4617c7 100644 --- a/cli/node/.gitignore +++ b/cli/node/.gitignore @@ -1,2 +1,3 @@ cfg.example.secret.toml cfg.toml +node diff --git a/cli/node/cfg.buidler.toml b/cli/node/cfg.buidler.toml index f80efa8..941b6ed 100644 --- a/cli/node/cfg.buidler.toml +++ b/cli/node/cfg.buidler.toml @@ -32,6 +32,7 @@ URL = "http://localhost:8545" [Synchronizer] SyncLoopInterval = "1s" StatsRefreshPeriod = "1s" +StoreAccountUpdates = true [SmartContracts] Rollup = "0x8EEaea23686c319133a7cC110b840d1591d9AeE0" @@ -55,6 +56,7 @@ ForgeRetryInterval = "500ms" SyncRetryInterval = "1s" ForgeDelay = "10s" ForgeNoTxsDelay = "0s" +PurgeByExtDelInterval = "1m" [Coordinator.FeeAccount] Address = "0x56232B1c5B10038125Bc7345664B4AFD745bcF8E" diff --git a/config/config.go b/config/config.go index 76a1f13..07921f6 100644 --- a/config/config.go +++ b/config/config.go @@ -91,6 +91,10 @@ type Coordinator struct { // SyncRetryInterval is the waiting interval between calls to the main // handler of a synced block after an error SyncRetryInterval Duration `validate:"required"` + // PurgeByExtDelInterval is the waiting interval between calls + // to the PurgeByExternalDelete function of the l2db which deletes + // pending txs externally marked by the column `external_delete` + PurgeByExtDelInterval Duration `validate:"required"` // L2DB is the DB that holds the pool of L2Txs L2DB struct { // SafetyPeriod is the number of batches after which diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index b6d922f..5263713 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -85,6 +85,10 @@ type Config struct { // SyncRetryInterval is the waiting interval between calls to the main // handler of a synced block after an error SyncRetryInterval time.Duration + // PurgeByExtDelInterval is the waiting interval between calls + // to the PurgeByExternalDelete function of the l2db which deletes + // pending txs externally marked by the column `external_delete` + PurgeByExtDelInterval time.Duration // EthClientAttemptsDelay is delay between attempts do do an eth client // RPC call EthClientAttemptsDelay time.Duration @@ -153,6 +157,15 @@ type Coordinator struct { wg sync.WaitGroup cancel context.CancelFunc + // mutexL2DBUpdateDelete protects updates to the L2DB so that + // these two processes always happen exclusively: + // - Pipeline taking pending txs, running through the TxProcessor and + // marking selected txs as forging + // - Coordinator deleting pending txs that have been marked with + // `external_delete`. + // Without this mutex, the coordinator could delete a pending txs that + // has just been selected by the TxProcessor in the pipeline. + mutexL2DBUpdateDelete sync.Mutex pipeline *Pipeline lastNonFailedBatchNum common.BatchNum @@ -248,7 +261,8 @@ func (c *Coordinator) BatchBuilder() *batchbuilder.BatchBuilder { func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) { c.pipelineNum++ return NewPipeline(ctx, c.cfg, c.pipelineNum, c.historyDB, c.l2DB, c.txSelector, - c.batchBuilder, c.purger, c, c.txManager, c.provers, &c.consts) + c.batchBuilder, &c.mutexL2DBUpdateDelete, c.purger, c, c.txManager, + c.provers, &c.consts) } // MsgSyncBlock indicates an update to the Synchronizer stats @@ -527,6 +541,24 @@ func (c *Coordinator) Start() { } } }() + + c.wg.Add(1) + go func() { + for { + select { + case <-c.ctx.Done(): + log.Info("Coordinator L2DB.PurgeByExternalDelete loop done") + c.wg.Done() + return + case <-time.After(c.cfg.PurgeByExtDelInterval): + c.mutexL2DBUpdateDelete.Lock() + if err := c.l2DB.PurgeByExternalDelete(); err != nil { + log.Errorw("L2DB.PurgeByExternalDelete", "err", err) + } + c.mutexL2DBUpdateDelete.Unlock() + } + } + }() } const stopCtxTimeout = 200 * time.Millisecond diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go index aaeea59..a60423a 100644 --- a/coordinator/pipeline.go +++ b/coordinator/pipeline.go @@ -45,15 +45,16 @@ type Pipeline struct { errAtBatchNum common.BatchNum lastForgeTime time.Time - proversPool *ProversPool - provers []prover.Client - coord *Coordinator - txManager *TxManager - historyDB *historydb.HistoryDB - l2DB *l2db.L2DB - txSelector *txselector.TxSelector - batchBuilder *batchbuilder.BatchBuilder - purger *Purger + proversPool *ProversPool + provers []prover.Client + coord *Coordinator + txManager *TxManager + historyDB *historydb.HistoryDB + l2DB *l2db.L2DB + txSelector *txselector.TxSelector + batchBuilder *batchbuilder.BatchBuilder + mutexL2DBUpdateDelete *sync.Mutex + purger *Purger stats synchronizer.Stats vars synchronizer.SCVariables @@ -84,6 +85,7 @@ func NewPipeline(ctx context.Context, l2DB *l2db.L2DB, txSelector *txselector.TxSelector, batchBuilder *batchbuilder.BatchBuilder, + mutexL2DBUpdateDelete *sync.Mutex, purger *Purger, coord *Coordinator, txManager *TxManager, @@ -104,19 +106,20 @@ func NewPipeline(ctx context.Context, return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool")) } return &Pipeline{ - num: num, - cfg: cfg, - historyDB: historyDB, - l2DB: l2DB, - txSelector: txSelector, - batchBuilder: batchBuilder, - provers: provers, - proversPool: proversPool, - purger: purger, - coord: coord, - txManager: txManager, - consts: *scConsts, - statsVarsCh: make(chan statsVars, queueLen), + num: num, + cfg: cfg, + historyDB: historyDB, + l2DB: l2DB, + txSelector: txSelector, + batchBuilder: batchBuilder, + provers: provers, + proversPool: proversPool, + mutexL2DBUpdateDelete: mutexL2DBUpdateDelete, + purger: purger, + coord: coord, + txManager: txManager, + consts: *scConsts, + statsVarsCh: make(chan statsVars, queueLen), }, nil } @@ -199,7 +202,9 @@ func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) { // and then waits for an available proof server and sends the zkInputs to it so // that the proof computation begins. func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) { + p.mutexL2DBUpdateDelete.Lock() batchInfo, err := p.forgeBatch(batchNum) + p.mutexL2DBUpdateDelete.Unlock() if ctx.Err() != nil { return nil, ctx.Err() } else if err != nil { diff --git a/db/l2db/l2db.go b/db/l2db/l2db.go index df077f2..5af7dfa 100644 --- a/db/l2db/l2db.go +++ b/db/l2db/l2db.go @@ -73,24 +73,6 @@ func (l2db *L2DB) GetAccountCreationAuth(addr ethCommon.Address) (*common.Accoun )) } -// AddTx inserts a tx to the pool -func (l2db *L2DB) AddTx(tx *PoolL2TxWrite) error { - row := l2db.db.QueryRow( - "SELECT COUNT(*) FROM tx_pool WHERE state = $1;", - common.PoolL2TxStatePending, - ) - var totalTxs uint32 - if err := row.Scan(&totalTxs); err != nil { - return tracerr.Wrap(err) - } - if totalTxs >= l2db.maxTxs { - return tracerr.New( - "The pool is at full capacity. More transactions are not accepted currently", - ) - } - return tracerr.Wrap(meddler.Insert(l2db.db, "tx_pool", tx)) -} - // UpdateTxsInfo updates the parameter Info of the pool transactions func (l2db *L2DB) UpdateTxsInfo(txs []common.PoolL2Tx) error { if len(txs) == 0 { @@ -354,3 +336,14 @@ func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) (err error) { ) return tracerr.Wrap(err) } + +// PurgeByExternalDelete deletes all pending transactions marked with true in +// the `external_delete` column. An external process can set this column to +// true to instruct the coordinator to delete the tx when possible. +func (l2db *L2DB) PurgeByExternalDelete() error { + _, err := l2db.db.Exec( + `DELETE from tx_pool WHERE (external_delete = true AND state = $1);`, + common.PoolL2TxStatePending, + ) + return tracerr.Wrap(err) +} diff --git a/db/l2db/l2db_test.go b/db/l2db/l2db_test.go index 2a12748..d1b0257 100644 --- a/db/l2db/l2db_test.go +++ b/db/l2db/l2db_test.go @@ -1,6 +1,7 @@ package l2db import ( + "database/sql" "math" "math/big" "os" @@ -701,3 +702,56 @@ func TestAddGet(t *testing.T) { assert.Equal(t, txs[i], *dbTx) } } + +func TestPurgeByExternalDelete(t *testing.T) { + err := prepareHistoryDB(historyDB) + if err != nil { + log.Error("Error prepare historyDB", err) + } + txs, err := generatePoolL2Txs() + assert.NoError(t, err) + + // We will work with 8 txs + require.GreaterOrEqual(t, len(txs), 8) + txs = txs[:8] + for i := range txs { + require.NoError(t, l2DB.AddTxTest(&txs[i])) + } + + // We will recreate this scenario: + // tx index, status , external_delete + // 0 , pending, false + // 1 , pending, false + // 2 , pending, true // will be deleted + // 3 , pending, true // will be deleted + // 4 , fging , false + // 5 , fging , false + // 6 , fging , true + // 7 , fging , true + + require.NoError(t, l2DB.StartForging( + []common.TxID{txs[4].TxID, txs[5].TxID, txs[6].TxID, txs[7].TxID}, + 1)) + _, err = l2DB.db.Exec( + `UPDATE tx_pool SET external_delete = true WHERE + tx_id IN ($1, $2, $3, $4) + ;`, + txs[2].TxID, txs[3].TxID, txs[6].TxID, txs[7].TxID, + ) + require.NoError(t, err) + require.NoError(t, l2DB.PurgeByExternalDelete()) + + // Query txs that are have been not deleted + for _, i := range []int{0, 1, 4, 5, 6, 7} { + txID := txs[i].TxID + _, err := l2DB.GetTx(txID) + require.NoError(t, err) + } + + // Query txs that have been deleted + for _, i := range []int{2, 3} { + txID := txs[i].TxID + _, err := l2DB.GetTx(txID) + require.Equal(t, sql.ErrNoRows, tracerr.Unwrap(err)) + } +} diff --git a/db/l2db/views.go b/db/l2db/views.go index 7850c0a..5ea1fff 100644 --- a/db/l2db/views.go +++ b/db/l2db/views.go @@ -34,6 +34,7 @@ type PoolL2TxWrite struct { RqFee *common.FeeSelector `meddler:"rq_fee"` RqNonce *common.Nonce `meddler:"rq_nonce"` Type common.TxType `meddler:"tx_type"` + ClientIP string `meddler:"client_ip"` } // PoolTxAPI represents a L2 Tx pool with extra metadata used by the API diff --git a/db/migrations/0001.sql b/db/migrations/0001.sql index 6064089..6e73402 100644 --- a/db/migrations/0001.sql +++ b/db/migrations/0001.sql @@ -627,7 +627,9 @@ CREATE TABLE tx_pool ( rq_amount BYTEA, rq_fee SMALLINT, rq_nonce BIGINT, - tx_type VARCHAR(40) NOT NULL + tx_type VARCHAR(40) NOT NULL, + client_ip VARCHAR, + external_delete BOOLEAN NOT NULL DEFAULT false ); -- +migrate StatementBegin diff --git a/node/node.go b/node/node.go index 5bc7f0f..c5e0899 100644 --- a/node/node.go +++ b/node/node.go @@ -303,6 +303,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { ForgeDelay: cfg.Coordinator.ForgeDelay.Duration, ForgeNoTxsDelay: cfg.Coordinator.ForgeNoTxsDelay.Duration, SyncRetryInterval: cfg.Coordinator.SyncRetryInterval.Duration, + PurgeByExtDelInterval: cfg.Coordinator.PurgeByExtDelInterval.Duration, EthClientAttempts: cfg.Coordinator.EthClient.Attempts, EthClientAttemptsDelay: cfg.Coordinator.EthClient.AttemptsDelay.Duration, EthNoReuseNonce: cfg.Coordinator.EthClient.NoReuseNonce,