Update SQL schemas

This commit is contained in:
a_bennassar
2020-08-13 12:53:42 +02:00
committed by arnaucube
parent bf1a648091
commit cb1b820256
12 changed files with 418 additions and 108 deletions

View File

@@ -10,7 +10,7 @@ CREATE TABLE slot_min_prices (
min_prices VARCHAR(200) NOT NULL
);
CREATE TABLE coordiantor (
CREATE TABLE coordianator (
forger_addr BYTEA NOT NULL,
eth_block_num BIGINT NOT NULL REFERENCES block (eth_block_num) ON DELETE CASCADE,
beneficiary_addr BYTEA NOT NULL,
@@ -59,19 +59,17 @@ CREATE TABLE token (
CREATE TABLE l1tx (
tx_id BYTEA PRIMARY KEY,
to_forge_l1_txs_num BIGINT NOT NULL,
position INT NOT NULL,
user_origin BOOLEAN NOT NULL,
from_idx BIGINT NOT NULL,
from_eth_addr BYTEA NOT NULL,
from_bjj BYTEA NOT NULL,
to_idx BIGINT NOT NULL,
token_id INT NOT NULL REFERENCES token (token_id),
amount NUMERIC NOT NULL,
nonce BIGINT NOT NULL,
fee INT NOT NULL,
eth_block_num BIGINT NOT NULL REFERENCES block (eth_block_num) ON DELETE CASCADE,
to_forge_l1_txs_num BIGINT NOT NULL,
position INT NOT NULL,
origin_user BOOLEAN NOT NULL,
from_eth_addr BYTEA NOT NULL,
from_bjj BYTEA NOT NULL,
load_amount BYTEA NOT NULL
load_amount BYTEA NOT NULL,
eth_block_num BIGINT NOT NULL REFERENCES block (eth_block_num) ON DELETE CASCADE
);
CREATE TABLE l2tx (
@@ -101,6 +99,6 @@ DROP TABLE token;
DROP TABLE bid;
DROP TABLE exit_tree;
DROP TABLE batch;
DROP TABLE coordiantor;
DROP TABLE coordianator;
DROP TABLE slot_min_prices;
DROP TABLE block;

View File

@@ -1,17 +1,24 @@
package l2db
import (
"fmt"
"strconv"
"time"
eth "github.com/ethereum/go-ethereum/common"
"github.com/gobuffalo/packr/v2"
"github.com/hermeznetwork/hermez-node/common"
"github.com/jinzhu/gorm"
"github.com/hermeznetwork/hermez-node/db"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq" // driver for postgres DB
migrate "github.com/rubenv/sql-migrate"
"github.com/russross/meddler"
)
// L2DB stores L2 txs and authorization registers received by the coordinator and keeps them until they are no longer relevant
// due to them being forged or invalid after a safety period
type L2DB struct {
db *gorm.DB
db *sqlx.DB
safetyPeriod uint16
ttl time.Duration
maxTxs uint32
@@ -25,23 +32,28 @@ type L2DB struct {
// (to prevent tx that won't ever be forged to stay there, will be used if maxTxs is exceeded).
// autoPurgePeriod will be used as delay between calls to Purge. If the value is 0, it will be disabled.
func NewL2DB(
dbDialect, dbArgs string,
port int, host, user, password, dbname string,
safetyPeriod uint16,
maxTxs uint32,
TTL time.Duration,
) (*L2DB, error) {
// init meddler
db.InitMeddler()
meddler.Default = meddler.PostgreSQL
// Stablish DB connection
db, err := gorm.Open(dbDialect, dbArgs)
psqlconn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", host, port, user, password, dbname)
db, err := sqlx.Connect("postgres", psqlconn)
if err != nil {
return nil, err
}
// Create or update SQL schemas
// WARNING: AutoMigrate will ONLY create tables, missing columns and missing indexes,
// and WONT change existing columns type or delete unused columns to protect your data.
// more info: http://gorm.io/docs/migration.html
db.AutoMigrate(&common.PoolL2Tx{})
// TODO: db.AutoMigrate(&common.RegisterAuthorization{})
// Run DB migrations
migrations := &migrate.PackrMigrationSource{
Box: packr.New("history-migrations", "./migrations"),
}
if _, err := migrate.Exec(db.DB, "postgres", migrations, migrate.Up); err != nil {
return nil, err
}
return &L2DB{
db: db,
@@ -53,38 +65,66 @@ func NewL2DB(
// AddTx inserts a tx into the L2DB
func (l2db *L2DB) AddTx(tx *common.PoolL2Tx) error {
return nil
return meddler.Insert(l2db.db, "tx_pool", tx)
}
// AddAccountCreationAuth inserts an account creation authorization into the DB
func (l2db *L2DB) AddAccountCreationAuth(auth *common.AccountCreationAuth) error { // TODO: AddRegisterAuthorization(auth &common.RegisterAuthorization)
func (l2db *L2DB) AddAccountCreationAuth(auth *common.AccountCreationAuth) error {
// TODO: impl
return nil
}
// GetTx return the specified Tx
func (l2db *L2DB) GetTx(txID common.TxID) (*common.PoolL2Tx, error) {
return nil, nil
tx := new(common.PoolL2Tx)
return tx, meddler.QueryRow(
l2db.db, tx,
"SELECT * FROM tx_pool WHERE tx_id = $1;",
txID,
)
}
// GetPendingTxs return all the pending txs of the L2DB
func (l2db *L2DB) GetPendingTxs() ([]common.PoolL2Tx, error) {
return nil, nil
func (l2db *L2DB) GetPendingTxs() ([]*common.PoolL2Tx, error) {
var txs []*common.PoolL2Tx
err := meddler.QueryAll(
l2db.db, &txs,
"SELECT * FROM tx_pool WHERE state = $1",
common.PoolL2TxStatePending,
)
return txs, err
}
// GetAccountCreationAuth return the authorization to make registers of an Ethereum address
func (l2db *L2DB) GetAccountCreationAuth(ethAddr eth.Address) (*common.AccountCreationAuth, error) {
// TODO: impl
return nil, nil
}
// StartForging updates the state of the transactions that will begin the forging process.
// The state of the txs referenced by txIDs will be changed from Pending -> Forging
func (l2db *L2DB) StartForging(txIDs []common.TxID) error {
return nil
func (l2db *L2DB) StartForging(txIDs []common.TxID, batchNum common.BatchNum) error {
query, args, err := sqlx.In(
`UPDATE tx_pool
SET state = ?, batch_num = ?
WHERE state = ? AND tx_id IN (?);`,
string(common.PoolL2TxStateForging),
strconv.Itoa(int(batchNum)),
string(common.PoolL2TxStatePending),
txIDs,
)
if err != nil {
return err
}
query = l2db.db.Rebind(query)
_, err = l2db.db.Exec(query, args...)
return err
}
// DoneForging updates the state of the transactions that have been forged
// so the state of the txs referenced by txIDs will be changed from Forging -> Forged
func (l2db *L2DB) DoneForging(txIDs []common.TxID) error {
// TODO: impl
return nil
}
@@ -97,18 +137,33 @@ func (l2db *L2DB) InvalidateTxs(txIDs []common.TxID) error {
// CheckNonces invalidate txs with nonces that are smaller than their respective accounts nonces.
// The state of the affected txs will be changed from Pending -> Invalid
func (l2db *L2DB) CheckNonces(updatedAccounts []common.Account) error {
// TODO: impl
return nil
}
// GetTxsByAbsoluteFeeUpdate return the txs that have an AbsoluteFee updated before olderThan
func (l2db *L2DB) GetTxsByAbsoluteFeeUpdate(olderThan time.Time) ([]*common.PoolL2Tx, error) {
// TODO: impl
return nil, nil
}
// UpdateTxs update existing txs from the pool (TxID must exist)
func (l2db *L2DB) UpdateTxs(txs []*common.PoolL2Tx) error {
// TODO: impl
return nil
}
// Reorg updates the state of txs that were updated in a batch that has been discarted due to a blockchian reorg.
// The state of the affected txs can change form Forged -> Pending or from Invalid -> Pending
func (l2db *L2DB) Reorg(lastValidBatch common.BatchNum) error {
// TODO: impl
return nil
}
// Purge deletes transactions that have been forged or marked as invalid for longer than the safety period
// it also deletes txs that has been in the L2DB for longer than the ttl if maxTxs has been exceeded
func (l2db *L2DB) Purge() error {
// TODO: impl
return nil
}

181
db/l2db/l2db_test.go Normal file
View File

@@ -0,0 +1,181 @@
package l2db
import (
"fmt"
"math/big"
"os"
"strconv"
"testing"
"time"
eth "github.com/ethereum/go-ethereum/common"
"github.com/hermeznetwork/hermez-node/common"
"github.com/iden3/go-iden3-crypto/babyjub"
"github.com/stretchr/testify/assert"
)
var l2DB *L2DB
// In order to run the test you need to run a Posgres DB with
// a database named "l2" that is accessible by
// user: "hermez"
// pass: set it using the env var POSTGRES_PASS
// This can be achieved by running: POSTGRES_PASS=your_strong_pass && sudo docker run --rm --name hermez-db-test -p 5432:5432 -e POSTGRES_DB=history -e POSTGRES_USER=hermez -e POSTGRES_PASSWORD=$POSTGRES_PASS -d postgres && sleep 2s && sudo docker exec -it hermez-db-test psql -a history -U hermez -c "CREATE DATABASE l2;"
// After running the test you can stop the container by running: sudo docker kill hermez-ydb-test
// If you already did that for the HistoryDB you don't have to do it again
func TestMain(m *testing.M) {
// init DB
var err error
pass := os.Getenv("POSTGRES_PASS")
l2DB, err = NewL2DB(5432, "localhost", "hermez", pass, "l2", 10, 512, 24*time.Hour)
if err != nil {
panic(err)
}
// Run tests
result := m.Run()
// Close DB
if err := l2DB.Close(); err != nil {
fmt.Println("Error closing the history DB:", err)
}
os.Exit(result)
}
func TestAddTx(t *testing.T) {
const nInserts = 20
cleanDB()
txs := genTxs(nInserts)
for _, tx := range txs {
err := l2DB.AddTx(tx)
assert.NoError(t, err)
fetchedTx, err := l2DB.GetTx(tx.TxID)
assert.NoError(t, err)
assert.Equal(t, tx.Timestamp.Unix(), fetchedTx.Timestamp.Unix())
tx.Timestamp = fetchedTx.Timestamp
assert.Equal(t, tx.AbsoluteFeeUpdate.Unix(), fetchedTx.AbsoluteFeeUpdate.Unix())
tx.Timestamp = fetchedTx.Timestamp
tx.AbsoluteFeeUpdate = fetchedTx.AbsoluteFeeUpdate
assert.Equal(t, tx, fetchedTx)
}
}
func BenchmarkAddTx(b *testing.B) {
const nInserts = 20
cleanDB()
txs := genTxs(nInserts)
now := time.Now()
for _, tx := range txs {
l2DB.AddTx(tx)
}
elapsedTime := time.Since(now)
fmt.Println("Time to insert 2048 txs:", elapsedTime)
}
func TestGetPending(t *testing.T) {
const nInserts = 20
cleanDB()
txs := genTxs(nInserts)
var pendingTxs []*common.PoolL2Tx
for _, tx := range txs {
err := l2DB.AddTx(tx)
assert.NoError(t, err)
if tx.State == common.PoolL2TxStatePending {
pendingTxs = append(pendingTxs, tx)
}
}
fetchedTxs, err := l2DB.GetPendingTxs()
assert.NoError(t, err)
assert.Equal(t, len(pendingTxs), len(fetchedTxs))
for i, fetchedTx := range fetchedTxs {
assert.Equal(t, pendingTxs[i].Timestamp.Unix(), fetchedTx.Timestamp.Unix())
pendingTxs[i].Timestamp = fetchedTx.Timestamp
assert.Equal(t, pendingTxs[i].AbsoluteFeeUpdate.Unix(), fetchedTx.AbsoluteFeeUpdate.Unix())
pendingTxs[i].AbsoluteFeeUpdate = fetchedTx.AbsoluteFeeUpdate
assert.Equal(t, pendingTxs[i], fetchedTx)
}
}
func TestStartForging(t *testing.T) {
const nInserts = 24
const fakeBlockNum = 33
cleanDB()
txs := genTxs(nInserts)
var startForgingTxs []*common.PoolL2Tx
var startForgingTxIDs []common.TxID
randomizer := 0
for _, tx := range txs {
err := l2DB.AddTx(tx)
assert.NoError(t, err)
if tx.State == common.PoolL2TxStatePending && randomizer%2 == 0 {
randomizer++
startForgingTxs = append(startForgingTxs, tx)
startForgingTxIDs = append(startForgingTxIDs, tx.TxID)
}
if tx.State == common.PoolL2TxStateForging {
startForgingTxs = append(startForgingTxs, tx)
}
}
err := l2DB.StartForging(startForgingTxIDs, fakeBlockNum)
assert.NoError(t, err)
// TODO: Fetch txs and check that they've been updated correctly
}
func genTxs(n int) []*common.PoolL2Tx {
// WARNING: This tx doesn't follow the protocol (signature, txID, ...)
// it's just to test geting/seting from/to the DB.
// Type and RqTxCompressedData: not initialized because it's not stored
// on the DB and add noise when checking results.
txs := make([]*common.PoolL2Tx, 0, n)
privK := babyjub.NewRandPrivKey()
for i := 0; i < n; i++ {
var state common.PoolL2TxState
if i%4 == 0 {
state = common.PoolL2TxStatePending
} else if i%4 == 1 {
state = common.PoolL2TxStateInvalid
} else if i%4 == 2 {
state = common.PoolL2TxStateForging
} else if i%4 == 3 {
state = common.PoolL2TxStateForged
}
tx := &common.PoolL2Tx{
TxID: common.TxID(common.Hash([]byte(strconv.Itoa(i)))),
FromIdx: 47,
ToIdx: 96,
ToEthAddr: eth.BigToAddress(big.NewInt(234523534)),
ToBJJ: privK.Public(),
TokenID: 73,
Amount: big.NewInt(3487762374627846747),
Fee: 99,
Nonce: 28,
State: state,
Signature: *privK.SignPoseidon(big.NewInt(674238462)),
Timestamp: time.Now().UTC(),
}
if i%2 == 0 { // Optional parameters: rq
tx.RqFromIdx = 893
tx.RqToIdx = 334
tx.RqToEthAddr = eth.BigToAddress(big.NewInt(239457111187))
tx.RqToBJJ = privK.Public()
tx.RqTokenID = 222
tx.RqAmount = big.NewInt(3487762374627846747)
tx.RqFee = 11
tx.RqNonce = 78
}
if i%3 == 0 { // Optional parameters: things that get updated "a posteriori"
tx.BatchNum = 489
tx.AbsoluteFee = 39.12345
tx.AbsoluteFeeUpdate = time.Now().UTC()
}
txs = append(txs, tx)
}
return txs
}
func cleanDB() {
if _, err := l2DB.db.Exec("DELETE FROM tx_pool"); err != nil {
panic(err)
}
if _, err := l2DB.db.Exec("DELETE FROM account_creation_auth"); err != nil {
panic(err)
}
}

View File

@@ -0,0 +1,37 @@
-- +migrate Up
CREATE TABLE tx_pool (
tx_id BYTEA PRIMARY KEY,
from_idx BIGINT NOT NULL,
to_idx BIGINT NOT NULL,
to_eth_addr BYTEA NOT NULL,
to_bjj BYTEA NOT NULL,
token_id INT NOT NULL,
amount BYTEA NOT NULL,
fee SMALLINT NOT NULL,
nonce BIGINT NOT NULL,
state CHAR(4) NOT NULL,
batch_num BIGINT,
rq_from_idx BIGINT,
rq_to_idx BIGINT,
rq_to_eth_addr BYTEA,
rq_to_bjj BYTEA,
rq_token_id INT,
rq_amount BYTEA,
rq_fee SMALLINT,
rq_nonce BIGINT,
signature BYTEA NOT NULL,
timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL,
absolute_fee NUMERIC,
absolute_fee_update TIMESTAMP WITHOUT TIME ZONE
);
CREATE TABLE account_creation_auth (
eth_addr BYTEA PRIMARY KEY,
bjj BYTEA NOT NULL,
account_creation_auth_sig BYTEA NOT NULL,
timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL
);
-- +migrate Down
DROP TABLE account_creation_auth;
DROP TABLE tx_pool;

View File

@@ -13,6 +13,7 @@ import (
// InitMeddler registers tags to be used to read/write from SQL DBs using meddler
func InitMeddler() {
meddler.Register("bigint", BigIntMeddler{})
meddler.Register("bigintnull", BigIntNullMeddler{})
}
// BulkInsert performs a bulk insert with a single statement into the specified table. Example:
@@ -76,3 +77,42 @@ func (b BigIntMeddler) PreWrite(fieldPtr interface{}) (saveValue interface{}, er
return str, nil
}
// BigIntNullMeddler encodes or decodes the field value to or from JSON
type BigIntNullMeddler struct{}
// PreRead is called before a Scan operation for fields that have the BigIntNullMeddler
func (b BigIntNullMeddler) PreRead(fieldAddr interface{}) (scanTarget interface{}, err error) {
return &fieldAddr, nil
}
// PostRead is called after a Scan operation for fields that have the BigIntNullMeddler
func (b BigIntNullMeddler) PostRead(fieldPtr, scanTarget interface{}) error {
sv := reflect.ValueOf(scanTarget)
if sv.Elem().IsNil() {
// null column, so set target to be zero value
fv := reflect.ValueOf(fieldPtr)
fv.Elem().Set(reflect.Zero(fv.Elem().Type()))
return nil
}
// not null
encoded := new([]byte)
refEnc := reflect.ValueOf(encoded)
refEnc.Elem().Set(sv.Elem().Elem())
data, err := base64.StdEncoding.DecodeString(string(*encoded))
if err != nil {
return fmt.Errorf("big.Int decode error: %v", err)
}
field := fieldPtr.(**big.Int)
*field = new(big.Int).SetBytes(data)
return nil
}
// PreWrite is called before an Insert or Update operation for fields that have the BigIntNullMeddler
func (b BigIntNullMeddler) PreWrite(fieldPtr interface{}) (saveValue interface{}, err error) {
field := fieldPtr.(*big.Int)
if field == nil {
return nil, nil
}
return base64.StdEncoding.EncodeToString(field.Bytes()), nil
}