Update coordinator to work better under real net

- cli / node
    - Update handler of SIGINT so that after 3 SIGINTs, the process terminates
      unconditionally
- coordinator
    - Store stats without pointer
    - In all functions that send a variable via channel, check for context done
      to avoid deadlock (due to no process reading from the channel, which has
      no queue) when the node is stopped.
    - Abstract `canForge` so that it can be used outside of the `Coordinator`
    - In `canForge` check the blockNumber in current and next slot.
    - Update tests due to smart contract changes in slot handling, and minimum
      bid defaults
    - TxManager
        - Add consts, vars and stats to allow evaluating `canForge`
        - Add `canForge` method (not used yet)
        - Store batch and nonces status (last success and last pending)
        - Track nonces internally instead of relying on the ethereum node (this
          is required to work with ganache when there are pending txs)
        - Handle the (common) case of the receipt not being found after the tx
          is sent.
        - Don't start the main loop until we get an initial messae fo the stats
          and vars (so that in the loop the stats and vars are set to
          synchronizer values)
- eth / ethereum client
    - Add necessary methods to create the auth object for transactions manually
      so that we can set the nonce, gas price, gas limit, etc manually
    - Update `RollupForgeBatch` to take an auth object as input (so that the
      coordinator can set parameters manually)
- synchronizer
    - In stats, add `NextSlot`
This commit is contained in:
Eduard S
2021-01-15 19:37:39 +01:00
parent 168432c559
commit 70482605c4
20 changed files with 664 additions and 308 deletions

View File

