Browse Source

Add DebugAPI to Node, fix StateDB

- Allow starting the DebugAPI from the node via config
- In StateDB:
    - Make checkpoints when ProcessTxs() succeeds
    - Remove extra hardcoded `statedb` path that was redundant
    - Replace hardcoded `[:4]` by `[:]` when parsing idx, which failed because
      idx is 6 bytes length now.
- Extra: In node, use waitgroup instead of `stoppedXXX` channels to wait for
syncrhonizer goroutines to finish.
feature/sql-semaphore1
Eduard S 4 years ago
parent
commit
5b6639a947
6 changed files with 58 additions and 21 deletions
  1. +3
    -0
      cli/node/cfg.buidler.toml
  2. +3
    -0
      config/config.go
  3. +3
    -4
      db/statedb/statedb.go
  4. +8
    -3
      db/statedb/txprocessors.go
  5. +35
    -11
      node/node.go
  6. +6
    -3
      test/debugapi/debugapi.go

+ 3
- 0
cli/node/cfg.buidler.toml

@ -1,3 +1,6 @@
[Debug]
APIAddress = "localhost:12345"
[StateDB]
Path = "/tmp/iden3-test/hermez/statedb"

+ 3
- 0
config/config.go

@ -84,6 +84,9 @@ type Node struct {
ReceiptTimeout Duration `validate:"required"`
IntervalReceiptLoop Duration `validate:"required"`
} `validate:"required"`
Debug struct {
APIAddress string
}
}
// Load loads a generic config.

+ 3
- 4
db/statedb/statedb.go

