The coordinator implementation has been refactored to allow all the goroutines to be handled from the node.feature/sql-semaphore1
@ -1 +0,0 @@ |
|||
gotest.sh |
@ -0,0 +1 @@ |
|||
cfg.example.secret.toml |
@ -0,0 +1,29 @@ |
|||
[StateDB] |
|||
Path = "/tmp/iden3-test/hermez/statedb" |
|||
|
|||
[PostgreSQL] |
|||
Port = 5432 |
|||
Host = "localhost" |
|||
User = "hermez" |
|||
Password = "yourpasswordhere" |
|||
|
|||
[L2DB] |
|||
Name = "l2" |
|||
SafetyPeriod = 10 |
|||
MaxTxs = 512 |
|||
TTL = "24h" |
|||
|
|||
[HistoryDB] |
|||
Name = "history" |
|||
|
|||
[Web3] |
|||
URL = "XXX" |
|||
|
|||
[TxSelector] |
|||
Path = "/tmp/iden3-test/hermez/txselector" |
|||
|
|||
[BatchBuilder] |
|||
Path = "/tmp/iden3-test/hermez/batchbuilder" |
|||
|
|||
[Synchronizer] |
|||
SyncLoopInterval = "1s" |
@ -0,0 +1,2 @@ |
|||
ForgerAddress = "0x6BB84Cc84D4A34467aD12a2039A312f7029e2071" |
|||
ForgerLoopInterval = "500ms" |
@ -0,0 +1,156 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"fmt" |
|||
"os" |
|||
"os/signal" |
|||
|
|||
"github.com/hermeznetwork/hermez-node/config" |
|||
"github.com/hermeznetwork/hermez-node/log" |
|||
"github.com/hermeznetwork/hermez-node/node" |
|||
"github.com/urfave/cli/v2" |
|||
) |
|||
|
|||
const ( |
|||
flagCfg = "cfg" |
|||
flagCoordCfg = "coordcfg" |
|||
flagMode = "mode" |
|||
modeSync = "sync" |
|||
modeCoord = "coord" |
|||
) |
|||
|
|||
func cmdInit(c *cli.Context) error { |
|||
log.Info("Init") |
|||
cfg, err := parseCli(c) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
fmt.Println("TODO", cfg) |
|||
return err |
|||
} |
|||
|
|||
func cmdRun(c *cli.Context) error { |
|||
cfg, err := parseCli(c) |
|||
if err != nil { |
|||
return fmt.Errorf("error parsing flags and config: %w", err) |
|||
} |
|||
node, err := node.NewNode(cfg.mode, cfg.node, cfg.coord) |
|||
if err != nil { |
|||
return fmt.Errorf("error starting node: %w", err) |
|||
} |
|||
node.Start() |
|||
|
|||
stopCh := make(chan interface{}) |
|||
|
|||
// catch ^C to send the stop signal
|
|||
ossig := make(chan os.Signal, 1) |
|||
signal.Notify(ossig, os.Interrupt) |
|||
go func() { |
|||
for sig := range ossig { |
|||
if sig == os.Interrupt { |
|||
stopCh <- nil |
|||
} |
|||
} |
|||
}() |
|||
<-stopCh |
|||
node.Stop() |
|||
|
|||
return nil |
|||
} |
|||
|
|||
// Config is the configuration of the hermez node execution
|
|||
type Config struct { |
|||
mode node.Mode |
|||
node *config.Node |
|||
coord *config.Coordinator |
|||
} |
|||
|
|||
func parseCli(c *cli.Context) (*Config, error) { |
|||
cfg, err := getConfig(c) |
|||
if err != nil { |
|||
if err := cli.ShowAppHelp(c); err != nil { |
|||
panic(err) |
|||
} |
|||
return nil, err |
|||
} |
|||
return cfg, nil |
|||
} |
|||
|
|||
func getConfig(c *cli.Context) (*Config, error) { |
|||
var cfg Config |
|||
mode := c.String(flagMode) |
|||
switch mode { |
|||
case modeSync: |
|||
cfg.mode = node.ModeSynchronizer |
|||
case modeCoord: |
|||
cfg.mode = node.ModeCoordinator |
|||
default: |
|||
return nil, fmt.Errorf("invalid mode \"%v\"", mode) |
|||
} |
|||
|
|||
if cfg.mode == node.ModeCoordinator { |
|||
coordCfgPath := c.String(flagCoordCfg) |
|||
if coordCfgPath == "" { |
|||
return nil, fmt.Errorf("required flag \"%v\" not set", flagCoordCfg) |
|||
} |
|||
coordCfg, err := config.LoadCoordinator(coordCfgPath) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
cfg.coord = coordCfg |
|||
} |
|||
nodeCfgPath := c.String(flagCfg) |
|||
if nodeCfgPath == "" { |
|||
return nil, fmt.Errorf("required flag \"%v\" not set", flagCfg) |
|||
} |
|||
nodeCfg, err := config.LoadNode(nodeCfgPath) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
cfg.node = nodeCfg |
|||
|
|||
return &cfg, nil |
|||
} |
|||
|
|||
func main() { |
|||
app := cli.NewApp() |
|||
app.Name = "hermez-node" |
|||
app.Version = "0.1.0-alpha" |
|||
app.Flags = []cli.Flag{ |
|||
&cli.StringFlag{ |
|||
Name: flagMode, |
|||
Usage: fmt.Sprintf("Set node `MODE` (can be \"%v\" or \"%v\")", modeSync, modeCoord), |
|||
Required: true, |
|||
}, |
|||
&cli.StringFlag{ |
|||
Name: flagCfg, |
|||
Usage: "Node configuration `FILE`", |
|||
Required: true, |
|||
}, |
|||
&cli.StringFlag{ |
|||
Name: flagCoordCfg, |
|||
Usage: "Coordinator configuration `FILE`", |
|||
}, |
|||
} |
|||
|
|||
app.Commands = []*cli.Command{ |
|||
{ |
|||
Name: "init", |
|||
Aliases: []string{}, |
|||
Usage: "Initialize the hermez-node", |
|||
Action: cmdInit, |
|||
}, |
|||
{ |
|||
Name: "run", |
|||
Aliases: []string{}, |
|||
Usage: "Run the hermez-node in the indicated mode", |
|||
Action: cmdRun, |
|||
}, |
|||
} |
|||
|
|||
err := app.Run(os.Args) |
|||
if err != nil { |
|||
fmt.Printf("\nError: %v\n", err) |
|||
os.Exit(1) |
|||
} |
|||
} |
@ -0,0 +1,101 @@ |
|||
package config |
|||
|
|||
import ( |
|||
"fmt" |
|||
"io/ioutil" |
|||
"time" |
|||
|
|||
"github.com/BurntSushi/toml" |
|||
ethCommon "github.com/ethereum/go-ethereum/common" |
|||
"gopkg.in/go-playground/validator.v9" |
|||
) |
|||
|
|||
// Duration is a wrapper type that parses time duration from text.
|
|||
type Duration struct { |
|||
time.Duration |
|||
} |
|||
|
|||
// UnmarshalText unmarshalls time duration from text.
|
|||
func (d *Duration) UnmarshalText(data []byte) error { |
|||
duration, err := time.ParseDuration(string(data)) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
d.Duration = duration |
|||
return nil |
|||
} |
|||
|
|||
// Coordinator is the coordinator specific configuration.
|
|||
type Coordinator struct { |
|||
ForgerAddress ethCommon.Address `validate:"required"` |
|||
ForgeLoopInterval Duration `validate:"required"` |
|||
} |
|||
|
|||
// Node is the hermez node configuration.
|
|||
type Node struct { |
|||
StateDB struct { |
|||
Path string |
|||
} `validate:"required"` |
|||
PostgreSQL struct { |
|||
Port int `validate:"required"` |
|||
Host string `validate:"required"` |
|||
User string `validate:"required"` |
|||
Password string `validate:"required"` |
|||
} `validate:"required"` |
|||
L2DB struct { |
|||
Name string `validate:"required"` |
|||
SafetyPeriod uint16 `validate:"required"` |
|||
MaxTxs uint32 `validate:"required"` |
|||
TTL Duration `validate:"required"` |
|||
} `validate:"required"` |
|||
HistoryDB struct { |
|||
Name string `validate:"required"` |
|||
} `validate:"required"` |
|||
Web3 struct { |
|||
URL string `validate:"required"` |
|||
} `validate:"required"` |
|||
TxSelector struct { |
|||
Path string `validate:"required"` |
|||
} `validate:"required"` |
|||
BatchBuilder struct { |
|||
Path string `validate:"required"` |
|||
} `validate:"required"` |
|||
Synchronizer struct { |
|||
SyncLoopInterval Duration `validate:"required"` |
|||
} `validate:"required"` |
|||
} |
|||
|
|||
// Load loads a generic config.
|
|||
func Load(path string, cfg interface{}) error { |
|||
bs, err := ioutil.ReadFile(path) //nolint:gosec
|
|||
if err != nil { |
|||
return err |
|||
} |
|||
cfgToml := string(bs) |
|||
if _, err := toml.Decode(cfgToml, cfg); err != nil { |
|||
return err |
|||
} |
|||
validate := validator.New() |
|||
if err := validate.Struct(cfg); err != nil { |
|||
return fmt.Errorf("error validating configuration file: %w", err) |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
// LoadCoordinator loads the Coordinator configuration from path.
|
|||
func LoadCoordinator(path string) (*Coordinator, error) { |
|||
var cfg Coordinator |
|||
if err := Load(path, &cfg); err != nil { |
|||
return nil, fmt.Errorf("error loading coordinator configuration file: %w", err) |
|||
} |
|||
return &cfg, nil |
|||
} |
|||
|
|||
// LoadNode loads the Node configuration from path.
|
|||
func LoadNode(path string) (*Node, error) { |
|||
var cfg Node |
|||
if err := Load(path, &cfg); err != nil { |
|||
return nil, fmt.Errorf("error loading node configuration file: %w", err) |
|||
} |
|||
return &cfg, nil |
|||
} |
@ -1,13 +1,251 @@ |
|||
package node |
|||
|
|||
type mode string |
|||
import ( |
|||
"time" |
|||
|
|||
// ModeCoordinator defines the mode of the HermezNode as Coordinator, which
|
|||
// means that the node is set to forge (which also will be synchronizing with
|
|||
// the L1 blockchain state)
|
|||
const ModeCoordinator mode = "coordinator" |
|||
"github.com/ethereum/go-ethereum/ethclient" |
|||
"github.com/hermeznetwork/hermez-node/batchbuilder" |
|||
"github.com/hermeznetwork/hermez-node/config" |
|||
"github.com/hermeznetwork/hermez-node/coordinator" |
|||
"github.com/hermeznetwork/hermez-node/db/historydb" |
|||
"github.com/hermeznetwork/hermez-node/db/l2db" |
|||
"github.com/hermeznetwork/hermez-node/db/statedb" |
|||
"github.com/hermeznetwork/hermez-node/eth" |
|||
"github.com/hermeznetwork/hermez-node/log" |
|||
"github.com/hermeznetwork/hermez-node/synchronizer" |
|||
"github.com/hermeznetwork/hermez-node/txselector" |
|||
) |
|||
|
|||
// ModeSynchronizer defines the mode of the HermezNode as Synchronizer, which
|
|||
// means that the node is set to only synchronize with the L1 blockchain state
|
|||
// and will not forge
|
|||
const ModeSynchronizer mode = "synchronizer" |
|||
// Mode sets the working mode of the node (synchronizer or coordinator)
|
|||
type Mode string |
|||
|
|||
const ( |
|||
// ModeCoordinator defines the mode of the HermezNode as Coordinator, which
|
|||
// means that the node is set to forge (which also will be synchronizing with
|
|||
// the L1 blockchain state)
|
|||
ModeCoordinator Mode = "coordinator" |
|||
|
|||
// ModeSynchronizer defines the mode of the HermezNode as Synchronizer, which
|
|||
// means that the node is set to only synchronize with the L1 blockchain state
|
|||
// and will not forge
|
|||
ModeSynchronizer Mode = "synchronizer" |
|||
) |
|||
|
|||
// Node is the Hermez Node
|
|||
type Node struct { |
|||
// Coordinator
|
|||
coord *coordinator.Coordinator |
|||
coordCfg *config.Coordinator |
|||
stopForge chan bool |
|||
stopGetProofCallForge chan bool |
|||
stopForgeCallConfirm chan bool |
|||
stoppedForge chan bool |
|||
stoppedGetProofCallForge chan bool |
|||
stoppedForgeCallConfirm chan bool |
|||
|
|||
// Synchronizer
|
|||
sync *synchronizer.Synchronizer |
|||
stopSync chan bool |
|||
stoppedSync chan bool |
|||
|
|||
// General
|
|||
cfg *config.Node |
|||
mode Mode |
|||
} |
|||
|
|||
// NewNode creates a Node
|
|||
func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node, error) { |
|||
historyDB, err := historydb.NewHistoryDB( |
|||
cfg.PostgreSQL.Port, |
|||
cfg.PostgreSQL.Host, |
|||
cfg.PostgreSQL.User, |
|||
cfg.PostgreSQL.Password, |
|||
cfg.HistoryDB.Name, |
|||
) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
stateDB, err := statedb.NewStateDB(cfg.StateDB.Path, true, 32) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
|
|||
ethClient, err := ethclient.Dial(cfg.Web3.URL) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
client := eth.NewClient(ethClient, nil, nil, nil) |
|||
|
|||
sync := synchronizer.NewSynchronizer(client, historyDB, stateDB) |
|||
|
|||
var coord *coordinator.Coordinator |
|||
if mode == ModeCoordinator { |
|||
l2DB, err := l2db.NewL2DB( |
|||
cfg.PostgreSQL.Port, |
|||
cfg.PostgreSQL.Host, |
|||
cfg.PostgreSQL.User, |
|||
cfg.PostgreSQL.Password, |
|||
cfg.L2DB.Name, |
|||
cfg.L2DB.SafetyPeriod, |
|||
cfg.L2DB.MaxTxs, |
|||
cfg.L2DB.TTL.Duration, |
|||
) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
// TODO: Get (maxL1UserTxs, maxL1OperatorTxs, maxTxs) from the smart contract
|
|||
txSelector, err := txselector.NewTxSelector(cfg.TxSelector.Path, stateDB, l2DB, 10, 10, 10) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
// TODO: Get (configCircuits []ConfigCircuit, batchNum common.BatchNum, nLevels uint64) from smart contract
|
|||
nLevels := uint64(32) //nolint:gomnd
|
|||
batchBuilder, err := batchbuilder.NewBatchBuilder(cfg.BatchBuilder.Path, stateDB, nil, 0, nLevels) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
coord = coordinator.NewCoordinator( |
|||
coordinator.Config{ |
|||
ForgerAddress: coordCfg.ForgerAddress, |
|||
}, |
|||
historyDB, |
|||
txSelector, |
|||
batchBuilder, |
|||
client, |
|||
) |
|||
} |
|||
return &Node{ |
|||
coord: coord, |
|||
coordCfg: coordCfg, |
|||
sync: sync, |
|||
cfg: cfg, |
|||
mode: mode, |
|||
}, nil |
|||
} |
|||
|
|||
// StartCoordinator starts the coordinator
|
|||
func (n *Node) StartCoordinator() { |
|||
log.Info("Starting Coordinator...") |
|||
|
|||
n.stopForge = make(chan bool) |
|||
n.stopGetProofCallForge = make(chan bool) |
|||
n.stopForgeCallConfirm = make(chan bool) |
|||
|
|||
n.stoppedForge = make(chan bool) |
|||
n.stoppedGetProofCallForge = make(chan bool) |
|||
n.stoppedForgeCallConfirm = make(chan bool) |
|||
|
|||
batchCh0 := make(chan *coordinator.BatchInfo) |
|||
batchCh1 := make(chan *coordinator.BatchInfo) |
|||
|
|||
go func() { |
|||
defer func() { n.stoppedForge <- true }() |
|||
for { |
|||
select { |
|||
case <-n.stopForge: |
|||
return |
|||
default: |
|||
if forge, err := n.coord.ForgeLoopFn(batchCh0, n.stopForge); err == coordinator.ErrStop { |
|||
return |
|||
} else if err != nil { |
|||
log.Errorw("Coordinator.ForgeLoopFn", "error", err) |
|||
} else if !forge { |
|||
time.Sleep(n.coordCfg.ForgeLoopInterval.Duration) |
|||
} |
|||
} |
|||
} |
|||
}() |
|||
go func() { |
|||
defer func() { n.stoppedGetProofCallForge <- true }() |
|||
for { |
|||
select { |
|||
case <-n.stopGetProofCallForge: |
|||
return |
|||
default: |
|||
if err := n.coord.GetProofCallForgeLoopFn( |
|||
batchCh0, batchCh1, n.stopGetProofCallForge); err == coordinator.ErrStop { |
|||
return |
|||
} else if err != nil { |
|||
log.Errorw("Coordinator.GetProofCallForgeLoopFn", "error", err) |
|||
} |
|||
} |
|||
} |
|||
}() |
|||
go func() { |
|||
defer func() { n.stoppedForgeCallConfirm <- true }() |
|||
for { |
|||
select { |
|||
case <-n.stopForgeCallConfirm: |
|||
return |
|||
default: |
|||
if err := n.coord.ForgeCallConfirmLoopFn( |
|||
batchCh1, n.stopForgeCallConfirm); err == coordinator.ErrStop { |
|||
return |
|||
} else if err != nil { |
|||
log.Errorw("Coordinator.ForgeCallConfirmLoopFn", "error", err) |
|||
} |
|||
} |
|||
} |
|||
}() |
|||
} |
|||
|
|||
// StopCoordinator stops the coordinator
|
|||
func (n *Node) StopCoordinator() { |
|||
log.Info("Stopping Coordinator...") |
|||
n.stopForge <- true |
|||
n.stopGetProofCallForge <- true |
|||
n.stopForgeCallConfirm <- true |
|||
<-n.stoppedForge |
|||
<-n.stoppedGetProofCallForge |
|||
<-n.stoppedForgeCallConfirm |
|||
} |
|||
|
|||
// StartSynchronizer starts the synchronizer
|
|||
func (n *Node) StartSynchronizer() { |
|||
log.Info("Starting Synchronizer...") |
|||
n.stopSync = make(chan bool) |
|||
n.stoppedSync = make(chan bool) |
|||
go func() { |
|||
defer func() { n.stoppedSync <- true }() |
|||
for { |
|||
select { |
|||
case <-n.stopSync: |
|||
log.Info("Coordinator stopped") |
|||
return |
|||
case <-time.After(n.cfg.Synchronizer.SyncLoopInterval.Duration): |
|||
if err := n.sync.Sync(); err != nil { |
|||
log.Errorw("Synchronizer.Sync", "error", err) |
|||
} |
|||
} |
|||
} |
|||
}() |
|||
} |
|||
|
|||
// StopSynchronizer stops the synchronizer
|
|||
func (n *Node) StopSynchronizer() { |
|||
log.Info("Stopping Synchronizer...") |
|||
n.stopSync <- true |
|||
<-n.stoppedSync |
|||
} |
|||
|
|||
// Start the node
|
|||
func (n *Node) Start() { |
|||
log.Infow("Starting node...", "mode", n.mode) |
|||
if n.mode == ModeCoordinator { |
|||
n.StartCoordinator() |
|||
} |
|||
n.StartSynchronizer() |
|||
} |
|||
|
|||
// Stop the node
|
|||
func (n *Node) Stop() { |
|||
log.Infow("Stopping node...") |
|||
if n.mode == ModeCoordinator { |
|||
n.StopCoordinator() |
|||
} |
|||
n.StopSynchronizer() |
|||
} |