You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

256 lines
6.5 KiB

  1. package node
  2. import (
  3. "time"
  4. "github.com/ethereum/go-ethereum/ethclient"
  5. "github.com/hermeznetwork/hermez-node/batchbuilder"
  6. "github.com/hermeznetwork/hermez-node/config"
  7. "github.com/hermeznetwork/hermez-node/coordinator"
  8. "github.com/hermeznetwork/hermez-node/db/historydb"
  9. "github.com/hermeznetwork/hermez-node/db/l2db"
  10. "github.com/hermeznetwork/hermez-node/db/statedb"
  11. "github.com/hermeznetwork/hermez-node/eth"
  12. "github.com/hermeznetwork/hermez-node/log"
  13. "github.com/hermeznetwork/hermez-node/synchronizer"
  14. "github.com/hermeznetwork/hermez-node/txselector"
  15. )
  16. // Mode sets the working mode of the node (synchronizer or coordinator)
  17. type Mode string
  18. const (
  19. // ModeCoordinator defines the mode of the HermezNode as Coordinator, which
  20. // means that the node is set to forge (which also will be synchronizing with
  21. // the L1 blockchain state)
  22. ModeCoordinator Mode = "coordinator"
  23. // ModeSynchronizer defines the mode of the HermezNode as Synchronizer, which
  24. // means that the node is set to only synchronize with the L1 blockchain state
  25. // and will not forge
  26. ModeSynchronizer Mode = "synchronizer"
  27. )
  28. // Node is the Hermez Node
  29. type Node struct {
  30. // Coordinator
  31. coord *coordinator.Coordinator
  32. coordCfg *config.Coordinator
  33. stopForge chan bool
  34. stopGetProofCallForge chan bool
  35. stopForgeCallConfirm chan bool
  36. stoppedForge chan bool
  37. stoppedGetProofCallForge chan bool
  38. stoppedForgeCallConfirm chan bool
  39. // Synchronizer
  40. sync *synchronizer.Synchronizer
  41. stopSync chan bool
  42. stoppedSync chan bool
  43. // General
  44. cfg *config.Node
  45. mode Mode
  46. }
  47. // NewNode creates a Node
  48. func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node, error) {
  49. historyDB, err := historydb.NewHistoryDB(
  50. cfg.PostgreSQL.Port,
  51. cfg.PostgreSQL.Host,
  52. cfg.PostgreSQL.User,
  53. cfg.PostgreSQL.Password,
  54. cfg.HistoryDB.Name,
  55. )
  56. if err != nil {
  57. return nil, err
  58. }
  59. stateDB, err := statedb.NewStateDB(cfg.StateDB.Path, true, 32)
  60. if err != nil {
  61. return nil, err
  62. }
  63. ethClient, err := ethclient.Dial(cfg.Web3.URL)
  64. if err != nil {
  65. return nil, err
  66. }
  67. client := eth.NewClient(ethClient, nil, nil, nil)
  68. sync := synchronizer.NewSynchronizer(client, historyDB, stateDB)
  69. var coord *coordinator.Coordinator
  70. if mode == ModeCoordinator {
  71. l2DB, err := l2db.NewL2DB(
  72. cfg.PostgreSQL.Port,
  73. cfg.PostgreSQL.Host,
  74. cfg.PostgreSQL.User,
  75. cfg.PostgreSQL.Password,
  76. coordCfg.L2DB.Name,
  77. coordCfg.L2DB.SafetyPeriod,
  78. coordCfg.L2DB.MaxTxs,
  79. coordCfg.L2DB.TTL.Duration,
  80. )
  81. if err != nil {
  82. return nil, err
  83. }
  84. // TODO: Get (maxL1UserTxs, maxL1OperatorTxs, maxTxs) from the smart contract
  85. txSelector, err := txselector.NewTxSelector(coordCfg.TxSelector.Path, stateDB, l2DB, 10, 10, 10)
  86. if err != nil {
  87. return nil, err
  88. }
  89. // TODO: Get (configCircuits []ConfigCircuit, batchNum common.BatchNum, nLevels uint64) from smart contract
  90. nLevels := uint64(32) //nolint:gomnd
  91. batchBuilder, err := batchbuilder.NewBatchBuilder(coordCfg.BatchBuilder.Path, stateDB, nil, 0, nLevels)
  92. if err != nil {
  93. return nil, err
  94. }
  95. if err != nil {
  96. return nil, err
  97. }
  98. serverProofs := make([]coordinator.ServerProofInterface, len(coordCfg.ServerProofs))
  99. for i, serverProofCfg := range coordCfg.ServerProofs {
  100. serverProofs[i] = coordinator.NewServerProof(serverProofCfg.URL)
  101. }
  102. coord = coordinator.NewCoordinator(
  103. coordinator.Config{
  104. ForgerAddress: coordCfg.ForgerAddress,
  105. },
  106. historyDB,
  107. txSelector,
  108. batchBuilder,
  109. serverProofs,
  110. client,
  111. )
  112. }
  113. return &Node{
  114. coord: coord,
  115. coordCfg: coordCfg,
  116. sync: sync,
  117. cfg: cfg,
  118. mode: mode,
  119. }, nil
  120. }
  121. // StartCoordinator starts the coordinator
  122. func (n *Node) StartCoordinator() {
  123. log.Info("Starting Coordinator...")
  124. n.stopForge = make(chan bool)
  125. n.stopGetProofCallForge = make(chan bool)
  126. n.stopForgeCallConfirm = make(chan bool)
  127. n.stoppedForge = make(chan bool)
  128. n.stoppedGetProofCallForge = make(chan bool)
  129. n.stoppedForgeCallConfirm = make(chan bool)
  130. batchCh0 := make(chan *coordinator.BatchInfo)
  131. batchCh1 := make(chan *coordinator.BatchInfo)
  132. go func() {
  133. defer func() { n.stoppedForge <- true }()
  134. for {
  135. select {
  136. case <-n.stopForge:
  137. return
  138. default:
  139. if forge, err := n.coord.ForgeLoopFn(batchCh0, n.stopForge); err == coordinator.ErrStop {
  140. return
  141. } else if err != nil {
  142. log.Errorw("Coordinator.ForgeLoopFn", "error", err)
  143. } else if !forge {
  144. time.Sleep(n.coordCfg.ForgeLoopInterval.Duration)
  145. }
  146. }
  147. }
  148. }()
  149. go func() {
  150. defer func() { n.stoppedGetProofCallForge <- true }()
  151. for {
  152. select {
  153. case <-n.stopGetProofCallForge:
  154. return
  155. default:
  156. if err := n.coord.GetProofCallForgeLoopFn(
  157. batchCh0, batchCh1, n.stopGetProofCallForge); err == coordinator.ErrStop {
  158. return
  159. } else if err != nil {
  160. log.Errorw("Coordinator.GetProofCallForgeLoopFn", "error", err)
  161. }
  162. }
  163. }
  164. }()
  165. go func() {
  166. defer func() { n.stoppedForgeCallConfirm <- true }()
  167. for {
  168. select {
  169. case <-n.stopForgeCallConfirm:
  170. return
  171. default:
  172. if err := n.coord.ForgeCallConfirmLoopFn(
  173. batchCh1, n.stopForgeCallConfirm); err == coordinator.ErrStop {
  174. return
  175. } else if err != nil {
  176. log.Errorw("Coordinator.ForgeCallConfirmLoopFn", "error", err)
  177. }
  178. }
  179. }
  180. }()
  181. }
  182. // StopCoordinator stops the coordinator
  183. func (n *Node) StopCoordinator() {
  184. log.Info("Stopping Coordinator...")
  185. n.stopForge <- true
  186. n.stopGetProofCallForge <- true
  187. n.stopForgeCallConfirm <- true
  188. <-n.stoppedForge
  189. <-n.stoppedGetProofCallForge
  190. <-n.stoppedForgeCallConfirm
  191. }
  192. // StartSynchronizer starts the synchronizer
  193. func (n *Node) StartSynchronizer() {
  194. log.Info("Starting Synchronizer...")
  195. n.stopSync = make(chan bool)
  196. n.stoppedSync = make(chan bool)
  197. go func() {
  198. defer func() { n.stoppedSync <- true }()
  199. for {
  200. select {
  201. case <-n.stopSync:
  202. log.Info("Coordinator stopped")
  203. return
  204. case <-time.After(n.cfg.Synchronizer.SyncLoopInterval.Duration):
  205. if err := n.sync.Sync(); err != nil {
  206. log.Errorw("Synchronizer.Sync", "error", err)
  207. }
  208. }
  209. }
  210. }()
  211. }
  212. // StopSynchronizer stops the synchronizer
  213. func (n *Node) StopSynchronizer() {
  214. log.Info("Stopping Synchronizer...")
  215. n.stopSync <- true
  216. <-n.stoppedSync
  217. }
  218. // Start the node
  219. func (n *Node) Start() {
  220. log.Infow("Starting node...", "mode", n.mode)
  221. if n.mode == ModeCoordinator {
  222. n.StartCoordinator()
  223. }
  224. n.StartSynchronizer()
  225. }
  226. // Stop the node
  227. func (n *Node) Stop() {
  228. log.Infow("Stopping node...")
  229. if n.mode == ModeCoordinator {
  230. n.StopCoordinator()
  231. }
  232. n.StopSynchronizer()
  233. }