You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

603 lines
19 KiB

package coordinator
import (
"context"
"errors"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/db/l2db"
"github.com/hermeznetwork/hermez-node/eth"
"github.com/hermeznetwork/hermez-node/log"
"github.com/hermeznetwork/hermez-node/synchronizer"
"github.com/hermeznetwork/tracerr"
)
// TxManager handles everything related to ethereum transactions: It makes the
// call to forge, waits for transaction confirmation, and keeps checking them
// until a number of confirmed blocks have passed.
//
// The exported AddBatch, SetSyncStatsVars and DiscardPipeline methods only
// push values into channels; the remaining state below is read and written
// from Run and the unexported helpers it calls.
type TxManager struct {
cfg Config
ethClient eth.ClientInterface
l2DB *l2db.L2DB // Used only to mark forged txs as forged in the L2DB
coord *Coordinator // Used only to send messages to stop the pipeline
// batchCh carries the batches handed over through AddBatch
batchCh chan *BatchInfo
chainID *big.Int
// account is the ethereum account passed to the transactor in NewAuth
account accounts.Account
consts synchronizer.SCConsts
// stats and vars hold the last values received through statsVarsCh
stats synchronizer.Stats
vars synchronizer.SCVariables
// statsVarsCh carries the updates handed over through SetSyncStatsVars
statsVarsCh chan statsVars
discardPipelineCh chan int // int refers to the pipelineNum
// minPipelineNum is raised every time a pipeline is discarded or one of
// its transactions is rejected; pending batches whose PipelineNum is
// lower than this value are dropped from the queue.
minPipelineNum int
// queue holds the batches whose forge transaction has been sent and is
// still being tracked.
queue Queue
// lastSuccessBatch stores the last BatchNum whose forge call was confirmed
lastSuccessBatch common.BatchNum
// lastPendingBatch common.BatchNum
// accNonce is the account nonce in the last mined block (due to mined txs)
accNonce uint64
// accNextNonce is the nonce that we should use to send the next tx.
// In some cases this will be a reused nonce of an already pending tx.
accNextNonce uint64
// accPendingNonce is the pending nonce of the account due to pending txs
// accPendingNonce uint64
// lastSentL1BatchBlockNum is the block number (last synced block + 1)
// recorded when the last L1Batch was first sent; see mustL1L2Batch.
lastSentL1BatchBlockNum int64
}
// NewTxManager creates a new TxManager.
//
// It asks ethClient for the chain ID, the account address and the account
// nonce (EthNonceAt with a nil block number), and uses that nonce both as the
// mined nonce and as the next nonce to use.  cfg, scConsts and initSCVars are
// copied by value into the returned TxManager.  Any error returned by the
// ethereum client is returned wrapped with tracerr.
func NewTxManager(ctx context.Context, cfg *Config, ethClient eth.ClientInterface, l2DB *l2db.L2DB,
	coord *Coordinator, scConsts *synchronizer.SCConsts, initSCVars *synchronizer.SCVariables) (*TxManager, error) {
	chainID, err := ethClient.EthChainID()
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
	address, err := ethClient.EthAddress()
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
	accNonce, err := ethClient.EthNonceAt(ctx, *address, nil)
	if err != nil {
		// Wrapped like every other error path of this constructor so that
		// the stack trace is not lost.
		return nil, tracerr.Wrap(err)
	}
	// accPendingNonce, err := ethClient.EthPendingNonceAt(ctx, *address)
	// if err != nil {
	// 	return nil, err
	// }
	// if accNonce != accPendingNonce {
	// 	return nil, tracerr.Wrap(fmt.Errorf("currentNonce (%v) != accPendingNonce (%v)",
	// 		accNonce, accPendingNonce))
	// }
	log.Infow("TxManager started", "nonce", accNonce)
	return &TxManager{
		cfg:               *cfg,
		ethClient:         ethClient,
		l2DB:              l2DB,
		coord:             coord,
		batchCh:           make(chan *BatchInfo, queueLen),
		statsVarsCh:       make(chan statsVars, queueLen),
		discardPipelineCh: make(chan int, queueLen),
		account: accounts.Account{
			Address: *address,
		},
		chainID:        chainID,
		consts:         *scConsts,
		vars:           *initSCVars,
		minPipelineNum: 0,
		queue:          NewQueue(),
		accNonce:       accNonce,
		accNextNonce:   accNonce,
		// accPendingNonce: accPendingNonce,
	}, nil
}
// AddBatch hands a new batch over to the TxManager so that it is sent to the
// smart contract via the forge call.  It is safe to call from any goroutine:
// the batch is pushed into a channel, and the call gives up without queueing
// it if ctx is cancelled first.
func (t *TxManager) AddBatch(ctx context.Context, batchInfo *BatchInfo) {
	select {
	case <-ctx.Done():
		// Cancelled before the batch could be queued: nothing to do.
		return
	case t.batchCh <- batchInfo:
		return
	}
}
// SetSyncStatsVars passes the latest synchronizer Stats and smart contract
// variables to the TxManager.  It is safe to call from any goroutine: copies
// of *stats and *vars are pushed into a channel, and the call gives up if ctx
// is cancelled first.
func (t *TxManager) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) {
	update := statsVars{Stats: *stats, Vars: *vars}
	select {
	case <-ctx.Done():
	case t.statsVarsCh <- update:
	}
}
// DiscardPipeline notifies the TxManager that the pipeline identified by
// pipelineNum has been discarded due to a reorg.  It is safe to call from any
// goroutine: the number is pushed into a channel, and the call gives up if
// ctx is cancelled first.
func (t *TxManager) DiscardPipeline(ctx context.Context, pipelineNum int) {
	select {
	case <-ctx.Done():
		// Cancelled before the notification could be queued.
		return
	case t.discardPipelineCh <- pipelineNum:
		return
	}
}
// syncSCVars applies the received smart contract variables to the TxManager's
// own copy (t.vars) by delegating to updateSCVars.
func (t *TxManager) syncSCVars(vars synchronizer.SCVariablesPtr) {
updateSCVars(&t.vars, vars)
}
// NewAuth generates a new auth object for an ethereum transaction.
//
// The gas price is the one suggested by the ethereum client increased by
// 1/gasPriceDiv of its value.  The returned TransactOpts has a zero Value, a
// fixed GasLimit and a nil Nonce.  Errors from the ethereum client or from
// the transactor creation are returned wrapped with tracerr.
func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) {
	// The suggested price is raised by suggested/gasPriceDiv.
	const gasPriceDiv = 100

	gasPrice, err := t.ethClient.EthSuggestGasPrice(ctx)
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
	bump := new(big.Int).Div(gasPrice, new(big.Int).SetUint64(gasPriceDiv))
	gasPrice.Add(gasPrice, bump)
	// log.Debugw("TxManager: transaction metadata", "gasPrice", gasPrice)

	auth, err := bind.NewKeyStoreTransactorWithChainID(t.ethClient.EthKeyStore(), t.account, t.chainID)
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
	auth.Nonce = nil
	auth.Value = big.NewInt(0) // in wei
	auth.GasPrice = gasPrice
	// TODO: Calculate GasLimit based on the contents of the ForgeBatchArgs
	auth.GasLimit = 1000000
	return auth, nil
}
// shouldSendRollupForgeBatch reports, as an error, why batchInfo must not be
// sent right now; a nil result means the batch can be sent.
//
// The batch is rejected when the coordinator can't forge at the next block
// (last synced block + 1), or when an L1Batch is mandatory at that block and
// batchInfo is not one.  When cfg.SendBatchBlocksMarginCheck is non-zero the
// same two checks are repeated that many blocks further ahead.
func (t *TxManager) shouldSendRollupForgeBatch(batchInfo *BatchInfo) error {
	nextBlock := t.stats.Eth.LastBlock.Num + 1
	switch {
	case !t.canForgeAt(nextBlock):
		return tracerr.Wrap(fmt.Errorf("can't forge in the next block: %v", nextBlock))
	case t.mustL1L2Batch(nextBlock) && !batchInfo.L1Batch:
		return tracerr.Wrap(fmt.Errorf("can't forge non-L1Batch in the next block: %v", nextBlock))
	}

	margin := t.cfg.SendBatchBlocksMarginCheck
	if margin == 0 {
		// Margin check disabled
		return nil
	}
	marginBlock := nextBlock + margin
	switch {
	case !t.canForgeAt(marginBlock):
		return tracerr.Wrap(fmt.Errorf("can't forge after %v blocks: %v",
			margin, nextBlock))
	case t.mustL1L2Batch(marginBlock) && !batchInfo.L1Batch:
		return tracerr.Wrap(fmt.Errorf("can't forge non-L1Batch after %v blocks: %v",
			margin, nextBlock))
	}
	return nil
}
// addPerc returns a new big.Int holding v increased by p percent, that is
// v + v*p/100, where the division is big.Int.Div (Euclidean) division.  v is
// not modified.
func addPerc(v *big.Int, p int64) *big.Int {
	// Percentages are expressed over 100.
	const percentBase = 100

	delta := new(big.Int).Mul(v, big.NewInt(p))
	delta.Div(delta, big.NewInt(percentBase))
	return delta.Add(delta, v)
}
// sendRollupForgeBatch sends the forge transaction of batchInfo through the
// ethereum client and updates batchInfo and the TxManager accordingly.
//
// When resend is false the transaction uses t.accNextNonce; when resend is
// true it reuses the nonce of the transaction already stored in
// batchInfo.EthTx.  The send is retried up to cfg.EthClientAttempts times
// with cfg.EthClientAttemptsDelay between tries; nonce and gas price
// corrections do not consume an attempt.  The function fails if the gas price
// exceeds cfg.MaxGasPrice, if ctx is cancelled while waiting (common.ErrDone),
// if the attempts are exhausted, or if l2DB.DoneForging fails.
func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo, resend bool) error {
	auth, err := t.NewAuth(ctx)
	if err != nil {
		return tracerr.Wrap(err)
	}
	if resend {
		auth.Nonce = big.NewInt(int64(batchInfo.EthTx.Nonce()))
	} else {
		auth.Nonce = big.NewInt(int64(t.accNextNonce))
	}

	var ethTx *types.Transaction
sendLoop:
	for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
		if auth.GasPrice.Cmp(t.cfg.MaxGasPrice) > 0 {
			return tracerr.Wrap(fmt.Errorf("calculated gasPrice (%v) > maxGasPrice (%v)",
				auth.GasPrice, t.cfg.MaxGasPrice))
		}
		// RollupForgeBatch() calls ethclient.SendTransaction()
		ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs, auth)
		switch {
		case errors.Is(err, core.ErrNonceTooLow):
			log.Warnw("TxManager ethClient.RollupForgeBatch incrementing nonce",
				"err", err, "nonce", auth.Nonce, "batchNum", batchInfo.BatchNum)
			auth.Nonce.Add(auth.Nonce, big.NewInt(1))
			attempt-- // a nonce correction doesn't count as an attempt
		case errors.Is(err, core.ErrNonceTooHigh):
			log.Warnw("TxManager ethClient.RollupForgeBatch decrementing nonce",
				"err", err, "nonce", auth.Nonce, "batchNum", batchInfo.BatchNum)
			auth.Nonce.Sub(auth.Nonce, big.NewInt(1))
			attempt-- // a nonce correction doesn't count as an attempt
		case errors.Is(err, core.ErrUnderpriced),
			errors.Is(err, core.ErrReplaceUnderpriced):
			log.Warnw("TxManager ethClient.RollupForgeBatch incrementing gasPrice",
				"err", err, "gasPrice", auth.GasPrice, "batchNum", batchInfo.BatchNum)
			auth.GasPrice = addPerc(auth.GasPrice, 10)
			attempt-- // a gas price correction doesn't count as an attempt
		case err != nil:
			log.Errorw("TxManager ethClient.RollupForgeBatch",
				"attempt", attempt, "err", err, "block", t.stats.Eth.LastBlock.Num+1,
				"batchNum", batchInfo.BatchNum)
		default:
			// Transaction accepted by the ethereum client
			break sendLoop
		}
		select {
		case <-ctx.Done():
			return tracerr.Wrap(common.ErrDone)
		case <-time.After(t.cfg.EthClientAttemptsDelay):
		}
	}
	if err != nil {
		return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err))
	}

	// Only a first send consumes a new nonce; a resend reuses the old one.
	if !resend {
		t.accNextNonce = auth.Nonce.Uint64() + 1
	}
	batchInfo.EthTx = ethTx
	log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash())

	sendBlockNum := t.stats.Eth.LastBlock.Num + 1
	batchInfo.SendTimestamp = time.Now()
	if resend {
		batchInfo.Debug.ResendNum++
	}
	batchInfo.Debug.Status = StatusSent
	batchInfo.Debug.SendBlockNum = sendBlockNum
	batchInfo.Debug.SendTimestamp = batchInfo.SendTimestamp
	batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub(
		batchInfo.Debug.StartTimestamp).Seconds()
	t.cfg.debugBatchStore(batchInfo)
	// t.lastPendingBatch = batchInfo.BatchNum

	if !resend && batchInfo.L1Batch {
		t.lastSentL1BatchBlockNum = sendBlockNum
	}
	err = t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum)
	if err != nil {
		return tracerr.Wrap(err)
	}
	return nil
}
// checkEthTransactionReceipt takes the txHash from the BatchInfo and stores
// the corresponding receipt if found.
//
// The receipt is requested up to cfg.EthClientAttempts times, waiting
// cfg.EthClientAttemptsDelay between tries.  An ethereum.NotFound answer is
// not an error: it ends the loop and batchInfo.Receipt is set to whatever
// receipt value the client returned with it (expected to be nil).  If ctx is
// cancelled the function returns common.ErrDone right away instead of burning
// through the remaining attempts.  Any other error that persists after the
// last attempt is returned wrapped, and batchInfo.Receipt is left untouched.
func (t *TxManager) checkEthTransactionReceipt(ctx context.Context, batchInfo *BatchInfo) error {
	txHash := batchInfo.EthTx.Hash()
	var receipt *types.Receipt
	var err error
	for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
		receipt, err = t.ethClient.EthTransactionReceipt(ctx, txHash)
		if ctx.Err() != nil {
			// Retrying with a cancelled context can only fail again.
			return tracerr.Wrap(common.ErrDone)
		}
		if tracerr.Unwrap(err) == ethereum.NotFound {
			// The transaction has no receipt yet
			err = nil
			break
		}
		if err == nil {
			break
		}
		log.Errorw("TxManager ethClient.EthTransactionReceipt",
			"attempt", attempt, "err", err)
		select {
		case <-ctx.Done():
			return tracerr.Wrap(common.ErrDone)
		case <-time.After(t.cfg.EthClientAttemptsDelay):
		}
	}
	if err != nil {
		return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.EthTransactionReceipt: %w", err))
	}
	batchInfo.Receipt = receipt
	t.cfg.debugBatchStore(batchInfo)
	return nil
}
// handleReceipt processes the receipt stored in batchInfo (if any) and
// updates the TxManager state accordingly.
//
// Returns:
//   - (nil, nil) when there is no receipt yet, or when the receipt status is
//     neither failed nor successful.
//   - (nil, err) when the receipt status is failed.  The transaction is
//     replayed with EthCall at the receipt block to try to obtain the reason,
//     which is included in the error when available.
//   - (&confirm, nil) when the receipt status is successful, where confirm is
//     the last synced block number minus the receipt block number.
//
// Whenever a receipt exists t.accNonce is raised to the tx nonce + 1 if it
// was lower.
func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*int64, error) {
	receipt := batchInfo.Receipt
	if receipt == nil {
		return nil, nil
	}
	if minedNonce := batchInfo.EthTx.Nonce() + 1; minedNonce > t.accNonce {
		t.accNonce = minedNonce
	}

	switch receipt.Status {
	case types.ReceiptStatusFailed:
		batchInfo.Debug.Status = StatusFailed
		t.cfg.debugBatchStore(batchInfo)
		_, err := t.ethClient.EthCall(ctx, batchInfo.EthTx, receipt.BlockNumber)
		log.Warnw("TxManager receipt status is failed", "tx", receipt.TxHash,
			"batch", batchInfo.BatchNum, "block", receipt.BlockNumber.Int64(),
			"err", err)
		if batchInfo.BatchNum <= t.lastSuccessBatch {
			t.lastSuccessBatch = batchInfo.BatchNum - 1
		}
		if err == nil {
			// The replay gave no reason.  Formatting a nil error with %w
			// would render as "%!w(<nil>)", so build the error without it.
			return nil, tracerr.Wrap(errors.New(
				"ethereum transaction receipt status is failed"))
		}
		return nil, tracerr.Wrap(fmt.Errorf(
			"ethereum transaction receipt status is failed: %w", err))
	case types.ReceiptStatusSuccessful:
		batchInfo.Debug.Status = StatusMined
		batchInfo.Debug.MineBlockNum = receipt.BlockNumber.Int64()
		batchInfo.Debug.StartToMineBlocksDelay = batchInfo.Debug.MineBlockNum -
			batchInfo.Debug.StartBlockNum
		// The delays are computed only while StartToMineDelay is still zero
		if batchInfo.Debug.StartToMineDelay == 0 {
			if block, err := t.ethClient.EthBlockByNumber(ctx,
				receipt.BlockNumber.Int64()); err != nil {
				log.Warnw("TxManager: ethClient.EthBlockByNumber", "err", err)
			} else {
				batchInfo.Debug.SendToMineDelay = block.Timestamp.Sub(
					batchInfo.Debug.SendTimestamp).Seconds()
				batchInfo.Debug.StartToMineDelay = block.Timestamp.Sub(
					batchInfo.Debug.StartTimestamp).Seconds()
			}
		}
		t.cfg.debugBatchStore(batchInfo)
		if batchInfo.BatchNum > t.lastSuccessBatch {
			t.lastSuccessBatch = batchInfo.BatchNum
		}
		confirm := t.stats.Eth.LastBlock.Num - receipt.BlockNumber.Int64()
		return &confirm, nil
	}
	return nil, nil
}
// TODO:
// - After sending a message: CancelPipeline, stop all consecutive pending Batches (transactions)
// Queue of BatchInfos.  Besides the list of elements it keeps a cursor (next)
// used by Next to walk the list in a round-robin fashion.
type Queue struct {
	list []*BatchInfo
	// nonceByBatchNum map[common.BatchNum]uint64
	// next is the position that the following call to Next will return; it
	// is always a valid index of list, or 0 when list is empty.
	next int
}