@ -48,8 +48,6 @@ var (
)
const (
// PathStateDB defines the subpath of the StateDB
PathStateDB = "/statedb"
// PathBatchNum defines the subpath of the Batch Checkpoint in the
// subpath of the StateDB
PathBatchNum = "/BatchNum"
@ -88,7 +86,7 @@ type StateDB struct {
func NewStateDB(path string, typ TypeStateDB, nLevels int) (*StateDB, error) {
var sto *pebble.PebbleStorage
var err error
sto, err = pebble.NewPebbleStorage(path+PathStateDB+PathCurrent, false)
sto, err = pebble.NewPebbleStorage(path+PathCurrent, false)
if err != nil {
return nil, err
}
@ -105,7 +103,7 @@ func NewStateDB(path string, typ TypeStateDB, nLevels int) (*StateDB, error) {
}
sdb := &StateDB{
path: path + PathStateDB,
path: path,
db: sto,
mt: mt,
typ: typ,
@ -163,6 +161,7 @@ func (s *StateDB) setCurrentBatch() error {
func (s *StateDB) MakeCheckpoint() error {
// advance currentBatch
s.currentBatch++
log.Debugw("Making StateDB checkpoint", "batch", s.currentBatch, "type", s.typ)
checkpointPath := s.path + PathBatchNum + strconv.Itoa(int(s.currentBatch))

+ 8
- 3
db/statedb/txprocessors.go

@ -49,8 +49,13 @@ type ProcessTxOutput struct {
// the HistoryDB, and adds Nonce & TokenID to the L2Txs.
// And if TypeSynchronizer returns an array of common.Account with all the
// created accounts.
func (s *StateDB) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinatortxs []common.L1Tx, l2txs []common.PoolL2Tx) (*ProcessTxOutput, error) {
var err error
func (s *StateDB) ProcessTxs(coordIdxs []common.Idx, l1usertxs, l1coordinatortxs []common.L1Tx, l2txs []common.PoolL2Tx) (ptOut *ProcessTxOutput, err error) {
defer func() {
if err == nil {
err = s.MakeCheckpoint()
}
}()
var exitTree *merkletree.MerkleTree
var createdAccounts []common.Account
@ -829,7 +834,7 @@ func (s *StateDB) getIdx() (common.Idx, error) {
if err != nil {
return 0, err
}
return common.IdxFromBytes(idxBytes[:4])
return common.IdxFromBytes(idxBytes[:])
}
// setIdx stores Idx in the localStateDB

+ 35
- 11
node/node.go

@ -2,6 +2,7 @@ package node
import (
"context"
"sync"
"time"
"github.com/ethereum/go-ethereum/ethclient"
@ -16,6 +17,7 @@ import (
"github.com/hermeznetwork/hermez-node/eth"
"github.com/hermeznetwork/hermez-node/log"
"github.com/hermeznetwork/hermez-node/synchronizer"
"github.com/hermeznetwork/hermez-node/test/debugapi"
"github.com/hermeznetwork/hermez-node/txselector"
"github.com/jmoiron/sqlx"
)
@ -37,6 +39,7 @@ const (
// Node is the Hermez Node
type Node struct {
debugAPI *debugapi.DebugAPI
// Coordinator
coord *coordinator.Coordinator
coordCfg *config.Coordinator
@ -48,14 +51,14 @@ type Node struct {
stoppedForgeCallConfirm chan bool
// Synchronizer
sync *synchronizer.Synchronizer
stoppedSync chan bool
sync *synchronizer.Synchronizer
// General
cfg *config.Node
mode Mode
sqlConn *sqlx.DB
ctx context.Context
wg sync.WaitGroup
cancel context.CancelFunc
}
@ -155,8 +158,14 @@ func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node,
client,
)
}
var debugAPI *debugapi.DebugAPI
println("apiaddr", cfg.Debug.APIAddress)
if cfg.Debug.APIAddress != "" {
debugAPI = debugapi.NewDebugAPI(cfg.Debug.APIAddress, stateDB)
}
ctx, cancel := context.WithCancel(context.Background())
return &Node{
debugAPI: debugAPI,
coord: coord,
coordCfg: coordCfg,
sync: sync,
@ -172,6 +181,9 @@ func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node,
func (n *Node) StartCoordinator() {
log.Info("Starting Coordinator...")
// TODO: Replace stopXXX by context
// TODO: Replace stoppedXXX by waitgroup
n.stopForge = make(chan bool)
n.stopGetProofCallForge = make(chan bool)
n.stopForgeCallConfirm = make(chan bool)
@ -249,18 +261,18 @@ func (n *Node) StopCoordinator() {
// StartSynchronizer starts the synchronizer
func (n *Node) StartSynchronizer() {
log.Info("Starting Synchronizer...")
// stopped channel is size 1 so that the defer doesn't block
n.stoppedSync = make(chan bool, 1)
n.wg.Add(1)
go func() {
defer func() {
n.stoppedSync <- true
log.Info("Synchronizer routine stopped")
n.wg.Done()
}()
var lastBlock *common.Block
d := time.Duration(0)
for {
select {
case <-n.ctx.Done():
log.Info("Synchronizer stopped")
log.Info("Synchronizer done")
return
case <-time.After(d):
if blockData, discarded, err := n.sync.Sync2(n.ctx, lastBlock); err != nil {
@ -283,15 +295,27 @@ func (n *Node) StartSynchronizer() {
// TODO: Run price updater. This is required by the API and the TxSelector
}
// WaitStopSynchronizer waits for the synchronizer to stop
func (n *Node) WaitStopSynchronizer() {
log.Info("Waiting for Synchronizer to stop...")
<-n.stoppedSync
// StartDebugAPI starts the DebugAPI
func (n *Node) StartDebugAPI() {
log.Info("Starting DebugAPI...")
n.wg.Add(1)
go func() {
defer func() {
log.Info("DebugAPI routine stopped")
n.wg.Done()
}()
if err := n.debugAPI.Run(n.ctx); err != nil {
log.Fatalw("DebugAPI.Run", "err", err)
}
}()
}
// Start the node
func (n *Node) Start() {
log.Infow("Starting node...", "mode", n.mode)
if n.debugAPI != nil {
n.StartDebugAPI()
}
if n.mode == ModeCoordinator {
n.StartCoordinator()
}
@ -305,5 +329,5 @@ func (n *Node) Stop() {
if n.mode == ModeCoordinator {
n.StopCoordinator()
}
n.WaitStopSynchronizer()
n.wg.Wait()
}

+ 6
- 3
test/debugapi/debugapi.go

@ -92,6 +92,9 @@ func (a *DebugAPI) Run(ctx context.Context) error {
debugAPI.GET("sdb/batchnum", a.handleCurrentBatch)
debugAPI.GET("sdb/mtroot", a.handleMTRoot)
// Accounts returned by these endpoints will always have BatchNum = 0,
// because the stateDB doesn't store the BatchNum in which an account
// is created.
debugAPI.GET("sdb/accounts", a.handleAccounts)
debugAPI.GET("sdb/accounts/:Idx", a.handleAccount)
@ -104,7 +107,7 @@ func (a *DebugAPI) Run(ctx context.Context) error {
MaxHeaderBytes: 1 << 20, //nolint:gomnd
}
go func() {
log.Infof("Debug API is ready at %v", a.addr)
log.Infof("DebugAPI is ready at %v", a.addr)
if err := debugAPIServer.ListenAndServe(); err != nil &&
err != http.ErrServerClosed {
log.Fatalf("Listen: %s\n", err)
@ -112,10 +115,10 @@ func (a *DebugAPI) Run(ctx context.Context) error {
}()
<-ctx.Done()
log.Info("Stopping Debug API...")
log.Info("Stopping DebugAPI...")
if err := debugAPIServer.Shutdown(context.Background()); err != nil {
return err
}
log.Info("Debug API stopped")
log.Info("DebugAPI done")
return nil
}

Loading…
Cancel
Save