diff --git a/cli/node/cfg.buidler.toml b/cli/node/cfg.buidler.toml index 6008fc7..65dd202 100644 --- a/cli/node/cfg.buidler.toml +++ b/cli/node/cfg.buidler.toml @@ -4,6 +4,11 @@ Explorer = true UpdateMetricsInterval = "10s" UpdateRecommendedFeeInterval = "10s" +[PriceUpdater] +Interval = "10s" +URL = "https://api-pub.bitfinex.com/v2/" +Type = "bitfinexV2" + [Debug] APIAddress = "localhost:12345" diff --git a/config/config.go b/config/config.go index 93be85a..b60b37a 100644 --- a/config/config.go +++ b/config/config.go @@ -97,6 +97,11 @@ type Coordinator struct { // Node is the hermez node configuration. type Node struct { + PriceUpdater struct { + Interval Duration `valudate:"required"` + URL string `valudate:"required"` + Type string `valudate:"required"` + } `validate:"required"` StateDB struct { Path string } `validate:"required"` diff --git a/node/node.go b/node/node.go index 619da93..9ac5d51 100644 --- a/node/node.go +++ b/node/node.go @@ -22,6 +22,7 @@ import ( "github.com/hermeznetwork/hermez-node/db/statedb" "github.com/hermeznetwork/hermez-node/eth" "github.com/hermeznetwork/hermez-node/log" + "github.com/hermeznetwork/hermez-node/priceupdater" "github.com/hermeznetwork/hermez-node/prover" "github.com/hermeznetwork/hermez-node/synchronizer" "github.com/hermeznetwork/hermez-node/test/debugapi" @@ -47,8 +48,9 @@ const ( // Node is the Hermez Node type Node struct { - nodeAPI *NodeAPI - debugAPI *debugapi.DebugAPI + nodeAPI *NodeAPI + debugAPI *debugapi.DebugAPI + priceUpdater *priceupdater.PriceUpdater // Coordinator coord *coordinator.Coordinator @@ -243,17 +245,23 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) { if cfg.Debug.APIAddress != "" { debugAPI = debugapi.NewDebugAPI(cfg.Debug.APIAddress, stateDB, sync) } + priceUpdater, err := priceupdater.NewPriceUpdater(cfg.PriceUpdater.URL, + priceupdater.APIType(cfg.PriceUpdater.Type), historyDB) + if err != nil { + return nil, tracerr.Wrap(err) + } ctx, cancel := context.WithCancel(context.Background()) return &Node{ - nodeAPI: nodeAPI, - debugAPI: debugAPI, - coord: coord, - sync: sync, - cfg: cfg, - mode: mode, - sqlConn: db, - ctx: ctx, - cancel: cancel, + nodeAPI: nodeAPI, + debugAPI: debugAPI, + priceUpdater: priceUpdater, + coord: coord, + sync: sync, + cfg: cfg, + mode: mode, + sqlConn: db, + ctx: ctx, + cancel: cancel, }, nil } @@ -439,7 +447,23 @@ func (n *Node) StartSynchronizer() { } } }() - // TODO: Run price updater. This is required by the API and the TxSelector + + n.wg.Add(1) + go func() { + for { + select { + case <-n.ctx.Done(): + log.Info("PriceUpdater done") + n.wg.Done() + return + case <-time.After(n.cfg.PriceUpdater.Interval.Duration): + if err := n.priceUpdater.UpdateTokenList(); err != nil { + log.Errorw("PriceUpdater.UpdateTokenList()", "err", err) + } + n.priceUpdater.UpdatePrices(n.ctx) + } + } + }() } // StartDebugAPI starts the DebugAPI diff --git a/priceupdater/priceupdater.go b/priceupdater/priceupdater.go index 69da16a..f8a87c8 100644 --- a/priceupdater/priceupdater.go +++ b/priceupdater/priceupdater.go @@ -1,8 +1,9 @@ package priceupdater import ( + "context" + "fmt" "net/http" - "strconv" "time" "github.com/dghubble/sling" @@ -13,52 +14,92 @@ import ( const ( defaultMaxIdleConns = 10 - defaultIdleConnTimeout = 10 + defaultIdleConnTimeout = 2 * time.Second ) +// APIType defines the token exchange API +type APIType string + +const ( + // APITypeBitFinexV2 is the http API used by bitfinex V2 + APITypeBitFinexV2 APIType = "bitfinexV2" +) + +func (t *APIType) valid() bool { + switch *t { + case APITypeBitFinexV2: + return true + default: + return false + } +} + // PriceUpdater definition type PriceUpdater struct { db *historydb.HistoryDB apiURL string + apiType APIType tokenSymbols []string } // NewPriceUpdater is the constructor for the updater -func NewPriceUpdater(apiURL string, db *historydb.HistoryDB) PriceUpdater { +func NewPriceUpdater(apiURL string, apiType APIType, db *historydb.HistoryDB) (*PriceUpdater, error) { tokenSymbols := []string{} - return PriceUpdater{ + if !apiType.valid() { + return nil, tracerr.Wrap(fmt.Errorf("Invalid apiType: %v", apiType)) + } + return &PriceUpdater{ db: db, apiURL: apiURL, + apiType: apiType, tokenSymbols: tokenSymbols, + }, nil +} + +func getTokenPriceBitfinex(ctx context.Context, client *sling.Sling, + tokenSymbol string) (float64, error) { + state := [10]float64{} + req, err := client.New().Get("ticker/t" + tokenSymbol + "USD").Request() + if err != nil { + return 0, tracerr.Wrap(err) + } + res, err := client.Do(req.WithContext(ctx), &state, nil) + if err != nil { + return 0, tracerr.Wrap(err) + } + if res.StatusCode != http.StatusOK { + return 0, fmt.Errorf("http response is not is %v", res.StatusCode) } + return state[6], nil } // UpdatePrices is triggered by the Coordinator, and internally will update the token prices in the db -func (p *PriceUpdater) UpdatePrices() { +func (p *PriceUpdater) UpdatePrices(ctx context.Context) { tr := &http.Transport{ MaxIdleConns: defaultMaxIdleConns, - IdleConnTimeout: defaultIdleConnTimeout * time.Second, + IdleConnTimeout: defaultIdleConnTimeout, DisableCompression: true, } httpClient := &http.Client{Transport: tr} client := sling.New().Base(p.apiURL).Client(httpClient) - state := [10]float64{} - for _, tokenSymbol := range p.tokenSymbols { - resp, err := client.New().Get("ticker/t" + tokenSymbol + "USD").ReceiveSuccess(&state) - errString := tokenSymbol + " not updated, error: " - if err != nil { - log.Error(errString + err.Error()) - continue + var tokenPrice float64 + var err error + switch p.apiType { + case APITypeBitFinexV2: + tokenPrice, err = getTokenPriceBitfinex(ctx, client, tokenSymbol) } - if resp.StatusCode != http.StatusOK { - log.Error(errString + "response is not 200, is " + strconv.Itoa(resp.StatusCode)) - continue + if ctx.Err() != nil { + return } - err = p.db.UpdateTokenValue(tokenSymbol, state[6]) if err != nil { - log.Error(errString + err.Error()) + log.Errorw("token price not updated (get error)", + "err", err, "token", tokenSymbol, "apiType", p.apiType) + } + if err = p.db.UpdateTokenValue(tokenSymbol, tokenPrice); err != nil { + log.Errorw("token price not updated (db error)", + "err", err, "token", tokenSymbol, "apiType", p.apiType) } } } diff --git a/priceupdater/priceupdater_test.go b/priceupdater/priceupdater_test.go index 85e1a46..650ad3d 100644 --- a/priceupdater/priceupdater_test.go +++ b/priceupdater/priceupdater_test.go @@ -1,6 +1,7 @@ package priceupdater import ( + "context" "math/big" "os" "testing" @@ -11,6 +12,7 @@ import ( "github.com/hermeznetwork/hermez-node/db/historydb" "github.com/hermeznetwork/hermez-node/test" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestPriceUpdater(t *testing.T) { @@ -37,15 +39,16 @@ func TestPriceUpdater(t *testing.T) { }) assert.NoError(t, historyDB.AddTokens(tokens)) // Init price updater - pu := NewPriceUpdater("https://api-pub.bitfinex.com/v2/", historyDB) + pu, err := NewPriceUpdater("https://api-pub.bitfinex.com/v2/", APITypeBitFinexV2, historyDB) + require.NoError(t, err) // Update token list assert.NoError(t, pu.UpdateTokenList()) // Update prices - pu.UpdatePrices() + pu.UpdatePrices(context.Background()) // Check that prices have been updated limit := uint(10) fetchedTokens, _, err := historyDB.GetTokens(nil, nil, "", nil, &limit, historydb.OrderAsc) - assert.NoError(t, err) + require.NoError(t, err) // TokenID 0 (ETH) is always on the DB assert.Equal(t, 2, len(fetchedTokens)) for _, token := range fetchedTokens {