Integrate purger into node

- Common:
	- Add the `IdxNonce` type, used to track account nonces when invalidating
	  l2txs in the pool (see the sketch after this list)
- Config:
	- Update the coordinator config with all the new configuration parameters
	  used in the coordinator
- Coordinator:
	- Introduce the `Purger`, which tracks how often to purge and does the job
	  when needed according to its configuration.
	- Implement the methods to invalidate l2txs due to the l2tx selection in
	  batches.  For now these functions are not used, in favour of the
	  `Purger` methods, which check ALL the l2txs in the pool.
	- Call the invalidation and purging methods of the purger both when the
	  node is forging (in the pipeline, when starting a new batch) and when
	  it is not forging (in the coordinator, when notified about a new
	  synced block)
- L2DB:
	- Implement `GetPendingUniqueFromIdxs` to get all the unique idxs from
	  pending transactions (used to get their nonces and then invalidate
	  txs)
	- Redo `CheckNonces` as a single SQL query, using `common.IdxNonce`
	  instead of `common.Account`
- StateDB:
	- Expose `GetIdx` to allow checking for errors when invalidating pool txs
- Synchronizer:
	- Test forged L1UserTxs processed by TxProcessor
	- Improve checks of Effective values
- TxSelector:
	- Expose the internal `LocalStateDB` in order to check account nonces in
	  the coordinator when not forging.
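
A minimal sketch of the shape of the new `common.IdxNonce` type mentioned above (the `common` package change itself is not part of the diffs shown below); the field names are inferred from its usage in `coordinator/purger.go` and may differ slightly from the real definition:

```go
package common

// IdxNonce pairs an account index with a nonce; the purger uses it to mark as
// invalid the pool l2txs whose nonce is lower than or equal to the one given.
// Sketch only: inferred from common.IdxNonce{Idx: ..., Nonce: ...} in this commit.
type IdxNonce struct {
	Idx   Idx   // account index of the sender
	Nonce Nonce // highest nonce already used (or about to be forged) for that account
}
```
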
Eduard S
2020-12-03 14:58:26 +01:00
parent 8de7fe537a
commit 900d1fb6ce
15 changed files with 402 additions and 101 deletions


@@ -45,6 +45,7 @@ type Config struct {
// DebugBatchPath if set, specifies the path where batchInfo is stored
// in JSON in every step/update of the pipeline
DebugBatchPath string
Purger PurgerCfg
}
func (c *Config) debugBatchStore(batchInfo *BatchInfo) {
@@ -79,6 +80,7 @@ type Coordinator struct {
pipeline *Pipeline
purger *Purger
txManager *TxManager
}
@@ -103,6 +105,14 @@ func NewCoordinator(cfg Config,
cfg.EthClientAttempts))
}
purger := Purger{
cfg: cfg.Purger,
lastPurgeBlock: 0,
lastPurgeBatch: 0,
lastInvalidateBlock: 0,
lastInvalidateBatch: 0,
}
ctx, cancel := context.WithCancel(context.Background())
c := Coordinator{
pipelineBatchNum: -1,
@@ -117,6 +127,8 @@ func NewCoordinator(cfg Config,
txSelector: txSelector,
batchBuilder: batchBuilder,
purger: &purger,
// ethClient: ethClient,
msgCh: make(chan interface{}),
@@ -130,16 +142,18 @@ func NewCoordinator(cfg Config,
}
func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) {
return NewPipeline(ctx, c.cfg, c.historyDB, c.l2DB,
c.txSelector, c.batchBuilder, c.txManager, c.provers, &c.consts)
return NewPipeline(ctx, c.cfg, c.historyDB, c.l2DB, c.txSelector,
c.batchBuilder, c.purger, c.txManager, c.provers, &c.consts)
}
// MsgSyncStats indicates an update to the Synchronizer stats
type MsgSyncStats struct {
Stats synchronizer.Stats
// MsgSyncBlock indicates an update to the Synchronizer stats
type MsgSyncBlock struct {
Stats synchronizer.Stats
Batches []common.BatchData
}
// MsgSyncSCVars indicates an update to Smart Contract Vars
// TODO: Move this to MsgSyncBlock and remove MsgSyncSCVars
type MsgSyncSCVars struct {
Rollup *common.RollupVariables
Auction *common.AuctionVariables
@@ -186,7 +200,9 @@ func (c *Coordinator) canForge(stats *synchronizer.Stats) bool {
return false
}
func (c *Coordinator) handleMsgSyncStats(ctx context.Context, stats *synchronizer.Stats) error {
func (c *Coordinator) handleMsgSyncBlock(ctx context.Context, msg *MsgSyncBlock) error {
stats := &msg.Stats
// batches := msg.Batches
if !stats.Synced() {
return nil
}
@@ -218,6 +234,29 @@ func (c *Coordinator) handleMsgSyncStats(ctx context.Context, stats *synchronize
c.pipeline = nil
}
}
if c.pipeline == nil {
// Mark invalid in Pool due to forged L2Txs
// for _, batch := range batches {
// if err := poolMarkInvalidOldNoncesFromL2Txs(c.l2DB,
// idxsNonceFromL2Txs(batch.L2Txs), batch.Batch.BatchNum); err != nil {
// return err
// }
// }
if c.purger.CanInvalidate(stats.Sync.LastBlock.Num, stats.Sync.LastBatch) {
if err := c.txSelector.Reset(common.BatchNum(stats.Sync.LastBatch)); err != nil {
return err
}
}
_, err := c.purger.InvalidateMaybe(c.l2DB, c.txSelector.LocalAccountsDB(),
stats.Sync.LastBlock.Num, stats.Sync.LastBatch)
if err != nil {
return err
}
_, err = c.purger.PurgeMaybe(c.l2DB, stats.Sync.LastBlock.Num, stats.Sync.LastBatch)
if err != nil {
return err
}
}
return nil
}
@@ -254,12 +293,11 @@ func (c *Coordinator) Start() {
return
case msg := <-c.msgCh:
switch msg := msg.(type) {
case MsgSyncStats:
stats := msg.Stats
if err := c.handleMsgSyncStats(c.ctx, &stats); common.IsErrDone(err) {
case MsgSyncBlock:
if err := c.handleMsgSyncBlock(c.ctx, &msg); common.IsErrDone(err) {
continue
} else if err != nil {
log.Errorw("Coordinator.handleMsgSyncStats error", "err", err)
log.Errorw("Coordinator.handleMsgSyncBlock error", "err", err)
continue
}
case MsgSyncReorg:
@@ -522,6 +560,7 @@ type Pipeline struct {
l2DB *l2db.L2DB
txSelector *txselector.TxSelector
batchBuilder *batchbuilder.BatchBuilder
purger *Purger
stats synchronizer.Stats
statsCh chan synchronizer.Stats
@@ -538,6 +577,7 @@ func NewPipeline(ctx context.Context,
l2DB *l2db.L2DB,
txSelector *txselector.TxSelector,
batchBuilder *batchbuilder.BatchBuilder,
purger *Purger,
txManager *TxManager,
provers []prover.Client,
scConsts *synchronizer.SCConsts,
@@ -563,6 +603,7 @@ func NewPipeline(ctx context.Context,
batchBuilder: batchBuilder,
provers: provers,
proversPool: proversPool,
purger: purger,
txManager: txManager,
consts: *scConsts,
statsCh: make(chan synchronizer.Stats, queueLen),
@@ -679,7 +720,12 @@ func l2TxsIDs(txs []common.PoolL2Tx) []common.TxID {
// circuit inputs to the proof server.
func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
// remove transactions from the pool that have been there for too long
err := p.l2DB.Purge(common.BatchNum(p.stats.Sync.LastBatch))
_, err := p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
p.stats.Sync.LastBlock.Num, int64(batchNum))
if err != nil {
return nil, err
}
_, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum))
if err != nil {
return nil, tracerr.Wrap(err)
}
@@ -721,11 +767,11 @@ func (p *Pipeline) forgeSendServerProof(ctx context.Context, batchNum common.Bat
return nil, tracerr.Wrap(err)
}
// Run purger to invalidate transactions that become invalid because of
// Invalidate transactions that become invalid because of
// the poolL2Txs selected. Will mark as invalid the txs that have a
// (fromIdx, nonce) which already appears in the selected txs (includes
// all the nonces smaller than the current one)
err = p.purgeInvalidDueToL2TxsSelection(poolL2Txs)
err = poolMarkInvalidOldNoncesFromL2Txs(p.l2DB, idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum)
if err != nil {
return nil, tracerr.Wrap(err)
}
@@ -784,10 +830,6 @@ func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) er
return nil
}
func (p *Pipeline) purgeInvalidDueToL2TxsSelection(l2Txs []common.PoolL2Tx) error {
return nil // TODO
}
func (p *Pipeline) shouldL1L2Batch() bool {
// Take the lastL1BatchBlockNum as the biggest between the last
// scheduled one, and the synchronized one.
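
To make the new coordinator configuration concrete, here is a hedged sketch of how the `Purger` field added to `Config` above might be filled in; only the new field (plus `DebugBatchPath`) is shown, the other `Config` fields are omitted, and the delay values are illustrative rather than recommended defaults:

```go
package main

import "github.com/hermeznetwork/hermez-node/coordinator"

// newCoordConfig returns a partially-filled coordinator Config.  The PurgerCfg
// field names come from coordinator/purger.go below; the numbers are made up.
func newCoordConfig() coordinator.Config {
	return coordinator.Config{
		DebugBatchPath: "/tmp/hermez-debug-batches", // optional debug output
		Purger: coordinator.PurgerCfg{
			PurgeBatchDelay:      10, // allow a purge once 10 batches have passed since the last one
			PurgeBlockDelay:      10, // ...or once 10 blocks have passed
			InvalidateBatchDelay: 4,  // allow an invalidation once 4 batches have passed
			InvalidateBlockDelay: 4,  // ...or once 4 blocks have passed
		},
	}
}
```
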


@@ -79,28 +79,29 @@ func newTestModules(t *testing.T) (*historydb.HistoryDB, *l2db.L2DB,
var err error
syncDBPath, err = ioutil.TempDir("", "tmpSyncDB")
require.Nil(t, err)
require.NoError(t, err)
deleteme = append(deleteme, syncDBPath)
syncSdb, err := statedb.NewStateDB(syncDBPath, statedb.TypeSynchronizer, nLevels)
assert.Nil(t, err)
assert.NoError(t, err)
pass := os.Getenv("POSTGRES_PASS")
db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez")
require.Nil(t, err)
require.NoError(t, err)
test.WipeDB(db)
l2DB := l2db.NewL2DB(db, 10, 100, 24*time.Hour)
historyDB := historydb.NewHistoryDB(db)
txSelDBPath, err = ioutil.TempDir("", "tmpTxSelDB")
require.Nil(t, err)
require.NoError(t, err)
deleteme = append(deleteme, txSelDBPath)
txsel, err := txselector.NewTxSelector(txSelDBPath, syncSdb, l2DB, 10, 10, 10)
assert.Nil(t, err)
assert.NoError(t, err)
batchBuilderDBPath, err = ioutil.TempDir("", "tmpBatchBuilderDB")
require.Nil(t, err)
require.NoError(t, err)
deleteme = append(deleteme, batchBuilderDBPath)
bb, err := batchbuilder.NewBatchBuilder(batchBuilderDBPath, syncSdb, nil, 0, uint64(nLevels))
assert.Nil(t, err)
assert.NoError(t, err)
// l1Txs, coordinatorL1Txs, poolL2Txs := test.GenerateTestTxsFromSet(t, test.SetTest0)
@@ -124,7 +125,7 @@ func newTestCoordinator(t *testing.T, forgerAddr ethCommon.Address, ethClient *t
historyDB, l2DB, txsel, bb := newTestModules(t)
debugBatchPath, err := ioutil.TempDir("", "tmpDebugBatch")
require.Nil(t, err)
require.NoError(t, err)
deleteme = append(deleteme, debugBatchPath)
conf := Config{
@@ -150,7 +151,7 @@ func newTestCoordinator(t *testing.T, forgerAddr ethCommon.Address, ethClient *t
}
coord, err := NewCoordinator(conf, historyDB, l2DB, txsel, bb, serverProofs,
ethClient, scConsts, initSCVars)
require.Nil(t, err)
require.NoError(t, err)
return coord
}
@@ -165,11 +166,11 @@ func TestCoordinatorFlow(t *testing.T) {
// Bid for slot 2 and 4
_, err := ethClient.AuctionSetCoordinator(forger, "https://foo.bar")
require.Nil(t, err)
require.NoError(t, err)
_, err = ethClient.AuctionBidSimple(2, big.NewInt(9999))
require.Nil(t, err)
require.NoError(t, err)
_, err = ethClient.AuctionBidSimple(4, big.NewInt(9999))
require.Nil(t, err)
require.NoError(t, err)
coord.Start()
time.Sleep(1 * time.Second)
@@ -177,9 +178,9 @@ func TestCoordinatorFlow(t *testing.T) {
waitForSlot := func(slot int64) {
for {
blockNum, err := ethClient.EthLastBlock()
require.Nil(t, err)
require.NoError(t, err)
nextBlockSlot, err := ethClient.AuctionGetSlotNumber(blockNum + 1)
require.Nil(t, err)
require.NoError(t, err)
if nextBlockSlot == slot {
break
}
@@ -191,7 +192,7 @@ func TestCoordinatorFlow(t *testing.T) {
stats.Eth.LastBatch = ethClient.CtlLastForgedBatch()
stats.Sync.LastBatch = stats.Eth.LastBatch
canForge, err := ethClient.AuctionCanForge(forger, blockNum+1)
require.Nil(t, err)
require.NoError(t, err)
if canForge {
// fmt.Println("DBG canForge")
stats.Sync.Auction.CurrentSlot.Forger = forger
@@ -207,7 +208,7 @@ func TestCoordinatorFlow(t *testing.T) {
require.NoError(t, err)
}
}
coord.SendMsg(MsgSyncStats{
coord.SendMsg(MsgSyncBlock{
Stats: stats,
})
}
@@ -254,16 +255,16 @@ func TestCoordCanForge(t *testing.T) {
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
coord := newTestCoordinator(t, forger, ethClient, ethClientSetup)
_, err := ethClient.AuctionSetCoordinator(forger, "https://foo.bar")
require.Nil(t, err)
require.NoError(t, err)
_, err = ethClient.AuctionBidSimple(2, big.NewInt(9999))
require.Nil(t, err)
require.NoError(t, err)
bootCoord := newTestCoordinator(t, bootForger, ethClient, ethClientSetup)
assert.Equal(t, forger, coord.cfg.ForgerAddress)
assert.Equal(t, bootForger, bootCoord.cfg.ForgerAddress)
ethBootCoord, err := ethClient.AuctionGetBootCoordinator()
require.Nil(t, err)
require.NoError(t, err)
assert.Equal(t, &bootForger, ethBootCoord)
var stats synchronizer.Stats
@@ -300,11 +301,12 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
coord := newTestCoordinator(t, forger, ethClient, ethClientSetup)
_, err := ethClient.AuctionSetCoordinator(forger, "https://foo.bar")
require.Nil(t, err)
require.NoError(t, err)
_, err = ethClient.AuctionBidSimple(2, big.NewInt(9999))
require.Nil(t, err)
require.NoError(t, err)
var stats synchronizer.Stats
var msg MsgSyncBlock
stats := &msg.Stats
ctx := context.Background()
// Slot 0. No bid, so the winner is the boot coordinator
@@ -312,8 +314,8 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum
stats.Sync.LastBlock = stats.Eth.LastBlock
stats.Sync.Auction.CurrentSlot.Forger = bootForger
assert.Equal(t, false, coord.canForge(&stats))
require.Nil(t, coord.handleMsgSyncStats(ctx, &stats))
assert.Equal(t, false, coord.canForge(stats))
require.NoError(t, coord.handleMsgSyncBlock(ctx, &msg))
assert.Nil(t, coord.pipeline)
// Slot 0. No bid, and we reach the deadline, so anyone can forge
@@ -322,8 +324,8 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
int64(ethClientSetup.AuctionVariables.SlotDeadline)
stats.Sync.LastBlock = stats.Eth.LastBlock
stats.Sync.Auction.CurrentSlot.Forger = bootForger
assert.Equal(t, true, coord.canForge(&stats))
require.Nil(t, coord.handleMsgSyncStats(ctx, &stats))
assert.Equal(t, true, coord.canForge(stats))
require.NoError(t, coord.handleMsgSyncBlock(ctx, &msg))
assert.NotNil(t, coord.pipeline)
// Slot 0. No bid, and we reach the deadline, so anyone can forge
@@ -332,8 +334,8 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
int64(ethClientSetup.AuctionVariables.SlotDeadline) + 1
stats.Sync.LastBlock = stats.Eth.LastBlock
stats.Sync.Auction.CurrentSlot.Forger = bootForger
assert.Equal(t, true, coord.canForge(&stats))
require.Nil(t, coord.handleMsgSyncStats(ctx, &stats))
assert.Equal(t, true, coord.canForge(stats))
require.NoError(t, coord.handleMsgSyncBlock(ctx, &msg))
assert.NotNil(t, coord.pipeline)
// Slot 0. No bid, so the winner is the boot coordinator
@@ -342,8 +344,8 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
1*int64(ethClientSetup.AuctionConstants.BlocksPerSlot)
stats.Sync.LastBlock = stats.Eth.LastBlock
stats.Sync.Auction.CurrentSlot.Forger = bootForger
assert.Equal(t, false, coord.canForge(&stats))
require.Nil(t, coord.handleMsgSyncStats(ctx, &stats))
assert.Equal(t, false, coord.canForge(stats))
require.NoError(t, coord.handleMsgSyncBlock(ctx, &msg))
assert.Nil(t, coord.pipeline)
}

coordinator/purger.go (new file, 152 lines)

@@ -0,0 +1,152 @@
package coordinator
import (
"fmt"
"github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/db/l2db"
"github.com/hermeznetwork/hermez-node/db/statedb"
"github.com/hermeznetwork/hermez-node/log"
"github.com/hermeznetwork/tracerr"
"github.com/iden3/go-merkletree/db"
)
// PurgerCfg is the purger configuration
type PurgerCfg struct {
// PurgeBatchDelay is the delay between batches to purge outdated transactions
PurgeBatchDelay int64
// InvalidateBatchDelay is the delay between batches to mark invalid transactions
InvalidateBatchDelay int64
// PurgeBlockDelay is the delay between blocks to purge outdated transactions
PurgeBlockDelay int64
// InvalidateBlockDelay is the delay between blocks to mark invalid transactions
InvalidateBlockDelay int64
}
// Purger manages cleanup of transactions in the pool
type Purger struct {
cfg PurgerCfg
lastPurgeBlock int64
lastPurgeBatch int64
lastInvalidateBlock int64
lastInvalidateBatch int64
}
// CanPurge returns true if it's a good time to purge according to the
// configuration
func (p *Purger) CanPurge(blockNum, batchNum int64) bool {
if blockNum > p.lastPurgeBlock+p.cfg.PurgeBlockDelay {
return true
}
if batchNum > p.lastPurgeBatch+p.cfg.PurgeBatchDelay {
return true
}
return false
}
// CanInvalidate returns true if it's a good time to invalidate according to
// the configuration
func (p *Purger) CanInvalidate(blockNum, batchNum int64) bool {
if blockNum > p.lastInvalidateBlock+p.cfg.InvalidateBlockDelay {
return true
}
if batchNum > p.lastInvalidateBatch+p.cfg.InvalidateBatchDelay {
return true
}
return false
}
// PurgeMaybe purges txs if it's a good time to do so
func (p *Purger) PurgeMaybe(l2DB *l2db.L2DB, blockNum, batchNum int64) (bool, error) {
if !p.CanPurge(blockNum, batchNum) {
return false, nil
}
p.lastPurgeBlock = blockNum
p.lastPurgeBatch = batchNum
log.Debugw("Purger: purging l2txs in pool", "block", blockNum, "batch", batchNum)
err := l2DB.Purge(common.BatchNum(batchNum))
return true, tracerr.Wrap(err)
}
// InvalidateMaybe invalidates txs if it's a good time to do so
func (p *Purger) InvalidateMaybe(l2DB *l2db.L2DB, stateDB *statedb.LocalStateDB,
blockNum, batchNum int64) (bool, error) {
if !p.CanInvalidate(blockNum, batchNum) {
return false, nil
}
p.lastInvalidateBlock = blockNum
p.lastInvalidateBatch = batchNum
log.Debugw("Purger: invalidating l2txs in pool", "block", blockNum, "batch", batchNum)
err := poolMarkInvalidOldNonces(l2DB, stateDB, common.BatchNum(batchNum))
return true, err
}
//nolint:unused,deadcode
func idxsNonceFromL2Txs(txs []common.L2Tx) []common.IdxNonce {
idxNonceMap := map[common.Idx]common.Nonce{}
for _, tx := range txs {
if nonce, ok := idxNonceMap[tx.FromIdx]; !ok {
idxNonceMap[tx.FromIdx] = tx.Nonce
} else if tx.Nonce > nonce {
idxNonceMap[tx.FromIdx] = tx.Nonce
}
}
idxsNonce := make([]common.IdxNonce, 0, len(idxNonceMap))
for idx, nonce := range idxNonceMap {
idxsNonce = append(idxsNonce, common.IdxNonce{Idx: idx, Nonce: nonce})
}
return idxsNonce
}
func idxsNonceFromPoolL2Txs(txs []common.PoolL2Tx) []common.IdxNonce {
idxNonceMap := map[common.Idx]common.Nonce{}
for _, tx := range txs {
if nonce, ok := idxNonceMap[tx.FromIdx]; !ok {
idxNonceMap[tx.FromIdx] = tx.Nonce
} else if tx.Nonce > nonce {
idxNonceMap[tx.FromIdx] = tx.Nonce
}
}
idxsNonce := make([]common.IdxNonce, 0, len(idxNonceMap))
for idx, nonce := range idxNonceMap {
idxsNonce = append(idxsNonce, common.IdxNonce{Idx: idx, Nonce: nonce})
}
return idxsNonce
}
// poolMarkInvalidOldNoncesFromL2Txs marks as invalid the txs in the pool that
// contain nonces equal or older to the highest nonce used in a forged l2Tx for
// the corresponding sender account
func poolMarkInvalidOldNoncesFromL2Txs(l2DB *l2db.L2DB,
idxsNonce []common.IdxNonce, batchNum common.BatchNum) error {
return l2DB.CheckNonces(idxsNonce, batchNum)
}
// poolMarkInvalidOldNonces marks as invalid txs in the pool that contain
// nonces equal or older to the nonce of the corresponding sender account
func poolMarkInvalidOldNonces(l2DB *l2db.L2DB, stateDB *statedb.LocalStateDB,
batchNum common.BatchNum) error {
idxs, err := l2DB.GetPendingUniqueFromIdxs()
if err != nil {
return err
}
idxsNonce := make([]common.IdxNonce, len(idxs))
lastIdx, err := stateDB.GetIdx()
if err != nil {
return err
}
for i, idx := range idxs {
acc, err := stateDB.GetAccount(idx)
if err != nil {
if tracerr.Unwrap(err) != db.ErrNotFound {
return err
} else if idx <= lastIdx {
return fmt.Errorf("account with idx %v not found: %w", idx, err)
}
}
idxsNonce[i].Idx = idx
idxsNonce[i].Nonce = acc.Nonce
}
return l2DB.CheckNonces(idxsNonce, batchNum)
}
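
As a quick illustration of the `CanPurge`/`CanInvalidate` thresholds above, a small sketch with made-up values (it lives in the same package, so the unexported fields are reachable):

```go
package coordinator

import "fmt"

// examplePurgerThresholds shows when CanPurge fires; CanInvalidate behaves the
// same way with the Invalidate* delays.  Values are illustrative only.
func examplePurgerThresholds() {
	p := Purger{cfg: PurgerCfg{PurgeBlockDelay: 10, PurgeBatchDelay: 100}}
	p.lastPurgeBlock, p.lastPurgeBatch = 100, 20

	fmt.Println(p.CanPurge(105, 20))  // false: 105 <= 100+10 and 20 <= 20+100
	fmt.Println(p.CanPurge(111, 20))  // true: 111 > 100+10
	fmt.Println(p.CanPurge(105, 121)) // true: 121 > 20+100
}
```
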


@@ -0,0 +1,3 @@
package coordinator
// TODO: Test purger functions
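
As a possible starting point for the TODO above, an assumption-laden sketch of a unit test for `idxsNonceFromPoolL2Txs`, checking that only the highest nonce per `FromIdx` is kept (the field values are arbitrary):

```go
package coordinator

import (
	"testing"

	"github.com/hermeznetwork/hermez-node/common"
	"github.com/stretchr/testify/assert"
)

func TestIdxsNonceFromPoolL2Txs(t *testing.T) {
	txs := []common.PoolL2Tx{
		{FromIdx: common.Idx(256), Nonce: common.Nonce(1)},
		{FromIdx: common.Idx(256), Nonce: common.Nonce(3)}, // highest nonce for idx 256
		{FromIdx: common.Idx(257), Nonce: common.Nonce(2)},
	}
	idxsNonce := idxsNonceFromPoolL2Txs(txs)
	assert.Len(t, idxsNonce, 2)
	for _, in := range idxsNonce {
		switch in.Idx {
		case common.Idx(256):
			assert.Equal(t, common.Nonce(3), in.Nonce)
		case common.Idx(257):
			assert.Equal(t, common.Nonce(2), in.Nonce)
		}
	}
}
```
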