You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

592 lines
19 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package coordinator
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "math/big"
  7. "time"
  8. "github.com/ethereum/go-ethereum"
  9. "github.com/ethereum/go-ethereum/accounts"
  10. "github.com/ethereum/go-ethereum/accounts/abi/bind"
  11. "github.com/ethereum/go-ethereum/core"
  12. "github.com/ethereum/go-ethereum/core/types"
  13. "github.com/hermeznetwork/hermez-node/common"
  14. "github.com/hermeznetwork/hermez-node/db/l2db"
  15. "github.com/hermeznetwork/hermez-node/eth"
  16. "github.com/hermeznetwork/hermez-node/log"
  17. "github.com/hermeznetwork/hermez-node/synchronizer"
  18. "github.com/hermeznetwork/tracerr"
  19. )
  20. // TxManager handles everything related to ethereum transactions: It makes the
  21. // call to forge, waits for transaction confirmation, and keeps checking them
  22. // until a number of confirmed blocks have passed.
  23. type TxManager struct {
  24. cfg Config
  25. ethClient eth.ClientInterface
  26. l2DB *l2db.L2DB // Used only to mark forged txs as forged in the L2DB
  27. coord *Coordinator // Used only to send messages to stop the pipeline
  28. batchCh chan *BatchInfo
  29. chainID *big.Int
  30. account accounts.Account
  31. consts synchronizer.SCConsts
  32. stats synchronizer.Stats
  33. vars synchronizer.SCVariables
  34. statsVarsCh chan statsVars
  35. discardPipelineCh chan int // int refers to the pipelineNum
  36. minPipelineNum int
  37. queue Queue
  38. // lastSuccessBatch stores the last BatchNum that who's forge call was confirmed
  39. lastSuccessBatch common.BatchNum
  40. // lastPendingBatch common.BatchNum
  41. // accNonce is the account nonce in the last mined block (due to mined txs)
  42. accNonce uint64
  43. // accNextNonce is the nonce that we should use to send the next tx.
  44. // In some cases this will be a reused nonce of an already pending tx.
  45. accNextNonce uint64
  46. // accPendingNonce is the pending nonce of the account due to pending txs
  47. // accPendingNonce uint64
  48. lastSentL1BatchBlockNum int64
  49. }
  50. // NewTxManager creates a new TxManager
  51. func NewTxManager(ctx context.Context, cfg *Config, ethClient eth.ClientInterface, l2DB *l2db.L2DB,
  52. coord *Coordinator, scConsts *synchronizer.SCConsts, initSCVars *synchronizer.SCVariables) (*TxManager, error) {
  53. chainID, err := ethClient.EthChainID()
  54. if err != nil {
  55. return nil, tracerr.Wrap(err)
  56. }
  57. address, err := ethClient.EthAddress()
  58. if err != nil {
  59. return nil, tracerr.Wrap(err)
  60. }
  61. accNonce, err := ethClient.EthNonceAt(ctx, *address, nil)
  62. if err != nil {
  63. return nil, err
  64. }
  65. // accPendingNonce, err := ethClient.EthPendingNonceAt(ctx, *address)
  66. // if err != nil {
  67. // return nil, err
  68. // }
  69. // if accNonce != accPendingNonce {
  70. // return nil, tracerr.Wrap(fmt.Errorf("currentNonce (%v) != accPendingNonce (%v)",
  71. // accNonce, accPendingNonce))
  72. // }
  73. log.Infow("TxManager started", "nonce", accNonce)
  74. return &TxManager{
  75. cfg: *cfg,
  76. ethClient: ethClient,
  77. l2DB: l2DB,
  78. coord: coord,
  79. batchCh: make(chan *BatchInfo, queueLen),
  80. statsVarsCh: make(chan statsVars, queueLen),
  81. discardPipelineCh: make(chan int, queueLen),
  82. account: accounts.Account{
  83. Address: *address,
  84. },
  85. chainID: chainID,
  86. consts: *scConsts,
  87. vars: *initSCVars,
  88. minPipelineNum: 0,
  89. queue: NewQueue(),
  90. accNonce: accNonce,
  91. accNextNonce: accNonce,
  92. // accPendingNonce: accPendingNonce,
  93. }, nil
  94. }
  95. // AddBatch is a thread safe method to pass a new batch TxManager to be sent to
  96. // the smart contract via the forge call
  97. func (t *TxManager) AddBatch(ctx context.Context, batchInfo *BatchInfo) {
  98. select {
  99. case t.batchCh <- batchInfo:
  100. case <-ctx.Done():
  101. }
  102. }
  103. // SetSyncStatsVars is a thread safe method to sets the synchronizer Stats
  104. func (t *TxManager) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) {
  105. select {
  106. case t.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars}:
  107. case <-ctx.Done():
  108. }
  109. }
  110. // DiscardPipeline is a thread safe method to notify about a discarded pipeline
  111. // due to a reorg
  112. func (t *TxManager) DiscardPipeline(ctx context.Context, pipelineNum int) {
  113. select {
  114. case t.discardPipelineCh <- pipelineNum:
  115. case <-ctx.Done():
  116. }
  117. }
  118. func (t *TxManager) syncSCVars(vars synchronizer.SCVariablesPtr) {
  119. updateSCVars(&t.vars, vars)
  120. }
  121. // NewAuth generates a new auth object for an ethereum transaction
  122. func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) {
  123. gasPrice, err := t.ethClient.EthSuggestGasPrice(ctx)
  124. if err != nil {
  125. return nil, tracerr.Wrap(err)
  126. }
  127. inc := new(big.Int).Set(gasPrice)
  128. const gasPriceDiv = 100
  129. inc.Div(inc, new(big.Int).SetUint64(gasPriceDiv))
  130. gasPrice.Add(gasPrice, inc)
  131. // log.Debugw("TxManager: transaction metadata", "gasPrice", gasPrice)
  132. auth, err := bind.NewKeyStoreTransactorWithChainID(t.ethClient.EthKeyStore(), t.account, t.chainID)
  133. if err != nil {
  134. return nil, tracerr.Wrap(err)
  135. }
  136. auth.Value = big.NewInt(0) // in wei
  137. // TODO: Calculate GasLimit based on the contents of the ForgeBatchArgs
  138. auth.GasLimit = 1000000
  139. auth.GasPrice = gasPrice
  140. auth.Nonce = nil
  141. return auth, nil
  142. }
  143. func (t *TxManager) shouldSendRollupForgeBatch(batchInfo *BatchInfo) error {
  144. nextBlock := t.stats.Eth.LastBlock.Num + 1
  145. if !t.canForgeAt(nextBlock) {
  146. return tracerr.Wrap(fmt.Errorf("can't forge in the next block: %v", nextBlock))
  147. }
  148. if t.mustL1L2Batch(nextBlock) && !batchInfo.L1Batch {
  149. return tracerr.Wrap(fmt.Errorf("can't forge non-L1Batch in the next block: %v", nextBlock))
  150. }
  151. margin := t.cfg.SendBatchBlocksMarginCheck
  152. if margin != 0 {
  153. if !t.canForgeAt(nextBlock + margin) {
  154. return tracerr.Wrap(fmt.Errorf("can't forge after %v blocks: %v",
  155. margin, nextBlock))
  156. }
  157. if t.mustL1L2Batch(nextBlock+margin) && !batchInfo.L1Batch {
  158. return tracerr.Wrap(fmt.Errorf("can't forge non-L1Batch after %v blocks: %v",
  159. margin, nextBlock))
  160. }
  161. }
  162. return nil
  163. }
  164. func addPerc(v *big.Int, p int64) *big.Int {
  165. r := new(big.Int).Set(v)
  166. r.Mul(r, big.NewInt(p))
  167. r.Div(r, big.NewInt(100))
  168. return r.Add(v, r)
  169. }
  170. func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo, resend bool) error {
  171. var ethTx *types.Transaction
  172. var err error
  173. auth, err := t.NewAuth(ctx)
  174. if err != nil {
  175. return tracerr.Wrap(err)
  176. }
  177. auth.Nonce = big.NewInt(int64(t.accNextNonce))
  178. if resend {
  179. auth.Nonce = big.NewInt(int64(batchInfo.EthTx.Nonce()))
  180. }
  181. for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
  182. if auth.GasPrice.Cmp(t.cfg.MaxGasPrice) > 0 {
  183. return tracerr.Wrap(fmt.Errorf("calculated gasPrice (%v) > maxGasPrice (%v)",
  184. auth.GasPrice, t.cfg.MaxGasPrice))
  185. }
  186. // RollupForgeBatch() calls ethclient.SendTransaction()
  187. ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs, auth)
  188. if errors.Is(err, core.ErrNonceTooLow) {
  189. log.Warnw("TxManager ethClient.RollupForgeBatch incrementing nonce",
  190. "err", err, "nonce", auth.Nonce, "batchNum", batchInfo.BatchNum)
  191. auth.Nonce.Add(auth.Nonce, big.NewInt(1))
  192. attempt--
  193. } else if errors.Is(err, core.ErrNonceTooHigh) {
  194. log.Warnw("TxManager ethClient.RollupForgeBatch decrementing nonce",
  195. "err", err, "nonce", auth.Nonce, "batchNum", batchInfo.BatchNum)
  196. auth.Nonce.Sub(auth.Nonce, big.NewInt(1))
  197. attempt--
  198. } else if errors.Is(err, core.ErrUnderpriced) {
  199. log.Warnw("TxManager ethClient.RollupForgeBatch incrementing gasPrice",
  200. "err", err, "gasPrice", auth.GasPrice, "batchNum", batchInfo.BatchNum)
  201. auth.GasPrice = addPerc(auth.GasPrice, 10)
  202. attempt--
  203. } else if errors.Is(err, core.ErrReplaceUnderpriced) {
  204. log.Warnw("TxManager ethClient.RollupForgeBatch incrementing gasPrice",
  205. "err", err, "gasPrice", auth.GasPrice, "batchNum", batchInfo.BatchNum)
  206. auth.GasPrice = addPerc(auth.GasPrice, 10)
  207. attempt--
  208. } else if err != nil {
  209. log.Errorw("TxManager ethClient.RollupForgeBatch",
  210. "attempt", attempt, "err", err, "block", t.stats.Eth.LastBlock.Num+1,
  211. "batchNum", batchInfo.BatchNum)
  212. } else {
  213. break
  214. }
  215. select {
  216. case <-ctx.Done():
  217. return tracerr.Wrap(common.ErrDone)
  218. case <-time.After(t.cfg.EthClientAttemptsDelay):
  219. }
  220. }
  221. if err != nil {
  222. return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err))
  223. }
  224. if !resend {
  225. t.accNextNonce = auth.Nonce.Uint64() + 1
  226. }
  227. batchInfo.EthTx = ethTx
  228. log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash())
  229. now := time.Now()
  230. batchInfo.SendTimestamp = now
  231. if resend {
  232. batchInfo.Debug.ResendNum++
  233. }
  234. batchInfo.Debug.Status = StatusSent
  235. batchInfo.Debug.SendBlockNum = t.stats.Eth.LastBlock.Num + 1
  236. batchInfo.Debug.SendTimestamp = batchInfo.SendTimestamp
  237. batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub(
  238. batchInfo.Debug.StartTimestamp).Seconds()
  239. t.cfg.debugBatchStore(batchInfo)
  240. // t.lastPendingBatch = batchInfo.BatchNum
  241. if !resend {
  242. if batchInfo.L1Batch {
  243. t.lastSentL1BatchBlockNum = t.stats.Eth.LastBlock.Num + 1
  244. }
  245. }
  246. if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil {
  247. return tracerr.Wrap(err)
  248. }
  249. return nil
  250. }
  251. // checkEthTransactionReceipt takes the txHash from the BatchInfo and stores
  252. // the corresponding receipt if found
  253. func (t *TxManager) checkEthTransactionReceipt(ctx context.Context, batchInfo *BatchInfo) error {
  254. txHash := batchInfo.EthTx.Hash()
  255. var receipt *types.Receipt
  256. var err error
  257. for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
  258. receipt, err = t.ethClient.EthTransactionReceipt(ctx, txHash)
  259. if ctx.Err() != nil {
  260. continue
  261. } else if tracerr.Unwrap(err) == ethereum.NotFound {
  262. err = nil
  263. break
  264. } else if err != nil {
  265. log.Errorw("TxManager ethClient.EthTransactionReceipt",
  266. "attempt", attempt, "err", err)
  267. } else {
  268. break
  269. }
  270. select {
  271. case <-ctx.Done():
  272. return tracerr.Wrap(common.ErrDone)
  273. case <-time.After(t.cfg.EthClientAttemptsDelay):
  274. }
  275. }
  276. if err != nil {
  277. return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.EthTransactionReceipt: %w", err))
  278. }
  279. batchInfo.Receipt = receipt
  280. t.cfg.debugBatchStore(batchInfo)
  281. return nil
  282. }
  283. func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*int64, error) {
  284. receipt := batchInfo.Receipt
  285. if receipt != nil {
  286. if batchInfo.EthTx.Nonce()+1 > t.accNonce {
  287. t.accNonce = batchInfo.EthTx.Nonce() + 1
  288. }
  289. if receipt.Status == types.ReceiptStatusFailed {
  290. batchInfo.Debug.Status = StatusFailed
  291. t.cfg.debugBatchStore(batchInfo)
  292. _, err := t.ethClient.EthCall(ctx, batchInfo.EthTx, receipt.BlockNumber)
  293. log.Warnw("TxManager receipt status is failed", "tx", receipt.TxHash,
  294. "batch", batchInfo.BatchNum, "block", receipt.BlockNumber.Int64(),
  295. "err", err)
  296. if batchInfo.BatchNum <= t.lastSuccessBatch {
  297. t.lastSuccessBatch = batchInfo.BatchNum - 1
  298. }
  299. return nil, tracerr.Wrap(fmt.Errorf(
  300. "ethereum transaction receipt status is failed: %w", err))
  301. } else if receipt.Status == types.ReceiptStatusSuccessful {
  302. batchInfo.Debug.Status = StatusMined
  303. batchInfo.Debug.MineBlockNum = receipt.BlockNumber.Int64()
  304. batchInfo.Debug.StartToMineBlocksDelay = batchInfo.Debug.MineBlockNum -
  305. batchInfo.Debug.StartBlockNum
  306. if batchInfo.Debug.StartToMineDelay == 0 {
  307. if block, err := t.ethClient.EthBlockByNumber(ctx,
  308. receipt.BlockNumber.Int64()); err != nil {
  309. log.Warnw("TxManager: ethClient.EthBlockByNumber", "err", err)
  310. } else {
  311. batchInfo.Debug.SendToMineDelay = block.Timestamp.Sub(
  312. batchInfo.Debug.SendTimestamp).Seconds()
  313. batchInfo.Debug.StartToMineDelay = block.Timestamp.Sub(
  314. batchInfo.Debug.StartTimestamp).Seconds()
  315. }
  316. }
  317. t.cfg.debugBatchStore(batchInfo)
  318. if batchInfo.BatchNum > t.lastSuccessBatch {
  319. t.lastSuccessBatch = batchInfo.BatchNum
  320. }
  321. confirm := t.stats.Eth.LastBlock.Num - receipt.BlockNumber.Int64()
  322. return &confirm, nil
  323. }
  324. }
  325. return nil, nil
  326. }
  327. // TODO:
  328. // - After sending a message: CancelPipeline, stop all consecutive pending Batches (transactions)
  329. type Queue struct {
  330. list []*BatchInfo
  331. // nonceByBatchNum map[common.BatchNum]uint64
  332. next int
  333. }
  334. func NewQueue() Queue {
  335. return Queue{
  336. list: make([]*BatchInfo, 0),
  337. // nonceByBatchNum: make(map[common.BatchNum]uint64),
  338. next: 0,
  339. }
  340. }
  341. func (q *Queue) Len() int {
  342. return len(q.list)
  343. }
  344. func (q *Queue) At(position int) *BatchInfo {
  345. if position >= len(q.list) {
  346. return nil
  347. }
  348. return q.list[position]
  349. }
  350. func (q *Queue) Next() (int, *BatchInfo) {
  351. if len(q.list) == 0 {
  352. return 0, nil
  353. }
  354. defer func() { q.next = (q.next + 1) % len(q.list) }()
  355. return q.next, q.list[q.next]
  356. }
  357. func (q *Queue) Remove(position int) {
  358. // batchInfo := q.list[position]
  359. // delete(q.nonceByBatchNum, batchInfo.BatchNum)
  360. q.list = append(q.list[:position], q.list[position+1:]...)
  361. if len(q.list) == 0 {
  362. q.next = 0
  363. } else {
  364. q.next = position % len(q.list)
  365. }
  366. }
  367. func (q *Queue) Push(batchInfo *BatchInfo) {
  368. q.list = append(q.list, batchInfo)
  369. // q.nonceByBatchNum[batchInfo.BatchNum] = batchInfo.EthTx.Nonce()
  370. }
  371. // func (q *Queue) NonceByBatchNum(batchNum common.BatchNum) (uint64, bool) {
  372. // nonce, ok := q.nonceByBatchNum[batchNum]
  373. // return nonce, ok
  374. // }
  375. // Run the TxManager
  376. func (t *TxManager) Run(ctx context.Context) {
  377. waitDuration := longWaitDuration
  378. var statsVars statsVars
  379. select {
  380. case statsVars = <-t.statsVarsCh:
  381. case <-ctx.Done():
  382. }
  383. t.stats = statsVars.Stats
  384. t.syncSCVars(statsVars.Vars)
  385. log.Infow("TxManager: received initial statsVars",
  386. "block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatchNum)
  387. for {
  388. select {
  389. case <-ctx.Done():
  390. log.Info("TxManager done")
  391. return
  392. case statsVars := <-t.statsVarsCh:
  393. t.stats = statsVars.Stats
  394. t.syncSCVars(statsVars.Vars)
  395. case pipelineNum := <-t.discardPipelineCh:
  396. t.minPipelineNum = pipelineNum + 1
  397. if err := t.removeBadBatchInfos(ctx); ctx.Err() != nil {
  398. continue
  399. } else if err != nil {
  400. log.Errorw("TxManager: removeBadBatchInfos", "err", err)
  401. continue
  402. }
  403. case batchInfo := <-t.batchCh:
  404. if batchInfo.PipelineNum < t.minPipelineNum {
  405. log.Warnw("TxManager: batchInfo received pipelineNum < minPipelineNum",
  406. "num", batchInfo.PipelineNum, "minNum", t.minPipelineNum)
  407. }
  408. if err := t.shouldSendRollupForgeBatch(batchInfo); err != nil {
  409. log.Warnw("TxManager: shouldSend", "err", err,
  410. "batch", batchInfo.BatchNum)
  411. t.coord.SendMsg(ctx, MsgStopPipeline{
  412. Reason: fmt.Sprintf("forgeBatch shouldSend: %v", err)})
  413. continue
  414. }
  415. if err := t.sendRollupForgeBatch(ctx, batchInfo, false); ctx.Err() != nil {
  416. continue
  417. } else if err != nil {
  418. // If we reach here it's because our ethNode has
  419. // been unable to send the transaction to
  420. // ethereum. This could be due to the ethNode
  421. // failure, or an invalid transaction (that
  422. // can't be mined)
  423. log.Warnw("TxManager: forgeBatch send failed", "err", err,
  424. "batch", batchInfo.BatchNum)
  425. t.coord.SendMsg(ctx, MsgStopPipeline{
  426. Reason: fmt.Sprintf("forgeBatch send: %v", err)})
  427. continue
  428. }
  429. t.queue.Push(batchInfo)
  430. waitDuration = t.cfg.TxManagerCheckInterval
  431. case <-time.After(waitDuration):
  432. queuePosition, batchInfo := t.queue.Next()
  433. if batchInfo == nil {
  434. waitDuration = longWaitDuration
  435. continue
  436. }
  437. if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
  438. continue
  439. } else if err != nil { //nolint:staticcheck
  440. // Our ethNode is giving an error different
  441. // than "not found" when getting the receipt
  442. // for the transaction, so we can't figure out
  443. // if it was not mined, mined and succesfull or
  444. // mined and failed. This could be due to the
  445. // ethNode failure.
  446. t.coord.SendMsg(ctx, MsgStopPipeline{
  447. Reason: fmt.Sprintf("forgeBatch receipt: %v", err)})
  448. }
  449. confirm, err := t.handleReceipt(ctx, batchInfo)
  450. if ctx.Err() != nil {
  451. continue
  452. } else if err != nil { //nolint:staticcheck
  453. // Transaction was rejected
  454. if err := t.removeBadBatchInfos(ctx); ctx.Err() != nil {
  455. continue
  456. } else if err != nil {
  457. log.Errorw("TxManager: removeBadBatchInfos", "err", err)
  458. continue
  459. }
  460. t.coord.SendMsg(ctx, MsgStopPipeline{
  461. Reason: fmt.Sprintf("forgeBatch reject: %v", err)})
  462. continue
  463. }
  464. now := time.Now()
  465. if confirm == nil && now.Sub(batchInfo.SendTimestamp) > t.cfg.EthTxResendTimeout {
  466. log.Infow("TxManager: forgeBatch tx not been mined timeout, resending",
  467. "tx", batchInfo.EthTx.Hash(), "batch", batchInfo.BatchNum)
  468. if err := t.sendRollupForgeBatch(ctx, batchInfo, true); ctx.Err() != nil {
  469. continue
  470. } else if err != nil {
  471. // If we reach here it's because our ethNode has
  472. // been unable to send the transaction to
  473. // ethereum. This could be due to the ethNode
  474. // failure, or an invalid transaction (that
  475. // can't be mined)
  476. log.Warnw("TxManager: forgeBatch resend failed", "err", err,
  477. "batch", batchInfo.BatchNum)
  478. t.coord.SendMsg(ctx, MsgStopPipeline{
  479. Reason: fmt.Sprintf("forgeBatch resend: %v", err)})
  480. continue
  481. }
  482. }
  483. if confirm != nil && *confirm >= t.cfg.ConfirmBlocks {
  484. log.Debugw("TxManager: forgeBatch tx confirmed",
  485. "tx", batchInfo.EthTx.Hash(), "batch", batchInfo.BatchNum)
  486. t.queue.Remove(queuePosition)
  487. }
  488. }
  489. }
  490. }
  491. func (t *TxManager) removeBadBatchInfos(ctx context.Context) error {
  492. next := 0
  493. // batchNum := 0
  494. for {
  495. batchInfo := t.queue.At(next)
  496. if batchInfo == nil {
  497. break
  498. }
  499. if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
  500. return nil
  501. } else if err != nil {
  502. // Our ethNode is giving an error different
  503. // than "not found" when getting the receipt
  504. // for the transaction, so we can't figure out
  505. // if it was not mined, mined and succesfull or
  506. // mined and failed. This could be due to the
  507. // ethNode failure.
  508. next++
  509. continue
  510. }
  511. confirm, err := t.handleReceipt(ctx, batchInfo)
  512. if ctx.Err() != nil {
  513. return nil
  514. } else if err != nil {
  515. // Transaction was rejected
  516. if t.minPipelineNum <= batchInfo.PipelineNum {
  517. t.minPipelineNum = batchInfo.PipelineNum + 1
  518. }
  519. t.queue.Remove(next)
  520. continue
  521. }
  522. // If tx is pending but is from a cancelled pipeline, remove it
  523. // from the queue
  524. if confirm == nil {
  525. if batchInfo.PipelineNum < t.minPipelineNum {
  526. // batchNum++
  527. t.queue.Remove(next)
  528. continue
  529. }
  530. }
  531. next++
  532. }
  533. accNonce, err := t.ethClient.EthNonceAt(ctx, t.account.Address, nil)
  534. if err != nil {
  535. return err
  536. }
  537. t.accNextNonce = accNonce
  538. return nil
  539. }
  540. func (t *TxManager) canForgeAt(blockNum int64) bool {
  541. return canForge(&t.consts.Auction, &t.vars.Auction,
  542. &t.stats.Sync.Auction.CurrentSlot, &t.stats.Sync.Auction.NextSlot,
  543. t.cfg.ForgerAddress, blockNum)
  544. }
  545. func (t *TxManager) mustL1L2Batch(blockNum int64) bool {
  546. lastL1BatchBlockNum := t.lastSentL1BatchBlockNum
  547. if t.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum {
  548. lastL1BatchBlockNum = t.stats.Sync.LastL1BatchBlock
  549. }
  550. return blockNum-lastL1BatchBlockNum >= t.vars.Rollup.ForgeL1L2BatchTimeout-1
  551. }