Browse Source

Update and integrate price updater

PriceUpdater:
    - Pass context so that it can be canceled during an update loop
    - Define APITypes to make it explicit which API we are using
feature/sql-semaphore1
Eduard S 3 years ago
parent
commit
56fffdcee5
5 changed files with 111 additions and 33 deletions
  1. +5
    -0
      cli/node/cfg.buidler.toml
  2. +5
    -0
      config/config.go
  3. +36
    -12
      node/node.go
  4. +59
    -18
      priceupdater/priceupdater.go
  5. +6
    -3
      priceupdater/priceupdater_test.go

+ 5
- 0
cli/node/cfg.buidler.toml

@ -4,6 +4,11 @@ Explorer = true
UpdateMetricsInterval = "10s" UpdateMetricsInterval = "10s"
UpdateRecommendedFeeInterval = "10s" UpdateRecommendedFeeInterval = "10s"
[PriceUpdater]
Interval = "10s"
URL = "https://api-pub.bitfinex.com/v2/"
Type = "bitfinexV2"
[Debug] [Debug]
APIAddress = "localhost:12345" APIAddress = "localhost:12345"

+ 5
- 0
config/config.go

@ -97,6 +97,11 @@ type Coordinator struct {
// Node is the hermez node configuration. // Node is the hermez node configuration.
type Node struct { type Node struct {
PriceUpdater struct {
Interval Duration `valudate:"required"`
URL string `valudate:"required"`
Type string `valudate:"required"`
} `validate:"required"`
StateDB struct { StateDB struct {
Path string Path string
} `validate:"required"` } `validate:"required"`

+ 36
- 12
node/node.go

@ -22,6 +22,7 @@ import (
"github.com/hermeznetwork/hermez-node/db/statedb" "github.com/hermeznetwork/hermez-node/db/statedb"
"github.com/hermeznetwork/hermez-node/eth" "github.com/hermeznetwork/hermez-node/eth"
"github.com/hermeznetwork/hermez-node/log" "github.com/hermeznetwork/hermez-node/log"
"github.com/hermeznetwork/hermez-node/priceupdater"
"github.com/hermeznetwork/hermez-node/prover" "github.com/hermeznetwork/hermez-node/prover"
"github.com/hermeznetwork/hermez-node/synchronizer" "github.com/hermeznetwork/hermez-node/synchronizer"
"github.com/hermeznetwork/hermez-node/test/debugapi" "github.com/hermeznetwork/hermez-node/test/debugapi"
@ -47,8 +48,9 @@ const (
// Node is the Hermez Node // Node is the Hermez Node
type Node struct { type Node struct {
nodeAPI *NodeAPI
debugAPI *debugapi.DebugAPI
nodeAPI *NodeAPI
debugAPI *debugapi.DebugAPI
priceUpdater *priceupdater.PriceUpdater
// Coordinator // Coordinator
coord *coordinator.Coordinator coord *coordinator.Coordinator
@ -243,17 +245,23 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
if cfg.Debug.APIAddress != "" { if cfg.Debug.APIAddress != "" {
debugAPI = debugapi.NewDebugAPI(cfg.Debug.APIAddress, stateDB, sync) debugAPI = debugapi.NewDebugAPI(cfg.Debug.APIAddress, stateDB, sync)
} }
priceUpdater, err := priceupdater.NewPriceUpdater(cfg.PriceUpdater.URL,
priceupdater.APIType(cfg.PriceUpdater.Type), historyDB)
if err != nil {
return nil, tracerr.Wrap(err)
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
return &Node{ return &Node{
nodeAPI: nodeAPI,
debugAPI: debugAPI,
coord: coord,
sync: sync,
cfg: cfg,
mode: mode,
sqlConn: db,
ctx: ctx,
cancel: cancel,
nodeAPI: nodeAPI,
debugAPI: debugAPI,
priceUpdater: priceUpdater,
coord: coord,
sync: sync,
cfg: cfg,
mode: mode,
sqlConn: db,
ctx: ctx,
cancel: cancel,
}, nil }, nil
} }
@ -439,7 +447,23 @@ func (n *Node) StartSynchronizer() {
} }
} }
}() }()
// TODO: Run price updater. This is required by the API and the TxSelector
n.wg.Add(1)
go func() {
for {
select {
case <-n.ctx.Done():
log.Info("PriceUpdater done")
n.wg.Done()
return
case <-time.After(n.cfg.PriceUpdater.Interval.Duration):
if err := n.priceUpdater.UpdateTokenList(); err != nil {
log.Errorw("PriceUpdater.UpdateTokenList()", "err", err)
}
n.priceUpdater.UpdatePrices(n.ctx)
}
}
}()
} }
// StartDebugAPI starts the DebugAPI // StartDebugAPI starts the DebugAPI

+ 59
- 18
priceupdater/priceupdater.go

