You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

372 lines
12 KiB

  1. package coordinator
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. ethCommon "github.com/ethereum/go-ethereum/common"
  7. "github.com/ethereum/go-ethereum/core/types"
  8. "github.com/hermeznetwork/hermez-node/batchbuilder"
  9. "github.com/hermeznetwork/hermez-node/common"
  10. "github.com/hermeznetwork/hermez-node/db/historydb"
  11. "github.com/hermeznetwork/hermez-node/eth"
  12. "github.com/hermeznetwork/hermez-node/log"
  13. "github.com/hermeznetwork/hermez-node/txselector"
  14. )
  15. var errTODO = fmt.Errorf("TODO")
  16. // ErrStop is returned when the function is stopped asynchronously via the stop
  17. // channel. It doesn't indicate an error.
  18. var ErrStop = fmt.Errorf("Stopped")
  19. // Config contains the Coordinator configuration
  20. type Config struct {
  21. ForgerAddress ethCommon.Address
  22. }
  23. // Coordinator implements the Coordinator type
  24. type Coordinator struct {
  25. forging bool
  26. // rw *sync.RWMutex
  27. // isForgeSeq bool // WIP just for testing while implementing
  28. config Config
  29. batchNum common.BatchNum
  30. serverProofPool *ServerProofPool
  31. // synchronizer *synchronizer.Synchronizer
  32. hdb *historydb.HistoryDB
  33. txsel *txselector.TxSelector
  34. batchBuilder *batchbuilder.BatchBuilder
  35. ethClient eth.ClientInterface
  36. ethTxs []*types.Transaction
  37. // ethTxStore kvdb.Storage
  38. }
  39. // NewCoordinator creates a new Coordinator
  40. func NewCoordinator(conf Config,
  41. hdb *historydb.HistoryDB,
  42. txsel *txselector.TxSelector,
  43. bb *batchbuilder.BatchBuilder,
  44. serverProofs []ServerProofInterface,
  45. ethClient eth.ClientInterface) *Coordinator { // once synchronizer is ready, synchronizer.Synchronizer will be passed as parameter here
  46. serverProofPool := NewServerProofPool(len(serverProofs))
  47. for _, serverProof := range serverProofs {
  48. serverProofPool.Add(serverProof)
  49. }
  50. c := Coordinator{
  51. config: conf,
  52. serverProofPool: serverProofPool,
  53. hdb: hdb,
  54. txsel: txsel,
  55. batchBuilder: bb,
  56. ethClient: ethClient,
  57. ethTxs: make([]*types.Transaction, 0),
  58. // ethTxStore: memory.NewMemoryStorage(),
  59. // rw: &sync.RWMutex{},
  60. }
  61. return &c
  62. }
