From ac1fd9acf74912720b3eb9a6c071c7dba9b8dcb7 Mon Sep 17 00:00:00 2001 From: Arnau B Date: Mon, 8 Feb 2021 18:50:32 +0100 Subject: [PATCH] Add semaphore for API queries to SQL --- api/accountcreationauths.go | 2 +- api/api_test.go | 83 ++- api/batch.go | 2 +- api/handlers.go | 20 +- api/slots.go | 12 +- api/state.go | 8 +- api/token.go | 4 +- api/txshistory.go | 4 +- api/txspool.go | 2 +- cli/node/cfg.buidler.toml | 2 + config/config.go | 7 +- coordinator/coordinator_test.go | 4 +- coordinator/purger_test.go | 2 +- db/historydb/apiqueries.go | 1026 +++++++++++++++++++++++++++++ db/historydb/historydb.go | 901 +------------------------ db/historydb/historydb_test.go | 32 +- db/l2db/apiqueries.go | 85 +++ db/l2db/l2db.go | 41 +- db/l2db/l2db_test.go | 23 +- db/utils.go | 29 + go.mod | 1 + go.sum | 3 + node/node.go | 10 +- priceupdater/priceupdater_test.go | 5 +- synchronizer/synchronizer_test.go | 2 +- test/zkproof/flows_test.go | 4 +- txselector/txselector_test.go | 4 +- 27 files changed, 1338 insertions(+), 980 deletions(-) create mode 100644 db/historydb/apiqueries.go create mode 100644 db/l2db/apiqueries.go diff --git a/api/accountcreationauths.go b/api/accountcreationauths.go index e2c17ed..ed7e3d8 100644 --- a/api/accountcreationauths.go +++ b/api/accountcreationauths.go @@ -26,7 +26,7 @@ func (a *API) postAccountCreationAuth(c *gin.Context) { return } // Insert to DB - if err := a.l2.AddAccountCreationAuth(commonAuth); err != nil { + if err := a.l2.AddAccountCreationAuthAPI(commonAuth); err != nil { retSQLErr(err, c) return } diff --git a/api/api_test.go b/api/api_test.go index 97dee27..bcd6124 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -11,6 +11,7 @@ import ( "net/http" "os" "strconv" + "sync" "testing" "time" @@ -27,6 +28,7 @@ import ( "github.com/hermeznetwork/hermez-node/test/til" "github.com/hermeznetwork/hermez-node/test/txsets" "github.com/hermeznetwork/tracerr" + "github.com/stretchr/testify/require" ) // Pendinger is an interface that allows getting last returned item ID and PendingItems to be used for building fromItem @@ -199,7 +201,8 @@ func TestMain(m *testing.M) { if err != nil { panic(err) } - hdb := historydb.NewHistoryDB(database) + apiConnCon := db.NewAPICnnectionController(1, time.Second) + hdb := historydb.NewHistoryDB(database, apiConnCon) if err != nil { panic(err) } @@ -218,7 +221,7 @@ func TestMain(m *testing.M) { panic(err) } // L2DB - l2DB := l2db.NewL2DB(database, 10, 1000, 24*time.Hour) + l2DB := l2db.NewL2DB(database, 10, 1000, 24*time.Hour, apiConnCon) test.WipeDB(l2DB.DB()) // this will clean HistoryDB and L2DB // Config (smart contract constants) chainID := uint16(0) @@ -574,6 +577,82 @@ func TestMain(m *testing.M) { os.Exit(result) } +func TestTimeout(t *testing.T) { + pass := os.Getenv("POSTGRES_PASS") + databaseTO, err := db.InitSQLDB(5432, "localhost", "hermez", pass, "hermez") + require.NoError(t, err) + apiConnConTO := db.NewAPICnnectionController(1, 100*time.Millisecond) + hdbTO := historydb.NewHistoryDB(databaseTO, apiConnConTO) + require.NoError(t, err) + // L2DB + l2DBTO := l2db.NewL2DB(databaseTO, 10, 1000, 24*time.Hour, apiConnConTO) + + // API + apiGinTO := gin.Default() + finishWait := make(chan interface{}) + startWait := make(chan interface{}) + apiGinTO.GET("/wait", func(c *gin.Context) { + cancel, err := apiConnConTO.Acquire() + defer cancel() + require.NoError(t, err) + defer apiConnConTO.Release() + startWait <- nil + <-finishWait + }) + // Start server + serverTO := &http.Server{Addr: ":4444", Handler: apiGinTO} + go func() { + if err := serverTO.ListenAndServe(); err != nil && tracerr.Unwrap(err) != http.ErrServerClosed { + require.NoError(t, err) + } + }() + _config := getConfigTest(0) + _, err = NewAPI( + true, + true, + apiGinTO, + hdbTO, + nil, + l2DBTO, + &_config, + ) + require.NoError(t, err) + + client := &http.Client{} + httpReq, err := http.NewRequest("GET", "http://localhost:4444/tokens", nil) + require.NoError(t, err) + httpReqWait, err := http.NewRequest("GET", "http://localhost:4444/wait", nil) + require.NoError(t, err) + // Request that will get timed out + var wg sync.WaitGroup + wg.Add(1) + go func() { + // Request that will make the API busy + _, err = client.Do(httpReqWait) + require.NoError(t, err) + wg.Done() + }() + <-startWait + resp, err := client.Do(httpReq) + require.NoError(t, err) + require.Equal(t, http.StatusServiceUnavailable, resp.StatusCode) + defer resp.Body.Close() //nolint + body, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + // Unmarshal body into return struct + msg := &errorMsg{} + err = json.Unmarshal(body, msg) + require.NoError(t, err) + // Check that the error was the expected down + require.Equal(t, errSQLTimeout, msg.Message) + finishWait <- nil + + // Stop server + wg.Wait() + require.NoError(t, serverTO.Shutdown(context.Background())) + require.NoError(t, databaseTO.Close()) +} + func doGoodReqPaginated( path, order string, iterStruct Pendinger, diff --git a/api/batch.go b/api/batch.go index fe09ba7..48be3dd 100644 --- a/api/batch.go +++ b/api/batch.go @@ -108,7 +108,7 @@ func (a *API) getFullBatch(c *gin.Context) { } // Fetch txs forged in the batch from historyDB maxTxsPerBatch := uint(2048) //nolint:gomnd - txs, _, err := a.h.GetHistoryTxs( + txs, _, err := a.h.GetTxsAPI( nil, nil, nil, nil, batchNum, nil, nil, &maxTxsPerBatch, historydb.OrderAsc, ) if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows { diff --git a/api/handlers.go b/api/handlers.go index 933079e..90ce3a7 100644 --- a/api/handlers.go +++ b/api/handlers.go @@ -30,6 +30,12 @@ const ( // Error for duplicated key errDuplicatedKey = "Item already exists" + + // Error for timeout due to SQL connection + errSQLTimeout = "The node is under heavy preasure, please try again later" + + // Error message returned when context reaches timeout + errCtxTimeout = "context deadline exceeded" ) var ( @@ -38,16 +44,20 @@ var ( ) func retSQLErr(err error, c *gin.Context) { - log.Warn("HTTP API SQL request error", "err", err) - if sqlErr, ok := tracerr.Unwrap(err).(*pq.Error); ok { + log.Warnw("HTTP API SQL request error", "err", err) + errMsg := tracerr.Unwrap(err).Error() + if errMsg == errCtxTimeout { + c.JSON(http.StatusServiceUnavailable, errorMsg{ + Message: errSQLTimeout, + }) + } else if sqlErr, ok := tracerr.Unwrap(err).(*pq.Error); ok { // https://www.postgresql.org/docs/current/errcodes-appendix.html if sqlErr.Code == "23505" { c.JSON(http.StatusInternalServerError, errorMsg{ Message: errDuplicatedKey, }) } - } - if tracerr.Unwrap(err) == sql.ErrNoRows { + } else if tracerr.Unwrap(err) == sql.ErrNoRows { c.JSON(http.StatusNotFound, errorMsg{ Message: err.Error(), }) @@ -59,7 +69,7 @@ func retSQLErr(err error, c *gin.Context) { } func retBadReq(err error, c *gin.Context) { - log.Warn("HTTP API Bad request error", "err", err) + log.Warnw("HTTP API Bad request error", "err", err) c.JSON(http.StatusBadRequest, errorMsg{ Message: err.Error(), }) diff --git a/api/slots.go b/api/slots.go index 6842f51..877a985 100644 --- a/api/slots.go +++ b/api/slots.go @@ -97,12 +97,12 @@ func (a *API) getSlot(c *gin.Context) { retBadReq(err, c) return } - currentBlock, err := a.h.GetLastBlock() + currentBlock, err := a.h.GetLastBlockAPI() if err != nil { retBadReq(err, c) return } - auctionVars, err := a.h.GetAuctionVars() + auctionVars, err := a.h.GetAuctionVarsAPI() if err != nil { retBadReq(err, c) return @@ -200,12 +200,12 @@ func (a *API) getSlots(c *gin.Context) { return } - currentBlock, err := a.h.GetLastBlock() + currentBlock, err := a.h.GetLastBlockAPI() if err != nil { retBadReq(err, c) return } - auctionVars, err := a.h.GetAuctionVars() + auctionVars, err := a.h.GetAuctionVarsAPI() if err != nil { retBadReq(err, c) return @@ -220,13 +220,13 @@ func (a *API) getSlots(c *gin.Context) { retBadReq(errors.New("It is necessary to add maxSlotNum filter"), c) return } else if *finishedAuction { - currentBlock, err := a.h.GetLastBlock() + currentBlock, err := a.h.GetLastBlockAPI() if err != nil { retBadReq(err, c) return } currentSlot := a.getCurrentSlot(currentBlock.Num) - auctionVars, err := a.h.GetAuctionVars() + auctionVars, err := a.h.GetAuctionVarsAPI() if err != nil { retBadReq(err, c) return diff --git a/api/state.go b/api/state.go index 3f9d034..d5c03b3 100644 --- a/api/state.go +++ b/api/state.go @@ -141,7 +141,7 @@ func (a *API) UpdateNetworkInfo( a.status.Network.NextForgers = nextForgers // Update buckets withdrawals - bucketsUpdate, err := a.h.GetBucketUpdates() + bucketsUpdate, err := a.h.GetBucketUpdatesAPI() if tracerr.Unwrap(err) == sql.ErrNoRows { bucketsUpdate = nil } else if err != nil { @@ -201,7 +201,7 @@ func (a *API) getNextForgers(lastBlock common.Block, currentSlot, lastClosedSlot }} } else { // Get all the relevant updates from the DB - minBidInfo, err = a.h.GetAuctionVarsUntilSetSlotNum(lastClosedSlot, int(lastClosedSlot-currentSlot)+1) + minBidInfo, err = a.h.GetAuctionVarsUntilSetSlotNumAPI(lastClosedSlot, int(lastClosedSlot-currentSlot)+1) if err != nil { return nil, tracerr.Wrap(err) } @@ -279,7 +279,7 @@ func (a *API) UpdateMetrics() error { } batchNum := a.status.Network.LastBatch.BatchNum a.status.RUnlock() - metrics, err := a.h.GetMetrics(batchNum) + metrics, err := a.h.GetMetricsAPI(batchNum) if err != nil { return tracerr.Wrap(err) } @@ -293,7 +293,7 @@ func (a *API) UpdateMetrics() error { // UpdateRecommendedFee update Status.RecommendedFee information func (a *API) UpdateRecommendedFee() error { - feeExistingAccount, err := a.h.GetAvgTxFee() + feeExistingAccount, err := a.h.GetAvgTxFeeAPI() if err != nil { return tracerr.Wrap(err) } diff --git a/api/token.go b/api/token.go index 00d0f65..dc061ce 100644 --- a/api/token.go +++ b/api/token.go @@ -22,7 +22,7 @@ func (a *API) getToken(c *gin.Context) { } tokenID := common.TokenID(*tokenIDUint) // Fetch token from historyDB - token, err := a.h.GetToken(tokenID) + token, err := a.h.GetTokenAPI(tokenID) if err != nil { retSQLErr(err, c) return @@ -45,7 +45,7 @@ func (a *API) getTokens(c *gin.Context) { return } // Fetch exits from historyDB - tokens, pendingItems, err := a.h.GetTokens( + tokens, pendingItems, err := a.h.GetTokensAPI( tokenIDs, symbols, name, fromItem, limit, order, ) if err != nil { diff --git a/api/txshistory.go b/api/txshistory.go index d4e889f..7218948 100644 --- a/api/txshistory.go +++ b/api/txshistory.go @@ -34,7 +34,7 @@ func (a *API) getHistoryTxs(c *gin.Context) { } // Fetch txs from historyDB - txs, pendingItems, err := a.h.GetHistoryTxs( + txs, pendingItems, err := a.h.GetTxsAPI( addr, bjj, tokenID, idx, batchNum, txType, fromItem, limit, order, ) if err != nil { @@ -61,7 +61,7 @@ func (a *API) getHistoryTx(c *gin.Context) { return } // Fetch tx from historyDB - tx, err := a.h.GetHistoryTx(txID) + tx, err := a.h.GetTxAPI(txID) if err != nil { retSQLErr(err, c) return diff --git a/api/txspool.go b/api/txspool.go index 9f74121..207e843 100644 --- a/api/txspool.go +++ b/api/txspool.go @@ -28,7 +28,7 @@ func (a *API) postPoolTx(c *gin.Context) { return } // Insert to DB - if err := a.l2.AddTx(writeTx); err != nil { + if err := a.l2.AddTxAPI(writeTx); err != nil { retSQLErr(err, c) return } diff --git a/cli/node/cfg.buidler.toml b/cli/node/cfg.buidler.toml index 805c409..e89db4d 100644 --- a/cli/node/cfg.buidler.toml +++ b/cli/node/cfg.buidler.toml @@ -3,6 +3,8 @@ Address = "localhost:8086" Explorer = true UpdateMetricsInterval = "10s" UpdateRecommendedFeeInterval = "10s" +MaxSQLConnections = 100 +SQLConnectionTimeout = "2s" [PriceUpdater] Interval = "10s" diff --git a/config/config.go b/config/config.go index 421d34e..2d34285 100644 --- a/config/config.go +++ b/config/config.go @@ -201,9 +201,14 @@ type Node struct { // UpdateMetricsInterval is the interval between updates of the // API metrics UpdateMetricsInterval Duration - // UpdateMetricsInterval is the interval between updates of the + // UpdateRecommendedFeeInterval is the interval between updates of the // recommended fees UpdateRecommendedFeeInterval Duration + // Maximum concurrent connections allowed between API and SQL + MaxSQLConnections int `validate:"required"` + // SQLConnectionTimeout is the maximum amount of time that an API request + // can wait to stablish a SQL connection + SQLConnectionTimeout Duration } `validate:"required"` Debug struct { // APIAddress is the address where the debugAPI will listen if diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index ca12966..a4933e0 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -104,8 +104,8 @@ func newTestModules(t *testing.T) modules { db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez") require.NoError(t, err) test.WipeDB(db) - l2DB := l2db.NewL2DB(db, 10, 100, 24*time.Hour) - historyDB := historydb.NewHistoryDB(db) + l2DB := l2db.NewL2DB(db, 10, 100, 24*time.Hour, nil) + historyDB := historydb.NewHistoryDB(db, nil) txSelDBPath, err = ioutil.TempDir("", "tmpTxSelDB") require.NoError(t, err) diff --git a/coordinator/purger_test.go b/coordinator/purger_test.go index 568fde6..08e2abc 100644 --- a/coordinator/purger_test.go +++ b/coordinator/purger_test.go @@ -21,7 +21,7 @@ func newL2DB(t *testing.T) *l2db.L2DB { db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez") require.NoError(t, err) test.WipeDB(db) - return l2db.NewL2DB(db, 10, 100, 24*time.Hour) + return l2db.NewL2DB(db, 10, 100, 24*time.Hour, nil) } func newStateDB(t *testing.T) *statedb.LocalStateDB { diff --git a/db/historydb/apiqueries.go b/db/historydb/apiqueries.go new file mode 100644 index 0000000..d490ba9 --- /dev/null +++ b/db/historydb/apiqueries.go @@ -0,0 +1,1026 @@ +package historydb + +import ( + "errors" + "fmt" + + ethCommon "github.com/ethereum/go-ethereum/common" + "github.com/hermeznetwork/hermez-node/common" + "github.com/hermeznetwork/hermez-node/db" + "github.com/hermeznetwork/tracerr" + "github.com/iden3/go-iden3-crypto/babyjub" + "github.com/jmoiron/sqlx" + "github.com/russross/meddler" +) + +// GetLastBlockAPI retrieve the block with the highest block number from the DB +func (hdb *HistoryDB) GetLastBlockAPI() (*common.Block, error) { + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + return hdb.GetLastBlock() +} + +// GetBatchAPI return the batch with the given batchNum +func (hdb *HistoryDB) GetBatchAPI(batchNum common.BatchNum) (*BatchAPI, error) { + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + batch := &BatchAPI{} + return batch, tracerr.Wrap(meddler.QueryRow( + hdb.db, batch, + `SELECT batch.item_id, batch.batch_num, batch.eth_block_num, + batch.forger_addr, batch.fees_collected, batch.total_fees_usd, batch.state_root, + batch.num_accounts, batch.exit_root, batch.forge_l1_txs_num, batch.slot_num, + block.timestamp, block.hash, + COALESCE ((SELECT COUNT(*) FROM tx WHERE batch_num = batch.batch_num), 0) AS forged_txs + FROM batch INNER JOIN block ON batch.eth_block_num = block.eth_block_num + WHERE batch_num = $1;`, batchNum, + )) +} + +// GetBatchesAPI return the batches applying the given filters +func (hdb *HistoryDB) GetBatchesAPI( + minBatchNum, maxBatchNum, slotNum *uint, + forgerAddr *ethCommon.Address, + fromItem, limit *uint, order string, +) ([]BatchAPI, uint64, error) { + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, 0, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + var query string + var args []interface{} + queryStr := `SELECT batch.item_id, batch.batch_num, batch.eth_block_num, + batch.forger_addr, batch.fees_collected, batch.total_fees_usd, batch.state_root, + batch.num_accounts, batch.exit_root, batch.forge_l1_txs_num, batch.slot_num, + block.timestamp, block.hash, + COALESCE ((SELECT COUNT(*) FROM tx WHERE batch_num = batch.batch_num), 0) AS forged_txs, + count(*) OVER() AS total_items + FROM batch INNER JOIN block ON batch.eth_block_num = block.eth_block_num ` + // Apply filters + nextIsAnd := false + // minBatchNum filter + if minBatchNum != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + queryStr += "batch.batch_num > ? " + args = append(args, minBatchNum) + nextIsAnd = true + } + // maxBatchNum filter + if maxBatchNum != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + queryStr += "batch.batch_num < ? " + args = append(args, maxBatchNum) + nextIsAnd = true + } + // slotNum filter + if slotNum != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + queryStr += "batch.slot_num = ? " + args = append(args, slotNum) + nextIsAnd = true + } + // forgerAddr filter + if forgerAddr != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + queryStr += "batch.forger_addr = ? " + args = append(args, forgerAddr) + nextIsAnd = true + } + // pagination + if fromItem != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + if order == OrderAsc { + queryStr += "batch.item_id >= ? " + } else { + queryStr += "batch.item_id <= ? " + } + args = append(args, fromItem) + } + queryStr += "ORDER BY batch.item_id " + if order == OrderAsc { + queryStr += " ASC " + } else { + queryStr += " DESC " + } + queryStr += fmt.Sprintf("LIMIT %d;", *limit) + query = hdb.db.Rebind(queryStr) + // log.Debug(query) + batchPtrs := []*BatchAPI{} + if err := meddler.QueryAll(hdb.db, &batchPtrs, query, args...); err != nil { + return nil, 0, tracerr.Wrap(err) + } + batches := db.SlicePtrsToSlice(batchPtrs).([]BatchAPI) + if len(batches) == 0 { + return batches, 0, nil + } + return batches, batches[0].TotalItems - uint64(len(batches)), nil +} + +// GetBestBidAPI returns the best bid in specific slot by slotNum +func (hdb *HistoryDB) GetBestBidAPI(slotNum *int64) (BidAPI, error) { + bid := &BidAPI{} + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return *bid, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + err = meddler.QueryRow( + hdb.db, bid, `SELECT bid.*, block.timestamp, coordinator.forger_addr, coordinator.url + FROM bid INNER JOIN block ON bid.eth_block_num = block.eth_block_num + INNER JOIN ( + SELECT bidder_addr, MAX(item_id) AS item_id FROM coordinator + GROUP BY bidder_addr + ) c ON bid.bidder_addr = c.bidder_addr + INNER JOIN coordinator ON c.item_id = coordinator.item_id + WHERE slot_num = $1 ORDER BY item_id DESC LIMIT 1;`, slotNum, + ) + return *bid, tracerr.Wrap(err) +} + +// GetBestBidsAPI returns the best bid in specific slot by slotNum +func (hdb *HistoryDB) GetBestBidsAPI( + minSlotNum, maxSlotNum *int64, + bidderAddr *ethCommon.Address, + limit *uint, order string, +) ([]BidAPI, uint64, error) { + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, 0, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + var query string + var args []interface{} + // JOIN the best bid of each slot with the latest update of each coordinator + queryStr := `SELECT b.*, block.timestamp, coordinator.forger_addr, coordinator.url, + COUNT(*) OVER() AS total_items FROM ( + SELECT slot_num, MAX(item_id) as maxitem + FROM bid GROUP BY slot_num + ) + AS x INNER JOIN bid AS b ON b.item_id = x.maxitem + INNER JOIN block ON b.eth_block_num = block.eth_block_num + INNER JOIN ( + SELECT bidder_addr, MAX(item_id) AS item_id FROM coordinator + GROUP BY bidder_addr + ) c ON b.bidder_addr = c.bidder_addr + INNER JOIN coordinator ON c.item_id = coordinator.item_id + WHERE (b.slot_num >= ? AND b.slot_num <= ?)` + args = append(args, minSlotNum) + args = append(args, maxSlotNum) + // Apply filters + if bidderAddr != nil { + queryStr += " AND b.bidder_addr = ? " + args = append(args, bidderAddr) + } + queryStr += " ORDER BY b.slot_num " + if order == OrderAsc { + queryStr += "ASC " + } else { + queryStr += "DESC " + } + if limit != nil { + queryStr += fmt.Sprintf("LIMIT %d;", *limit) + } + query = hdb.db.Rebind(queryStr) + bidPtrs := []*BidAPI{} + if err := meddler.QueryAll(hdb.db, &bidPtrs, query, args...); err != nil { + return nil, 0, tracerr.Wrap(err) + } + // log.Debug(query) + bids := db.SlicePtrsToSlice(bidPtrs).([]BidAPI) + if len(bids) == 0 { + return bids, 0, nil + } + return bids, bids[0].TotalItems - uint64(len(bids)), nil +} + +// GetBidsAPI return the bids applying the given filters +func (hdb *HistoryDB) GetBidsAPI( + slotNum *int64, bidderAddr *ethCommon.Address, + fromItem, limit *uint, order string, +) ([]BidAPI, uint64, error) { + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, 0, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + var query string + var args []interface{} + // JOIN each bid with the latest update of each coordinator + queryStr := `SELECT bid.*, block.timestamp, coord.forger_addr, coord.url, + COUNT(*) OVER() AS total_items + FROM bid INNER JOIN block ON bid.eth_block_num = block.eth_block_num + INNER JOIN ( + SELECT bidder_addr, MAX(item_id) AS item_id FROM coordinator + GROUP BY bidder_addr + ) c ON bid.bidder_addr = c.bidder_addr + INNER JOIN coordinator coord ON c.item_id = coord.item_id ` + // Apply filters + nextIsAnd := false + // slotNum filter + if slotNum != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + queryStr += "bid.slot_num = ? " + args = append(args, slotNum) + nextIsAnd = true + } + // bidder filter + if bidderAddr != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + queryStr += "bid.bidder_addr = ? " + args = append(args, bidderAddr) + nextIsAnd = true + } + if fromItem != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + if order == OrderAsc { + queryStr += "bid.item_id >= ? " + } else { + queryStr += "bid.item_id <= ? " + } + args = append(args, fromItem) + } + // pagination + queryStr += "ORDER BY bid.item_id " + if order == OrderAsc { + queryStr += "ASC " + } else { + queryStr += "DESC " + } + queryStr += fmt.Sprintf("LIMIT %d;", *limit) + query, argsQ, err := sqlx.In(queryStr, args...) + if err != nil { + return nil, 0, tracerr.Wrap(err) + } + query = hdb.db.Rebind(query) + bids := []*BidAPI{} + if err := meddler.QueryAll(hdb.db, &bids, query, argsQ...); err != nil { + return nil, 0, tracerr.Wrap(err) + } + if len(bids) == 0 { + return []BidAPI{}, 0, nil + } + return db.SlicePtrsToSlice(bids).([]BidAPI), bids[0].TotalItems - uint64(len(bids)), nil +} + +// GetTokenAPI returns a token from the DB given a TokenID +func (hdb *HistoryDB) GetTokenAPI(tokenID common.TokenID) (*TokenWithUSD, error) { + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + return hdb.GetToken(tokenID) +} + +// GetTokensAPI returns a list of tokens from the DB +func (hdb *HistoryDB) GetTokensAPI( + ids []common.TokenID, symbols []string, name string, fromItem, + limit *uint, order string, +) ([]TokenWithUSD, uint64, error) { + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, 0, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + var query string + var args []interface{} + queryStr := `SELECT * , COUNT(*) OVER() AS total_items FROM token ` + // Apply filters + nextIsAnd := false + if len(ids) > 0 { + queryStr += "WHERE token_id IN (?) " + nextIsAnd = true + args = append(args, ids) + } + if len(symbols) > 0 { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + queryStr += "symbol IN (?) " + args = append(args, symbols) + nextIsAnd = true + } + if name != "" { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + queryStr += "name ~ ? " + args = append(args, name) + nextIsAnd = true + } + if fromItem != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + if order == OrderAsc { + queryStr += "item_id >= ? " + } else { + queryStr += "item_id <= ? " + } + args = append(args, fromItem) + } + // pagination + queryStr += "ORDER BY item_id " + if order == OrderAsc { + queryStr += "ASC " + } else { + queryStr += "DESC " + } + queryStr += fmt.Sprintf("LIMIT %d;", *limit) + query, argsQ, err := sqlx.In(queryStr, args...) + if err != nil { + return nil, 0, tracerr.Wrap(err) + } + query = hdb.db.Rebind(query) + tokens := []*TokenWithUSD{} + if err := meddler.QueryAll(hdb.db, &tokens, query, argsQ...); err != nil { + return nil, 0, tracerr.Wrap(err) + } + if len(tokens) == 0 { + return []TokenWithUSD{}, 0, nil + } + return db.SlicePtrsToSlice(tokens).([]TokenWithUSD), uint64(len(tokens)) - tokens[0].TotalItems, nil +} + +// GetTxAPI returns a tx from the DB given a TxID +func (hdb *HistoryDB) GetTxAPI(txID common.TxID) (*TxAPI, error) { + // Warning: amount_success and deposit_amount_success have true as default for + // performance reasons. The expected default value is false (when txs are unforged) + // this case is handled at the function func (tx TxAPI) MarshalJSON() ([]byte, error) + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + tx := &TxAPI{} + err = meddler.QueryRow( + hdb.db, tx, `SELECT tx.item_id, tx.is_l1, tx.id, tx.type, tx.position, + hez_idx(tx.effective_from_idx, token.symbol) AS from_idx, tx.from_eth_addr, tx.from_bjj, + hez_idx(tx.to_idx, token.symbol) AS to_idx, tx.to_eth_addr, tx.to_bjj, + tx.amount, tx.amount_success, tx.token_id, tx.amount_usd, + tx.batch_num, tx.eth_block_num, tx.to_forge_l1_txs_num, tx.user_origin, + tx.deposit_amount, tx.deposit_amount_usd, tx.deposit_amount_success, tx.fee, tx.fee_usd, tx.nonce, + token.token_id, token.item_id AS token_item_id, token.eth_block_num AS token_block, + token.eth_addr, token.name, token.symbol, token.decimals, token.usd, + token.usd_update, block.timestamp + FROM tx INNER JOIN token ON tx.token_id = token.token_id + INNER JOIN block ON tx.eth_block_num = block.eth_block_num + WHERE tx.id = $1;`, txID, + ) + return tx, tracerr.Wrap(err) +} + +// GetTxsAPI returns a list of txs from the DB using the HistoryTx struct +// and pagination info +func (hdb *HistoryDB) GetTxsAPI( + ethAddr *ethCommon.Address, bjj *babyjub.PublicKeyComp, + tokenID *common.TokenID, idx *common.Idx, batchNum *uint, txType *common.TxType, + fromItem, limit *uint, order string, +) ([]TxAPI, uint64, error) { + // Warning: amount_success and deposit_amount_success have true as default for + // performance reasons. The expected default value is false (when txs are unforged) + // this case is handled at the function func (tx TxAPI) MarshalJSON() ([]byte, error) + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, 0, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + if ethAddr != nil && bjj != nil { + return nil, 0, tracerr.Wrap(errors.New("ethAddr and bjj are incompatible")) + } + var query string + var args []interface{} + queryStr := `SELECT tx.item_id, tx.is_l1, tx.id, tx.type, tx.position, + hez_idx(tx.effective_from_idx, token.symbol) AS from_idx, tx.from_eth_addr, tx.from_bjj, + hez_idx(tx.to_idx, token.symbol) AS to_idx, tx.to_eth_addr, tx.to_bjj, + tx.amount, tx.amount_success, tx.token_id, tx.amount_usd, + tx.batch_num, tx.eth_block_num, tx.to_forge_l1_txs_num, tx.user_origin, + tx.deposit_amount, tx.deposit_amount_usd, tx.deposit_amount_success, tx.fee, tx.fee_usd, tx.nonce, + token.token_id, token.item_id AS token_item_id, token.eth_block_num AS token_block, + token.eth_addr, token.name, token.symbol, token.decimals, token.usd, + token.usd_update, block.timestamp, count(*) OVER() AS total_items + FROM tx INNER JOIN token ON tx.token_id = token.token_id + INNER JOIN block ON tx.eth_block_num = block.eth_block_num ` + // Apply filters + nextIsAnd := false + // ethAddr filter + if ethAddr != nil { + queryStr += "WHERE (tx.from_eth_addr = ? OR tx.to_eth_addr = ?) " + nextIsAnd = true + args = append(args, ethAddr, ethAddr) + } else if bjj != nil { // bjj filter + queryStr += "WHERE (tx.from_bjj = ? OR tx.to_bjj = ?) " + nextIsAnd = true + args = append(args, bjj, bjj) + } + // tokenID filter + if tokenID != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + queryStr += "tx.token_id = ? " + args = append(args, tokenID) + nextIsAnd = true + } + // idx filter + if idx != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + queryStr += "(tx.effective_from_idx = ? OR tx.to_idx = ?) " + args = append(args, idx, idx) + nextIsAnd = true + } + // batchNum filter + if batchNum != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + queryStr += "tx.batch_num = ? " + args = append(args, batchNum) + nextIsAnd = true + } + // txType filter + if txType != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + queryStr += "tx.type = ? " + args = append(args, txType) + nextIsAnd = true + } + if fromItem != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + if order == OrderAsc { + queryStr += "tx.item_id >= ? " + } else { + queryStr += "tx.item_id <= ? " + } + args = append(args, fromItem) + nextIsAnd = true + } + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + queryStr += "tx.batch_num IS NOT NULL " + + // pagination + queryStr += "ORDER BY tx.item_id " + if order == OrderAsc { + queryStr += " ASC " + } else { + queryStr += " DESC " + } + queryStr += fmt.Sprintf("LIMIT %d;", *limit) + query = hdb.db.Rebind(queryStr) + // log.Debug(query) + txsPtrs := []*TxAPI{} + if err := meddler.QueryAll(hdb.db, &txsPtrs, query, args...); err != nil { + return nil, 0, tracerr.Wrap(err) + } + txs := db.SlicePtrsToSlice(txsPtrs).([]TxAPI) + if len(txs) == 0 { + return txs, 0, nil + } + return txs, txs[0].TotalItems - uint64(len(txs)), nil +} + +// GetExitAPI returns a exit from the DB +func (hdb *HistoryDB) GetExitAPI(batchNum *uint, idx *common.Idx) (*ExitAPI, error) { + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + exit := &ExitAPI{} + err = meddler.QueryRow( + hdb.db, exit, `SELECT exit_tree.item_id, exit_tree.batch_num, + hez_idx(exit_tree.account_idx, token.symbol) AS account_idx, + account.bjj, account.eth_addr, + exit_tree.merkle_proof, exit_tree.balance, exit_tree.instant_withdrawn, + exit_tree.delayed_withdraw_request, exit_tree.delayed_withdrawn, + token.token_id, token.item_id AS token_item_id, + token.eth_block_num AS token_block, token.eth_addr AS token_eth_addr, token.name, token.symbol, + token.decimals, token.usd, token.usd_update + FROM exit_tree INNER JOIN account ON exit_tree.account_idx = account.idx + INNER JOIN token ON account.token_id = token.token_id + WHERE exit_tree.batch_num = $1 AND exit_tree.account_idx = $2;`, batchNum, idx, + ) + return exit, tracerr.Wrap(err) +} + +// GetExitsAPI returns a list of exits from the DB and pagination info +func (hdb *HistoryDB) GetExitsAPI( + ethAddr *ethCommon.Address, bjj *babyjub.PublicKeyComp, tokenID *common.TokenID, + idx *common.Idx, batchNum *uint, onlyPendingWithdraws *bool, + fromItem, limit *uint, order string, +) ([]ExitAPI, uint64, error) { + if ethAddr != nil && bjj != nil { + return nil, 0, tracerr.Wrap(errors.New("ethAddr and bjj are incompatible")) + } + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, 0, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + var query string + var args []interface{} + queryStr := `SELECT exit_tree.item_id, exit_tree.batch_num, + hez_idx(exit_tree.account_idx, token.symbol) AS account_idx, + account.bjj, account.eth_addr, + exit_tree.merkle_proof, exit_tree.balance, exit_tree.instant_withdrawn, + exit_tree.delayed_withdraw_request, exit_tree.delayed_withdrawn, + token.token_id, token.item_id AS token_item_id, + token.eth_block_num AS token_block, token.eth_addr AS token_eth_addr, token.name, token.symbol, + token.decimals, token.usd, token.usd_update, COUNT(*) OVER() AS total_items + FROM exit_tree INNER JOIN account ON exit_tree.account_idx = account.idx + INNER JOIN token ON account.token_id = token.token_id ` + // Apply filters + nextIsAnd := false + // ethAddr filter + if ethAddr != nil { + queryStr += "WHERE account.eth_addr = ? " + nextIsAnd = true + args = append(args, ethAddr) + } else if bjj != nil { // bjj filter + queryStr += "WHERE account.bjj = ? " + nextIsAnd = true + args = append(args, bjj) + } + // tokenID filter + if tokenID != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + queryStr += "account.token_id = ? " + args = append(args, tokenID) + nextIsAnd = true + } + // idx filter + if idx != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + queryStr += "exit_tree.account_idx = ? " + args = append(args, idx) + nextIsAnd = true + } + // batchNum filter + if batchNum != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + queryStr += "exit_tree.batch_num = ? " + args = append(args, batchNum) + nextIsAnd = true + } + // onlyPendingWithdraws + if onlyPendingWithdraws != nil { + if *onlyPendingWithdraws { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + queryStr += "(exit_tree.instant_withdrawn IS NULL AND exit_tree.delayed_withdrawn IS NULL) " + nextIsAnd = true + } + } + if fromItem != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + if order == OrderAsc { + queryStr += "exit_tree.item_id >= ? " + } else { + queryStr += "exit_tree.item_id <= ? " + } + args = append(args, fromItem) + // nextIsAnd = true + } + // pagination + queryStr += "ORDER BY exit_tree.item_id " + if order == OrderAsc { + queryStr += " ASC " + } else { + queryStr += " DESC " + } + queryStr += fmt.Sprintf("LIMIT %d;", *limit) + query = hdb.db.Rebind(queryStr) + // log.Debug(query) + exits := []*ExitAPI{} + if err := meddler.QueryAll(hdb.db, &exits, query, args...); err != nil { + return nil, 0, tracerr.Wrap(err) + } + if len(exits) == 0 { + return []ExitAPI{}, 0, nil + } + return db.SlicePtrsToSlice(exits).([]ExitAPI), exits[0].TotalItems - uint64(len(exits)), nil +} + +// GetBucketUpdatesAPI retrieves latest values for each bucket +func (hdb *HistoryDB) GetBucketUpdatesAPI() ([]BucketUpdateAPI, error) { + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + var bucketUpdates []*BucketUpdateAPI + err = meddler.QueryAll( + hdb.db, &bucketUpdates, + `SELECT num_bucket, withdrawals FROM bucket_update + WHERE item_id in(SELECT max(item_id) FROM bucket_update + group by num_bucket) + ORDER BY num_bucket ASC;`, + ) + return db.SlicePtrsToSlice(bucketUpdates).([]BucketUpdateAPI), tracerr.Wrap(err) +} + +// GetCoordinatorsAPI returns a list of coordinators from the DB and pagination info +func (hdb *HistoryDB) GetCoordinatorsAPI( + bidderAddr, forgerAddr *ethCommon.Address, + fromItem, limit *uint, order string, +) ([]CoordinatorAPI, uint64, error) { + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, 0, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + var query string + var args []interface{} + queryStr := `SELECT coordinator.*, COUNT(*) OVER() AS total_items + FROM coordinator INNER JOIN ( + SELECT MAX(item_id) AS item_id FROM coordinator + GROUP BY bidder_addr + ) c ON coordinator.item_id = c.item_id ` + // Apply filters + nextIsAnd := false + if bidderAddr != nil { + queryStr += "WHERE bidder_addr = ? " + nextIsAnd = true + args = append(args, bidderAddr) + } + if forgerAddr != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + queryStr += "forger_addr = ? " + nextIsAnd = true + args = append(args, forgerAddr) + } + if fromItem != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + if order == OrderAsc { + queryStr += "coordinator.item_id >= ? " + } else { + queryStr += "coordinator.item_id <= ? " + } + args = append(args, fromItem) + } + // pagination + queryStr += "ORDER BY coordinator.item_id " + if order == OrderAsc { + queryStr += " ASC " + } else { + queryStr += " DESC " + } + queryStr += fmt.Sprintf("LIMIT %d;", *limit) + query = hdb.db.Rebind(queryStr) + + coordinators := []*CoordinatorAPI{} + if err := meddler.QueryAll(hdb.db, &coordinators, query, args...); err != nil { + return nil, 0, tracerr.Wrap(err) + } + if len(coordinators) == 0 { + return []CoordinatorAPI{}, 0, nil + } + return db.SlicePtrsToSlice(coordinators).([]CoordinatorAPI), + coordinators[0].TotalItems - uint64(len(coordinators)), nil +} + +// GetAuctionVarsAPI returns auction variables +func (hdb *HistoryDB) GetAuctionVarsAPI() (*common.AuctionVariables, error) { + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + auctionVars := &common.AuctionVariables{} + err = meddler.QueryRow( + hdb.db, auctionVars, `SELECT * FROM auction_vars;`, + ) + return auctionVars, tracerr.Wrap(err) +} + +// GetAuctionVarsUntilSetSlotNumAPI returns all the updates of the auction vars +// from the last entry in which DefaultSlotSetBidSlotNum <= slotNum +func (hdb *HistoryDB) GetAuctionVarsUntilSetSlotNumAPI(slotNum int64, maxItems int) ([]MinBidInfo, error) { + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + auctionVars := []*MinBidInfo{} + query := ` + SELECT DISTINCT default_slot_set_bid, default_slot_set_bid_slot_num FROM auction_vars + WHERE default_slot_set_bid_slot_num < $1 + ORDER BY default_slot_set_bid_slot_num DESC + LIMIT $2; + ` + err = meddler.QueryAll(hdb.db, &auctionVars, query, slotNum, maxItems) + if err != nil { + return nil, tracerr.Wrap(err) + } + return db.SlicePtrsToSlice(auctionVars).([]MinBidInfo), nil +} + +// GetAccountAPI returns an account by its index +func (hdb *HistoryDB) GetAccountAPI(idx common.Idx) (*AccountAPI, error) { + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + account := &AccountAPI{} + err = meddler.QueryRow(hdb.db, account, `SELECT account.item_id, hez_idx(account.idx, + token.symbol) as idx, account.batch_num, account.bjj, account.eth_addr, + token.token_id, token.item_id AS token_item_id, token.eth_block_num AS token_block, + token.eth_addr as token_eth_addr, token.name, token.symbol, token.decimals, token.usd, token.usd_update + FROM account INNER JOIN token ON account.token_id = token.token_id WHERE idx = $1;`, idx) + + if err != nil { + return nil, tracerr.Wrap(err) + } + + return account, nil +} + +// GetAccountsAPI returns a list of accounts from the DB and pagination info +func (hdb *HistoryDB) GetAccountsAPI( + tokenIDs []common.TokenID, ethAddr *ethCommon.Address, + bjj *babyjub.PublicKeyComp, fromItem, limit *uint, order string, +) ([]AccountAPI, uint64, error) { + if ethAddr != nil && bjj != nil { + return nil, 0, tracerr.Wrap(errors.New("ethAddr and bjj are incompatible")) + } + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, 0, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + var query string + var args []interface{} + queryStr := `SELECT account.item_id, hez_idx(account.idx, token.symbol) as idx, account.batch_num, + account.bjj, account.eth_addr, token.token_id, token.item_id AS token_item_id, token.eth_block_num AS token_block, + token.eth_addr as token_eth_addr, token.name, token.symbol, token.decimals, token.usd, token.usd_update, + COUNT(*) OVER() AS total_items + FROM account INNER JOIN token ON account.token_id = token.token_id ` + // Apply filters + nextIsAnd := false + // ethAddr filter + if ethAddr != nil { + queryStr += "WHERE account.eth_addr = ? " + nextIsAnd = true + args = append(args, ethAddr) + } else if bjj != nil { // bjj filter + queryStr += "WHERE account.bjj = ? " + nextIsAnd = true + args = append(args, bjj) + } + // tokenID filter + if len(tokenIDs) > 0 { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + queryStr += "account.token_id IN (?) " + args = append(args, tokenIDs) + nextIsAnd = true + } + if fromItem != nil { + if nextIsAnd { + queryStr += "AND " + } else { + queryStr += "WHERE " + } + if order == OrderAsc { + queryStr += "account.item_id >= ? " + } else { + queryStr += "account.item_id <= ? " + } + args = append(args, fromItem) + } + // pagination + queryStr += "ORDER BY account.item_id " + if order == OrderAsc { + queryStr += " ASC " + } else { + queryStr += " DESC " + } + queryStr += fmt.Sprintf("LIMIT %d;", *limit) + query, argsQ, err := sqlx.In(queryStr, args...) + if err != nil { + return nil, 0, tracerr.Wrap(err) + } + query = hdb.db.Rebind(query) + + accounts := []*AccountAPI{} + if err := meddler.QueryAll(hdb.db, &accounts, query, argsQ...); err != nil { + return nil, 0, tracerr.Wrap(err) + } + if len(accounts) == 0 { + return []AccountAPI{}, 0, nil + } + + return db.SlicePtrsToSlice(accounts).([]AccountAPI), + accounts[0].TotalItems - uint64(len(accounts)), nil +} + +// GetMetricsAPI returns metrics +func (hdb *HistoryDB) GetMetricsAPI(lastBatchNum common.BatchNum) (*Metrics, error) { + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + metricsTotals := &MetricsTotals{} + metrics := &Metrics{} + err = meddler.QueryRow( + hdb.db, metricsTotals, `SELECT COUNT(tx.*) as total_txs, + COALESCE (MIN(tx.batch_num), 0) as batch_num, COALESCE (MIN(block.timestamp), + NOW()) AS min_timestamp, COALESCE (MAX(block.timestamp), NOW()) AS max_timestamp + FROM tx INNER JOIN block ON tx.eth_block_num = block.eth_block_num + WHERE block.timestamp >= NOW() - INTERVAL '24 HOURS';`) + if err != nil { + return nil, tracerr.Wrap(err) + } + + seconds := metricsTotals.MaxTimestamp.Sub(metricsTotals.MinTimestamp).Seconds() + // Avoid dividing by 0 + if seconds == 0 { + seconds++ + } + + metrics.TransactionsPerSecond = float64(metricsTotals.TotalTransactions) / seconds + + if (lastBatchNum - metricsTotals.FirstBatchNum) > 0 { + metrics.TransactionsPerBatch = float64(metricsTotals.TotalTransactions) / + float64(lastBatchNum-metricsTotals.FirstBatchNum+1) + } else { + metrics.TransactionsPerBatch = float64(0) + } + + err = meddler.QueryRow( + hdb.db, metricsTotals, `SELECT COUNT(*) AS total_batches, + COALESCE (SUM(total_fees_usd), 0) AS total_fees FROM batch + WHERE batch_num > $1;`, metricsTotals.FirstBatchNum) + if err != nil { + return nil, tracerr.Wrap(err) + } + if metricsTotals.TotalBatches > 0 { + metrics.BatchFrequency = seconds / float64(metricsTotals.TotalBatches) + } else { + metrics.BatchFrequency = 0 + } + if metricsTotals.TotalTransactions > 0 { + metrics.AvgTransactionFee = metricsTotals.TotalFeesUSD / float64(metricsTotals.TotalTransactions) + } else { + metrics.AvgTransactionFee = 0 + } + err = meddler.QueryRow( + hdb.db, metrics, + `SELECT COUNT(*) AS total_bjjs, COUNT(DISTINCT(bjj)) AS total_accounts FROM account;`) + if err != nil { + return nil, tracerr.Wrap(err) + } + + return metrics, nil +} + +// GetAvgTxFeeAPI returns average transaction fee of the last 1h +func (hdb *HistoryDB) GetAvgTxFeeAPI() (float64, error) { + cancel, err := hdb.apiConnCon.Acquire() + defer cancel() + if err != nil { + return 0, tracerr.Wrap(err) + } + defer hdb.apiConnCon.Release() + metricsTotals := &MetricsTotals{} + err = meddler.QueryRow( + hdb.db, metricsTotals, `SELECT COUNT(tx.*) as total_txs, + COALESCE (MIN(tx.batch_num), 0) as batch_num + FROM tx INNER JOIN block ON tx.eth_block_num = block.eth_block_num + WHERE block.timestamp >= NOW() - INTERVAL '1 HOURS';`) + if err != nil { + return 0, tracerr.Wrap(err) + } + err = meddler.QueryRow( + hdb.db, metricsTotals, `SELECT COUNT(*) AS total_batches, + COALESCE (SUM(total_fees_usd), 0) AS total_fees FROM batch + WHERE batch_num > $1;`, metricsTotals.FirstBatchNum) + if err != nil { + return 0, tracerr.Wrap(err) + } + + var avgTransactionFee float64 + if metricsTotals.TotalTransactions > 0 { + avgTransactionFee = metricsTotals.TotalFeesUSD / float64(metricsTotals.TotalTransactions) + } else { + avgTransactionFee = 0 + } + + return avgTransactionFee, nil +} diff --git a/db/historydb/historydb.go b/db/historydb/historydb.go index 1a36df2..8abd994 100644 --- a/db/historydb/historydb.go +++ b/db/historydb/historydb.go @@ -1,8 +1,6 @@ package historydb import ( - "errors" - "fmt" "math" "math/big" "strings" @@ -11,7 +9,6 @@ import ( "github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/db" "github.com/hermeznetwork/tracerr" - "github.com/iden3/go-iden3-crypto/babyjub" "github.com/jmoiron/sqlx" //nolint:errcheck // driver for postgres DB @@ -30,12 +27,13 @@ const ( // HistoryDB persist the historic of the rollup type HistoryDB struct { - db *sqlx.DB + db *sqlx.DB + apiConnCon *db.APIConnectionController } // NewHistoryDB initialize the DB -func NewHistoryDB(db *sqlx.DB) *HistoryDB { - return &HistoryDB{db: db} +func NewHistoryDB(db *sqlx.DB, apiConnCon *db.APIConnectionController) *HistoryDB { + return &HistoryDB{db: db, apiConnCon: apiConnCon} } // DB returns a pointer to the L2DB.db. This method should be used only for @@ -87,8 +85,8 @@ func (hdb *HistoryDB) GetAllBlocks() ([]common.Block, error) { return db.SlicePtrsToSlice(blocks).([]common.Block), tracerr.Wrap(err) } -// GetBlocks retrieve blocks from the DB, given a range of block numbers defined by from and to -func (hdb *HistoryDB) GetBlocks(from, to int64) ([]common.Block, error) { +// getBlocks retrieve blocks from the DB, given a range of block numbers defined by from and to +func (hdb *HistoryDB) getBlocks(from, to int64) ([]common.Block, error) { var blocks []*common.Block err := meddler.QueryAll( hdb.db, &blocks, @@ -166,116 +164,6 @@ func (hdb *HistoryDB) addBatches(d meddler.DB, batches []common.Batch) error { return nil } -// GetBatchAPI return the batch with the given batchNum -func (hdb *HistoryDB) GetBatchAPI(batchNum common.BatchNum) (*BatchAPI, error) { - batch := &BatchAPI{} - return batch, tracerr.Wrap(meddler.QueryRow( - hdb.db, batch, - `SELECT batch.item_id, batch.batch_num, batch.eth_block_num, - batch.forger_addr, batch.fees_collected, batch.total_fees_usd, batch.state_root, - batch.num_accounts, batch.exit_root, batch.forge_l1_txs_num, batch.slot_num, - block.timestamp, block.hash, - COALESCE ((SELECT COUNT(*) FROM tx WHERE batch_num = batch.batch_num), 0) AS forged_txs - FROM batch INNER JOIN block ON batch.eth_block_num = block.eth_block_num - WHERE batch_num = $1;`, batchNum, - )) -} - -// GetBatchesAPI return the batches applying the given filters -func (hdb *HistoryDB) GetBatchesAPI( - minBatchNum, maxBatchNum, slotNum *uint, - forgerAddr *ethCommon.Address, - fromItem, limit *uint, order string, -) ([]BatchAPI, uint64, error) { - var query string - var args []interface{} - queryStr := `SELECT batch.item_id, batch.batch_num, batch.eth_block_num, - batch.forger_addr, batch.fees_collected, batch.total_fees_usd, batch.state_root, - batch.num_accounts, batch.exit_root, batch.forge_l1_txs_num, batch.slot_num, - block.timestamp, block.hash, - COALESCE ((SELECT COUNT(*) FROM tx WHERE batch_num = batch.batch_num), 0) AS forged_txs, - count(*) OVER() AS total_items - FROM batch INNER JOIN block ON batch.eth_block_num = block.eth_block_num ` - // Apply filters - nextIsAnd := false - // minBatchNum filter - if minBatchNum != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - queryStr += "batch.batch_num > ? " - args = append(args, minBatchNum) - nextIsAnd = true - } - // maxBatchNum filter - if maxBatchNum != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - queryStr += "batch.batch_num < ? " - args = append(args, maxBatchNum) - nextIsAnd = true - } - // slotNum filter - if slotNum != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - queryStr += "batch.slot_num = ? " - args = append(args, slotNum) - nextIsAnd = true - } - // forgerAddr filter - if forgerAddr != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - queryStr += "batch.forger_addr = ? " - args = append(args, forgerAddr) - nextIsAnd = true - } - // pagination - if fromItem != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - if order == OrderAsc { - queryStr += "batch.item_id >= ? " - } else { - queryStr += "batch.item_id <= ? " - } - args = append(args, fromItem) - } - queryStr += "ORDER BY batch.item_id " - if order == OrderAsc { - queryStr += " ASC " - } else { - queryStr += " DESC " - } - queryStr += fmt.Sprintf("LIMIT %d;", *limit) - query = hdb.db.Rebind(queryStr) - // log.Debug(query) - batchPtrs := []*BatchAPI{} - if err := meddler.QueryAll(hdb.db, &batchPtrs, query, args...); err != nil { - return nil, 0, tracerr.Wrap(err) - } - batches := db.SlicePtrsToSlice(batchPtrs).([]BatchAPI) - if len(batches) == 0 { - return batches, 0, nil - } - return batches, batches[0].TotalItems - uint64(len(batches)), nil -} - // GetAllBatches retrieve all batches from the DB func (hdb *HistoryDB) GetAllBatches() ([]common.Batch, error) { var batches []*common.Batch @@ -375,22 +263,6 @@ func (hdb *HistoryDB) GetAllBids() ([]common.Bid, error) { return db.SlicePtrsToSlice(bids).([]common.Bid), tracerr.Wrap(err) } -// GetBestBidAPI returns the best bid in specific slot by slotNum -func (hdb *HistoryDB) GetBestBidAPI(slotNum *int64) (BidAPI, error) { - bid := &BidAPI{} - err := meddler.QueryRow( - hdb.db, bid, `SELECT bid.*, block.timestamp, coordinator.forger_addr, coordinator.url - FROM bid INNER JOIN block ON bid.eth_block_num = block.eth_block_num - INNER JOIN ( - SELECT bidder_addr, MAX(item_id) AS item_id FROM coordinator - GROUP BY bidder_addr - ) c ON bid.bidder_addr = c.bidder_addr - INNER JOIN coordinator ON c.item_id = coordinator.item_id - WHERE slot_num = $1 ORDER BY item_id DESC LIMIT 1;`, slotNum, - ) - return *bid, tracerr.Wrap(err) -} - // GetBestBidCoordinator returns the forger address of the highest bidder in a slot by slotNum func (hdb *HistoryDB) GetBestBidCoordinator(slotNum int64) (*common.BidCoordinator, error) { bidCoord := &common.BidCoordinator{} @@ -416,133 +288,6 @@ func (hdb *HistoryDB) GetBestBidCoordinator(slotNum int64) (*common.BidCoordinat return bidCoord, tracerr.Wrap(err) } -// GetBestBidsAPI returns the best bid in specific slot by slotNum -func (hdb *HistoryDB) GetBestBidsAPI( - minSlotNum, maxSlotNum *int64, - bidderAddr *ethCommon.Address, - limit *uint, order string, -) ([]BidAPI, uint64, error) { - var query string - var args []interface{} - // JOIN the best bid of each slot with the latest update of each coordinator - queryStr := `SELECT b.*, block.timestamp, coordinator.forger_addr, coordinator.url, - COUNT(*) OVER() AS total_items FROM ( - SELECT slot_num, MAX(item_id) as maxitem - FROM bid GROUP BY slot_num - ) - AS x INNER JOIN bid AS b ON b.item_id = x.maxitem - INNER JOIN block ON b.eth_block_num = block.eth_block_num - INNER JOIN ( - SELECT bidder_addr, MAX(item_id) AS item_id FROM coordinator - GROUP BY bidder_addr - ) c ON b.bidder_addr = c.bidder_addr - INNER JOIN coordinator ON c.item_id = coordinator.item_id - WHERE (b.slot_num >= ? AND b.slot_num <= ?)` - args = append(args, minSlotNum) - args = append(args, maxSlotNum) - // Apply filters - if bidderAddr != nil { - queryStr += " AND b.bidder_addr = ? " - args = append(args, bidderAddr) - } - queryStr += " ORDER BY b.slot_num " - if order == OrderAsc { - queryStr += "ASC " - } else { - queryStr += "DESC " - } - if limit != nil { - queryStr += fmt.Sprintf("LIMIT %d;", *limit) - } - query = hdb.db.Rebind(queryStr) - bidPtrs := []*BidAPI{} - if err := meddler.QueryAll(hdb.db, &bidPtrs, query, args...); err != nil { - return nil, 0, tracerr.Wrap(err) - } - // log.Debug(query) - bids := db.SlicePtrsToSlice(bidPtrs).([]BidAPI) - if len(bids) == 0 { - return bids, 0, nil - } - return bids, bids[0].TotalItems - uint64(len(bids)), nil -} - -// GetBidsAPI return the bids applying the given filters -func (hdb *HistoryDB) GetBidsAPI( - slotNum *int64, bidderAddr *ethCommon.Address, - fromItem, limit *uint, order string, -) ([]BidAPI, uint64, error) { - var query string - var args []interface{} - // JOIN each bid with the latest update of each coordinator - queryStr := `SELECT bid.*, block.timestamp, coord.forger_addr, coord.url, - COUNT(*) OVER() AS total_items - FROM bid INNER JOIN block ON bid.eth_block_num = block.eth_block_num - INNER JOIN ( - SELECT bidder_addr, MAX(item_id) AS item_id FROM coordinator - GROUP BY bidder_addr - ) c ON bid.bidder_addr = c.bidder_addr - INNER JOIN coordinator coord ON c.item_id = coord.item_id ` - // Apply filters - nextIsAnd := false - // slotNum filter - if slotNum != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - queryStr += "bid.slot_num = ? " - args = append(args, slotNum) - nextIsAnd = true - } - // bidder filter - if bidderAddr != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - queryStr += "bid.bidder_addr = ? " - args = append(args, bidderAddr) - nextIsAnd = true - } - if fromItem != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - if order == OrderAsc { - queryStr += "bid.item_id >= ? " - } else { - queryStr += "bid.item_id <= ? " - } - args = append(args, fromItem) - } - // pagination - queryStr += "ORDER BY bid.item_id " - if order == OrderAsc { - queryStr += "ASC " - } else { - queryStr += "DESC " - } - queryStr += fmt.Sprintf("LIMIT %d;", *limit) - query, argsQ, err := sqlx.In(queryStr, args...) - if err != nil { - return nil, 0, tracerr.Wrap(err) - } - query = hdb.db.Rebind(query) - bids := []*BidAPI{} - if err := meddler.QueryAll(hdb.db, &bids, query, argsQ...); err != nil { - return nil, 0, tracerr.Wrap(err) - } - if len(bids) == 0 { - return []BidAPI{}, 0, nil - } - return db.SlicePtrsToSlice(bids).([]BidAPI), bids[0].TotalItems - uint64(len(bids)), nil -} - // AddCoordinators insert Coordinators into the DB func (hdb *HistoryDB) AddCoordinators(coordinators []common.Coordinator) error { return tracerr.Wrap(hdb.addCoordinators(hdb.db, coordinators)) @@ -708,77 +453,6 @@ func (hdb *HistoryDB) GetAllTokens() ([]TokenWithUSD, error) { return db.SlicePtrsToSlice(tokens).([]TokenWithUSD), tracerr.Wrap(err) } -// GetTokens returns a list of tokens from the DB -func (hdb *HistoryDB) GetTokens( - ids []common.TokenID, symbols []string, name string, fromItem, - limit *uint, order string, -) ([]TokenWithUSD, uint64, error) { - var query string - var args []interface{} - queryStr := `SELECT * , COUNT(*) OVER() AS total_items FROM token ` - // Apply filters - nextIsAnd := false - if len(ids) > 0 { - queryStr += "WHERE token_id IN (?) " - nextIsAnd = true - args = append(args, ids) - } - if len(symbols) > 0 { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - queryStr += "symbol IN (?) " - args = append(args, symbols) - nextIsAnd = true - } - if name != "" { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - queryStr += "name ~ ? " - args = append(args, name) - nextIsAnd = true - } - if fromItem != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - if order == OrderAsc { - queryStr += "item_id >= ? " - } else { - queryStr += "item_id <= ? " - } - args = append(args, fromItem) - } - // pagination - queryStr += "ORDER BY item_id " - if order == OrderAsc { - queryStr += "ASC " - } else { - queryStr += "DESC " - } - queryStr += fmt.Sprintf("LIMIT %d;", *limit) - query, argsQ, err := sqlx.In(queryStr, args...) - if err != nil { - return nil, 0, tracerr.Wrap(err) - } - query = hdb.db.Rebind(query) - tokens := []*TokenWithUSD{} - if err := meddler.QueryAll(hdb.db, &tokens, query, argsQ...); err != nil { - return nil, 0, tracerr.Wrap(err) - } - if len(tokens) == 0 { - return []TokenWithUSD{}, 0, nil - } - return db.SlicePtrsToSlice(tokens).([]TokenWithUSD), uint64(len(tokens)) - tokens[0].TotalItems, nil -} - // GetTokenSymbols returns all the token symbols from the DB func (hdb *HistoryDB) GetTokenSymbols() ([]string, error) { var tokenSymbols []string @@ -951,153 +625,6 @@ func (hdb *HistoryDB) addTxs(d meddler.DB, txs []txWrite) error { )) } -// GetHistoryTx returns a tx from the DB given a TxID -func (hdb *HistoryDB) GetHistoryTx(txID common.TxID) (*TxAPI, error) { - // Warning: amount_success and deposit_amount_success have true as default for - // performance reasons. The expected default value is false (when txs are unforged) - // this case is handled at the function func (tx TxAPI) MarshalJSON() ([]byte, error) - tx := &TxAPI{} - err := meddler.QueryRow( - hdb.db, tx, `SELECT tx.item_id, tx.is_l1, tx.id, tx.type, tx.position, - hez_idx(tx.effective_from_idx, token.symbol) AS from_idx, tx.from_eth_addr, tx.from_bjj, - hez_idx(tx.to_idx, token.symbol) AS to_idx, tx.to_eth_addr, tx.to_bjj, - tx.amount, tx.amount_success, tx.token_id, tx.amount_usd, - tx.batch_num, tx.eth_block_num, tx.to_forge_l1_txs_num, tx.user_origin, - tx.deposit_amount, tx.deposit_amount_usd, tx.deposit_amount_success, tx.fee, tx.fee_usd, tx.nonce, - token.token_id, token.item_id AS token_item_id, token.eth_block_num AS token_block, - token.eth_addr, token.name, token.symbol, token.decimals, token.usd, - token.usd_update, block.timestamp - FROM tx INNER JOIN token ON tx.token_id = token.token_id - INNER JOIN block ON tx.eth_block_num = block.eth_block_num - WHERE tx.id = $1;`, txID, - ) - return tx, tracerr.Wrap(err) -} - -// GetHistoryTxs returns a list of txs from the DB using the HistoryTx struct -// and pagination info -func (hdb *HistoryDB) GetHistoryTxs( - ethAddr *ethCommon.Address, bjj *babyjub.PublicKeyComp, - tokenID *common.TokenID, idx *common.Idx, batchNum *uint, txType *common.TxType, - fromItem, limit *uint, order string, -) ([]TxAPI, uint64, error) { - // Warning: amount_success and deposit_amount_success have true as default for - // performance reasons. The expected default value is false (when txs are unforged) - // this case is handled at the function func (tx TxAPI) MarshalJSON() ([]byte, error) - if ethAddr != nil && bjj != nil { - return nil, 0, tracerr.Wrap(errors.New("ethAddr and bjj are incompatible")) - } - var query string - var args []interface{} - queryStr := `SELECT tx.item_id, tx.is_l1, tx.id, tx.type, tx.position, - hez_idx(tx.effective_from_idx, token.symbol) AS from_idx, tx.from_eth_addr, tx.from_bjj, - hez_idx(tx.to_idx, token.symbol) AS to_idx, tx.to_eth_addr, tx.to_bjj, - tx.amount, tx.amount_success, tx.token_id, tx.amount_usd, - tx.batch_num, tx.eth_block_num, tx.to_forge_l1_txs_num, tx.user_origin, - tx.deposit_amount, tx.deposit_amount_usd, tx.deposit_amount_success, tx.fee, tx.fee_usd, tx.nonce, - token.token_id, token.item_id AS token_item_id, token.eth_block_num AS token_block, - token.eth_addr, token.name, token.symbol, token.decimals, token.usd, - token.usd_update, block.timestamp, count(*) OVER() AS total_items - FROM tx INNER JOIN token ON tx.token_id = token.token_id - INNER JOIN block ON tx.eth_block_num = block.eth_block_num ` - // Apply filters - nextIsAnd := false - // ethAddr filter - if ethAddr != nil { - queryStr += "WHERE (tx.from_eth_addr = ? OR tx.to_eth_addr = ?) " - nextIsAnd = true - args = append(args, ethAddr, ethAddr) - } else if bjj != nil { // bjj filter - queryStr += "WHERE (tx.from_bjj = ? OR tx.to_bjj = ?) " - nextIsAnd = true - args = append(args, bjj, bjj) - } - // tokenID filter - if tokenID != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - queryStr += "tx.token_id = ? " - args = append(args, tokenID) - nextIsAnd = true - } - // idx filter - if idx != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - queryStr += "(tx.effective_from_idx = ? OR tx.to_idx = ?) " - args = append(args, idx, idx) - nextIsAnd = true - } - // batchNum filter - if batchNum != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - queryStr += "tx.batch_num = ? " - args = append(args, batchNum) - nextIsAnd = true - } - // txType filter - if txType != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - queryStr += "tx.type = ? " - args = append(args, txType) - nextIsAnd = true - } - if fromItem != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - if order == OrderAsc { - queryStr += "tx.item_id >= ? " - } else { - queryStr += "tx.item_id <= ? " - } - args = append(args, fromItem) - nextIsAnd = true - } - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - queryStr += "tx.batch_num IS NOT NULL " - - // pagination - queryStr += "ORDER BY tx.item_id " - if order == OrderAsc { - queryStr += " ASC " - } else { - queryStr += " DESC " - } - queryStr += fmt.Sprintf("LIMIT %d;", *limit) - query = hdb.db.Rebind(queryStr) - // log.Debug(query) - txsPtrs := []*TxAPI{} - if err := meddler.QueryAll(hdb.db, &txsPtrs, query, args...); err != nil { - return nil, 0, tracerr.Wrap(err) - } - txs := db.SlicePtrsToSlice(txsPtrs).([]TxAPI) - if len(txs) == 0 { - return txs, 0, nil - } - return txs, txs[0].TotalItems - uint64(len(txs)), nil -} - // GetAllExits returns all exit from the DB func (hdb *HistoryDB) GetAllExits() ([]common.ExitInfo, error) { var exits []*common.ExitInfo @@ -1110,137 +637,6 @@ func (hdb *HistoryDB) GetAllExits() ([]common.ExitInfo, error) { return db.SlicePtrsToSlice(exits).([]common.ExitInfo), tracerr.Wrap(err) } -// GetExitAPI returns a exit from the DB -func (hdb *HistoryDB) GetExitAPI(batchNum *uint, idx *common.Idx) (*ExitAPI, error) { - exit := &ExitAPI{} - err := meddler.QueryRow( - hdb.db, exit, `SELECT exit_tree.item_id, exit_tree.batch_num, - hez_idx(exit_tree.account_idx, token.symbol) AS account_idx, - account.bjj, account.eth_addr, - exit_tree.merkle_proof, exit_tree.balance, exit_tree.instant_withdrawn, - exit_tree.delayed_withdraw_request, exit_tree.delayed_withdrawn, - token.token_id, token.item_id AS token_item_id, - token.eth_block_num AS token_block, token.eth_addr AS token_eth_addr, token.name, token.symbol, - token.decimals, token.usd, token.usd_update - FROM exit_tree INNER JOIN account ON exit_tree.account_idx = account.idx - INNER JOIN token ON account.token_id = token.token_id - WHERE exit_tree.batch_num = $1 AND exit_tree.account_idx = $2;`, batchNum, idx, - ) - return exit, tracerr.Wrap(err) -} - -// GetExitsAPI returns a list of exits from the DB and pagination info -func (hdb *HistoryDB) GetExitsAPI( - ethAddr *ethCommon.Address, bjj *babyjub.PublicKeyComp, tokenID *common.TokenID, - idx *common.Idx, batchNum *uint, onlyPendingWithdraws *bool, - fromItem, limit *uint, order string, -) ([]ExitAPI, uint64, error) { - if ethAddr != nil && bjj != nil { - return nil, 0, tracerr.Wrap(errors.New("ethAddr and bjj are incompatible")) - } - var query string - var args []interface{} - queryStr := `SELECT exit_tree.item_id, exit_tree.batch_num, - hez_idx(exit_tree.account_idx, token.symbol) AS account_idx, - account.bjj, account.eth_addr, - exit_tree.merkle_proof, exit_tree.balance, exit_tree.instant_withdrawn, - exit_tree.delayed_withdraw_request, exit_tree.delayed_withdrawn, - token.token_id, token.item_id AS token_item_id, - token.eth_block_num AS token_block, token.eth_addr AS token_eth_addr, token.name, token.symbol, - token.decimals, token.usd, token.usd_update, COUNT(*) OVER() AS total_items - FROM exit_tree INNER JOIN account ON exit_tree.account_idx = account.idx - INNER JOIN token ON account.token_id = token.token_id ` - // Apply filters - nextIsAnd := false - // ethAddr filter - if ethAddr != nil { - queryStr += "WHERE account.eth_addr = ? " - nextIsAnd = true - args = append(args, ethAddr) - } else if bjj != nil { // bjj filter - queryStr += "WHERE account.bjj = ? " - nextIsAnd = true - args = append(args, bjj) - } - // tokenID filter - if tokenID != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - queryStr += "account.token_id = ? " - args = append(args, tokenID) - nextIsAnd = true - } - // idx filter - if idx != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - queryStr += "exit_tree.account_idx = ? " - args = append(args, idx) - nextIsAnd = true - } - // batchNum filter - if batchNum != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - queryStr += "exit_tree.batch_num = ? " - args = append(args, batchNum) - nextIsAnd = true - } - // onlyPendingWithdraws - if onlyPendingWithdraws != nil { - if *onlyPendingWithdraws { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - queryStr += "(exit_tree.instant_withdrawn IS NULL AND exit_tree.delayed_withdrawn IS NULL) " - nextIsAnd = true - } - } - if fromItem != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - if order == OrderAsc { - queryStr += "exit_tree.item_id >= ? " - } else { - queryStr += "exit_tree.item_id <= ? " - } - args = append(args, fromItem) - // nextIsAnd = true - } - // pagination - queryStr += "ORDER BY exit_tree.item_id " - if order == OrderAsc { - queryStr += " ASC " - } else { - queryStr += " DESC " - } - queryStr += fmt.Sprintf("LIMIT %d;", *limit) - query = hdb.db.Rebind(queryStr) - // log.Debug(query) - exits := []*ExitAPI{} - if err := meddler.QueryAll(hdb.db, &exits, query, args...); err != nil { - return nil, 0, tracerr.Wrap(err) - } - if len(exits) == 0 { - return []ExitAPI{}, 0, nil - } - return db.SlicePtrsToSlice(exits).([]ExitAPI), exits[0].TotalItems - uint64(len(exits)), nil -} - // GetAllL1UserTxs returns all L1UserTxs from the DB func (hdb *HistoryDB) GetAllL1UserTxs() ([]common.L1Tx, error) { var txs []*common.L1Tx @@ -1381,19 +777,6 @@ func (hdb *HistoryDB) GetAllBucketUpdates() ([]common.BucketUpdate, error) { return db.SlicePtrsToSlice(bucketUpdates).([]common.BucketUpdate), tracerr.Wrap(err) } -// GetBucketUpdates retrieves latest values for each bucket -func (hdb *HistoryDB) GetBucketUpdates() ([]BucketUpdateAPI, error) { - var bucketUpdates []*BucketUpdateAPI - err := meddler.QueryAll( - hdb.db, &bucketUpdates, - `SELECT num_bucket, withdrawals FROM bucket_update - WHERE item_id in(SELECT max(item_id) FROM bucket_update - group by num_bucket) - ORDER BY num_bucket ASC;`, - ) - return db.SlicePtrsToSlice(bucketUpdates).([]BucketUpdateAPI), tracerr.Wrap(err) -} - func (hdb *HistoryDB) addTokenExchanges(d meddler.DB, tokenExchanges []common.TokenExchange) error { if len(tokenExchanges) == 0 { return nil @@ -1698,274 +1081,22 @@ func (hdb *HistoryDB) GetCoordinatorAPI(bidderAddr ethCommon.Address) (*Coordina return coordinator, tracerr.Wrap(err) } -// GetCoordinatorsAPI returns a list of coordinators from the DB and pagination info -func (hdb *HistoryDB) GetCoordinatorsAPI( - bidderAddr, forgerAddr *ethCommon.Address, - fromItem, limit *uint, order string, -) ([]CoordinatorAPI, uint64, error) { - var query string - var args []interface{} - queryStr := `SELECT coordinator.*, COUNT(*) OVER() AS total_items - FROM coordinator INNER JOIN ( - SELECT MAX(item_id) AS item_id FROM coordinator - GROUP BY bidder_addr - ) c ON coordinator.item_id = c.item_id ` - // Apply filters - nextIsAnd := false - if bidderAddr != nil { - queryStr += "WHERE bidder_addr = ? " - nextIsAnd = true - args = append(args, bidderAddr) - } - if forgerAddr != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - queryStr += "forger_addr = ? " - nextIsAnd = true - args = append(args, forgerAddr) - } - if fromItem != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - if order == OrderAsc { - queryStr += "coordinator.item_id >= ? " - } else { - queryStr += "coordinator.item_id <= ? " - } - args = append(args, fromItem) - } - // pagination - queryStr += "ORDER BY coordinator.item_id " - if order == OrderAsc { - queryStr += " ASC " - } else { - queryStr += " DESC " - } - queryStr += fmt.Sprintf("LIMIT %d;", *limit) - query = hdb.db.Rebind(queryStr) - - coordinators := []*CoordinatorAPI{} - if err := meddler.QueryAll(hdb.db, &coordinators, query, args...); err != nil { - return nil, 0, tracerr.Wrap(err) - } - if len(coordinators) == 0 { - return []CoordinatorAPI{}, 0, nil - } - return db.SlicePtrsToSlice(coordinators).([]CoordinatorAPI), - coordinators[0].TotalItems - uint64(len(coordinators)), nil -} - // AddAuctionVars insert auction vars into the DB func (hdb *HistoryDB) AddAuctionVars(auctionVars *common.AuctionVariables) error { return tracerr.Wrap(meddler.Insert(hdb.db, "auction_vars", auctionVars)) } -// GetAuctionVars returns auction variables -func (hdb *HistoryDB) GetAuctionVars() (*common.AuctionVariables, error) { - auctionVars := &common.AuctionVariables{} - err := meddler.QueryRow( - hdb.db, auctionVars, `SELECT * FROM auction_vars;`, - ) - return auctionVars, tracerr.Wrap(err) -} - -// GetAuctionVarsUntilSetSlotNum returns all the updates of the auction vars -// from the last entry in which DefaultSlotSetBidSlotNum <= slotNum -func (hdb *HistoryDB) GetAuctionVarsUntilSetSlotNum(slotNum int64, maxItems int) ([]MinBidInfo, error) { - auctionVars := []*MinBidInfo{} - query := ` - SELECT DISTINCT default_slot_set_bid, default_slot_set_bid_slot_num FROM auction_vars - WHERE default_slot_set_bid_slot_num < $1 - ORDER BY default_slot_set_bid_slot_num DESC - LIMIT $2; - ` - err := meddler.QueryAll(hdb.db, &auctionVars, query, slotNum, maxItems) - if err != nil { - return nil, tracerr.Wrap(err) - } - return db.SlicePtrsToSlice(auctionVars).([]MinBidInfo), nil -} - -// GetAccountAPI returns an account by its index -func (hdb *HistoryDB) GetAccountAPI(idx common.Idx) (*AccountAPI, error) { - account := &AccountAPI{} - err := meddler.QueryRow(hdb.db, account, `SELECT account.item_id, hez_idx(account.idx, - token.symbol) as idx, account.batch_num, account.bjj, account.eth_addr, - token.token_id, token.item_id AS token_item_id, token.eth_block_num AS token_block, - token.eth_addr as token_eth_addr, token.name, token.symbol, token.decimals, token.usd, token.usd_update - FROM account INNER JOIN token ON account.token_id = token.token_id WHERE idx = $1;`, idx) - - if err != nil { - return nil, tracerr.Wrap(err) - } - - return account, nil -} - -// GetAccountsAPI returns a list of accounts from the DB and pagination info -func (hdb *HistoryDB) GetAccountsAPI( - tokenIDs []common.TokenID, ethAddr *ethCommon.Address, - bjj *babyjub.PublicKeyComp, fromItem, limit *uint, order string, -) ([]AccountAPI, uint64, error) { - if ethAddr != nil && bjj != nil { - return nil, 0, tracerr.Wrap(errors.New("ethAddr and bjj are incompatible")) - } - var query string - var args []interface{} - queryStr := `SELECT account.item_id, hez_idx(account.idx, token.symbol) as idx, account.batch_num, - account.bjj, account.eth_addr, token.token_id, token.item_id AS token_item_id, token.eth_block_num AS token_block, - token.eth_addr as token_eth_addr, token.name, token.symbol, token.decimals, token.usd, token.usd_update, - COUNT(*) OVER() AS total_items - FROM account INNER JOIN token ON account.token_id = token.token_id ` - // Apply filters - nextIsAnd := false - // ethAddr filter - if ethAddr != nil { - queryStr += "WHERE account.eth_addr = ? " - nextIsAnd = true - args = append(args, ethAddr) - } else if bjj != nil { // bjj filter - queryStr += "WHERE account.bjj = ? " - nextIsAnd = true - args = append(args, bjj) - } - // tokenID filter - if len(tokenIDs) > 0 { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - queryStr += "account.token_id IN (?) " - args = append(args, tokenIDs) - nextIsAnd = true - } - if fromItem != nil { - if nextIsAnd { - queryStr += "AND " - } else { - queryStr += "WHERE " - } - if order == OrderAsc { - queryStr += "account.item_id >= ? " - } else { - queryStr += "account.item_id <= ? " - } - args = append(args, fromItem) - } - // pagination - queryStr += "ORDER BY account.item_id " - if order == OrderAsc { - queryStr += " ASC " - } else { - queryStr += " DESC " - } - queryStr += fmt.Sprintf("LIMIT %d;", *limit) - query, argsQ, err := sqlx.In(queryStr, args...) - if err != nil { - return nil, 0, tracerr.Wrap(err) - } - query = hdb.db.Rebind(query) - - accounts := []*AccountAPI{} - if err := meddler.QueryAll(hdb.db, &accounts, query, argsQ...); err != nil { - return nil, 0, tracerr.Wrap(err) - } - if len(accounts) == 0 { - return []AccountAPI{}, 0, nil - } - - return db.SlicePtrsToSlice(accounts).([]AccountAPI), - accounts[0].TotalItems - uint64(len(accounts)), nil -} - -// GetMetrics returns metrics -func (hdb *HistoryDB) GetMetrics(lastBatchNum common.BatchNum) (*Metrics, error) { - metricsTotals := &MetricsTotals{} - metrics := &Metrics{} - err := meddler.QueryRow( - hdb.db, metricsTotals, `SELECT COUNT(tx.*) as total_txs, - COALESCE (MIN(tx.batch_num), 0) as batch_num, COALESCE (MIN(block.timestamp), - NOW()) AS min_timestamp, COALESCE (MAX(block.timestamp), NOW()) AS max_timestamp - FROM tx INNER JOIN block ON tx.eth_block_num = block.eth_block_num - WHERE block.timestamp >= NOW() - INTERVAL '24 HOURS';`) - if err != nil { - return nil, tracerr.Wrap(err) - } - - seconds := metricsTotals.MaxTimestamp.Sub(metricsTotals.MinTimestamp).Seconds() - // Avoid dividing by 0 - if seconds == 0 { - seconds++ - } - - metrics.TransactionsPerSecond = float64(metricsTotals.TotalTransactions) / seconds - - if (lastBatchNum - metricsTotals.FirstBatchNum) > 0 { - metrics.TransactionsPerBatch = float64(metricsTotals.TotalTransactions) / - float64(lastBatchNum-metricsTotals.FirstBatchNum+1) - } else { - metrics.TransactionsPerBatch = float64(0) - } - - err = meddler.QueryRow( - hdb.db, metricsTotals, `SELECT COUNT(*) AS total_batches, - COALESCE (SUM(total_fees_usd), 0) AS total_fees FROM batch - WHERE batch_num > $1;`, metricsTotals.FirstBatchNum) - if err != nil { - return nil, tracerr.Wrap(err) - } - if metricsTotals.TotalBatches > 0 { - metrics.BatchFrequency = seconds / float64(metricsTotals.TotalBatches) - } else { - metrics.BatchFrequency = 0 - } - if metricsTotals.TotalTransactions > 0 { - metrics.AvgTransactionFee = metricsTotals.TotalFeesUSD / float64(metricsTotals.TotalTransactions) - } else { - metrics.AvgTransactionFee = 0 - } - err = meddler.QueryRow( - hdb.db, metrics, - `SELECT COUNT(*) AS total_bjjs, COUNT(DISTINCT(bjj)) AS total_accounts FROM account;`) - if err != nil { +// GetTokensTest used to get tokens in a testing context +func (hdb *HistoryDB) GetTokensTest() ([]TokenWithUSD, error) { + tokens := []*TokenWithUSD{} + if err := meddler.QueryAll( + hdb.db, &tokens, + "SELECT * FROM TOKEN", + ); err != nil { return nil, tracerr.Wrap(err) } - - return metrics, nil -} - -// GetAvgTxFee returns average transaction fee of the last 1h -func (hdb *HistoryDB) GetAvgTxFee() (float64, error) { - metricsTotals := &MetricsTotals{} - err := meddler.QueryRow( - hdb.db, metricsTotals, `SELECT COUNT(tx.*) as total_txs, - COALESCE (MIN(tx.batch_num), 0) as batch_num - FROM tx INNER JOIN block ON tx.eth_block_num = block.eth_block_num - WHERE block.timestamp >= NOW() - INTERVAL '1 HOURS';`) - if err != nil { - return 0, tracerr.Wrap(err) - } - err = meddler.QueryRow( - hdb.db, metricsTotals, `SELECT COUNT(*) AS total_batches, - COALESCE (SUM(total_fees_usd), 0) AS total_fees FROM batch - WHERE batch_num > $1;`, metricsTotals.FirstBatchNum) - if err != nil { - return 0, tracerr.Wrap(err) - } - - var avgTransactionFee float64 - if metricsTotals.TotalTransactions > 0 { - avgTransactionFee = metricsTotals.TotalFeesUSD / float64(metricsTotals.TotalTransactions) - } else { - avgTransactionFee = 0 + if len(tokens) == 0 { + return []TokenWithUSD{}, nil } - - return avgTransactionFee, nil + return db.SlicePtrsToSlice(tokens).([]TokenWithUSD), nil } diff --git a/db/historydb/historydb_test.go b/db/historydb/historydb_test.go index bb6c46a..49dd546 100644 --- a/db/historydb/historydb_test.go +++ b/db/historydb/historydb_test.go @@ -22,6 +22,7 @@ import ( ) var historyDB *HistoryDB +var historyDBWithACC *HistoryDB // In order to run the test you need to run a Posgres DB with // a database named "history" that is accessible by @@ -38,10 +39,12 @@ func TestMain(m *testing.M) { if err != nil { panic(err) } - historyDB = NewHistoryDB(db) + historyDB = NewHistoryDB(db, nil) if err != nil { panic(err) } + apiConnCon := dbUtils.NewAPICnnectionController(1, time.Second) + historyDBWithACC = NewHistoryDB(db, apiConnCon) // Run tests result := m.Run() // Close DB @@ -85,7 +88,7 @@ func TestBlocks(t *testing.T) { blocks..., ) // Get all blocks from DB - fetchedBlocks, err := historyDB.GetBlocks(fromBlock, toBlock) + fetchedBlocks, err := historyDB.getBlocks(fromBlock, toBlock) assert.Equal(t, len(blocks), len(fetchedBlocks)) // Compare generated vs getted blocks assert.NoError(t, err) @@ -245,9 +248,8 @@ func TestTokens(t *testing.T) { err := historyDB.AddTokens(tokens) assert.NoError(t, err) tokens = append([]common.Token{ethToken}, tokens...) - limit := uint(10) // Fetch tokens - fetchedTokens, _, err := historyDB.GetTokens(nil, nil, "", nil, &limit, OrderAsc) + fetchedTokens, err := historyDB.GetTokensTest() assert.NoError(t, err) // Compare fetched tokens vs generated tokens // All the tokens should have USDUpdate setted by the DB trigger @@ -267,7 +269,7 @@ func TestTokens(t *testing.T) { assert.NoError(t, historyDB.UpdateTokenValue(token.Symbol, value)) } // Fetch tokens - fetchedTokens, _, err = historyDB.GetTokens(nil, nil, "", nil, &limit, OrderAsc) + fetchedTokens, err = historyDB.GetTokensTest() assert.NoError(t, err) // Compare fetched tokens vs generated tokens // All the tokens should have USDUpdate setted by the DB trigger @@ -302,9 +304,8 @@ func TestTokensUTF8(t *testing.T) { assert.NoError(t, err) // Work with nonUTFTokens as tokens one gets updated and non UTF-8 characters are lost nonUTFTokens = append([]common.Token{ethToken}, nonUTFTokens...) - limit := uint(10) // Fetch tokens - fetchedTokens, _, err := historyDB.GetTokens(nil, nil, "", nil, &limit, OrderAsc) + fetchedTokens, err := historyDB.GetTokensTest() assert.NoError(t, err) // Compare fetched tokens vs generated tokens // All the tokens should have USDUpdate setted by the DB trigger @@ -324,7 +325,7 @@ func TestTokensUTF8(t *testing.T) { assert.NoError(t, historyDB.UpdateTokenValue(token.Symbol, value)) } // Fetch tokens - fetchedTokens, _, err = historyDB.GetTokens(nil, nil, "", nil, &limit, OrderAsc) + fetchedTokens, err = historyDB.GetTokensTest() assert.NoError(t, err) // Compare fetched tokens vs generated tokens // All the tokens should have USDUpdate setted by the DB trigger @@ -1087,9 +1088,8 @@ func TestAddEscapeHatchWithdrawals(t *testing.T) { assert.Equal(t, escapeHatchWithdrawals, dbEscapeHatchWithdrawals) } -func TestGetMetrics(t *testing.T) { +func TestGetMetricsAPI(t *testing.T) { test.WipeDB(historyDB.DB()) - set := ` Type: Blockchain @@ -1146,7 +1146,7 @@ func TestGetMetrics(t *testing.T) { assert.NoError(t, err) } - res, err := historyDB.GetMetrics(common.BatchNum(numBatches)) + res, err := historyDBWithACC.GetMetricsAPI(common.BatchNum(numBatches)) assert.NoError(t, err) assert.Equal(t, float64(numTx)/float64(numBatches-1), res.TransactionsPerBatch) @@ -1165,7 +1165,7 @@ func TestGetMetrics(t *testing.T) { assert.Equal(t, float64(0), res.AvgTransactionFee) } -func TestGetMetricsMoreThan24Hours(t *testing.T) { +func TestGetMetricsAPIMoreThan24Hours(t *testing.T) { test.WipeDB(historyDB.DB()) testUsersLen := 3 @@ -1226,7 +1226,7 @@ func TestGetMetricsMoreThan24Hours(t *testing.T) { assert.NoError(t, err) } - res, err := historyDB.GetMetrics(common.BatchNum(numBatches)) + res, err := historyDBWithACC.GetMetricsAPI(common.BatchNum(numBatches)) assert.NoError(t, err) assert.Equal(t, math.Trunc((float64(numTx)/float64(numBatches-1))/0.001)*0.001, math.Trunc(res.TransactionsPerBatch/0.001)*0.001) @@ -1245,15 +1245,15 @@ func TestGetMetricsMoreThan24Hours(t *testing.T) { assert.Equal(t, float64(0), res.AvgTransactionFee) } -func TestGetMetricsEmpty(t *testing.T) { +func TestGetMetricsAPIEmpty(t *testing.T) { test.WipeDB(historyDB.DB()) - _, err := historyDB.GetMetrics(0) + _, err := historyDBWithACC.GetMetricsAPI(0) assert.NoError(t, err) } func TestGetAvgTxFeeEmpty(t *testing.T) { test.WipeDB(historyDB.DB()) - _, err := historyDB.GetAvgTxFee() + _, err := historyDBWithACC.GetAvgTxFeeAPI() assert.NoError(t, err) } diff --git a/db/l2db/apiqueries.go b/db/l2db/apiqueries.go new file mode 100644 index 0000000..114066d --- /dev/null +++ b/db/l2db/apiqueries.go @@ -0,0 +1,85 @@ +package l2db + +import ( + ethCommon "github.com/ethereum/go-ethereum/common" + "github.com/hermeznetwork/hermez-node/common" + "github.com/hermeznetwork/tracerr" + "github.com/russross/meddler" +) + +// AddAccountCreationAuthAPI inserts an account creation authorization into the DB +func (l2db *L2DB) AddAccountCreationAuthAPI(auth *common.AccountCreationAuth) error { + cancel, err := l2db.apiConnCon.Acquire() + defer cancel() + if err != nil { + return tracerr.Wrap(err) + } + defer l2db.apiConnCon.Release() + return l2db.AddAccountCreationAuth(auth) +} + +// GetAccountCreationAuthAPI returns an account creation authorization from the DB +func (l2db *L2DB) GetAccountCreationAuthAPI(addr ethCommon.Address) (*AccountCreationAuthAPI, error) { + cancel, err := l2db.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, tracerr.Wrap(err) + } + defer l2db.apiConnCon.Release() + auth := new(AccountCreationAuthAPI) + return auth, tracerr.Wrap(meddler.QueryRow( + l2db.db, auth, + "SELECT * FROM account_creation_auth WHERE eth_addr = $1;", + addr, + )) +} + +// AddTxAPI inserts a tx to the pool +func (l2db *L2DB) AddTxAPI(tx *PoolL2TxWrite) error { + cancel, err := l2db.apiConnCon.Acquire() + defer cancel() + if err != nil { + return tracerr.Wrap(err) + } + defer l2db.apiConnCon.Release() + row := l2db.db.QueryRow( + "SELECT COUNT(*) FROM tx_pool WHERE state = $1;", + common.PoolL2TxStatePending, + ) + var totalTxs uint32 + if err := row.Scan(&totalTxs); err != nil { + return tracerr.Wrap(err) + } + if totalTxs >= l2db.maxTxs { + return tracerr.New( + "The pool is at full capacity. More transactions are not accepted currently", + ) + } + return tracerr.Wrap(meddler.Insert(l2db.db, "tx_pool", tx)) +} + +// selectPoolTxAPI select part of queries to get PoolL2TxRead +const selectPoolTxAPI = `SELECT tx_pool.tx_id, hez_idx(tx_pool.from_idx, token.symbol) AS from_idx, tx_pool.effective_from_eth_addr, +tx_pool.effective_from_bjj, hez_idx(tx_pool.to_idx, token.symbol) AS to_idx, tx_pool.effective_to_eth_addr, +tx_pool.effective_to_bjj, tx_pool.token_id, tx_pool.amount, tx_pool.fee, tx_pool.nonce, +tx_pool.state, tx_pool.info, tx_pool.signature, tx_pool.timestamp, tx_pool.batch_num, hez_idx(tx_pool.rq_from_idx, token.symbol) AS rq_from_idx, +hez_idx(tx_pool.rq_to_idx, token.symbol) AS rq_to_idx, tx_pool.rq_to_eth_addr, tx_pool.rq_to_bjj, tx_pool.rq_token_id, tx_pool.rq_amount, +tx_pool.rq_fee, tx_pool.rq_nonce, tx_pool.tx_type, +token.item_id AS token_item_id, token.eth_block_num, token.eth_addr, token.name, token.symbol, token.decimals, token.usd, token.usd_update +FROM tx_pool INNER JOIN token ON tx_pool.token_id = token.token_id ` + +// GetTxAPI return the specified Tx in PoolTxAPI format +func (l2db *L2DB) GetTxAPI(txID common.TxID) (*PoolTxAPI, error) { + cancel, err := l2db.apiConnCon.Acquire() + defer cancel() + if err != nil { + return nil, tracerr.Wrap(err) + } + defer l2db.apiConnCon.Release() + tx := new(PoolTxAPI) + return tx, tracerr.Wrap(meddler.QueryRow( + l2db.db, tx, + selectPoolTxAPI+"WHERE tx_id = $1;", + txID, + )) +} diff --git a/db/l2db/l2db.go b/db/l2db/l2db.go index 0cd90df..3ed8547 100644 --- a/db/l2db/l2db.go +++ b/db/l2db/l2db.go @@ -25,17 +25,25 @@ type L2DB struct { safetyPeriod common.BatchNum ttl time.Duration maxTxs uint32 // limit of txs that are accepted in the pool + apiConnCon *db.APIConnectionController } // NewL2DB creates a L2DB. // To create it, it's needed db connection, safety period expressed in batches, // maxTxs that the DB should have and TTL (time to live) for pending txs. -func NewL2DB(db *sqlx.DB, safetyPeriod common.BatchNum, maxTxs uint32, TTL time.Duration) *L2DB { +func NewL2DB( + db *sqlx.DB, + safetyPeriod common.BatchNum, + maxTxs uint32, + TTL time.Duration, + apiConnCon *db.APIConnectionController, +) *L2DB { return &L2DB{ db: db, safetyPeriod: safetyPeriod, ttl: TTL, maxTxs: maxTxs, + apiConnCon: apiConnCon, } } @@ -47,7 +55,6 @@ func (l2db *L2DB) DB() *sqlx.DB { // AddAccountCreationAuth inserts an account creation authorization into the DB func (l2db *L2DB) AddAccountCreationAuth(auth *common.AccountCreationAuth) error { - // return meddler.Insert(l2db.db, "account_creation_auth", auth) _, err := l2db.db.Exec( `INSERT INTO account_creation_auth (eth_addr, bjj, signature) VALUES ($1, $2, $3);`, @@ -66,16 +73,6 @@ func (l2db *L2DB) GetAccountCreationAuth(addr ethCommon.Address) (*common.Accoun )) } -// GetAccountCreationAuthAPI returns an account creation authorization from the DB -func (l2db *L2DB) GetAccountCreationAuthAPI(addr ethCommon.Address) (*AccountCreationAuthAPI, error) { - auth := new(AccountCreationAuthAPI) - return auth, tracerr.Wrap(meddler.QueryRow( - l2db.db, auth, - "SELECT * FROM account_creation_auth WHERE eth_addr = $1;", - addr, - )) -} - // AddTx inserts a tx to the pool func (l2db *L2DB) AddTx(tx *PoolL2TxWrite) error { row := l2db.db.QueryRow( @@ -173,16 +170,6 @@ func (l2db *L2DB) AddTxTest(tx *common.PoolL2Tx) error { return tracerr.Wrap(meddler.Insert(l2db.db, "tx_pool", insertTx)) } -// selectPoolTxAPI select part of queries to get PoolL2TxRead -const selectPoolTxAPI = `SELECT tx_pool.tx_id, hez_idx(tx_pool.from_idx, token.symbol) AS from_idx, tx_pool.effective_from_eth_addr, -tx_pool.effective_from_bjj, hez_idx(tx_pool.to_idx, token.symbol) AS to_idx, tx_pool.effective_to_eth_addr, -tx_pool.effective_to_bjj, tx_pool.token_id, tx_pool.amount, tx_pool.fee, tx_pool.nonce, -tx_pool.state, tx_pool.info, tx_pool.signature, tx_pool.timestamp, tx_pool.batch_num, hez_idx(tx_pool.rq_from_idx, token.symbol) AS rq_from_idx, -hez_idx(tx_pool.rq_to_idx, token.symbol) AS rq_to_idx, tx_pool.rq_to_eth_addr, tx_pool.rq_to_bjj, tx_pool.rq_token_id, tx_pool.rq_amount, -tx_pool.rq_fee, tx_pool.rq_nonce, tx_pool.tx_type, -token.item_id AS token_item_id, token.eth_block_num, token.eth_addr, token.name, token.symbol, token.decimals, token.usd, token.usd_update -FROM tx_pool INNER JOIN token ON tx_pool.token_id = token.token_id ` - // selectPoolTxCommon select part of queries to get common.PoolL2Tx const selectPoolTxCommon = `SELECT tx_pool.tx_id, from_idx, to_idx, tx_pool.to_eth_addr, tx_pool.to_bjj, tx_pool.token_id, tx_pool.amount, tx_pool.fee, tx_pool.nonce, @@ -202,16 +189,6 @@ func (l2db *L2DB) GetTx(txID common.TxID) (*common.PoolL2Tx, error) { )) } -// GetTxAPI return the specified Tx in PoolTxAPI format -func (l2db *L2DB) GetTxAPI(txID common.TxID) (*PoolTxAPI, error) { - tx := new(PoolTxAPI) - return tx, tracerr.Wrap(meddler.QueryRow( - l2db.db, tx, - selectPoolTxAPI+"WHERE tx_id = $1;", - txID, - )) -} - // GetPendingTxs return all the pending txs of the L2DB, that have a non NULL AbsoluteFee func (l2db *L2DB) GetPendingTxs() ([]common.PoolL2Tx, error) { var txs []*common.PoolL2Tx diff --git a/db/l2db/l2db_test.go b/db/l2db/l2db_test.go index 7a7d811..2a12748 100644 --- a/db/l2db/l2db_test.go +++ b/db/l2db/l2db_test.go @@ -21,6 +21,7 @@ import ( ) var l2DB *L2DB +var l2DBWithACC *L2DB var historyDB *historydb.HistoryDB var tc *til.Context var tokens map[common.TokenID]historydb.TokenWithUSD @@ -34,9 +35,11 @@ func TestMain(m *testing.M) { if err != nil { panic(err) } - l2DB = NewL2DB(db, 10, 1000, 24*time.Hour) + l2DB = NewL2DB(db, 10, 1000, 24*time.Hour, nil) + apiConnCon := dbUtils.NewAPICnnectionController(1, time.Second) + l2DBWithACC = NewL2DB(db, 10, 1000, 24*time.Hour, apiConnCon) test.WipeDB(l2DB.DB()) - historyDB = historydb.NewHistoryDB(db) + historyDB = historydb.NewHistoryDB(db, nil) // Run tests result := m.Run() // Close DB @@ -267,7 +270,7 @@ func TestStartForging(t *testing.T) { assert.NoError(t, err) // Fetch txs and check that they've been updated correctly for _, id := range startForgingTxIDs { - fetchedTx, err := l2DB.GetTxAPI(id) + fetchedTx, err := l2DBWithACC.GetTxAPI(id) assert.NoError(t, err) assert.Equal(t, common.PoolL2TxStateForging, fetchedTx.State) assert.Equal(t, &fakeBatchNum, fetchedTx.BatchNum) @@ -312,7 +315,7 @@ func TestDoneForging(t *testing.T) { // Fetch txs and check that they've been updated correctly for _, id := range doneForgingTxIDs { - fetchedTx, err := l2DB.GetTxAPI(id) + fetchedTx, err := l2DBWithACC.GetTxAPI(id) assert.NoError(t, err) assert.Equal(t, common.PoolL2TxStateForged, fetchedTx.State) assert.Equal(t, &fakeBatchNum, fetchedTx.BatchNum) @@ -344,7 +347,7 @@ func TestInvalidate(t *testing.T) { assert.NoError(t, err) // Fetch txs and check that they've been updated correctly for _, id := range invalidTxIDs { - fetchedTx, err := l2DB.GetTxAPI(id) + fetchedTx, err := l2DBWithACC.GetTxAPI(id) assert.NoError(t, err) assert.Equal(t, common.PoolL2TxStateInvalid, fetchedTx.State) assert.Equal(t, &fakeBatchNum, fetchedTx.BatchNum) @@ -385,7 +388,7 @@ func TestInvalidateOldNonces(t *testing.T) { assert.NoError(t, err) // Fetch txs and check that they've been updated correctly for _, id := range invalidTxIDs { - fetchedTx, err := l2DB.GetTxAPI(id) + fetchedTx, err := l2DBWithACC.GetTxAPI(id) require.NoError(t, err) assert.Equal(t, common.PoolL2TxStateInvalid, fetchedTx.State) assert.Equal(t, &fakeBatchNum, fetchedTx.BatchNum) @@ -460,13 +463,13 @@ func TestReorg(t *testing.T) { err = l2DB.Reorg(lastValidBatch) assert.NoError(t, err) for _, id := range reorgedTxIDs { - tx, err := l2DB.GetTxAPI(id) + tx, err := l2DBWithACC.GetTxAPI(id) assert.NoError(t, err) assert.Nil(t, tx.BatchNum) assert.Equal(t, common.PoolL2TxStatePending, tx.State) } for _, id := range nonReorgedTxIDs { - fetchedTx, err := l2DB.GetTxAPI(id) + fetchedTx, err := l2DBWithACC.GetTxAPI(id) assert.NoError(t, err) assert.Equal(t, lastValidBatch, *fetchedTx.BatchNum) } @@ -537,13 +540,13 @@ func TestReorg2(t *testing.T) { err = l2DB.Reorg(lastValidBatch) assert.NoError(t, err) for _, id := range reorgedTxIDs { - tx, err := l2DB.GetTxAPI(id) + tx, err := l2DBWithACC.GetTxAPI(id) assert.NoError(t, err) assert.Nil(t, tx.BatchNum) assert.Equal(t, common.PoolL2TxStatePending, tx.State) } for _, id := range nonReorgedTxIDs { - fetchedTx, err := l2DB.GetTxAPI(id) + fetchedTx, err := l2DBWithACC.GetTxAPI(id) assert.NoError(t, err) assert.Equal(t, lastValidBatch, *fetchedTx.BatchNum) } diff --git a/db/utils.go b/db/utils.go index 579f705..f376736 100644 --- a/db/utils.go +++ b/db/utils.go @@ -1,16 +1,19 @@ package db import ( + "context" "database/sql" "fmt" "math/big" "reflect" "strings" + "time" "github.com/gobuffalo/packr/v2" "github.com/hermeznetwork/hermez-node/log" "github.com/hermeznetwork/tracerr" "github.com/jmoiron/sqlx" + "github.com/marusama/semaphore/v2" migrate "github.com/rubenv/sql-migrate" "github.com/russross/meddler" ) @@ -84,6 +87,32 @@ func InitSQLDB(port int, host, user, password, name string) (*sqlx.DB, error) { return db, nil } +// APIConnectionController is used to limit the SQL open connections used by the API +type APIConnectionController struct { + smphr semaphore.Semaphore + timeout time.Duration +} + +// NewAPICnnectionController initialize APIConnectionController +func NewAPICnnectionController(maxConnections int, timeout time.Duration) *APIConnectionController { + return &APIConnectionController{ + smphr: semaphore.New(maxConnections), + timeout: timeout, + } +} + +// Acquire reserves a SQL connection. If the connection is not acquired +// within the timeout, the function will return an error +func (acc *APIConnectionController) Acquire() (context.CancelFunc, error) { + ctx, cancel := context.WithTimeout(context.Background(), acc.timeout) //nolint:govet + return cancel, acc.smphr.Acquire(ctx, 1) +} + +// Release frees a SQL connection +func (acc *APIConnectionController) Release() { + acc.smphr.Release(1) +} + // initMeddler registers tags to be used to read/write from SQL DBs using meddler func initMeddler() { meddler.Register("bigint", BigIntMeddler{}) diff --git a/go.mod b/go.mod index 53bfdc5..11924f5 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/jmoiron/sqlx v1.2.1-0.20200615141059-0794cb1f47ee github.com/joho/godotenv v1.3.0 github.com/lib/pq v1.8.0 + github.com/marusama/semaphore/v2 v2.4.1 github.com/mattn/go-sqlite3 v2.0.3+incompatible github.com/miguelmota/go-ethereum-hdwallet v0.0.0-20200123000308-a60dcd172b4c github.com/mitchellh/copystructure v1.0.0 diff --git a/go.sum b/go.sum index e06a659..fd813a8 100644 --- a/go.sum +++ b/go.sum @@ -415,6 +415,9 @@ github.com/markbates/oncer v1.0.0 h1:E83IaVAHygyndzPimgUYJjbshhDTALZyXxvk9FOlQRY github.com/markbates/oncer v1.0.0/go.mod h1:Z59JA581E9GP6w96jai+TGqafHPW+cPfRxz2aSZ0mcI= github.com/markbates/safe v1.0.1 h1:yjZkbvRM6IzKj9tlu/zMJLS0n/V351OZWRnF3QfaUxI= github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0= +github.com/marusama/semaphore v0.0.0-20190110074507-6952cef993b2 h1:sq+a5mb8zHbmHhrIH06oqIMGsanjpbxNgxEgZVfgpvQ= +github.com/marusama/semaphore/v2 v2.4.1 h1:Y29DhhFMvreVgoqF9EtaSJAF9t2E7Sk7i5VW81sqB8I= +github.com/marusama/semaphore/v2 v2.4.1/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.0/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU= diff --git a/node/node.go b/node/node.go index edd684c..db6e4d5 100644 --- a/node/node.go +++ b/node/node.go @@ -83,8 +83,15 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { if err != nil { return nil, tracerr.Wrap(err) } + var apiConnCon *dbUtils.APIConnectionController + if cfg.API.Explorer || mode == ModeCoordinator { + apiConnCon = dbUtils.NewAPICnnectionController( + cfg.API.MaxSQLConnections, + cfg.API.SQLConnectionTimeout.Duration, + ) + } - historyDB := historydb.NewHistoryDB(db) + historyDB := historydb.NewHistoryDB(db, apiConnCon) ethClient, err := ethclient.Dial(cfg.Web3.URL) if err != nil { @@ -193,6 +200,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { cfg.Coordinator.L2DB.SafetyPeriod, cfg.Coordinator.L2DB.MaxTxs, cfg.Coordinator.L2DB.TTL.Duration, + apiConnCon, ) // Unlock FeeAccount EthAddr in the keystore to generate the diff --git a/priceupdater/priceupdater_test.go b/priceupdater/priceupdater_test.go index 650ad3d..3601dbd 100644 --- a/priceupdater/priceupdater_test.go +++ b/priceupdater/priceupdater_test.go @@ -20,7 +20,7 @@ func TestPriceUpdater(t *testing.T) { pass := os.Getenv("POSTGRES_PASS") db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez") assert.NoError(t, err) - historyDB := historydb.NewHistoryDB(db) + historyDB := historydb.NewHistoryDB(db, nil) // Clean DB test.WipeDB(historyDB.DB()) // Populate DB @@ -46,8 +46,7 @@ func TestPriceUpdater(t *testing.T) { // Update prices pu.UpdatePrices(context.Background()) // Check that prices have been updated - limit := uint(10) - fetchedTokens, _, err := historyDB.GetTokens(nil, nil, "", nil, &limit, historydb.OrderAsc) + fetchedTokens, err := historyDB.GetTokensTest() require.NoError(t, err) // TokenID 0 (ETH) is always on the DB assert.Equal(t, 2, len(fetchedTokens)) diff --git a/synchronizer/synchronizer_test.go b/synchronizer/synchronizer_test.go index a356f8c..fbb3bfc 100644 --- a/synchronizer/synchronizer_test.go +++ b/synchronizer/synchronizer_test.go @@ -314,7 +314,7 @@ func newTestModules(t *testing.T) (*statedb.StateDB, *historydb.HistoryDB) { pass := os.Getenv("POSTGRES_PASS") db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez") require.NoError(t, err) - historyDB := historydb.NewHistoryDB(db) + historyDB := historydb.NewHistoryDB(db, nil) // Clear DB test.WipeDB(historyDB.DB()) diff --git a/test/zkproof/flows_test.go b/test/zkproof/flows_test.go index 255407a..c961041 100644 --- a/test/zkproof/flows_test.go +++ b/test/zkproof/flows_test.go @@ -38,7 +38,7 @@ func addTokens(t *testing.T, tc *til.Context, db *sqlx.DB) { }) } - hdb := historydb.NewHistoryDB(db) + hdb := historydb.NewHistoryDB(db, nil) assert.NoError(t, hdb.AddBlock(&common.Block{ Num: 1, })) @@ -75,7 +75,7 @@ func initTxSelector(t *testing.T, chainID uint16, hermezContractAddr ethCommon.A pass := os.Getenv("POSTGRES_PASS") db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez") require.NoError(t, err) - l2DB := l2db.NewL2DB(db, 10, 100, 24*time.Hour) + l2DB := l2db.NewL2DB(db, 10, 100, 24*time.Hour, nil) dir, err := ioutil.TempDir("", "tmpSyncDB") require.NoError(t, err) diff --git a/txselector/txselector_test.go b/txselector/txselector_test.go index 516ea35..681ad62 100644 --- a/txselector/txselector_test.go +++ b/txselector/txselector_test.go @@ -29,7 +29,7 @@ func initTest(t *testing.T, chainID uint16, hermezContractAddr ethCommon.Address pass := os.Getenv("POSTGRES_PASS") db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez") require.NoError(t, err) - l2DB := l2db.NewL2DB(db, 10, 100, 24*time.Hour) + l2DB := l2db.NewL2DB(db, 10, 100, 24*time.Hour, nil) dir, err := ioutil.TempDir("", "tmpdb") require.NoError(t, err) @@ -105,7 +105,7 @@ func addTokens(t *testing.T, tc *til.Context, db *sqlx.DB) { }) } - hdb := historydb.NewHistoryDB(db) + hdb := historydb.NewHistoryDB(db, nil) assert.NoError(t, hdb.AddBlock(&common.Block{ Num: 1, }))