Compare commits

...

17 Commits

Author SHA1 Message Date
Eduard S
672d08c671 Calculate ForgeBatch gasLimit with parametrized formula
Add the following config parameters at
`Coordinator.EthClient.ForgeBatchGasCost`:
- `Fixed`
- `L1UserTx`
- `L1CoordTx`
- `L2Tx`
Which are the costs associated to a ForgeBatch transaction, split
into different parts to be used in the formula to compute the gasLimit.
2021-02-25 15:30:50 +01:00
Eduard S
9b7b333acf Reorg l2db before starting pipeline 2021-02-25 15:30:38 +01:00
Eduard S
2a5992d218 Fix missing timer reset in TxManager
Also, replace usage of time.Duration by time.NewTimer, because the later allows
replacing timers by stopping them before so that we never leak resources.
2021-02-25 15:30:26 +01:00
arnau
706e4c7a3d Merge pull request #575 from hermeznetwork/fix/httpserve
In http servers, first listen, then serve
2021-02-24 16:40:16 +01:00
arnau
bf4acfad9f Merge pull request #578 from hermeznetwork/feature/waitproofserverbeforebatch
Wait for serverProof before starting batch
2021-02-24 16:39:46 +01:00
a_bennassar
ba88b25425 Merge pull request #570 from hermeznetwork/feature/api-TxPerBatchAmend
Change how TX per Batch is calculated in state API endpoint
2021-02-24 15:49:16 +01:00
Eduard S
df729b3b71 Wait for serverProof before starting batch 2021-02-24 15:43:55 +01:00
Toni Ramírez
f8d01b5998 Change how TX per Batch is calculated in state API endpoint 2021-02-24 15:39:54 +01:00
Eduard S
155e06484b Merge pull request #571 from hermeznetwork/feature/sql-read-write
Diferentiate SQL connections by read write
2021-02-24 15:26:43 +01:00
Eduard S
5cdcae47b8 Merge pull request #577 from hermeznetwork/feature/zki-forceexit-circuit-effectiveamount-behaviour
Update ZKI generation to match circuit behaviour for Exit Amount>Balance
2021-02-24 15:25:32 +01:00
arnaubennassar
ed9bfdffa9 Diferentiate SQL connections by read write 2021-02-24 15:12:43 +01:00
arnaucube
504e36ac47 Update ZKI gen to match circuit behaviour for Exit
Currently the circuit does not use an Exit Leaf at the Exit MerkleTree
when the Amount=0 & EffectiveAmount=0, but it does use an Exit Leaf when
the Amount>0 but EffectiveAmount=0 (for example when tx.Amount >
sender.Balance). This is a particularity of the approach of the circuit,
the idea will be in the future to update the circuit and when Amount>0
but EffectiveAmount=0, to not add the Exit in the Exits MerkleTree, but
for the moment the Go code is adapted to the circuit and when an Exit
with Amount>0 & EffectiveAmount=0, it will be added to the Leaf of the
Exit MerkleTree.
2021-02-24 15:00:30 +01:00
Eduard S
ca53dc9b52 In http servers, first listen, then serve
- In test, doing the `net.Listen` outside of the goroutine guarantees that we
  are listening, so we avoid a possible datarace consisting of doing an http
  request before listening.
- In packages that run an http server: doing the listen first allows
    - Checking for errors when opening the address for listening before
      starting the goroutine, so that if there's an error on listen we can
      terminate grafecully
    - Logging that we are listening when we are really listening, and not
      before.
2021-02-24 12:11:12 +01:00
Eduard S
5b4cfac309 Merge pull request #563 from hermeznetwork/feature/metrics0
Add Prometheus initial setup
2021-02-23 18:53:57 +01:00
arnau
11a0707746 Merge pull request #574 from hermeznetwork/feature/apirecommendedfeeuseminfee
In API recommended fee, use minFeeUSD as min value
2021-02-23 17:58:38 +01:00
Eduard S
d16fba72a7 In API recommended fee, use minFeeUSD as min value 2021-02-23 17:20:16 +01:00
arnaucube
971b2c1d40 Add Prometheus initial setup 2021-02-23 16:27:29 +01:00
34 changed files with 704 additions and 303 deletions

View File

@@ -15,7 +15,7 @@ start PostgreSQL with docker easily this way (where `yourpasswordhere` should
be your password): be your password):
``` ```
POSTGRES_PASS=yourpasswordhere sudo docker run --rm --name hermez-db-test -p 5432:5432 -e POSTGRES_DB=hermez -e POSTGRES_USER=hermez -e POSTGRES_PASSWORD="$POSTGRES_PASS" -d postgres POSTGRES_PASS=yourpasswordhere; sudo docker run --rm --name hermez-db-test -p 5432:5432 -e POSTGRES_DB=hermez -e POSTGRES_USER=hermez -e POSTGRES_PASSWORD="$POSTGRES_PASS" -d postgres
``` ```
Afterwards, run the tests with the password as env var: Afterwards, run the tests with the password as env var:

View File

@@ -8,6 +8,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"math/big" "math/big"
"net"
"net/http" "net/http"
"os" "os"
"strconv" "strconv"
@@ -38,8 +39,8 @@ type Pendinger interface {
New() Pendinger New() Pendinger
} }
const apiPort = ":4010" const apiAddr = ":4010"
const apiURL = "http://localhost" + apiPort + "/" const apiURL = "http://localhost" + apiAddr + "/"
var SetBlockchain = ` var SetBlockchain = `
Type: Blockchain Type: Blockchain
@@ -201,7 +202,7 @@ func TestMain(m *testing.M) {
panic(err) panic(err)
} }
apiConnCon := db.NewAPICnnectionController(1, time.Second) apiConnCon := db.NewAPICnnectionController(1, time.Second)
hdb := historydb.NewHistoryDB(database, apiConnCon) hdb := historydb.NewHistoryDB(database, database, apiConnCon)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@@ -216,7 +217,7 @@ func TestMain(m *testing.M) {
} }
}() }()
// L2DB // L2DB
l2DB := l2db.NewL2DB(database, 10, 1000, 0.0, 24*time.Hour, apiConnCon) l2DB := l2db.NewL2DB(database, database, 10, 1000, 0.0, 24*time.Hour, apiConnCon)
test.WipeDB(l2DB.DB()) // this will clean HistoryDB and L2DB test.WipeDB(l2DB.DB()) // this will clean HistoryDB and L2DB
// Config (smart contract constants) // Config (smart contract constants)
chainID := uint16(0) chainID := uint16(0)
@@ -241,9 +242,14 @@ func TestMain(m *testing.M) {
panic(err) panic(err)
} }
// Start server // Start server
server := &http.Server{Addr: apiPort, Handler: apiGin} listener, err := net.Listen("tcp", apiAddr) //nolint:gosec
if err != nil {
panic(err)
}
server := &http.Server{Handler: apiGin}
go func() { go func() {
if err := server.ListenAndServe(); err != nil && tracerr.Unwrap(err) != http.ErrServerClosed { if err := server.Serve(listener); err != nil &&
tracerr.Unwrap(err) != http.ErrServerClosed {
panic(err) panic(err)
} }
}() }()
@@ -591,10 +597,10 @@ func TestTimeout(t *testing.T) {
databaseTO, err := db.InitSQLDB(5432, "localhost", "hermez", pass, "hermez") databaseTO, err := db.InitSQLDB(5432, "localhost", "hermez", pass, "hermez")
require.NoError(t, err) require.NoError(t, err)
apiConnConTO := db.NewAPICnnectionController(1, 100*time.Millisecond) apiConnConTO := db.NewAPICnnectionController(1, 100*time.Millisecond)
hdbTO := historydb.NewHistoryDB(databaseTO, apiConnConTO) hdbTO := historydb.NewHistoryDB(databaseTO, databaseTO, apiConnConTO)
require.NoError(t, err) require.NoError(t, err)
// L2DB // L2DB
l2DBTO := l2db.NewL2DB(databaseTO, 10, 1000, 0.0, 24*time.Hour, apiConnConTO) l2DBTO := l2db.NewL2DB(databaseTO, databaseTO, 10, 1000, 1.0, 24*time.Hour, apiConnConTO)
// API // API
apiGinTO := gin.Default() apiGinTO := gin.Default()
@@ -609,9 +615,12 @@ func TestTimeout(t *testing.T) {
<-finishWait <-finishWait
}) })
// Start server // Start server
serverTO := &http.Server{Addr: ":4444", Handler: apiGinTO} serverTO := &http.Server{Handler: apiGinTO}
listener, err := net.Listen("tcp", ":4444") //nolint:gosec
require.NoError(t, err)
go func() { go func() {
if err := serverTO.ListenAndServe(); err != nil && tracerr.Unwrap(err) != http.ErrServerClosed { if err := serverTO.Serve(listener); err != nil &&
tracerr.Unwrap(err) != http.ErrServerClosed {
require.NoError(t, err) require.NoError(t, err)
} }
}() }()

View File

@@ -3,6 +3,7 @@ package api
import ( import (
"database/sql" "database/sql"
"fmt" "fmt"
"math"
"math/big" "math/big"
"net/http" "net/http"
"time" "time"
@@ -297,10 +298,17 @@ func (a *API) UpdateRecommendedFee() error {
if err != nil { if err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
var minFeeUSD float64
if a.l2 != nil {
minFeeUSD = a.l2.MinFeeUSD()
}
a.status.Lock() a.status.Lock()
a.status.RecommendedFee.ExistingAccount = feeExistingAccount a.status.RecommendedFee.ExistingAccount =
a.status.RecommendedFee.CreatesAccount = createAccountExtraFeePercentage * feeExistingAccount math.Max(feeExistingAccount, minFeeUSD)
a.status.RecommendedFee.CreatesAccountAndRegister = createAccountInternalExtraFeePercentage * feeExistingAccount a.status.RecommendedFee.CreatesAccount =
math.Max(createAccountExtraFeePercentage*feeExistingAccount, minFeeUSD)
a.status.RecommendedFee.CreatesAccountAndRegister =
math.Max(createAccountInternalExtraFeePercentage*feeExistingAccount, minFeeUSD)
a.status.Unlock() a.status.Unlock()
return nil return nil
} }

View File

@@ -131,7 +131,7 @@ func TestUpdateNetworkInfo(t *testing.T) {
func TestUpdateMetrics(t *testing.T) { func TestUpdateMetrics(t *testing.T) {
// Update Metrics needs api.status.Network.LastBatch.BatchNum to be updated // Update Metrics needs api.status.Network.LastBatch.BatchNum to be updated
lastBlock := tc.blocks[3] lastBlock := tc.blocks[3]
lastBatchNum := common.BatchNum(3) lastBatchNum := common.BatchNum(12)
currentSlotNum := int64(1) currentSlotNum := int64(1)
err := api.UpdateNetworkInfo(lastBlock, lastBlock, lastBatchNum, currentSlotNum) err := api.UpdateNetworkInfo(lastBlock, lastBlock, lastBatchNum, currentSlotNum)
assert.NoError(t, err) assert.NoError(t, err)
@@ -149,7 +149,11 @@ func TestUpdateMetrics(t *testing.T) {
func TestUpdateRecommendedFee(t *testing.T) { func TestUpdateRecommendedFee(t *testing.T) {
err := api.UpdateRecommendedFee() err := api.UpdateRecommendedFee()
assert.NoError(t, err) assert.NoError(t, err)
assert.Greater(t, api.status.RecommendedFee.ExistingAccount, float64(0)) var minFeeUSD float64
if api.l2 != nil {
minFeeUSD = api.l2.MinFeeUSD()
}
assert.Greater(t, api.status.RecommendedFee.ExistingAccount, minFeeUSD)
assert.Equal(t, api.status.RecommendedFee.CreatesAccount, assert.Equal(t, api.status.RecommendedFee.CreatesAccount,
api.status.RecommendedFee.ExistingAccount*createAccountExtraFeePercentage) api.status.RecommendedFee.ExistingAccount*createAccountExtraFeePercentage)
assert.Equal(t, api.status.RecommendedFee.CreatesAccountAndRegister, assert.Equal(t, api.status.RecommendedFee.CreatesAccountAndRegister,
@@ -158,7 +162,7 @@ func TestUpdateRecommendedFee(t *testing.T) {
func TestGetState(t *testing.T) { func TestGetState(t *testing.T) {
lastBlock := tc.blocks[3] lastBlock := tc.blocks[3]
lastBatchNum := common.BatchNum(3) lastBatchNum := common.BatchNum(12)
currentSlotNum := int64(1) currentSlotNum := int64(1)
api.SetRollupVariables(tc.rollupVars) api.SetRollupVariables(tc.rollupVars)
api.SetWDelayerVariables(tc.wdelayerVars) api.SetWDelayerVariables(tc.wdelayerVars)

View File

@@ -20,11 +20,16 @@ Path = "/tmp/iden3-test/hermez/statedb"
Keep = 256 Keep = 256
[PostgreSQL] [PostgreSQL]
Port = 5432 PortWrite = 5432
Host = "localhost" HostWrite = "localhost"
User = "hermez" UserWrite = "hermez"
Password = "yourpasswordhere" PasswordWrite = "yourpasswordhere"
Name = "hermez" NameWrite = "hermez"
# PortRead = 5432
# HostRead = "localhost"
# UserRead = "hermez"
# PasswordRead = "yourpasswordhere"
# NameRead = "hermez"
[Web3] [Web3]
URL = "http://localhost:8545" URL = "http://localhost:8545"
@@ -99,6 +104,12 @@ GasPriceIncPerc = 10
Path = "/tmp/iden3-test/hermez/ethkeystore" Path = "/tmp/iden3-test/hermez/ethkeystore"
Password = "yourpasswordhere" Password = "yourpasswordhere"
[Coordinator.EthClient.ForgeBatchGasCost]
Fixed = 500000
L1UserTx = 8000
L1CoordTx = 9000
L2Tx = 1
[Coordinator.API] [Coordinator.API]
Coordinator = true Coordinator = true

View File

@@ -17,6 +17,7 @@ import (
"github.com/hermeznetwork/hermez-node/node" "github.com/hermeznetwork/hermez-node/node"
"github.com/hermeznetwork/tracerr" "github.com/hermeznetwork/tracerr"
"github.com/iden3/go-iden3-crypto/babyjub" "github.com/iden3/go-iden3-crypto/babyjub"
"github.com/jmoiron/sqlx"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
) )
@@ -90,11 +91,11 @@ func cmdWipeSQL(c *cli.Context) error {
} }
} }
db, err := dbUtils.ConnectSQLDB( db, err := dbUtils.ConnectSQLDB(
cfg.PostgreSQL.Port, cfg.PostgreSQL.PortWrite,
cfg.PostgreSQL.Host, cfg.PostgreSQL.HostWrite,
cfg.PostgreSQL.User, cfg.PostgreSQL.UserWrite,
cfg.PostgreSQL.Password, cfg.PostgreSQL.PasswordWrite,
cfg.PostgreSQL.Name, cfg.PostgreSQL.NameWrite,
) )
if err != nil { if err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
@@ -151,17 +152,36 @@ func cmdDiscard(c *cli.Context) error {
blockNum := c.Int64(flagBlock) blockNum := c.Int64(flagBlock)
log.Infof("Discarding all blocks up to block %v...", blockNum) log.Infof("Discarding all blocks up to block %v...", blockNum)
db, err := dbUtils.InitSQLDB( dbWrite, err := dbUtils.InitSQLDB(
cfg.PostgreSQL.Port, cfg.PostgreSQL.PortWrite,
cfg.PostgreSQL.Host, cfg.PostgreSQL.HostWrite,
cfg.PostgreSQL.User, cfg.PostgreSQL.UserWrite,
cfg.PostgreSQL.Password, cfg.PostgreSQL.PasswordWrite,
cfg.PostgreSQL.Name, cfg.PostgreSQL.NameWrite,
) )
if err != nil { if err != nil {
return tracerr.Wrap(fmt.Errorf("dbUtils.InitSQLDB: %w", err)) return tracerr.Wrap(fmt.Errorf("dbUtils.InitSQLDB: %w", err))
} }
historyDB := historydb.NewHistoryDB(db, nil) var dbRead *sqlx.DB
if cfg.PostgreSQL.HostRead == "" {
dbRead = dbWrite
} else if cfg.PostgreSQL.HostRead == cfg.PostgreSQL.HostWrite {
return tracerr.Wrap(fmt.Errorf(
"PostgreSQL.HostRead and PostgreSQL.HostWrite must be different",
))
} else {
dbRead, err = dbUtils.InitSQLDB(
cfg.PostgreSQL.PortRead,
cfg.PostgreSQL.HostRead,
cfg.PostgreSQL.UserRead,
cfg.PostgreSQL.PasswordRead,
cfg.PostgreSQL.NameRead,
)
if err != nil {
return tracerr.Wrap(fmt.Errorf("dbUtils.InitSQLDB: %w", err))
}
}
historyDB := historydb.NewHistoryDB(dbRead, dbWrite, nil)
if err := historyDB.Reorg(blockNum); err != nil { if err := historyDB.Reorg(blockNum); err != nil {
return tracerr.Wrap(fmt.Errorf("historyDB.Reorg: %w", err)) return tracerr.Wrap(fmt.Errorf("historyDB.Reorg: %w", err))
} }
@@ -170,7 +190,7 @@ func cmdDiscard(c *cli.Context) error {
return tracerr.Wrap(fmt.Errorf("historyDB.GetLastBatchNum: %w", err)) return tracerr.Wrap(fmt.Errorf("historyDB.GetLastBatchNum: %w", err))
} }
l2DB := l2db.NewL2DB( l2DB := l2db.NewL2DB(
db, dbRead, dbWrite,
cfg.Coordinator.L2DB.SafetyPeriod, cfg.Coordinator.L2DB.SafetyPeriod,
cfg.Coordinator.L2DB.MaxTxs, cfg.Coordinator.L2DB.MaxTxs,
cfg.Coordinator.L2DB.MinFeeUSD, cfg.Coordinator.L2DB.MinFeeUSD,

View File

@@ -35,6 +35,15 @@ type ServerProof struct {
URL string `validate:"required"` URL string `validate:"required"`
} }
// ForgeBatchGasCost is the costs associated to a ForgeBatch transaction, split
// into different parts to be used in a formula.
type ForgeBatchGasCost struct {
Fixed uint64 `validate:"required"`
L1UserTx uint64 `validate:"required"`
L1CoordTx uint64 `validate:"required"`
L2Tx uint64 `validate:"required"`
}
// Coordinator is the coordinator specific configuration. // Coordinator is the coordinator specific configuration.
type Coordinator struct { type Coordinator struct {
// ForgerAddress is the address under which this coordinator is forging // ForgerAddress is the address under which this coordinator is forging
@@ -180,6 +189,9 @@ type Coordinator struct {
// Password used to decrypt the keys in the keystore // Password used to decrypt the keys in the keystore
Password string `validate:"required"` Password string `validate:"required"`
} `validate:"required"` } `validate:"required"`
// ForgeBatchGasCost contains the cost of each action in the
// ForgeBatch transaction.
ForgeBatchGasCost ForgeBatchGasCost `validate:"required"`
} `validate:"required"` } `validate:"required"`
API struct { API struct {
// Coordinator enables the coordinator API endpoints // Coordinator enables the coordinator API endpoints
@@ -215,17 +227,30 @@ type Node struct {
// Keep is the number of checkpoints to keep // Keep is the number of checkpoints to keep
Keep int `validate:"required"` Keep int `validate:"required"`
} `validate:"required"` } `validate:"required"`
// It's possible to use diferentiated SQL connections for read/write.
// If the read configuration is not provided, the write one it's going to be used
// for both reads and writes
PostgreSQL struct { PostgreSQL struct {
// Port of the PostgreSQL server // Port of the PostgreSQL write server
Port int `validate:"required"` PortWrite int `validate:"required"`
// Host of the PostgreSQL server // Host of the PostgreSQL write server
Host string `validate:"required"` HostWrite string `validate:"required"`
// User of the PostgreSQL server // User of the PostgreSQL write server
User string `validate:"required"` UserWrite string `validate:"required"`
// Password of the PostgreSQL server // Password of the PostgreSQL write server
Password string `validate:"required"` PasswordWrite string `validate:"required"`
// Name of the PostgreSQL server database // Name of the PostgreSQL write server database
Name string `validate:"required"` NameWrite string `validate:"required"`
// Port of the PostgreSQL read server
PortRead int
// Host of the PostgreSQL read server
HostRead string
// User of the PostgreSQL read server
UserRead string
// Password of the PostgreSQL read server
PasswordRead string
// Name of the PostgreSQL read server database
NameRead string
} `validate:"required"` } `validate:"required"`
Web3 struct { Web3 struct {
// URL is the URL of the web3 ethereum-node RPC server // URL is the URL of the web3 ethereum-node RPC server

View File

@@ -11,6 +11,7 @@ import (
ethCommon "github.com/ethereum/go-ethereum/common" ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/hermeznetwork/hermez-node/batchbuilder" "github.com/hermeznetwork/hermez-node/batchbuilder"
"github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/config"
"github.com/hermeznetwork/hermez-node/db/historydb" "github.com/hermeznetwork/hermez-node/db/historydb"
"github.com/hermeznetwork/hermez-node/db/l2db" "github.com/hermeznetwork/hermez-node/db/l2db"
"github.com/hermeznetwork/hermez-node/eth" "github.com/hermeznetwork/hermez-node/eth"
@@ -116,6 +117,9 @@ type Config struct {
// VerifierIdx is the index of the verifier contract registered in the // VerifierIdx is the index of the verifier contract registered in the
// smart contract // smart contract
VerifierIdx uint8 VerifierIdx uint8
// ForgeBatchGasCost contains the cost of each action in the
// ForgeBatch transaction.
ForgeBatchGasCost config.ForgeBatchGasCost
TxProcessorConfig txprocessor.Config TxProcessorConfig txprocessor.Config
} }
@@ -383,11 +387,23 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
fromBatch.ForgerAddr = c.cfg.ForgerAddress fromBatch.ForgerAddr = c.cfg.ForgerAddress
fromBatch.StateRoot = big.NewInt(0) fromBatch.StateRoot = big.NewInt(0)
} }
// Before starting the pipeline make sure we reset any
// l2tx from the pool that was forged in a batch that
// didn't end up being mined. We are already doing
// this in handleStopPipeline, but we do it again as a
// failsafe in case the last synced batchnum is
// different than in the previous call to l2DB.Reorg,
// or in case the node was restarted when there was a
// started batch that included l2txs but was not mined.
if err := c.l2DB.Reorg(fromBatch.BatchNum); err != nil {
return tracerr.Wrap(err)
}
var err error var err error
if c.pipeline, err = c.newPipeline(ctx); err != nil { if c.pipeline, err = c.newPipeline(ctx); err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
c.pipelineFromBatch = fromBatch c.pipelineFromBatch = fromBatch
// Start the pipeline
if err := c.pipeline.Start(fromBatch.BatchNum, stats, &c.vars); err != nil { if err := c.pipeline.Start(fromBatch.BatchNum, stats, &c.vars); err != nil {
c.pipeline = nil c.pipeline = nil
return tracerr.Wrap(err) return tracerr.Wrap(err)
@@ -508,7 +524,7 @@ func (c *Coordinator) Start() {
c.wg.Add(1) c.wg.Add(1)
go func() { go func() {
waitCh := time.After(longWaitDuration) timer := time.NewTimer(longWaitDuration)
for { for {
select { select {
case <-c.ctx.Done(): case <-c.ctx.Done():
@@ -520,24 +536,27 @@ func (c *Coordinator) Start() {
continue continue
} else if err != nil { } else if err != nil {
log.Errorw("Coordinator.handleMsg", "err", err) log.Errorw("Coordinator.handleMsg", "err", err)
waitCh = time.After(c.cfg.SyncRetryInterval) if !timer.Stop() {
<-timer.C
}
timer.Reset(c.cfg.SyncRetryInterval)
continue continue
} }
waitCh = time.After(longWaitDuration) case <-timer.C:
case <-waitCh: timer.Reset(longWaitDuration)
if !c.stats.Synced() { if !c.stats.Synced() {
waitCh = time.After(longWaitDuration)
continue continue
} }
if err := c.syncStats(c.ctx, &c.stats); c.ctx.Err() != nil { if err := c.syncStats(c.ctx, &c.stats); c.ctx.Err() != nil {
waitCh = time.After(longWaitDuration)
continue continue
} else if err != nil { } else if err != nil {
log.Errorw("Coordinator.syncStats", "err", err) log.Errorw("Coordinator.syncStats", "err", err)
waitCh = time.After(c.cfg.SyncRetryInterval) if !timer.Stop() {
<-timer.C
}
timer.Reset(c.cfg.SyncRetryInterval)
continue continue
} }
waitCh = time.After(longWaitDuration)
} }
} }
}() }()

View File

@@ -105,8 +105,8 @@ func newTestModules(t *testing.T) modules {
db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez") db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez")
require.NoError(t, err) require.NoError(t, err)
test.WipeDB(db) test.WipeDB(db)
l2DB := l2db.NewL2DB(db, 10, 100, 0.0, 24*time.Hour, nil) l2DB := l2db.NewL2DB(db, db, 10, 100, 0.0, 24*time.Hour, nil)
historyDB := historydb.NewHistoryDB(db, nil) historyDB := historydb.NewHistoryDB(db, db, nil)
txSelDBPath, err = ioutil.TempDir("", "tmpTxSelDB") txSelDBPath, err = ioutil.TempDir("", "tmpTxSelDB")
require.NoError(t, err) require.NoError(t, err)

View File

@@ -198,12 +198,33 @@ func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) {
updateSCVars(&p.vars, vars) updateSCVars(&p.vars, vars)
} }
// handleForgeBatch calls p.forgeBatch to forge the batch and get the zkInputs, // handleForgeBatch waits for an available proof server, calls p.forgeBatch to
// and then waits for an available proof server and sends the zkInputs to it so // forge the batch and get the zkInputs, and then sends the zkInputs to the
// that the proof computation begins. // selected proof server so that the proof computation begins.
func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) { func (p *Pipeline) handleForgeBatch(ctx context.Context,
batchNum common.BatchNum) (batchInfo *BatchInfo, err error) {
// 1. Wait for an available serverProof (blocking call)
serverProof, err := p.proversPool.Get(ctx)
if ctx.Err() != nil {
return nil, ctx.Err()
} else if err != nil {
log.Errorw("proversPool.Get", "err", err)
return nil, err
}
defer func() {
// If we encounter any error (notice that this function returns
// errors to notify that a batch is not forged not only because
// of unexpected errors but also due to benign causes), add the
// serverProof back to the pool
if err != nil {
p.proversPool.Add(ctx, serverProof)
}
}()
// 2. Forge the batch internally (make a selection of txs and prepare
// all the smart contract arguments)
p.mutexL2DBUpdateDelete.Lock() p.mutexL2DBUpdateDelete.Lock()
batchInfo, err := p.forgeBatch(batchNum) batchInfo, err = p.forgeBatch(batchNum)
p.mutexL2DBUpdateDelete.Unlock() p.mutexL2DBUpdateDelete.Unlock()
if ctx.Err() != nil { if ctx.Err() != nil {
return nil, ctx.Err() return nil, ctx.Err()
@@ -220,21 +241,13 @@ func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNu
} }
return nil, err return nil, err
} }
// 6. Wait for an available server proof (blocking call)
serverProof, err := p.proversPool.Get(ctx) // 3. Send the ZKInputs to the proof server
if ctx.Err() != nil {
return nil, ctx.Err()
} else if err != nil {
log.Errorw("proversPool.Get", "err", err)
return nil, err
}
batchInfo.ServerProof = serverProof batchInfo.ServerProof = serverProof
if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil { if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil {
return nil, ctx.Err() return nil, ctx.Err()
} else if err != nil { } else if err != nil {
log.Errorw("sendServerProof", "err", err) log.Errorw("sendServerProof", "err", err)
batchInfo.ServerProof = nil
p.proversPool.Add(ctx, serverProof)
return nil, err return nil, err
} }
return batchInfo, nil return batchInfo, nil
@@ -258,7 +271,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
p.wg.Add(1) p.wg.Add(1)
go func() { go func() {
waitCh := time.After(zeroDuration) timer := time.NewTimer(zeroDuration)
for { for {
select { select {
case <-p.ctx.Done(): case <-p.ctx.Done():
@@ -268,23 +281,21 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
case statsVars := <-p.statsVarsCh: case statsVars := <-p.statsVarsCh:
p.stats = statsVars.Stats p.stats = statsVars.Stats
p.syncSCVars(statsVars.Vars) p.syncSCVars(statsVars.Vars)
case <-waitCh: case <-timer.C:
timer.Reset(p.cfg.ForgeRetryInterval)
// Once errAtBatchNum != 0, we stop forging // Once errAtBatchNum != 0, we stop forging
// batches because there's been an error and we // batches because there's been an error and we
// wait for the pipeline to be stopped. // wait for the pipeline to be stopped.
if p.getErrAtBatchNum() != 0 { if p.getErrAtBatchNum() != 0 {
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue continue
} }
batchNum = p.state.batchNum + 1 batchNum = p.state.batchNum + 1
batchInfo, err := p.handleForgeBatch(p.ctx, batchNum) batchInfo, err := p.handleForgeBatch(p.ctx, batchNum)
if p.ctx.Err() != nil { if p.ctx.Err() != nil {
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue continue
} else if tracerr.Unwrap(err) == errLastL1BatchNotSynced || } else if tracerr.Unwrap(err) == errLastL1BatchNotSynced ||
tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay || tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay ||
tracerr.Unwrap(err) == errForgeBeforeDelay { tracerr.Unwrap(err) == errForgeBeforeDelay {
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue continue
} else if err != nil { } else if err != nil {
p.setErrAtBatchNum(batchNum) p.setErrAtBatchNum(batchNum)
@@ -293,7 +304,6 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
"Pipeline.handleForgBatch: %v", err), "Pipeline.handleForgBatch: %v", err),
FailedBatchNum: batchNum, FailedBatchNum: batchNum,
}) })
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue continue
} }
p.lastForgeTime = time.Now() p.lastForgeTime = time.Now()
@@ -303,7 +313,10 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
case batchChSentServerProof <- batchInfo: case batchChSentServerProof <- batchInfo:
case <-p.ctx.Done(): case <-p.ctx.Done():
} }
waitCh = time.After(zeroDuration) if !timer.Stop() {
<-timer.C
}
timer.Reset(zeroDuration)
} }
} }
}() }()
@@ -338,7 +351,6 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
} }
// We are done with this serverProof, add it back to the pool // We are done with this serverProof, add it back to the pool
p.proversPool.Add(p.ctx, batchInfo.ServerProof) p.proversPool.Add(p.ctx, batchInfo.ServerProof)
// batchInfo.ServerProof = nil
p.txManager.AddBatch(p.ctx, batchInfo) p.txManager.AddBatch(p.ctx, batchInfo)
} }
} }

