Implement L2DB

This commit is contained in:
Arnau B
2020-09-16 15:19:33 +02:00
parent 5ad447c47c
commit 94e1f11a98
11 changed files with 557 additions and 154 deletions

View File

@@ -2,10 +2,9 @@ package l2db
import (
"fmt"
"strconv"
"time"
eth "github.com/ethereum/go-ethereum/common"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/gobuffalo/packr/v2"
"github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/db"
@@ -21,21 +20,17 @@ import (
// due to them being forged or invalid after a safety period
type L2DB struct {
db *sqlx.DB
safetyPeriod uint16
safetyPeriod common.BatchNum
ttl time.Duration
maxTxs uint32
}
// NewL2DB creates a L2DB.
// More info on how to set dbDialect and dbArgs here: http://gorm.io/docs/connecting_to_the_database.html
// safetyPeriod is the amount of blockchain blocks that must be waited before deleting anything (to avoid reorg problems).
// maxTxs indicates the desired maximum amount of txs stored on the L2DB.
// TTL indicates the maximum amount of time that a tx can be in the L2DB
// (to prevent tx that won't ever be forged to stay there, will be used if maxTxs is exceeded).
// autoPurgePeriod will be used as delay between calls to Purge. If the value is 0, it will be disabled.
// To create it, it's needed postgres configuration, safety period expressed in batches,
// maxTxs that the DB should have and TTL (time to live) for pending txs.
func NewL2DB(
port int, host, user, password, dbname string,
safetyPeriod uint16,
safetyPeriod common.BatchNum,
maxTxs uint32,
TTL time.Duration,
) (*L2DB, error) {
@@ -71,17 +66,26 @@ func (l2db *L2DB) DB() *sqlx.DB {
return l2db.db
}
// AddAccountCreationAuth inserts an account creation authorization into the DB
func (l2db *L2DB) AddAccountCreationAuth(auth *common.AccountCreationAuth) error {
return meddler.Insert(l2db.db, "account_creation_auth", auth)
}
// GetAccountCreationAuth returns an account creation authorization into the DB
func (l2db *L2DB) GetAccountCreationAuth(addr ethCommon.Address) (*common.AccountCreationAuth, error) {
auth := new(common.AccountCreationAuth)
return auth, meddler.QueryRow(
l2db.db, auth,
"SELECT * FROM account_creation_auth WHERE eth_addr = $1;",
addr,
)
}
// AddTx inserts a tx into the L2DB
func (l2db *L2DB) AddTx(tx *common.PoolL2Tx) error {
return meddler.Insert(l2db.db, "tx_pool", tx)
}
// AddAccountCreationAuth inserts an account creation authorization into the DB
func (l2db *L2DB) AddAccountCreationAuth(auth *common.AccountCreationAuth) error {
// TODO: impl
return nil
}
// GetTx return the specified Tx
func (l2db *L2DB) GetTx(txID common.TxID) (*common.PoolL2Tx, error) {
tx := new(common.PoolL2Tx)
@@ -103,12 +107,6 @@ func (l2db *L2DB) GetPendingTxs() ([]*common.PoolL2Tx, error) {
return txs, err
}
// GetAccountCreationAuth return the authorization to make registers of an Ethereum address
func (l2db *L2DB) GetAccountCreationAuth(ethAddr eth.Address) (*common.AccountCreationAuth, error) {
// TODO: impl
return nil, nil
}
// StartForging updates the state of the transactions that will begin the forging process.
// The state of the txs referenced by txIDs will be changed from Pending -> Forging
func (l2db *L2DB) StartForging(txIDs []common.TxID, batchNum common.BatchNum) error {
@@ -116,9 +114,9 @@ func (l2db *L2DB) StartForging(txIDs []common.TxID, batchNum common.BatchNum) er
`UPDATE tx_pool
SET state = ?, batch_num = ?
WHERE state = ? AND tx_id IN (?);`,
string(common.PoolL2TxStateForging),
strconv.Itoa(int(batchNum)),
string(common.PoolL2TxStatePending),
common.PoolL2TxStateForging,
batchNum,
common.PoolL2TxStatePending,
txIDs,
)
if err != nil {
@@ -131,48 +129,157 @@ func (l2db *L2DB) StartForging(txIDs []common.TxID, batchNum common.BatchNum) er
// DoneForging updates the state of the transactions that have been forged
// so the state of the txs referenced by txIDs will be changed from Forging -> Forged
func (l2db *L2DB) DoneForging(txIDs []common.TxID) error {
// TODO: impl
return nil
func (l2db *L2DB) DoneForging(txIDs []common.TxID, batchNum common.BatchNum) error {
query, args, err := sqlx.In(
`UPDATE tx_pool
SET state = ?, batch_num = ?
WHERE state = ? AND tx_id IN (?);`,
common.PoolL2TxStateForged,
batchNum,
common.PoolL2TxStateForging,
txIDs,
)
if err != nil {
return err
}
query = l2db.db.Rebind(query)
_, err = l2db.db.Exec(query, args...)
return err
}
// InvalidateTxs updates the state of the transactions that are invalid.
// The state of the txs referenced by txIDs will be changed from * -> Invalid
func (l2db *L2DB) InvalidateTxs(txIDs []common.TxID) error {
return nil
func (l2db *L2DB) InvalidateTxs(txIDs []common.TxID, batchNum common.BatchNum) error {
query, args, err := sqlx.In(
`UPDATE tx_pool
SET state = ?, batch_num = ?
WHERE tx_id IN (?);`,
common.PoolL2TxStateInvalid,
batchNum,
txIDs,
)
if err != nil {
return err
}
query = l2db.db.Rebind(query)
_, err = l2db.db.Exec(query, args...)
return err
}
// CheckNonces invalidate txs with nonces that are smaller than their respective accounts nonces.
// CheckNonces invalidate txs with nonces that are smaller or equal than their respective accounts nonces.
// The state of the affected txs will be changed from Pending -> Invalid
func (l2db *L2DB) CheckNonces(updatedAccounts []common.Account) error {
// TODO: impl
return nil
func (l2db *L2DB) CheckNonces(updatedAccounts []common.Account, batchNum common.BatchNum) error {
txn, err := l2db.db.Begin()
if err != nil {
return err
}
defer func() {
// Rollback the transaction if there was an error.
if err != nil {
err = txn.Rollback()
}
}()
for i := 0; i < len(updatedAccounts); i++ {
_, err = txn.Exec(
`UPDATE tx_pool
SET state = $1, batch_num = $2
WHERE state = $3 AND from_idx = $4 AND nonce <= $5;`,
common.PoolL2TxStateInvalid,
batchNum,
common.PoolL2TxStatePending,
updatedAccounts[i].Idx,
updatedAccounts[i].Nonce,
)
if err != nil {
return err
}
}
return txn.Commit()
}
// GetTxsByAbsoluteFeeUpdate return the txs that have an AbsoluteFee updated before olderThan
func (l2db *L2DB) GetTxsByAbsoluteFeeUpdate(olderThan time.Time) ([]*common.PoolL2Tx, error) {
// TODO: impl
return nil, nil
}
// UpdateTxs update existing txs from the pool (TxID must exist)
func (l2db *L2DB) UpdateTxs(txs []*common.PoolL2Tx) error {
// TODO: impl
return nil
// UpdateTxValue updates the absolute fee and value of txs given a token list that include their price in USD
func (l2db *L2DB) UpdateTxValue(tokens []common.Token) error {
// WARNING: this is very slow and should be optimized
txn, err := l2db.db.Begin()
if err != nil {
return err
}
defer func() {
// Rollback the transaction if there was an error.
if err != nil {
err = txn.Rollback()
}
}()
now := time.Now()
for i := 0; i < len(tokens); i++ {
_, err = txn.Exec(
`UPDATE tx_pool
SET usd_update = $1, value_usd = amount_f * $2, fee_usd = $2 * amount_f * CASE
WHEN fee = 0 THEN 0
WHEN fee >= 1 AND fee <= 32 THEN POWER(10,-24+(fee::float/2))
WHEN fee >= 33 AND fee <= 223 THEN POWER(10,-8+(0.041666666666667*(fee::float-32)))
WHEN fee >= 224 AND fee <= 255 THEN POWER(10,fee-224) END
WHERE token_id = $3;`,
now,
tokens[i].USD,
tokens[i].TokenID,
)
if err != nil {
return err
}
}
return txn.Commit()
}
// Reorg updates the state of txs that were updated in a batch that has been discarted due to a blockchain reorg.
// The state of the affected txs can change form Forged -> Pending or from Invalid -> Pending
func (l2db *L2DB) Reorg(lastValidBatch common.BatchNum) error {
// TODO: impl
return nil
_, err := l2db.db.Exec(
`UPDATE tx_pool SET batch_num = NULL, state = $1
WHERE (state = $2 OR state = $3) AND batch_num > $4`,
common.PoolL2TxStatePending,
common.PoolL2TxStateForged,
common.PoolL2TxStateInvalid,
lastValidBatch,
)
return err
}
// Purge deletes transactions that have been forged or marked as invalid for longer than the safety period
// it also deletes txs that has been in the L2DB for longer than the ttl if maxTxs has been exceeded
func (l2db *L2DB) Purge() error {
// TODO: impl
return nil
func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) error {
txn, err := l2db.db.Begin()
if err != nil {
return err
}
defer func() {
// Rollback the transaction if there was an error.
if err != nil {
err = txn.Rollback()
}
}()
// Delete pending txs that have been in the pool after the TTL if maxTxs is reached
now := time.Now().UTC().Unix()
_, err = txn.Exec(
`DELETE FROM tx_pool WHERE (SELECT count(*) FROM tx_pool) > $1 AND timestamp < $2`,
l2db.maxTxs,
time.Unix(now-int64(l2db.ttl.Seconds()), 0),
)
if err != nil {
return err
}
// Delete txs that have been marked as forged / invalid after the safety period
_, err = txn.Exec(
`DELETE FROM tx_pool
WHERE batch_num < $1 AND (state = $2 OR state = $3)`,
currentBatchNum-l2db.safetyPeriod,
common.PoolL2TxStateForged,
common.PoolL2TxStateInvalid,
)
if err != nil {
return err
}
return txn.Commit()
}
// Close frees the resources used by the L2DB