// NewQueue returns a new, empty queue
func NewQueue() Queue {
	return Queue{
		list: make([]*BatchInfo, 0),
		// nonceByBatchNum: make(map[common.BatchNum]uint64),
		next: 0,
	}
}

// Len is the length of the queue
func (q *Queue) Len() int {
	return len(q.list)
}

// At returns the BatchInfo at position (or nil if position is out of bounds,
// including negative positions)
func (q *Queue) At(position int) *BatchInfo {
	if position < 0 || position >= len(q.list) {
		return nil
	}
	return q.list[position]
}

// Next returns the position and the BatchInfo under the cursor and advances
// the cursor, wrapping around at the end of the queue.  It returns (0, nil)
// if the queue is empty.
func (q *Queue) Next() (int, *BatchInfo) {
	if len(q.list) == 0 {
		return 0, nil
	}
	position := q.next
	q.next = (position + 1) % len(q.list)
	return position, q.list[position]
}

// Remove removes the BatchInfo at position and leaves the cursor pointing to
// the element that followed it (wrapping around to the first element).  An
// out of bounds position is ignored.
func (q *Queue) Remove(position int) {
	if position < 0 || position >= len(q.list) {
		return
	}
	// batchInfo := q.list[position]
	// delete(q.nonceByBatchNum, batchInfo.BatchNum)
	last := len(q.list) - 1
	copy(q.list[position:], q.list[position+1:])
	// Clear the vacated slot so that the backing array doesn't keep the
	// removed BatchInfo alive.
	q.list[last] = nil
	q.list = q.list[:last]
	if len(q.list) == 0 {
		q.next = 0
	} else {
		q.next = position % len(q.list)
	}
}

