You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

207 lines
6.5 KiB

  1. package coordinator
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/ethereum/go-ethereum/core/types"
  8. "github.com/hermeznetwork/hermez-node/common"
  9. "github.com/hermeznetwork/hermez-node/db/l2db"
  10. "github.com/hermeznetwork/hermez-node/eth"
  11. "github.com/hermeznetwork/hermez-node/log"
  12. "github.com/hermeznetwork/tracerr"
  13. )
  14. // TxManager handles everything related to ethereum transactions: It makes the
  15. // call to forge, waits for transaction confirmation, and keeps checking them
  16. // until a number of confirmed blocks have passed.
  17. type TxManager struct {
  18. cfg Config
  19. ethClient eth.ClientInterface
  20. l2DB *l2db.L2DB // Used only to mark forged txs as forged in the L2DB
  21. coord *Coordinator // Used only to send messages to stop the pipeline
  22. batchCh chan *BatchInfo
  23. lastBlockCh chan int64
  24. queue []*BatchInfo
  25. lastBlock int64
  26. // lastConfirmedBatch stores the last BatchNum that who's forge call was confirmed
  27. lastConfirmedBatch common.BatchNum
  28. }
  29. // NewTxManager creates a new TxManager
  30. func NewTxManager(cfg *Config, ethClient eth.ClientInterface, l2DB *l2db.L2DB,
  31. coord *Coordinator) *TxManager {
  32. return &TxManager{
  33. cfg: *cfg,
  34. ethClient: ethClient,
  35. l2DB: l2DB,
  36. coord: coord,
  37. batchCh: make(chan *BatchInfo, queueLen),
  38. lastBlockCh: make(chan int64, queueLen),
  39. lastBlock: -1,
  40. }
  41. }
  42. // AddBatch is a thread safe method to pass a new batch TxManager to be sent to
  43. // the smart contract via the forge call
  44. func (t *TxManager) AddBatch(batchInfo *BatchInfo) {
  45. t.batchCh <- batchInfo
  46. }
  47. // SetLastBlock is a thread safe method to pass the lastBlock to the TxManager
  48. func (t *TxManager) SetLastBlock(lastBlock int64) {
  49. t.lastBlockCh <- lastBlock
  50. }
  51. func (t *TxManager) callRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) error {
  52. batchInfo.Debug.Status = StatusSent
  53. batchInfo.Debug.SendBlockNum = t.lastBlock + 1
  54. batchInfo.Debug.SendTimestamp = time.Now()
  55. batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub(
  56. batchInfo.Debug.StartTimestamp).Seconds()
  57. var ethTx *types.Transaction
  58. var err error
  59. for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
  60. ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs)
  61. if err != nil {
  62. if strings.Contains(err.Error(), common.AuctionErrMsgCannotForge) {
  63. log.Debugw("TxManager ethClient.RollupForgeBatch", "err", err,
  64. "block", t.lastBlock+1)
  65. return tracerr.Wrap(err)
  66. }
  67. log.Errorw("TxManager ethClient.RollupForgeBatch",
  68. "attempt", attempt, "err", err, "block", t.lastBlock+1,
  69. "batchNum", batchInfo.BatchNum)
  70. } else {
  71. break
  72. }
  73. select {
  74. case <-ctx.Done():
  75. return tracerr.Wrap(common.ErrDone)
  76. case <-time.After(t.cfg.EthClientAttemptsDelay):
  77. }
  78. }
  79. if err != nil {
  80. return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err))
  81. }
  82. batchInfo.EthTx = ethTx
  83. log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex())
  84. t.cfg.debugBatchStore(batchInfo)
  85. if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil {
  86. return tracerr.Wrap(err)
  87. }
  88. return nil
  89. }
  90. func (t *TxManager) checkEthTransactionReceipt(ctx context.Context, batchInfo *BatchInfo) error {
  91. txHash := batchInfo.EthTx.Hash()
  92. var receipt *types.Receipt
  93. var err error
  94. for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
  95. receipt, err = t.ethClient.EthTransactionReceipt(ctx, txHash)
  96. if ctx.Err() != nil {
  97. continue
  98. }
  99. if err != nil {
  100. log.Errorw("TxManager ethClient.EthTransactionReceipt",
  101. "attempt", attempt, "err", err)
  102. } else {
  103. break
  104. }
  105. select {
  106. case <-ctx.Done():
  107. return tracerr.Wrap(common.ErrDone)
  108. case <-time.After(t.cfg.EthClientAttemptsDelay):
  109. }
  110. }
  111. if err != nil {
  112. return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.EthTransactionReceipt: %w", err))
  113. }
  114. batchInfo.Receipt = receipt
  115. t.cfg.debugBatchStore(batchInfo)
  116. return nil
  117. }
  118. func (t *TxManager) handleReceipt(batchInfo *BatchInfo) (*int64, error) {
  119. receipt := batchInfo.Receipt
  120. if receipt != nil {
  121. if receipt.Status == types.ReceiptStatusFailed {
  122. batchInfo.Debug.Status = StatusFailed
  123. t.cfg.debugBatchStore(batchInfo)
  124. log.Errorw("TxManager receipt status is failed", "receipt", receipt)
  125. return nil, tracerr.Wrap(fmt.Errorf("ethereum transaction receipt statis is failed"))
  126. } else if receipt.Status == types.ReceiptStatusSuccessful {
  127. batchInfo.Debug.Status = StatusMined
  128. batchInfo.Debug.MineBlockNum = receipt.BlockNumber.Int64()
  129. batchInfo.Debug.StartToMineBlocksDelay = batchInfo.Debug.MineBlockNum -
  130. batchInfo.Debug.StartBlockNum
  131. t.cfg.debugBatchStore(batchInfo)
  132. if batchInfo.BatchNum > t.lastConfirmedBatch {
  133. t.lastConfirmedBatch = batchInfo.BatchNum
  134. }
  135. confirm := t.lastBlock - receipt.BlockNumber.Int64()
  136. return &confirm, nil
  137. }
  138. }
  139. return nil, nil
  140. }
  141. // Run the TxManager
  142. func (t *TxManager) Run(ctx context.Context) {
  143. next := 0
  144. waitDuration := longWaitDuration
  145. for {
  146. select {
  147. case <-ctx.Done():
  148. log.Info("TxManager done")
  149. return
  150. case lastBlock := <-t.lastBlockCh:
  151. t.lastBlock = lastBlock
  152. case batchInfo := <-t.batchCh:
  153. if err := t.callRollupForgeBatch(ctx, batchInfo); ctx.Err() != nil {
  154. continue
  155. } else if err != nil {
  156. t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch call: %v", err)})
  157. continue
  158. }
  159. log.Debugf("ethClient ForgeCall sent, batchNum: %d", batchInfo.BatchNum)
  160. t.queue = append(t.queue, batchInfo)
  161. waitDuration = t.cfg.TxManagerCheckInterval
  162. case <-time.After(waitDuration):
  163. if len(t.queue) == 0 {
  164. continue
  165. }
  166. current := next
  167. next = (current + 1) % len(t.queue)
  168. batchInfo := t.queue[current]
  169. if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
  170. continue
  171. } else if err != nil { //nolint:staticcheck
  172. // We can't get the receipt for the
  173. // transaction, so we can't confirm if it was
  174. // mined
  175. t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch receipt: %v", err)})
  176. }
  177. confirm, err := t.handleReceipt(batchInfo)
  178. if err != nil { //nolint:staticcheck
  179. // Transaction was rejected
  180. t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch reject: %v", err)})
  181. }
  182. if confirm != nil && *confirm >= t.cfg.ConfirmBlocks {
  183. log.Debugw("TxManager tx for RollupForgeBatch confirmed",
  184. "batch", batchInfo.BatchNum)
  185. t.queue = append(t.queue[:current], t.queue[current+1:]...)
  186. if len(t.queue) == 0 {
  187. waitDuration = longWaitDuration
  188. next = 0
  189. } else {
  190. next = current % len(t.queue)
  191. }
  192. }
  193. }
  194. }
  195. }