You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

264 lines
6.7 KiB

  1. package node
  import (
  	"context"
  	"errors"
  	"time"

  	"github.com/ethereum/go-ethereum/ethclient"
  	"github.com/hermeznetwork/hermez-node/batchbuilder"
  	"github.com/hermeznetwork/hermez-node/config"
  	"github.com/hermeznetwork/hermez-node/coordinator"
  	dbUtils "github.com/hermeznetwork/hermez-node/db"
  	"github.com/hermeznetwork/hermez-node/db/historydb"
  	"github.com/hermeznetwork/hermez-node/db/l2db"
  	"github.com/hermeznetwork/hermez-node/db/statedb"
  	"github.com/hermeznetwork/hermez-node/eth"
  	"github.com/hermeznetwork/hermez-node/log"
  	"github.com/hermeznetwork/hermez-node/synchronizer"
  	"github.com/hermeznetwork/hermez-node/txselector"
  	"github.com/jmoiron/sqlx"
  )
  // Mode is the working mode a Node runs in: either coordinator or
  // synchronizer.
  type Mode string

  // Working modes accepted by NewNode.
  const (
  	// ModeCoordinator makes the HermezNode act as a Coordinator: the node
  	// forges, and it also keeps synchronizing with the L1 blockchain state.
  	ModeCoordinator Mode = "coordinator"

  	// ModeSynchronizer makes the HermezNode act as a Synchronizer: the node
  	// only synchronizes with the L1 blockchain state and never forges.
  	ModeSynchronizer Mode = "synchronizer"
  )
  31. // Node is the Hermez Node
  32. type Node struct {
  33. // Coordinator
  34. coord *coordinator.Coordinator
  35. coordCfg *config.Coordinator
  36. stopForge chan bool
  37. stopGetProofCallForge chan bool
  38. stopForgeCallConfirm chan bool
  39. stoppedForge chan bool
  40. stoppedGetProofCallForge chan bool
  41. stoppedForgeCallConfirm chan bool
  42. // Synchronizer
  43. sync *synchronizer.Synchronizer
  44. stopSync chan bool
  45. stoppedSync chan bool
  46. // General
  47. cfg *config.Node
  48. mode Mode
  49. sqlConn *sqlx.DB
  50. }
  51. // NewNode creates a Node
  52. func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node, error) {
  53. // Stablish DB connection
  54. db, err := dbUtils.InitSQLDB(
  55. cfg.PostgreSQL.Port,
  56. cfg.PostgreSQL.Host,
  57. cfg.PostgreSQL.User,
  58. cfg.PostgreSQL.Password,
  59. cfg.PostgreSQL.Name,
  60. )
  61. if err != nil {
  62. return nil, err
  63. }
  64. historyDB := historydb.NewHistoryDB(db)
  65. stateDB, err := statedb.NewStateDB(cfg.StateDB.Path, statedb.TypeSynchronizer, 32)
  66. if err != nil {
  67. return nil, err
  68. }
  69. ethClient, err := ethclient.Dial(cfg.Web3.URL)
  70. if err != nil {
  71. return nil, err
  72. }
  73. client, err := eth.NewClient(ethClient, nil, nil, nil)
  74. if err != nil {
  75. return nil, err
  76. }
  77. sync, err := synchronizer.NewSynchronizer(client, historyDB, stateDB)
  78. if err != nil {
  79. return nil, err
  80. }
  81. var coord *coordinator.Coordinator
  82. if mode == ModeCoordinator {
  83. l2DB := l2db.NewL2DB(
  84. db,
  85. coordCfg.L2DB.SafetyPeriod,
  86. coordCfg.L2DB.MaxTxs,
  87. coordCfg.L2DB.TTL.Duration,
  88. )
  89. // TODO: Get (maxL1UserTxs, maxL1OperatorTxs, maxTxs) from the smart contract
  90. txSelector, err := txselector.NewTxSelector(coordCfg.TxSelector.Path, stateDB, l2DB, 10, 10, 10)
  91. if err != nil {
  92. return nil, err
  93. }
  94. // TODO: Get (configCircuits []ConfigCircuit, batchNum common.BatchNum, nLevels uint64) from smart contract
  95. nLevels := uint64(32) //nolint:gomnd
  96. batchBuilder, err := batchbuilder.NewBatchBuilder(coordCfg.BatchBuilder.Path, stateDB, nil, 0, nLevels)
  97. if err != nil {
  98. return nil, err
  99. }
  100. if err != nil {
  101. return nil, err
  102. }
  103. serverProofs := make([]coordinator.ServerProofInterface, len(coordCfg.ServerProofs))
  104. for i, serverProofCfg := range coordCfg.ServerProofs {
  105. serverProofs[i] = coordinator.NewServerProof(serverProofCfg.URL)
  106. }
  107. coord = coordinator.NewCoordinator(
  108. coordinator.Config{
  109. ForgerAddress: coordCfg.ForgerAddress,
  110. },
  111. historyDB,
  112. txSelector,
  113. batchBuilder,
  114. serverProofs,
  115. client,
  116. )
  117. }
  118. return &Node{
  119. coord: coord,
  120. coordCfg: coordCfg,
  121. sync: sync,
  122. cfg: cfg,
  123. mode: mode,
  124. sqlConn: db,
  125. }, nil
  126. }
  127. // StartCoordinator starts the coordinator
  128. func (n *Node) StartCoordinator() {
  129. log.Info("Starting Coordinator...")
  130. n.stopForge = make(chan bool)
  131. n.stopGetProofCallForge = make(chan bool)
  132. n.stopForgeCallConfirm = make(chan bool)
  133. n.stoppedForge = make(chan bool)
  134. n.stoppedGetProofCallForge = make(chan bool)
  135. n.stoppedForgeCallConfirm = make(chan bool)
  136. queueSize := 1
  137. batchCh0 := make(chan *coordinator.BatchInfo, queueSize)
  138. batchCh1 := make(chan *coordinator.BatchInfo, queueSize)
  139. go func() {
  140. defer func() { n.stoppedForge <- true }()
  141. for {
  142. select {
  143. case <-n.stopForge:
  144. return
  145. default:
  146. if forge, err := n.coord.ForgeLoopFn(batchCh0, n.stopForge); err == coordinator.ErrStop {
  147. return
  148. } else if err != nil {
  149. log.Errorw("Coordinator.ForgeLoopFn", "error", err)
  150. } else if !forge {
  151. time.Sleep(n.coordCfg.ForgeLoopInterval.Duration)
  152. }
  153. }
  154. }
  155. }()
  156. go func() {
  157. defer func() { n.stoppedGetProofCallForge <- true }()
  158. for {
  159. select {
  160. case <-n.stopGetProofCallForge:
  161. return
  162. default:
  163. if err := n.coord.GetProofCallForgeLoopFn(
  164. batchCh0, batchCh1, n.stopGetProofCallForge); err == coordinator.ErrStop {
  165. return
  166. } else if err != nil {
  167. log.Errorw("Coordinator.GetProofCallForgeLoopFn", "error", err)
  168. }
  169. }
  170. }
  171. }()
  172. go func() {
  173. defer func() { n.stoppedForgeCallConfirm <- true }()
  174. for {
  175. select {
  176. case <-n.stopForgeCallConfirm:
  177. return
  178. default:
  179. if err := n.coord.ForgeCallConfirmLoopFn(
  180. batchCh1, n.stopForgeCallConfirm); err == coordinator.ErrStop {
  181. return
  182. } else if err != nil {
  183. log.Errorw("Coordinator.ForgeCallConfirmLoopFn", "error", err)
  184. }
  185. }
  186. }
  187. }()
  188. }
  189. // StopCoordinator stops the coordinator
  190. func (n *Node) StopCoordinator() {
  191. log.Info("Stopping Coordinator...")
  192. n.stopForge <- true
  193. n.stopGetProofCallForge <- true
  194. n.stopForgeCallConfirm <- true
  195. <-n.stoppedForge
  196. <-n.stoppedGetProofCallForge
  197. <-n.stoppedForgeCallConfirm
  198. }
  199. // StartSynchronizer starts the synchronizer
  200. func (n *Node) StartSynchronizer() {
  201. log.Info("Starting Synchronizer...")
  202. n.stopSync = make(chan bool)
  203. n.stoppedSync = make(chan bool)
  204. go func() {
  205. defer func() { n.stoppedSync <- true }()
  206. for {
  207. select {
  208. case <-n.stopSync:
  209. log.Info("Coordinator stopped")
  210. return
  211. case <-time.After(n.cfg.Synchronizer.SyncLoopInterval.Duration):
  212. if err := n.sync.Sync(context.TODO()); err != nil {
  213. log.Errorw("Synchronizer.Sync", "error", err)
  214. }
  215. }
  216. }
  217. }()
  218. }
  219. // StopSynchronizer stops the synchronizer
  220. func (n *Node) StopSynchronizer() {
  221. log.Info("Stopping Synchronizer...")
  222. n.stopSync <- true
  223. <-n.stoppedSync
  224. }
  225. // Start the node
  226. func (n *Node) Start() {
  227. log.Infow("Starting node...", "mode", n.mode)
  228. if n.mode == ModeCoordinator {
  229. n.StartCoordinator()
  230. }
  231. n.StartSynchronizer()
  232. }
  233. // Stop the node
  234. func (n *Node) Stop() {
  235. log.Infow("Stopping node...")
  236. if n.mode == ModeCoordinator {
  237. n.StopCoordinator()
  238. }
  239. n.StopSynchronizer()
  240. }