You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

251 lines
6.2 KiB

  1. package node
  2. import (
  3. "time"
  4. "github.com/ethereum/go-ethereum/ethclient"
  5. "github.com/hermeznetwork/hermez-node/batchbuilder"
  6. "github.com/hermeznetwork/hermez-node/config"
  7. "github.com/hermeznetwork/hermez-node/coordinator"
  8. "github.com/hermeznetwork/hermez-node/db/historydb"
  9. "github.com/hermeznetwork/hermez-node/db/l2db"
  10. "github.com/hermeznetwork/hermez-node/db/statedb"
  11. "github.com/hermeznetwork/hermez-node/eth"
  12. "github.com/hermeznetwork/hermez-node/log"
  13. "github.com/hermeznetwork/hermez-node/synchronizer"
  14. "github.com/hermeznetwork/hermez-node/txselector"
  15. )
  16. // Mode sets the working mode of the node (synchronizer or coordinator)
  17. type Mode string
  18. const (
  19. // ModeCoordinator defines the mode of the HermezNode as Coordinator, which
  20. // means that the node is set to forge (which also will be synchronizing with
  21. // the L1 blockchain state)
  22. ModeCoordinator Mode = "coordinator"
  23. // ModeSynchronizer defines the mode of the HermezNode as Synchronizer, which
  24. // means that the node is set to only synchronize with the L1 blockchain state
  25. // and will not forge
  26. ModeSynchronizer Mode = "synchronizer"
  27. )
  28. // Node is the Hermez Node
  29. type Node struct {
  30. // Coordinator
  31. coord *coordinator.Coordinator
  32. coordCfg *config.Coordinator
  33. stopForge chan bool
  34. stopGetProofCallForge chan bool
  35. stopForgeCallConfirm chan bool
  36. stoppedForge chan bool
  37. stoppedGetProofCallForge chan bool
  38. stoppedForgeCallConfirm chan bool
  39. // Synchronizer
  40. sync *synchronizer.Synchronizer
  41. stopSync chan bool
  42. stoppedSync chan bool
  43. // General
  44. cfg *config.Node
  45. mode Mode
  46. }
  47. // NewNode creates a Node
  48. func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node, error) {
  49. historyDB, err := historydb.NewHistoryDB(
  50. cfg.PostgreSQL.Port,
  51. cfg.PostgreSQL.Host,
  52. cfg.PostgreSQL.User,
  53. cfg.PostgreSQL.Password,
  54. cfg.HistoryDB.Name,
  55. )
  56. if err != nil {
  57. return nil, err
  58. }
  59. stateDB, err := statedb.NewStateDB(cfg.StateDB.Path, true, 32)
  60. if err != nil {
  61. return nil, err
  62. }
  63. ethClient, err := ethclient.Dial(cfg.Web3.URL)
  64. if err != nil {
  65. return nil, err
  66. }
  67. client := eth.NewClient(ethClient, nil, nil, nil)
  68. sync := synchronizer.NewSynchronizer(client, historyDB, stateDB)
  69. var coord *coordinator.Coordinator
  70. if mode == ModeCoordinator {
  71. l2DB, err := l2db.NewL2DB(
  72. cfg.PostgreSQL.Port,
  73. cfg.PostgreSQL.Host,
  74. cfg.PostgreSQL.User,
  75. cfg.PostgreSQL.Password,
  76. cfg.L2DB.Name,
  77. cfg.L2DB.SafetyPeriod,
  78. cfg.L2DB.MaxTxs,
  79. cfg.L2DB.TTL.Duration,
  80. )
  81. if err != nil {
  82. return nil, err
  83. }
  84. // TODO: Get (maxL1UserTxs, maxL1OperatorTxs, maxTxs) from the smart contract
  85. txSelector, err := txselector.NewTxSelector(cfg.TxSelector.Path, stateDB, l2DB, 10, 10, 10)
  86. if err != nil {
  87. return nil, err
  88. }
  89. // TODO: Get (configCircuits []ConfigCircuit, batchNum common.BatchNum, nLevels uint64) from smart contract
  90. nLevels := uint64(32) //nolint:gomnd
  91. batchBuilder, err := batchbuilder.NewBatchBuilder(cfg.BatchBuilder.Path, stateDB, nil, 0, nLevels)
  92. if err != nil {
  93. return nil, err
  94. }
  95. if err != nil {
  96. return nil, err
  97. }
  98. coord = coordinator.NewCoordinator(
  99. coordinator.Config{
  100. ForgerAddress: coordCfg.ForgerAddress,
  101. },
  102. historyDB,
  103. txSelector,
  104. batchBuilder,
  105. client,
  106. )
  107. }
  108. return &Node{
  109. coord: coord,
  110. coordCfg: coordCfg,
  111. sync: sync,
  112. cfg: cfg,
  113. mode: mode,
  114. }, nil
  115. }
  116. // StartCoordinator starts the coordinator
  117. func (n *Node) StartCoordinator() {
  118. log.Info("Starting Coordinator...")
  119. n.stopForge = make(chan bool)
  120. n.stopGetProofCallForge = make(chan bool)
  121. n.stopForgeCallConfirm = make(chan bool)
  122. n.stoppedForge = make(chan bool)
  123. n.stoppedGetProofCallForge = make(chan bool)
  124. n.stoppedForgeCallConfirm = make(chan bool)
  125. batchCh0 := make(chan *coordinator.BatchInfo)
  126. batchCh1 := make(chan *coordinator.BatchInfo)
  127. go func() {
  128. defer func() { n.stoppedForge <- true }()
  129. for {
  130. select {
  131. case <-n.stopForge:
  132. return
  133. default:
  134. if forge, err := n.coord.ForgeLoopFn(batchCh0, n.stopForge); err == coordinator.ErrStop {
  135. return
  136. } else if err != nil {
  137. log.Errorw("Coordinator.ForgeLoopFn", "error", err)
  138. } else if !forge {
  139. time.Sleep(n.coordCfg.ForgeLoopInterval.Duration)
  140. }
  141. }
  142. }
  143. }()
  144. go func() {
  145. defer func() { n.stoppedGetProofCallForge <- true }()
  146. for {
  147. select {
  148. case <-n.stopGetProofCallForge:
  149. return
  150. default:
  151. if err := n.coord.GetProofCallForgeLoopFn(
  152. batchCh0, batchCh1, n.stopGetProofCallForge); err == coordinator.ErrStop {
  153. return
  154. } else if err != nil {
  155. log.Errorw("Coordinator.GetProofCallForgeLoopFn", "error", err)
  156. }
  157. }
  158. }
  159. }()
  160. go func() {
  161. defer func() { n.stoppedForgeCallConfirm <- true }()
  162. for {
  163. select {
  164. case <-n.stopForgeCallConfirm:
  165. return
  166. default:
  167. if err := n.coord.ForgeCallConfirmLoopFn(
  168. batchCh1, n.stopForgeCallConfirm); err == coordinator.ErrStop {
  169. return
  170. } else if err != nil {
  171. log.Errorw("Coordinator.ForgeCallConfirmLoopFn", "error", err)
  172. }
  173. }
  174. }
  175. }()
  176. }
  177. // StopCoordinator stops the coordinator
  178. func (n *Node) StopCoordinator() {
  179. log.Info("Stopping Coordinator...")
  180. n.stopForge <- true
  181. n.stopGetProofCallForge <- true
  182. n.stopForgeCallConfirm <- true
  183. <-n.stoppedForge
  184. <-n.stoppedGetProofCallForge
  185. <-n.stoppedForgeCallConfirm
  186. }
  187. // StartSynchronizer starts the synchronizer
  188. func (n *Node) StartSynchronizer() {
  189. log.Info("Starting Synchronizer...")
  190. n.stopSync = make(chan bool)
  191. n.stoppedSync = make(chan bool)
  192. go func() {
  193. defer func() { n.stoppedSync <- true }()
  194. for {
  195. select {
  196. case <-n.stopSync:
  197. log.Info("Coordinator stopped")
  198. return
  199. case <-time.After(n.cfg.Synchronizer.SyncLoopInterval.Duration):
  200. if err := n.sync.Sync(); err != nil {
  201. log.Errorw("Synchronizer.Sync", "error", err)
  202. }
  203. }
  204. }
  205. }()
  206. }
  207. // StopSynchronizer stops the synchronizer
  208. func (n *Node) StopSynchronizer() {
  209. log.Info("Stopping Synchronizer...")
  210. n.stopSync <- true
  211. <-n.stoppedSync
  212. }
  213. // Start the node
  214. func (n *Node) Start() {
  215. log.Infow("Starting node...", "mode", n.mode)
  216. if n.mode == ModeCoordinator {
  217. n.StartCoordinator()
  218. }
  219. n.StartSynchronizer()
  220. }
  221. // Stop the node
  222. func (n *Node) Stop() {
  223. log.Infow("Stopping node...")
  224. if n.mode == ModeCoordinator {
  225. n.StopCoordinator()
  226. }
  227. n.StopSynchronizer()
  228. }