You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

256 lines
6.5 KiB

  1. package node
  2. import (
  3. "time"
  4. "github.com/ethereum/go-ethereum/ethclient"
  5. "github.com/hermeznetwork/hermez-node/batchbuilder"
  6. "github.com/hermeznetwork/hermez-node/config"
  7. "github.com/hermeznetwork/hermez-node/coordinator"
  8. dbUtils "github.com/hermeznetwork/hermez-node/db"
  9. "github.com/hermeznetwork/hermez-node/db/historydb"
  10. "github.com/hermeznetwork/hermez-node/db/l2db"
  11. "github.com/hermeznetwork/hermez-node/db/statedb"
  12. "github.com/hermeznetwork/hermez-node/eth"
  13. "github.com/hermeznetwork/hermez-node/log"
  14. "github.com/hermeznetwork/hermez-node/synchronizer"
  15. "github.com/hermeznetwork/hermez-node/txselector"
  16. "github.com/jmoiron/sqlx"
  17. )
  18. // Mode sets the working mode of the node (synchronizer or coordinator)
  19. type Mode string
  20. const (
  21. // ModeCoordinator defines the mode of the HermezNode as Coordinator, which
  22. // means that the node is set to forge (which also will be synchronizing with
  23. // the L1 blockchain state)
  24. ModeCoordinator Mode = "coordinator"
  25. // ModeSynchronizer defines the mode of the HermezNode as Synchronizer, which
  26. // means that the node is set to only synchronize with the L1 blockchain state
  27. // and will not forge
  28. ModeSynchronizer Mode = "synchronizer"
  29. )
  30. // Node is the Hermez Node
  31. type Node struct {
  32. // Coordinator
  33. coord *coordinator.Coordinator
  34. coordCfg *config.Coordinator
  35. stopForge chan bool
  36. stopGetProofCallForge chan bool
  37. stopForgeCallConfirm chan bool
  38. stoppedForge chan bool
  39. stoppedGetProofCallForge chan bool
  40. stoppedForgeCallConfirm chan bool
  41. // Synchronizer
  42. sync *synchronizer.Synchronizer
  43. stopSync chan bool
  44. stoppedSync chan bool
  45. // General
  46. cfg *config.Node
  47. mode Mode
  48. sqlConn *sqlx.DB
  49. }
  50. // NewNode creates a Node
  51. func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node, error) {
  52. // Stablish DB connection
  53. db, err := dbUtils.InitSQLDB(
  54. cfg.PostgreSQL.Port,
  55. cfg.PostgreSQL.Host,
  56. cfg.PostgreSQL.User,
  57. cfg.PostgreSQL.Password,
  58. cfg.PostgreSQL.Name,
  59. )
  60. if err != nil {
  61. return nil, err
  62. }
  63. historyDB := historydb.NewHistoryDB(db)
  64. stateDB, err := statedb.NewStateDB(cfg.StateDB.Path, statedb.TypeSynchronizer, 32)
  65. if err != nil {
  66. return nil, err
  67. }
  68. ethClient, err := ethclient.Dial(cfg.Web3.URL)
  69. if err != nil {
  70. return nil, err
  71. }
  72. client := eth.NewClient(ethClient, nil, nil, nil)
  73. sync := synchronizer.NewSynchronizer(client, historyDB, stateDB)
  74. var coord *coordinator.Coordinator
  75. if mode == ModeCoordinator {
  76. l2DB := l2db.NewL2DB(
  77. db,
  78. coordCfg.L2DB.SafetyPeriod,
  79. coordCfg.L2DB.MaxTxs,
  80. coordCfg.L2DB.TTL.Duration,
  81. )
  82. // TODO: Get (maxL1UserTxs, maxL1OperatorTxs, maxTxs) from the smart contract
  83. txSelector, err := txselector.NewTxSelector(coordCfg.TxSelector.Path, stateDB, l2DB, 10, 10, 10)
  84. if err != nil {
  85. return nil, err
  86. }
  87. // TODO: Get (configCircuits []ConfigCircuit, batchNum common.BatchNum, nLevels uint64) from smart contract
  88. nLevels := uint64(32) //nolint:gomnd
  89. batchBuilder, err := batchbuilder.NewBatchBuilder(coordCfg.BatchBuilder.Path, stateDB, nil, 0, nLevels)
  90. if err != nil {
  91. return nil, err
  92. }
  93. if err != nil {
  94. return nil, err
  95. }
  96. serverProofs := make([]coordinator.ServerProofInterface, len(coordCfg.ServerProofs))
  97. for i, serverProofCfg := range coordCfg.ServerProofs {
  98. serverProofs[i] = coordinator.NewServerProof(serverProofCfg.URL)
  99. }
  100. coord = coordinator.NewCoordinator(
  101. coordinator.Config{
  102. ForgerAddress: coordCfg.ForgerAddress,
  103. },
  104. historyDB,
  105. txSelector,
  106. batchBuilder,
  107. serverProofs,
  108. client,
  109. )
  110. }
  111. return &Node{
  112. coord: coord,
  113. coordCfg: coordCfg,
  114. sync: sync,
  115. cfg: cfg,
  116. mode: mode,
  117. sqlConn: db,
  118. }, nil
  119. }
  120. // StartCoordinator starts the coordinator
  121. func (n *Node) StartCoordinator() {
  122. log.Info("Starting Coordinator...")
  123. n.stopForge = make(chan bool)
  124. n.stopGetProofCallForge = make(chan bool)
  125. n.stopForgeCallConfirm = make(chan bool)
  126. n.stoppedForge = make(chan bool)
  127. n.stoppedGetProofCallForge = make(chan bool)
  128. n.stoppedForgeCallConfirm = make(chan bool)
  129. batchCh0 := make(chan *coordinator.BatchInfo)
  130. batchCh1 := make(chan *coordinator.BatchInfo)
  131. go func() {
  132. defer func() { n.stoppedForge <- true }()
  133. for {
  134. select {
  135. case <-n.stopForge:
  136. return
  137. default:
  138. if forge, err := n.coord.ForgeLoopFn(batchCh0, n.stopForge); err == coordinator.ErrStop {
  139. return
  140. } else if err != nil {
  141. log.Errorw("Coordinator.ForgeLoopFn", "error", err)
  142. } else if !forge {
  143. time.Sleep(n.coordCfg.ForgeLoopInterval.Duration)
  144. }
  145. }
  146. }
  147. }()
  148. go func() {
  149. defer func() { n.stoppedGetProofCallForge <- true }()
  150. for {
  151. select {
  152. case <-n.stopGetProofCallForge:
  153. return
  154. default:
  155. if err := n.coord.GetProofCallForgeLoopFn(
  156. batchCh0, batchCh1, n.stopGetProofCallForge); err == coordinator.ErrStop {
  157. return
  158. } else if err != nil {
  159. log.Errorw("Coordinator.GetProofCallForgeLoopFn", "error", err)
  160. }
  161. }
  162. }
  163. }()
  164. go func() {
  165. defer func() { n.stoppedForgeCallConfirm <- true }()
  166. for {
  167. select {
  168. case <-n.stopForgeCallConfirm:
  169. return
  170. default:
  171. if err := n.coord.ForgeCallConfirmLoopFn(
  172. batchCh1, n.stopForgeCallConfirm); err == coordinator.ErrStop {
  173. return
  174. } else if err != nil {
  175. log.Errorw("Coordinator.ForgeCallConfirmLoopFn", "error", err)
  176. }
  177. }
  178. }
  179. }()
  180. }
  181. // StopCoordinator stops the coordinator
  182. func (n *Node) StopCoordinator() {
  183. log.Info("Stopping Coordinator...")
  184. n.stopForge <- true
  185. n.stopGetProofCallForge <- true
  186. n.stopForgeCallConfirm <- true
  187. <-n.stoppedForge
  188. <-n.stoppedGetProofCallForge
  189. <-n.stoppedForgeCallConfirm
  190. }
  191. // StartSynchronizer starts the synchronizer
  192. func (n *Node) StartSynchronizer() {
  193. log.Info("Starting Synchronizer...")
  194. n.stopSync = make(chan bool)
  195. n.stoppedSync = make(chan bool)
  196. go func() {
  197. defer func() { n.stoppedSync <- true }()
  198. for {
  199. select {
  200. case <-n.stopSync:
  201. log.Info("Coordinator stopped")
  202. return
  203. case <-time.After(n.cfg.Synchronizer.SyncLoopInterval.Duration):
  204. if err := n.sync.Sync(); err != nil {
  205. log.Errorw("Synchronizer.Sync", "error", err)
  206. }
  207. }
  208. }
  209. }()
  210. }
  211. // StopSynchronizer stops the synchronizer
  212. func (n *Node) StopSynchronizer() {
  213. log.Info("Stopping Synchronizer...")
  214. n.stopSync <- true
  215. <-n.stoppedSync
  216. }
  217. // Start the node
  218. func (n *Node) Start() {
  219. log.Infow("Starting node...", "mode", n.mode)
  220. if n.mode == ModeCoordinator {
  221. n.StartCoordinator()
  222. }
  223. n.StartSynchronizer()
  224. }
  225. // Stop the node
  226. func (n *Node) Stop() {
  227. log.Infow("Stopping node...")
  228. if n.mode == ModeCoordinator {
  229. n.StopCoordinator()
  230. }
  231. n.StopSynchronizer()
  232. }