diff --git a/api/txspool.go b/api/txspool.go index 207e843..b990818 100644 --- a/api/txspool.go +++ b/api/txspool.go @@ -27,6 +27,7 @@ func (a *API) postPoolTx(c *gin.Context) { retBadReq(err, c) return } + writeTx.ClientIP = c.ClientIP() // Insert to DB if err := a.l2.AddTxAPI(writeTx); err != nil { retSQLErr(err, c) diff --git a/cli/node/.gitignore b/cli/node/.gitignore index 47a3b92..c4617c7 100644 --- a/cli/node/.gitignore +++ b/cli/node/.gitignore @@ -1,2 +1,3 @@ cfg.example.secret.toml cfg.toml +node diff --git a/cli/node/cfg.buidler.toml b/cli/node/cfg.buidler.toml index f80efa8..941b6ed 100644 --- a/cli/node/cfg.buidler.toml +++ b/cli/node/cfg.buidler.toml @@ -32,6 +32,7 @@ URL = "http://localhost:8545" [Synchronizer] SyncLoopInterval = "1s" StatsRefreshPeriod = "1s" +StoreAccountUpdates = true [SmartContracts] Rollup = "0x8EEaea23686c319133a7cC110b840d1591d9AeE0" @@ -55,6 +56,7 @@ ForgeRetryInterval = "500ms" SyncRetryInterval = "1s" ForgeDelay = "10s" ForgeNoTxsDelay = "0s" +PurgeByExtDelInterval = "1m" [Coordinator.FeeAccount] Address = "0x56232B1c5B10038125Bc7345664B4AFD745bcF8E" diff --git a/config/config.go b/config/config.go index 76a1f13..07921f6 100644 --- a/config/config.go +++ b/config/config.go @@ -91,6 +91,10 @@ type Coordinator struct { // SyncRetryInterval is the waiting interval between calls to the main // handler of a synced block after an error SyncRetryInterval Duration `validate:"required"` + // PurgeByExtDelInterval is the waiting interval between calls + // to the PurgeByExternalDelete function of the l2db which deletes + // pending txs externally marked by the column `external_delete` + PurgeByExtDelInterval Duration `validate:"required"` // L2DB is the DB that holds the pool of L2Txs L2DB struct { // SafetyPeriod is the number of batches after which diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index b6d922f..5263713 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -85,6 +85,10 @@ type Config struct { // SyncRetryInterval is the waiting interval between calls to the main // handler of a synced block after an error SyncRetryInterval time.Duration + // PurgeByExtDelInterval is the waiting interval between calls + // to the PurgeByExternalDelete function of the l2db which deletes + // pending txs externally marked by the column `external_delete` + PurgeByExtDelInterval time.Duration // EthClientAttemptsDelay is delay between attempts do do an eth client // RPC call EthClientAttemptsDelay time.Duration @@ -153,6 +157,15 @@ type Coordinator struct { wg sync.WaitGroup cancel context.CancelFunc + // mutexL2DBUpdateDelete protects updates to the L2DB so that + // these two processes always happen exclusively: + // - Pipeline taking pending txs, running through the TxProcessor and + // marking selected txs as forging + // - Coordinator deleting pending txs that have been marked with + // `external_delete`. + // Without this mutex, the coordinator could delete a pending txs that + // has just been selected by the TxProcessor in the pipeline. + mutexL2DBUpdateDelete sync.Mutex pipeline *Pipeline lastNonFailedBatchNum common.BatchNum @@ -248,7 +261,8 @@ func (c *Coordinator) BatchBuilder() *batchbuilder.BatchBuilder { func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) { c.pipelineNum++ return NewPipeline(ctx, c.cfg, c.pipelineNum, c.historyDB, c.l2DB, c.txSelector, - c.batchBuilder, c.purger, c, c.txManager, c.provers, &c.consts) + c.batchBuilder, &c.mutexL2DBUpdateDelete, c.purger, c, c.txManager, + c.provers, &c.consts) } // MsgSyncBlock indicates an update to the Synchronizer stats @@ -527,6 +541,24 @@ func (c *Coordinator) Start() { } } }() + + c.wg.Add(1) + go func() { + for { + select { + case <-c.ctx.Done(): + log.Info("Coordinator L2DB.PurgeByExternalDelete loop done") + c.wg.Done() + return + case <-time.After(c.cfg.PurgeByExtDelInterval): + c.mutexL2DBUpdateDelete.Lock() + if err := c.l2DB.PurgeByExternalDelete(); err != nil { + log.Errorw("L2DB.PurgeByExternalDelete", "err", err) + } + c.mutexL2DBUpdateDelete.Unlock() + } + } + }() } const stopCtxTimeout = 200 * time.Millisecond diff --git a/coordinator/pipeline.go b/coordinator/pipeline.go index aaeea59..a60423a 100644 --- a/coordinator/pipeline.go +++ b/coordinator/pipeline.go @@ -45,15 +45,16 @@ type Pipeline struct { errAtBatchNum common.BatchNum lastForgeTime time.Time - proversPool *ProversPool - provers []prover.Client - coord *Coordinator - txManager *TxManager - historyDB *historydb.HistoryDB - l2DB *l2db.L2DB - txSelector *txselector.TxSelector - batchBuilder *batchbuilder.BatchBuilder - purger *Purger + proversPool *ProversPool + provers []prover.Client + coord *Coordinator + txManager *TxManager + historyDB *historydb.HistoryDB + l2DB *l2db.L2DB + txSelector *txselector.TxSelector + batchBuilder *batchbuilder.BatchBuilder + mutexL2DBUpdateDelete *sync.Mutex + purger *Purger stats synchronizer.Stats vars synchronizer.SCVariables @@ -84,6 +85,7 @@ func NewPipeline(ctx context.Context, l2DB *l2db.L2DB, txSelector *txselector.TxSelector, batchBuilder *batchbuilder.BatchBuilder, + mutexL2DBUpdateDelete *sync.Mutex, purger *Purger, coord *Coordinator, txManager *TxManager, @@ -104,19 +106,20 @@ func NewPipeline(ctx context.Context, return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool")) } return &Pipeline{ - num: num, - cfg: cfg, - historyDB: historyDB, - l2DB: l2DB, - txSelector: txSelector, - batchBuilder: batchBuilder, - provers: provers, - proversPool: proversPool, - purger: purger, - coord: coord, - txManager: txManager, - consts: *scConsts, - statsVarsCh: make(chan statsVars, queueLen), + num: num, + cfg: cfg, + historyDB: historyDB, + l2DB: l2DB, + txSelector: txSelector, + batchBuilder: batchBuilder, + provers: provers, + proversPool: proversPool, + mutexL2DBUpdateDelete: mutexL2DBUpdateDelete, + purger: purger, + coord: coord, + txManager: txManager, + consts: *scConsts, + statsVarsCh: make(chan statsVars, queueLen), }, nil } @@ -199,7 +202,9 @@ func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) { // and then waits for an available proof server and sends the zkInputs to it so // that the proof computation begins. func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) { + p.mutexL2DBUpdateDelete.Lock() batchInfo, err := p.forgeBatch(batchNum) + p.mutexL2DBUpdateDelete.Unlock() if ctx.Err() != nil { return nil, ctx.Err() } else if err != nil { diff --git a/db/l2db/l2db.go b/db/l2db/l2db.go index df077f2..5af7dfa 100644 --- a/db/l2db/l2db.go +++ b/db/l2db/l2db.go @@ -73,24 +73,6 @@ func (l2db *L2DB) GetAccountCreationAuth(addr ethCommon.Address) (*common.Accoun )) } -// AddTx inserts a tx to the pool -func (l2db *L2DB) AddTx(tx *PoolL2TxWrite) error { - row := l2db.db.QueryRow( - "SELECT COUNT(*) FROM tx_pool WHERE state = $1;", - common.PoolL2TxStatePending, - ) - var totalTxs uint32 - if err := row.Scan(&totalTxs); err != nil { - return tracerr.Wrap(err) - } - if totalTxs >= l2db.maxTxs { - return tracerr.New( - "The pool is at full capacity. More transactions are not accepted currently", - ) - } - return tracerr.Wrap(meddler.Insert(l2db.db, "tx_pool", tx)) -} - // UpdateTxsInfo updates the parameter Info of the pool transactions func (l2db *L2DB) UpdateTxsInfo(txs []common.PoolL2Tx) error { if len(txs) == 0 { @@ -354,3 +336,14 @@ func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) (err error) { ) return tracerr.Wrap(err) } + +// PurgeByExternalDelete deletes all pending transactions marked with true in +// the `external_delete` column. An external process can set this column to +// true to instruct the coordinator to delete the tx when possible. +func (l2db *L2DB) PurgeByExternalDelete() error { + _, err := l2db.db.Exec( + `DELETE from tx_pool WHERE (external_delete = true AND state = $1);`, + common.PoolL2TxStatePending, + ) + return tracerr.Wrap(err) +} diff --git a/db/l2db/l2db_test.go b/db/l2db/l2db_test.go index 2a12748..d1b0257 100644 --- a/db/l2db/l2db_test.go +++ b/db/l2db/l2db_test.go @@ -1,6 +1,7 @@ package l2db import ( + "database/sql" "math" "math/big" "os" @@ -701,3 +702,56 @@ func TestAddGet(t *testing.T) { assert.Equal(t, txs[i], *dbTx) } } + +func TestPurgeByExternalDelete(t *testing.T) { + err := prepareHistoryDB(historyDB) + if err != nil { + log.Error("Error prepare historyDB", err) + } + txs, err := generatePoolL2Txs() + assert.NoError(t, err) + + // We will work with 8 txs + require.GreaterOrEqual(t, len(txs), 8) + txs = txs[:8] + for i := range txs { + require.NoError(t, l2DB.AddTxTest(&txs[i])) + } + + // We will recreate this scenario: + // tx index, status , external_delete + // 0 , pending, false + // 1 , pending, false + // 2 , pending, true // will be deleted + // 3 , pending, true // will be deleted + // 4 , fging , false + // 5 , fging , false + // 6 , fging , true + // 7 , fging , true + + require.NoError(t, l2DB.StartForging( + []common.TxID{txs[4].TxID, txs[5].TxID, txs[6].TxID, txs[7].TxID}, + 1)) + _, err = l2DB.db.Exec( + `UPDATE tx_pool SET external_delete = true WHERE + tx_id IN ($1, $2, $3, $4) + ;`, + txs[2].TxID, txs[3].TxID, txs[6].TxID, txs[7].TxID, + ) + require.NoError(t, err) + require.NoError(t, l2DB.PurgeByExternalDelete()) + + // Query txs that are have been not deleted + for _, i := range []int{0, 1, 4, 5, 6, 7} { + txID := txs[i].TxID + _, err := l2DB.GetTx(txID) + require.NoError(t, err) + } + + // Query txs that have been deleted + for _, i := range []int{2, 3} { + txID := txs[i].TxID + _, err := l2DB.GetTx(txID) + require.Equal(t, sql.ErrNoRows, tracerr.Unwrap(err)) + } +} diff --git a/db/l2db/views.go b/db/l2db/views.go index 7850c0a..5ea1fff 100644 --- a/db/l2db/views.go +++ b/db/l2db/views.go @@ -34,6 +34,7 @@ type PoolL2TxWrite struct { RqFee *common.FeeSelector `meddler:"rq_fee"` RqNonce *common.Nonce `meddler:"rq_nonce"` Type common.TxType `meddler:"tx_type"` + ClientIP string `meddler:"client_ip"` } // PoolTxAPI represents a L2 Tx pool with extra metadata used by the API diff --git a/db/migrations/0001.sql b/db/migrations/0001.sql index 6064089..6e73402 100644 --- a/db/migrations/0001.sql +++ b/db/migrations/0001.sql @@ -627,7 +627,9 @@ CREATE TABLE tx_pool ( rq_amount BYTEA, rq_fee SMALLINT, rq_nonce BIGINT, - tx_type VARCHAR(40) NOT NULL + tx_type VARCHAR(40) NOT NULL, + client_ip VARCHAR, + external_delete BOOLEAN NOT NULL DEFAULT false ); -- +migrate StatementBegin diff --git a/node/node.go b/node/node.go index 5bc7f0f..c5e0899 100644 --- a/node/node.go +++ b/node/node.go @@ -303,6 +303,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { ForgeDelay: cfg.Coordinator.ForgeDelay.Duration, ForgeNoTxsDelay: cfg.Coordinator.ForgeNoTxsDelay.Duration, SyncRetryInterval: cfg.Coordinator.SyncRetryInterval.Duration, + PurgeByExtDelInterval: cfg.Coordinator.PurgeByExtDelInterval.Duration, EthClientAttempts: cfg.Coordinator.EthClient.Attempts, EthClientAttemptsDelay: cfg.Coordinator.EthClient.AttemptsDelay.Duration, EthNoReuseNonce: cfg.Coordinator.EthClient.NoReuseNonce,