Compare commits

...

12 Commits

Author SHA1 Message Date
Eduard S
217a41d465 Add minPriceUSD in L2DB, check maxTxs atomically
- Add config parameter `Coordinator.L2DB.MinPriceUSD` which allows rejecting
  txs to the pool that have a fee lower than the minimum.
- In pool tx insertion, checking the number of pending txs atomically with the
  insertion to avoid data races leading to more than MaxTxs pending txs in the
  pool.
2021-02-22 16:37:18 +01:00
Eduard S
67b2b7da4b Delete pending txs by external mark, store tx IP
- In tx_pool, add a column called `external_delete` that can be set to true
  externally.  Regularly, the coordinator will delete all pending txs with this
  column set to true.  The interval for this action is set via the new config
  parameter `Coordinator.PurgeByExtDelInterval`.
- In tx_pool, add a column for the client ip that sent the transaction.  The
  api fills this value using the ClientIP method from gin.Context, which should
  work even under a reverse-proxy.
2021-02-19 16:00:45 +01:00
arnau
e23063380c Merge pull request #555 from hermeznetwork/feature/accountupdatetable
Feature/accountupdatetable
2021-02-19 13:23:03 +01:00
Eduard S
ed4d39fcd1 Add account_update SQL table with balances and nonces 2021-02-19 13:16:14 +01:00
Eduard S
d6ec1910da Simplify historyDB test and make it faster 2021-02-19 11:12:56 +01:00
Eduard S
c829eb99dc Remove unecessary slice indexing 2021-02-19 11:00:36 +01:00
Eduard S
6ecb8118bd Merge pull request #553 from hermeznetwork/fix/exit-amount-0
Txs w/ exit Amount=0,to not create new Exit leafs
2021-02-19 10:56:08 +01:00
arnaucube
4500820a03 Txs w/ exit Amount=0,to not create new Exit leafs
There are 2 ways to interpret an Exit transaction with Amount=0:
`A`. Not adding a new Leaf to the ExitTree, so not modifying the Exit
MerkleRoot
`B`. Adding a new Leaf to the ExitTree, with Balance=0, which modifies
the Exit MerkleRoot

