Add semaphore for API queries to SQL

This commit is contained in:
Arnau B
2021-02-08 18:50:32 +01:00
committed by Eduard S
parent 56d05ce57a
commit ac1fd9acf7
27 changed files with 1339 additions and 981 deletions

View File

@@ -26,7 +26,7 @@ func (a *API) postAccountCreationAuth(c *gin.Context) {
return
}
// Insert to DB
if err := a.l2.AddAccountCreationAuth(commonAuth); err != nil {
if err := a.l2.AddAccountCreationAuthAPI(commonAuth); err != nil {
retSQLErr(err, c)
return
}

View File

@@ -11,6 +11,7 @@ import (
"net/http"
"os"
"strconv"
"sync"
"testing"
"time"
@@ -27,6 +28,7 @@ import (
"github.com/hermeznetwork/hermez-node/test/til"
"github.com/hermeznetwork/hermez-node/test/txsets"
"github.com/hermeznetwork/tracerr"
"github.com/stretchr/testify/require"
)
// Pendinger is an interface that allows getting last returned item ID and PendingItems to be used for building fromItem
@@ -199,7 +201,8 @@ func TestMain(m *testing.M) {
if err != nil {
panic(err)
}
hdb := historydb.NewHistoryDB(database)
apiConnCon := db.NewAPICnnectionController(1, time.Second)
hdb := historydb.NewHistoryDB(database, apiConnCon)
if err != nil {
panic(err)
}
@@ -218,7 +221,7 @@ func TestMain(m *testing.M) {
panic(err)
}
// L2DB
l2DB := l2db.NewL2DB(database, 10, 1000, 24*time.Hour)
l2DB := l2db.NewL2DB(database, 10, 1000, 24*time.Hour, apiConnCon)
test.WipeDB(l2DB.DB()) // this will clean HistoryDB and L2DB
// Config (smart contract constants)
chainID := uint16(0)
@@ -574,6 +577,82 @@ func TestMain(m *testing.M) {
os.Exit(result)
}
func TestTimeout(t *testing.T) {
pass := os.Getenv("POSTGRES_PASS")
databaseTO, err := db.InitSQLDB(5432, "localhost", "hermez", pass, "hermez")
require.NoError(t, err)
apiConnConTO := db.NewAPICnnectionController(1, 100*time.Millisecond)
hdbTO := historydb.NewHistoryDB(databaseTO, apiConnConTO)
require.NoError(t, err)
// L2DB
l2DBTO := l2db.NewL2DB(databaseTO, 10, 1000, 24*time.Hour, apiConnConTO)
// API
apiGinTO := gin.Default()
finishWait := make(chan interface{})
startWait := make(chan interface{})
apiGinTO.GET("/wait", func(c *gin.Context) {
cancel, err := apiConnConTO.Acquire()
defer cancel()
require.NoError(t, err)
defer apiConnConTO.Release()
startWait <- nil
<-finishWait
})
// Start server
serverTO := &http.Server{Addr: ":4444", Handler: apiGinTO}
go func() {
if err := serverTO.ListenAndServe(); err != nil && tracerr.Unwrap(err) != http.ErrServerClosed {
require.NoError(t, err)
}
}()
_config := getConfigTest(0)
_, err = NewAPI(
true,
true,
apiGinTO,
hdbTO,
nil,
l2DBTO,
&_config,
)
require.NoError(t, err)
client := &http.Client{}
httpReq, err := http.NewRequest("GET", "http://localhost:4444/tokens", nil)
require.NoError(t, err)
httpReqWait, err := http.NewRequest("GET", "http://localhost:4444/wait", nil)
require.NoError(t, err)
// Request that will get timed out
var wg sync.WaitGroup
wg.Add(1)
go func() {
// Request that will make the API busy
_, err = client.Do(httpReqWait)
require.NoError(t, err)
wg.Done()
}()
<-startWait
resp, err := client.Do(httpReq)
require.NoError(t, err)
require.Equal(t, http.StatusServiceUnavailable, resp.StatusCode)
defer resp.Body.Close() //nolint
body, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
// Unmarshal body into return struct
msg := &errorMsg{}
err = json.Unmarshal(body, msg)
require.NoError(t, err)
// Check that the error was the expected down
require.Equal(t, errSQLTimeout, msg.Message)
finishWait <- nil
// Stop server
wg.Wait()
require.NoError(t, serverTO.Shutdown(context.Background()))
require.NoError(t, databaseTO.Close())
}
func doGoodReqPaginated(
path, order string,
iterStruct Pendinger,

View File

@@ -108,7 +108,7 @@ func (a *API) getFullBatch(c *gin.Context) {
}
// Fetch txs forged in the batch from historyDB
maxTxsPerBatch := uint(2048) //nolint:gomnd
txs, _, err := a.h.GetHistoryTxs(
txs, _, err := a.h.GetTxsAPI(
nil, nil, nil, nil, batchNum, nil, nil, &maxTxsPerBatch, historydb.OrderAsc,
)
if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows {

View File

@@ -30,6 +30,12 @@ const (
// Error for duplicated key
errDuplicatedKey = "Item already exists"
// Error for timeout due to SQL connection
errSQLTimeout = "The node is under heavy preasure, please try again later"
// Error message returned when context reaches timeout
errCtxTimeout = "context deadline exceeded"
)
var (
@@ -38,16 +44,20 @@ var (
)
func retSQLErr(err error, c *gin.Context) {
log.Warn("HTTP API SQL request error", "err", err)
if sqlErr, ok := tracerr.Unwrap(err).(*pq.Error); ok {
log.Warnw("HTTP API SQL request error", "err", err)
errMsg := tracerr.Unwrap(err).Error()
if errMsg == errCtxTimeout {
c.JSON(http.StatusServiceUnavailable, errorMsg{
Message: errSQLTimeout,
})
} else if sqlErr, ok := tracerr.Unwrap(err).(*pq.Error); ok {
// https://www.postgresql.org/docs/current/errcodes-appendix.html
if sqlErr.Code == "23505" {
c.JSON(http.StatusInternalServerError, errorMsg{
Message: errDuplicatedKey,
})
}
}
if tracerr.Unwrap(err) == sql.ErrNoRows {
} else if tracerr.Unwrap(err) == sql.ErrNoRows {
c.JSON(http.StatusNotFound, errorMsg{
Message: err.Error(),
})
@@ -59,7 +69,7 @@ func retSQLErr(err error, c *gin.Context) {
}
func retBadReq(err error, c *gin.Context) {
log.Warn("HTTP API Bad request error", "err", err)
log.Warnw("HTTP API Bad request error", "err", err)
c.JSON(http.StatusBadRequest, errorMsg{
Message: err.Error(),
})

View File

@@ -97,12 +97,12 @@ func (a *API) getSlot(c *gin.Context) {
retBadReq(err, c)
return
}
currentBlock, err := a.h.GetLastBlock()
currentBlock, err := a.h.GetLastBlockAPI()
if err != nil {
retBadReq(err, c)
return
}
auctionVars, err := a.h.GetAuctionVars()
auctionVars, err := a.h.GetAuctionVarsAPI()
if err != nil {
retBadReq(err, c)
return
@@ -200,12 +200,12 @@ func (a *API) getSlots(c *gin.Context) {
return
}
currentBlock, err := a.h.GetLastBlock()
currentBlock, err := a.h.GetLastBlockAPI()
if err != nil {
retBadReq(err, c)
return
}
auctionVars, err := a.h.GetAuctionVars()
auctionVars, err := a.h.GetAuctionVarsAPI()
if err != nil {
retBadReq(err, c)
return
@@ -220,13 +220,13 @@ func (a *API) getSlots(c *gin.Context) {
retBadReq(errors.New("It is necessary to add maxSlotNum filter"), c)
return
} else if *finishedAuction {
currentBlock, err := a.h.GetLastBlock()
currentBlock, err := a.h.GetLastBlockAPI()
if err != nil {
retBadReq(err, c)
return
}
currentSlot := a.getCurrentSlot(currentBlock.Num)
auctionVars, err := a.h.GetAuctionVars()
auctionVars, err := a.h.GetAuctionVarsAPI()
if err != nil {
retBadReq(err, c)
return

View File

@@ -141,7 +141,7 @@ func (a *API) UpdateNetworkInfo(
a.status.Network.NextForgers = nextForgers
// Update buckets withdrawals
bucketsUpdate, err := a.h.GetBucketUpdates()
bucketsUpdate, err := a.h.GetBucketUpdatesAPI()
if tracerr.Unwrap(err) == sql.ErrNoRows {
bucketsUpdate = nil
} else if err != nil {
@@ -201,7 +201,7 @@ func (a *API) getNextForgers(lastBlock common.Block, currentSlot, lastClosedSlot
}}
} else {
// Get all the relevant updates from the DB
minBidInfo, err = a.h.GetAuctionVarsUntilSetSlotNum(lastClosedSlot, int(lastClosedSlot-currentSlot)+1)
minBidInfo, err = a.h.GetAuctionVarsUntilSetSlotNumAPI(lastClosedSlot, int(lastClosedSlot-currentSlot)+1)
if err != nil {
return nil, tracerr.Wrap(err)
}
@@ -279,7 +279,7 @@ func (a *API) UpdateMetrics() error {
}
batchNum := a.status.Network.LastBatch.BatchNum
a.status.RUnlock()
metrics, err := a.h.GetMetrics(batchNum)
metrics, err := a.h.GetMetricsAPI(batchNum)
if err != nil {
return tracerr.Wrap(err)
}
@@ -293,7 +293,7 @@ func (a *API) UpdateMetrics() error {
// UpdateRecommendedFee update Status.RecommendedFee information
func (a *API) UpdateRecommendedFee() error {
feeExistingAccount, err := a.h.GetAvgTxFee()
feeExistingAccount, err := a.h.GetAvgTxFeeAPI()
if err != nil {
return tracerr.Wrap(err)
}

View File

@@ -22,7 +22,7 @@ func (a *API) getToken(c *gin.Context) {
}
tokenID := common.TokenID(*tokenIDUint)
// Fetch token from historyDB
token, err := a.h.GetToken(tokenID)
token, err := a.h.GetTokenAPI(tokenID)
if err != nil {
retSQLErr(err, c)
return
@@ -45,7 +45,7 @@ func (a *API) getTokens(c *gin.Context) {
return
}
// Fetch exits from historyDB
tokens, pendingItems, err := a.h.GetTokens(
tokens, pendingItems, err := a.h.GetTokensAPI(
tokenIDs, symbols, name, fromItem, limit, order,
)
if err != nil {

View File

@@ -34,7 +34,7 @@ func (a *API) getHistoryTxs(c *gin.Context) {
}
// Fetch txs from historyDB
txs, pendingItems, err := a.h.GetHistoryTxs(
txs, pendingItems, err := a.h.GetTxsAPI(
addr, bjj, tokenID, idx, batchNum, txType, fromItem, limit, order,
)
if err != nil {
@@ -61,7 +61,7 @@ func (a *API) getHistoryTx(c *gin.Context) {
return
}
// Fetch tx from historyDB
tx, err := a.h.GetHistoryTx(txID)
tx, err := a.h.GetTxAPI(txID)
if err != nil {
retSQLErr(err, c)
return

View File

@@ -28,7 +28,7 @@ func (a *API) postPoolTx(c *gin.Context) {
return
}
// Insert to DB
if err := a.l2.AddTx(writeTx); err != nil {
if err := a.l2.AddTxAPI(writeTx); err != nil {
retSQLErr(err, c)
return
}