Make TxManager more robust

This commit is contained in:
Eduard S
2021-01-28 11:25:06 +01:00
parent deede9541b
commit d284baf8c4
16 changed files with 438 additions and 114 deletions

View File

@@ -47,6 +47,8 @@ type Debug struct {
MineBlockNum int64
// SendBlockNum is the blockNum when the batch was sent to ethereum
SendBlockNum int64
// ResendNum is the number of times the tx has been resent
ResendNum int
// LastScheduledL1BatchBlockNum is the blockNum when the last L1Batch
// was scheduled
LastScheduledL1BatchBlockNum int64
@@ -64,13 +66,17 @@ type Debug struct {
// StartToSendDelay is the delay between starting a batch and sending
// it to ethereum, in seconds
StartToSendDelay float64
// StartToMineDelay is the delay between starting a batch and having
// StartToMineDelay is the delay between starting a batch and having
// it mined in seconds
StartToMineDelay float64
// SendToMineDelay is the delay between sending a batch tx and having
// it mined in seconds
SendToMineDelay float64
}
// BatchInfo contans the Batch information
type BatchInfo struct {
PipelineNum int
BatchNum common.BatchNum
ServerProof prover.Client
ZKInputs *common.ZKInputs
@@ -89,7 +95,11 @@ type BatchInfo struct {
// SendTimestamp the time of batch sent to ethereum
SendTimestamp time.Time
Receipt *types.Receipt
Debug Debug
// Fail is true if:
// - The receipt status is failed
// - A previous parent batch is failed
Fail bool
Debug Debug
}
// DebugStore is a debug function to store the BatchInfo as a json text file in

View File

@@ -3,6 +3,7 @@ package coordinator
import (
"context"
"fmt"
"math/big"
"os"
"strings"
"sync"
@@ -81,6 +82,9 @@ type Config struct {
// transaction will be resent (reusing the nonce) with a newly
// calculated gas price
EthTxResendTimeout time.Duration
// MaxGasPrice is the maximum gas price allowed for ethereum
// transactions
MaxGasPrice *big.Int
// TxManagerCheckInterval is the waiting interval between receipt
// checks of ethereum transactions in the TxManager
TxManagerCheckInterval time.Duration
@@ -103,15 +107,22 @@ func (c *Config) debugBatchStore(batchInfo *BatchInfo) {
}
}
type fromBatch struct {
BatchNum common.BatchNum
ForgerAddr ethCommon.Address
StateRoot *big.Int
}
// Coordinator implements the Coordinator type
type Coordinator struct {
// State
pipelineBatchNum common.BatchNum // batchNum from which we started the pipeline
provers []prover.Client
consts synchronizer.SCConsts
vars synchronizer.SCVariables
stats synchronizer.Stats
started bool
pipelineNum int // Pipeline sequential number. The first pipeline is 1
pipelineFromBatch fromBatch // batch from which we started the pipeline
provers []prover.Client
consts synchronizer.SCConsts
vars synchronizer.SCVariables
stats synchronizer.Stats
started bool
cfg Config
@@ -168,10 +179,15 @@ func NewCoordinator(cfg Config,
ctx, cancel := context.WithCancel(context.Background())
c := Coordinator{
pipelineBatchNum: -1,
provers: serverProofs,
consts: *scConsts,
vars: *initSCVars,
pipelineNum: 0,
pipelineFromBatch: fromBatch{
BatchNum: 0,
ForgerAddr: ethCommon.Address{},
StateRoot: big.NewInt(0),
},
provers: serverProofs,
consts: *scConsts,
vars: *initSCVars,
cfg: cfg,
@@ -212,7 +228,8 @@ func (c *Coordinator) BatchBuilder() *batchbuilder.BatchBuilder {
}
func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) {
return NewPipeline(ctx, c.cfg, c.historyDB, c.l2DB, c.txSelector,
c.pipelineNum++
return NewPipeline(ctx, c.cfg, c.pipelineNum, c.historyDB, c.l2DB, c.txSelector,
c.batchBuilder, c.purger, c.txManager, c.provers, &c.consts)
}
@@ -262,13 +279,18 @@ func (c *Coordinator) syncSCVars(vars synchronizer.SCVariablesPtr) {
func canForge(auctionConstants *common.AuctionConstants, auctionVars *common.AuctionVariables,
currentSlot *common.Slot, nextSlot *common.Slot, addr ethCommon.Address, blockNum int64) bool {
if blockNum < auctionConstants.GenesisBlockNum {
log.Infow("canForge: requested blockNum is < genesis", "blockNum", blockNum,
"genesis", auctionConstants.GenesisBlockNum)
return false
}
var slot *common.Slot
if currentSlot.StartBlock <= blockNum && blockNum <= currentSlot.EndBlock {
slot = currentSlot
} else if nextSlot.StartBlock <= blockNum && blockNum <= nextSlot.EndBlock {
slot = nextSlot
} else {
log.Warnw("Coordinator: requested blockNum for canForge is outside slot",
log.Warnw("canForge: requested blockNum is outside current and next slot",
"blockNum", blockNum, "currentSlot", currentSlot,
"nextSlot", nextSlot,
)
@@ -277,13 +299,14 @@ func canForge(auctionConstants *common.AuctionConstants, auctionVars *common.Auc
anyoneForge := false
if !slot.ForgerCommitment &&
auctionConstants.RelativeBlock(blockNum) >= int64(auctionVars.SlotDeadline) {
log.Debugw("Coordinator: anyone can forge in the current slot (slotDeadline passed)",
log.Debugw("canForge: anyone can forge in the current slot (slotDeadline passed)",
"block", blockNum)
anyoneForge = true
}
if slot.Forger == addr || anyoneForge {
return true
}
log.Debugw("canForge: can't forge", "slot.Forger", slot.Forger)
return false
}
@@ -314,8 +337,8 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
relativeBlock, c.cfg.StartSlotBlocksDelay)
} else if canForge {
log.Infow("Coordinator: forging state begin", "block",
stats.Eth.LastBlock.Num+1, "batch", stats.Sync.LastBatch)
batchNum := common.BatchNum(stats.Sync.LastBatch)
stats.Eth.LastBlock.Num+1, "batch", stats.Sync.LastBatch.BatchNum)
batchNum := stats.Sync.LastBatch.BatchNum
var err error
if c.pipeline, err = c.newPipeline(ctx); err != nil {
return tracerr.Wrap(err)
@@ -324,7 +347,7 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
c.pipeline = nil
return tracerr.Wrap(err)
}
c.pipelineBatchNum = batchNum
// c.pipelineBatchNum = batchNum
}
} else {
if !canForge {
@@ -341,17 +364,18 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
// return err
// }
// }
if c.purger.CanInvalidate(stats.Sync.LastBlock.Num, stats.Sync.LastBatch) {
if err := c.txSelector.Reset(common.BatchNum(stats.Sync.LastBatch)); err != nil {
if c.purger.CanInvalidate(stats.Sync.LastBlock.Num, int64(stats.Sync.LastBatch.BatchNum)) {
if err := c.txSelector.Reset(stats.Sync.LastBatch.BatchNum); err != nil {
return tracerr.Wrap(err)
}
}
_, err := c.purger.InvalidateMaybe(c.l2DB, c.txSelector.LocalAccountsDB(),
stats.Sync.LastBlock.Num, stats.Sync.LastBatch)
stats.Sync.LastBlock.Num, int64(stats.Sync.LastBatch.BatchNum))
if err != nil {
return tracerr.Wrap(err)
}
_, err = c.purger.PurgeMaybe(c.l2DB, stats.Sync.LastBlock.Num, stats.Sync.LastBatch)
_, err = c.purger.PurgeMaybe(c.l2DB, stats.Sync.LastBlock.Num,
int64(stats.Sync.LastBatch.BatchNum))
if err != nil {
return tracerr.Wrap(err)
}
@@ -379,15 +403,19 @@ func (c *Coordinator) handleReorg(ctx context.Context, msg *MsgSyncReorg) error
if c.pipeline != nil {
c.pipeline.SetSyncStatsVars(ctx, &msg.Stats, &msg.Vars)
}
if common.BatchNum(c.stats.Sync.LastBatch) < c.pipelineBatchNum {
// There's been a reorg and the batch from which the pipeline
// was started was in a block that was discarded. The batch
// may not be in the main chain, so we stop the pipeline as a
// precaution (it will be started again once the node is in
// sync).
log.Infow("Coordinator.handleReorg StopPipeline sync.LastBatch < c.pipelineBatchNum",
"sync.LastBatch", c.stats.Sync.LastBatch,
"c.pipelineBatchNum", c.pipelineBatchNum)
if c.stats.Sync.LastBatch.ForgerAddr != c.cfg.ForgerAddress &&
c.stats.Sync.LastBatch.StateRoot.Cmp(c.pipelineFromBatch.StateRoot) != 0 {
// There's been a reorg and the batch state root from which the
// pipeline was started has changed (probably because it was in
// a block that was discarded), and it was sent by a different
// coordinator than us. That batch may never be in the main
// chain, so we stop the pipeline (it will be started again
// once the node is in sync).
log.Infow("Coordinator.handleReorg StopPipeline sync.LastBatch.ForgerAddr != cfg.ForgerAddr "+
"& sync.LastBatch.StateRoot != pipelineFromBatch.StateRoot",
"sync.LastBatch.StateRoot", c.stats.Sync.LastBatch.StateRoot,
"pipelineFromBatch.StateRoot", c.pipelineFromBatch.StateRoot)
c.txManager.DiscardPipeline(ctx, c.pipelineNum)
if err := c.handleStopPipeline(ctx, "reorg"); err != nil {
return tracerr.Wrap(err)
}
@@ -396,7 +424,7 @@ func (c *Coordinator) handleReorg(ctx context.Context, msg *MsgSyncReorg) error
}
func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string) error {
if err := c.l2DB.Reorg(common.BatchNum(c.stats.Sync.LastBatch)); err != nil {
if err := c.l2DB.Reorg(c.stats.Sync.LastBatch.BatchNum); err != nil {
return tracerr.Wrap(err)
}
if c.pipeline != nil {

View File

@@ -2,6 +2,7 @@ package coordinator
import (
"context"
"errors"
"fmt"
"io/ioutil"
"math/big"
@@ -11,6 +12,7 @@ import (
"time"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/hermeznetwork/hermez-node/batchbuilder"
"github.com/hermeznetwork/hermez-node/common"
dbUtils "github.com/hermeznetwork/hermez-node/db"
@@ -261,8 +263,8 @@ func TestCoordinatorFlow(t *testing.T) {
var stats synchronizer.Stats
stats.Eth.LastBlock = *ethClient.CtlLastBlock()
stats.Sync.LastBlock = stats.Eth.LastBlock
stats.Eth.LastBatch = ethClient.CtlLastForgedBatch()
stats.Sync.LastBatch = stats.Eth.LastBatch
stats.Eth.LastBatchNum = ethClient.CtlLastForgedBatch()
stats.Sync.LastBatch.BatchNum = common.BatchNum(stats.Eth.LastBatchNum)
canForge, err := ethClient.AuctionCanForge(forger, blockNum+1)
require.NoError(t, err)
var slot common.Slot
@@ -279,7 +281,7 @@ func TestCoordinatorFlow(t *testing.T) {
// Copy stateDB to synchronizer if there was a new batch
source := fmt.Sprintf("%v/BatchNum%v", batchBuilderDBPath, stats.Sync.LastBatch)
dest := fmt.Sprintf("%v/BatchNum%v", syncDBPath, stats.Sync.LastBatch)
if stats.Sync.LastBatch != 0 {
if stats.Sync.LastBatch.BatchNum != 0 {
if _, err := os.Stat(dest); os.IsNotExist(err) {
log.Infow("Making pebble checkpoint for sync",
"source", source, "dest", dest)
@@ -566,3 +568,8 @@ func TestCoordinatorStress(t *testing.T) {
// TODO: Test forgeBatch
// TODO: Test waitServerProof
// TODO: Test handleReorg
func TestFoo(t *testing.T) {
a := tracerr.Wrap(fmt.Errorf("AAA: %w", core.ErrNonceTooLow))
fmt.Println(errors.Is(a, core.ErrNonceTooLow))
}

View File

@@ -26,6 +26,7 @@ type statsVars struct {
// Pipeline manages the forging of batches with parallel server proofs
type Pipeline struct {
num int
cfg Config
consts synchronizer.SCConsts
@@ -56,6 +57,7 @@ type Pipeline struct {
// NewPipeline creates a new Pipeline
func NewPipeline(ctx context.Context,
cfg Config,
num int, // Pipeline sequential number
historyDB *historydb.HistoryDB,
l2DB *l2db.L2DB,
txSelector *txselector.TxSelector,
@@ -79,6 +81,7 @@ func NewPipeline(ctx context.Context,
return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool"))
}
return &Pipeline{
num: num,
cfg: cfg,
historyDB: historyDB,
l2DB: l2DB,
@@ -276,8 +279,8 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
if err != nil {
return nil, tracerr.Wrap(err)
}
batchInfo = &BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch
// Structure to accumulate data and metadata of the batch
batchInfo = &BatchInfo{PipelineNum: p.num, BatchNum: batchNum}
batchInfo.Debug.StartTimestamp = time.Now()
batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1

View File

@@ -172,7 +172,7 @@ func TestPipelineForgeBatchWithTxs(t *testing.T) {
// users with positive balances
tilCtx := preloadSync(t, ethClient, sync, modules.historyDB, modules.stateDB)
syncStats := sync.Stats()
batchNum := common.BatchNum(syncStats.Sync.LastBatch)
batchNum := syncStats.Sync.LastBatch.BatchNum
syncSCVars := sync.SCVars()
pipeline, err := coord.newPipeline(ctx)

View File

@@ -2,6 +2,7 @@ package coordinator
import (
"context"
"errors"
"fmt"
"math/big"
"time"
@@ -9,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/db/l2db"
@@ -35,12 +37,20 @@ type TxManager struct {
vars synchronizer.SCVariables
statsVarsCh chan statsVars
queue []*BatchInfo
discardPipelineCh chan int // int refers to the pipelineNum
minPipelineNum int
queue Queue
// lastSuccessBatch stores the last BatchNum that who's forge call was confirmed
lastSuccessBatch common.BatchNum
lastPendingBatch common.BatchNum
lastSuccessNonce uint64
lastPendingNonce uint64
// lastPendingBatch common.BatchNum
// accNonce is the account nonce in the last mined block (due to mined txs)
accNonce uint64
// accNextNonce is the nonce that we should use to send the next tx.
// In some cases this will be a reused nonce of an already pending tx.
accNextNonce uint64
// accPendingNonce is the pending nonce of the account due to pending txs
// accPendingNonce uint64
lastSentL1BatchBlockNum int64
}
@@ -56,26 +66,27 @@ func NewTxManager(ctx context.Context, cfg *Config, ethClient eth.ClientInterfac
if err != nil {
return nil, tracerr.Wrap(err)
}
lastSuccessNonce, err := ethClient.EthNonceAt(ctx, *address, nil)
accNonce, err := ethClient.EthNonceAt(ctx, *address, nil)
if err != nil {
return nil, err
}
lastPendingNonce, err := ethClient.EthPendingNonceAt(ctx, *address)
if err != nil {
return nil, err
}
if lastSuccessNonce != lastPendingNonce {
return nil, tracerr.Wrap(fmt.Errorf("lastSuccessNonce (%v) != lastPendingNonce (%v)",
lastSuccessNonce, lastPendingNonce))
}
log.Infow("TxManager started", "nonce", lastSuccessNonce)
// accPendingNonce, err := ethClient.EthPendingNonceAt(ctx, *address)
// if err != nil {
// return nil, err
// }
// if accNonce != accPendingNonce {
// return nil, tracerr.Wrap(fmt.Errorf("currentNonce (%v) != accPendingNonce (%v)",
// accNonce, accPendingNonce))
// }
log.Infow("TxManager started", "nonce", accNonce)
return &TxManager{
cfg: *cfg,
ethClient: ethClient,
l2DB: l2DB,
coord: coord,
batchCh: make(chan *BatchInfo, queueLen),
statsVarsCh: make(chan statsVars, queueLen),
cfg: *cfg,
ethClient: ethClient,
l2DB: l2DB,
coord: coord,
batchCh: make(chan *BatchInfo, queueLen),
statsVarsCh: make(chan statsVars, queueLen),
discardPipelineCh: make(chan int, queueLen),
account: accounts.Account{
Address: *address,
},
@@ -84,8 +95,11 @@ func NewTxManager(ctx context.Context, cfg *Config, ethClient eth.ClientInterfac
vars: *initSCVars,
lastSuccessNonce: lastSuccessNonce,
lastPendingNonce: lastPendingNonce,
minPipelineNum: 0,
queue: NewQueue(),
accNonce: accNonce,
accNextNonce: accNonce,
// accPendingNonce: accPendingNonce,
}, nil
}
@@ -106,6 +120,15 @@ func (t *TxManager) SetSyncStatsVars(ctx context.Context, stats *synchronizer.St
}
}
// DiscardPipeline is a thread safe method to notify about a discarded pipeline
// due to a reorg
func (t *TxManager) DiscardPipeline(ctx context.Context, pipelineNum int) {
select {
case t.discardPipelineCh <- pipelineNum:
case <-ctx.Done():
}
}
func (t *TxManager) syncSCVars(vars synchronizer.SCVariablesPtr) {
updateSCVars(&t.vars, vars)
}
@@ -157,18 +180,52 @@ func (t *TxManager) shouldSendRollupForgeBatch(batchInfo *BatchInfo) error {
return nil
}
func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) error {
func addPerc(v *big.Int, p int64) *big.Int {
r := new(big.Int).Set(v)
r.Mul(r, big.NewInt(p))
r.Div(r, big.NewInt(100))
return r.Add(v, r)
}
func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo, resend bool) error {
var ethTx *types.Transaction
var err error
auth, err := t.NewAuth(ctx)
if err != nil {
return tracerr.Wrap(err)
}
auth.Nonce = big.NewInt(int64(t.lastPendingNonce))
t.lastPendingNonce++
auth.Nonce = big.NewInt(int64(t.accNextNonce))
if resend {
auth.Nonce = big.NewInt(int64(batchInfo.EthTx.Nonce()))
}
for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
if auth.GasPrice.Cmp(t.cfg.MaxGasPrice) > 0 {
return tracerr.Wrap(fmt.Errorf("calculated gasPrice (%v) > maxGasPrice (%v)",
auth.GasPrice, t.cfg.MaxGasPrice))
}
// RollupForgeBatch() calls ethclient.SendTransaction()
ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs, auth)
if err != nil {
if errors.Is(err, core.ErrNonceTooLow) {
log.Warnw("TxManager ethClient.RollupForgeBatch incrementing nonce",
"err", err, "nonce", auth.Nonce, "batchNum", batchInfo.BatchNum)
auth.Nonce.Add(auth.Nonce, big.NewInt(1))
attempt--
} else if errors.Is(err, core.ErrNonceTooHigh) {
log.Warnw("TxManager ethClient.RollupForgeBatch decrementing nonce",
"err", err, "nonce", auth.Nonce, "batchNum", batchInfo.BatchNum)
auth.Nonce.Sub(auth.Nonce, big.NewInt(1))
attempt--
} else if errors.Is(err, core.ErrUnderpriced) {
log.Warnw("TxManager ethClient.RollupForgeBatch incrementing gasPrice",
"err", err, "gasPrice", auth.GasPrice, "batchNum", batchInfo.BatchNum)
auth.GasPrice = addPerc(auth.GasPrice, 10)
attempt--
} else if errors.Is(err, core.ErrReplaceUnderpriced) {
log.Warnw("TxManager ethClient.RollupForgeBatch incrementing gasPrice",
"err", err, "gasPrice", auth.GasPrice, "batchNum", batchInfo.BatchNum)
auth.GasPrice = addPerc(auth.GasPrice, 10)
attempt--
} else if err != nil {
log.Errorw("TxManager ethClient.RollupForgeBatch",
"attempt", attempt, "err", err, "block", t.stats.Eth.LastBlock.Num+1,
"batchNum", batchInfo.BatchNum)
@@ -184,11 +241,17 @@ func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchIn
if err != nil {
return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err))
}
if !resend {
t.accNextNonce = auth.Nonce.Uint64() + 1
}
batchInfo.EthTx = ethTx
log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex())
log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash())
now := time.Now()
batchInfo.SendTimestamp = now
if resend {
batchInfo.Debug.ResendNum++
}
batchInfo.Debug.Status = StatusSent
batchInfo.Debug.SendBlockNum = t.stats.Eth.LastBlock.Num + 1
batchInfo.Debug.SendTimestamp = batchInfo.SendTimestamp
@@ -196,9 +259,11 @@ func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchIn
batchInfo.Debug.StartTimestamp).Seconds()
t.cfg.debugBatchStore(batchInfo)
t.lastPendingBatch = batchInfo.BatchNum
if batchInfo.L1Batch {
t.lastSentL1BatchBlockNum = t.stats.Eth.LastBlock.Num + 1
// t.lastPendingBatch = batchInfo.BatchNum
if !resend {
if batchInfo.L1Batch {
t.lastSentL1BatchBlockNum = t.stats.Eth.LastBlock.Num + 1
}
}
if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil {
return tracerr.Wrap(err)
@@ -242,14 +307,14 @@ func (t *TxManager) checkEthTransactionReceipt(ctx context.Context, batchInfo *B
func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*int64, error) {
receipt := batchInfo.Receipt
if receipt != nil {
if batchInfo.EthTx.Nonce > t.lastSuccessNonce {
t.lastSuccessNonce = batchInfo.EthTx.Nonce
if batchInfo.EthTx.Nonce()+1 > t.accNonce {
t.accNonce = batchInfo.EthTx.Nonce() + 1
}
if receipt.Status == types.ReceiptStatusFailed {
batchInfo.Debug.Status = StatusFailed
t.cfg.debugBatchStore(batchInfo)
_, err := t.ethClient.EthCall(ctx, batchInfo.EthTx, receipt.BlockNumber)
log.Warnw("TxManager receipt status is failed", "tx", receipt.TxHash.Hex(),
log.Warnw("TxManager receipt status is failed", "tx", receipt.TxHash,
"batch", batchInfo.BatchNum, "block", receipt.BlockNumber.Int64(),
"err", err)
if batchInfo.BatchNum <= t.lastSuccessBatch {
@@ -262,9 +327,17 @@ func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*i
batchInfo.Debug.MineBlockNum = receipt.BlockNumber.Int64()
batchInfo.Debug.StartToMineBlocksDelay = batchInfo.Debug.MineBlockNum -
batchInfo.Debug.StartBlockNum
now := time.Now()
batchInfo.Debug.StartToMineDelay = now.Sub(
batchInfo.Debug.StartTimestamp).Seconds()
if batchInfo.Debug.StartToMineDelay == 0 {
if block, err := t.ethClient.EthBlockByNumber(ctx,
receipt.BlockNumber.Int64()); err != nil {
log.Warnw("TxManager: ethClient.EthBlockByNumber", "err", err)
} else {
batchInfo.Debug.SendToMineDelay = block.Timestamp.Sub(
batchInfo.Debug.SendTimestamp).Seconds()
batchInfo.Debug.StartToMineDelay = block.Timestamp.Sub(
batchInfo.Debug.StartTimestamp).Seconds()
}
}
t.cfg.debugBatchStore(batchInfo)
if batchInfo.BatchNum > t.lastSuccessBatch {
t.lastSuccessBatch = batchInfo.BatchNum
@@ -279,9 +352,62 @@ func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*i
// TODO:
// - After sending a message: CancelPipeline, stop all consecutive pending Batches (transactions)
type Queue struct {
list []*BatchInfo
// nonceByBatchNum map[common.BatchNum]uint64
next int
}
func NewQueue() Queue {
return Queue{
list: make([]*BatchInfo, 0),
// nonceByBatchNum: make(map[common.BatchNum]uint64),
next: 0,
}
}
func (q *Queue) Len() int {
return len(q.list)
}
func (q *Queue) At(position int) *BatchInfo {
if position >= len(q.list) {
return nil
}
return q.list[position]
}
func (q *Queue) Next() (int, *BatchInfo) {
if len(q.list) == 0 {
return 0, nil
}
defer func() { q.next = (q.next + 1) % len(q.list) }()
return q.next, q.list[q.next]
}
func (q *Queue) Remove(position int) {
// batchInfo := q.list[position]
// delete(q.nonceByBatchNum, batchInfo.BatchNum)
q.list = append(q.list[:position], q.list[position+1:]...)
if len(q.list) == 0 {
q.next = 0
} else {
q.next = position % len(q.list)
}
}
func (q *Queue) Push(batchInfo *BatchInfo) {
q.list = append(q.list, batchInfo)
// q.nonceByBatchNum[batchInfo.BatchNum] = batchInfo.EthTx.Nonce()
}
// func (q *Queue) NonceByBatchNum(batchNum common.BatchNum) (uint64, bool) {
// nonce, ok := q.nonceByBatchNum[batchNum]
// return nonce, ok
// }
// Run the TxManager
func (t *TxManager) Run(ctx context.Context) {
next := 0
waitDuration := longWaitDuration
var statsVars statsVars
@@ -292,7 +418,7 @@ func (t *TxManager) Run(ctx context.Context) {
t.stats = statsVars.Stats
t.syncSCVars(statsVars.Vars)
log.Infow("TxManager: received initial statsVars",
"block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatch)
"block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatchNum)
for {
select {
@@ -302,7 +428,19 @@ func (t *TxManager) Run(ctx context.Context) {
case statsVars := <-t.statsVarsCh:
t.stats = statsVars.Stats
t.syncSCVars(statsVars.Vars)
case pipelineNum := <-t.discardPipelineCh:
t.minPipelineNum = pipelineNum + 1
if err := t.removeBadBatchInfos(ctx); ctx.Err() != nil {
continue
} else if err != nil {
log.Errorw("TxManager: removeBadBatchInfos", "err", err)
continue
}
case batchInfo := <-t.batchCh:
if batchInfo.PipelineNum < t.minPipelineNum {
log.Warnw("TxManager: batchInfo received pipelineNum < minPipelineNum",
"num", batchInfo.PipelineNum, "minNum", t.minPipelineNum)
}
if err := t.shouldSendRollupForgeBatch(batchInfo); err != nil {
log.Warnw("TxManager: shouldSend", "err", err,
"batch", batchInfo.BatchNum)
@@ -310,7 +448,7 @@ func (t *TxManager) Run(ctx context.Context) {
Reason: fmt.Sprintf("forgeBatch shouldSend: %v", err)})
continue
}
if err := t.sendRollupForgeBatch(ctx, batchInfo); ctx.Err() != nil {
if err := t.sendRollupForgeBatch(ctx, batchInfo, false); ctx.Err() != nil {
continue
} else if err != nil {
// If we reach here it's because our ethNode has
@@ -324,16 +462,14 @@ func (t *TxManager) Run(ctx context.Context) {
Reason: fmt.Sprintf("forgeBatch send: %v", err)})
continue
}
t.queue = append(t.queue, batchInfo)
t.queue.Push(batchInfo)
waitDuration = t.cfg.TxManagerCheckInterval
case <-time.After(waitDuration):
if len(t.queue) == 0 {
queuePosition, batchInfo := t.queue.Next()
if batchInfo == nil {
waitDuration = longWaitDuration
continue
}
current := next
next = (current + 1) % len(t.queue)
batchInfo := t.queue[current]
if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
continue
} else if err != nil { //nolint:staticcheck
@@ -352,35 +488,93 @@ func (t *TxManager) Run(ctx context.Context) {
continue
} else if err != nil { //nolint:staticcheck
// Transaction was rejected
next = t.removeFromQueue(current)
if err := t.removeBadBatchInfos(ctx); ctx.Err() != nil {
continue
} else if err != nil {
log.Errorw("TxManager: removeBadBatchInfos", "err", err)
continue
}
t.coord.SendMsg(ctx, MsgStopPipeline{
Reason: fmt.Sprintf("forgeBatch reject: %v", err)})
continue
}
now := time.Now()
if confirm == nil && batchInfo.SendTimestamp > t.cfg.EthTxResendTimeout {
log.Infow("TxManager: forgeBatch tx not been mined timeout",
"tx", batchInfo.EthTx.Hex(), "batch", batchInfo.BatchNum)
// TODO: Resend Tx with same nonce
if confirm == nil && now.Sub(batchInfo.SendTimestamp) > t.cfg.EthTxResendTimeout {
log.Infow("TxManager: forgeBatch tx not been mined timeout, resending",
"tx", batchInfo.EthTx.Hash(), "batch", batchInfo.BatchNum)
if err := t.sendRollupForgeBatch(ctx, batchInfo, true); ctx.Err() != nil {
continue
} else if err != nil {
// If we reach here it's because our ethNode has
// been unable to send the transaction to
// ethereum. This could be due to the ethNode
// failure, or an invalid transaction (that
// can't be mined)
log.Warnw("TxManager: forgeBatch resend failed", "err", err,
"batch", batchInfo.BatchNum)
t.coord.SendMsg(ctx, MsgStopPipeline{
Reason: fmt.Sprintf("forgeBatch resend: %v", err)})
continue
}
}
if confirm != nil && *confirm >= t.cfg.ConfirmBlocks {
log.Debugw("TxManager: forgeBatch tx confirmed",
"tx", batchInfo.EthTx.Hex(), "batch", batchInfo.BatchNum)
next = t.removeFromQueue(current)
"tx", batchInfo.EthTx.Hash(), "batch", batchInfo.BatchNum)
t.queue.Remove(queuePosition)
}
}
}
}
// Removes batchInfo at position from the queue, and returns the next position
func (t *TxManager) removeFromQueue(position int) (next int) {
t.queue = append(t.queue[:current], t.queue[current+1:]...)
if len(t.queue) == 0 {
next = 0
} else {
next = current % len(t.queue)
func (t *TxManager) removeBadBatchInfos(ctx context.Context) error {
next := 0
// batchNum := 0
for {
batchInfo := t.queue.At(next)
if batchInfo == nil {
break
}
if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
return nil
} else if err != nil {
// Our ethNode is giving an error different
// than "not found" when getting the receipt
// for the transaction, so we can't figure out
// if it was not mined, mined and succesfull or
// mined and failed. This could be due to the
// ethNode failure.
next++
continue
}
confirm, err := t.handleReceipt(ctx, batchInfo)
if ctx.Err() != nil {
return nil
} else if err != nil {
// Transaction was rejected
if t.minPipelineNum <= batchInfo.PipelineNum {
t.minPipelineNum = batchInfo.PipelineNum + 1
}
t.queue.Remove(next)
continue
}
// If tx is pending but is from a cancelled pipeline, remove it
// from the queue
if confirm == nil {
if batchInfo.PipelineNum < t.minPipelineNum {
// batchNum++
t.queue.Remove(next)
continue
}
}
next++
}
return next
accNonce, err := t.ethClient.EthNonceAt(ctx, t.account.Address, nil)
if err != nil {
return err
}
t.accNextNonce = accNonce
return nil
}
func (t *TxManager) canForgeAt(blockNum int64) bool {

View File

@@ -0,0 +1,15 @@
package coordinator
import (
"math/big"
"testing"
"github.com/stretchr/testify/assert"
)
func TestAddPerc(t *testing.T) {
assert.Equal(t, "110", addPerc(big.NewInt(100), 10).String())
assert.Equal(t, "101", addPerc(big.NewInt(100), 1).String())
assert.Equal(t, "12", addPerc(big.NewInt(10), 20).String())
assert.Equal(t, "1500", addPerc(big.NewInt(1000), 50).String())
}