Currently the [Circuits](https://github.com/hermeznetwork/circuits) are
doing approach `A`, and the
[hermez-node](https://github.com/hermeznetwork/hermez-node) is doing the
approach `B`. The idea of this commit, is to use approach `A` also in
the hermez-node.
2021-02-19 10:50:59 +01:00
Eduard S
b4e6104fd3 Merge pull request #551 from hermeznetwork/fix/api-err-dupkey-meddler
Duplicated error when caused by meddler
2021-02-18 17:18:20 +01:00
arnaubennassar
28f026f628 Duplicated error when caused by meddler 2021-02-18 17:06:49 +01:00
arnau
688d376ce0 Merge pull request #550 from hermeznetwork/fix/txselectornoncessorting2
Fix TxSel sorting, TxManager geth err checking
2021-02-18 16:59:10 +01:00
Eduard S
2547d5dce7 Fix TxSel sorting, TxManager geth err checking 2021-02-18 15:18:39 +01:00
34 changed files with 878 additions and 285 deletions

View File

@@ -221,7 +221,7 @@ func TestMain(m *testing.M) {
panic(err)
}
// L2DB
l2DB := l2db.NewL2DB(database, 10, 1000, 24*time.Hour, apiConnCon)
l2DB := l2db.NewL2DB(database, 10, 1000, 0.0, 24*time.Hour, apiConnCon)
test.WipeDB(l2DB.DB()) // this will clean HistoryDB and L2DB
// Config (smart contract constants)
chainID := uint16(0)
@@ -585,7 +585,7 @@ func TestTimeout(t *testing.T) {
hdbTO := historydb.NewHistoryDB(databaseTO, apiConnConTO)
require.NoError(t, err)
// L2DB
l2DBTO := l2db.NewL2DB(databaseTO, 10, 1000, 24*time.Hour, apiConnConTO)
l2DBTO := l2db.NewL2DB(databaseTO, 10, 1000, 0.0, 24*time.Hour, apiConnConTO)
// API
apiGinTO := gin.Default()

View File

@@ -10,6 +10,7 @@ import (
"github.com/hermeznetwork/hermez-node/log"
"github.com/hermeznetwork/tracerr"
"github.com/lib/pq"
"github.com/russross/meddler"
)
const (
@@ -46,24 +47,33 @@ var (
func retSQLErr(err error, c *gin.Context) {
log.Warnw("HTTP API SQL request error", "err", err)
errMsg := tracerr.Unwrap(err).Error()
retDupKey := func(errCode pq.ErrorCode) {
// https://www.postgresql.org/docs/current/errcodes-appendix.html
if errCode == "23505" {
c.JSON(http.StatusInternalServerError, errorMsg{
Message: errDuplicatedKey,
})
} else {
c.JSON(http.StatusInternalServerError, errorMsg{
Message: errMsg,
})
}
}
if errMsg == errCtxTimeout {
c.JSON(http.StatusServiceUnavailable, errorMsg{
Message: errSQLTimeout,
})
} else if sqlErr, ok := tracerr.Unwrap(err).(*pq.Error); ok {
// https://www.postgresql.org/docs/current/errcodes-appendix.html
if sqlErr.Code == "23505" {
c.JSON(http.StatusInternalServerError, errorMsg{
Message: errDuplicatedKey,
})
}
retDupKey(sqlErr.Code)
} else if sqlErr, ok := meddler.DriverErr(tracerr.Unwrap(err)); ok {
retDupKey(sqlErr.(*pq.Error).Code)
} else if tracerr.Unwrap(err) == sql.ErrNoRows {
c.JSON(http.StatusNotFound, errorMsg{
Message: err.Error(),
Message: errMsg,
})
} else {
c.JSON(http.StatusInternalServerError, errorMsg{
Message: err.Error(),
Message: errMsg,
})
}
}

View File

@@ -2,6 +2,7 @@ package api
import (
"errors"
"fmt"
"math/big"
"net/http"
@@ -27,6 +28,7 @@ func (a *API) postPoolTx(c *gin.Context) {
retBadReq(err, c)
return
}
writeTx.ClientIP = c.ClientIP()
// Insert to DB
if err := a.l2.AddTxAPI(writeTx); err != nil {
retSQLErr(err, c)
@@ -179,6 +181,11 @@ func (a *API) verifyPoolL2TxWrite(txw *l2db.PoolL2TxWrite) error {
if err != nil {
return tracerr.Wrap(err)
}
// Validate TokenID
if poolTx.TokenID != account.TokenID {
return tracerr.Wrap(fmt.Errorf("tx.TokenID (%v) != account.TokenID (%v)",
poolTx.TokenID, account.TokenID))
}
// Check signature
if !poolTx.VerifySignature(a.chainID, account.BJJ) {
return tracerr.Wrap(errors.New("wrong signature"))

View File

@@ -10,6 +10,7 @@ import (
"github.com/hermeznetwork/hermez-node/db/historydb"
"github.com/iden3/go-iden3-crypto/babyjub"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// testPoolTxReceive is a struct to be used to assert the response
@@ -170,9 +171,9 @@ func TestPoolTxs(t *testing.T) {
fetchedTxID := common.TxID{}
for _, tx := range tc.poolTxsToSend {
jsonTxBytes, err := json.Marshal(tx)
assert.NoError(t, err)
require.NoError(t, err)
jsonTxReader := bytes.NewReader(jsonTxBytes)
assert.NoError(
require.NoError(
t, doGoodReq(
"POST",
endpoint,
@@ -187,42 +188,42 @@ func TestPoolTxs(t *testing.T) {
badTx.Amount = "99950000000000000"
badTx.Fee = 255
jsonTxBytes, err := json.Marshal(badTx)
assert.NoError(t, err)
require.NoError(t, err)
jsonTxReader := bytes.NewReader(jsonTxBytes)
err = doBadReq("POST", endpoint, jsonTxReader, 400)
assert.NoError(t, err)
require.NoError(t, err)
// Wrong signature
badTx = tc.poolTxsToSend[0]
badTx.FromIdx = "hez:foo:1000"
jsonTxBytes, err = json.Marshal(badTx)
assert.NoError(t, err)
require.NoError(t, err)
jsonTxReader = bytes.NewReader(jsonTxBytes)
err = doBadReq("POST", endpoint, jsonTxReader, 400)
assert.NoError(t, err)
require.NoError(t, err)
// Wrong to
badTx = tc.poolTxsToSend[0]
ethAddr := "hez:0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
badTx.ToEthAddr = &ethAddr
badTx.ToIdx = nil
jsonTxBytes, err = json.Marshal(badTx)
assert.NoError(t, err)
require.NoError(t, err)
jsonTxReader = bytes.NewReader(jsonTxBytes)
err = doBadReq("POST", endpoint, jsonTxReader, 400)
assert.NoError(t, err)
require.NoError(t, err)
// Wrong rq
badTx = tc.poolTxsToSend[0]
rqFromIdx := "hez:foo:30"
badTx.RqFromIdx = &rqFromIdx
jsonTxBytes, err = json.Marshal(badTx)
assert.NoError(t, err)
require.NoError(t, err)
jsonTxReader = bytes.NewReader(jsonTxBytes)
err = doBadReq("POST", endpoint, jsonTxReader, 400)
assert.NoError(t, err)
require.NoError(t, err)
// GET
endpoint += "/"
for _, tx := range tc.poolTxsToReceive {
fetchedTx := testPoolTxReceive{}
assert.NoError(
require.NoError(
t, doGoodReq(
"GET",
endpoint+tx.TxID.String(),
@@ -233,10 +234,10 @@ func TestPoolTxs(t *testing.T) {
}
// 400, due invalid TxID
err = doBadReq("GET", endpoint+"0xG2241b6f2b1dd772dba391f4a1a3407c7c21f598d86e2585a14e616fb4a255f823", nil, 400)
assert.NoError(t, err)
require.NoError(t, err)
// 404, due inexistent TxID in DB
err = doBadReq("GET", endpoint+"0x02241b6f2b1dd772dba391f4a1a3407c7c21f598d86e2585a14e616fb4a255f823", nil, 404)
assert.NoError(t, err)
require.NoError(t, err)
}
func assertPoolTx(t *testing.T, expected, actual testPoolTxReceive) {

1
cli/node/.gitignore vendored
View File

@@ -1,2 +1,3 @@
cfg.example.secret.toml
cfg.toml
node

View File

@@ -32,6 +32,7 @@ URL = "http://localhost:8545"
[Synchronizer]
SyncLoopInterval = "1s"
StatsRefreshPeriod = "1s"
StoreAccountUpdates = true
[SmartContracts]
Rollup = "0x8EEaea23686c319133a7cC110b840d1591d9AeE0"
@@ -55,6 +56,7 @@ ForgeRetryInterval = "500ms"
SyncRetryInterval = "1s"
ForgeDelay = "10s"
ForgeNoTxsDelay = "0s"
PurgeByExtDelInterval = "1m"
[Coordinator.FeeAccount]
Address = "0x56232B1c5B10038125Bc7345664B4AFD745bcF8E"
@@ -65,6 +67,7 @@ BJJ = "0x1b176232f78ba0d388ecc5f4896eca2d3b3d4f272092469f559247297f5c0c13"
[Coordinator.L2DB]
SafetyPeriod = 10
MaxTxs = 512
MinFeeUSD = 0.0
TTL = "24h"
PurgeBatchDelay = 10
InvalidateBatchDelay = 20

View File

@@ -173,6 +173,7 @@ func cmdDiscard(c *cli.Context) error {
db,
cfg.Coordinator.L2DB.SafetyPeriod,
cfg.Coordinator.L2DB.MaxTxs,
cfg.Coordinator.L2DB.MinFeeUSD,
cfg.Coordinator.L2DB.TTL.Duration,
nil,
)

View File

@@ -263,3 +263,13 @@ type IdxNonce struct {
Idx Idx `db:"idx"`
Nonce Nonce `db:"nonce"`
}
// AccountUpdate represents an account balance and/or nonce update after a
// processed batch
type AccountUpdate struct {
EthBlockNum int64 `meddler:"eth_block_num"`
BatchNum BatchNum `meddler:"batch_num"`
Idx Idx `meddler:"idx"`
Nonce Nonce `meddler:"nonce"`
Balance *big.Int `meddler:"balance,bigint"`
}

View File

@@ -77,6 +77,7 @@ type BatchData struct {
L1CoordinatorTxs []L1Tx
L2Txs []L2Tx
CreatedAccounts []Account
UpdatedAccounts []AccountUpdate
ExitTree []ExitInfo
Batch Batch
}

View File

@@ -62,3 +62,17 @@ func RmEndingZeroes(siblings []*merkletree.Hash) []*merkletree.Hash {
}
return siblings[:pos]
}
// TokensToUSD is a helper function to calculate the USD value of a certain
// amount of tokens considering the normalized token price (which is the price
// commonly reported by exhanges)
func TokensToUSD(amount *big.Int, decimals uint64, valueUSD float64) float64 {
amountF := new(big.Float).SetInt(amount)
// Divide by 10^decimals to normalize the amount
baseF := new(big.Float).SetInt(new(big.Int).Exp(
big.NewInt(10), big.NewInt(int64(decimals)), nil)) //nolint:gomnd
amountF.Mul(amountF, big.NewFloat(valueUSD))
amountF.Quo(amountF, baseF)
amountUSD, _ := amountF.Float64()
return amountUSD
}

View File

@@ -91,6 +91,10 @@ type Coordinator struct {
// SyncRetryInterval is the waiting interval between calls to the main
// handler of a synced block after an error
SyncRetryInterval Duration `validate:"required"`
// PurgeByExtDelInterval is the waiting interval between calls
// to the PurgeByExternalDelete function of the l2db which deletes
// pending txs externally marked by the column `external_delete`
PurgeByExtDelInterval Duration `validate:"required"`
// L2DB is the DB that holds the pool of L2Txs
L2DB struct {
// SafetyPeriod is the number of batches after which
@@ -101,6 +105,10 @@ type Coordinator struct {
// reached, inserts to the pool will be denied until some of
// the pending txs are forged.
MaxTxs uint32 `validate:"required"`
// MinFeeUSD is the minimum fee in USD that a tx must pay in
// order to be accepted into the pool. Txs with lower than
// minimum fee will be rejected at the API level.
MinFeeUSD float64
// TTL is the Time To Live for L2Txs in the pool. Once MaxTxs
// L2Txs is reached, L2Txs older than TTL will be deleted.
TTL Duration `validate:"required"`
@@ -232,6 +240,11 @@ type Node struct {
// `Eth.LastBatch`). This value only affects the reported % of
// synchronization of blocks and batches, nothing else.
StatsRefreshPeriod Duration `validate:"required"`
// StoreAccountUpdates when set to true makes the synchronizer
// store every account update in the account_update SQL table.
// This allows querying nonces and balances from the HistoryDB
// via SQL.
StoreAccountUpdates bool
} `validate:"required"`
SmartContracts struct {
// Rollup is the address of the Hermez.sol smart contract

View File

@@ -85,6 +85,10 @@ type Config struct {
// SyncRetryInterval is the waiting interval between calls to the main
// handler of a synced block after an error
SyncRetryInterval time.Duration
// PurgeByExtDelInterval is the waiting interval between calls
// to the PurgeByExternalDelete function of the l2db which deletes
// pending txs externally marked by the column `external_delete`
PurgeByExtDelInterval time.Duration
// EthClientAttemptsDelay is delay between attempts do do an eth client
// RPC call
EthClientAttemptsDelay time.Duration
@@ -153,6 +157,15 @@ type Coordinator struct {
wg sync.WaitGroup
cancel context.CancelFunc
// mutexL2DBUpdateDelete protects updates to the L2DB so that
// these two processes always happen exclusively:
// - Pipeline taking pending txs, running through the TxProcessor and
// marking selected txs as forging
// - Coordinator deleting pending txs that have been marked with
// `external_delete`.
// Without this mutex, the coordinator could delete a pending txs that
// has just been selected by the TxProcessor in the pipeline.
mutexL2DBUpdateDelete sync.Mutex
pipeline *Pipeline
lastNonFailedBatchNum common.BatchNum
@@ -248,7 +261,8 @@ func (c *Coordinator) BatchBuilder() *batchbuilder.BatchBuilder {
func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) {
c.pipelineNum++
return NewPipeline(ctx, c.cfg, c.pipelineNum, c.historyDB, c.l2DB, c.txSelector,
c.batchBuilder, c.purger, c, c.txManager, c.provers, &c.consts)
c.batchBuilder, &c.mutexL2DBUpdateDelete, c.purger, c, c.txManager,
c.provers, &c.consts)
}
// MsgSyncBlock indicates an update to the Synchronizer stats
@@ -527,6 +541,24 @@ func (c *Coordinator) Start() {
}
}
}()
c.wg.Add(1)
go func() {
for {
select {
case <-c.ctx.Done():
log.Info("Coordinator L2DB.PurgeByExternalDelete loop done")
c.wg.Done()
return
case <-time.After(c.cfg.PurgeByExtDelInterval):
c.mutexL2DBUpdateDelete.Lock()
if err := c.l2DB.PurgeByExternalDelete(); err != nil {
log.Errorw("L2DB.PurgeByExternalDelete", "err", err)
}
c.mutexL2DBUpdateDelete.Unlock()
}
}
}()
}
const stopCtxTimeout = 200 * time.Millisecond

View File

@@ -105,7 +105,7 @@ func newTestModules(t *testing.T) modules {
db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez")
require.NoError(t, err)
test.WipeDB(db)
l2DB := l2db.NewL2DB(db, 10, 100, 24*time.Hour, nil)
l2DB := l2db.NewL2DB(db, 10, 100, 0.0, 24*time.Hour, nil)
historyDB := historydb.NewHistoryDB(db, nil)
txSelDBPath, err = ioutil.TempDir("", "tmpTxSelDB")

View File

@@ -53,6 +53,7 @@ type Pipeline struct {
l2DB *l2db.L2DB
txSelector *txselector.TxSelector
batchBuilder *batchbuilder.BatchBuilder
mutexL2DBUpdateDelete *sync.Mutex
purger *Purger
stats synchronizer.Stats
@@ -84,6 +85,7 @@ func NewPipeline(ctx context.Context,
l2DB *l2db.L2DB,
txSelector *txselector.TxSelector,
batchBuilder *batchbuilder.BatchBuilder,
mutexL2DBUpdateDelete *sync.Mutex,
purger *Purger,
coord *Coordinator,
txManager *TxManager,
@@ -112,6 +114,7 @@ func NewPipeline(ctx context.Context,
batchBuilder: batchBuilder,
provers: provers,
proversPool: proversPool,
mutexL2DBUpdateDelete: mutexL2DBUpdateDelete,
purger: purger,
coord: coord,
txManager: txManager,
@@ -199,7 +202,9 @@ func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) {
// and then waits for an available proof server and sends the zkInputs to it so
// that the proof computation begins.
func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
p.mutexL2DBUpdateDelete.Lock()
batchInfo, err := p.forgeBatch(batchNum)
p.mutexL2DBUpdateDelete.Unlock()
if ctx.Err() != nil {
return nil, ctx.Err()
} else if err != nil {

View File

@@ -21,7 +21,7 @@ func newL2DB(t *testing.T) *l2db.L2DB {
db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez")
require.NoError(t, err)
test.WipeDB(db)
return l2db.NewL2DB(db, 10, 100, 24*time.Hour, nil)
return l2db.NewL2DB(db, 10, 100, 0.0, 24*time.Hour, nil)
}
func newStateDB(t *testing.T) *statedb.LocalStateDB {

View File

@@ -2,9 +2,9 @@ package coordinator
import (
"context"
"errors"
"fmt"
"math/big"
"strings"
"time"
"github.com/ethereum/go-ethereum"
@@ -206,22 +206,27 @@ func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchIn
}
// RollupForgeBatch() calls ethclient.SendTransaction()
ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs, auth)
if errors.Is(err, core.ErrNonceTooLow) {
// We check the errors via strings because we match the
// definition of the error from geth, with the string returned
// via RPC obtained by the client.
if err == nil {
break
} else if strings.Contains(err.Error(), core.ErrNonceTooLow.Error()) {
log.Warnw("TxManager ethClient.RollupForgeBatch incrementing nonce",
"err", err, "nonce", auth.Nonce, "batchNum", batchInfo.BatchNum)
auth.Nonce.Add(auth.Nonce, big.NewInt(1))
attempt--
} else if errors.Is(err, core.ErrNonceTooHigh) {
} else if strings.Contains(err.Error(), core.ErrNonceTooHigh.Error()) {
log.Warnw("TxManager ethClient.RollupForgeBatch decrementing nonce",
"err", err, "nonce", auth.Nonce, "batchNum", batchInfo.BatchNum)
auth.Nonce.Sub(auth.Nonce, big.NewInt(1))
attempt--
} else if errors.Is(err, core.ErrUnderpriced) {
} else if strings.Contains(err.Error(), core.ErrReplaceUnderpriced.Error()) {
log.Warnw("TxManager ethClient.RollupForgeBatch incrementing gasPrice",
"err", err, "gasPrice", auth.GasPrice, "batchNum", batchInfo.BatchNum)
auth.GasPrice = addPerc(auth.GasPrice, 10)
attempt--
} else if errors.Is(err, core.ErrReplaceUnderpriced) {
} else if strings.Contains(err.Error(), core.ErrUnderpriced.Error()) {
log.Warnw("TxManager ethClient.RollupForgeBatch incrementing gasPrice",
"err", err, "gasPrice", auth.GasPrice, "batchNum", batchInfo.BatchNum)
auth.GasPrice = addPerc(auth.GasPrice, 10)
@@ -230,8 +235,6 @@ func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchIn
log.Errorw("TxManager ethClient.RollupForgeBatch",
"attempt", attempt, "err", err, "block", t.stats.Eth.LastBlock.Num+1,
"batchNum", batchInfo.BatchNum)
} else {
break
}
select {
case <-ctx.Done():

View File

@@ -61,7 +61,7 @@ func (hdb *HistoryDB) addBlocks(d meddler.DB, blocks []common.Block) error {
timestamp,
hash
) VALUES %s;`,
blocks[:],
blocks,
))
}
@@ -273,7 +273,7 @@ func (hdb *HistoryDB) addBids(d meddler.DB, bids []common.Bid) error {
return tracerr.Wrap(db.BulkInsert(
d,
"INSERT INTO bid (slot_num, bid_value, eth_block_num, bidder_addr) VALUES %s;",
bids[:],
bids,
))
}
@@ -324,7 +324,7 @@ func (hdb *HistoryDB) addCoordinators(d meddler.DB, coordinators []common.Coordi
return tracerr.Wrap(db.BulkInsert(
d,
"INSERT INTO coordinator (bidder_addr, forger_addr, eth_block_num, url) VALUES %s;",
coordinators[:],
coordinators,
))
}
@@ -340,7 +340,7 @@ func (hdb *HistoryDB) addExitTree(d meddler.DB, exitTree []common.ExitInfo) erro
d,
"INSERT INTO exit_tree (batch_num, account_idx, merkle_proof, balance, "+
"instant_withdrawn, delayed_withdraw_request, delayed_withdrawn) VALUES %s;",
exitTree[:],
exitTree,
))
}
@@ -443,11 +443,12 @@ func (hdb *HistoryDB) addTokens(d meddler.DB, tokens []common.Token) error {
symbol,
decimals
) VALUES %s;`,
tokens[:],
tokens,
))
}
// UpdateTokenValue updates the USD value of a token
// UpdateTokenValue updates the USD value of a token. Value is the price in
// USD of a normalized token (token amount divided by 10^decimals)
func (hdb *HistoryDB) UpdateTokenValue(tokenSymbol string, value float64) error {
// Sanitize symbol
tokenSymbol = strings.ToValidUTF8(tokenSymbol, " ")
@@ -514,7 +515,7 @@ func (hdb *HistoryDB) addAccounts(d meddler.DB, accounts []common.Account) error
bjj,
eth_addr
) VALUES %s;`,
accounts[:],
accounts,
))
}
@@ -528,6 +529,37 @@ func (hdb *HistoryDB) GetAllAccounts() ([]common.Account, error) {
return db.SlicePtrsToSlice(accs).([]common.Account), tracerr.Wrap(err)
}
// AddAccountUpdates inserts accUpdates into the DB
func (hdb *HistoryDB) AddAccountUpdates(accUpdates []common.AccountUpdate) error {
return tracerr.Wrap(hdb.addAccountUpdates(hdb.db, accUpdates))
}
func (hdb *HistoryDB) addAccountUpdates(d meddler.DB, accUpdates []common.AccountUpdate) error {
if len(accUpdates) == 0 {
return nil
}
return tracerr.Wrap(db.BulkInsert(
d,
`INSERT INTO account_update (
eth_block_num,
batch_num,
idx,
nonce,
balance
) VALUES %s;`,
accUpdates,
))
}
// GetAllAccountUpdates returns all the AccountUpdate from the DB
func (hdb *HistoryDB) GetAllAccountUpdates() ([]common.AccountUpdate, error) {
var accUpdates []*common.AccountUpdate
err := meddler.QueryAll(
hdb.db, &accUpdates,
"SELECT eth_block_num, batch_num, idx, nonce, balance FROM account_update ORDER BY idx;",
)
return db.SlicePtrsToSlice(accUpdates).([]common.AccountUpdate), tracerr.Wrap(err)
}
// AddL1Txs inserts L1 txs to the DB. USD and DepositAmountUSD will be set automatically before storing the tx.
// If the tx is originated by a coordinator, BatchNum must be provided. If it's originated by a user,
// BatchNum should be null, and the value will be setted by a trigger when a batch forges the tx.
@@ -646,7 +678,7 @@ func (hdb *HistoryDB) addTxs(d meddler.DB, txs []txWrite) error {
fee,
nonce
) VALUES %s;`,
txs[:],
txs,
))
}
@@ -781,7 +813,7 @@ func (hdb *HistoryDB) addBucketUpdates(d meddler.DB, bucketUpdates []common.Buck
block_stamp,
withdrawals
) VALUES %s;`,
bucketUpdates[:],
bucketUpdates,
))
}
@@ -813,7 +845,7 @@ func (hdb *HistoryDB) addTokenExchanges(d meddler.DB, tokenExchanges []common.To
eth_addr,
value_usd
) VALUES %s;`,
tokenExchanges[:],
tokenExchanges,
))
}
@@ -841,7 +873,7 @@ func (hdb *HistoryDB) addEscapeHatchWithdrawals(d meddler.DB,
token_addr,
amount
) VALUES %s;`,
escapeHatchWithdrawals[:],
escapeHatchWithdrawals,
))
}
@@ -1018,6 +1050,11 @@ func (hdb *HistoryDB) AddBlockSCData(blockData *common.BlockData) (err error) {
return tracerr.Wrap(err)
}
// Add accountBalances if it exists
if err := hdb.addAccountUpdates(txn, batch.UpdatedAccounts); err != nil {
return tracerr.Wrap(err)
}
// Set the EffectiveAmount and EffectiveDepositAmount of all the
// L1UserTxs that have been forged in this batch
if err = hdb.setExtraInfoForgedL1UserTxs(txn, batch.L1UserTxs); err != nil {

View File

@@ -377,6 +377,22 @@ func TestAccounts(t *testing.T) {
accs[i].Balance = nil
assert.Equal(t, accs[i], acc)
}
// Test AccountBalances
accUpdates := make([]common.AccountUpdate, len(accs))
for i, acc := range accs {
accUpdates[i] = common.AccountUpdate{
EthBlockNum: batches[acc.BatchNum-1].EthBlockNum,
BatchNum: acc.BatchNum,
Idx: acc.Idx,
Nonce: common.Nonce(i),
Balance: big.NewInt(int64(i)),
}
}
err = historyDB.AddAccountUpdates(accUpdates)
require.NoError(t, err)
fetchedAccBalances, err := historyDB.GetAllAccountUpdates()
require.NoError(t, err)
assert.Equal(t, accUpdates, fetchedAccBalances)
}
func TestTxs(t *testing.T) {
@@ -1195,7 +1211,8 @@ func TestGetMetricsAPIMoreThan24Hours(t *testing.T) {
set = append(set, til.Instruction{Typ: til.TypeNewBlock})
// Transfers
for x := 0; x < 6000; x++ {
const numBlocks int = 30
for x := 0; x < numBlocks; x++ {
set = append(set, til.Instruction{
Typ: common.TxTypeTransfer,
TokenID: common.TokenID(0),
@@ -1219,19 +1236,20 @@ func TestGetMetricsAPIMoreThan24Hours(t *testing.T) {
err = tc.FillBlocksExtra(blocks, &tilCfgExtra)
require.NoError(t, err)
const numBatches int = 6002
const numTx int = 6003
const blockNum = 6005 - 1
const numBatches int = 2 + numBlocks
const blockNum = 4 + numBlocks
// Sanity check
require.Equal(t, blockNum, len(blocks))
// Adding one batch per block
// batch frequency can be chosen
const frequency int = 15
const blockTime time.Duration = 3600 * time.Second
now := time.Now()
require.NoError(t, err)
for i := range blocks {
blocks[i].Block.Timestamp = time.Now().Add(-time.Second * time.Duration(frequency*(len(blocks)-i)))
blocks[i].Block.Timestamp = now.Add(-time.Duration(len(blocks)-1-i) * blockTime)
err = historyDB.AddBlockSCData(&blocks[i])
assert.NoError(t, err)
}
@@ -1239,16 +1257,10 @@ func TestGetMetricsAPIMoreThan24Hours(t *testing.T) {
res, err := historyDBWithACC.GetMetricsAPI(common.BatchNum(numBatches))
assert.NoError(t, err)
assert.Equal(t, math.Trunc((float64(numTx)/float64(numBatches-1))/0.001)*0.001, math.Trunc(res.TransactionsPerBatch/0.001)*0.001)
assert.InEpsilon(t, 1.0, res.TransactionsPerBatch, 0.1)
// Frequency is not exactly the desired one, some decimals may appear
assert.GreaterOrEqual(t, res.BatchFrequency, float64(frequency))
assert.Less(t, res.BatchFrequency, float64(frequency+1))
// Truncate frecuency into an int to do an exact check
assert.Equal(t, frequency, int(res.BatchFrequency))
// This may also be different in some decimals
// Truncate it to the third decimal to compare
assert.Equal(t, math.Trunc((float64(numTx)/float64(frequency*blockNum-frequency))/0.001)*0.001, math.Trunc(res.TransactionsPerSecond/0.001)*0.001)
assert.InEpsilon(t, res.BatchFrequency, float64(blockTime/time.Second), 0.1)
assert.InEpsilon(t, 1.0/float64(blockTime/time.Second), res.TransactionsPerSecond, 0.1)
assert.Equal(t, int64(3), res.TotalAccounts)
assert.Equal(t, int64(3), res.TotalBJJs)
// Til does not set fees

View File

@@ -1,12 +1,18 @@
package l2db
import (
"fmt"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/tracerr"
"github.com/russross/meddler"
)
var (
errPoolFull = fmt.Errorf("the pool is at full capacity. More transactions are not accepted currently")
)
// AddAccountCreationAuthAPI inserts an account creation authorization into the DB
func (l2db *L2DB) AddAccountCreationAuthAPI(auth *common.AccountCreationAuth) error {
cancel, err := l2db.apiConnCon.Acquire()
@@ -42,20 +48,54 @@ func (l2db *L2DB) AddTxAPI(tx *PoolL2TxWrite) error {
return tracerr.Wrap(err)
}
defer l2db.apiConnCon.Release()
row := l2db.db.QueryRow(
"SELECT COUNT(*) FROM tx_pool WHERE state = $1;",
common.PoolL2TxStatePending,
)
var totalTxs uint32
if err := row.Scan(&totalTxs); err != nil {
row := l2db.db.QueryRow(`SELECT
($1::NUMERIC * token.usd * fee_percentage($2::NUMERIC)) /
(10.0 ^ token.decimals::NUMERIC)
FROM token WHERE token.token_id = $3;`,
tx.AmountFloat, tx.Fee, tx.TokenID)
var feeUSD float64
if err := row.Scan(&feeUSD); err != nil {
return tracerr.Wrap(err)
}
if totalTxs >= l2db.maxTxs {
return tracerr.New(
"The pool is at full capacity. More transactions are not accepted currently",
)
if feeUSD < l2db.minFeeUSD {
return tracerr.Wrap(fmt.Errorf("tx.feeUSD (%v) < minFeeUSD (%v)",
feeUSD, l2db.minFeeUSD))
}
return tracerr.Wrap(meddler.Insert(l2db.db, "tx_pool", tx))
// Prepare insert SQL query argument parameters
namesPart, err := meddler.Default.ColumnsQuoted(tx, false)
if err != nil {
return err
}
valuesPart, err := meddler.Default.PlaceholdersString(tx, false)
if err != nil {
return err
}
values, err := meddler.Default.Values(tx, false)
if err != nil {
return err
}
q := fmt.Sprintf(
`INSERT INTO tx_pool (%s)
SELECT %s
WHERE (SELECT COUNT(*) FROM tx_pool WHERE state = $%v) < $%v;`,
namesPart, valuesPart,
len(values)+1, len(values)+2) //nolint:gomnd
values = append(values, common.PoolL2TxStatePending, l2db.maxTxs)
res, err := l2db.db.Exec(q, values...)
if err != nil {
return tracerr.Wrap(err)
}
rowsAffected, err := res.RowsAffected()
if err != nil {
return tracerr.Wrap(err)
}
if rowsAffected == 0 {
return tracerr.Wrap(errPoolFull)
}
return nil
}
// selectPoolTxAPI select part of queries to get PoolL2TxRead

View File

@@ -25,6 +25,7 @@ type L2DB struct {
safetyPeriod common.BatchNum
ttl time.Duration
maxTxs uint32 // limit of txs that are accepted in the pool
minFeeUSD float64
apiConnCon *db.APIConnectionController
}
@@ -35,6 +36,7 @@ func NewL2DB(
db *sqlx.DB,
safetyPeriod common.BatchNum,
maxTxs uint32,
minFeeUSD float64,
TTL time.Duration,
apiConnCon *db.APIConnectionController,
) *L2DB {
@@ -43,6 +45,7 @@ func NewL2DB(
safetyPeriod: safetyPeriod,
ttl: TTL,
maxTxs: maxTxs,
minFeeUSD: minFeeUSD,
apiConnCon: apiConnCon,
}
}
@@ -73,24 +76,6 @@ func (l2db *L2DB) GetAccountCreationAuth(addr ethCommon.Address) (*common.Accoun
))
}
// AddTx inserts a tx to the pool
func (l2db *L2DB) AddTx(tx *PoolL2TxWrite) error {
row := l2db.db.QueryRow(
"SELECT COUNT(*) FROM tx_pool WHERE state = $1;",
common.PoolL2TxStatePending,
)
var totalTxs uint32
if err := row.Scan(&totalTxs); err != nil {
return tracerr.Wrap(err)
}
if totalTxs >= l2db.maxTxs {
return tracerr.New(
"The pool is at full capacity. More transactions are not accepted currently",
)
}
return tracerr.Wrap(meddler.Insert(l2db.db, "tx_pool", tx))
}
// UpdateTxsInfo updates the parameter Info of the pool transactions
func (l2db *L2DB) UpdateTxsInfo(txs []common.PoolL2Tx) error {
if len(txs) == 0 {
@@ -122,9 +107,8 @@ func (l2db *L2DB) UpdateTxsInfo(txs []common.PoolL2Tx) error {
return nil
}
// AddTxTest inserts a tx into the L2DB. This is useful for test purposes,
// but in production txs will only be inserted through the API
func (l2db *L2DB) AddTxTest(tx *common.PoolL2Tx) error {
// NewPoolL2TxWriteFromPoolL2Tx creates a new PoolL2TxWrite from a PoolL2Tx
func NewPoolL2TxWriteFromPoolL2Tx(tx *common.PoolL2Tx) *PoolL2TxWrite {
// transform tx from *common.PoolL2Tx to PoolL2TxWrite
insertTx := &PoolL2TxWrite{
TxID: tx.TxID,
@@ -166,6 +150,13 @@ func (l2db *L2DB) AddTxTest(tx *common.PoolL2Tx) error {
f := new(big.Float).SetInt(tx.Amount)
amountF, _ := f.Float64()
insertTx.AmountFloat = amountF
return insertTx
}
// AddTxTest inserts a tx into the L2DB. This is useful for test purposes,
// but in production txs will only be inserted through the API
func (l2db *L2DB) AddTxTest(tx *common.PoolL2Tx) error {
insertTx := NewPoolL2TxWriteFromPoolL2Tx(tx)
// insert tx
return tracerr.Wrap(meddler.Insert(l2db.db, "tx_pool", insertTx))
}
@@ -176,7 +167,8 @@ tx_pool.to_bjj, tx_pool.token_id, tx_pool.amount, tx_pool.fee, tx_pool.nonce,
tx_pool.state, tx_pool.info, tx_pool.signature, tx_pool.timestamp, rq_from_idx,
rq_to_idx, tx_pool.rq_to_eth_addr, tx_pool.rq_to_bjj, tx_pool.rq_token_id, tx_pool.rq_amount,
tx_pool.rq_fee, tx_pool.rq_nonce, tx_pool.tx_type,
fee_percentage(tx_pool.fee::NUMERIC) * token.usd * tx_pool.amount_f AS fee_usd, token.usd_update
(fee_percentage(tx_pool.fee::NUMERIC) * token.usd * tx_pool.amount_f) /
(10.0 ^ token.decimals::NUMERIC) AS fee_usd, token.usd_update
FROM tx_pool INNER JOIN token ON tx_pool.token_id = token.token_id `
// GetTx return the specified Tx in common.PoolL2Tx format
@@ -354,3 +346,14 @@ func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) (err error) {
)
return tracerr.Wrap(err)
}
// PurgeByExternalDelete deletes all pending transactions marked with true in
// the `external_delete` column. An external process can set this column to
// true to instruct the coordinator to delete the tx when possible.
func (l2db *L2DB) PurgeByExternalDelete() error {
_, err := l2db.db.Exec(
`DELETE from tx_pool WHERE (external_delete = true AND state = $1);`,
common.PoolL2TxStatePending,
)
return tracerr.Wrap(err)
}

View File

@@ -1,8 +1,8 @@
package l2db
import (
"math"
"math/big"
"database/sql"
"fmt"
"os"
"testing"
"time"
@@ -20,12 +20,14 @@ import (
"github.com/stretchr/testify/require"
)
var decimals = uint64(3)
var tokenValue = 1.0 // The price update gives a value of 1.0 USD to the token
var l2DB *L2DB
var l2DBWithACC *L2DB
var historyDB *historydb.HistoryDB
var tc *til.Context
var tokens map[common.TokenID]historydb.TokenWithUSD
var tokensValue map[common.TokenID]float64
var accs map[common.Idx]common.Account
func TestMain(m *testing.M) {
@@ -35,9 +37,9 @@ func TestMain(m *testing.M) {
if err != nil {
panic(err)
}
l2DB = NewL2DB(db, 10, 1000, 24*time.Hour, nil)
l2DB = NewL2DB(db, 10, 1000, 0.0, 24*time.Hour, nil)
apiConnCon := dbUtils.NewAPICnnectionController(1, time.Second)
l2DBWithACC = NewL2DB(db, 10, 1000, 24*time.Hour, apiConnCon)
l2DBWithACC = NewL2DB(db, 10, 1000, 0.0, 24*time.Hour, apiConnCon)
test.WipeDB(l2DB.DB())
historyDB = historydb.NewHistoryDB(db, nil)
// Run tests
@@ -58,10 +60,10 @@ func prepareHistoryDB(historyDB *historydb.HistoryDB) error {
AddToken(1)
AddToken(2)
CreateAccountDeposit(1) A: 2000
CreateAccountDeposit(2) A: 2000
CreateAccountDeposit(1) B: 1000
CreateAccountDeposit(2) B: 1000
CreateAccountDeposit(1) A: 20000
CreateAccountDeposit(2) A: 20000
CreateAccountDeposit(1) B: 10000
CreateAccountDeposit(2) B: 10000
> batchL1
> batchL1
> block
@@ -82,15 +84,23 @@ func prepareHistoryDB(historyDB *historydb.HistoryDB) error {
if err != nil {
return tracerr.Wrap(err)
}
for i := range blocks {
block := &blocks[i]
for j := range block.Rollup.AddedTokens {
token := &block.Rollup.AddedTokens[j]
token.Name = fmt.Sprintf("Token %d", token.TokenID)
token.Symbol = fmt.Sprintf("TK%d", token.TokenID)
token.Decimals = decimals
}
}
tokens = make(map[common.TokenID]historydb.TokenWithUSD)
tokensValue = make(map[common.TokenID]float64)
// tokensValue = make(map[common.TokenID]float64)
accs = make(map[common.Idx]common.Account)
value := 5 * 5.389329
now := time.Now().UTC()
// Add all blocks except for the last one
for i := range blocks[:len(blocks)-1] {
err = historyDB.AddBlockSCData(&blocks[i])
if err != nil {
if err := historyDB.AddBlockSCData(&blocks[i]); err != nil {
return tracerr.Wrap(err)
}
for _, batch := range blocks[i].Rollup.Batches {
@@ -106,39 +116,38 @@ func prepareHistoryDB(historyDB *historydb.HistoryDB) error {
Name: token.Name,
Symbol: token.Symbol,
Decimals: token.Decimals,
USD: &tokenValue,
USDUpdate: &now,
}
tokensValue[token.TokenID] = value / math.Pow(10, float64(token.Decimals))
readToken.USDUpdate = &now
readToken.USD = &value
tokens[token.TokenID] = readToken
}
// Set value to the tokens (tokens have no symbol)
tokenSymbol := ""
err := historyDB.UpdateTokenValue(tokenSymbol, value)
// Set value to the tokens
err := historyDB.UpdateTokenValue(readToken.Symbol, *readToken.USD)
if err != nil {
return tracerr.Wrap(err)
}
}
}
return nil
}
func generatePoolL2Txs() ([]common.PoolL2Tx, error) {
// Fee = 126 corresponds to ~10%
setPool := `
Type: PoolL2
PoolTransfer(1) A-B: 6 (4)
PoolTransfer(2) A-B: 3 (1)
PoolTransfer(1) B-A: 5 (2)
PoolTransfer(2) B-A: 10 (3)
PoolTransfer(1) A-B: 7 (2)
PoolTransfer(2) A-B: 2 (1)
PoolTransfer(1) B-A: 8 (2)
PoolTransfer(2) B-A: 1 (1)
PoolTransfer(1) A-B: 3 (1)
PoolTransferToEthAddr(2) B-A: 5 (2)
PoolTransferToBJJ(2) B-A: 5 (2)
PoolTransfer(1) A-B: 6000 (126)
PoolTransfer(2) A-B: 3000 (126)
PoolTransfer(1) B-A: 5000 (126)
PoolTransfer(2) B-A: 10000 (126)
PoolTransfer(1) A-B: 7000 (126)
PoolTransfer(2) A-B: 2000 (126)
PoolTransfer(1) B-A: 8000 (126)
PoolTransfer(2) B-A: 1000 (126)
PoolTransfer(1) A-B: 3000 (126)
PoolTransferToEthAddr(2) B-A: 5000 (126)
PoolTransferToBJJ(2) B-A: 5000 (126)
PoolExit(1) A: 5 (2)
PoolExit(2) B: 3 (1)
PoolExit(1) A: 5000 (126)
PoolExit(2) B: 3000 (126)
`
poolL2Txs, err := tc.GeneratePoolL2Txs(setPool)
if err != nil {
@@ -153,25 +162,74 @@ func TestAddTxTest(t *testing.T) {
log.Error("Error prepare historyDB", err)
}
poolL2Txs, err := generatePoolL2Txs()
assert.NoError(t, err)
require.NoError(t, err)
for i := range poolL2Txs {
err := l2DB.AddTxTest(&poolL2Txs[i])
assert.NoError(t, err)
require.NoError(t, err)
fetchedTx, err := l2DB.GetTx(poolL2Txs[i].TxID)
assert.NoError(t, err)
require.NoError(t, err)
assertTx(t, &poolL2Txs[i], fetchedTx)
nameZone, offset := fetchedTx.Timestamp.Zone()
assert.Equal(t, "UTC", nameZone)
assert.Equal(t, 0, offset)
}
}
func TestAddTxAPI(t *testing.T) {
err := prepareHistoryDB(historyDB)
if err != nil {
log.Error("Error prepare historyDB", err)
}
oldMaxTxs := l2DBWithACC.maxTxs
// set max number of pending txs that can be kept in the pool to 5
l2DBWithACC.maxTxs = 5
poolL2Txs, err := generatePoolL2Txs()
txs := make([]*PoolL2TxWrite, len(poolL2Txs))
for i := range poolL2Txs {
txs[i] = NewPoolL2TxWriteFromPoolL2Tx(&poolL2Txs[i])
}
require.NoError(t, err)
require.GreaterOrEqual(t, len(poolL2Txs), 8)
for i := range txs[:5] {
err := l2DBWithACC.AddTxAPI(txs[i])
require.NoError(t, err)
fetchedTx, err := l2DB.GetTx(poolL2Txs[i].TxID)
require.NoError(t, err)
assertTx(t, &poolL2Txs[i], fetchedTx)
nameZone, offset := fetchedTx.Timestamp.Zone()
assert.Equal(t, "UTC", nameZone)
assert.Equal(t, 0, offset)
}
err = l2DBWithACC.AddTxAPI(txs[5])
assert.Equal(t, errPoolFull, tracerr.Unwrap(err))
// reset maxTxs to original value
l2DBWithACC.maxTxs = oldMaxTxs
// set minFeeUSD to a high value than the tx feeUSD to test the error
// of inserting a tx with lower than min fee
oldMinFeeUSD := l2DBWithACC.minFeeUSD
tx := txs[5]
feeAmount, err := common.CalcFeeAmount(tx.Amount, tx.Fee)
require.NoError(t, err)
feeAmountUSD := common.TokensToUSD(feeAmount, decimals, tokenValue)
// set minFeeUSD higher than the tx fee to trigger the error
l2DBWithACC.minFeeUSD = feeAmountUSD + 1
err = l2DBWithACC.AddTxAPI(tx)
require.Error(t, err)
assert.Regexp(t, "tx.feeUSD (.*) < minFeeUSD (.*)", err.Error())
// reset minFeeUSD to original value
l2DBWithACC.minFeeUSD = oldMinFeeUSD
}
func TestUpdateTxsInfo(t *testing.T) {
err := prepareHistoryDB(historyDB)
if err != nil {
log.Error("Error prepare historyDB", err)
}
poolL2Txs, err := generatePoolL2Txs()
assert.NoError(t, err)
require.NoError(t, err)
for i := range poolL2Txs {
err := l2DB.AddTxTest(&poolL2Txs[i])
require.NoError(t, err)
@@ -185,7 +243,7 @@ func TestUpdateTxsInfo(t *testing.T) {
for i := range poolL2Txs {
fetchedTx, err := l2DB.GetTx(poolL2Txs[i].TxID)
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, "test", fetchedTx.Info)
}
}
@@ -203,9 +261,8 @@ func assertTx(t *testing.T, expected, actual *common.PoolL2Tx) {
assert.Less(t, token.USDUpdate.Unix()-3, actual.AbsoluteFeeUpdate.Unix())
expected.AbsoluteFeeUpdate = actual.AbsoluteFeeUpdate
// Set expected fee
f := new(big.Float).SetInt(expected.Amount)
amountF, _ := f.Float64()
expected.AbsoluteFee = *token.USD * amountF * expected.Fee.Percentage()
amountUSD := common.TokensToUSD(expected.Amount, token.Decimals, *token.USD)
expected.AbsoluteFee = amountUSD * expected.Fee.Percentage()
test.AssertUSD(t, &expected.AbsoluteFee, &actual.AbsoluteFee)
}
assert.Equal(t, expected, actual)
@@ -230,19 +287,28 @@ func TestGetPending(t *testing.T) {
log.Error("Error prepare historyDB", err)
}
poolL2Txs, err := generatePoolL2Txs()
assert.NoError(t, err)
require.NoError(t, err)
var pendingTxs []*common.PoolL2Tx
for i := range poolL2Txs {
err := l2DB.AddTxTest(&poolL2Txs[i])
assert.NoError(t, err)
require.NoError(t, err)
pendingTxs = append(pendingTxs, &poolL2Txs[i])
}
fetchedTxs, err := l2DB.GetPendingTxs()
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, len(pendingTxs), len(fetchedTxs))
for i := range fetchedTxs {
assertTx(t, pendingTxs[i], &fetchedTxs[i])
}
// Check AbsoluteFee amount
for i := range fetchedTxs {
tx := &fetchedTxs[i]
feeAmount, err := common.CalcFeeAmount(tx.Amount, tx.Fee)
require.NoError(t, err)
feeAmountUSD := common.TokensToUSD(feeAmount,
tokens[tx.TokenID].Decimals, *tokens[tx.TokenID].USD)
assert.InEpsilon(t, feeAmountUSD, tx.AbsoluteFee, 0.01)
}
}
func TestStartForging(t *testing.T) {
@@ -253,13 +319,13 @@ func TestStartForging(t *testing.T) {
log.Error("Error prepare historyDB", err)
}
poolL2Txs, err := generatePoolL2Txs()
assert.NoError(t, err)
require.NoError(t, err)
var startForgingTxIDs []common.TxID
randomizer := 0
// Add txs to DB
for i := range poolL2Txs {
err := l2DB.AddTxTest(&poolL2Txs[i])
assert.NoError(t, err)
require.NoError(t, err)
if poolL2Txs[i].State == common.PoolL2TxStatePending && randomizer%2 == 0 {
startForgingTxIDs = append(startForgingTxIDs, poolL2Txs[i].TxID)
}
@@ -267,11 +333,11 @@ func TestStartForging(t *testing.T) {
}
// Start forging txs
err = l2DB.StartForging(startForgingTxIDs, fakeBatchNum)
assert.NoError(t, err)
require.NoError(t, err)
// Fetch txs and check that they've been updated correctly
for _, id := range startForgingTxIDs {
fetchedTx, err := l2DBWithACC.GetTxAPI(id)
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, common.PoolL2TxStateForging, fetchedTx.State)
assert.Equal(t, &fakeBatchNum, fetchedTx.BatchNum)
}
@@ -285,13 +351,13 @@ func TestDoneForging(t *testing.T) {
log.Error("Error prepare historyDB", err)
}
poolL2Txs, err := generatePoolL2Txs()
assert.NoError(t, err)
require.NoError(t, err)
var startForgingTxIDs []common.TxID
randomizer := 0
// Add txs to DB
for i := range poolL2Txs {
err := l2DB.AddTxTest(&poolL2Txs[i])
assert.NoError(t, err)
require.NoError(t, err)
if poolL2Txs[i].State == common.PoolL2TxStatePending && randomizer%2 == 0 {
startForgingTxIDs = append(startForgingTxIDs, poolL2Txs[i].TxID)
}
@@ -299,7 +365,7 @@ func TestDoneForging(t *testing.T) {
}
// Start forging txs
err = l2DB.StartForging(startForgingTxIDs, fakeBatchNum)
assert.NoError(t, err)
require.NoError(t, err)
var doneForgingTxIDs []common.TxID
randomizer = 0
@@ -311,12 +377,12 @@ func TestDoneForging(t *testing.T) {
}
// Done forging txs
err = l2DB.DoneForging(doneForgingTxIDs, fakeBatchNum)
assert.NoError(t, err)
require.NoError(t, err)
// Fetch txs and check that they've been updated correctly
for _, id := range doneForgingTxIDs {
fetchedTx, err := l2DBWithACC.GetTxAPI(id)
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, common.PoolL2TxStateForged, fetchedTx.State)
assert.Equal(t, &fakeBatchNum, fetchedTx.BatchNum)
}
@@ -330,13 +396,13 @@ func TestInvalidate(t *testing.T) {
log.Error("Error prepare historyDB", err)
}
poolL2Txs, err := generatePoolL2Txs()
assert.NoError(t, err)
require.NoError(t, err)
var invalidTxIDs []common.TxID
randomizer := 0
// Add txs to DB
for i := range poolL2Txs {
err := l2DB.AddTxTest(&poolL2Txs[i])
assert.NoError(t, err)
require.NoError(t, err)
if poolL2Txs[i].State != common.PoolL2TxStateInvalid && randomizer%2 == 0 {
randomizer++
invalidTxIDs = append(invalidTxIDs, poolL2Txs[i].TxID)
@@ -344,11 +410,11 @@ func TestInvalidate(t *testing.T) {
}
// Invalidate txs
err = l2DB.InvalidateTxs(invalidTxIDs, fakeBatchNum)
assert.NoError(t, err)
require.NoError(t, err)
// Fetch txs and check that they've been updated correctly
for _, id := range invalidTxIDs {
fetchedTx, err := l2DBWithACC.GetTxAPI(id)
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, common.PoolL2TxStateInvalid, fetchedTx.State)
assert.Equal(t, &fakeBatchNum, fetchedTx.BatchNum)
}
@@ -362,7 +428,7 @@ func TestInvalidateOldNonces(t *testing.T) {
log.Error("Error prepare historyDB", err)
}
poolL2Txs, err := generatePoolL2Txs()
assert.NoError(t, err)
require.NoError(t, err)
// Update Accounts currentNonce
var updateAccounts []common.IdxNonce
var currentNonce = common.Nonce(1)
@@ -379,13 +445,13 @@ func TestInvalidateOldNonces(t *testing.T) {
invalidTxIDs = append(invalidTxIDs, poolL2Txs[i].TxID)
}
err := l2DB.AddTxTest(&poolL2Txs[i])
assert.NoError(t, err)
require.NoError(t, err)
}
// sanity check
require.Greater(t, len(invalidTxIDs), 0)
err = l2DB.InvalidateOldNonces(updateAccounts, fakeBatchNum)
assert.NoError(t, err)
require.NoError(t, err)
// Fetch txs and check that they've been updated correctly
for _, id := range invalidTxIDs {
fetchedTx, err := l2DBWithACC.GetTxAPI(id)
@@ -407,7 +473,7 @@ func TestReorg(t *testing.T) {
log.Error("Error prepare historyDB", err)
}
poolL2Txs, err := generatePoolL2Txs()
assert.NoError(t, err)
require.NoError(t, err)
reorgedTxIDs := []common.TxID{}
nonReorgedTxIDs := []common.TxID{}
@@ -418,7 +484,7 @@ func TestReorg(t *testing.T) {
// Add txs to DB
for i := range poolL2Txs {
err := l2DB.AddTxTest(&poolL2Txs[i])
assert.NoError(t, err)
require.NoError(t, err)
if poolL2Txs[i].State == common.PoolL2TxStatePending && randomizer%2 == 0 {
startForgingTxIDs = append(startForgingTxIDs, poolL2Txs[i].TxID)
allTxRandomize = append(allTxRandomize, poolL2Txs[i].TxID)
@@ -430,7 +496,7 @@ func TestReorg(t *testing.T) {
}
// Start forging txs
err = l2DB.StartForging(startForgingTxIDs, lastValidBatch)
assert.NoError(t, err)
require.NoError(t, err)
var doneForgingTxIDs []common.TxID
randomizer = 0
@@ -455,22 +521,22 @@ func TestReorg(t *testing.T) {
// Invalidate txs BEFORE reorgBatch --> nonReorg
err = l2DB.InvalidateTxs(invalidTxIDs, lastValidBatch)
assert.NoError(t, err)
require.NoError(t, err)
// Done forging txs in reorgBatch --> Reorg
err = l2DB.DoneForging(doneForgingTxIDs, reorgBatch)
assert.NoError(t, err)
require.NoError(t, err)
err = l2DB.Reorg(lastValidBatch)
assert.NoError(t, err)
require.NoError(t, err)
for _, id := range reorgedTxIDs {
tx, err := l2DBWithACC.GetTxAPI(id)
assert.NoError(t, err)
require.NoError(t, err)
assert.Nil(t, tx.BatchNum)
assert.Equal(t, common.PoolL2TxStatePending, tx.State)
}
for _, id := range nonReorgedTxIDs {
fetchedTx, err := l2DBWithACC.GetTxAPI(id)
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, lastValidBatch, *fetchedTx.BatchNum)
}
}
@@ -487,7 +553,7 @@ func TestReorg2(t *testing.T) {
log.Error("Error prepare historyDB", err)
}
poolL2Txs, err := generatePoolL2Txs()
assert.NoError(t, err)
require.NoError(t, err)
reorgedTxIDs := []common.TxID{}
nonReorgedTxIDs := []common.TxID{}
@@ -498,7 +564,7 @@ func TestReorg2(t *testing.T) {
// Add txs to DB
for i := range poolL2Txs {
err := l2DB.AddTxTest(&poolL2Txs[i])
assert.NoError(t, err)
require.NoError(t, err)
if poolL2Txs[i].State == common.PoolL2TxStatePending && randomizer%2 == 0 {
startForgingTxIDs = append(startForgingTxIDs, poolL2Txs[i].TxID)
allTxRandomize = append(allTxRandomize, poolL2Txs[i].TxID)
@@ -510,7 +576,7 @@ func TestReorg2(t *testing.T) {
}
// Start forging txs
err = l2DB.StartForging(startForgingTxIDs, lastValidBatch)
assert.NoError(t, err)
require.NoError(t, err)
var doneForgingTxIDs []common.TxID
randomizer = 0
@@ -532,22 +598,22 @@ func TestReorg2(t *testing.T) {
}
// Done forging txs BEFORE reorgBatch --> nonReorg
err = l2DB.DoneForging(doneForgingTxIDs, lastValidBatch)
assert.NoError(t, err)
require.NoError(t, err)
// Invalidate txs in reorgBatch --> Reorg
err = l2DB.InvalidateTxs(invalidTxIDs, reorgBatch)
assert.NoError(t, err)
require.NoError(t, err)
err = l2DB.Reorg(lastValidBatch)
assert.NoError(t, err)
require.NoError(t, err)
for _, id := range reorgedTxIDs {
tx, err := l2DBWithACC.GetTxAPI(id)
assert.NoError(t, err)
require.NoError(t, err)
assert.Nil(t, tx.BatchNum)
assert.Equal(t, common.PoolL2TxStatePending, tx.State)
}
for _, id := range nonReorgedTxIDs {
fetchedTx, err := l2DBWithACC.GetTxAPI(id)
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, lastValidBatch, *fetchedTx.BatchNum)
}
}
@@ -563,7 +629,7 @@ func TestPurge(t *testing.T) {
var poolL2Tx []common.PoolL2Tx
for i := 0; i < generateTx; i++ {
poolL2TxAux, err := generatePoolL2Txs()
assert.NoError(t, err)
require.NoError(t, err)
poolL2Tx = append(poolL2Tx, poolL2TxAux...)
}
@@ -590,7 +656,7 @@ func TestPurge(t *testing.T) {
deletedIDs = append(deletedIDs, poolL2Tx[i].TxID)
}
err := l2DB.AddTxTest(&tx)
assert.NoError(t, err)
require.NoError(t, err)
}
// Set batchNum keeped txs
for i := range keepedIDs {
@@ -598,17 +664,17 @@ func TestPurge(t *testing.T) {
"UPDATE tx_pool SET batch_num = $1 WHERE tx_id = $2;",
safeBatchNum, keepedIDs[i],
)
assert.NoError(t, err)
require.NoError(t, err)
}
// Start forging txs and set batchNum
err = l2DB.StartForging(doneForgingTxIDs, toDeleteBatchNum)
assert.NoError(t, err)
require.NoError(t, err)
// Done forging txs and set batchNum
err = l2DB.DoneForging(doneForgingTxIDs, toDeleteBatchNum)
assert.NoError(t, err)
require.NoError(t, err)
// Invalidate txs and set batchNum
err = l2DB.InvalidateTxs(invalidTxIDs, toDeleteBatchNum)
assert.NoError(t, err)
require.NoError(t, err)
// Update timestamp of afterTTL txs
deleteTimestamp := time.Unix(time.Now().UTC().Unix()-int64(l2DB.ttl.Seconds()+float64(4*time.Second)), 0)
for _, id := range afterTTLIDs {
@@ -617,12 +683,12 @@ func TestPurge(t *testing.T) {
"UPDATE tx_pool SET timestamp = $1, state = $2 WHERE tx_id = $3;",
deleteTimestamp, common.PoolL2TxStatePending, id,
)
assert.NoError(t, err)
require.NoError(t, err)
}
// Purge txs
err = l2DB.Purge(safeBatchNum)
assert.NoError(t, err)
require.NoError(t, err)
// Check results
for _, id := range deletedIDs {
_, err := l2DB.GetTx(id)
@@ -630,7 +696,7 @@ func TestPurge(t *testing.T) {
}
for _, id := range keepedIDs {
_, err := l2DB.GetTx(id)
assert.NoError(t, err)
require.NoError(t, err)
}
}
@@ -644,10 +710,10 @@ func TestAuth(t *testing.T) {
for i := 0; i < len(auths); i++ {
// Add to the DB
err := l2DB.AddAccountCreationAuth(auths[i])
assert.NoError(t, err)
require.NoError(t, err)
// Fetch from DB
auth, err := l2DB.GetAccountCreationAuth(auths[i].EthAddr)
assert.NoError(t, err)
require.NoError(t, err)
// Check fetched vs generated
assert.Equal(t, auths[i].EthAddr, auth.EthAddr)
assert.Equal(t, auths[i].BJJ, auth.BJJ)
@@ -665,7 +731,7 @@ func TestAddGet(t *testing.T) {
log.Error("Error prepare historyDB", err)
}
poolL2Txs, err := generatePoolL2Txs()
assert.NoError(t, err)
require.NoError(t, err)
// We will work with only 3 txs
require.GreaterOrEqual(t, len(poolL2Txs), 3)
@@ -701,3 +767,56 @@ func TestAddGet(t *testing.T) {
assert.Equal(t, txs[i], *dbTx)
}
}
func TestPurgeByExternalDelete(t *testing.T) {
err := prepareHistoryDB(historyDB)
if err != nil {
log.Error("Error prepare historyDB", err)
}
txs, err := generatePoolL2Txs()
require.NoError(t, err)
// We will work with 8 txs
require.GreaterOrEqual(t, len(txs), 8)
txs = txs[:8]
for i := range txs {
require.NoError(t, l2DB.AddTxTest(&txs[i]))
}
// We will recreate this scenario:
// tx index, status , external_delete
// 0 , pending, false
// 1 , pending, false
// 2 , pending, true // will be deleted
// 3 , pending, true // will be deleted
// 4 , fging , false
// 5 , fging , false
// 6 , fging , true
// 7 , fging , true
require.NoError(t, l2DB.StartForging(
[]common.TxID{txs[4].TxID, txs[5].TxID, txs[6].TxID, txs[7].TxID},
1))
_, err = l2DB.db.Exec(
`UPDATE tx_pool SET external_delete = true WHERE
tx_id IN ($1, $2, $3, $4)
;`,
txs[2].TxID, txs[3].TxID, txs[6].TxID, txs[7].TxID,
)
require.NoError(t, err)
require.NoError(t, l2DB.PurgeByExternalDelete())
// Query txs that are have been not deleted
for _, i := range []int{0, 1, 4, 5, 6, 7} {
txID := txs[i].TxID
_, err := l2DB.GetTx(txID)
require.NoError(t, err)
}
// Query txs that have been deleted
for _, i := range []int{2, 3} {
txID := txs[i].TxID
_, err := l2DB.GetTx(txID)
require.Equal(t, sql.ErrNoRows, tracerr.Unwrap(err))
}
}

View File

@@ -34,6 +34,7 @@ type PoolL2TxWrite struct {
RqFee *common.FeeSelector `meddler:"rq_fee"`
RqNonce *common.Nonce `meddler:"rq_nonce"`
Type common.TxType `meddler:"tx_type"`
ClientIP string `meddler:"client_ip"`
}
// PoolTxAPI represents a L2 Tx pool with extra metadata used by the API

View File

@@ -47,7 +47,7 @@ CREATE TABLE token (
name VARCHAR(20) NOT NULL,
symbol VARCHAR(10) NOT NULL,
decimals INT NOT NULL,
usd NUMERIC,
usd NUMERIC, -- value of a normalized token (divided by 10^decimals)
usd_update TIMESTAMP WITHOUT TIME ZONE
);
@@ -100,6 +100,15 @@ CREATE TABLE account (
eth_addr BYTEA NOT NULL
);
CREATE TABLE account_update (
item_id SERIAL,
eth_block_num BIGINT NOT NULL REFERENCES block (eth_block_num) ON DELETE CASCADE,
batch_num BIGINT NOT NULL REFERENCES batch (batch_num) ON DELETE CASCADE,
idx BIGINT NOT NULL REFERENCES account (idx) ON DELETE CASCADE,
nonce BIGINT NOT NULL,
balance BYTEA NOT NULL
);
CREATE TABLE exit_tree (
item_id SERIAL PRIMARY KEY,
batch_num BIGINT REFERENCES batch (batch_num) ON DELETE CASCADE,
@@ -618,7 +627,9 @@ CREATE TABLE tx_pool (
rq_amount BYTEA,
rq_fee SMALLINT,
rq_nonce BIGINT,
tx_type VARCHAR(40) NOT NULL
tx_type VARCHAR(40) NOT NULL,
client_ip VARCHAR,
external_delete BOOLEAN NOT NULL DEFAULT false
);
-- +migrate StatementBegin
@@ -674,6 +685,7 @@ DROP TABLE token_exchange;
DROP TABLE wdelayer_vars;
DROP TABLE tx;
DROP TABLE exit_tree;
DROP TABLE account_update;
DROP TABLE account;
DROP TABLE token;
DROP TABLE bid;

View File

@@ -324,5 +324,6 @@ func (c *EthereumClient) EthCall(ctx context.Context, tx *types.Transaction,
Value: tx.Value(),
Data: tx.Data(),
}
return c.client.CallContract(ctx, msg, blockNum)
result, err := c.client.CallContract(ctx, msg, blockNum)
return result, tracerr.Wrap(err)
}

View File

@@ -327,7 +327,7 @@ func (c *RollupClient) RollupForgeBatch(args *RollupForgeBatchArgs, auth *bind.T
if auth == nil {
auth, err = c.client.NewAuth()
if err != nil {
return nil, err
return nil, tracerr.Wrap(err)
}
auth.GasLimit = 1000000
}
@@ -393,7 +393,7 @@ func (c *RollupClient) RollupForgeBatch(args *RollupForgeBatchArgs, auth *bind.T
l1CoordinatorBytes, l1l2TxData, feeIdxCoordinator, args.VerifierIdx, args.L1Batch,
args.ProofA, args.ProofB, args.ProofC)
if err != nil {
return nil, tracerr.Wrap(fmt.Errorf("Failed Hermez.ForgeBatch: %w", err))
return nil, tracerr.Wrap(fmt.Errorf("Hermez.ForgeBatch: %w", err))
}
return tx, nil
}

View File

@@ -67,6 +67,11 @@ func Init(levelStr string, outputs []string) {
func sprintStackTrace(st []tracerr.Frame) string {
builder := strings.Builder{}
// Skip deepest frame because it belongs to the go runtime and we don't
// care about it.
if len(st) > 0 {
st = st[:len(st)-1]
}
for _, f := range st {
builder.WriteString(fmt.Sprintf("\n%s:%d %s()", f.Path, f.Line, f.Func))
}

View File

@@ -184,6 +184,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
sync, err := synchronizer.NewSynchronizer(client, historyDB, stateDB, synchronizer.Config{
StatsRefreshPeriod: cfg.Synchronizer.StatsRefreshPeriod.Duration,
StoreAccountUpdates: cfg.Synchronizer.StoreAccountUpdates,
ChainID: chainIDU16,
})
if err != nil {
@@ -204,6 +205,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
db,
cfg.Coordinator.L2DB.SafetyPeriod,
cfg.Coordinator.L2DB.MaxTxs,
cfg.Coordinator.L2DB.MinFeeUSD,
cfg.Coordinator.L2DB.TTL.Duration,
apiConnCon,
)
@@ -302,6 +304,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
ForgeDelay: cfg.Coordinator.ForgeDelay.Duration,
ForgeNoTxsDelay: cfg.Coordinator.ForgeNoTxsDelay.Duration,
SyncRetryInterval: cfg.Coordinator.SyncRetryInterval.Duration,
PurgeByExtDelInterval: cfg.Coordinator.PurgeByExtDelInterval.Duration,
EthClientAttempts: cfg.Coordinator.EthClient.Attempts,
EthClientAttemptsDelay: cfg.Coordinator.EthClient.AttemptsDelay.Duration,
EthNoReuseNonce: cfg.Coordinator.EthClient.NoReuseNonce,

View File

@@ -207,6 +207,7 @@ type SCConsts struct {
// Config is the Synchronizer configuration
type Config struct {
StatsRefreshPeriod time.Duration
StoreAccountUpdates bool
ChainID uint16
}
@@ -993,6 +994,21 @@ func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, e
}
batchData.CreatedAccounts = processTxsOut.CreatedAccounts
if s.cfg.StoreAccountUpdates {
batchData.UpdatedAccounts = make([]common.AccountUpdate, 0,
len(processTxsOut.UpdatedAccounts))
for _, acc := range processTxsOut.UpdatedAccounts {
batchData.UpdatedAccounts = append(batchData.UpdatedAccounts,
common.AccountUpdate{
EthBlockNum: blockNum,
BatchNum: batchNum,
Idx: acc.Idx,
Nonce: acc.Nonce,
Balance: acc.Balance,
})
}
}
slotNum := int64(0)
if ethBlock.Num >= s.consts.Auction.GenesisBlockNum {
slotNum = (ethBlock.Num - s.consts.Auction.GenesisBlockNum) /

View File

@@ -171,6 +171,8 @@ func checkSyncBlock(t *testing.T, s *Synchronizer, blockNum int, block, syncBloc
*exit = syncBatch.ExitTree[j]
}
assert.Equal(t, batch.Batch, syncBatch.Batch)
// Ignore updated accounts
syncBatch.UpdatedAccounts = nil
assert.Equal(t, batch, syncBatch)
assert.Equal(t, &batch.Batch, dbBatch) //nolint:gosec
@@ -345,6 +347,7 @@ func TestSyncGeneral(t *testing.T) {
// Create Synchronizer
s, err := NewSynchronizer(client, historyDB, stateDB, Config{
StatsRefreshPeriod: 0 * time.Second,
StoreAccountUpdates: true,
})
require.NoError(t, err)
@@ -736,6 +739,7 @@ func TestSyncForgerCommitment(t *testing.T) {
// Create Synchronizer
s, err := NewSynchronizer(client, historyDB, stateDB, Config{
StatsRefreshPeriod: 0 * time.Second,
StoreAccountUpdates: true,
})
require.NoError(t, err)
@@ -836,6 +840,7 @@ func TestSyncForgerCommitment(t *testing.T) {
s2, err := NewSynchronizer(client, historyDB, stateDB, Config{
StatsRefreshPeriod: 0 * time.Second,
StoreAccountUpdates: true,
})
require.NoError(t, err)
stats = s2.Stats()

View File

@@ -75,7 +75,7 @@ func initTxSelector(t *testing.T, chainID uint16, hermezContractAddr ethCommon.A
pass := os.Getenv("POSTGRES_PASS")
db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez")
require.NoError(t, err)
l2DB := l2db.NewL2DB(db, 10, 100, 24*time.Hour, nil)
l2DB := l2db.NewL2DB(db, 10, 100, 0.0, 24*time.Hour, nil)
dir, err := ioutil.TempDir("", "tmpSyncDB")
require.NoError(t, err)

View File

@@ -27,6 +27,9 @@ type TxProcessor struct {
// AccumulatedFees contains the accumulated fees for each token (Coord
// Idx) in the processed batch
AccumulatedFees map[common.Idx]*big.Int
// updatedAccounts stores the last version of the account when it has
// been created/updated by any of the processed transactions.
updatedAccounts map[common.Idx]*common.Account
config Config
}
@@ -55,6 +58,9 @@ type ProcessTxOutput struct {
CreatedAccounts []common.Account
CoordinatorIdxsMap map[common.TokenID]common.Idx
CollectedFees map[common.TokenID]*big.Int
// UpdatedAccounts returns the current state of each account
// created/updated by any of the processed transactions.
UpdatedAccounts map[common.Idx]*common.Account
}
func newErrorNotEnoughBalance(tx common.Tx) error {
@@ -127,6 +133,10 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat
return nil, tracerr.Wrap(fmt.Errorf("L1UserTx + L1CoordinatorTx (%d) can not be bigger than MaxL1Tx (%d)", len(l1usertxs)+len(l1coordinatortxs), tp.config.MaxTx))
}
if tp.s.Type() == statedb.TypeSynchronizer {
tp.updatedAccounts = make(map[common.Idx]*common.Account)
}
exits := make([]processedExit, nTx)
if tp.s.Type() == statedb.TypeBatchBuilder {
@@ -198,7 +208,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat
}
}
if tp.s.Type() == statedb.TypeSynchronizer || tp.s.Type() == statedb.TypeBatchBuilder {
if exitIdx != nil && exitTree != nil {
if exitIdx != nil && exitTree != nil && exitAccount != nil {
exits[tp.i] = processedExit{
exit: true,
newExit: newExit,
@@ -382,7 +392,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat
tp.zki.EthAddr3[iFee] = common.EthAddrToBigInt(accCoord.EthAddr)
}
accCoord.Balance = new(big.Int).Add(accCoord.Balance, accumulatedFee)
pFee, err := tp.s.UpdateAccount(idx, accCoord)
pFee, err := tp.updateAccount(idx, accCoord)
if err != nil {
log.Error(err)
return nil, tracerr.Wrap(err)
@@ -407,6 +417,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat
return nil, nil
}
if tp.s.Type() == statedb.TypeSynchronizer {
// once all txs processed (exitTree root frozen), for each Exit,
// generate common.ExitInfo data
var exitInfos []common.ExitInfo
@@ -438,8 +449,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat
}
}
if tp.s.Type() == statedb.TypeSynchronizer {
// retuTypeexitInfos, createdAccounts and collectedFees, so Synchronizer will
// retun exitInfos, createdAccounts and collectedFees, so Synchronizer will
// be able to store it into HistoryDB for the concrete BatchNum
return &ProcessTxOutput{
ZKInputs: nil,
@@ -447,6 +457,7 @@ func (tp *TxProcessor) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinat
CreatedAccounts: createdAccounts,
CoordinatorIdxsMap: coordIdxsMap,
CollectedFees: collectedFees,
UpdatedAccounts: tp.updatedAccounts,
}, nil
}
@@ -741,7 +752,7 @@ func (tp *TxProcessor) applyCreateAccount(tx *common.L1Tx) error {
EthAddr: tx.FromEthAddr,
}
p, err := tp.s.CreateAccount(common.Idx(tp.s.CurrentIdx()+1), account)
p, err := tp.createAccount(common.Idx(tp.s.CurrentIdx()+1), account)
if err != nil {
return tracerr.Wrap(err)
}
@@ -776,6 +787,28 @@ func (tp *TxProcessor) applyCreateAccount(tx *common.L1Tx) error {
return tp.s.SetCurrentIdx(tp.s.CurrentIdx() + 1)
}
// createAccount is a wrapper over the StateDB.CreateAccount method that also
// stores the created account in the updatedAccounts map in case the StateDB is
// of TypeSynchronizer
func (tp *TxProcessor) createAccount(idx common.Idx, account *common.Account) (*merkletree.CircomProcessorProof, error) {
if tp.s.Type() == statedb.TypeSynchronizer {
account.Idx = idx
tp.updatedAccounts[idx] = account
}
return tp.s.CreateAccount(idx, account)
}
// updateAccount is a wrapper over the StateDB.UpdateAccount method that also
// stores the updated account in the updatedAccounts map in case the StateDB is
// of TypeSynchronizer
func (tp *TxProcessor) updateAccount(idx common.Idx, account *common.Account) (*merkletree.CircomProcessorProof, error) {
if tp.s.Type() == statedb.TypeSynchronizer {
account.Idx = idx
tp.updatedAccounts[idx] = account
}
return tp.s.UpdateAccount(idx, account)
}
// applyDeposit updates the balance in the account of the depositer, if
// andTransfer parameter is set to true, the method will also apply the
// Transfer of the L1Tx/DepositTransfer
@@ -806,7 +839,7 @@ func (tp *TxProcessor) applyDeposit(tx *common.L1Tx, transfer bool) error {
}
// update sender account in localStateDB
p, err := tp.s.UpdateAccount(tx.FromIdx, accSender)
p, err := tp.updateAccount(tx.FromIdx, accSender)
if err != nil {
return tracerr.Wrap(err)
}
@@ -843,7 +876,7 @@ func (tp *TxProcessor) applyDeposit(tx *common.L1Tx, transfer bool) error {
accReceiver.Balance = new(big.Int).Add(accReceiver.Balance, tx.EffectiveAmount)
// update receiver account in localStateDB
p, err := tp.s.UpdateAccount(tx.ToIdx, accReceiver)
p, err := tp.updateAccount(tx.ToIdx, accReceiver)
if err != nil {
return tracerr.Wrap(err)
}
@@ -926,7 +959,7 @@ func (tp *TxProcessor) applyTransfer(coordIdxsMap map[common.TokenID]common.Idx,
}
// update sender account in localStateDB
pSender, err := tp.s.UpdateAccount(tx.FromIdx, accSender)
pSender, err := tp.updateAccount(tx.FromIdx, accSender)
if err != nil {
log.Error(err)
return tracerr.Wrap(err)
@@ -965,7 +998,7 @@ func (tp *TxProcessor) applyTransfer(coordIdxsMap map[common.TokenID]common.Idx,
accReceiver.Balance = new(big.Int).Add(accReceiver.Balance, tx.Amount)
// update receiver account in localStateDB
pReceiver, err := tp.s.UpdateAccount(auxToIdx, accReceiver)
pReceiver, err := tp.updateAccount(auxToIdx, accReceiver)
if err != nil {
return tracerr.Wrap(err)
}
@@ -1008,7 +1041,7 @@ func (tp *TxProcessor) applyCreateAccountDepositTransfer(tx *common.L1Tx) error
}
// create Account of the Sender
p, err := tp.s.CreateAccount(common.Idx(tp.s.CurrentIdx()+1), accSender)
p, err := tp.createAccount(common.Idx(tp.s.CurrentIdx()+1), accSender)
if err != nil {
return tracerr.Wrap(err)
}
@@ -1056,7 +1089,7 @@ func (tp *TxProcessor) applyCreateAccountDepositTransfer(tx *common.L1Tx) error
accReceiver.Balance = new(big.Int).Add(accReceiver.Balance, tx.EffectiveAmount)
// update receiver account in localStateDB
p, err = tp.s.UpdateAccount(tx.ToIdx, accReceiver)
p, err = tp.updateAccount(tx.ToIdx, accReceiver)
if err != nil {
return tracerr.Wrap(err)
}
@@ -1130,7 +1163,7 @@ func (tp *TxProcessor) applyExit(coordIdxsMap map[common.TokenID]common.Idx,
}
}
p, err := tp.s.UpdateAccount(tx.FromIdx, acc)
p, err := tp.updateAccount(tx.FromIdx, acc)
if err != nil {
return nil, false, tracerr.Wrap(err)
}
@@ -1141,6 +1174,11 @@ func (tp *TxProcessor) applyExit(coordIdxsMap map[common.TokenID]common.Idx,
if exitTree == nil {
return nil, false, nil
}
if tx.Amount.Cmp(big.NewInt(0)) == 0 { // Amount == 0
// if the Exit Amount==0, the Exit is not added to the ExitTree
return nil, false, nil
}
exitAccount, err := statedb.GetAccountInTreeDB(exitTree.DB(), tx.FromIdx)
if tracerr.Unwrap(err) == db.ErrNotFound {
// 1a. if idx does not exist in exitTree:

View File

@@ -4,6 +4,7 @@ import (
"io/ioutil"
"math/big"
"os"
"sort"
"testing"
ethCommon "github.com/ethereum/go-ethereum/common"
@@ -911,3 +912,198 @@ func TestTwoExits(t *testing.T) {
assert.Equal(t, ptOuts[3].ExitInfos[0].MerkleProof, ptOuts2[3].ExitInfos[0].MerkleProof)
}
func TestExitOf0Amount(t *testing.T) {
// Test to check that when doing an Exit with amount 0 the Exit Root
// does not change (as there is no new Exit Leaf created)
dir, err := ioutil.TempDir("", "tmpdb")
require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dir))
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeBatchBuilder, NLevels: 32})
assert.NoError(t, err)
chainID := uint16(1)
tc := til.NewContext(chainID, common.RollupConstMaxL1UserTx)
set := `
Type: Blockchain
CreateAccountDeposit(0) A: 100
CreateAccountDeposit(0) B: 100
> batchL1 // batch1: freeze L1User{2}
> batchL1 // batch2: forge L1User{2}
ForceExit(0) A: 10
ForceExit(0) B: 0
> batchL1 // batch3: freeze L1User{2}
> batchL1 // batch4: forge L1User{2}
ForceExit(0) A: 10
> batchL1 // batch5: freeze L1User{1}
> batchL1 // batch6: forge L1User{1}
ForceExit(0) A: 0
> batchL1 // batch7: freeze L1User{1}
> batchL1 // batch8: forge L1User{1}
> block
`
blocks, err := tc.GenerateBlocks(set)
require.NoError(t, err)
err = tc.FillBlocksExtra(blocks, &til.ConfigExtra{})
require.NoError(t, err)
err = tc.FillBlocksForgedL1UserTxs(blocks)
require.NoError(t, err)
// Sanity check
require.Equal(t, 2, len(blocks[0].Rollup.Batches[1].L1UserTxs))
require.Equal(t, 2, len(blocks[0].Rollup.Batches[3].L1UserTxs))
require.Equal(t, big.NewInt(10), blocks[0].Rollup.Batches[3].L1UserTxs[0].Amount)
require.Equal(t, big.NewInt(0), blocks[0].Rollup.Batches[3].L1UserTxs[1].Amount)
config := Config{
NLevels: 32,
MaxFeeTx: 64,
MaxTx: 512,
MaxL1Tx: 16,
ChainID: chainID,
}
tp := NewTxProcessor(sdb, config)
// For this test are only processed the batches with transactions:
// - Batch2, equivalent to Batches[1]
// - Batch4, equivalent to Batches[3]
// - Batch6, equivalent to Batches[5]
// - Batch8, equivalent to Batches[7]
// process Batch2:
_, err = tp.ProcessTxs(nil, blocks[0].Rollup.Batches[1].L1UserTxs, nil, nil)
require.NoError(t, err)
// process Batch4:
ptOut, err := tp.ProcessTxs(nil, blocks[0].Rollup.Batches[3].L1UserTxs, nil, nil)
require.NoError(t, err)
assert.Equal(t, "14329759303391468223438874789317921522067594445474390443816827472846339238908", ptOut.ZKInputs.Metadata.NewExitRootRaw.BigInt().String())
exitRootBatch4 := ptOut.ZKInputs.Metadata.NewExitRootRaw.BigInt().String()
// process Batch6:
ptOut, err = tp.ProcessTxs(nil, blocks[0].Rollup.Batches[5].L1UserTxs, nil, nil)
require.NoError(t, err)
assert.Equal(t, "14329759303391468223438874789317921522067594445474390443816827472846339238908", ptOut.ZKInputs.Metadata.NewExitRootRaw.BigInt().String())
// Expect that the ExitRoot for the Batch6 will be equal than for the
// Batch4, as the Batch4 & Batch6 have the same tx with Exit Amount=10,
// and Batch4 has a 2nd tx with Exit Amount=0.
assert.Equal(t, exitRootBatch4, ptOut.ZKInputs.Metadata.NewExitRootRaw.BigInt().String())
// For the Batch8, as there is only 1 exit with Amount=0, the ExitRoot
// should be 0.
// process Batch8:
ptOut, err = tp.ProcessTxs(nil, blocks[0].Rollup.Batches[7].L1UserTxs, nil, nil)
require.NoError(t, err)
assert.Equal(t, "0", ptOut.ZKInputs.Metadata.NewExitRootRaw.BigInt().String())
}
func TestUpdatedAccounts(t *testing.T) {
dir, err := ioutil.TempDir("", "tmpdb")
require.NoError(t, err)
defer assert.NoError(t, os.RemoveAll(dir))
sdb, err := statedb.NewStateDB(statedb.Config{Path: dir, Keep: 128,
Type: statedb.TypeSynchronizer, NLevels: 32})
assert.NoError(t, err)
set := `
Type: Blockchain
AddToken(1)
CreateAccountCoordinator(0) Coord // 256
CreateAccountCoordinator(1) Coord // 257
> batch // 1
CreateAccountDeposit(0) A: 50 // 258
CreateAccountDeposit(0) B: 60 // 259
CreateAccountDeposit(1) A: 70 // 260
CreateAccountDeposit(1) B: 80 // 261
> batchL1 // 2
> batchL1 // 3
Transfer(0) A-B: 5 (126)
> batch // 4
Exit(1) B: 5 (126)
> batch // 5
> block
`
chainID := uint16(0)
tc := til.NewContext(chainID, common.RollupConstMaxL1UserTx)
blocks, err := tc.GenerateBlocks(set)
require.NoError(t, err)
tilCfgExtra := til.ConfigExtra{
BootCoordAddr: ethCommon.HexToAddress("0xE39fEc6224708f0772D2A74fd3f9055A90E0A9f2"),
CoordUser: "Coord",
}
err = tc.FillBlocksExtra(blocks, &tilCfgExtra)
require.NoError(t, err)
tc.FillBlocksL1UserTxsBatchNum(blocks)
err = tc.FillBlocksForgedL1UserTxs(blocks)
require.NoError(t, err)
require.Equal(t, 5, len(blocks[0].Rollup.Batches))
config := Config{
NLevels: 32,
MaxFeeTx: 64,
MaxTx: 512,
MaxL1Tx: 16,
ChainID: chainID,
}
tp := NewTxProcessor(sdb, config)
sortedKeys := func(m map[common.Idx]*common.Account) []int {
keys := make([]int, 0)
for k := range m {
keys = append(keys, int(k))
}
sort.Ints(keys)
return keys
}
for _, batch := range blocks[0].Rollup.Batches {
l2Txs := common.L2TxsToPoolL2Txs(batch.L2Txs)
ptOut, err := tp.ProcessTxs(batch.Batch.FeeIdxsCoordinator, batch.L1UserTxs,
batch.L1CoordinatorTxs, l2Txs)
require.NoError(t, err)
switch batch.Batch.BatchNum {
case 1:
assert.Equal(t, 2, len(ptOut.UpdatedAccounts))
assert.Equal(t, []int{256, 257}, sortedKeys(ptOut.UpdatedAccounts))
case 2:
assert.Equal(t, 0, len(ptOut.UpdatedAccounts))
assert.Equal(t, []int{}, sortedKeys(ptOut.UpdatedAccounts))
case 3:
assert.Equal(t, 4, len(ptOut.UpdatedAccounts))
assert.Equal(t, []int{258, 259, 260, 261}, sortedKeys(ptOut.UpdatedAccounts))
case 4:
assert.Equal(t, 2+1, len(ptOut.UpdatedAccounts))
assert.Equal(t, []int{256, 258, 259}, sortedKeys(ptOut.UpdatedAccounts))
case 5:
assert.Equal(t, 1+1, len(ptOut.UpdatedAccounts))
assert.Equal(t, []int{257, 261}, sortedKeys(ptOut.UpdatedAccounts))
}
for idx, updAcc := range ptOut.UpdatedAccounts {
acc, err := sdb.GetAccount(idx)
require.NoError(t, err)
// If acc.Balance is 0, set it to 0 with big.NewInt so
// that the comparison succeeds. Without this, the
// comparison will not succeed because acc.Balance is
// set from a slice, and thus the internal big.Int
// buffer is not nil (big.Int.abs)
if acc.Balance.BitLen() == 0 {
acc.Balance = big.NewInt(0)
}
assert.Equal(t, acc, updAcc)
}
}
}

View File

@@ -18,20 +18,6 @@ import (
"github.com/iden3/go-iden3-crypto/babyjub"
)
// txs implements the interface Sort for an array of Tx, and sorts the txs by
// absolute fee
type txs []common.PoolL2Tx
func (t txs) Len() int {
return len(t)
}
func (t txs) Swap(i, j int) {
t[i], t[j] = t[j], t[i]
}
func (t txs) Less(i, j int) bool {
return t[i].AbsoluteFee > t[j].AbsoluteFee
}
// CoordAccount contains the data of the Coordinator account, that will be used
// to create new transactions of CreateAccountDeposit type to add new TokenID
// accounts for the Coordinator to receive the fees.
@@ -191,14 +177,16 @@ func (txsel *TxSelector) GetL1L2TxSelection(selectionConfig *SelectionConfig,
}
// discardedL2Txs contains an array of the L2Txs that have not been selected in this Batch
var discardedL2Txs []common.PoolL2Tx
var l1CoordinatorTxs []common.L1Tx
positionL1 := len(l1UserTxs)
var accAuths [][]byte
// sort l2TxsRaw (cropping at MaxTx at this point)
l2Txs0 := txsel.getL2Profitable(l2TxsRaw, selectionConfig.TxProcessorConfig.MaxTx)
l2Txs0, discardedL2Txs := txsel.getL2Profitable(l2TxsRaw, selectionConfig.TxProcessorConfig.MaxTx)
for i := range discardedL2Txs {
discardedL2Txs[i].Info = "Tx not selected due to low absolute fee"
}
noncesMap := make(map[common.Idx]common.Nonce)
var l2Txs []common.PoolL2Tx
@@ -590,11 +578,22 @@ func checkAlreadyPendingToCreate(l1CoordinatorTxs []common.L1Tx, tokenID common.
}
// getL2Profitable returns the profitable selection of L2Txssorted by Nonce
func (txsel *TxSelector) getL2Profitable(l2Txs []common.PoolL2Tx, max uint32) []common.PoolL2Tx {
// Sort by absolute fee
sort.Sort(txs(l2Txs))
func (txsel *TxSelector) getL2Profitable(l2Txs []common.PoolL2Tx, max uint32) ([]common.PoolL2Tx,
[]common.PoolL2Tx) {
// First sort by nonce so that txs from the same account are sorted so
// that they could be applied in succession.
sort.Slice(l2Txs, func(i, j int) bool {
return l2Txs[i].Nonce < l2Txs[j].Nonce
})
// Sort by absolute fee with SliceStable, so that txs with same
// AbsoluteFee are not rearranged and nonce order is kept in such case
sort.SliceStable(l2Txs, func(i, j int) bool {
return l2Txs[i].AbsoluteFee > l2Txs[j].AbsoluteFee
})
discardedL2Txs := []common.PoolL2Tx{}
if len(l2Txs) > int(max) {
discardedL2Txs = l2Txs[max:]
l2Txs = l2Txs[:max]
}
@@ -603,9 +602,9 @@ func (txsel *TxSelector) getL2Profitable(l2Txs []common.PoolL2Tx, max uint32) []
// Account is sorted, but the l2Txs can not be grouped by sender Account
// neither by Fee. This is because later on the Nonces will need to be
// sequential for the zkproof generation.
sort.SliceStable(l2Txs, func(i, j int) bool {
sort.Slice(l2Txs, func(i, j int) bool {
return l2Txs[i].Nonce < l2Txs[j].Nonce
})
return l2Txs
return l2Txs, discardedL2Txs
}

View File

@@ -29,7 +29,7 @@ func initTest(t *testing.T, chainID uint16, hermezContractAddr ethCommon.Address
pass := os.Getenv("POSTGRES_PASS")
db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez")
require.NoError(t, err)
l2DB := l2db.NewL2DB(db, 10, 100, 24*time.Hour, nil)
l2DB := l2db.NewL2DB(db, 10, 100, 0.0, 24*time.Hour, nil)
dir, err := ioutil.TempDir("", "tmpdb")
require.NoError(t, err)
@@ -658,7 +658,7 @@ func TestTransferManyFromSameAccount(t *testing.T) {
tpc := txprocessor.Config{
NLevels: 16,
MaxFeeTx: 10,
MaxTx: 20,
MaxTx: 10,
MaxL1Tx: 10,
ChainID: chainID,
}
@@ -683,13 +683,17 @@ func TestTransferManyFromSameAccount(t *testing.T) {
PoolTransfer(0) A-B: 10 (126) // 6
PoolTransfer(0) A-B: 10 (126) // 7
PoolTransfer(0) A-B: 10 (126) // 8
PoolTransfer(0) A-B: 10 (126) // 9
PoolTransfer(0) A-B: 10 (126) // 10
PoolTransfer(0) A-B: 10 (126) // 11
`
poolL2Txs, err := tc.GeneratePoolL2Txs(batchPoolL2)
require.NoError(t, err)
require.Equal(t, 11, len(poolL2Txs))
// reorder poolL2Txs so that nonces are not sorted
poolL2Txs[0], poolL2Txs[7] = poolL2Txs[7], poolL2Txs[0]
poolL2Txs[1], poolL2Txs[6] = poolL2Txs[6], poolL2Txs[1]
poolL2Txs[1], poolL2Txs[10] = poolL2Txs[10], poolL2Txs[1]
// add the PoolL2Txs to the l2DB
addL2Txs(t, txsel, poolL2Txs)
@@ -699,8 +703,9 @@ func TestTransferManyFromSameAccount(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, 3, len(oL1UserTxs))
require.Equal(t, 0, len(oL1CoordTxs))
assert.Equal(t, 8, len(oL2Txs))
assert.Equal(t, 0, len(discardedL2Txs))
assert.Equal(t, 7, len(oL2Txs))
assert.Equal(t, 1, len(discardedL2Txs))
err = txsel.l2db.StartForging(common.TxIDsFromPoolL2Txs(oL2Txs), txsel.localAccountsDB.CurrentBatch())
require.NoError(t, err)
}