@ -1,8 +1,9 @@
package priceupdater package priceupdater
import ( import (
"context"
"fmt"
"net/http" "net/http"
"strconv"
"time" "time"
"github.com/dghubble/sling" "github.com/dghubble/sling"
@ -13,52 +14,92 @@ import (
const ( const (
defaultMaxIdleConns = 10 defaultMaxIdleConns = 10
defaultIdleConnTimeout = 10
defaultIdleConnTimeout = 2 * time.Second
) )
// APIType defines the token exchange API
type APIType string
const (
// APITypeBitFinexV2 is the http API used by bitfinex V2
APITypeBitFinexV2 APIType = "bitfinexV2"
)
func (t *APIType) valid() bool {
switch *t {
case APITypeBitFinexV2:
return true
default:
return false
}
}
// PriceUpdater definition // PriceUpdater definition
type PriceUpdater struct { type PriceUpdater struct {
db *historydb.HistoryDB db *historydb.HistoryDB
apiURL string apiURL string
apiType APIType
tokenSymbols []string tokenSymbols []string
} }
// NewPriceUpdater is the constructor for the updater // NewPriceUpdater is the constructor for the updater
func NewPriceUpdater(apiURL string, db *historydb.HistoryDB) PriceUpdater {
func NewPriceUpdater(apiURL string, apiType APIType, db *historydb.HistoryDB) (*PriceUpdater, error) {
tokenSymbols := []string{} tokenSymbols := []string{}
return PriceUpdater{
if !apiType.valid() {
return nil, tracerr.Wrap(fmt.Errorf("Invalid apiType: %v", apiType))
}
return &PriceUpdater{
db: db, db: db,
apiURL: apiURL, apiURL: apiURL,
apiType: apiType,
tokenSymbols: tokenSymbols, tokenSymbols: tokenSymbols,
}, nil
}
func getTokenPriceBitfinex(ctx context.Context, client *sling.Sling,
tokenSymbol string) (float64, error) {
state := [10]float64{}
req, err := client.New().Get("ticker/t" + tokenSymbol + "USD").Request()
if err != nil {
return 0, tracerr.Wrap(err)
}
res, err := client.Do(req.WithContext(ctx), &state, nil)
if err != nil {
return 0, tracerr.Wrap(err)
}
if res.StatusCode != http.StatusOK {
return 0, fmt.Errorf("http response is not is %v", res.StatusCode)
} }
return state[6], nil
} }
// UpdatePrices is triggered by the Coordinator, and internally will update the token prices in the db // UpdatePrices is triggered by the Coordinator, and internally will update the token prices in the db
func (p *PriceUpdater) UpdatePrices() {
func (p *PriceUpdater) UpdatePrices(ctx context.Context) {
tr := &http.Transport{ tr := &http.Transport{
MaxIdleConns: defaultMaxIdleConns, MaxIdleConns: defaultMaxIdleConns,
IdleConnTimeout: defaultIdleConnTimeout * time.Second,
IdleConnTimeout: defaultIdleConnTimeout,
DisableCompression: true, DisableCompression: true,
} }
httpClient := &http.Client{Transport: tr} httpClient := &http.Client{Transport: tr}
client := sling.New().Base(p.apiURL).Client(httpClient) client := sling.New().Base(p.apiURL).Client(httpClient)
state := [10]float64{}
for _, tokenSymbol := range p.tokenSymbols { for _, tokenSymbol := range p.tokenSymbols {
resp, err := client.New().Get("ticker/t" + tokenSymbol + "USD").ReceiveSuccess(&state)
errString := tokenSymbol + " not updated, error: "
if err != nil {
log.Error(errString + err.Error())
continue
var tokenPrice float64
var err error
switch p.apiType {
case APITypeBitFinexV2:
tokenPrice, err = getTokenPriceBitfinex(ctx, client, tokenSymbol)
} }
if resp.StatusCode != http.StatusOK {
log.Error(errString + "response is not 200, is " + strconv.Itoa(resp.StatusCode))
continue
if ctx.Err() != nil {
return
} }
err = p.db.UpdateTokenValue(tokenSymbol, state[6])
if err != nil { if err != nil {
log.Error(errString + err.Error())
log.Errorw("token price not updated (get error)",
"err", err, "token", tokenSymbol, "apiType", p.apiType)
}
if err = p.db.UpdateTokenValue(tokenSymbol, tokenPrice); err != nil {
log.Errorw("token price not updated (db error)",
"err", err, "token", tokenSymbol, "apiType", p.apiType)
} }
} }
} }

+ 6
- 3
priceupdater/priceupdater_test.go

@ -1,6 +1,7 @@
package priceupdater package priceupdater
import ( import (
"context"
"math/big" "math/big"
"os" "os"
"testing" "testing"
@ -11,6 +12,7 @@ import (
"github.com/hermeznetwork/hermez-node/db/historydb" "github.com/hermeznetwork/hermez-node/db/historydb"
"github.com/hermeznetwork/hermez-node/test" "github.com/hermeznetwork/hermez-node/test"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestPriceUpdater(t *testing.T) { func TestPriceUpdater(t *testing.T) {
@ -37,15 +39,16 @@ func TestPriceUpdater(t *testing.T) {
}) })
assert.NoError(t, historyDB.AddTokens(tokens)) assert.NoError(t, historyDB.AddTokens(tokens))
// Init price updater // Init price updater
pu := NewPriceUpdater("https://api-pub.bitfinex.com/v2/", historyDB)
pu, err := NewPriceUpdater("https://api-pub.bitfinex.com/v2/", APITypeBitFinexV2, historyDB)
require.NoError(t, err)
// Update token list // Update token list
assert.NoError(t, pu.UpdateTokenList()) assert.NoError(t, pu.UpdateTokenList())
// Update prices // Update prices
pu.UpdatePrices()
pu.UpdatePrices(context.Background())
// Check that prices have been updated // Check that prices have been updated
limit := uint(10) limit := uint(10)
fetchedTokens, _, err := historyDB.GetTokens(nil, nil, "", nil, &limit, historydb.OrderAsc) fetchedTokens, _, err := historyDB.GetTokens(nil, nil, "", nil, &limit, historydb.OrderAsc)
assert.NoError(t, err)
require.NoError(t, err)
// TokenID 0 (ETH) is always on the DB // TokenID 0 (ETH) is always on the DB
assert.Equal(t, 2, len(fetchedTokens)) assert.Equal(t, 2, len(fetchedTokens))
for _, token := range fetchedTokens { for _, token := range fetchedTokens {

Loading…
Cancel
Save