You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

343 lines
11 KiB

  1. package coordinator
  2. import (
  3. "context"
  4. "fmt"
  5. "math/big"
  6. "time"
  7. "github.com/ethereum/go-ethereum"
  8. "github.com/ethereum/go-ethereum/accounts"
  9. "github.com/ethereum/go-ethereum/accounts/abi/bind"
  10. "github.com/ethereum/go-ethereum/core/types"
  11. "github.com/hermeznetwork/hermez-node/common"
  12. "github.com/hermeznetwork/hermez-node/db/l2db"
  13. "github.com/hermeznetwork/hermez-node/eth"
  14. "github.com/hermeznetwork/hermez-node/log"
  15. "github.com/hermeznetwork/hermez-node/synchronizer"
  16. "github.com/hermeznetwork/tracerr"
  17. )
  18. // TxManager handles everything related to ethereum transactions: It makes the
  19. // call to forge, waits for transaction confirmation, and keeps checking them
  20. // until a number of confirmed blocks have passed.
  21. type TxManager struct {
  22. cfg Config
  23. ethClient eth.ClientInterface
  24. l2DB *l2db.L2DB // Used only to mark forged txs as forged in the L2DB
  25. coord *Coordinator // Used only to send messages to stop the pipeline
  26. batchCh chan *BatchInfo
  27. chainID *big.Int
  28. account accounts.Account
  29. consts synchronizer.SCConsts
  30. stats synchronizer.Stats
  31. vars synchronizer.SCVariables
  32. statsVarsCh chan statsVars
  33. queue []*BatchInfo
  34. // lastSuccessBatch stores the last BatchNum that who's forge call was confirmed
  35. lastSuccessBatch common.BatchNum
  36. lastPendingBatch common.BatchNum
  37. lastSuccessNonce uint64
  38. lastPendingNonce uint64
  39. }
  40. // NewTxManager creates a new TxManager
  41. func NewTxManager(ctx context.Context, cfg *Config, ethClient eth.ClientInterface, l2DB *l2db.L2DB,
  42. coord *Coordinator, scConsts *synchronizer.SCConsts, initSCVars *synchronizer.SCVariables) (*TxManager, error) {
  43. chainID, err := ethClient.EthChainID()
  44. if err != nil {
  45. return nil, tracerr.Wrap(err)
  46. }
  47. address, err := ethClient.EthAddress()
  48. if err != nil {
  49. return nil, tracerr.Wrap(err)
  50. }
  51. lastSuccessNonce, err := ethClient.EthNonceAt(ctx, *address, nil)
  52. if err != nil {
  53. return nil, err
  54. }
  55. lastPendingNonce, err := ethClient.EthPendingNonceAt(ctx, *address)
  56. if err != nil {
  57. return nil, err
  58. }
  59. if lastSuccessNonce != lastPendingNonce {
  60. return nil, tracerr.Wrap(fmt.Errorf("lastSuccessNonce (%v) != lastPendingNonce (%v)",
  61. lastSuccessNonce, lastPendingNonce))
  62. }
  63. log.Infow("TxManager started", "nonce", lastSuccessNonce)
  64. return &TxManager{
  65. cfg: *cfg,
  66. ethClient: ethClient,
  67. l2DB: l2DB,
  68. coord: coord,
  69. batchCh: make(chan *BatchInfo, queueLen),
  70. statsVarsCh: make(chan statsVars, queueLen),
  71. account: accounts.Account{
  72. Address: *address,
  73. },
  74. chainID: chainID,
  75. consts: *scConsts,
  76. vars: *initSCVars,
  77. lastSuccessNonce: lastSuccessNonce,
  78. lastPendingNonce: lastPendingNonce,
  79. }, nil
  80. }
  81. // AddBatch is a thread safe method to pass a new batch TxManager to be sent to
  82. // the smart contract via the forge call
  83. func (t *TxManager) AddBatch(ctx context.Context, batchInfo *BatchInfo) {
  84. select {
  85. case t.batchCh <- batchInfo:
  86. case <-ctx.Done():
  87. }
  88. }
  89. // SetSyncStatsVars is a thread safe method to sets the synchronizer Stats
  90. func (t *TxManager) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) {
  91. select {
  92. case t.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars}:
  93. case <-ctx.Done():
  94. }
  95. }
  96. func (t *TxManager) syncSCVars(vars synchronizer.SCVariablesPtr) {
  97. if vars.Rollup != nil {
  98. t.vars.Rollup = *vars.Rollup
  99. }
  100. if vars.Auction != nil {
  101. t.vars.Auction = *vars.Auction
  102. }
  103. if vars.WDelayer != nil {
  104. t.vars.WDelayer = *vars.WDelayer
  105. }
  106. }
  107. // NewAuth generates a new auth object for an ethereum transaction
  108. func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) {
  109. gasPrice, err := t.ethClient.EthSuggestGasPrice(ctx)
  110. if err != nil {
  111. return nil, tracerr.Wrap(err)
  112. }
  113. inc := new(big.Int).Set(gasPrice)
  114. const gasPriceDiv = 100
  115. inc.Div(inc, new(big.Int).SetUint64(gasPriceDiv))
  116. gasPrice.Add(gasPrice, inc)
  117. // log.Debugw("TxManager: transaction metadata", "gasPrice", gasPrice)
  118. auth, err := bind.NewKeyStoreTransactorWithChainID(t.ethClient.EthKeyStore(), t.account, t.chainID)
  119. if err != nil {
  120. return nil, tracerr.Wrap(err)
  121. }
  122. auth.Value = big.NewInt(0) // in wei
  123. // TODO: Calculate GasLimit based on the contents of the ForgeBatchArgs
  124. auth.GasLimit = 1000000
  125. auth.GasPrice = gasPrice
  126. auth.Nonce = nil
  127. return auth, nil
  128. }
  129. func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) error {
  130. // TODO: Check if we can forge in the next blockNum, abort if we can't
  131. batchInfo.Debug.Status = StatusSent
  132. batchInfo.Debug.SendBlockNum = t.stats.Eth.LastBlock.Num + 1
  133. batchInfo.Debug.SendTimestamp = time.Now()
  134. batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub(
  135. batchInfo.Debug.StartTimestamp).Seconds()
  136. var ethTx *types.Transaction
  137. var err error
  138. auth, err := t.NewAuth(ctx)
  139. if err != nil {
  140. return tracerr.Wrap(err)
  141. }
  142. auth.Nonce = big.NewInt(int64(t.lastPendingNonce))
  143. t.lastPendingNonce++
  144. for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
  145. ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs, auth)
  146. if err != nil {
  147. // if strings.Contains(err.Error(), common.AuctionErrMsgCannotForge) {
  148. // log.Errorw("TxManager ethClient.RollupForgeBatch", "err", err,
  149. // "block", t.stats.Eth.LastBlock.Num+1)
  150. // return tracerr.Wrap(err)
  151. // }
  152. log.Errorw("TxManager ethClient.RollupForgeBatch",
  153. "attempt", attempt, "err", err, "block", t.stats.Eth.LastBlock.Num+1,
  154. "batchNum", batchInfo.BatchNum)
  155. } else {
  156. break
  157. }
  158. select {
  159. case <-ctx.Done():
  160. return tracerr.Wrap(common.ErrDone)
  161. case <-time.After(t.cfg.EthClientAttemptsDelay):
  162. }
  163. }
  164. if err != nil {
  165. return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err))
  166. }
  167. batchInfo.EthTx = ethTx
  168. log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex())
  169. t.cfg.debugBatchStore(batchInfo)
  170. t.lastPendingBatch = batchInfo.BatchNum
  171. if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil {
  172. return tracerr.Wrap(err)
  173. }
  174. return nil
  175. }
  176. // checkEthTransactionReceipt takes the txHash from the BatchInfo and stores
  177. // the corresponding receipt if found
  178. func (t *TxManager) checkEthTransactionReceipt(ctx context.Context, batchInfo *BatchInfo) error {
  179. txHash := batchInfo.EthTx.Hash()
  180. var receipt *types.Receipt
  181. var err error
  182. for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
  183. receipt, err = t.ethClient.EthTransactionReceipt(ctx, txHash)
  184. if ctx.Err() != nil {
  185. continue
  186. } else if tracerr.Unwrap(err) == ethereum.NotFound {
  187. err = nil
  188. break
  189. } else if err != nil {
  190. log.Errorw("TxManager ethClient.EthTransactionReceipt",
  191. "attempt", attempt, "err", err)
  192. } else {
  193. break
  194. }
  195. select {
  196. case <-ctx.Done():
  197. return tracerr.Wrap(common.ErrDone)
  198. case <-time.After(t.cfg.EthClientAttemptsDelay):
  199. }
  200. }
  201. if err != nil {
  202. return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.EthTransactionReceipt: %w", err))
  203. }
  204. batchInfo.Receipt = receipt
  205. t.cfg.debugBatchStore(batchInfo)
  206. return nil
  207. }
  208. func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*int64, error) {
  209. receipt := batchInfo.Receipt
  210. if receipt != nil {
  211. if receipt.Status == types.ReceiptStatusFailed {
  212. batchInfo.Debug.Status = StatusFailed
  213. t.cfg.debugBatchStore(batchInfo)
  214. _, err := t.ethClient.EthCall(ctx, batchInfo.EthTx, receipt.BlockNumber)
  215. log.Warnw("TxManager receipt status is failed", "tx", receipt.TxHash.Hex(),
  216. "batch", batchInfo.BatchNum, "block", receipt.BlockNumber.Int64(),
  217. "err", err)
  218. return nil, tracerr.Wrap(fmt.Errorf(
  219. "ethereum transaction receipt status is failed: %w", err))
  220. } else if receipt.Status == types.ReceiptStatusSuccessful {
  221. batchInfo.Debug.Status = StatusMined
  222. batchInfo.Debug.MineBlockNum = receipt.BlockNumber.Int64()
  223. batchInfo.Debug.StartToMineBlocksDelay = batchInfo.Debug.MineBlockNum -
  224. batchInfo.Debug.StartBlockNum
  225. t.cfg.debugBatchStore(batchInfo)
  226. if batchInfo.BatchNum > t.lastSuccessBatch {
  227. t.lastSuccessBatch = batchInfo.BatchNum
  228. }
  229. confirm := t.stats.Eth.LastBlock.Num - receipt.BlockNumber.Int64()
  230. return &confirm, nil
  231. }
  232. }
  233. return nil, nil
  234. }
  235. // Run the TxManager
  236. func (t *TxManager) Run(ctx context.Context) {
  237. next := 0
  238. waitDuration := longWaitDuration
  239. var statsVars statsVars
  240. select {
  241. case statsVars = <-t.statsVarsCh:
  242. case <-ctx.Done():
  243. }
  244. t.stats = statsVars.Stats
  245. t.syncSCVars(statsVars.Vars)
  246. log.Infow("TxManager: received initial statsVars",
  247. "block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatch)
  248. for {
  249. select {
  250. case <-ctx.Done():
  251. log.Info("TxManager done")
  252. return
  253. case statsVars := <-t.statsVarsCh:
  254. t.stats = statsVars.Stats
  255. t.syncSCVars(statsVars.Vars)
  256. case batchInfo := <-t.batchCh:
  257. if err := t.sendRollupForgeBatch(ctx, batchInfo); ctx.Err() != nil {
  258. continue
  259. } else if err != nil {
  260. // If we reach here it's because our ethNode has
  261. // been unable to send the transaction to
  262. // ethereum. This could be due to the ethNode
  263. // failure, or an invalid transaction (that
  264. // can't be mined)
  265. t.coord.SendMsg(ctx, MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch send: %v", err)})
  266. continue
  267. }
  268. t.queue = append(t.queue, batchInfo)
  269. waitDuration = t.cfg.TxManagerCheckInterval
  270. case <-time.After(waitDuration):
  271. if len(t.queue) == 0 {
  272. waitDuration = longWaitDuration
  273. continue
  274. }
  275. current := next
  276. next = (current + 1) % len(t.queue)
  277. batchInfo := t.queue[current]
  278. if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
  279. continue
  280. } else if err != nil { //nolint:staticcheck
  281. // Our ethNode is giving an error different
  282. // than "not found" when getting the receipt
  283. // for the transaction, so we can't figure out
  284. // if it was not mined, mined and succesfull or
  285. // mined and failed. This could be due to the
  286. // ethNode failure.
  287. t.coord.SendMsg(ctx, MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch receipt: %v", err)})
  288. }
  289. confirm, err := t.handleReceipt(ctx, batchInfo)
  290. if ctx.Err() != nil {
  291. continue
  292. } else if err != nil { //nolint:staticcheck
  293. // Transaction was rejected
  294. t.queue = append(t.queue[:current], t.queue[current+1:]...)
  295. if len(t.queue) == 0 {
  296. next = 0
  297. } else {
  298. next = current % len(t.queue)
  299. }
  300. t.coord.SendMsg(ctx, MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch reject: %v", err)})
  301. }
  302. if confirm != nil && *confirm >= t.cfg.ConfirmBlocks {
  303. log.Debugw("TxManager tx for RollupForgeBatch confirmed",
  304. "batch", batchInfo.BatchNum)
  305. t.queue = append(t.queue[:current], t.queue[current+1:]...)
  306. if len(t.queue) == 0 {
  307. next = 0
  308. } else {
  309. next = current % len(t.queue)
  310. }
  311. }
  312. }
  313. }
  314. }
  315. // nolint reason: this function will be used in the future
  316. //nolint:unused
  317. func (t *TxManager) canForge(stats *synchronizer.Stats, blockNum int64) bool {
  318. return canForge(&t.consts.Auction, &t.vars.Auction,
  319. &stats.Sync.Auction.CurrentSlot, &stats.Sync.Auction.NextSlot,
  320. t.cfg.ForgerAddress, blockNum)
  321. }