Browse Source

Connect price updater to historydb

feature/sql-semaphore1
Arnau B 3 years ago
parent
commit
f653ff8a73
3 changed files with 100 additions and 117 deletions
  1. +21
    -3
      db/historydb/historydb.go
  2. +29
    -86
      priceupdater/priceupdater.go
  3. +50
    -28
      priceupdater/priceupdater_test.go

+ 21
- 3
db/historydb/historydb.go

@ -259,10 +259,10 @@ func (hdb *HistoryDB) addTokens(d meddler.DB, tokens []common.Token) error {
}
// UpdateTokenValue updates the USD value of a token
func (hdb *HistoryDB) UpdateTokenValue(tokenID common.TokenID, value float64) error {
func (hdb *HistoryDB) UpdateTokenValue(tokenSymbol string, value float64) error {
_, err := hdb.db.Exec(
"UPDATE token SET usd = $1 WHERE token_id = $2;",
value, tokenID,
"UPDATE token SET usd = $1 WHERE symbol = $2;",
value, tokenSymbol,
)
return err
}
@ -277,6 +277,24 @@ func (hdb *HistoryDB) GetTokens() ([]*common.Token, error) {
return tokens, err
}
// GetTokenSymbols returns all the token symbols from the DB
func (hdb *HistoryDB) GetTokenSymbols() ([]string, error) {
var tokenSymbols []string
rows, err := hdb.db.Query("SELECT symbol FROM token;")
if err != nil {
return nil, err
}
sym := new(string)
for rows.Next() {
err = rows.Scan(sym)
if err != nil {
return nil, err
}
tokenSymbols = append(tokenSymbols, *sym)
}
return tokenSymbols, nil
}
// AddAccounts insert accounts into the DB
func (hdb *HistoryDB) AddAccounts(accounts []common.Account) error {
return hdb.addAccounts(hdb.db, accounts)

+ 29
- 86
priceupdater/priceupdater.go

@ -1,13 +1,13 @@
package priceupdater
import (
"errors"
"fmt"
"net/http"
"sync"
"strconv"
"time"
"github.com/dghubble/sling"
"github.com/hermeznetwork/hermez-node/db/historydb"
"github.com/hermeznetwork/hermez-node/log"
)
const (
@ -15,116 +15,59 @@ const (
defaultIdleConnTimeout = 10
)
var (
// ErrSymbolDoesNotExistInDatabase is used when trying to get a token that is not in the DB
ErrSymbolDoesNotExistInDatabase = errors.New("symbol does not exist in database")
)
// ConfigPriceUpdater contains the configuration set by the coordinator
type ConfigPriceUpdater struct {
RecommendedFee uint64 // in dollars
RecommendedCreateAccountFee uint64 // in dollars
TokensList []string
APIURL string
}
// TokenInfo contains the updated value for the token
type TokenInfo struct {
Symbol string
Value float64
LastUpdated time.Time
}
// PriceUpdater definition
type PriceUpdater struct {
db map[string]TokenInfo
config ConfigPriceUpdater
mu sync.RWMutex
db *historydb.HistoryDB
apiURL string
tokenSymbols []string
}
// NewPriceUpdater is the constructor for the updater
func NewPriceUpdater(config ConfigPriceUpdater) PriceUpdater {
func NewPriceUpdater(apiURL string, db *historydb.HistoryDB) PriceUpdater {
tokenSymbols := []string{}
return PriceUpdater{
db: make(map[string]TokenInfo),
config: config,
db: db,
apiURL: apiURL,
tokenSymbols: tokenSymbols,
}
}
// UpdatePrices is triggered by the Coordinator, and internally will update the token prices in the db
func (p *PriceUpdater) UpdatePrices() error {
func (p *PriceUpdater) UpdatePrices() {
tr := &http.Transport{
MaxIdleConns: defaultMaxIdleConns,
IdleConnTimeout: defaultIdleConnTimeout * time.Second,
DisableCompression: true,
}
httpClient := &http.Client{Transport: tr}
client := sling.New().Base(p.config.APIURL).Client(httpClient)
client := sling.New().Base(p.apiURL).Client(httpClient)
state := [10]float64{}
for _, tokenSymbol := range p.config.TokensList {
for _, tokenSymbol := range p.tokenSymbols {
resp, err := client.New().Get("ticker/t" + tokenSymbol + "USD").ReceiveSuccess(&state)
errString := tokenSymbol + " not updated, error: "
if err != nil {
return err
log.Error(errString + err.Error())
continue
}
// if resp.StatusCode != 200 {
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("Unexpected response status code: %v", resp.StatusCode)
log.Error(errString + "response is not 200, is " + strconv.Itoa(resp.StatusCode))
continue
}
tinfo := TokenInfo{
Symbol: tokenSymbol,
Value: state[6],
LastUpdated: time.Now(),
err = p.db.UpdateTokenValue(tokenSymbol, state[6])
if err != nil {
log.Error(errString + err.Error())
}
p.UpdateTokenInfo(tinfo)
}
return nil
}
// UpdateConfig allows to update the price-updater configuration
func (p *PriceUpdater) UpdateConfig(config ConfigPriceUpdater) {
p.mu.Lock()
defer p.mu.Unlock()
p.config = config
}
// Get one token information
func (p *PriceUpdater) Get(tokenSymbol string) (TokenInfo, error) {
var info TokenInfo
// Check if symbol exists in database
p.mu.RLock()
defer p.mu.RUnlock()
if info, ok := p.db[tokenSymbol]; ok {
return info, nil
// UpdateTokenList get the registered token symbols from HistoryDB
func (p *PriceUpdater) UpdateTokenList() error {
tokenSymbols, err := p.db.GetTokenSymbols()
if err != nil {
return err
}
return info, ErrSymbolDoesNotExistInDatabase
}
// GetPrices gets all the prices contained in the db
func (p *PriceUpdater) GetPrices() map[string]TokenInfo {
var info = make(map[string]TokenInfo)
p.mu.RLock()
defer p.mu.RUnlock()
for key, value := range p.db {
info[key] = value
}
return info
}
// UpdateTokenInfo updates one token info
func (p *PriceUpdater) UpdateTokenInfo(tokenInfo TokenInfo) {
p.mu.Lock()
defer p.mu.Unlock()
p.db[tokenInfo.Symbol] = tokenInfo
p.tokenSymbols = tokenSymbols
return nil
}

+ 50
- 28
priceupdater/priceupdater_test.go

@ -1,38 +1,60 @@
package priceupdater
import (
"math/big"
"os"
"testing"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/hermeznetwork/hermez-node/common"
dbUtils "github.com/hermeznetwork/hermez-node/db"
"github.com/hermeznetwork/hermez-node/db/historydb"
"github.com/hermeznetwork/hermez-node/test"
"github.com/stretchr/testify/assert"
)
func TestCon(t *testing.T) {
config := ConfigPriceUpdater{
RecommendedFee: 1,
RecommendedCreateAccountFee: 1,
TokensList: []string{"ETH", "NEC"},
APIURL: "https://api-pub.bitfinex.com/v2/",
}
pud := NewPriceUpdater(config)
err := pud.UpdatePrices()
assert.Equal(t, err, nil)
info, _ := pud.Get("ETH")
assert.NotZero(t, info.Value)
info2, _ := pud.Get("NEC")
assert.NotZero(t, info2.Value)
info3, err := pud.Get("INVENTED")
if assert.Error(t, err) {
assert.Equal(t, ErrSymbolDoesNotExistInDatabase, err)
func TestPriceUpdater(t *testing.T) {
// Init DB
pass := os.Getenv("POSTGRES_PASS")
db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez")
assert.NoError(t, err)
historyDB := historydb.NewHistoryDB(db)
// Clean DB
assert.NoError(t, historyDB.Reorg(-1))
// Populate DB
// Gen blocks and add them to DB
blocks := test.GenBlocks(1, 2)
assert.NoError(t, historyDB.AddBlocks(blocks))
// Gen tokens and add them to DB
tokens := []common.Token{}
tokens = append(tokens, common.Token{
TokenID: 0,
EthBlockNum: blocks[0].EthBlockNum,
EthAddr: ethCommon.BigToAddress(big.NewInt(1)),
Name: "Ether",
Symbol: "ETH",
Decimals: 18,
})
tokens = append(tokens, common.Token{
TokenID: 1,
EthBlockNum: blocks[0].EthBlockNum,
EthAddr: ethCommon.BigToAddress(big.NewInt(2)),
Name: "DAI",
Symbol: "DAI",
Decimals: 18,
})
assert.NoError(t, historyDB.AddTokens(tokens))
// Init price updater
pu := NewPriceUpdater("https://api-pub.bitfinex.com/v2/", historyDB)
// Update token list
assert.NoError(t, pu.UpdateTokenList())
// Update prices
pu.UpdatePrices()
// Check that prices have been updated
fetchedTokens, err := historyDB.GetTokens()
assert.NoError(t, err)
for _, token := range fetchedTokens {
assert.NotNil(t, token.USD)
assert.NotNil(t, token.USDUpdate)
}
assert.Equal(t, info3.Value, float64(0))
prices := pud.GetPrices()
assert.Equal(t, prices["ETH"], info)
assert.Equal(t, prices["NEC"], info2)
}

Loading…
Cancel
Save