// TODO(Edu): Change the current design of the coordinator structure:
// - Move the Start and Stop functions (from node/node.go) here
// - Add the concept of StartPipeline and StopPipeline, which spawn and stop the goroutines
// - Add a Manager that calls StartPipeline and StopPipeline, checks when it's time to forge, schedules new batches, etc.
// - Add a TxMonitor that monitors successful ForgeBatch ethereum transactions, waits for N blocks of confirmation, and reports errors back to the Manager.
  68. // ForgeLoopFn is the function ran in a loop that checks if it's time to forge
  69. // and forges a batch if so and sends it to outBatchCh. Returns true if it's
  70. // the coordinator turn to forge.
  71. func (c *Coordinator) ForgeLoopFn(outBatchCh chan *BatchInfo, stopCh chan bool) (forgetime bool, err error) {
  72. // TODO: Move the logic to check if it's forge time or not outside the pipeline
  73. isForgeSequence, err := c.isForgeSequence()
  74. if err != nil {
  75. return false, err
  76. }
  77. if !isForgeSequence {
  78. if c.forging {
  79. log.Info("ForgeLoopFn: forging state end")
  80. c.forging = false
  81. }
  82. log.Debug("ForgeLoopFn: not in forge time")
  83. return false, nil
  84. }
  85. log.Debug("ForgeLoopFn: forge time")
  86. if !c.forging {
  87. // Start pipeline from a batchNum state taken from synchronizer
  88. log.Info("ForgeLoopFn: forging state begin")
  89. // c.batchNum = c.hdb.GetLastBatchNum() // uncomment when HistoryDB is ready
  90. err := c.txsel.Reset(c.batchNum)
  91. if err != nil {
  92. log.Errorw("ForgeLoopFn: TxSelector.Reset", "error", err)
  93. return true, err
  94. }
  95. err = c.batchBuilder.Reset(c.batchNum, true)
  96. if err != nil {
  97. log.Errorw("ForgeLoopFn: BatchBuilder.Reset", "error", err)
  98. return true, err
  99. }
  100. // c.batchQueue = NewBatchQueue()
  101. c.forging = true
  102. }
  103. // TODO once synchronizer has this method ready:
  104. // If there's been a reorg, handle it
  105. // handleReorg() function decides if the reorg must restart the pipeline or not
  106. // if c.synchronizer.Reorg():
  107. _ = c.handleReorg()
  108. defer func() {
  109. if err == ErrStop {
  110. log.Info("ForgeLoopFn: forgeLoopFn stopped")
  111. }
  112. }()
  113. // 0. Wait for an available server proof
  114. // blocking call
  115. serverProof, err := c.serverProofPool.Get(stopCh)
  116. if err != nil {
  117. return true, err
  118. }
  119. defer func() {
  120. if !forgetime || err != nil {
  121. c.serverProofPool.Add(serverProof)
  122. }
  123. }()
  124. log.Debugw("ForgeLoopFn: using serverProof", "server", serverProof)
  125. log.Debugw("ForgeLoopFn: forge start")
  126. // forge for batchNum = batchNum + 1.
  127. batchInfo, err := c.forge(serverProof)
  128. if err != nil {
  129. log.Errorw("forge", "error", err)
  130. return true, err
  131. }
  132. log.Debugw("ForgeLoopFn: forge end", "batchNum", batchInfo.batchNum)
  133. outBatchCh <- batchInfo
  134. return true, nil
  135. }
  136. // GetProofCallForgeLoopFn is the function ran in a loop that gets a forged
  137. // batch via inBatchCh, waits for the proof server to finish, calls the ForgeBatch
  138. // function in the Rollup Smart Contract, and sends the batch to outBatchCh.
  139. func (c *Coordinator) GetProofCallForgeLoopFn(inBatchCh, outBatchCh chan *BatchInfo, stopCh chan bool) (err error) {
  140. defer func() {
  141. if err == ErrStop {
  142. log.Info("GetProofCallForgeLoopFn: forgeLoopFn stopped")
  143. }
  144. }()
  145. select {
  146. case <-stopCh:
  147. return ErrStop
  148. case batchInfo := <-inBatchCh:
  149. log.Debugw("GetProofCallForgeLoopFn: getProofCallForge start", "batchNum", batchInfo.batchNum)
  150. if err := c.getProofCallForge(batchInfo, stopCh); err != nil {
  151. return err
  152. }
  153. log.Debugw("GetProofCallForgeLoopFn: getProofCallForge end", "batchNum", batchInfo.batchNum)
  154. outBatchCh <- batchInfo
  155. }
  156. return nil
  157. }
  158. // ForgeCallConfirmLoopFn is the function ran in a loop that gets a batch that
  159. // has been sent to the Rollup Smart Contract via inBatchCh and waits for the
  160. // ethereum transaction confirmation.
  161. func (c *Coordinator) ForgeCallConfirmLoopFn(inBatchCh chan *BatchInfo, stopCh chan bool) (err error) {
  162. defer func() {
  163. if err == ErrStop {
  164. log.Info("ForgeCallConfirmLoopFn: forgeConfirmLoopFn stopped")
  165. }
  166. }()
  167. select {
  168. case <-stopCh:
  169. return ErrStop
  170. case batchInfo := <-inBatchCh:
  171. log.Debugw("ForgeCallConfirmLoopFn: forgeCallConfirm start", "batchNum", batchInfo.batchNum)
  172. if err := c.forgeCallConfirm(batchInfo, stopCh); err != nil {
  173. return err
  174. }
  175. log.Debugw("ForgeCallConfirmLoopFn: forgeCallConfirm end", "batchNum", batchInfo.batchNum)
  176. }
  177. return nil
  178. }
  179. func (c *Coordinator) forge(serverProof ServerProofInterface) (*BatchInfo, error) {
  180. // remove transactions from the pool that have been there for too long
  181. err := c.purgeRemoveByTimeout()
  182. if err != nil {
  183. return nil, err
  184. }
  185. c.batchNum = c.batchNum + 1
  186. batchInfo := NewBatchInfo(c.batchNum, serverProof) // to accumulate metadata of the batch
  187. var poolL2Txs []common.PoolL2Tx
  188. // var feesInfo
  189. var l1UserTxsExtra, l1OperatorTxs []common.L1Tx
  190. // 1. Decide if we forge L2Tx or L1+L2Tx
  191. if c.shouldL1L2Batch() {
  192. // 2a: L1+L2 txs
  193. // l1UserTxs, toForgeL1TxsNumber := c.hdb.GetNextL1UserTxs() // TODO once HistoryDB is ready, uncomment
  194. var l1UserTxs []common.L1Tx = nil // tmp, depends on HistoryDB
  195. l1UserTxsExtra, l1OperatorTxs, poolL2Txs, err = c.txsel.GetL1L2TxSelection(c.batchNum, l1UserTxs) // TODO once feesInfo is added to method return, add the var
  196. if err != nil {
  197. return nil, err
  198. }
  199. } else {
  200. // 2b: only L2 txs
  201. poolL2Txs, err = c.txsel.GetL2TxSelection(c.batchNum) // TODO once feesInfo is added to method return, add the var
  202. if err != nil {
  203. return nil, err
  204. }
  205. l1UserTxsExtra = nil
  206. l1OperatorTxs = nil
  207. }
  208. // Run purger to invalidate transactions that become invalid beause of
  209. // the poolL2Txs selected. Will mark as invalid the txs that have a
  210. // (fromIdx, nonce) which already appears in the selected txs (includes
  211. // all the nonces smaller than the current one)
  212. err = c.purgeInvalidDueToL2TxsSelection(poolL2Txs)
  213. if err != nil {
  214. return nil, err
  215. }
  216. // 3. Save metadata from TxSelector output for BatchNum
  217. batchInfo.SetTxsInfo(l1UserTxsExtra, l1OperatorTxs, poolL2Txs) // TODO feesInfo
  218. // 4. Call BatchBuilder with TxSelector output
  219. configBatch := &batchbuilder.ConfigBatch{
  220. ForgerAddress: c.config.ForgerAddress,
  221. }
  222. zkInputs, err := c.batchBuilder.BuildBatch(configBatch, l1UserTxsExtra, l1OperatorTxs, poolL2Txs, nil) // TODO []common.TokenID --> feesInfo
  223. if err != nil {
  224. return nil, err
  225. }
  226. // 5. Save metadata from BatchBuilder output for BatchNum
  227. batchInfo.SetZKInputs(zkInputs)
  228. // 6. Call an idle server proof with BatchBuilder output, save server proof info for batchNum
  229. err = batchInfo.serverProof.CalculateProof(zkInputs)
  230. if err != nil {
  231. return nil, err
  232. }
  233. return &batchInfo, nil
  234. }
  235. // getProofCallForge gets the generated zkProof & sends it to the SmartContract
  236. func (c *Coordinator) getProofCallForge(batchInfo *BatchInfo, stopCh chan bool) error {
  237. serverProof := batchInfo.serverProof
  238. proof, err := serverProof.GetProof(stopCh) // blocking call, until not resolved don't continue. Returns when the proof server has calculated the proof
  239. c.serverProofPool.Add(serverProof)
  240. batchInfo.serverProof = nil
  241. if err != nil {
  242. return err
  243. }
  244. batchInfo.SetProof(proof)
  245. forgeBatchArgs := c.prepareForgeBatchArgs(batchInfo)
  246. ethTx, err := c.ethClient.RollupForgeBatch(forgeBatchArgs)
  247. if err != nil {
  248. return err
  249. }
  250. // TODO: Move this to the next step (forgeCallConfirm)
  251. log.Debugf("ethClient ForgeCall sent, batchNum: %d", c.batchNum)
  252. batchInfo.SetEthTx(ethTx)
  253. // TODO(FUTURE) once tx data type is defined, store ethTx (returned by ForgeCall)
  254. // TBD if use ethTxStore as a disk k-v database, or use a Queue
  255. // tx, err := c.ethTxStore.NewTx()
  256. // if err != nil {
  257. // return err
  258. // }
  259. // tx.Put(ethTx.Hash(), ethTx.Bytes())
  260. // if err := tx.Commit(); err!=nil {
  261. // return nil
  262. // }
  263. return nil
  264. }
  265. func (c *Coordinator) forgeCallConfirm(batchInfo *BatchInfo, stopCh chan bool) error {
  266. // TODO strategy of this sequence TBD
  267. // confirm eth txs and mark them as accepted sequence
  268. // IDEA: Keep an array in Coordinator with the list of sent ethTx.
  269. // Here, loop over them and only delete them once the number of
  270. // confirmed blocks is over a configured value. If the tx is rejected,
  271. // return error.
  272. // ethTx := ethTxStore.GetFirstPending()
  273. // waitForAccepted(ethTx) // blocking call, returns once the ethTx is mined
  274. // ethTxStore.MarkAccepted(ethTx)
  275. txID := batchInfo.ethTx.Hash()
  276. // TODO: Follow EthereumClient.waitReceipt logic
  277. count := 0
  278. // TODO: Define this waitTime in the config
  279. waitTime := 100 * time.Millisecond //nolint:gomnd
  280. select {
  281. case <-time.After(waitTime):
  282. receipt, err := c.ethClient.EthTransactionReceipt(context.TODO(), txID)
  283. if err != nil {
  284. return err
  285. }
  286. if receipt != nil {
  287. if receipt.Status == types.ReceiptStatusFailed {
  288. return fmt.Errorf("receipt status is failed")
  289. } else if receipt.Status == types.ReceiptStatusSuccessful {
  290. return nil
  291. }
  292. }
  293. // TODO: Call go-ethereum:
  294. // if err == nil && receipt == nil :
  295. // `func (ec *Client) TransactionByHash(ctx context.Context, hash common.Hash) (tx *types.Transaction, isPending bool, err error) {`
  296. count++
  297. if time.Duration(count)*waitTime > 60*time.Second {
  298. log.Warnw("Waiting for ethTx receipt for more than 60 seconds", "tx", batchInfo.ethTx)
  299. // TODO: Decide if we resend the Tx with higher gas price
  300. }
  301. case <-stopCh:
  302. return ErrStop
  303. }
  304. return fmt.Errorf("timeout")
  305. }
  306. func (c *Coordinator) handleReorg() error {
  307. return nil // TODO
  308. }
  309. // isForgeSequence returns true if the node is the Forger in the current ethereum block
  310. func (c *Coordinator) isForgeSequence() (bool, error) {
  311. // TODO: Consider checking if we can forge by quering the Synchronizer instead of using ethClient
  312. blockNum, err := c.ethClient.EthCurrentBlock()
  313. if err != nil {
  314. return false, err
  315. }
  316. addr, err := c.ethClient.EthAddress()
  317. if err != nil {
  318. return false, err
  319. }
  320. return c.ethClient.AuctionCanForge(*addr, blockNum+1)
  321. }
  322. func (c *Coordinator) purgeRemoveByTimeout() error {
  323. return nil // TODO
  324. }
  325. func (c *Coordinator) purgeInvalidDueToL2TxsSelection(l2Txs []common.PoolL2Tx) error {
  326. return nil // TODO
  327. }
  328. func (c *Coordinator) shouldL1L2Batch() bool {
  329. return false // TODO
  330. }
  331. func (c *Coordinator) prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs {
  332. // TODO
  333. return &eth.RollupForgeBatchArgs{}
  334. }