You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

306 lines
8.1 KiB

package node
import (
"context"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/hermeznetwork/hermez-node/batchbuilder"
"github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/config"
"github.com/hermeznetwork/hermez-node/coordinator"
dbUtils "github.com/hermeznetwork/hermez-node/db"
"github.com/hermeznetwork/hermez-node/db/historydb"
"github.com/hermeznetwork/hermez-node/db/l2db"
"github.com/hermeznetwork/hermez-node/db/statedb"
"github.com/hermeznetwork/hermez-node/eth"
"github.com/hermeznetwork/hermez-node/log"
"github.com/hermeznetwork/hermez-node/synchronizer"
"github.com/hermeznetwork/hermez-node/txselector"
"github.com/jmoiron/sqlx"
)
// Mode sets the working mode of the node (synchronizer or coordinator)
type Mode string
const (
// ModeCoordinator defines the mode of the HermezNode as Coordinator, which
// means that the node is set to forge (which also will be synchronizing with
// the L1 blockchain state)
ModeCoordinator Mode = "coordinator"
// ModeSynchronizer defines the mode of the HermezNode as Synchronizer, which
// means that the node is set to only synchronize with the L1 blockchain state
// and will not forge
ModeSynchronizer Mode = "synchronizer"
)
// Node is the Hermez Node
type Node struct {
// Coordinator
coord *coordinator.Coordinator
coordCfg *config.Coordinator
stopForge chan bool
stopGetProofCallForge chan bool
stopForgeCallConfirm chan bool
stoppedForge chan bool
stoppedGetProofCallForge chan bool
stoppedForgeCallConfirm chan bool
// Synchronizer
sync *synchronizer.Synchronizer
stoppedSync chan bool
// General
cfg *config.Node
mode Mode
sqlConn *sqlx.DB
ctx context.Context
cancel context.CancelFunc
}
// NewNode creates a Node
func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node, error) {
// Stablish DB connection
db, err := dbUtils.InitSQLDB(
cfg.PostgreSQL.Port,
cfg.PostgreSQL.Host,
cfg.PostgreSQL.User,
cfg.PostgreSQL.Password,
cfg.PostgreSQL.Name,
)
if err != nil {
return nil, err
}
historyDB := historydb.NewHistoryDB(db)
stateDB, err := statedb.NewStateDB(cfg.StateDB.Path, statedb.TypeSynchronizer, 32)
if err != nil {
return nil, err
}
ethClient, err := ethclient.Dial(cfg.Web3.URL)
if err != nil {
return nil, err
}
client, err := eth.NewClient(ethClient, nil, nil, &eth.ClientConfig{
Ethereum: eth.EthereumConfig{
CallGasLimit: cfg.EthClient.CallGasLimit,
DeployGasLimit: cfg.EthClient.DeployGasLimit,
GasPriceDiv: cfg.EthClient.GasPriceDiv,
ReceiptTimeout: cfg.EthClient.ReceiptTimeout.Duration,
IntervalReceiptLoop: cfg.EthClient.IntervalReceiptLoop.Duration,
},
Rollup: eth.RollupConfig{
Address: cfg.SmartContracts.Rollup,
},
Auction: eth.AuctionConfig{
Address: cfg.SmartContracts.Auction,
TokenHEZ: eth.TokenConfig{
Address: cfg.SmartContracts.TokenHEZ,
Name: cfg.SmartContracts.TokenHEZName,
},
},
})
if err != nil {
return nil, err
}
sync, err := synchronizer.NewSynchronizer(client, historyDB, stateDB, synchronizer.Config{
StartBlockNum: synchronizer.ConfigStartBlockNum{
Rollup: cfg.Synchronizer.StartBlockNum.Rollup,
Auction: cfg.Synchronizer.StartBlockNum.Auction,
WDelayer: cfg.Synchronizer.StartBlockNum.WDelayer,
},
})
if err != nil {
return nil, err
}
var coord *coordinator.Coordinator
if mode == ModeCoordinator {
l2DB := l2db.NewL2DB(
db,
coordCfg.L2DB.SafetyPeriod,
coordCfg.L2DB.MaxTxs,
coordCfg.L2DB.TTL.Duration,
)
// TODO: Get (maxL1UserTxs, maxL1OperatorTxs, maxTxs) from the smart contract
txSelector, err := txselector.NewTxSelector(coordCfg.TxSelector.Path, stateDB, l2DB, 10, 10, 10)
if err != nil {
return nil, err
}
// TODO: Get (configCircuits []ConfigCircuit, batchNum common.BatchNum, nLevels uint64) from smart contract
nLevels := uint64(32) //nolint:gomnd
batchBuilder, err := batchbuilder.NewBatchBuilder(coordCfg.BatchBuilder.Path, stateDB, nil, 0, nLevels)
if err != nil {
return nil, err
}
if err != nil {
return nil, err
}
serverProofs := make([]coordinator.ServerProofInterface, len(coordCfg.ServerProofs))
for i, serverProofCfg := range coordCfg.ServerProofs {
serverProofs[i] = coordinator.NewServerProof(serverProofCfg.URL)
}
coord = coordinator.NewCoordinator(
coordinator.Config{
ForgerAddress: coordCfg.ForgerAddress,
},
historyDB,
txSelector,
batchBuilder,
serverProofs,
client,
)
}
ctx, cancel := context.WithCancel(context.Background())
return &Node{
coord: coord,
coordCfg: coordCfg,
sync: sync,
cfg: cfg,
mode: mode,
sqlConn: db,
ctx: ctx,
cancel: cancel,
}, nil
}
// StartCoordinator starts the coordinator
func (n *Node) StartCoordinator() {
log.Info("Starting Coordinator...")
n.stopForge = make(chan bool)
n.stopGetProofCallForge = make(chan bool)
n.stopForgeCallConfirm = make(chan bool)
n.stoppedForge = make(chan bool)
n.stoppedGetProofCallForge = make(chan bool)
n.stoppedForgeCallConfirm = make(chan bool)
queueSize := 1
batchCh0 := make(chan *coordinator.BatchInfo, queueSize)
batchCh1 := make(chan *coordinator.BatchInfo, queueSize)
go func() {
defer func() { n.stoppedForge <- true }()
for {
select {
case <-n.stopForge:
return
default:
if forge, err := n.coord.ForgeLoopFn(batchCh0, n.stopForge); err == coordinator.ErrStop {
return
} else if err != nil {
log.Errorw("Coordinator.ForgeLoopFn", "error", err)
} else if !forge {
time.Sleep(n.coordCfg.ForgeLoopInterval.Duration)
}
}
}
}()
go func() {
defer func() { n.stoppedGetProofCallForge <- true }()
for {
select {
case <-n.stopGetProofCallForge:
return
default:
if err := n.coord.GetProofCallForgeLoopFn(
batchCh0, batchCh1, n.stopGetProofCallForge); err == coordinator.ErrStop {
return
} else if err != nil {
log.Errorw("Coordinator.GetProofCallForgeLoopFn", "error", err)
}
}
}
}()
go func() {
defer func() { n.stoppedForgeCallConfirm <- true }()
for {
select {
case <-n.stopForgeCallConfirm:
return
default:
if err := n.coord.ForgeCallConfirmLoopFn(
batchCh1, n.stopForgeCallConfirm); err == coordinator.ErrStop {
return
} else if err != nil {
log.Errorw("Coordinator.ForgeCallConfirmLoopFn", "error", err)
}
}
}
}()
}
// StopCoordinator stops the coordinator
func (n *Node) StopCoordinator() {
log.Info("Stopping Coordinator...")
n.stopForge <- true
n.stopGetProofCallForge <- true
n.stopForgeCallConfirm <- true
<-n.stoppedForge
<-n.stoppedGetProofCallForge
<-n.stoppedForgeCallConfirm
}
// StartSynchronizer starts the synchronizer
func (n *Node) StartSynchronizer() {
log.Info("Starting Synchronizer...")
n.stoppedSync = make(chan bool)
go func() {
defer func() { n.stoppedSync <- true }()
var lastBlock *common.Block
d := time.Duration(0)
for {
select {
case <-n.ctx.Done():
log.Info("Coordinator stopped")
return
case <-time.After(d):
if blockData, discarded, err := n.sync.Sync2(n.ctx, lastBlock); err != nil {
log.Errorw("Synchronizer.Sync", "error", err)
lastBlock = nil
d = n.cfg.Synchronizer.SyncLoopInterval.Duration
} else if discarded != nil {
log.Infow("Synchronizer.Sync reorg", "discarded", *discarded)
lastBlock = nil
d = time.Duration(0)
} else if blockData != nil {
lastBlock = &blockData.Block
d = time.Duration(0)
} else {
d = n.cfg.Synchronizer.SyncLoopInterval.Duration
}
}
}
}()
// TODO: Run price updater. This is required by the API and the TxSelector
}
// WaitStopSynchronizer waits for the synchronizer to stop
func (n *Node) WaitStopSynchronizer() {
log.Info("Waiting for Synchronizer to stop...")
<-n.stoppedSync
}
// Start the node
func (n *Node) Start() {
log.Infow("Starting node...", "mode", n.mode)
if n.mode == ModeCoordinator {
n.StartCoordinator()
}
n.StartSynchronizer()
}
// Stop the node
func (n *Node) Stop() {
log.Infow("Stopping node...")
n.cancel()
if n.mode == ModeCoordinator {
n.StopCoordinator()
}
n.WaitStopSynchronizer()
}