// Push adds a new BatchInfo at the end of the queue
func (q *Queue) Push(batchInfo *BatchInfo) {
	q.list = append(q.list, batchInfo)
	// q.nonceByBatchNum[batchInfo.BatchNum] = batchInfo.EthTx.Nonce()
}
// func (q *Queue) NonceByBatchNum(batchNum common.BatchNum) (uint64, bool) {
// nonce, ok := q.nonceByBatchNum[batchNum]
// return nonce, ok
// }
// Run the TxManager.  It blocks until ctx is cancelled.
//
// Run first waits for an initial statsVars (see SetSyncStatsVars) and then
// loops handling, one at a time:
//   - new statsVars: stored as the current stats and SC variables.
//   - discarded pipelines: minPipelineNum is raised and the queue is cleaned
//     with removeBadBatchInfos.
//   - new batches: after passing shouldSendRollupForgeBatch they are sent
//     with sendRollupForgeBatch and pushed into the queue.
//   - a timer: the next queued batch has its receipt checked; rejected
//     transactions stop the pipeline, pending transactions older than
//     cfg.EthTxResendTimeout are resent (unless cfg.EthNoReuseNonce), and
//     transactions with at least cfg.ConfirmBlocks confirmations are removed
//     from the queue.
//
// The timer uses cfg.TxManagerCheckInterval once a batch has been sent and
// falls back to longWaitDuration when the queue is found empty.
func (t *TxManager) Run(ctx context.Context) {
	waitDuration := longWaitDuration

	var initial statsVars
	select {
	case initial = <-t.statsVarsCh:
	case <-ctx.Done():
		// Cancelled before any stats arrived: there is nothing to
		// initialize from, so stop here instead of entering the loop with
		// zero-value stats.
		log.Info("TxManager done")
		return
	}
	t.stats = initial.Stats
	t.syncSCVars(initial.Vars)
	log.Infow("TxManager: received initial statsVars",
		"block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatchNum)

	for {
		select {
		case <-ctx.Done():
			log.Info("TxManager done")
			return
		case update := <-t.statsVarsCh:
			t.stats = update.Stats
			t.syncSCVars(update.Vars)
		case pipelineNum := <-t.discardPipelineCh:
			t.minPipelineNum = pipelineNum + 1
			if err := t.removeBadBatchInfos(ctx); ctx.Err() != nil {
				continue
			} else if err != nil {
				log.Errorw("TxManager: removeBadBatchInfos", "err", err)
				continue
			}
		case batchInfo := <-t.batchCh:
			if batchInfo.PipelineNum < t.minPipelineNum {
				log.Warnw("TxManager: batchInfo received pipelineNum < minPipelineNum",
					"num", batchInfo.PipelineNum, "minNum", t.minPipelineNum)
			}
			if err := t.shouldSendRollupForgeBatch(batchInfo); err != nil {
				log.Warnw("TxManager: shouldSend", "err", err,
					"batch", batchInfo.BatchNum)
				t.coord.SendMsg(ctx, MsgStopPipeline{
					Reason: fmt.Sprintf("forgeBatch shouldSend: %v", err)})
				continue
			}
			if err := t.sendRollupForgeBatch(ctx, batchInfo, false); ctx.Err() != nil {
				continue
			} else if err != nil {
				// If we reach here it's because our ethNode has
				// been unable to send the transaction to
				// ethereum.  This could be due to the ethNode
				// failure, or an invalid transaction (that
				// can't be mined)
				log.Warnw("TxManager: forgeBatch send failed", "err", err,
					"batch", batchInfo.BatchNum)
				t.coord.SendMsg(ctx, MsgStopPipeline{
					Reason: fmt.Sprintf("forgeBatch send: %v", err)})
				continue
			}
			t.queue.Push(batchInfo)
			waitDuration = t.cfg.TxManagerCheckInterval
		case <-time.After(waitDuration):
			queuePosition, batchInfo := t.queue.Next()
			if batchInfo == nil {
				waitDuration = longWaitDuration
				continue
			}
			if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
				continue
			} else if err != nil { //nolint:staticcheck
				// Our ethNode is giving an error different
				// than "not found" when getting the receipt
				// for the transaction, so we can't figure out
				// if it was not mined, mined and successful or
				// mined and failed.  This could be due to the
				// ethNode failure.
				t.coord.SendMsg(ctx, MsgStopPipeline{
					Reason: fmt.Sprintf("forgeBatch receipt: %v", err)})
			}
			// NOTE: after a receipt error the flow continues with the
			// receipt previously stored in batchInfo (if any).

			confirm, err := t.handleReceipt(ctx, batchInfo)
			if ctx.Err() != nil {
				continue
			} else if err != nil { //nolint:staticcheck
				// Transaction was rejected
				if err := t.removeBadBatchInfos(ctx); ctx.Err() != nil {
					continue
				} else if err != nil {
					log.Errorw("TxManager: removeBadBatchInfos", "err", err)
					continue
				}
				// err here is the handleReceipt error again
				t.coord.SendMsg(ctx, MsgStopPipeline{
					Reason: fmt.Sprintf("forgeBatch reject: %v", err)})
				continue
			}
			now := time.Now()
			if !t.cfg.EthNoReuseNonce && confirm == nil &&
				now.Sub(batchInfo.SendTimestamp) > t.cfg.EthTxResendTimeout {
				log.Infow("TxManager: forgeBatch tx not been mined timeout, resending",
					"tx", batchInfo.EthTx.Hash(), "batch", batchInfo.BatchNum)
				if err := t.sendRollupForgeBatch(ctx, batchInfo, true); ctx.Err() != nil {
					continue
				} else if err != nil {
					// If we reach here it's because our ethNode has
					// been unable to send the transaction to
					// ethereum.  This could be due to the ethNode
					// failure, or an invalid transaction (that
					// can't be mined)
					log.Warnw("TxManager: forgeBatch resend failed", "err", err,
						"batch", batchInfo.BatchNum)
					t.coord.SendMsg(ctx, MsgStopPipeline{
						Reason: fmt.Sprintf("forgeBatch resend: %v", err)})
					continue
				}
			}

			if confirm != nil && *confirm >= t.cfg.ConfirmBlocks {
				log.Debugw("TxManager: forgeBatch tx confirmed",
					"tx", batchInfo.EthTx.Hash(), "batch", batchInfo.BatchNum)
				t.queue.Remove(queuePosition)
			}
		}
	}
}
// removeBadBatchInfos walks the whole queue and removes the batches whose
// transaction was rejected, as well as the still pending batches that belong
// to a discarded pipeline (PipelineNum < minPipelineNum).  A rejected
// transaction also raises minPipelineNum above the pipeline of that batch.
// Batches whose receipt can't be retrieved are left in the queue.
//
// After the walk the account nonce is queried again and, unless
// cfg.EthNoReuseNonce is set, used as the next nonce to send.  The function
// returns nil if ctx is cancelled in the middle of the walk, and the wrapped
// error if the nonce query fails.
func (t *TxManager) removeBadBatchInfos(ctx context.Context) error {
	next := 0
	// batchNum := 0
	for {
		batchInfo := t.queue.At(next)
		if batchInfo == nil {
			break
		}
		if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
			return nil
		} else if err != nil {
			// Our ethNode is giving an error different
			// than "not found" when getting the receipt
			// for the transaction, so we can't figure out
			// if it was not mined, mined and successful or
			// mined and failed.  This could be due to the
			// ethNode failure.
			next++
			continue
		}
		confirm, err := t.handleReceipt(ctx, batchInfo)
		if ctx.Err() != nil {
			return nil
		} else if err != nil {
			// Transaction was rejected
			if t.minPipelineNum <= batchInfo.PipelineNum {
				t.minPipelineNum = batchInfo.PipelineNum + 1
			}
			// Don't advance: Remove shifts the following element
			// into this position.
			t.queue.Remove(next)
			continue
		}
		// If tx is pending but is from a cancelled pipeline, remove it
		// from the queue
		if confirm == nil {
			if batchInfo.PipelineNum < t.minPipelineNum {
				// batchNum++
				t.queue.Remove(next)
				continue
			}
		}
		next++
	}
	accNonce, err := t.ethClient.EthNonceAt(ctx, t.account.Address, nil)
	if err != nil {
		// Wrapped like the rest of the errors returned in this file so
		// that the stack trace is not lost.
		return tracerr.Wrap(err)
	}
	if !t.cfg.EthNoReuseNonce {
		t.accNextNonce = accNonce
	}
	return nil
}
// canForgeAt reports whether the configured forger address can forge at
// blockNum, by calling canForge with the TxManager's auction constants and
// variables and the current and next slot taken from the last synced stats.
func (t *TxManager) canForgeAt(blockNum int64) bool {
	currentSlot := &t.stats.Sync.Auction.CurrentSlot
	nextSlot := &t.stats.Sync.Auction.NextSlot
	return canForge(&t.consts.Auction, &t.vars.Auction,
		currentSlot, nextSlot,
		t.cfg.ForgerAddress, blockNum)
}
// mustL1L2Batch reports whether the batch forged at blockNum has to be an
// L1Batch: that is the case when the distance between blockNum and the most
// recent L1Batch block reaches ForgeL1L2BatchTimeout-1.  The most recent
// L1Batch block is the greater of the last one synchronized and the last one
// sent by this TxManager.
func (t *TxManager) mustL1L2Batch(blockNum int64) bool {
	lastL1BatchBlock := t.stats.Sync.LastL1BatchBlock
	if t.lastSentL1BatchBlockNum >= lastL1BatchBlock {
		lastL1BatchBlock = t.lastSentL1BatchBlockNum
	}
	elapsed := blockNum - lastL1BatchBlock
	return elapsed >= t.vars.Rollup.ForgeL1L2BatchTimeout-1
}