mirror of
https://github.com/arnaucube/hermez-node.git
synced 2026-02-07 03:16:45 +01:00
Merge pull request #385 from hermeznetwork/feature/integratepriceupdater
Update and integrate price updater
This commit is contained in:
@@ -4,6 +4,11 @@ Explorer = true
|
|||||||
UpdateMetricsInterval = "10s"
|
UpdateMetricsInterval = "10s"
|
||||||
UpdateRecommendedFeeInterval = "10s"
|
UpdateRecommendedFeeInterval = "10s"
|
||||||
|
|
||||||
|
[PriceUpdater]
|
||||||
|
Interval = "10s"
|
||||||
|
URL = "https://api-pub.bitfinex.com/v2/"
|
||||||
|
Type = "bitfinexV2"
|
||||||
|
|
||||||
[Debug]
|
[Debug]
|
||||||
APIAddress = "localhost:12345"
|
APIAddress = "localhost:12345"
|
||||||
|
|
||||||
|
|||||||
@@ -96,6 +96,11 @@ type Coordinator struct {
|
|||||||
|
|
||||||
// Node is the hermez node configuration.
|
// Node is the hermez node configuration.
|
||||||
type Node struct {
|
type Node struct {
|
||||||
|
PriceUpdater struct {
|
||||||
|
Interval Duration `valudate:"required"`
|
||||||
|
URL string `valudate:"required"`
|
||||||
|
Type string `valudate:"required"`
|
||||||
|
} `validate:"required"`
|
||||||
StateDB struct {
|
StateDB struct {
|
||||||
Path string
|
Path string
|
||||||
} `validate:"required"`
|
} `validate:"required"`
|
||||||
|
|||||||
26
node/node.go
26
node/node.go
@@ -23,6 +23,7 @@ import (
|
|||||||
"github.com/hermeznetwork/hermez-node/db/statedb"
|
"github.com/hermeznetwork/hermez-node/db/statedb"
|
||||||
"github.com/hermeznetwork/hermez-node/eth"
|
"github.com/hermeznetwork/hermez-node/eth"
|
||||||
"github.com/hermeznetwork/hermez-node/log"
|
"github.com/hermeznetwork/hermez-node/log"
|
||||||
|
"github.com/hermeznetwork/hermez-node/priceupdater"
|
||||||
"github.com/hermeznetwork/hermez-node/prover"
|
"github.com/hermeznetwork/hermez-node/prover"
|
||||||
"github.com/hermeznetwork/hermez-node/synchronizer"
|
"github.com/hermeznetwork/hermez-node/synchronizer"
|
||||||
"github.com/hermeznetwork/hermez-node/test/debugapi"
|
"github.com/hermeznetwork/hermez-node/test/debugapi"
|
||||||
@@ -50,6 +51,7 @@ const (
|
|||||||
type Node struct {
|
type Node struct {
|
||||||
nodeAPI *NodeAPI
|
nodeAPI *NodeAPI
|
||||||
debugAPI *debugapi.DebugAPI
|
debugAPI *debugapi.DebugAPI
|
||||||
|
priceUpdater *priceupdater.PriceUpdater
|
||||||
// Coordinator
|
// Coordinator
|
||||||
coord *coordinator.Coordinator
|
coord *coordinator.Coordinator
|
||||||
|
|
||||||
@@ -242,10 +244,16 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
|
|||||||
if cfg.Debug.APIAddress != "" {
|
if cfg.Debug.APIAddress != "" {
|
||||||
debugAPI = debugapi.NewDebugAPI(cfg.Debug.APIAddress, stateDB, sync)
|
debugAPI = debugapi.NewDebugAPI(cfg.Debug.APIAddress, stateDB, sync)
|
||||||
}
|
}
|
||||||
|
priceUpdater, err := priceupdater.NewPriceUpdater(cfg.PriceUpdater.URL,
|
||||||
|
priceupdater.APIType(cfg.PriceUpdater.Type), historyDB)
|
||||||
|
if err != nil {
|
||||||
|
return nil, tracerr.Wrap(err)
|
||||||
|
}
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
return &Node{
|
return &Node{
|
||||||
nodeAPI: nodeAPI,
|
nodeAPI: nodeAPI,
|
||||||
debugAPI: debugAPI,
|
debugAPI: debugAPI,
|
||||||
|
priceUpdater: priceUpdater,
|
||||||
coord: coord,
|
coord: coord,
|
||||||
sync: sync,
|
sync: sync,
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
@@ -442,7 +450,23 @@ func (n *Node) StartSynchronizer() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
// TODO: Run price updater. This is required by the API and the TxSelector
|
|
||||||
|
n.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-n.ctx.Done():
|
||||||
|
log.Info("PriceUpdater done")
|
||||||
|
n.wg.Done()
|
||||||
|
return
|
||||||
|
case <-time.After(n.cfg.PriceUpdater.Interval.Duration):
|
||||||
|
if err := n.priceUpdater.UpdateTokenList(); err != nil {
|
||||||
|
log.Errorw("PriceUpdater.UpdateTokenList()", "err", err)
|
||||||
|
}
|
||||||
|
n.priceUpdater.UpdatePrices(n.ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// StartDebugAPI starts the DebugAPI
|
// StartDebugAPI starts the DebugAPI
|
||||||
|
|||||||
@@ -1,8 +1,9 @@
|
|||||||
package priceupdater
|
package priceupdater
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/dghubble/sling"
|
"github.com/dghubble/sling"
|
||||||
@@ -13,52 +14,92 @@ import (
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
defaultMaxIdleConns = 10
|
defaultMaxIdleConns = 10
|
||||||
defaultIdleConnTimeout = 10
|
defaultIdleConnTimeout = 2 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// APIType defines the token exchange API
|
||||||
|
type APIType string
|
||||||
|
|
||||||
|
const (
|
||||||
|
// APITypeBitFinexV2 is the http API used by bitfinex V2
|
||||||
|
APITypeBitFinexV2 APIType = "bitfinexV2"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (t *APIType) valid() bool {
|
||||||
|
switch *t {
|
||||||
|
case APITypeBitFinexV2:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// PriceUpdater definition
|
// PriceUpdater definition
|
||||||
type PriceUpdater struct {
|
type PriceUpdater struct {
|
||||||
db *historydb.HistoryDB
|
db *historydb.HistoryDB
|
||||||
apiURL string
|
apiURL string
|
||||||
|
apiType APIType
|
||||||
tokenSymbols []string
|
tokenSymbols []string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPriceUpdater is the constructor for the updater
|
// NewPriceUpdater is the constructor for the updater
|
||||||
func NewPriceUpdater(apiURL string, db *historydb.HistoryDB) PriceUpdater {
|
func NewPriceUpdater(apiURL string, apiType APIType, db *historydb.HistoryDB) (*PriceUpdater, error) {
|
||||||
tokenSymbols := []string{}
|
tokenSymbols := []string{}
|
||||||
return PriceUpdater{
|
if !apiType.valid() {
|
||||||
|
return nil, tracerr.Wrap(fmt.Errorf("Invalid apiType: %v", apiType))
|
||||||
|
}
|
||||||
|
return &PriceUpdater{
|
||||||
db: db,
|
db: db,
|
||||||
apiURL: apiURL,
|
apiURL: apiURL,
|
||||||
|
apiType: apiType,
|
||||||
tokenSymbols: tokenSymbols,
|
tokenSymbols: tokenSymbols,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getTokenPriceBitfinex(ctx context.Context, client *sling.Sling,
|
||||||
|
tokenSymbol string) (float64, error) {
|
||||||
|
state := [10]float64{}
|
||||||
|
req, err := client.New().Get("ticker/t" + tokenSymbol + "USD").Request()
|
||||||
|
if err != nil {
|
||||||
|
return 0, tracerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
res, err := client.Do(req.WithContext(ctx), &state, nil)
|
||||||
|
if err != nil {
|
||||||
|
return 0, tracerr.Wrap(err)
|
||||||
|
}
|
||||||
|
if res.StatusCode != http.StatusOK {
|
||||||
|
return 0, fmt.Errorf("http response is not is %v", res.StatusCode)
|
||||||
|
}
|
||||||
|
return state[6], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdatePrices is triggered by the Coordinator, and internally will update the token prices in the db
|
// UpdatePrices is triggered by the Coordinator, and internally will update the token prices in the db
|
||||||
func (p *PriceUpdater) UpdatePrices() {
|
func (p *PriceUpdater) UpdatePrices(ctx context.Context) {
|
||||||
tr := &http.Transport{
|
tr := &http.Transport{
|
||||||
MaxIdleConns: defaultMaxIdleConns,
|
MaxIdleConns: defaultMaxIdleConns,
|
||||||
IdleConnTimeout: defaultIdleConnTimeout * time.Second,
|
IdleConnTimeout: defaultIdleConnTimeout,
|
||||||
DisableCompression: true,
|
DisableCompression: true,
|
||||||
}
|
}
|
||||||
httpClient := &http.Client{Transport: tr}
|
httpClient := &http.Client{Transport: tr}
|
||||||
client := sling.New().Base(p.apiURL).Client(httpClient)
|
client := sling.New().Base(p.apiURL).Client(httpClient)
|
||||||
|
|
||||||
state := [10]float64{}
|
|
||||||
|
|
||||||
for _, tokenSymbol := range p.tokenSymbols {
|
for _, tokenSymbol := range p.tokenSymbols {
|
||||||
resp, err := client.New().Get("ticker/t" + tokenSymbol + "USD").ReceiveSuccess(&state)
|
var tokenPrice float64
|
||||||
errString := tokenSymbol + " not updated, error: "
|
var err error
|
||||||
if err != nil {
|
switch p.apiType {
|
||||||
log.Error(errString + err.Error())
|
case APITypeBitFinexV2:
|
||||||
continue
|
tokenPrice, err = getTokenPriceBitfinex(ctx, client, tokenSymbol)
|
||||||
}
|
}
|
||||||
if resp.StatusCode != http.StatusOK {
|
if ctx.Err() != nil {
|
||||||
log.Error(errString + "response is not 200, is " + strconv.Itoa(resp.StatusCode))
|
return
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
err = p.db.UpdateTokenValue(tokenSymbol, state[6])
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(errString + err.Error())
|
log.Errorw("token price not updated (get error)",
|
||||||
|
"err", err, "token", tokenSymbol, "apiType", p.apiType)
|
||||||
|
}
|
||||||
|
if err = p.db.UpdateTokenValue(tokenSymbol, tokenPrice); err != nil {
|
||||||
|
log.Errorw("token price not updated (db error)",
|
||||||
|
"err", err, "token", tokenSymbol, "apiType", p.apiType)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package priceupdater
|
package priceupdater
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"math/big"
|
"math/big"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
@@ -11,6 +12,7 @@ import (
|
|||||||
"github.com/hermeznetwork/hermez-node/db/historydb"
|
"github.com/hermeznetwork/hermez-node/db/historydb"
|
||||||
"github.com/hermeznetwork/hermez-node/test"
|
"github.com/hermeznetwork/hermez-node/test"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestPriceUpdater(t *testing.T) {
|
func TestPriceUpdater(t *testing.T) {
|
||||||
@@ -37,15 +39,16 @@ func TestPriceUpdater(t *testing.T) {
|
|||||||
})
|
})
|
||||||
assert.NoError(t, historyDB.AddTokens(tokens))
|
assert.NoError(t, historyDB.AddTokens(tokens))
|
||||||
// Init price updater
|
// Init price updater
|
||||||
pu := NewPriceUpdater("https://api-pub.bitfinex.com/v2/", historyDB)
|
pu, err := NewPriceUpdater("https://api-pub.bitfinex.com/v2/", APITypeBitFinexV2, historyDB)
|
||||||
|
require.NoError(t, err)
|
||||||
// Update token list
|
// Update token list
|
||||||
assert.NoError(t, pu.UpdateTokenList())
|
assert.NoError(t, pu.UpdateTokenList())
|
||||||
// Update prices
|
// Update prices
|
||||||
pu.UpdatePrices()
|
pu.UpdatePrices(context.Background())
|
||||||
// Check that prices have been updated
|
// Check that prices have been updated
|
||||||
limit := uint(10)
|
limit := uint(10)
|
||||||
fetchedTokens, _, err := historyDB.GetTokens(nil, nil, "", nil, &limit, historydb.OrderAsc)
|
fetchedTokens, _, err := historyDB.GetTokens(nil, nil, "", nil, &limit, historydb.OrderAsc)
|
||||||
assert.NoError(t, err)
|
require.NoError(t, err)
|
||||||
// TokenID 0 (ETH) is always on the DB
|
// TokenID 0 (ETH) is always on the DB
|
||||||
assert.Equal(t, 2, len(fetchedTokens))
|
assert.Equal(t, 2, len(fetchedTokens))
|
||||||
for _, token := range fetchedTokens {
|
for _, token := range fetchedTokens {
|
||||||
|
|||||||
Reference in New Issue
Block a user