View File

@@ -21,7 +21,7 @@ func newL2DB(t *testing.T) *l2db.L2DB {
db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez") db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez")
require.NoError(t, err) require.NoError(t, err)
test.WipeDB(db) test.WipeDB(db)
return l2db.NewL2DB(db, 10, 100, 0.0, 24*time.Hour, nil) return l2db.NewL2DB(db, db, 10, 100, 0.0, 24*time.Hour, nil)
} }
func newStateDB(t *testing.T) *statedb.LocalStateDB { func newStateDB(t *testing.T) *statedb.LocalStateDB {

View File

@@ -123,7 +123,7 @@ func (t *TxManager) syncSCVars(vars synchronizer.SCVariablesPtr) {
} }
// NewAuth generates a new auth object for an ethereum transaction // NewAuth generates a new auth object for an ethereum transaction
func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) { func (t *TxManager) NewAuth(ctx context.Context, batchInfo *BatchInfo) (*bind.TransactOpts, error) {
gasPrice, err := t.ethClient.EthSuggestGasPrice(ctx) gasPrice, err := t.ethClient.EthSuggestGasPrice(ctx)
if err != nil { if err != nil {
return nil, tracerr.Wrap(err) return nil, tracerr.Wrap(err)
@@ -143,15 +143,12 @@ func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) {
return nil, tracerr.Wrap(err) return nil, tracerr.Wrap(err)
} }
auth.Value = big.NewInt(0) // in wei auth.Value = big.NewInt(0) // in wei
// TODO: Calculate GasLimit based on the contents of the ForgeBatchArgs
// This requires a function that estimates the gas usage of the gasLimit := t.cfg.ForgeBatchGasCost.Fixed +
// forgeBatch call based on the contents of the ForgeBatch args: uint64(len(batchInfo.L1UserTxsExtra))*t.cfg.ForgeBatchGasCost.L1UserTx +
// - length of l2txs uint64(len(batchInfo.L1CoordTxs))*t.cfg.ForgeBatchGasCost.L1CoordTx +
// - length of l1Usertxs uint64(len(batchInfo.L2Txs))*t.cfg.ForgeBatchGasCost.L2Tx
// - length of l1CoordTxs with authorization signature auth.GasLimit = gasLimit
// - length of l1CoordTxs without authoriation signature
// - etc.
auth.GasLimit = 1000000
auth.GasPrice = gasPrice auth.GasPrice = gasPrice
auth.Nonce = nil auth.Nonce = nil
@@ -191,7 +188,7 @@ func addPerc(v *big.Int, p int64) *big.Int {
func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo, resend bool) error { func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo, resend bool) error {
var ethTx *types.Transaction var ethTx *types.Transaction
var err error var err error
auth, err := t.NewAuth(ctx) auth, err := t.NewAuth(ctx, batchInfo)
if err != nil { if err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
@@ -419,8 +416,6 @@ func (q *Queue) Push(batchInfo *BatchInfo) {
// Run the TxManager // Run the TxManager
func (t *TxManager) Run(ctx context.Context) { func (t *TxManager) Run(ctx context.Context) {
waitCh := time.After(longWaitDuration)
var statsVars statsVars var statsVars statsVars
select { select {
case statsVars = <-t.statsVarsCh: case statsVars = <-t.statsVarsCh:
@@ -431,6 +426,7 @@ func (t *TxManager) Run(ctx context.Context) {
log.Infow("TxManager: received initial statsVars", log.Infow("TxManager: received initial statsVars",
"block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatchNum) "block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatchNum)
timer := time.NewTimer(longWaitDuration)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@@ -474,13 +470,17 @@ func (t *TxManager) Run(ctx context.Context) {
continue continue
} }
t.queue.Push(batchInfo) t.queue.Push(batchInfo)
waitCh = time.After(t.cfg.TxManagerCheckInterval) if !timer.Stop() {
case <-waitCh: <-timer.C
}
timer.Reset(t.cfg.TxManagerCheckInterval)
case <-timer.C:
queuePosition, batchInfo := t.queue.Next() queuePosition, batchInfo := t.queue.Next()
if batchInfo == nil { if batchInfo == nil {
waitCh = time.After(longWaitDuration) timer.Reset(longWaitDuration)
continue continue
} }
timer.Reset(t.cfg.TxManagerCheckInterval)
if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil { if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
continue continue
} else if err != nil { //nolint:staticcheck } else if err != nil { //nolint:staticcheck

View File

@@ -34,7 +34,7 @@ func (hdb *HistoryDB) GetBatchAPI(batchNum common.BatchNum) (*BatchAPI, error) {
defer hdb.apiConnCon.Release() defer hdb.apiConnCon.Release()
batch := &BatchAPI{} batch := &BatchAPI{}
return batch, tracerr.Wrap(meddler.QueryRow( return batch, tracerr.Wrap(meddler.QueryRow(
hdb.db, batch, hdb.dbRead, batch,
`SELECT batch.item_id, batch.batch_num, batch.eth_block_num, `SELECT batch.item_id, batch.batch_num, batch.eth_block_num,
batch.forger_addr, batch.fees_collected, batch.total_fees_usd, batch.state_root, batch.forger_addr, batch.fees_collected, batch.total_fees_usd, batch.state_root,
batch.num_accounts, batch.exit_root, batch.forge_l1_txs_num, batch.slot_num, batch.num_accounts, batch.exit_root, batch.forge_l1_txs_num, batch.slot_num,
@@ -133,10 +133,10 @@ func (hdb *HistoryDB) GetBatchesAPI(
queryStr += " DESC " queryStr += " DESC "
} }
queryStr += fmt.Sprintf("LIMIT %d;", *limit) queryStr += fmt.Sprintf("LIMIT %d;", *limit)
query = hdb.db.Rebind(queryStr) query = hdb.dbRead.Rebind(queryStr)
// log.Debug(query) // log.Debug(query)
batchPtrs := []*BatchAPI{} batchPtrs := []*BatchAPI{}
if err := meddler.QueryAll(hdb.db, &batchPtrs, query, args...); err != nil { if err := meddler.QueryAll(hdb.dbRead, &batchPtrs, query, args...); err != nil {
return nil, 0, tracerr.Wrap(err) return nil, 0, tracerr.Wrap(err)
} }
batches := db.SlicePtrsToSlice(batchPtrs).([]BatchAPI) batches := db.SlicePtrsToSlice(batchPtrs).([]BatchAPI)
@@ -156,7 +156,7 @@ func (hdb *HistoryDB) GetBestBidAPI(slotNum *int64) (BidAPI, error) {
} }
defer hdb.apiConnCon.Release() defer hdb.apiConnCon.Release()
err = meddler.QueryRow( err = meddler.QueryRow(
hdb.db, bid, `SELECT bid.*, block.timestamp, coordinator.forger_addr, coordinator.url hdb.dbRead, bid, `SELECT bid.*, block.timestamp, coordinator.forger_addr, coordinator.url
FROM bid INNER JOIN block ON bid.eth_block_num = block.eth_block_num FROM bid INNER JOIN block ON bid.eth_block_num = block.eth_block_num
INNER JOIN ( INNER JOIN (
SELECT bidder_addr, MAX(item_id) AS item_id FROM coordinator SELECT bidder_addr, MAX(item_id) AS item_id FROM coordinator
@@ -212,9 +212,9 @@ func (hdb *HistoryDB) GetBestBidsAPI(
if limit != nil { if limit != nil {
queryStr += fmt.Sprintf("LIMIT %d;", *limit) queryStr += fmt.Sprintf("LIMIT %d;", *limit)
} }
query = hdb.db.Rebind(queryStr) query = hdb.dbRead.Rebind(queryStr)
bidPtrs := []*BidAPI{} bidPtrs := []*BidAPI{}
if err := meddler.QueryAll(hdb.db, &bidPtrs, query, args...); err != nil { if err := meddler.QueryAll(hdb.dbRead, &bidPtrs, query, args...); err != nil {
return nil, 0, tracerr.Wrap(err) return nil, 0, tracerr.Wrap(err)
} }
// log.Debug(query) // log.Debug(query)
@@ -296,9 +296,9 @@ func (hdb *HistoryDB) GetBidsAPI(
if err != nil { if err != nil {
return nil, 0, tracerr.Wrap(err) return nil, 0, tracerr.Wrap(err)
} }
query = hdb.db.Rebind(query) query = hdb.dbRead.Rebind(query)
bids := []*BidAPI{} bids := []*BidAPI{}
if err := meddler.QueryAll(hdb.db, &bids, query, argsQ...); err != nil { if err := meddler.QueryAll(hdb.dbRead, &bids, query, argsQ...); err != nil {
return nil, 0, tracerr.Wrap(err) return nil, 0, tracerr.Wrap(err)
} }
if len(bids) == 0 { if len(bids) == 0 {
@@ -384,9 +384,9 @@ func (hdb *HistoryDB) GetTokensAPI(
if err != nil { if err != nil {
return nil, 0, tracerr.Wrap(err) return nil, 0, tracerr.Wrap(err)
} }
query = hdb.db.Rebind(query) query = hdb.dbRead.Rebind(query)
tokens := []*TokenWithUSD{} tokens := []*TokenWithUSD{}
if err := meddler.QueryAll(hdb.db, &tokens, query, argsQ...); err != nil { if err := meddler.QueryAll(hdb.dbRead, &tokens, query, argsQ...); err != nil {
return nil, 0, tracerr.Wrap(err) return nil, 0, tracerr.Wrap(err)
} }
if len(tokens) == 0 { if len(tokens) == 0 {
@@ -408,7 +408,7 @@ func (hdb *HistoryDB) GetTxAPI(txID common.TxID) (*TxAPI, error) {
defer hdb.apiConnCon.Release() defer hdb.apiConnCon.Release()
tx := &TxAPI{} tx := &TxAPI{}
err = meddler.QueryRow( err = meddler.QueryRow(
hdb.db, tx, `SELECT tx.item_id, tx.is_l1, tx.id, tx.type, tx.position, hdb.dbRead, tx, `SELECT tx.item_id, tx.is_l1, tx.id, tx.type, tx.position,
hez_idx(tx.effective_from_idx, token.symbol) AS from_idx, tx.from_eth_addr, tx.from_bjj, hez_idx(tx.effective_from_idx, token.symbol) AS from_idx, tx.from_eth_addr, tx.from_bjj,
hez_idx(tx.to_idx, token.symbol) AS to_idx, tx.to_eth_addr, tx.to_bjj, hez_idx(tx.to_idx, token.symbol) AS to_idx, tx.to_eth_addr, tx.to_bjj,
tx.amount, tx.amount_success, tx.token_id, tx.amount_usd, tx.amount, tx.amount_success, tx.token_id, tx.amount_usd,
@@ -541,10 +541,10 @@ func (hdb *HistoryDB) GetTxsAPI(
queryStr += " DESC " queryStr += " DESC "
} }
queryStr += fmt.Sprintf("LIMIT %d;", *limit) queryStr += fmt.Sprintf("LIMIT %d;", *limit)
query = hdb.db.Rebind(queryStr) query = hdb.dbRead.Rebind(queryStr)
// log.Debug(query) // log.Debug(query)
txsPtrs := []*TxAPI{} txsPtrs := []*TxAPI{}
if err := meddler.QueryAll(hdb.db, &txsPtrs, query, args...); err != nil { if err := meddler.QueryAll(hdb.dbRead, &txsPtrs, query, args...); err != nil {
return nil, 0, tracerr.Wrap(err) return nil, 0, tracerr.Wrap(err)
} }
txs := db.SlicePtrsToSlice(txsPtrs).([]TxAPI) txs := db.SlicePtrsToSlice(txsPtrs).([]TxAPI)
@@ -564,7 +564,7 @@ func (hdb *HistoryDB) GetExitAPI(batchNum *uint, idx *common.Idx) (*ExitAPI, err
defer hdb.apiConnCon.Release() defer hdb.apiConnCon.Release()
exit := &ExitAPI{} exit := &ExitAPI{}
err = meddler.QueryRow( err = meddler.QueryRow(
hdb.db, exit, `SELECT exit_tree.item_id, exit_tree.batch_num, hdb.dbRead, exit, `SELECT exit_tree.item_id, exit_tree.batch_num,
hez_idx(exit_tree.account_idx, token.symbol) AS account_idx, hez_idx(exit_tree.account_idx, token.symbol) AS account_idx,
account.bjj, account.eth_addr, account.bjj, account.eth_addr,
exit_tree.merkle_proof, exit_tree.balance, exit_tree.instant_withdrawn, exit_tree.merkle_proof, exit_tree.balance, exit_tree.instant_withdrawn,
@@ -685,10 +685,10 @@ func (hdb *HistoryDB) GetExitsAPI(
queryStr += " DESC " queryStr += " DESC "
} }
queryStr += fmt.Sprintf("LIMIT %d;", *limit) queryStr += fmt.Sprintf("LIMIT %d;", *limit)
query = hdb.db.Rebind(queryStr) query = hdb.dbRead.Rebind(queryStr)
// log.Debug(query) // log.Debug(query)
exits := []*ExitAPI{} exits := []*ExitAPI{}
if err := meddler.QueryAll(hdb.db, &exits, query, args...); err != nil { if err := meddler.QueryAll(hdb.dbRead, &exits, query, args...); err != nil {
return nil, 0, tracerr.Wrap(err) return nil, 0, tracerr.Wrap(err)
} }
if len(exits) == 0 { if len(exits) == 0 {
@@ -707,7 +707,7 @@ func (hdb *HistoryDB) GetBucketUpdatesAPI() ([]BucketUpdateAPI, error) {
defer hdb.apiConnCon.Release() defer hdb.apiConnCon.Release()
var bucketUpdates []*BucketUpdateAPI var bucketUpdates []*BucketUpdateAPI
err = meddler.QueryAll( err = meddler.QueryAll(
hdb.db, &bucketUpdates, hdb.dbRead, &bucketUpdates,
`SELECT num_bucket, withdrawals FROM bucket_update `SELECT num_bucket, withdrawals FROM bucket_update
WHERE item_id in(SELECT max(item_id) FROM bucket_update WHERE item_id in(SELECT max(item_id) FROM bucket_update
group by num_bucket) group by num_bucket)
@@ -772,10 +772,10 @@ func (hdb *HistoryDB) GetCoordinatorsAPI(
queryStr += " DESC " queryStr += " DESC "
} }
queryStr += fmt.Sprintf("LIMIT %d;", *limit) queryStr += fmt.Sprintf("LIMIT %d;", *limit)
query = hdb.db.Rebind(queryStr) query = hdb.dbRead.Rebind(queryStr)
coordinators := []*CoordinatorAPI{} coordinators := []*CoordinatorAPI{}
if err := meddler.QueryAll(hdb.db, &coordinators, query, args...); err != nil { if err := meddler.QueryAll(hdb.dbRead, &coordinators, query, args...); err != nil {
return nil, 0, tracerr.Wrap(err) return nil, 0, tracerr.Wrap(err)
} }
if len(coordinators) == 0 { if len(coordinators) == 0 {
@@ -795,7 +795,7 @@ func (hdb *HistoryDB) GetAuctionVarsAPI() (*common.AuctionVariables, error) {
defer hdb.apiConnCon.Release() defer hdb.apiConnCon.Release()
auctionVars := &common.AuctionVariables{} auctionVars := &common.AuctionVariables{}
err = meddler.QueryRow( err = meddler.QueryRow(
hdb.db, auctionVars, `SELECT * FROM auction_vars;`, hdb.dbRead, auctionVars, `SELECT * FROM auction_vars;`,
) )
return auctionVars, tracerr.Wrap(err) return auctionVars, tracerr.Wrap(err)
} }
@@ -816,7 +816,7 @@ func (hdb *HistoryDB) GetAuctionVarsUntilSetSlotNumAPI(slotNum int64, maxItems i
ORDER BY default_slot_set_bid_slot_num DESC ORDER BY default_slot_set_bid_slot_num DESC
LIMIT $2; LIMIT $2;
` `
err = meddler.QueryAll(hdb.db, &auctionVars, query, slotNum, maxItems) err = meddler.QueryAll(hdb.dbRead, &auctionVars, query, slotNum, maxItems)
if err != nil { if err != nil {
return nil, tracerr.Wrap(err) return nil, tracerr.Wrap(err)
} }
@@ -832,7 +832,7 @@ func (hdb *HistoryDB) GetAccountAPI(idx common.Idx) (*AccountAPI, error) {
} }
defer hdb.apiConnCon.Release() defer hdb.apiConnCon.Release()
account := &AccountAPI{} account := &AccountAPI{}
err = meddler.QueryRow(hdb.db, account, `SELECT account.item_id, hez_idx(account.idx, err = meddler.QueryRow(hdb.dbRead, account, `SELECT account.item_id, hez_idx(account.idx,
token.symbol) as idx, account.batch_num, account.bjj, account.eth_addr, token.symbol) as idx, account.batch_num, account.bjj, account.eth_addr,
token.token_id, token.item_id AS token_item_id, token.eth_block_num AS token_block, token.token_id, token.item_id AS token_item_id, token.eth_block_num AS token_block,
token.eth_addr as token_eth_addr, token.name, token.symbol, token.decimals, token.usd, token.eth_addr as token_eth_addr, token.name, token.symbol, token.decimals, token.usd,
@@ -927,10 +927,10 @@ func (hdb *HistoryDB) GetAccountsAPI(
if err != nil { if err != nil {
return nil, 0, tracerr.Wrap(err) return nil, 0, tracerr.Wrap(err)
} }
query = hdb.db.Rebind(query) query = hdb.dbRead.Rebind(query)
accounts := []*AccountAPI{} accounts := []*AccountAPI{}
if err := meddler.QueryAll(hdb.db, &accounts, query, argsQ...); err != nil { if err := meddler.QueryAll(hdb.dbRead, &accounts, query, argsQ...); err != nil {
return nil, 0, tracerr.Wrap(err) return nil, 0, tracerr.Wrap(err)
} }
if len(accounts) == 0 { if len(accounts) == 0 {
@@ -952,11 +952,19 @@ func (hdb *HistoryDB) GetMetricsAPI(lastBatchNum common.BatchNum) (*Metrics, err
metricsTotals := &MetricsTotals{} metricsTotals := &MetricsTotals{}
metrics := &Metrics{} metrics := &Metrics{}
err = meddler.QueryRow( err = meddler.QueryRow(
hdb.db, metricsTotals, `SELECT COUNT(tx.*) as total_txs, hdb.dbRead, metricsTotals, `SELECT
COALESCE (MIN(tx.batch_num), 0) as batch_num, COALESCE (MIN(block.timestamp), COALESCE (MIN(batch.batch_num), 0) as batch_num,
NOW()) AS min_timestamp, COALESCE (MAX(block.timestamp), NOW()) AS max_timestamp COALESCE (MIN(block.timestamp), NOW()) AS min_timestamp,
FROM tx INNER JOIN block ON tx.eth_block_num = block.eth_block_num COALESCE (MAX(block.timestamp), NOW()) AS max_timestamp
WHERE block.timestamp >= NOW() - INTERVAL '24 HOURS';`) FROM batch INNER JOIN block ON batch.eth_block_num = block.eth_block_num
WHERE block.timestamp >= NOW() - INTERVAL '24 HOURS' and batch.batch_num <= $1;`, lastBatchNum)
if err != nil {
return nil, tracerr.Wrap(err)
}
err = meddler.QueryRow(
hdb.dbRead, metricsTotals, `SELECT COUNT(*) as total_txs
FROM tx WHERE tx.batch_num between $1 AND $2;`, metricsTotals.FirstBatchNum, lastBatchNum)
if err != nil { if err != nil {
return nil, tracerr.Wrap(err) return nil, tracerr.Wrap(err)
} }
@@ -977,12 +985,13 @@ func (hdb *HistoryDB) GetMetricsAPI(lastBatchNum common.BatchNum) (*Metrics, err
} }
err = meddler.QueryRow( err = meddler.QueryRow(
hdb.db, metricsTotals, `SELECT COUNT(*) AS total_batches, hdb.dbRead, metricsTotals, `SELECT COUNT(*) AS total_batches,
COALESCE (SUM(total_fees_usd), 0) AS total_fees FROM batch COALESCE (SUM(total_fees_usd), 0) AS total_fees FROM batch
WHERE batch_num > $1;`, metricsTotals.FirstBatchNum) WHERE batch_num between $1 and $2;`, metricsTotals.FirstBatchNum, lastBatchNum)
if err != nil { if err != nil {
return nil, tracerr.Wrap(err) return nil, tracerr.Wrap(err)
} }
if metricsTotals.TotalBatches > 0 { if metricsTotals.TotalBatches > 0 {
metrics.BatchFrequency = seconds / float64(metricsTotals.TotalBatches) metrics.BatchFrequency = seconds / float64(metricsTotals.TotalBatches)
} else { } else {
@@ -994,7 +1003,7 @@ func (hdb *HistoryDB) GetMetricsAPI(lastBatchNum common.BatchNum) (*Metrics, err
metrics.AvgTransactionFee = 0 metrics.AvgTransactionFee = 0
} }
err = meddler.QueryRow( err = meddler.QueryRow(
hdb.db, metrics, hdb.dbRead, metrics,
`SELECT COUNT(*) AS total_bjjs, COUNT(DISTINCT(bjj)) AS total_accounts FROM account;`) `SELECT COUNT(*) AS total_bjjs, COUNT(DISTINCT(bjj)) AS total_accounts FROM account;`)
if err != nil { if err != nil {
return nil, tracerr.Wrap(err) return nil, tracerr.Wrap(err)
@@ -1013,7 +1022,7 @@ func (hdb *HistoryDB) GetAvgTxFeeAPI() (float64, error) {
defer hdb.apiConnCon.Release() defer hdb.apiConnCon.Release()
metricsTotals := &MetricsTotals{} metricsTotals := &MetricsTotals{}
err = meddler.QueryRow( err = meddler.QueryRow(
hdb.db, metricsTotals, `SELECT COUNT(tx.*) as total_txs, hdb.dbRead, metricsTotals, `SELECT COUNT(tx.*) as total_txs,
COALESCE (MIN(tx.batch_num), 0) as batch_num COALESCE (MIN(tx.batch_num), 0) as batch_num
FROM tx INNER JOIN block ON tx.eth_block_num = block.eth_block_num FROM tx INNER JOIN block ON tx.eth_block_num = block.eth_block_num
WHERE block.timestamp >= NOW() - INTERVAL '1 HOURS';`) WHERE block.timestamp >= NOW() - INTERVAL '1 HOURS';`)
@@ -1021,7 +1030,7 @@ func (hdb *HistoryDB) GetAvgTxFeeAPI() (float64, error) {
return 0, tracerr.Wrap(err) return 0, tracerr.Wrap(err)
} }
err = meddler.QueryRow( err = meddler.QueryRow(
hdb.db, metricsTotals, `SELECT COUNT(*) AS total_batches, hdb.dbRead, metricsTotals, `SELECT COUNT(*) AS total_batches,
COALESCE (SUM(total_fees_usd), 0) AS total_fees FROM batch COALESCE (SUM(total_fees_usd), 0) AS total_fees FROM batch
WHERE batch_num > $1;`, metricsTotals.FirstBatchNum) WHERE batch_num > $1;`, metricsTotals.FirstBatchNum)
if err != nil { if err != nil {
@@ -1048,7 +1057,7 @@ func (hdb *HistoryDB) GetCommonAccountAPI(idx common.Idx) (*common.Account, erro
defer hdb.apiConnCon.Release() defer hdb.apiConnCon.Release()
account := &common.Account{} account := &common.Account{}
err = meddler.QueryRow( err = meddler.QueryRow(
hdb.db, account, `SELECT * FROM account WHERE idx = $1;`, idx, hdb.dbRead, account, `SELECT * FROM account WHERE idx = $1;`, idx,
) )
return account, tracerr.Wrap(err) return account, tracerr.Wrap(err)
} }

View File

@@ -27,30 +27,35 @@ const (
// HistoryDB persist the historic of the rollup // HistoryDB persist the historic of the rollup
type HistoryDB struct { type HistoryDB struct {
db *sqlx.DB dbRead *sqlx.DB
dbWrite *sqlx.DB
apiConnCon *db.APIConnectionController apiConnCon *db.APIConnectionController
} }
// NewHistoryDB initialize the DB // NewHistoryDB initialize the DB
func NewHistoryDB(db *sqlx.DB, apiConnCon *db.APIConnectionController) *HistoryDB { func NewHistoryDB(dbRead, dbWrite *sqlx.DB, apiConnCon *db.APIConnectionController) *HistoryDB {
return &HistoryDB{db: db, apiConnCon: apiConnCon} return &HistoryDB{
dbRead: dbRead,
dbWrite: dbWrite,
apiConnCon: apiConnCon,
}
} }
// DB returns a pointer to the L2DB.db. This method should be used only for // DB returns a pointer to the L2DB.db. This method should be used only for
// internal testing purposes. // internal testing purposes.
func (hdb *HistoryDB) DB() *sqlx.DB { func (hdb *HistoryDB) DB() *sqlx.DB {
return hdb.db return hdb.dbWrite
} }
// AddBlock insert a block into the DB // AddBlock insert a block into the DB
func (hdb *HistoryDB) AddBlock(block *common.Block) error { return hdb.addBlock(hdb.db, block) } func (hdb *HistoryDB) AddBlock(block *common.Block) error { return hdb.addBlock(hdb.dbWrite, block) }
func (hdb *HistoryDB) addBlock(d meddler.DB, block *common.Block) error { func (hdb *HistoryDB) addBlock(d meddler.DB, block *common.Block) error {
return tracerr.Wrap(meddler.Insert(d, "block", block)) return tracerr.Wrap(meddler.Insert(d, "block", block))
} }
// AddBlocks inserts blocks into the DB // AddBlocks inserts blocks into the DB
func (hdb *HistoryDB) AddBlocks(blocks []common.Block) error { func (hdb *HistoryDB) AddBlocks(blocks []common.Block) error {
return tracerr.Wrap(hdb.addBlocks(hdb.db, blocks)) return tracerr.Wrap(hdb.addBlocks(hdb.dbWrite, blocks))
} }
func (hdb *HistoryDB) addBlocks(d meddler.DB, blocks []common.Block) error { func (hdb *HistoryDB) addBlocks(d meddler.DB, blocks []common.Block) error {
@@ -69,7 +74,7 @@ func (hdb *HistoryDB) addBlocks(d meddler.DB, blocks []common.Block) error {
func (hdb *HistoryDB) GetBlock(blockNum int64) (*common.Block, error) { func (hdb *HistoryDB) GetBlock(blockNum int64) (*common.Block, error) {
block := &common.Block{} block := &common.Block{}
err := meddler.QueryRow( err := meddler.QueryRow(
hdb.db, block, hdb.dbRead, block,
"SELECT * FROM block WHERE eth_block_num = $1;", blockNum, "SELECT * FROM block WHERE eth_block_num = $1;", blockNum,
) )
return block, tracerr.Wrap(err) return block, tracerr.Wrap(err)
@@ -79,7 +84,7 @@ func (hdb *HistoryDB) GetBlock(blockNum int64) (*common.Block, error) {
func (hdb *HistoryDB) GetAllBlocks() ([]common.Block, error) { func (hdb *HistoryDB) GetAllBlocks() ([]common.Block, error) {
var blocks []*common.Block var blocks []*common.Block
err := meddler.QueryAll( err := meddler.QueryAll(
hdb.db, &blocks, hdb.dbRead, &blocks,
"SELECT * FROM block ORDER BY eth_block_num;", "SELECT * FROM block ORDER BY eth_block_num;",
) )
return db.SlicePtrsToSlice(blocks).([]common.Block), tracerr.Wrap(err) return db.SlicePtrsToSlice(blocks).([]common.Block), tracerr.Wrap(err)
@@ -89,7 +94,7 @@ func (hdb *HistoryDB) GetAllBlocks() ([]common.Block, error) {
func (hdb *HistoryDB) getBlocks(from, to int64) ([]common.Block, error) { func (hdb *HistoryDB) getBlocks(from, to int64) ([]common.Block, error) {
var blocks []*common.Block var blocks []*common.Block
err := meddler.QueryAll( err := meddler.QueryAll(
hdb.db, &blocks, hdb.dbRead, &blocks,
"SELECT * FROM block WHERE $1 <= eth_block_num AND eth_block_num < $2 ORDER BY eth_block_num;", "SELECT * FROM block WHERE $1 <= eth_block_num AND eth_block_num < $2 ORDER BY eth_block_num;",
from, to, from, to,
) )
@@ -100,13 +105,13 @@ func (hdb *HistoryDB) getBlocks(from, to int64) ([]common.Block, error) {
func (hdb *HistoryDB) GetLastBlock() (*common.Block, error) { func (hdb *HistoryDB) GetLastBlock() (*common.Block, error) {
block := &common.Block{} block := &common.Block{}
err := meddler.QueryRow( err := meddler.QueryRow(
hdb.db, block, "SELECT * FROM block ORDER BY eth_block_num DESC LIMIT 1;", hdb.dbRead, block, "SELECT * FROM block ORDER BY eth_block_num DESC LIMIT 1;",
) )
return block, tracerr.Wrap(err) return block, tracerr.Wrap(err)
} }
// AddBatch insert a Batch into the DB // AddBatch insert a Batch into the DB
func (hdb *HistoryDB) AddBatch(batch *common.Batch) error { return hdb.addBatch(hdb.db, batch) } func (hdb *HistoryDB) AddBatch(batch *common.Batch) error { return hdb.addBatch(hdb.dbWrite, batch) }
func (hdb *HistoryDB) addBatch(d meddler.DB, batch *common.Batch) error { func (hdb *HistoryDB) addBatch(d meddler.DB, batch *common.Batch) error {
// Calculate total collected fees in USD // Calculate total collected fees in USD
// Get IDs of collected tokens for fees // Get IDs of collected tokens for fees
@@ -129,9 +134,9 @@ func (hdb *HistoryDB) addBatch(d meddler.DB, batch *common.Batch) error {
if err != nil { if err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
query = hdb.db.Rebind(query) query = hdb.dbWrite.Rebind(query)
if err := meddler.QueryAll( if err := meddler.QueryAll(
hdb.db, &tokenPrices, query, args..., hdb.dbWrite, &tokenPrices, query, args...,
); err != nil { ); err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
@@ -153,7 +158,7 @@ func (hdb *HistoryDB) addBatch(d meddler.DB, batch *common.Batch) error {
// AddBatches insert Bids into the DB // AddBatches insert Bids into the DB
func (hdb *HistoryDB) AddBatches(batches []common.Batch) error { func (hdb *HistoryDB) AddBatches(batches []common.Batch) error {
return tracerr.Wrap(hdb.addBatches(hdb.db, batches)) return tracerr.Wrap(hdb.addBatches(hdb.dbWrite, batches))
} }
func (hdb *HistoryDB) addBatches(d meddler.DB, batches []common.Batch) error { func (hdb *HistoryDB) addBatches(d meddler.DB, batches []common.Batch) error {
for i := 0; i < len(batches); i++ { for i := 0; i < len(batches); i++ {
@@ -168,7 +173,7 @@ func (hdb *HistoryDB) addBatches(d meddler.DB, batches []common.Batch) error {
func (hdb *HistoryDB) GetBatch(batchNum common.BatchNum) (*common.Batch, error) { func (hdb *HistoryDB) GetBatch(batchNum common.BatchNum) (*common.Batch, error) {
var batch common.Batch var batch common.Batch
err := meddler.QueryRow( err := meddler.QueryRow(
hdb.db, &batch, `SELECT batch.batch_num, batch.eth_block_num, batch.forger_addr, hdb.dbRead, &batch, `SELECT batch.batch_num, batch.eth_block_num, batch.forger_addr,
batch.fees_collected, batch.fee_idxs_coordinator, batch.state_root, batch.fees_collected, batch.fee_idxs_coordinator, batch.state_root,
batch.num_accounts, batch.last_idx, batch.exit_root, batch.forge_l1_txs_num, batch.num_accounts, batch.last_idx, batch.exit_root, batch.forge_l1_txs_num,
batch.slot_num, batch.total_fees_usd FROM batch WHERE batch_num = $1;`, batch.slot_num, batch.total_fees_usd FROM batch WHERE batch_num = $1;`,
@@ -181,7 +186,7 @@ func (hdb *HistoryDB) GetBatch(batchNum common.BatchNum) (*common.Batch, error)
func (hdb *HistoryDB) GetAllBatches() ([]common.Batch, error) { func (hdb *HistoryDB) GetAllBatches() ([]common.Batch, error) {
var batches []*common.Batch var batches []*common.Batch
err := meddler.QueryAll( err := meddler.QueryAll(
hdb.db, &batches, hdb.dbRead, &batches,
`SELECT batch.batch_num, batch.eth_block_num, batch.forger_addr, batch.fees_collected, `SELECT batch.batch_num, batch.eth_block_num, batch.forger_addr, batch.fees_collected,
batch.fee_idxs_coordinator, batch.state_root, batch.num_accounts, batch.last_idx, batch.exit_root, batch.fee_idxs_coordinator, batch.state_root, batch.num_accounts, batch.last_idx, batch.exit_root,
batch.forge_l1_txs_num, batch.slot_num, batch.total_fees_usd FROM batch batch.forge_l1_txs_num, batch.slot_num, batch.total_fees_usd FROM batch
@@ -194,7 +199,7 @@ func (hdb *HistoryDB) GetAllBatches() ([]common.Batch, error) {
func (hdb *HistoryDB) GetBatches(from, to common.BatchNum) ([]common.Batch, error) { func (hdb *HistoryDB) GetBatches(from, to common.BatchNum) ([]common.Batch, error) {
var batches []*common.Batch var batches []*common.Batch
err := meddler.QueryAll( err := meddler.QueryAll(
hdb.db, &batches, hdb.dbRead, &batches,
`SELECT batch_num, eth_block_num, forger_addr, fees_collected, fee_idxs_coordinator, `SELECT batch_num, eth_block_num, forger_addr, fees_collected, fee_idxs_coordinator,
state_root, num_accounts, last_idx, exit_root, forge_l1_txs_num, slot_num, total_fees_usd state_root, num_accounts, last_idx, exit_root, forge_l1_txs_num, slot_num, total_fees_usd
FROM batch WHERE $1 <= batch_num AND batch_num < $2 ORDER BY batch_num;`, FROM batch WHERE $1 <= batch_num AND batch_num < $2 ORDER BY batch_num;`,
@@ -206,7 +211,7 @@ func (hdb *HistoryDB) GetBatches(from, to common.BatchNum) ([]common.Batch, erro
// GetFirstBatchBlockNumBySlot returns the ethereum block number of the first // GetFirstBatchBlockNumBySlot returns the ethereum block number of the first
// batch within a slot // batch within a slot
func (hdb *HistoryDB) GetFirstBatchBlockNumBySlot(slotNum int64) (int64, error) { func (hdb *HistoryDB) GetFirstBatchBlockNumBySlot(slotNum int64) (int64, error) {
row := hdb.db.QueryRow( row := hdb.dbRead.QueryRow(
`SELECT eth_block_num FROM batch `SELECT eth_block_num FROM batch
WHERE slot_num = $1 ORDER BY batch_num ASC LIMIT 1;`, slotNum, WHERE slot_num = $1 ORDER BY batch_num ASC LIMIT 1;`, slotNum,
) )
@@ -216,7 +221,7 @@ func (hdb *HistoryDB) GetFirstBatchBlockNumBySlot(slotNum int64) (int64, error)
// GetLastBatchNum returns the BatchNum of the latest forged batch // GetLastBatchNum returns the BatchNum of the latest forged batch
func (hdb *HistoryDB) GetLastBatchNum() (common.BatchNum, error) { func (hdb *HistoryDB) GetLastBatchNum() (common.BatchNum, error) {
row := hdb.db.QueryRow("SELECT batch_num FROM batch ORDER BY batch_num DESC LIMIT 1;") row := hdb.dbRead.QueryRow("SELECT batch_num FROM batch ORDER BY batch_num DESC LIMIT 1;")
var batchNum common.BatchNum var batchNum common.BatchNum
return batchNum, tracerr.Wrap(row.Scan(&batchNum)) return batchNum, tracerr.Wrap(row.Scan(&batchNum))
} }
@@ -225,7 +230,7 @@ func (hdb *HistoryDB) GetLastBatchNum() (common.BatchNum, error) {
func (hdb *HistoryDB) GetLastBatch() (*common.Batch, error) { func (hdb *HistoryDB) GetLastBatch() (*common.Batch, error) {
var batch common.Batch var batch common.Batch
err := meddler.QueryRow( err := meddler.QueryRow(
hdb.db, &batch, `SELECT batch.batch_num, batch.eth_block_num, batch.forger_addr, hdb.dbRead, &batch, `SELECT batch.batch_num, batch.eth_block_num, batch.forger_addr,
batch.fees_collected, batch.fee_idxs_coordinator, batch.state_root, batch.fees_collected, batch.fee_idxs_coordinator, batch.state_root,
batch.num_accounts, batch.last_idx, batch.exit_root, batch.forge_l1_txs_num, batch.num_accounts, batch.last_idx, batch.exit_root, batch.forge_l1_txs_num,
batch.slot_num, batch.total_fees_usd FROM batch ORDER BY batch_num DESC LIMIT 1;`, batch.slot_num, batch.total_fees_usd FROM batch ORDER BY batch_num DESC LIMIT 1;`,
@@ -235,7 +240,7 @@ func (hdb *HistoryDB) GetLastBatch() (*common.Batch, error) {
// GetLastL1BatchBlockNum returns the blockNum of the latest forged l1Batch // GetLastL1BatchBlockNum returns the blockNum of the latest forged l1Batch
func (hdb *HistoryDB) GetLastL1BatchBlockNum() (int64, error) { func (hdb *HistoryDB) GetLastL1BatchBlockNum() (int64, error) {
row := hdb.db.QueryRow(`SELECT eth_block_num FROM batch row := hdb.dbRead.QueryRow(`SELECT eth_block_num FROM batch
WHERE forge_l1_txs_num IS NOT NULL WHERE forge_l1_txs_num IS NOT NULL
ORDER BY batch_num DESC LIMIT 1;`) ORDER BY batch_num DESC LIMIT 1;`)
var blockNum int64 var blockNum int64
@@ -245,7 +250,7 @@ func (hdb *HistoryDB) GetLastL1BatchBlockNum() (int64, error) {
// GetLastL1TxsNum returns the greatest ForgeL1TxsNum in the DB from forged // GetLastL1TxsNum returns the greatest ForgeL1TxsNum in the DB from forged
// batches. If there's no batch in the DB (nil, nil) is returned. // batches. If there's no batch in the DB (nil, nil) is returned.
func (hdb *HistoryDB) GetLastL1TxsNum() (*int64, error) { func (hdb *HistoryDB) GetLastL1TxsNum() (*int64, error) {
row := hdb.db.QueryRow("SELECT MAX(forge_l1_txs_num) FROM batch;") row := hdb.dbRead.QueryRow("SELECT MAX(forge_l1_txs_num) FROM batch;")
lastL1TxsNum := new(int64) lastL1TxsNum := new(int64)
return lastL1TxsNum, tracerr.Wrap(row.Scan(&lastL1TxsNum)) return lastL1TxsNum, tracerr.Wrap(row.Scan(&lastL1TxsNum))
} }
@@ -256,15 +261,15 @@ func (hdb *HistoryDB) GetLastL1TxsNum() (*int64, error) {
func (hdb *HistoryDB) Reorg(lastValidBlock int64) error { func (hdb *HistoryDB) Reorg(lastValidBlock int64) error {
var err error var err error
if lastValidBlock < 0 { if lastValidBlock < 0 {
_, err = hdb.db.Exec("DELETE FROM block;") _, err = hdb.dbWrite.Exec("DELETE FROM block;")
} else { } else {
_, err = hdb.db.Exec("DELETE FROM block WHERE eth_block_num > $1;", lastValidBlock) _, err = hdb.dbWrite.Exec("DELETE FROM block WHERE eth_block_num > $1;", lastValidBlock)
} }
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
// AddBids insert Bids into the DB // AddBids insert Bids into the DB
func (hdb *HistoryDB) AddBids(bids []common.Bid) error { return hdb.addBids(hdb.db, bids) } func (hdb *HistoryDB) AddBids(bids []common.Bid) error { return hdb.addBids(hdb.dbWrite, bids) }
func (hdb *HistoryDB) addBids(d meddler.DB, bids []common.Bid) error { func (hdb *HistoryDB) addBids(d meddler.DB, bids []common.Bid) error {
if len(bids) == 0 { if len(bids) == 0 {
return nil return nil
@@ -281,7 +286,7 @@ func (hdb *HistoryDB) addBids(d meddler.DB, bids []common.Bid) error {
func (hdb *HistoryDB) GetAllBids() ([]common.Bid, error) { func (hdb *HistoryDB) GetAllBids() ([]common.Bid, error) {
var bids []*common.Bid var bids []*common.Bid
err := meddler.QueryAll( err := meddler.QueryAll(
hdb.db, &bids, hdb.dbRead, &bids,
`SELECT bid.slot_num, bid.bid_value, bid.eth_block_num, bid.bidder_addr FROM bid `SELECT bid.slot_num, bid.bid_value, bid.eth_block_num, bid.bidder_addr FROM bid
ORDER BY item_id;`, ORDER BY item_id;`,
) )
@@ -292,7 +297,7 @@ func (hdb *HistoryDB) GetAllBids() ([]common.Bid, error) {
func (hdb *HistoryDB) GetBestBidCoordinator(slotNum int64) (*common.BidCoordinator, error) { func (hdb *HistoryDB) GetBestBidCoordinator(slotNum int64) (*common.BidCoordinator, error) {
bidCoord := &common.BidCoordinator{} bidCoord := &common.BidCoordinator{}
err := meddler.QueryRow( err := meddler.QueryRow(
hdb.db, bidCoord, hdb.dbRead, bidCoord,
`SELECT ( `SELECT (
SELECT default_slot_set_bid SELECT default_slot_set_bid
FROM auction_vars FROM auction_vars
@@ -315,7 +320,7 @@ func (hdb *HistoryDB) GetBestBidCoordinator(slotNum int64) (*common.BidCoordinat
// AddCoordinators insert Coordinators into the DB // AddCoordinators insert Coordinators into the DB
func (hdb *HistoryDB) AddCoordinators(coordinators []common.Coordinator) error { func (hdb *HistoryDB) AddCoordinators(coordinators []common.Coordinator) error {
return tracerr.Wrap(hdb.addCoordinators(hdb.db, coordinators)) return tracerr.Wrap(hdb.addCoordinators(hdb.dbWrite, coordinators))
} }
func (hdb *HistoryDB) addCoordinators(d meddler.DB, coordinators []common.Coordinator) error { func (hdb *HistoryDB) addCoordinators(d meddler.DB, coordinators []common.Coordinator) error {
if len(coordinators) == 0 { if len(coordinators) == 0 {
@@ -330,7 +335,7 @@ func (hdb *HistoryDB) addCoordinators(d meddler.DB, coordinators []common.Coordi
// AddExitTree insert Exit tree into the DB // AddExitTree insert Exit tree into the DB
func (hdb *HistoryDB) AddExitTree(exitTree []common.ExitInfo) error { func (hdb *HistoryDB) AddExitTree(exitTree []common.ExitInfo) error {
return tracerr.Wrap(hdb.addExitTree(hdb.db, exitTree)) return tracerr.Wrap(hdb.addExitTree(hdb.dbWrite, exitTree))
} }
func (hdb *HistoryDB) addExitTree(d meddler.DB, exitTree []common.ExitInfo) error { func (hdb *HistoryDB) addExitTree(d meddler.DB, exitTree []common.ExitInfo) error {
if len(exitTree) == 0 { if len(exitTree) == 0 {
@@ -418,11 +423,13 @@ func (hdb *HistoryDB) updateExitTree(d sqlx.Ext, blockNum int64,
// AddToken insert a token into the DB // AddToken insert a token into the DB
func (hdb *HistoryDB) AddToken(token *common.Token) error { func (hdb *HistoryDB) AddToken(token *common.Token) error {
return tracerr.Wrap(meddler.Insert(hdb.db, "token", token)) return tracerr.Wrap(meddler.Insert(hdb.dbWrite, "token", token))
} }
// AddTokens insert tokens into the DB // AddTokens insert tokens into the DB
func (hdb *HistoryDB) AddTokens(tokens []common.Token) error { return hdb.addTokens(hdb.db, tokens) } func (hdb *HistoryDB) AddTokens(tokens []common.Token) error {
return hdb.addTokens(hdb.dbWrite, tokens)
}
func (hdb *HistoryDB) addTokens(d meddler.DB, tokens []common.Token) error { func (hdb *HistoryDB) addTokens(d meddler.DB, tokens []common.Token) error {
if len(tokens) == 0 { if len(tokens) == 0 {
return nil return nil
@@ -453,7 +460,7 @@ func (hdb *HistoryDB) UpdateTokenValue(tokenSymbol string, value float64) error
// Sanitize symbol // Sanitize symbol
tokenSymbol = strings.ToValidUTF8(tokenSymbol, " ") tokenSymbol = strings.ToValidUTF8(tokenSymbol, " ")
_, err := hdb.db.Exec( _, err := hdb.dbWrite.Exec(
"UPDATE token SET usd = $1 WHERE symbol = $2;", "UPDATE token SET usd = $1 WHERE symbol = $2;",
value, tokenSymbol, value, tokenSymbol,
) )
@@ -464,7 +471,7 @@ func (hdb *HistoryDB) UpdateTokenValue(tokenSymbol string, value float64) error
func (hdb *HistoryDB) GetToken(tokenID common.TokenID) (*TokenWithUSD, error) { func (hdb *HistoryDB) GetToken(tokenID common.TokenID) (*TokenWithUSD, error) {
token := &TokenWithUSD{} token := &TokenWithUSD{}
err := meddler.QueryRow( err := meddler.QueryRow(
hdb.db, token, `SELECT * FROM token WHERE token_id = $1;`, tokenID, hdb.dbRead, token, `SELECT * FROM token WHERE token_id = $1;`, tokenID,
) )
return token, tracerr.Wrap(err) return token, tracerr.Wrap(err)
} }
@@ -473,7 +480,7 @@ func (hdb *HistoryDB) GetToken(tokenID common.TokenID) (*TokenWithUSD, error) {
func (hdb *HistoryDB) GetAllTokens() ([]TokenWithUSD, error) { func (hdb *HistoryDB) GetAllTokens() ([]TokenWithUSD, error) {
var tokens []*TokenWithUSD var tokens []*TokenWithUSD
err := meddler.QueryAll( err := meddler.QueryAll(
hdb.db, &tokens, hdb.dbRead, &tokens,
"SELECT * FROM token ORDER BY token_id;", "SELECT * FROM token ORDER BY token_id;",
) )
return db.SlicePtrsToSlice(tokens).([]TokenWithUSD), tracerr.Wrap(err) return db.SlicePtrsToSlice(tokens).([]TokenWithUSD), tracerr.Wrap(err)
@@ -482,7 +489,7 @@ func (hdb *HistoryDB) GetAllTokens() ([]TokenWithUSD, error) {
// GetTokenSymbols returns all the token symbols from the DB // GetTokenSymbols returns all the token symbols from the DB
func (hdb *HistoryDB) GetTokenSymbols() ([]string, error) { func (hdb *HistoryDB) GetTokenSymbols() ([]string, error) {
var tokenSymbols []string var tokenSymbols []string
rows, err := hdb.db.Query("SELECT symbol FROM token;") rows, err := hdb.dbRead.Query("SELECT symbol FROM token;")
if err != nil { if err != nil {
return nil, tracerr.Wrap(err) return nil, tracerr.Wrap(err)
} }
@@ -500,7 +507,7 @@ func (hdb *HistoryDB) GetTokenSymbols() ([]string, error) {
// AddAccounts insert accounts into the DB // AddAccounts insert accounts into the DB
func (hdb *HistoryDB) AddAccounts(accounts []common.Account) error { func (hdb *HistoryDB) AddAccounts(accounts []common.Account) error {
return tracerr.Wrap(hdb.addAccounts(hdb.db, accounts)) return tracerr.Wrap(hdb.addAccounts(hdb.dbWrite, accounts))
} }
func (hdb *HistoryDB) addAccounts(d meddler.DB, accounts []common.Account) error { func (hdb *HistoryDB) addAccounts(d meddler.DB, accounts []common.Account) error {
if len(accounts) == 0 { if len(accounts) == 0 {
@@ -523,7 +530,7 @@ func (hdb *HistoryDB) addAccounts(d meddler.DB, accounts []common.Account) error
func (hdb *HistoryDB) GetAllAccounts() ([]common.Account, error) { func (hdb *HistoryDB) GetAllAccounts() ([]common.Account, error) {
var accs []*common.Account var accs []*common.Account
err := meddler.QueryAll( err := meddler.QueryAll(
hdb.db, &accs, hdb.dbRead, &accs,
"SELECT idx, token_id, batch_num, bjj, eth_addr FROM account ORDER BY idx;", "SELECT idx, token_id, batch_num, bjj, eth_addr FROM account ORDER BY idx;",
) )
return db.SlicePtrsToSlice(accs).([]common.Account), tracerr.Wrap(err) return db.SlicePtrsToSlice(accs).([]common.Account), tracerr.Wrap(err)
@@ -531,7 +538,7 @@ func (hdb *HistoryDB) GetAllAccounts() ([]common.Account, error) {
// AddAccountUpdates inserts accUpdates into the DB // AddAccountUpdates inserts accUpdates into the DB
func (hdb *HistoryDB) AddAccountUpdates(accUpdates []common.AccountUpdate) error { func (hdb *HistoryDB) AddAccountUpdates(accUpdates []common.AccountUpdate) error {
return tracerr.Wrap(hdb.addAccountUpdates(hdb.db, accUpdates)) return tracerr.Wrap(hdb.addAccountUpdates(hdb.dbWrite, accUpdates))
} }
func (hdb *HistoryDB) addAccountUpdates(d meddler.DB, accUpdates []common.AccountUpdate) error { func (hdb *HistoryDB) addAccountUpdates(d meddler.DB, accUpdates []common.AccountUpdate) error {
if len(accUpdates) == 0 { if len(accUpdates) == 0 {
@@ -554,7 +561,7 @@ func (hdb *HistoryDB) addAccountUpdates(d meddler.DB, accUpdates []common.Accoun
func (hdb *HistoryDB) GetAllAccountUpdates() ([]common.AccountUpdate, error) { func (hdb *HistoryDB) GetAllAccountUpdates() ([]common.AccountUpdate, error) {
var accUpdates []*common.AccountUpdate var accUpdates []*common.AccountUpdate
err := meddler.QueryAll( err := meddler.QueryAll(
hdb.db, &accUpdates, hdb.dbRead, &accUpdates,
"SELECT eth_block_num, batch_num, idx, nonce, balance FROM account_update ORDER BY idx;", "SELECT eth_block_num, batch_num, idx, nonce, balance FROM account_update ORDER BY idx;",
) )
return db.SlicePtrsToSlice(accUpdates).([]common.AccountUpdate), tracerr.Wrap(err) return db.SlicePtrsToSlice(accUpdates).([]common.AccountUpdate), tracerr.Wrap(err)
@@ -565,7 +572,7 @@ func (hdb *HistoryDB) GetAllAccountUpdates() ([]common.AccountUpdate, error) {
// BatchNum should be null, and the value will be setted by a trigger when a batch forges the tx. // BatchNum should be null, and the value will be setted by a trigger when a batch forges the tx.
// EffectiveAmount and EffectiveDepositAmount are seted with default values by the DB. // EffectiveAmount and EffectiveDepositAmount are seted with default values by the DB.
func (hdb *HistoryDB) AddL1Txs(l1txs []common.L1Tx) error { func (hdb *HistoryDB) AddL1Txs(l1txs []common.L1Tx) error {
return tracerr.Wrap(hdb.addL1Txs(hdb.db, l1txs)) return tracerr.Wrap(hdb.addL1Txs(hdb.dbWrite, l1txs))
} }
// addL1Txs inserts L1 txs to the DB. USD and DepositAmountUSD will be set automatically before storing the tx. // addL1Txs inserts L1 txs to the DB. USD and DepositAmountUSD will be set automatically before storing the tx.
@@ -619,7 +626,7 @@ func (hdb *HistoryDB) addL1Txs(d meddler.DB, l1txs []common.L1Tx) error {
// AddL2Txs inserts L2 txs to the DB. TokenID, USD and FeeUSD will be set automatically before storing the tx. // AddL2Txs inserts L2 txs to the DB. TokenID, USD and FeeUSD will be set automatically before storing the tx.
func (hdb *HistoryDB) AddL2Txs(l2txs []common.L2Tx) error { func (hdb *HistoryDB) AddL2Txs(l2txs []common.L2Tx) error {
return tracerr.Wrap(hdb.addL2Txs(hdb.db, l2txs)) return tracerr.Wrap(hdb.addL2Txs(hdb.dbWrite, l2txs))
} }
// addL2Txs inserts L2 txs to the DB. TokenID, USD and FeeUSD will be set automatically before storing the tx. // addL2Txs inserts L2 txs to the DB. TokenID, USD and FeeUSD will be set automatically before storing the tx.
@@ -686,7 +693,7 @@ func (hdb *HistoryDB) addTxs(d meddler.DB, txs []txWrite) error {
func (hdb *HistoryDB) GetAllExits() ([]common.ExitInfo, error) { func (hdb *HistoryDB) GetAllExits() ([]common.ExitInfo, error) {
var exits []*common.ExitInfo var exits []*common.ExitInfo
err := meddler.QueryAll( err := meddler.QueryAll(
hdb.db, &exits, hdb.dbRead, &exits,
`SELECT exit_tree.batch_num, exit_tree.account_idx, exit_tree.merkle_proof, `SELECT exit_tree.batch_num, exit_tree.account_idx, exit_tree.merkle_proof,
exit_tree.balance, exit_tree.instant_withdrawn, exit_tree.delayed_withdraw_request, exit_tree.balance, exit_tree.instant_withdrawn, exit_tree.delayed_withdraw_request,
exit_tree.delayed_withdrawn FROM exit_tree ORDER BY item_id;`, exit_tree.delayed_withdrawn FROM exit_tree ORDER BY item_id;`,
@@ -698,7 +705,7 @@ func (hdb *HistoryDB) GetAllExits() ([]common.ExitInfo, error) {
func (hdb *HistoryDB) GetAllL1UserTxs() ([]common.L1Tx, error) { func (hdb *HistoryDB) GetAllL1UserTxs() ([]common.L1Tx, error) {
var txs []*common.L1Tx var txs []*common.L1Tx
err := meddler.QueryAll( err := meddler.QueryAll(
hdb.db, &txs, // Note that '\x' gets parsed as a big.Int with value = 0 hdb.dbRead, &txs, // Note that '\x' gets parsed as a big.Int with value = 0
`SELECT tx.id, tx.to_forge_l1_txs_num, tx.position, tx.user_origin, `SELECT tx.id, tx.to_forge_l1_txs_num, tx.position, tx.user_origin,
tx.from_idx, tx.effective_from_idx, tx.from_eth_addr, tx.from_bjj, tx.to_idx, tx.token_id, tx.from_idx, tx.effective_from_idx, tx.from_eth_addr, tx.from_bjj, tx.to_idx, tx.token_id,
tx.amount, (CASE WHEN tx.batch_num IS NULL THEN NULL WHEN tx.amount_success THEN tx.amount ELSE '\x' END) AS effective_amount, tx.amount, (CASE WHEN tx.batch_num IS NULL THEN NULL WHEN tx.amount_success THEN tx.amount ELSE '\x' END) AS effective_amount,
@@ -715,7 +722,7 @@ func (hdb *HistoryDB) GetAllL1CoordinatorTxs() ([]common.L1Tx, error) {
// Since the query specifies that only coordinator txs are returned, it's safe to assume // Since the query specifies that only coordinator txs are returned, it's safe to assume
// that returned txs will always have effective amounts // that returned txs will always have effective amounts
err := meddler.QueryAll( err := meddler.QueryAll(
hdb.db, &txs, hdb.dbRead, &txs,
`SELECT tx.id, tx.to_forge_l1_txs_num, tx.position, tx.user_origin, `SELECT tx.id, tx.to_forge_l1_txs_num, tx.position, tx.user_origin,
tx.from_idx, tx.effective_from_idx, tx.from_eth_addr, tx.from_bjj, tx.to_idx, tx.token_id, tx.from_idx, tx.effective_from_idx, tx.from_eth_addr, tx.from_bjj, tx.to_idx, tx.token_id,
tx.amount, tx.amount AS effective_amount, tx.amount, tx.amount AS effective_amount,
@@ -730,7 +737,7 @@ func (hdb *HistoryDB) GetAllL1CoordinatorTxs() ([]common.L1Tx, error) {
func (hdb *HistoryDB) GetAllL2Txs() ([]common.L2Tx, error) { func (hdb *HistoryDB) GetAllL2Txs() ([]common.L2Tx, error) {
var txs []*common.L2Tx var txs []*common.L2Tx
err := meddler.QueryAll( err := meddler.QueryAll(
hdb.db, &txs, hdb.dbRead, &txs,
`SELECT tx.id, tx.batch_num, tx.position, `SELECT tx.id, tx.batch_num, tx.position,
tx.from_idx, tx.to_idx, tx.amount, tx.token_id, tx.from_idx, tx.to_idx, tx.amount, tx.token_id,
tx.fee, tx.nonce, tx.type, tx.eth_block_num tx.fee, tx.nonce, tx.type, tx.eth_block_num
@@ -743,7 +750,7 @@ func (hdb *HistoryDB) GetAllL2Txs() ([]common.L2Tx, error) {
func (hdb *HistoryDB) GetUnforgedL1UserTxs(toForgeL1TxsNum int64) ([]common.L1Tx, error) { func (hdb *HistoryDB) GetUnforgedL1UserTxs(toForgeL1TxsNum int64) ([]common.L1Tx, error) {
var txs []*common.L1Tx var txs []*common.L1Tx
err := meddler.QueryAll( err := meddler.QueryAll(
hdb.db, &txs, // only L1 user txs can have batch_num set to null hdb.dbRead, &txs, // only L1 user txs can have batch_num set to null
`SELECT tx.id, tx.to_forge_l1_txs_num, tx.position, tx.user_origin, `SELECT tx.id, tx.to_forge_l1_txs_num, tx.position, tx.user_origin,
tx.from_idx, tx.from_eth_addr, tx.from_bjj, tx.to_idx, tx.token_id, tx.from_idx, tx.from_eth_addr, tx.from_bjj, tx.to_idx, tx.token_id,
tx.amount, NULL AS effective_amount, tx.amount, NULL AS effective_amount,
@@ -760,7 +767,7 @@ func (hdb *HistoryDB) GetUnforgedL1UserTxs(toForgeL1TxsNum int64) ([]common.L1Tx
// GetLastTxsPosition for a given to_forge_l1_txs_num // GetLastTxsPosition for a given to_forge_l1_txs_num
func (hdb *HistoryDB) GetLastTxsPosition(toForgeL1TxsNum int64) (int, error) { func (hdb *HistoryDB) GetLastTxsPosition(toForgeL1TxsNum int64) (int, error) {
row := hdb.db.QueryRow( row := hdb.dbRead.QueryRow(
"SELECT position FROM tx WHERE to_forge_l1_txs_num = $1 ORDER BY position DESC;", "SELECT position FROM tx WHERE to_forge_l1_txs_num = $1 ORDER BY position DESC;",
toForgeL1TxsNum, toForgeL1TxsNum,
) )
@@ -774,15 +781,15 @@ func (hdb *HistoryDB) GetSCVars() (*common.RollupVariables, *common.AuctionVaria
var rollup common.RollupVariables var rollup common.RollupVariables
var auction common.AuctionVariables var auction common.AuctionVariables
var wDelayer common.WDelayerVariables var wDelayer common.WDelayerVariables
if err := meddler.QueryRow(hdb.db, &rollup, if err := meddler.QueryRow(hdb.dbRead, &rollup,
"SELECT * FROM rollup_vars ORDER BY eth_block_num DESC LIMIT 1;"); err != nil { "SELECT * FROM rollup_vars ORDER BY eth_block_num DESC LIMIT 1;"); err != nil {
return nil, nil, nil, tracerr.Wrap(err) return nil, nil, nil, tracerr.Wrap(err)
} }
if err := meddler.QueryRow(hdb.db, &auction, if err := meddler.QueryRow(hdb.dbRead, &auction,
"SELECT * FROM auction_vars ORDER BY eth_block_num DESC LIMIT 1;"); err != nil { "SELECT * FROM auction_vars ORDER BY eth_block_num DESC LIMIT 1;"); err != nil {
return nil, nil, nil, tracerr.Wrap(err) return nil, nil, nil, tracerr.Wrap(err)
} }
if err := meddler.QueryRow(hdb.db, &wDelayer, if err := meddler.QueryRow(hdb.dbRead, &wDelayer,
"SELECT * FROM wdelayer_vars ORDER BY eth_block_num DESC LIMIT 1;"); err != nil { "SELECT * FROM wdelayer_vars ORDER BY eth_block_num DESC LIMIT 1;"); err != nil {
return nil, nil, nil, tracerr.Wrap(err) return nil, nil, nil, tracerr.Wrap(err)
} }
@@ -827,7 +834,7 @@ func (hdb *HistoryDB) AddBucketUpdatesTest(d meddler.DB, bucketUpdates []common.
func (hdb *HistoryDB) GetAllBucketUpdates() ([]common.BucketUpdate, error) { func (hdb *HistoryDB) GetAllBucketUpdates() ([]common.BucketUpdate, error) {
var bucketUpdates []*common.BucketUpdate var bucketUpdates []*common.BucketUpdate
err := meddler.QueryAll( err := meddler.QueryAll(
hdb.db, &bucketUpdates, hdb.dbRead, &bucketUpdates,
`SELECT eth_block_num, num_bucket, block_stamp, withdrawals `SELECT eth_block_num, num_bucket, block_stamp, withdrawals
FROM bucket_update ORDER BY item_id;`, FROM bucket_update ORDER BY item_id;`,
) )
@@ -853,7 +860,7 @@ func (hdb *HistoryDB) addTokenExchanges(d meddler.DB, tokenExchanges []common.To
func (hdb *HistoryDB) GetAllTokenExchanges() ([]common.TokenExchange, error) { func (hdb *HistoryDB) GetAllTokenExchanges() ([]common.TokenExchange, error) {
var tokenExchanges []*common.TokenExchange var tokenExchanges []*common.TokenExchange
err := meddler.QueryAll( err := meddler.QueryAll(
hdb.db, &tokenExchanges, hdb.dbRead, &tokenExchanges,
"SELECT eth_block_num, eth_addr, value_usd FROM token_exchange ORDER BY item_id;", "SELECT eth_block_num, eth_addr, value_usd FROM token_exchange ORDER BY item_id;",
) )
return db.SlicePtrsToSlice(tokenExchanges).([]common.TokenExchange), tracerr.Wrap(err) return db.SlicePtrsToSlice(tokenExchanges).([]common.TokenExchange), tracerr.Wrap(err)
@@ -881,7 +888,7 @@ func (hdb *HistoryDB) addEscapeHatchWithdrawals(d meddler.DB,
func (hdb *HistoryDB) GetAllEscapeHatchWithdrawals() ([]common.WDelayerEscapeHatchWithdrawal, error) { func (hdb *HistoryDB) GetAllEscapeHatchWithdrawals() ([]common.WDelayerEscapeHatchWithdrawal, error) {
var escapeHatchWithdrawals []*common.WDelayerEscapeHatchWithdrawal var escapeHatchWithdrawals []*common.WDelayerEscapeHatchWithdrawal
err := meddler.QueryAll( err := meddler.QueryAll(
hdb.db, &escapeHatchWithdrawals, hdb.dbRead, &escapeHatchWithdrawals,
"SELECT eth_block_num, who_addr, to_addr, token_addr, amount FROM escape_hatch_withdrawal ORDER BY item_id;", "SELECT eth_block_num, who_addr, to_addr, token_addr, amount FROM escape_hatch_withdrawal ORDER BY item_id;",
) )
return db.SlicePtrsToSlice(escapeHatchWithdrawals).([]common.WDelayerEscapeHatchWithdrawal), return db.SlicePtrsToSlice(escapeHatchWithdrawals).([]common.WDelayerEscapeHatchWithdrawal),
@@ -894,7 +901,7 @@ func (hdb *HistoryDB) GetAllEscapeHatchWithdrawals() ([]common.WDelayerEscapeHat
// exist in the smart contracts. // exist in the smart contracts.
func (hdb *HistoryDB) SetInitialSCVars(rollup *common.RollupVariables, func (hdb *HistoryDB) SetInitialSCVars(rollup *common.RollupVariables,
auction *common.AuctionVariables, wDelayer *common.WDelayerVariables) error { auction *common.AuctionVariables, wDelayer *common.WDelayerVariables) error {
txn, err := hdb.db.Beginx() txn, err := hdb.dbWrite.Beginx()
if err != nil { if err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
@@ -978,7 +985,7 @@ func (hdb *HistoryDB) setExtraInfoForgedL1UserTxs(d sqlx.Ext, txs []common.L1Tx)
// the pagination system of the API/DB depends on this. Within blocks, all // the pagination system of the API/DB depends on this. Within blocks, all
// items should also be in the correct order (Accounts, Tokens, Txs, etc.) // items should also be in the correct order (Accounts, Tokens, Txs, etc.)
func (hdb *HistoryDB) AddBlockSCData(blockData *common.BlockData) (err error) { func (hdb *HistoryDB) AddBlockSCData(blockData *common.BlockData) (err error) {
txn, err := hdb.db.Beginx() txn, err := hdb.dbWrite.Beginx()
if err != nil { if err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
@@ -1136,7 +1143,7 @@ func (hdb *HistoryDB) AddBlockSCData(blockData *common.BlockData) (err error) {
func (hdb *HistoryDB) GetCoordinatorAPI(bidderAddr ethCommon.Address) (*CoordinatorAPI, error) { func (hdb *HistoryDB) GetCoordinatorAPI(bidderAddr ethCommon.Address) (*CoordinatorAPI, error) {
coordinator := &CoordinatorAPI{} coordinator := &CoordinatorAPI{}
err := meddler.QueryRow( err := meddler.QueryRow(
hdb.db, coordinator, hdb.dbRead, coordinator,
"SELECT * FROM coordinator WHERE bidder_addr = $1 ORDER BY item_id DESC LIMIT 1;", "SELECT * FROM coordinator WHERE bidder_addr = $1 ORDER BY item_id DESC LIMIT 1;",
bidderAddr, bidderAddr,
) )
@@ -1145,14 +1152,14 @@ func (hdb *HistoryDB) GetCoordinatorAPI(bidderAddr ethCommon.Address) (*Coordina
// AddAuctionVars insert auction vars into the DB // AddAuctionVars insert auction vars into the DB
func (hdb *HistoryDB) AddAuctionVars(auctionVars *common.AuctionVariables) error { func (hdb *HistoryDB) AddAuctionVars(auctionVars *common.AuctionVariables) error {
return tracerr.Wrap(meddler.Insert(hdb.db, "auction_vars", auctionVars)) return tracerr.Wrap(meddler.Insert(hdb.dbWrite, "auction_vars", auctionVars))
} }
// GetTokensTest used to get tokens in a testing context // GetTokensTest used to get tokens in a testing context
func (hdb *HistoryDB) GetTokensTest() ([]TokenWithUSD, error) { func (hdb *HistoryDB) GetTokensTest() ([]TokenWithUSD, error) {
tokens := []*TokenWithUSD{} tokens := []*TokenWithUSD{}
if err := meddler.QueryAll( if err := meddler.QueryAll(
hdb.db, &tokens, hdb.dbRead, &tokens,
"SELECT * FROM TOKEN", "SELECT * FROM TOKEN",
); err != nil { ); err != nil {
return nil, tracerr.Wrap(err) return nil, tracerr.Wrap(err)

View File

@@ -39,12 +39,12 @@ func TestMain(m *testing.M) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
historyDB = NewHistoryDB(db, nil) historyDB = NewHistoryDB(db, db, nil)
if err != nil { if err != nil {
panic(err) panic(err)
} }
apiConnCon := dbUtils.NewAPICnnectionController(1, time.Second) apiConnCon := dbUtils.NewAPICnnectionController(1, time.Second)
historyDBWithACC = NewHistoryDB(db, apiConnCon) historyDBWithACC = NewHistoryDB(db, db, apiConnCon)
// Run tests // Run tests
result := m.Run() result := m.Run()
// Close DB // Close DB
@@ -817,11 +817,11 @@ func TestSetExtraInfoForgedL1UserTxs(t *testing.T) {
} }
// Add second batch to trigger the update of the batch_num, // Add second batch to trigger the update of the batch_num,
// while avoiding the implicit call of setExtraInfoForgedL1UserTxs // while avoiding the implicit call of setExtraInfoForgedL1UserTxs
err = historyDB.addBlock(historyDB.db, &blocks[1].Block) err = historyDB.addBlock(historyDB.dbWrite, &blocks[1].Block)
require.NoError(t, err) require.NoError(t, err)
err = historyDB.addBatch(historyDB.db, &blocks[1].Rollup.Batches[0].Batch) err = historyDB.addBatch(historyDB.dbWrite, &blocks[1].Rollup.Batches[0].Batch)
require.NoError(t, err) require.NoError(t, err)
err = historyDB.addAccounts(historyDB.db, blocks[1].Rollup.Batches[0].CreatedAccounts) err = historyDB.addAccounts(historyDB.dbWrite, blocks[1].Rollup.Batches[0].CreatedAccounts)
require.NoError(t, err) require.NoError(t, err)
// Set the Effective{Amount,DepositAmount} of the L1UserTxs that are forged in the second block // Set the Effective{Amount,DepositAmount} of the L1UserTxs that are forged in the second block
@@ -831,7 +831,7 @@ func TestSetExtraInfoForgedL1UserTxs(t *testing.T) {
l1Txs[1].EffectiveAmount = big.NewInt(0) l1Txs[1].EffectiveAmount = big.NewInt(0)
l1Txs[2].EffectiveDepositAmount = big.NewInt(0) l1Txs[2].EffectiveDepositAmount = big.NewInt(0)
l1Txs[2].EffectiveAmount = big.NewInt(0) l1Txs[2].EffectiveAmount = big.NewInt(0)
err = historyDB.setExtraInfoForgedL1UserTxs(historyDB.db, l1Txs) err = historyDB.setExtraInfoForgedL1UserTxs(historyDB.dbWrite, l1Txs)
require.NoError(t, err) require.NoError(t, err)
dbL1Txs, err := historyDB.GetAllL1UserTxs() dbL1Txs, err := historyDB.GetAllL1UserTxs()
@@ -918,10 +918,10 @@ func TestUpdateExitTree(t *testing.T) {
common.WithdrawInfo{Idx: 259, NumExitRoot: 3, InstantWithdraw: false, common.WithdrawInfo{Idx: 259, NumExitRoot: 3, InstantWithdraw: false,
Owner: tc.UsersByIdx[259].Addr, Token: tokenAddr}, Owner: tc.UsersByIdx[259].Addr, Token: tokenAddr},
) )
err = historyDB.addBlock(historyDB.db, &block.Block) err = historyDB.addBlock(historyDB.dbWrite, &block.Block)
require.NoError(t, err) require.NoError(t, err)
err = historyDB.updateExitTree(historyDB.db, block.Block.Num, err = historyDB.updateExitTree(historyDB.dbWrite, block.Block.Num,
block.Rollup.Withdrawals, block.WDelayer.Withdrawals) block.Rollup.Withdrawals, block.WDelayer.Withdrawals)
require.NoError(t, err) require.NoError(t, err)
@@ -951,10 +951,10 @@ func TestUpdateExitTree(t *testing.T) {
Token: tokenAddr, Token: tokenAddr,
Amount: big.NewInt(80), Amount: big.NewInt(80),
}) })
err = historyDB.addBlock(historyDB.db, &block.Block) err = historyDB.addBlock(historyDB.dbWrite, &block.Block)
require.NoError(t, err) require.NoError(t, err)
err = historyDB.updateExitTree(historyDB.db, block.Block.Num, err = historyDB.updateExitTree(historyDB.dbWrite, block.Block.Num,
block.Rollup.Withdrawals, block.WDelayer.Withdrawals) block.Rollup.Withdrawals, block.WDelayer.Withdrawals)
require.NoError(t, err) require.NoError(t, err)
@@ -997,7 +997,7 @@ func TestGetBestBidCoordinator(t *testing.T) {
URL: "bar", URL: "bar",
}, },
} }
err = historyDB.addCoordinators(historyDB.db, coords) err = historyDB.addCoordinators(historyDB.dbWrite, coords)
require.NoError(t, err) require.NoError(t, err)
bids := []common.Bid{ bids := []common.Bid{
@@ -1015,7 +1015,7 @@ func TestGetBestBidCoordinator(t *testing.T) {
}, },
} }
err = historyDB.addBids(historyDB.db, bids) err = historyDB.addBids(historyDB.dbWrite, bids)
require.NoError(t, err) require.NoError(t, err)
forger10, err := historyDB.GetBestBidCoordinator(10) forger10, err := historyDB.GetBestBidCoordinator(10)
@@ -1053,7 +1053,7 @@ func TestAddBucketUpdates(t *testing.T) {
Withdrawals: big.NewInt(42), Withdrawals: big.NewInt(42),
}, },
} }
err := historyDB.addBucketUpdates(historyDB.db, bucketUpdates) err := historyDB.addBucketUpdates(historyDB.dbWrite, bucketUpdates)
require.NoError(t, err) require.NoError(t, err)
dbBucketUpdates, err := historyDB.GetAllBucketUpdates() dbBucketUpdates, err := historyDB.GetAllBucketUpdates()
require.NoError(t, err) require.NoError(t, err)
@@ -1078,7 +1078,7 @@ func TestAddTokenExchanges(t *testing.T) {
ValueUSD: 67890, ValueUSD: 67890,
}, },
} }
err := historyDB.addTokenExchanges(historyDB.db, tokenExchanges) err := historyDB.addTokenExchanges(historyDB.dbWrite, tokenExchanges)
require.NoError(t, err) require.NoError(t, err)
dbTokenExchanges, err := historyDB.GetAllTokenExchanges() dbTokenExchanges, err := historyDB.GetAllTokenExchanges()
require.NoError(t, err) require.NoError(t, err)
@@ -1107,7 +1107,7 @@ func TestAddEscapeHatchWithdrawals(t *testing.T) {
Amount: big.NewInt(20003), Amount: big.NewInt(20003),
}, },
} }
err := historyDB.addEscapeHatchWithdrawals(historyDB.db, escapeHatchWithdrawals) err := historyDB.addEscapeHatchWithdrawals(historyDB.dbWrite, escapeHatchWithdrawals)
require.NoError(t, err) require.NoError(t, err)
dbEscapeHatchWithdrawals, err := historyDB.GetAllEscapeHatchWithdrawals() dbEscapeHatchWithdrawals, err := historyDB.GetAllEscapeHatchWithdrawals()
require.NoError(t, err) require.NoError(t, err)
@@ -1175,16 +1175,12 @@ func TestGetMetricsAPI(t *testing.T) {
res, err := historyDBWithACC.GetMetricsAPI(common.BatchNum(numBatches)) res, err := historyDBWithACC.GetMetricsAPI(common.BatchNum(numBatches))
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, float64(numTx)/float64(numBatches-1), res.TransactionsPerBatch) assert.Equal(t, float64(numTx)/float64(numBatches), res.TransactionsPerBatch)
// Frequency is not exactly the desired one, some decimals may appear // Frequency is not exactly the desired one, some decimals may appear
assert.GreaterOrEqual(t, res.BatchFrequency, float64(frequency)) // There is a -2 as time for first and last batch is not taken into account
assert.Less(t, res.BatchFrequency, float64(frequency+1)) assert.InEpsilon(t, float64(frequency)*float64(numBatches-2)/float64(numBatches), res.BatchFrequency, 0.01)
// Truncate frecuency into an int to do an exact check assert.InEpsilon(t, float64(numTx)/float64(frequency*blockNum-frequency), res.TransactionsPerSecond, 0.01)
assert.Equal(t, frequency, int(res.BatchFrequency))
// This may also be different in some decimals
// Truncate it to the third decimal to compare
assert.Equal(t, math.Trunc((float64(numTx)/float64(frequency*blockNum-frequency))/0.001)*0.001, math.Trunc(res.TransactionsPerSecond/0.001)*0.001)
assert.Equal(t, int64(3), res.TotalAccounts) assert.Equal(t, int64(3), res.TotalAccounts)
assert.Equal(t, int64(3), res.TotalBJJs) assert.Equal(t, int64(3), res.TotalBJJs)
// Til does not set fees // Til does not set fees

View File

@@ -34,7 +34,7 @@ func (l2db *L2DB) GetAccountCreationAuthAPI(addr ethCommon.Address) (*AccountCre
defer l2db.apiConnCon.Release() defer l2db.apiConnCon.Release()
auth := new(AccountCreationAuthAPI) auth := new(AccountCreationAuthAPI)
return auth, tracerr.Wrap(meddler.QueryRow( return auth, tracerr.Wrap(meddler.QueryRow(
l2db.db, auth, l2db.dbRead, auth,
"SELECT * FROM account_creation_auth WHERE eth_addr = $1;", "SELECT * FROM account_creation_auth WHERE eth_addr = $1;",
addr, addr,
)) ))
@@ -49,7 +49,7 @@ func (l2db *L2DB) AddTxAPI(tx *PoolL2TxWrite) error {
} }
defer l2db.apiConnCon.Release() defer l2db.apiConnCon.Release()
row := l2db.db.QueryRow(`SELECT row := l2db.dbRead.QueryRow(`SELECT
($1::NUMERIC * token.usd * fee_percentage($2::NUMERIC)) / ($1::NUMERIC * token.usd * fee_percentage($2::NUMERIC)) /
(10.0 ^ token.decimals::NUMERIC) (10.0 ^ token.decimals::NUMERIC)
FROM token WHERE token.token_id = $3;`, FROM token WHERE token.token_id = $3;`,
@@ -84,7 +84,7 @@ func (l2db *L2DB) AddTxAPI(tx *PoolL2TxWrite) error {
namesPart, valuesPart, namesPart, valuesPart,
len(values)+1, len(values)+2) //nolint:gomnd len(values)+1, len(values)+2) //nolint:gomnd
values = append(values, common.PoolL2TxStatePending, l2db.maxTxs) values = append(values, common.PoolL2TxStatePending, l2db.maxTxs)
res, err := l2db.db.Exec(q, values...) res, err := l2db.dbWrite.Exec(q, values...)
if err != nil { if err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
@@ -118,7 +118,7 @@ func (l2db *L2DB) GetTxAPI(txID common.TxID) (*PoolTxAPI, error) {
defer l2db.apiConnCon.Release() defer l2db.apiConnCon.Release()
tx := new(PoolTxAPI) tx := new(PoolTxAPI)
return tx, tracerr.Wrap(meddler.QueryRow( return tx, tracerr.Wrap(meddler.QueryRow(
l2db.db, tx, l2db.dbRead, tx,
selectPoolTxAPI+"WHERE tx_id = $1;", selectPoolTxAPI+"WHERE tx_id = $1;",
txID, txID,
)) ))

View File

@@ -21,7 +21,8 @@ import (
// L2DB stores L2 txs and authorization registers received by the coordinator and keeps them until they are no longer relevant // L2DB stores L2 txs and authorization registers received by the coordinator and keeps them until they are no longer relevant
// due to them being forged or invalid after a safety period // due to them being forged or invalid after a safety period
type L2DB struct { type L2DB struct {
db *sqlx.DB dbRead *sqlx.DB
dbWrite *sqlx.DB
safetyPeriod common.BatchNum safetyPeriod common.BatchNum
ttl time.Duration ttl time.Duration
maxTxs uint32 // limit of txs that are accepted in the pool maxTxs uint32 // limit of txs that are accepted in the pool
@@ -33,7 +34,7 @@ type L2DB struct {
// To create it, it's needed db connection, safety period expressed in batches, // To create it, it's needed db connection, safety period expressed in batches,
// maxTxs that the DB should have and TTL (time to live) for pending txs. // maxTxs that the DB should have and TTL (time to live) for pending txs.
func NewL2DB( func NewL2DB(
db *sqlx.DB, dbRead, dbWrite *sqlx.DB,
safetyPeriod common.BatchNum, safetyPeriod common.BatchNum,
maxTxs uint32, maxTxs uint32,
minFeeUSD float64, minFeeUSD float64,
@@ -41,7 +42,8 @@ func NewL2DB(
apiConnCon *db.APIConnectionController, apiConnCon *db.APIConnectionController,
) *L2DB { ) *L2DB {
return &L2DB{ return &L2DB{
db: db, dbRead: dbRead,
dbWrite: dbWrite,
safetyPeriod: safetyPeriod, safetyPeriod: safetyPeriod,
ttl: TTL, ttl: TTL,
maxTxs: maxTxs, maxTxs: maxTxs,
@@ -53,12 +55,18 @@ func NewL2DB(
// DB returns a pointer to the L2DB.db. This method should be used only for // DB returns a pointer to the L2DB.db. This method should be used only for
// internal testing purposes. // internal testing purposes.
func (l2db *L2DB) DB() *sqlx.DB { func (l2db *L2DB) DB() *sqlx.DB {
return l2db.db return l2db.dbWrite
}
// MinFeeUSD returns the minimum fee in USD that is required to accept txs into
// the pool
func (l2db *L2DB) MinFeeUSD() float64 {
return l2db.minFeeUSD
} }
// AddAccountCreationAuth inserts an account creation authorization into the DB // AddAccountCreationAuth inserts an account creation authorization into the DB
func (l2db *L2DB) AddAccountCreationAuth(auth *common.AccountCreationAuth) error { func (l2db *L2DB) AddAccountCreationAuth(auth *common.AccountCreationAuth) error {
_, err := l2db.db.Exec( _, err := l2db.dbWrite.Exec(
`INSERT INTO account_creation_auth (eth_addr, bjj, signature) `INSERT INTO account_creation_auth (eth_addr, bjj, signature)
VALUES ($1, $2, $3);`, VALUES ($1, $2, $3);`,
auth.EthAddr, auth.BJJ, auth.Signature, auth.EthAddr, auth.BJJ, auth.Signature,
@@ -70,7 +78,7 @@ func (l2db *L2DB) AddAccountCreationAuth(auth *common.AccountCreationAuth) error
func (l2db *L2DB) GetAccountCreationAuth(addr ethCommon.Address) (*common.AccountCreationAuth, error) { func (l2db *L2DB) GetAccountCreationAuth(addr ethCommon.Address) (*common.AccountCreationAuth, error) {
auth := new(common.AccountCreationAuth) auth := new(common.AccountCreationAuth)
return auth, tracerr.Wrap(meddler.QueryRow( return auth, tracerr.Wrap(meddler.QueryRow(
l2db.db, auth, l2db.dbRead, auth,
"SELECT * FROM account_creation_auth WHERE eth_addr = $1;", "SELECT * FROM account_creation_auth WHERE eth_addr = $1;",
addr, addr,
)) ))
@@ -99,7 +107,7 @@ func (l2db *L2DB) UpdateTxsInfo(txs []common.PoolL2Tx) error {
WHERE tx_pool.tx_id = tx_update.id; WHERE tx_pool.tx_id = tx_update.id;
` `
if len(txUpdates) > 0 { if len(txUpdates) > 0 {
if _, err := sqlx.NamedExec(l2db.db, query, txUpdates); err != nil { if _, err := sqlx.NamedExec(l2db.dbWrite, query, txUpdates); err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
} }
@@ -158,7 +166,7 @@ func NewPoolL2TxWriteFromPoolL2Tx(tx *common.PoolL2Tx) *PoolL2TxWrite {
func (l2db *L2DB) AddTxTest(tx *common.PoolL2Tx) error { func (l2db *L2DB) AddTxTest(tx *common.PoolL2Tx) error {
insertTx := NewPoolL2TxWriteFromPoolL2Tx(tx) insertTx := NewPoolL2TxWriteFromPoolL2Tx(tx)
// insert tx // insert tx
return tracerr.Wrap(meddler.Insert(l2db.db, "tx_pool", insertTx)) return tracerr.Wrap(meddler.Insert(l2db.dbWrite, "tx_pool", insertTx))
} }
// selectPoolTxCommon select part of queries to get common.PoolL2Tx // selectPoolTxCommon select part of queries to get common.PoolL2Tx
@@ -175,7 +183,7 @@ FROM tx_pool INNER JOIN token ON tx_pool.token_id = token.token_id `
func (l2db *L2DB) GetTx(txID common.TxID) (*common.PoolL2Tx, error) { func (l2db *L2DB) GetTx(txID common.TxID) (*common.PoolL2Tx, error) {
tx := new(common.PoolL2Tx) tx := new(common.PoolL2Tx)
return tx, tracerr.Wrap(meddler.QueryRow( return tx, tracerr.Wrap(meddler.QueryRow(
l2db.db, tx, l2db.dbRead, tx,
selectPoolTxCommon+"WHERE tx_id = $1;", selectPoolTxCommon+"WHERE tx_id = $1;",
txID, txID,
)) ))
@@ -185,7 +193,7 @@ func (l2db *L2DB) GetTx(txID common.TxID) (*common.PoolL2Tx, error) {
func (l2db *L2DB) GetPendingTxs() ([]common.PoolL2Tx, error) { func (l2db *L2DB) GetPendingTxs() ([]common.PoolL2Tx, error) {
var txs []*common.PoolL2Tx var txs []*common.PoolL2Tx
err := meddler.QueryAll( err := meddler.QueryAll(
l2db.db, &txs, l2db.dbRead, &txs,
selectPoolTxCommon+"WHERE state = $1", selectPoolTxCommon+"WHERE state = $1",
common.PoolL2TxStatePending, common.PoolL2TxStatePending,
) )
@@ -210,8 +218,8 @@ func (l2db *L2DB) StartForging(txIDs []common.TxID, batchNum common.BatchNum) er
if err != nil { if err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
query = l2db.db.Rebind(query) query = l2db.dbWrite.Rebind(query)
_, err = l2db.db.Exec(query, args...) _, err = l2db.dbWrite.Exec(query, args...)
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
@@ -233,8 +241,8 @@ func (l2db *L2DB) DoneForging(txIDs []common.TxID, batchNum common.BatchNum) err
if err != nil { if err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
query = l2db.db.Rebind(query) query = l2db.dbWrite.Rebind(query)
_, err = l2db.db.Exec(query, args...) _, err = l2db.dbWrite.Exec(query, args...)
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
@@ -255,8 +263,8 @@ func (l2db *L2DB) InvalidateTxs(txIDs []common.TxID, batchNum common.BatchNum) e
if err != nil { if err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
query = l2db.db.Rebind(query) query = l2db.dbWrite.Rebind(query)
_, err = l2db.db.Exec(query, args...) _, err = l2db.dbWrite.Exec(query, args...)
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
@@ -264,7 +272,7 @@ func (l2db *L2DB) InvalidateTxs(txIDs []common.TxID, batchNum common.BatchNum) e
// of unique FromIdx // of unique FromIdx
func (l2db *L2DB) GetPendingUniqueFromIdxs() ([]common.Idx, error) { func (l2db *L2DB) GetPendingUniqueFromIdxs() ([]common.Idx, error) {
var idxs []common.Idx var idxs []common.Idx
rows, err := l2db.db.Query(`SELECT DISTINCT from_idx FROM tx_pool rows, err := l2db.dbRead.Query(`SELECT DISTINCT from_idx FROM tx_pool
WHERE state = $1;`, common.PoolL2TxStatePending) WHERE state = $1;`, common.PoolL2TxStatePending)
if err != nil { if err != nil {
return nil, tracerr.Wrap(err) return nil, tracerr.Wrap(err)
@@ -305,7 +313,7 @@ func (l2db *L2DB) InvalidateOldNonces(updatedAccounts []common.IdxNonce, batchNu
// named query which works with slices, and doens't handle an extra // named query which works with slices, and doens't handle an extra
// individual argument. // individual argument.
query := fmt.Sprintf(invalidateOldNoncesQuery, batchNum) query := fmt.Sprintf(invalidateOldNoncesQuery, batchNum)
if _, err := sqlx.NamedExec(l2db.db, query, updatedAccounts); err != nil { if _, err := sqlx.NamedExec(l2db.dbWrite, query, updatedAccounts); err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
return nil return nil
@@ -314,7 +322,7 @@ func (l2db *L2DB) InvalidateOldNonces(updatedAccounts []common.IdxNonce, batchNu
// Reorg updates the state of txs that were updated in a batch that has been discarted due to a blockchain reorg. // Reorg updates the state of txs that were updated in a batch that has been discarted due to a blockchain reorg.
// The state of the affected txs can change form Forged -> Pending or from Invalid -> Pending // The state of the affected txs can change form Forged -> Pending or from Invalid -> Pending
func (l2db *L2DB) Reorg(lastValidBatch common.BatchNum) error { func (l2db *L2DB) Reorg(lastValidBatch common.BatchNum) error {
_, err := l2db.db.Exec( _, err := l2db.dbWrite.Exec(
`UPDATE tx_pool SET batch_num = NULL, state = $1 `UPDATE tx_pool SET batch_num = NULL, state = $1
WHERE (state = $2 OR state = $3 OR state = $4) AND batch_num > $5`, WHERE (state = $2 OR state = $3 OR state = $4) AND batch_num > $5`,
common.PoolL2TxStatePending, common.PoolL2TxStatePending,
@@ -330,7 +338,7 @@ func (l2db *L2DB) Reorg(lastValidBatch common.BatchNum) error {
// it also deletes pending txs that have been in the L2DB for longer than the ttl if maxTxs has been exceeded // it also deletes pending txs that have been in the L2DB for longer than the ttl if maxTxs has been exceeded
func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) (err error) { func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) (err error) {
now := time.Now().UTC().Unix() now := time.Now().UTC().Unix()
_, err = l2db.db.Exec( _, err = l2db.dbWrite.Exec(
`DELETE FROM tx_pool WHERE ( `DELETE FROM tx_pool WHERE (
batch_num < $1 AND (state = $2 OR state = $3) batch_num < $1 AND (state = $2 OR state = $3)
) OR ( ) OR (
@@ -351,7 +359,7 @@ func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) (err error) {
// the `external_delete` column. An external process can set this column to // the `external_delete` column. An external process can set this column to
// true to instruct the coordinator to delete the tx when possible. // true to instruct the coordinator to delete the tx when possible.
func (l2db *L2DB) PurgeByExternalDelete() error { func (l2db *L2DB) PurgeByExternalDelete() error {
_, err := l2db.db.Exec( _, err := l2db.dbWrite.Exec(
`DELETE from tx_pool WHERE (external_delete = true AND state = $1);`, `DELETE from tx_pool WHERE (external_delete = true AND state = $1);`,
common.PoolL2TxStatePending, common.PoolL2TxStatePending,
) )

View File

@@ -37,11 +37,11 @@ func TestMain(m *testing.M) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
l2DB = NewL2DB(db, 10, 1000, 0.0, 24*time.Hour, nil) l2DB = NewL2DB(db, db, 10, 1000, 0.0, 24*time.Hour, nil)
apiConnCon := dbUtils.NewAPICnnectionController(1, time.Second) apiConnCon := dbUtils.NewAPICnnectionController(1, time.Second)
l2DBWithACC = NewL2DB(db, 10, 1000, 0.0, 24*time.Hour, apiConnCon) l2DBWithACC = NewL2DB(db, db, 10, 1000, 0.0, 24*time.Hour, apiConnCon)
test.WipeDB(l2DB.DB()) test.WipeDB(l2DB.DB())
historyDB = historydb.NewHistoryDB(db, nil) historyDB = historydb.NewHistoryDB(db, db, nil)
// Run tests // Run tests
result := m.Run() result := m.Run()
// Close DB // Close DB
@@ -660,7 +660,7 @@ func TestPurge(t *testing.T) {
} }
// Set batchNum keeped txs // Set batchNum keeped txs
for i := range keepedIDs { for i := range keepedIDs {
_, err = l2DB.db.Exec( _, err = l2DB.dbWrite.Exec(
"UPDATE tx_pool SET batch_num = $1 WHERE tx_id = $2;", "UPDATE tx_pool SET batch_num = $1 WHERE tx_id = $2;",
safeBatchNum, keepedIDs[i], safeBatchNum, keepedIDs[i],
) )
@@ -679,7 +679,7 @@ func TestPurge(t *testing.T) {
deleteTimestamp := time.Unix(time.Now().UTC().Unix()-int64(l2DB.ttl.Seconds()+float64(4*time.Second)), 0) deleteTimestamp := time.Unix(time.Now().UTC().Unix()-int64(l2DB.ttl.Seconds()+float64(4*time.Second)), 0)
for _, id := range afterTTLIDs { for _, id := range afterTTLIDs {
// Set timestamp // Set timestamp
_, err = l2DB.db.Exec( _, err = l2DB.dbWrite.Exec(
"UPDATE tx_pool SET timestamp = $1, state = $2 WHERE tx_id = $3;", "UPDATE tx_pool SET timestamp = $1, state = $2 WHERE tx_id = $3;",
deleteTimestamp, common.PoolL2TxStatePending, id, deleteTimestamp, common.PoolL2TxStatePending, id,
) )
@@ -797,7 +797,7 @@ func TestPurgeByExternalDelete(t *testing.T) {
require.NoError(t, l2DB.StartForging( require.NoError(t, l2DB.StartForging(
[]common.TxID{txs[4].TxID, txs[5].TxID, txs[6].TxID, txs[7].TxID}, []common.TxID{txs[4].TxID, txs[5].TxID, txs[6].TxID, txs[7].TxID},
1)) 1))
_, err = l2DB.db.Exec( _, err = l2DB.dbWrite.Exec(
`UPDATE tx_pool SET external_delete = true WHERE `UPDATE tx_pool SET external_delete = true WHERE
tx_id IN ($1, $2, $3, $4) tx_id IN ($1, $2, $3, $4)
;`, ;`,

1
go.mod
View File

@@ -21,6 +21,7 @@ require (
github.com/miguelmota/go-ethereum-hdwallet v0.0.0-20200123000308-a60dcd172b4c github.com/miguelmota/go-ethereum-hdwallet v0.0.0-20200123000308-a60dcd172b4c
github.com/mitchellh/copystructure v1.0.0 github.com/mitchellh/copystructure v1.0.0
github.com/mitchellh/mapstructure v1.3.0 github.com/mitchellh/mapstructure v1.3.0
github.com/prometheus/client_golang v1.3.0
github.com/rubenv/sql-migrate v0.0.0-20200616145509-8d140a17f351 github.com/rubenv/sql-migrate v0.0.0-20200616145509-8d140a17f351
github.com/russross/meddler v1.0.0 github.com/russross/meddler v1.0.0
github.com/stretchr/testify v1.6.1 github.com/stretchr/testify v1.6.1

6
go.sum
View File

@@ -68,6 +68,7 @@ github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZw
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/btcsuite/btcd v0.0.0-20171128150713-2e60448ffcc6/go.mod h1:Dmm/EzmjnCiweXmzRIAiUWCInVmPgjkzgv5k4tVyXiQ= github.com/btcsuite/btcd v0.0.0-20171128150713-2e60448ffcc6/go.mod h1:Dmm/EzmjnCiweXmzRIAiUWCInVmPgjkzgv5k4tVyXiQ=
@@ -444,6 +445,7 @@ github.com/mattn/go-sqlite3 v1.12.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsO
github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJKjyR5WD3HYQSd+U= github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJKjyR5WD3HYQSd+U=
github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw= github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mediocregopher/mediocre-go-lib v0.0.0-20181029021733-cb65787f37ed/go.mod h1:dSsfyI2zABAdhcbvkXqgxOxrCsbYeHCPgrZkku60dSg= github.com/mediocregopher/mediocre-go-lib v0.0.0-20181029021733-cb65787f37ed/go.mod h1:dSsfyI2zABAdhcbvkXqgxOxrCsbYeHCPgrZkku60dSg=
github.com/mediocregopher/radix/v3 v3.3.0/go.mod h1:EmfVyvspXz1uZEyPBMyGK+kjWiKQGvsUt6O3Pj+LDCQ= github.com/mediocregopher/radix/v3 v3.3.0/go.mod h1:EmfVyvspXz1uZEyPBMyGK+kjWiKQGvsUt6O3Pj+LDCQ=
@@ -544,23 +546,27 @@ github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.1.0/go.mod h1:I1FGZT9+L76gKKOs5djB6ezCbFQP1xR9D75/vuwEF3g= github.com/prometheus/client_golang v1.1.0/go.mod h1:I1FGZT9+L76gKKOs5djB6ezCbFQP1xR9D75/vuwEF3g=
github.com/prometheus/client_golang v1.3.0 h1:miYCvYqFXtl/J9FIy8eNpBfYthAEFg+Ys0XyUVEcDsc=
github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og= github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.1.0 h1:ElTg5tNp4DqfV7UQjDqv2+RJlNzsDtvNAWccbItceIE=
github.com/prometheus/client_model v0.1.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.1.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc= github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc=
github.com/prometheus/common v0.7.0 h1:L+1lyG48J1zAQXA3RBX/nG/B3gjlHq0zTt2tlbJLyCY=
github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ=
github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/tsdb v0.6.2-0.20190402121629-4f204dcbc150/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/prometheus/tsdb v0.6.2-0.20190402121629-4f204dcbc150/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/prometheus/tsdb v0.7.1 h1:YZcsG11NqnK4czYLrWd9mpEuAJIHVQLwdrleYfszMAA= github.com/prometheus/tsdb v0.7.1 h1:YZcsG11NqnK4czYLrWd9mpEuAJIHVQLwdrleYfszMAA=

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"net"
"net/http" "net/http"
"sync" "sync"
"time" "time"
@@ -64,7 +65,8 @@ type Node struct {
// General // General
cfg *config.Node cfg *config.Node
mode Mode mode Mode
sqlConn *sqlx.DB sqlConnRead *sqlx.DB
sqlConnWrite *sqlx.DB
ctx context.Context ctx context.Context
wg sync.WaitGroup wg sync.WaitGroup
cancel context.CancelFunc cancel context.CancelFunc
@@ -74,15 +76,34 @@ type Node struct {
func NewNode(mode Mode, cfg *config.Node) (*Node, error) { func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
meddler.Debug = cfg.Debug.MeddlerLogs meddler.Debug = cfg.Debug.MeddlerLogs
// Stablish DB connection // Stablish DB connection
db, err := dbUtils.InitSQLDB( dbWrite, err := dbUtils.InitSQLDB(
cfg.PostgreSQL.Port, cfg.PostgreSQL.PortWrite,
cfg.PostgreSQL.Host, cfg.PostgreSQL.HostWrite,
cfg.PostgreSQL.User, cfg.PostgreSQL.UserWrite,
cfg.PostgreSQL.Password, cfg.PostgreSQL.PasswordWrite,
cfg.PostgreSQL.Name, cfg.PostgreSQL.NameWrite,
) )
if err != nil { if err != nil {
return nil, tracerr.Wrap(err) return nil, tracerr.Wrap(fmt.Errorf("dbUtils.InitSQLDB: %w", err))
}
var dbRead *sqlx.DB
if cfg.PostgreSQL.HostRead == "" {
dbRead = dbWrite
} else if cfg.PostgreSQL.HostRead == cfg.PostgreSQL.HostWrite {
return nil, tracerr.Wrap(fmt.Errorf(
"PostgreSQL.HostRead and PostgreSQL.HostWrite must be different",
))
} else {
dbRead, err = dbUtils.InitSQLDB(
cfg.PostgreSQL.PortRead,
cfg.PostgreSQL.HostRead,
cfg.PostgreSQL.UserRead,
cfg.PostgreSQL.PasswordRead,
cfg.PostgreSQL.NameRead,
)
if err != nil {
return nil, tracerr.Wrap(fmt.Errorf("dbUtils.InitSQLDB: %w", err))
}
} }
var apiConnCon *dbUtils.APIConnectionController var apiConnCon *dbUtils.APIConnectionController
if cfg.API.Explorer || mode == ModeCoordinator { if cfg.API.Explorer || mode == ModeCoordinator {
@@ -92,7 +113,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
) )
} }
historyDB := historydb.NewHistoryDB(db, apiConnCon) historyDB := historydb.NewHistoryDB(dbRead, dbWrite, apiConnCon)
ethClient, err := ethclient.Dial(cfg.Web3.URL) ethClient, err := ethclient.Dial(cfg.Web3.URL)
if err != nil { if err != nil {
@@ -201,7 +222,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
var l2DB *l2db.L2DB var l2DB *l2db.L2DB
if mode == ModeCoordinator { if mode == ModeCoordinator {
l2DB = l2db.NewL2DB( l2DB = l2db.NewL2DB(
db, dbRead, dbWrite,
cfg.Coordinator.L2DB.SafetyPeriod, cfg.Coordinator.L2DB.SafetyPeriod,
cfg.Coordinator.L2DB.MaxTxs, cfg.Coordinator.L2DB.MaxTxs,
cfg.Coordinator.L2DB.MinFeeUSD, cfg.Coordinator.L2DB.MinFeeUSD,
@@ -315,6 +336,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
PurgeBlockDelay: cfg.Coordinator.L2DB.PurgeBlockDelay, PurgeBlockDelay: cfg.Coordinator.L2DB.PurgeBlockDelay,
InvalidateBlockDelay: cfg.Coordinator.L2DB.InvalidateBlockDelay, InvalidateBlockDelay: cfg.Coordinator.L2DB.InvalidateBlockDelay,
}, },
ForgeBatchGasCost: cfg.Coordinator.EthClient.ForgeBatchGasCost,
VerifierIdx: uint8(verifierIdx), VerifierIdx: uint8(verifierIdx),
TxProcessorConfig: txProcessorCfg, TxProcessorConfig: txProcessorCfg,
}, },
@@ -391,7 +413,8 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
sync: sync, sync: sync,
cfg: cfg, cfg: cfg,
mode: mode, mode: mode,
sqlConn: db, sqlConnRead: dbRead,
sqlConnWrite: dbWrite,
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
}, nil }, nil
@@ -444,16 +467,20 @@ func NewNodeAPI(
// cancelation. // cancelation.
func (a *NodeAPI) Run(ctx context.Context) error { func (a *NodeAPI) Run(ctx context.Context) error {
server := &http.Server{ server := &http.Server{
Addr: a.addr,
Handler: a.engine, Handler: a.engine,
// TODO: Figure out best parameters for production // TODO: Figure out best parameters for production
ReadTimeout: 30 * time.Second, //nolint:gomnd ReadTimeout: 30 * time.Second, //nolint:gomnd
WriteTimeout: 30 * time.Second, //nolint:gomnd WriteTimeout: 30 * time.Second, //nolint:gomnd
MaxHeaderBytes: 1 << 20, //nolint:gomnd MaxHeaderBytes: 1 << 20, //nolint:gomnd
} }
go func() { listener, err := net.Listen("tcp", a.addr)
if err != nil {
return tracerr.Wrap(err)
}
log.Infof("NodeAPI is ready at %v", a.addr) log.Infof("NodeAPI is ready at %v", a.addr)
if err := server.ListenAndServe(); err != nil && tracerr.Unwrap(err) != http.ErrServerClosed { go func() {
if err := server.Serve(listener); err != nil &&
tracerr.Unwrap(err) != http.ErrServerClosed {
log.Fatalf("Listen: %s\n", err) log.Fatalf("Listen: %s\n", err)
} }
}() }()
@@ -649,6 +676,10 @@ func (n *Node) StartNodeAPI() {
n.wg.Add(1) n.wg.Add(1)
go func() { go func() {
// Do an initial update on startup
if err := n.nodeAPI.api.UpdateMetrics(); err != nil {
log.Errorw("API.UpdateMetrics", "err", err)
}
for { for {
select { select {
case <-n.ctx.Done(): case <-n.ctx.Done():
@@ -665,6 +696,10 @@ func (n *Node) StartNodeAPI() {
n.wg.Add(1) n.wg.Add(1)
go func() { go func() {
// Do an initial update on startup
if err := n.nodeAPI.api.UpdateRecommendedFee(); err != nil {
log.Errorw("API.UpdateRecommendedFee", "err", err)
}
for { for {
select { select {
case <-n.ctx.Done(): case <-n.ctx.Done():

View File

@@ -20,7 +20,7 @@ func TestPriceUpdater(t *testing.T) {
pass := os.Getenv("POSTGRES_PASS") pass := os.Getenv("POSTGRES_PASS")
db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez") db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez")
assert.NoError(t, err) assert.NoError(t, err)
historyDB := historydb.NewHistoryDB(db, nil) historyDB := historydb.NewHistoryDB(db, db, nil)
// Clean DB // Clean DB
test.WipeDB(historyDB.DB()) test.WipeDB(historyDB.DB())
// Populate DB // Populate DB

44
synchronizer/metrics.go Normal file
View File

@@ -0,0 +1,44 @@
package synchronizer
import "github.com/prometheus/client_golang/prometheus"
var (
metricReorgsCount = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "sync_reorgs",
Help: "",
},
)
metricSyncedLastBlockNum = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "sync_synced_last_block_num",
Help: "",
},
)
metricEthLastBlockNum = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "sync_eth_last_block_num",
Help: "",
},
)
metricSyncedLastBatchNum = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "sync_synced_last_batch_num",
Help: "",
},
)
metricEthLastBatchNum = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "sync_eth_last_batch_num",
Help: "",
},
)
)
func init() {
prometheus.MustRegister(metricReorgsCount)
prometheus.MustRegister(metricSyncedLastBlockNum)
prometheus.MustRegister(metricEthLastBlockNum)
prometheus.MustRegister(metricSyncedLastBatchNum)
prometheus.MustRegister(metricEthLastBatchNum)
}

View File

@@ -662,12 +662,16 @@ func (s *Synchronizer) Sync2(ctx context.Context,
} }
for _, batchData := range rollupData.Batches { for _, batchData := range rollupData.Batches {
metricSyncedLastBatchNum.Set(float64(batchData.Batch.BatchNum))
metricEthLastBatchNum.Set(float64(s.stats.Eth.LastBatchNum))
log.Debugw("Synced batch", log.Debugw("Synced batch",
"syncLastBatch", batchData.Batch.BatchNum, "syncLastBatch", batchData.Batch.BatchNum,
"syncBatchesPerc", s.stats.batchesPerc(batchData.Batch.BatchNum), "syncBatchesPerc", s.stats.batchesPerc(batchData.Batch.BatchNum),
"ethLastBatch", s.stats.Eth.LastBatchNum, "ethLastBatch", s.stats.Eth.LastBatchNum,
) )
} }
metricSyncedLastBlockNum.Set(float64(s.stats.Sync.LastBlock.Num))
metricEthLastBlockNum.Set(float64(s.stats.Eth.LastBlock.Num))
log.Debugw("Synced block", log.Debugw("Synced block",
"syncLastBlockNum", s.stats.Sync.LastBlock.Num, "syncLastBlockNum", s.stats.Sync.LastBlock.Num,
"syncBlocksPerc", s.stats.blocksPerc(), "syncBlocksPerc", s.stats.blocksPerc(),

View File

@@ -315,7 +315,7 @@ func newTestModules(t *testing.T) (*statedb.StateDB, *historydb.HistoryDB) {
pass := os.Getenv("POSTGRES_PASS") pass := os.Getenv("POSTGRES_PASS")
db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez") db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez")
require.NoError(t, err) require.NoError(t, err)
historyDB := historydb.NewHistoryDB(db, nil) historyDB := historydb.NewHistoryDB(db, db, nil)
// Clear DB // Clear DB
test.WipeDB(historyDB.DB()) test.WipeDB(historyDB.DB())

View File

@@ -2,6 +2,7 @@ package debugapi
import ( import (
"context" "context"
"net"
"net/http" "net/http"
"time" "time"
@@ -12,6 +13,7 @@ import (
"github.com/hermeznetwork/hermez-node/log" "github.com/hermeznetwork/hermez-node/log"
"github.com/hermeznetwork/hermez-node/synchronizer" "github.com/hermeznetwork/hermez-node/synchronizer"
"github.com/hermeznetwork/tracerr" "github.com/hermeznetwork/tracerr"
"github.com/prometheus/client_golang/prometheus/promhttp"
) )
func handleNoRoute(c *gin.Context) { func handleNoRoute(c *gin.Context) {
@@ -107,6 +109,8 @@ func (a *DebugAPI) Run(ctx context.Context) error {
api.Use(cors.Default()) api.Use(cors.Default())
debugAPI := api.Group("/debug") debugAPI := api.Group("/debug")
debugAPI.GET("/metrics", gin.WrapH(promhttp.Handler()))
debugAPI.GET("sdb/batchnum", a.handleCurrentBatch) debugAPI.GET("sdb/batchnum", a.handleCurrentBatch)
debugAPI.GET("sdb/mtroot", a.handleMTRoot) debugAPI.GET("sdb/mtroot", a.handleMTRoot)
// Accounts returned by these endpoints will always have BatchNum = 0, // Accounts returned by these endpoints will always have BatchNum = 0,
@@ -118,16 +122,20 @@ func (a *DebugAPI) Run(ctx context.Context) error {
debugAPI.GET("sync/stats", a.handleSyncStats) debugAPI.GET("sync/stats", a.handleSyncStats)
debugAPIServer := &http.Server{ debugAPIServer := &http.Server{
Addr: a.addr,
Handler: api, Handler: api,
// Use some hardcoded numberes that are suitable for testing // Use some hardcoded numbers that are suitable for testing
ReadTimeout: 30 * time.Second, //nolint:gomnd ReadTimeout: 30 * time.Second, //nolint:gomnd
WriteTimeout: 30 * time.Second, //nolint:gomnd WriteTimeout: 30 * time.Second, //nolint:gomnd
MaxHeaderBytes: 1 << 20, //nolint:gomnd MaxHeaderBytes: 1 << 20, //nolint:gomnd
} }
go func() { listener, err := net.Listen("tcp", a.addr)
if err != nil {
return tracerr.Wrap(err)
}
log.Infof("DebugAPI is ready at %v", a.addr) log.Infof("DebugAPI is ready at %v", a.addr)
if err := debugAPIServer.ListenAndServe(); err != nil && tracerr.Unwrap(err) != http.ErrServerClosed { go func() {
if err := debugAPIServer.Serve(listener); err != nil &&
tracerr.Unwrap(err) != http.ErrServerClosed {
log.Fatalf("Listen: %s\n", err) log.Fatalf("Listen: %s\n", err)
} }
}() }()

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"sync" "sync"
"time" "time"
@@ -145,7 +146,7 @@ const longWaitDuration = 999 * time.Hour
// const provingDuration = 2 * time.Second // const provingDuration = 2 * time.Second
func (s *Mock) runProver(ctx context.Context) { func (s *Mock) runProver(ctx context.Context) {
waitCh := time.After(longWaitDuration) timer := time.NewTimer(longWaitDuration)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@@ -153,21 +154,27 @@ func (s *Mock) runProver(ctx context.Context) {
case msg := <-s.msgCh: case msg := <-s.msgCh:
switch msg.value { switch msg.value {
case "cancel": case "cancel":
waitCh = time.After(longWaitDuration) if !timer.Stop() {
<-timer.C
}
timer.Reset(longWaitDuration)
s.Lock() s.Lock()
if !s.status.IsReady() { if !s.status.IsReady() {
s.status = prover.StatusCodeAborted s.status = prover.StatusCodeAborted
} }
s.Unlock() s.Unlock()
case "prove": case "prove":
waitCh = time.After(s.provingDuration) if !timer.Stop() {
<-timer.C
}
timer.Reset(s.provingDuration)
s.Lock() s.Lock()
s.status = prover.StatusCodeBusy s.status = prover.StatusCodeBusy
s.Unlock() s.Unlock()
} }
msg.ackCh <- true msg.ackCh <- true
case <-waitCh: case <-timer.C:
waitCh = time.After(longWaitDuration) timer.Reset(longWaitDuration)
s.Lock() s.Lock()
if s.status != prover.StatusCodeBusy { if s.status != prover.StatusCodeBusy {
s.Unlock() s.Unlock()
@@ -202,16 +209,20 @@ func (s *Mock) Run(ctx context.Context) error {
apiGroup.POST("/cancel", s.handleCancel) apiGroup.POST("/cancel", s.handleCancel)
debugAPIServer := &http.Server{ debugAPIServer := &http.Server{
Addr: s.addr,
Handler: api, Handler: api,
// Use some hardcoded numberes that are suitable for testing // Use some hardcoded numberes that are suitable for testing
ReadTimeout: 30 * time.Second, //nolint:gomnd ReadTimeout: 30 * time.Second, //nolint:gomnd
WriteTimeout: 30 * time.Second, //nolint:gomnd WriteTimeout: 30 * time.Second, //nolint:gomnd
MaxHeaderBytes: 1 << 20, //nolint:gomnd MaxHeaderBytes: 1 << 20, //nolint:gomnd
} }
go func() { listener, err := net.Listen("tcp", s.addr)
if err != nil {
return tracerr.Wrap(err)
}
log.Infof("prover.MockServer is ready at %v", s.addr) log.Infof("prover.MockServer is ready at %v", s.addr)
if err := debugAPIServer.ListenAndServe(); err != nil && tracerr.Unwrap(err) != http.ErrServerClosed { go func() {
if err := debugAPIServer.Serve(listener); err != nil &&
tracerr.Unwrap(err) != http.ErrServerClosed {
log.Fatalf("Listen: %s\n", err) log.Fatalf("Listen: %s\n", err)
} }
}() }()

View File

@@ -38,7 +38,7 @@ func addTokens(t *testing.T, tc *til.Context, db *sqlx.DB) {
}) })
} }
hdb := historydb.NewHistoryDB(db, nil) hdb := historydb.NewHistoryDB(db, db, nil)
assert.NoError(t, hdb.AddBlock(&common.Block{ assert.NoError(t, hdb.AddBlock(&common.Block{
Num: 1, Num: 1,
})) }))
@@ -75,7 +75,7 @@ func initTxSelector(t *testing.T, chainID uint16, hermezContractAddr ethCommon.A
pass := os.Getenv("POSTGRES_PASS") pass := os.Getenv("POSTGRES_PASS")
db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez") db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez")
require.NoError(t, err) require.NoError(t, err)
l2DB := l2db.NewL2DB(db, 10, 100, 0.0, 24*time.Hour, nil) l2DB := l2db.NewL2DB(db, db, 10, 100, 0.0, 24*time.Hour, nil)
dir, err := ioutil.TempDir("", "tmpSyncDB") dir, err := ioutil.TempDir("", "tmpSyncDB")
require.NoError(t, err) require.NoError(t, err)

View File

@@ -605,7 +605,7 @@ func (tp *TxProcessor) ProcessL1Tx(exitTree *merkletree.MerkleTree, tx *common.L
// execute exit flow // execute exit flow
// coordIdxsMap is 'nil', as at L1Txs there is no L2 fees // coordIdxsMap is 'nil', as at L1Txs there is no L2 fees
exitAccount, newExit, err := tp.applyExit(nil, nil, exitTree, tx.Tx()) exitAccount, newExit, err := tp.applyExit(nil, nil, exitTree, tx.Tx(), tx.Amount)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return nil, nil, false, nil, tracerr.Wrap(err) return nil, nil, false, nil, tracerr.Wrap(err)
@@ -730,7 +730,7 @@ func (tp *TxProcessor) ProcessL2Tx(coordIdxsMap map[common.TokenID]common.Idx,
} }
case common.TxTypeExit: case common.TxTypeExit:
// execute exit flow // execute exit flow
exitAccount, newExit, err := tp.applyExit(coordIdxsMap, collectedFees, exitTree, tx.Tx()) exitAccount, newExit, err := tp.applyExit(coordIdxsMap, collectedFees, exitTree, tx.Tx(), tx.Amount)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return nil, nil, false, tracerr.Wrap(err) return nil, nil, false, tracerr.Wrap(err)
@@ -1104,7 +1104,7 @@ func (tp *TxProcessor) applyCreateAccountDepositTransfer(tx *common.L1Tx) error
// new Leaf in the ExitTree. // new Leaf in the ExitTree.
func (tp *TxProcessor) applyExit(coordIdxsMap map[common.TokenID]common.Idx, func (tp *TxProcessor) applyExit(coordIdxsMap map[common.TokenID]common.Idx,
collectedFees map[common.TokenID]*big.Int, exitTree *merkletree.MerkleTree, collectedFees map[common.TokenID]*big.Int, exitTree *merkletree.MerkleTree,
tx common.Tx) (*common.Account, bool, error) { tx common.Tx, originalAmount *big.Int) (*common.Account, bool, error) {
// 0. subtract tx.Amount from current Account in StateMT // 0. subtract tx.Amount from current Account in StateMT
// add the tx.Amount into the Account (tx.FromIdx) in the ExitMT // add the tx.Amount into the Account (tx.FromIdx) in the ExitMT
acc, err := tp.s.GetAccount(tx.FromIdx) acc, err := tp.s.GetAccount(tx.FromIdx)
@@ -1174,7 +1174,17 @@ func (tp *TxProcessor) applyExit(coordIdxsMap map[common.TokenID]common.Idx,
if exitTree == nil { if exitTree == nil {
return nil, false, nil return nil, false, nil
} }
if tx.Amount.Cmp(big.NewInt(0)) == 0 { // Amount == 0
// Do not add the Exit when Amount=0, not EffectiveAmount=0. In
// txprocessor.applyExit function, the tx.Amount is in reality the
// EffectiveAmount, that's why is used here the originalAmount
// parameter, which contains the real value of the tx.Amount (not
// tx.EffectiveAmount). This is a particularity of the approach of the
// circuit, the idea will be in the future to update the circuit and
// when Amount>0 but EffectiveAmount=0, to not add the Exit in the
// Exits MerkleTree, but for the moment the Go code is adapted to the
// circuit.
if originalAmount.Cmp(big.NewInt(0)) == 0 { // Amount == 0
// if the Exit Amount==0, the Exit is not added to the ExitTree // if the Exit Amount==0, the Exit is not added to the ExitTree
return nil, false, nil return nil, false, nil
} }
@@ -1187,6 +1197,8 @@ func (tp *TxProcessor) applyExit(coordIdxsMap map[common.TokenID]common.Idx,
exitAccount := &common.Account{ exitAccount := &common.Account{
TokenID: acc.TokenID, TokenID: acc.TokenID,
Nonce: common.Nonce(0), Nonce: common.Nonce(0),
// as is a common.Tx, the Amount is already an
// EffectiveAmount
Balance: tx.Amount, Balance: tx.Amount,
BJJ: acc.BJJ, BJJ: acc.BJJ,
EthAddr: acc.EthAddr, EthAddr: acc.EthAddr,
@@ -1200,6 +1212,8 @@ func (tp *TxProcessor) applyExit(coordIdxsMap map[common.TokenID]common.Idx,
tp.zki.Sign2[tp.i] = big.NewInt(1) tp.zki.Sign2[tp.i] = big.NewInt(1)
} }
tp.zki.Ay2[tp.i] = accBJJY tp.zki.Ay2[tp.i] = accBJJY
// as is a common.Tx, the Amount is already an
// EffectiveAmount
tp.zki.Balance2[tp.i] = tx.Amount tp.zki.Balance2[tp.i] = tx.Amount
tp.zki.EthAddr2[tp.i] = common.EthAddrToBigInt(acc.EthAddr) tp.zki.EthAddr2[tp.i] = common.EthAddrToBigInt(acc.EthAddr)
// as Leaf didn't exist in the ExitTree, set NewExit[i]=1 // as Leaf didn't exist in the ExitTree, set NewExit[i]=1

View File

@@ -1018,22 +1018,22 @@ func TestUpdatedAccounts(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
set := ` set := `
Type: Blockchain Type: Blockchain
AddToken(1) AddToken(1)
CreateAccountCoordinator(0) Coord // 256 CreateAccountCoordinator(0) Coord // 256
CreateAccountCoordinator(1) Coord // 257 CreateAccountCoordinator(1) Coord // 257
> batch // 1 > batch // 1
CreateAccountDeposit(0) A: 50 // 258 CreateAccountDeposit(0) A: 50 // 258
CreateAccountDeposit(0) B: 60 // 259 CreateAccountDeposit(0) B: 60 // 259
CreateAccountDeposit(1) A: 70 // 260 CreateAccountDeposit(1) A: 70 // 260
CreateAccountDeposit(1) B: 80 // 261 CreateAccountDeposit(1) B: 80 // 261
> batchL1 // 2 > batchL1 // 2
> batchL1 // 3 > batchL1 // 3
Transfer(0) A-B: 5 (126) Transfer(0) A-B: 5 (126)
> batch // 4 > batch // 4
Exit(1) B: 5 (126) Exit(1) B: 5 (126)
> batch // 5 > batch // 5
> block > block
` `
chainID := uint16(0) chainID := uint16(0)

File diff suppressed because one or more lines are too long

53
txselector/metrics.go Normal file
View File

@@ -0,0 +1,53 @@
package txselector
import "github.com/prometheus/client_golang/prometheus"
var (
metricGetL2TxSelection = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "txsel_get_l2_txselecton_total",
Help: "",
},
)
metricGetL1L2TxSelection = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "txsel_get_l1_l2_txselecton_total",
Help: "",
},
)
metricSelectedL1CoordinatorTxs = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "txsel_selected_l1_coordinator_txs",
Help: "",
},
)
metricSelectedL1UserTxs = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "txsel_selected_l1_user_txs",
Help: "",
},
)
metricSelectedL2Txs = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "txsel_selected_l2_txs",
Help: "",
},
)
metricDiscardedL2Txs = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "txsel_discarded_l2_txs",
Help: "",
},
)
)
func init() {
prometheus.MustRegister(metricGetL2TxSelection)
prometheus.MustRegister(metricGetL1L2TxSelection)
prometheus.MustRegister(metricSelectedL1CoordinatorTxs)
prometheus.MustRegister(metricSelectedL1UserTxs)
prometheus.MustRegister(metricSelectedL2Txs)
prometheus.MustRegister(metricDiscardedL2Txs)
}

View File

@@ -133,9 +133,11 @@ func (txsel *TxSelector) coordAccountForTokenID(l1CoordinatorTxs []common.L1Tx,
// included in the next batch. // included in the next batch.
func (txsel *TxSelector) GetL2TxSelection(selectionConfig *SelectionConfig) ([]common.Idx, func (txsel *TxSelector) GetL2TxSelection(selectionConfig *SelectionConfig) ([]common.Idx,
[][]byte, []common.L1Tx, []common.PoolL2Tx, []common.PoolL2Tx, error) { [][]byte, []common.L1Tx, []common.PoolL2Tx, []common.PoolL2Tx, error) {
coordIdxs, accCreationAuths, _, l1CoordinatorTxs, l2Txs, discardedL2Txs, err := metricGetL2TxSelection.Inc()
txsel.GetL1L2TxSelection(selectionConfig, []common.L1Tx{}) coordIdxs, accCreationAuths, _, l1CoordinatorTxs, l2Txs,
return coordIdxs, accCreationAuths, l1CoordinatorTxs, l2Txs, discardedL2Txs, tracerr.Wrap(err) discardedL2Txs, err := txsel.getL1L2TxSelection(selectionConfig, []common.L1Tx{})
return coordIdxs, accCreationAuths, l1CoordinatorTxs, l2Txs,
discardedL2Txs, tracerr.Wrap(err)
} }
// GetL1L2TxSelection returns the selection of L1 + L2 txs. // GetL1L2TxSelection returns the selection of L1 + L2 txs.
@@ -147,6 +149,16 @@ func (txsel *TxSelector) GetL2TxSelection(selectionConfig *SelectionConfig) ([]c
// creation exists. The L1UserTxs, L1CoordinatorTxs, PoolL2Txs that will be // creation exists. The L1UserTxs, L1CoordinatorTxs, PoolL2Txs that will be
// included in the next batch. // included in the next batch.
func (txsel *TxSelector) GetL1L2TxSelection(selectionConfig *SelectionConfig, func (txsel *TxSelector) GetL1L2TxSelection(selectionConfig *SelectionConfig,
l1UserTxs []common.L1Tx) ([]common.Idx, [][]byte, []common.L1Tx,
[]common.L1Tx, []common.PoolL2Tx, []common.PoolL2Tx, error) {
metricGetL1L2TxSelection.Inc()
coordIdxs, accCreationAuths, l1UserTxs, l1CoordinatorTxs, l2Txs,
discardedL2Txs, err := txsel.getL1L2TxSelection(selectionConfig, l1UserTxs)
return coordIdxs, accCreationAuths, l1UserTxs, l1CoordinatorTxs, l2Txs,
discardedL2Txs, tracerr.Wrap(err)
}
func (txsel *TxSelector) getL1L2TxSelection(selectionConfig *SelectionConfig,
l1UserTxs []common.L1Tx) ([]common.Idx, [][]byte, []common.L1Tx, l1UserTxs []common.L1Tx) ([]common.Idx, [][]byte, []common.L1Tx,
[]common.L1Tx, []common.PoolL2Tx, []common.PoolL2Tx, error) { []common.L1Tx, []common.PoolL2Tx, []common.PoolL2Tx, error) {
// WIP.0: the TxSelector is not optimized and will need a redesign. The // WIP.0: the TxSelector is not optimized and will need a redesign. The
@@ -452,6 +464,11 @@ func (txsel *TxSelector) GetL1L2TxSelection(selectionConfig *SelectionConfig,
return nil, nil, nil, nil, nil, nil, tracerr.Wrap(err) return nil, nil, nil, nil, nil, nil, tracerr.Wrap(err)
} }
metricSelectedL1CoordinatorTxs.Set(float64(len(l1CoordinatorTxs)))
metricSelectedL1UserTxs.Set(float64(len(l1UserTxs)))
metricSelectedL2Txs.Set(float64(len(finalL2Txs)))
metricDiscardedL2Txs.Set(float64(len(discardedL2Txs)))
return coordIdxs, accAuths, l1UserTxs, l1CoordinatorTxs, finalL2Txs, discardedL2Txs, nil return coordIdxs, accAuths, l1UserTxs, l1CoordinatorTxs, finalL2Txs, discardedL2Txs, nil
} }

View File

@@ -29,7 +29,7 @@ func initTest(t *testing.T, chainID uint16, hermezContractAddr ethCommon.Address
pass := os.Getenv("POSTGRES_PASS") pass := os.Getenv("POSTGRES_PASS")
db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez") db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez")
require.NoError(t, err) require.NoError(t, err)
l2DB := l2db.NewL2DB(db, 10, 100, 0.0, 24*time.Hour, nil) l2DB := l2db.NewL2DB(db, db, 10, 100, 0.0, 24*time.Hour, nil)
dir, err := ioutil.TempDir("", "tmpdb") dir, err := ioutil.TempDir("", "tmpdb")
require.NoError(t, err) require.NoError(t, err)
@@ -106,7 +106,7 @@ func addTokens(t *testing.T, tc *til.Context, db *sqlx.DB) {
}) })
} }
hdb := historydb.NewHistoryDB(db, nil) hdb := historydb.NewHistoryDB(db, db, nil)
assert.NoError(t, hdb.AddBlock(&common.Block{ assert.NoError(t, hdb.AddBlock(&common.Block{
Num: 1, Num: 1,
})) }))