@@ -3,14 +3,18 @@ package coordinator
import (
"context"
"fmt"
"strings"
"math/big"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/types"
"github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/db/l2db"
"github.com/hermeznetwork/hermez-node/eth"
"github.com/hermeznetwork/hermez-node/log"
"github.com/hermeznetwork/hermez-node/synchronizer"
"github.com/hermeznetwork/tracerr"
)
@@ -18,61 +22,150 @@ import (
// call to forge, waits for transaction confirmation, and keeps checking them
// until a number of confirmed blocks have passed.
type TxManager struct {
cfg Config
ethClient eth.ClientInterface
l2DB *l2db.L2DB // Used only to mark forged txs as forged in the L2DB
coord *Coordinator // Used only to send messages to stop the pipeline
batchCh chan *BatchInfo
lastBlockCh chan int64
queue []*BatchInfo
lastBlock int64
// lastConfirmedBatch stores the last BatchNum that who's forge call was confirmed
lastConfirmedBatch common.BatchNum
cfg Config
ethClient eth.ClientInterface
l2DB *l2db.L2DB // Used only to mark forged txs as forged in the L2DB
coord *Coordinator // Used only to send messages to stop the pipeline
batchCh chan *BatchInfo
chainID *big.Int
account accounts.Account
consts synchronizer.SCConsts
stats synchronizer.Stats
vars synchronizer.SCVariables
statsVarsCh chan statsVars
queue []*BatchInfo
// lastSuccessBatch stores the last BatchNum that who's forge call was confirmed
lastSuccessBatch common.BatchNum
lastPendingBatch common.BatchNum
lastSuccessNonce uint64
lastPendingNonce uint64
}
// NewTxManager creates a new TxManager
func NewTxManager(cfg *Config, ethClient eth.ClientInterface, l2DB *l2db.L2DB,
coord *Coordinator) *TxManager {
func NewTxManager(ctx context.Context, cfg *Config, ethClient eth.ClientInterface, l2DB *l2db.L2DB,
coord *Coordinator, scConsts *synchronizer.SCConsts, initSCVars *synchronizer.SCVariables) (*TxManager, error) {
chainID, err := ethClient.EthChainID()
if err != nil {
return nil, tracerr.Wrap(err)
}
address, err := ethClient.EthAddress()
if err != nil {
return nil, tracerr.Wrap(err)
}
lastSuccessNonce, err := ethClient.EthNonceAt(ctx, *address, nil)
if err != nil {
return nil, err
}
lastPendingNonce, err := ethClient.EthPendingNonceAt(ctx, *address)
if err != nil {
return nil, err
}
if lastSuccessNonce != lastPendingNonce {
return nil, tracerr.Wrap(fmt.Errorf("lastSuccessNonce (%v) != lastPendingNonce (%v)",
lastSuccessNonce, lastPendingNonce))
}
log.Infow("TxManager started", "nonce", lastSuccessNonce)
return &TxManager{
cfg: *cfg,
ethClient: ethClient,
l2DB: l2DB,
coord: coord,
batchCh: make(chan *BatchInfo, queueLen),
lastBlockCh: make(chan int64, queueLen),
lastBlock: -1,
}
statsVarsCh: make(chan statsVars, queueLen),
account: accounts.Account{
Address: *address,
},
chainID: chainID,
consts: *scConsts,
vars: *initSCVars,
lastSuccessNonce: lastSuccessNonce,
lastPendingNonce: lastPendingNonce,
}, nil
}
// AddBatch is a thread safe method to pass a new batch TxManager to be sent to
// the smart contract via the forge call
func (t *TxManager) AddBatch(batchInfo *BatchInfo) {
t.batchCh <- batchInfo
func (t *TxManager) AddBatch(ctx context.Context, batchInfo *BatchInfo) {
select {
case t.batchCh <- batchInfo:
case <-ctx.Done():
}
}
// SetLastBlock is a thread safe method to pass the lastBlock to the TxManager
func (t *TxManager) SetLastBlock(lastBlock int64) {
t.lastBlockCh <- lastBlock
// SetSyncStatsVars is a thread safe method to sets the synchronizer Stats
func (t *TxManager) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) {
select {
case t.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars}:
case <-ctx.Done():
}
}
func (t *TxManager) callRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) error {
func (t *TxManager) syncSCVars(vars synchronizer.SCVariablesPtr) {
if vars.Rollup != nil {
t.vars.Rollup = *vars.Rollup
}
if vars.Auction != nil {
t.vars.Auction = *vars.Auction
}
if vars.WDelayer != nil {
t.vars.WDelayer = *vars.WDelayer
}
}
// NewAuth generates a new auth object for an ethereum transaction
func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) {
gasPrice, err := t.ethClient.EthSuggestGasPrice(ctx)
if err != nil {
return nil, tracerr.Wrap(err)
}
inc := new(big.Int).Set(gasPrice)
const gasPriceDiv = 100
inc.Div(inc, new(big.Int).SetUint64(gasPriceDiv))
gasPrice.Add(gasPrice, inc)
// log.Debugw("TxManager: transaction metadata", "gasPrice", gasPrice)
auth, err := bind.NewKeyStoreTransactorWithChainID(t.ethClient.EthKeyStore(), t.account, t.chainID)
if err != nil {
return nil, tracerr.Wrap(err)
}
auth.Value = big.NewInt(0) // in wei
// TODO: Calculate GasLimit based on the contents of the ForgeBatchArgs
auth.GasLimit = 1000000
auth.GasPrice = gasPrice
auth.Nonce = nil
return auth, nil
}
func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) error {
// TODO: Check if we can forge in the next blockNum, abort if we can't
batchInfo.Debug.Status = StatusSent
batchInfo.Debug.SendBlockNum = t.lastBlock + 1
batchInfo.Debug.SendBlockNum = t.stats.Eth.LastBlock.Num + 1
batchInfo.Debug.SendTimestamp = time.Now()
batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub(
batchInfo.Debug.StartTimestamp).Seconds()
var ethTx *types.Transaction
var err error
auth, err := t.NewAuth(ctx)
if err != nil {
return tracerr.Wrap(err)
}
auth.Nonce = big.NewInt(int64(t.lastPendingNonce))
t.lastPendingNonce++
for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs)
ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs, auth)
if err != nil {
if strings.Contains(err.Error(), common.AuctionErrMsgCannotForge) {
log.Debugw("TxManager ethClient.RollupForgeBatch", "err", err,
"block", t.lastBlock+1)
return tracerr.Wrap(err)
}
// if strings.Contains(err.Error(), common.AuctionErrMsgCannotForge) {
// log.Errorw("TxManager ethClient.RollupForgeBatch", "err", err,
// "block", t.stats.Eth.LastBlock.Num+1)
// return tracerr.Wrap(err)
// }
log.Errorw("TxManager ethClient.RollupForgeBatch",
"attempt", attempt, "err", err, "block", t.lastBlock+1,
"attempt", attempt, "err", err, "block", t.stats.Eth.LastBlock.Num+1,
"batchNum", batchInfo.BatchNum)
} else {
break
@@ -89,12 +182,15 @@ func (t *TxManager) callRollupForgeBatch(ctx context.Context, batchInfo *BatchIn
batchInfo.EthTx = ethTx
log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex())
t.cfg.debugBatchStore(batchInfo)
t.lastPendingBatch = batchInfo.BatchNum
if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil {
return tracerr.Wrap(err)
}
return nil
}
// checkEthTransactionReceipt takes the txHash from the BatchInfo and stores
// the corresponding receipt if found
func (t *TxManager) checkEthTransactionReceipt(ctx context.Context, batchInfo *BatchInfo) error {
txHash := batchInfo.EthTx.Hash()
var receipt *types.Receipt
@@ -103,8 +199,10 @@ func (t *TxManager) checkEthTransactionReceipt(ctx context.Context, batchInfo *B
receipt, err = t.ethClient.EthTransactionReceipt(ctx, txHash)
if ctx.Err() != nil {
continue
}
if err != nil {
} else if tracerr.Unwrap(err) == ethereum.NotFound {
err = nil
break
} else if err != nil {
log.Errorw("TxManager ethClient.EthTransactionReceipt",
"attempt", attempt, "err", err)
} else {
@@ -124,24 +222,28 @@ func (t *TxManager) checkEthTransactionReceipt(ctx context.Context, batchInfo *B
return nil
}
func (t *TxManager) handleReceipt(batchInfo *BatchInfo) (*int64, error) {
func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*int64, error) {
receipt := batchInfo.Receipt
if receipt != nil {
if receipt.Status == types.ReceiptStatusFailed {
batchInfo.Debug.Status = StatusFailed
t.cfg.debugBatchStore(batchInfo)
log.Errorw("TxManager receipt status is failed", "receipt", receipt)
return nil, tracerr.Wrap(fmt.Errorf("ethereum transaction receipt statis is failed"))
_, err := t.ethClient.EthCall(ctx, batchInfo.EthTx, receipt.BlockNumber)
log.Warnw("TxManager receipt status is failed", "tx", receipt.TxHash.Hex(),
"batch", batchInfo.BatchNum, "block", receipt.BlockNumber.Int64(),
"err", err)
return nil, tracerr.Wrap(fmt.Errorf(
"ethereum transaction receipt status is failed: %w", err))
} else if receipt.Status == types.ReceiptStatusSuccessful {
batchInfo.Debug.Status = StatusMined
batchInfo.Debug.MineBlockNum = receipt.BlockNumber.Int64()
batchInfo.Debug.StartToMineBlocksDelay = batchInfo.Debug.MineBlockNum -
batchInfo.Debug.StartBlockNum
t.cfg.debugBatchStore(batchInfo)
if batchInfo.BatchNum > t.lastConfirmedBatch {
t.lastConfirmedBatch = batchInfo.BatchNum
if batchInfo.BatchNum > t.lastSuccessBatch {
t.lastSuccessBatch = batchInfo.BatchNum
}
confirm := t.lastBlock - receipt.BlockNumber.Int64()
confirm := t.stats.Eth.LastBlock.Num - receipt.BlockNumber.Int64()
return &confirm, nil
}
}
@@ -153,25 +255,41 @@ func (t *TxManager) Run(ctx context.Context) {
next := 0
waitDuration := longWaitDuration
var statsVars statsVars
select {
case statsVars = <-t.statsVarsCh:
case <-ctx.Done():
}
t.stats = statsVars.Stats
t.syncSCVars(statsVars.Vars)
log.Infow("TxManager: received initial statsVars",
"block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatch)
for {
select {
case <-ctx.Done():
log.Info("TxManager done")
return
case lastBlock := <-t.lastBlockCh:
t.lastBlock = lastBlock
case statsVars := <-t.statsVarsCh:
t.stats = statsVars.Stats
t.syncSCVars(statsVars.Vars)
case batchInfo := <-t.batchCh:
if err := t.callRollupForgeBatch(ctx, batchInfo); ctx.Err() != nil {
if err := t.sendRollupForgeBatch(ctx, batchInfo); ctx.Err() != nil {
continue
} else if err != nil {
t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch call: %v", err)})
// If we reach here it's because our ethNode has
// been unable to send the transaction to
// ethereum. This could be due to the ethNode
// failure, or an invalid transaction (that
// can't be mined)
t.coord.SendMsg(ctx, MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch send: %v", err)})
continue
}
log.Debugf("ethClient ForgeCall sent, batchNum: %d", batchInfo.BatchNum)
t.queue = append(t.queue, batchInfo)
waitDuration = t.cfg.TxManagerCheckInterval
case <-time.After(waitDuration):
if len(t.queue) == 0 {
waitDuration = longWaitDuration
continue
}
current := next
@@ -180,23 +298,33 @@ func (t *TxManager) Run(ctx context.Context) {
if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
continue
} else if err != nil { //nolint:staticcheck
// We can't get the receipt for the
// transaction, so we can't confirm if it was
// mined
t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch receipt: %v", err)})
// Our ethNode is giving an error different
// than "not found" when getting the receipt
// for the transaction, so we can't figure out
// if it was not mined, mined and succesfull or
// mined and failed. This could be due to the
// ethNode failure.
t.coord.SendMsg(ctx, MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch receipt: %v", err)})
}
confirm, err := t.handleReceipt(batchInfo)
if err != nil { //nolint:staticcheck
confirm, err := t.handleReceipt(ctx, batchInfo)
if ctx.Err() != nil {
continue
} else if err != nil { //nolint:staticcheck
// Transaction was rejected
t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch reject: %v", err)})
t.queue = append(t.queue[:current], t.queue[current+1:]...)
if len(t.queue) == 0 {
next = 0
} else {
next = current % len(t.queue)
}
t.coord.SendMsg(ctx, MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch reject: %v", err)})
}
if confirm != nil && *confirm >= t.cfg.ConfirmBlocks {
log.Debugw("TxManager tx for RollupForgeBatch confirmed",
"batch", batchInfo.BatchNum)
t.queue = append(t.queue[:current], t.queue[current+1:]...)
if len(t.queue) == 0 {
waitDuration = longWaitDuration
next = 0
} else {
next = current % len(t.queue)
@@ -205,3 +333,11 @@ func (t *TxManager) Run(ctx context.Context) {
}
}
}
// nolint reason: this function will be used in the future
//nolint:unused
func (t *TxManager) canForge(stats *synchronizer.Stats, blockNum int64) bool {
return canForge(&t.consts.Auction, &t.vars.Auction,
&stats.Sync.Auction.CurrentSlot, &stats.Sync.Auction.NextSlot,
t.cfg.ForgerAddress, blockNum)
}