You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

398 lines
13 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package coordinator
  2. import (
  3. "context"
  4. "fmt"
  5. "math/big"
  6. "time"
  7. "github.com/ethereum/go-ethereum"
  8. "github.com/ethereum/go-ethereum/accounts"
  9. "github.com/ethereum/go-ethereum/accounts/abi/bind"
  10. "github.com/ethereum/go-ethereum/core/types"
  11. "github.com/hermeznetwork/hermez-node/common"
  12. "github.com/hermeznetwork/hermez-node/db/l2db"
  13. "github.com/hermeznetwork/hermez-node/eth"
  14. "github.com/hermeznetwork/hermez-node/log"
  15. "github.com/hermeznetwork/hermez-node/synchronizer"
  16. "github.com/hermeznetwork/tracerr"
  17. )
  18. // TxManager handles everything related to ethereum transactions: It makes the
  19. // call to forge, waits for transaction confirmation, and keeps checking them
  20. // until a number of confirmed blocks have passed.
  21. type TxManager struct {
  22. cfg Config
  23. ethClient eth.ClientInterface
  24. l2DB *l2db.L2DB // Used only to mark forged txs as forged in the L2DB
  25. coord *Coordinator // Used only to send messages to stop the pipeline
  26. batchCh chan *BatchInfo
  27. chainID *big.Int
  28. account accounts.Account
  29. consts synchronizer.SCConsts
  30. stats synchronizer.Stats
  31. vars synchronizer.SCVariables
  32. statsVarsCh chan statsVars
  33. queue []*BatchInfo
  34. // lastSuccessBatch stores the last BatchNum that who's forge call was confirmed
  35. lastSuccessBatch common.BatchNum
  36. lastPendingBatch common.BatchNum
  37. lastSuccessNonce uint64
  38. lastPendingNonce uint64
  39. lastSentL1BatchBlockNum int64
  40. }
  41. // NewTxManager creates a new TxManager
  42. func NewTxManager(ctx context.Context, cfg *Config, ethClient eth.ClientInterface, l2DB *l2db.L2DB,
  43. coord *Coordinator, scConsts *synchronizer.SCConsts, initSCVars *synchronizer.SCVariables) (*TxManager, error) {
  44. chainID, err := ethClient.EthChainID()
  45. if err != nil {
  46. return nil, tracerr.Wrap(err)
  47. }
  48. address, err := ethClient.EthAddress()
  49. if err != nil {
  50. return nil, tracerr.Wrap(err)
  51. }
  52. lastSuccessNonce, err := ethClient.EthNonceAt(ctx, *address, nil)
  53. if err != nil {
  54. return nil, err
  55. }
  56. lastPendingNonce, err := ethClient.EthPendingNonceAt(ctx, *address)
  57. if err != nil {
  58. return nil, err
  59. }
  60. if lastSuccessNonce != lastPendingNonce {
  61. return nil, tracerr.Wrap(fmt.Errorf("lastSuccessNonce (%v) != lastPendingNonce (%v)",
  62. lastSuccessNonce, lastPendingNonce))
  63. }
  64. log.Infow("TxManager started", "nonce", lastSuccessNonce)
  65. return &TxManager{
  66. cfg: *cfg,
  67. ethClient: ethClient,
  68. l2DB: l2DB,
  69. coord: coord,
  70. batchCh: make(chan *BatchInfo, queueLen),
  71. statsVarsCh: make(chan statsVars, queueLen),
  72. account: accounts.Account{
  73. Address: *address,
  74. },
  75. chainID: chainID,
  76. consts: *scConsts,
  77. vars: *initSCVars,
  78. lastSuccessNonce: lastSuccessNonce,
  79. lastPendingNonce: lastPendingNonce,
  80. }, nil
  81. }
  82. // AddBatch is a thread safe method to pass a new batch TxManager to be sent to
  83. // the smart contract via the forge call
  84. func (t *TxManager) AddBatch(ctx context.Context, batchInfo *BatchInfo) {
  85. select {
  86. case t.batchCh <- batchInfo:
  87. case <-ctx.Done():
  88. }
  89. }
  90. // SetSyncStatsVars is a thread safe method to sets the synchronizer Stats
  91. func (t *TxManager) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) {
  92. select {
  93. case t.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars}:
  94. case <-ctx.Done():
  95. }
  96. }
  97. func (t *TxManager) syncSCVars(vars synchronizer.SCVariablesPtr) {
  98. updateSCVars(&t.vars, vars)
  99. }
  100. // NewAuth generates a new auth object for an ethereum transaction
  101. func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) {
  102. gasPrice, err := t.ethClient.EthSuggestGasPrice(ctx)
  103. if err != nil {
  104. return nil, tracerr.Wrap(err)
  105. }
  106. inc := new(big.Int).Set(gasPrice)
  107. const gasPriceDiv = 100
  108. inc.Div(inc, new(big.Int).SetUint64(gasPriceDiv))
  109. gasPrice.Add(gasPrice, inc)
  110. // log.Debugw("TxManager: transaction metadata", "gasPrice", gasPrice)
  111. auth, err := bind.NewKeyStoreTransactorWithChainID(t.ethClient.EthKeyStore(), t.account, t.chainID)
  112. if err != nil {
  113. return nil, tracerr.Wrap(err)
  114. }
  115. auth.Value = big.NewInt(0) // in wei
  116. // TODO: Calculate GasLimit based on the contents of the ForgeBatchArgs
  117. auth.GasLimit = 1000000
  118. auth.GasPrice = gasPrice
  119. auth.Nonce = nil
  120. return auth, nil
  121. }
  122. func (t *TxManager) shouldSendRollupForgeBatch(batchInfo *BatchInfo) error {
  123. nextBlock := t.stats.Eth.LastBlock.Num + 1
  124. if !t.canForgeAt(nextBlock) {
  125. return tracerr.Wrap(fmt.Errorf("can't forge in the next block: %v", nextBlock))
  126. }
  127. if t.mustL1L2Batch(nextBlock) && !batchInfo.L1Batch {
  128. return tracerr.Wrap(fmt.Errorf("can't forge non-L1Batch in the next block: %v", nextBlock))
  129. }
  130. margin := t.cfg.SendBatchBlocksMarginCheck
  131. if margin != 0 {
  132. if !t.canForgeAt(nextBlock + margin) {
  133. return tracerr.Wrap(fmt.Errorf("can't forge after %v blocks: %v",
  134. margin, nextBlock))
  135. }
  136. if t.mustL1L2Batch(nextBlock+margin) && !batchInfo.L1Batch {
  137. return tracerr.Wrap(fmt.Errorf("can't forge non-L1Batch after %v blocks: %v",
  138. margin, nextBlock))
  139. }
  140. }
  141. return nil
  142. }
  143. func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) error {
  144. var ethTx *types.Transaction
  145. var err error
  146. auth, err := t.NewAuth(ctx)
  147. if err != nil {
  148. return tracerr.Wrap(err)
  149. }
  150. auth.Nonce = big.NewInt(int64(t.lastPendingNonce))
  151. t.lastPendingNonce++
  152. for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
  153. ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs, auth)
  154. if err != nil {
  155. log.Errorw("TxManager ethClient.RollupForgeBatch",
  156. "attempt", attempt, "err", err, "block", t.stats.Eth.LastBlock.Num+1,
  157. "batchNum", batchInfo.BatchNum)
  158. } else {
  159. break
  160. }
  161. select {
  162. case <-ctx.Done():
  163. return tracerr.Wrap(common.ErrDone)
  164. case <-time.After(t.cfg.EthClientAttemptsDelay):
  165. }
  166. }
  167. if err != nil {
  168. return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err))
  169. }
  170. batchInfo.EthTx = ethTx
  171. log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex())
  172. now := time.Now()
  173. batchInfo.SendTimestamp = now
  174. batchInfo.Debug.Status = StatusSent
  175. batchInfo.Debug.SendBlockNum = t.stats.Eth.LastBlock.Num + 1
  176. batchInfo.Debug.SendTimestamp = batchInfo.SendTimestamp
  177. batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub(
  178. batchInfo.Debug.StartTimestamp).Seconds()
  179. t.cfg.debugBatchStore(batchInfo)
  180. t.lastPendingBatch = batchInfo.BatchNum
  181. if batchInfo.L1Batch {
  182. t.lastSentL1BatchBlockNum = t.stats.Eth.LastBlock.Num + 1
  183. }
  184. if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil {
  185. return tracerr.Wrap(err)
  186. }
  187. return nil
  188. }
  189. // checkEthTransactionReceipt takes the txHash from the BatchInfo and stores
  190. // the corresponding receipt if found
  191. func (t *TxManager) checkEthTransactionReceipt(ctx context.Context, batchInfo *BatchInfo) error {
  192. txHash := batchInfo.EthTx.Hash()
  193. var receipt *types.Receipt
  194. var err error
  195. for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
  196. receipt, err = t.ethClient.EthTransactionReceipt(ctx, txHash)
  197. if ctx.Err() != nil {
  198. continue
  199. } else if tracerr.Unwrap(err) == ethereum.NotFound {
  200. err = nil
  201. break
  202. } else if err != nil {
  203. log.Errorw("TxManager ethClient.EthTransactionReceipt",
  204. "attempt", attempt, "err", err)
  205. } else {
  206. break
  207. }
  208. select {
  209. case <-ctx.Done():
  210. return tracerr.Wrap(common.ErrDone)
  211. case <-time.After(t.cfg.EthClientAttemptsDelay):
  212. }
  213. }
  214. if err != nil {
  215. return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.EthTransactionReceipt: %w", err))
  216. }
  217. batchInfo.Receipt = receipt
  218. t.cfg.debugBatchStore(batchInfo)
  219. return nil
  220. }
  221. func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*int64, error) {
  222. receipt := batchInfo.Receipt
  223. if receipt != nil {
  224. if batchInfo.EthTx.Nonce > t.lastSuccessNonce {
  225. t.lastSuccessNonce = batchInfo.EthTx.Nonce
  226. }
  227. if receipt.Status == types.ReceiptStatusFailed {
  228. batchInfo.Debug.Status = StatusFailed
  229. t.cfg.debugBatchStore(batchInfo)
  230. _, err := t.ethClient.EthCall(ctx, batchInfo.EthTx, receipt.BlockNumber)
  231. log.Warnw("TxManager receipt status is failed", "tx", receipt.TxHash.Hex(),
  232. "batch", batchInfo.BatchNum, "block", receipt.BlockNumber.Int64(),
  233. "err", err)
  234. if batchInfo.BatchNum <= t.lastSuccessBatch {
  235. t.lastSuccessBatch = batchInfo.BatchNum - 1
  236. }
  237. return nil, tracerr.Wrap(fmt.Errorf(
  238. "ethereum transaction receipt status is failed: %w", err))
  239. } else if receipt.Status == types.ReceiptStatusSuccessful {
  240. batchInfo.Debug.Status = StatusMined
  241. batchInfo.Debug.MineBlockNum = receipt.BlockNumber.Int64()
  242. batchInfo.Debug.StartToMineBlocksDelay = batchInfo.Debug.MineBlockNum -
  243. batchInfo.Debug.StartBlockNum
  244. now := time.Now()
  245. batchInfo.Debug.StartToMineDelay = now.Sub(
  246. batchInfo.Debug.StartTimestamp).Seconds()
  247. t.cfg.debugBatchStore(batchInfo)
  248. if batchInfo.BatchNum > t.lastSuccessBatch {
  249. t.lastSuccessBatch = batchInfo.BatchNum
  250. }
  251. confirm := t.stats.Eth.LastBlock.Num - receipt.BlockNumber.Int64()
  252. return &confirm, nil
  253. }
  254. }
  255. return nil, nil
  256. }
  257. // TODO:
  258. // - After sending a message: CancelPipeline, stop all consecutive pending Batches (transactions)
  259. // Run the TxManager
  260. func (t *TxManager) Run(ctx context.Context) {
  261. next := 0
  262. waitDuration := longWaitDuration
  263. var statsVars statsVars
  264. select {
  265. case statsVars = <-t.statsVarsCh:
  266. case <-ctx.Done():
  267. }
  268. t.stats = statsVars.Stats
  269. t.syncSCVars(statsVars.Vars)
  270. log.Infow("TxManager: received initial statsVars",
  271. "block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatch)
  272. for {
  273. select {
  274. case <-ctx.Done():
  275. log.Info("TxManager done")
  276. return
  277. case statsVars := <-t.statsVarsCh:
  278. t.stats = statsVars.Stats
  279. t.syncSCVars(statsVars.Vars)
  280. case batchInfo := <-t.batchCh:
  281. if err := t.shouldSendRollupForgeBatch(batchInfo); err != nil {
  282. log.Warnw("TxManager: shouldSend", "err", err,
  283. "batch", batchInfo.BatchNum)
  284. t.coord.SendMsg(ctx, MsgStopPipeline{
  285. Reason: fmt.Sprintf("forgeBatch shouldSend: %v", err)})
  286. continue
  287. }
  288. if err := t.sendRollupForgeBatch(ctx, batchInfo); ctx.Err() != nil {
  289. continue
  290. } else if err != nil {
  291. // If we reach here it's because our ethNode has
  292. // been unable to send the transaction to
  293. // ethereum. This could be due to the ethNode
  294. // failure, or an invalid transaction (that
  295. // can't be mined)
  296. log.Warnw("TxManager: forgeBatch send failed", "err", err,
  297. "batch", batchInfo.BatchNum)
  298. t.coord.SendMsg(ctx, MsgStopPipeline{
  299. Reason: fmt.Sprintf("forgeBatch send: %v", err)})
  300. continue
  301. }
  302. t.queue = append(t.queue, batchInfo)
  303. waitDuration = t.cfg.TxManagerCheckInterval
  304. case <-time.After(waitDuration):
  305. if len(t.queue) == 0 {
  306. waitDuration = longWaitDuration
  307. continue
  308. }
  309. current := next
  310. next = (current + 1) % len(t.queue)
  311. batchInfo := t.queue[current]
  312. if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
  313. continue
  314. } else if err != nil { //nolint:staticcheck
  315. // Our ethNode is giving an error different
  316. // than "not found" when getting the receipt
  317. // for the transaction, so we can't figure out
  318. // if it was not mined, mined and succesfull or
  319. // mined and failed. This could be due to the
  320. // ethNode failure.
  321. t.coord.SendMsg(ctx, MsgStopPipeline{
  322. Reason: fmt.Sprintf("forgeBatch receipt: %v", err)})
  323. }
  324. confirm, err := t.handleReceipt(ctx, batchInfo)
  325. if ctx.Err() != nil {
  326. continue
  327. } else if err != nil { //nolint:staticcheck
  328. // Transaction was rejected
  329. next = t.removeFromQueue(current)
  330. t.coord.SendMsg(ctx, MsgStopPipeline{
  331. Reason: fmt.Sprintf("forgeBatch reject: %v", err)})
  332. continue
  333. }
  334. now := time.Now()
  335. if confirm == nil && batchInfo.SendTimestamp > t.cfg.EthTxResendTimeout {
  336. log.Infow("TxManager: forgeBatch tx not been mined timeout",
  337. "tx", batchInfo.EthTx.Hex(), "batch", batchInfo.BatchNum)
  338. // TODO: Resend Tx with same nonce
  339. }
  340. if confirm != nil && *confirm >= t.cfg.ConfirmBlocks {
  341. log.Debugw("TxManager: forgeBatch tx confirmed",
  342. "tx", batchInfo.EthTx.Hex(), "batch", batchInfo.BatchNum)
  343. next = t.removeFromQueue(current)
  344. }
  345. }
  346. }
  347. }
  348. // Removes batchInfo at position from the queue, and returns the next position
  349. func (t *TxManager) removeFromQueue(position int) (next int) {
  350. t.queue = append(t.queue[:current], t.queue[current+1:]...)
  351. if len(t.queue) == 0 {
  352. next = 0
  353. } else {
  354. next = current % len(t.queue)
  355. }
  356. return next
  357. }
  358. func (t *TxManager) canForgeAt(blockNum int64) bool {
  359. return canForge(&t.consts.Auction, &t.vars.Auction,
  360. &t.stats.Sync.Auction.CurrentSlot, &t.stats.Sync.Auction.NextSlot,
  361. t.cfg.ForgerAddress, blockNum)
  362. }
  363. func (t *TxManager) mustL1L2Batch(blockNum int64) bool {
  364. lastL1BatchBlockNum := t.lastSentL1BatchBlockNum
  365. if t.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum {
  366. lastL1BatchBlockNum = t.stats.Sync.LastL1BatchBlock
  367. }
  368. return blockNum-lastL1BatchBlockNum >= t.vars.Rollup.ForgeL1L2BatchTimeout-1
  369. }