You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

655 lines
22 KiB

/*
Package coordinator handles all the logic related to forging batches as a
coordinator in the hermez network.
The forging of batches is done with a pipeline in order to allow multiple
batches being forged in parallel. The maximum number of batches that can be
forged in parallel is determined by the number of available proof servers.
The Coordinator begins with the pipeline stopped. The main Coordinator
goroutine keeps listening for synchronizer events sent by the node package,
which allow the coordinator to determine if the configured forger address is
allowed to forge at the current block or not. When the forger address becomes
allowed to forge, the pipeline is started, and when it terminates being allowed
to forge, the pipeline is stopped.
The Pipeline consists of two goroutines. The first one is in charge of
preparing a batch internally, which involves making a selection of transactions
and calculating the ZKInputs for the batch proof, and sending these ZKInputs to
an idle proof server. This goroutine will keep preparing batches while there
are idle proof servers, if the forging policy determines that a batch should be
forged in the current state. The second goroutine is in charge of waiting for
the proof server to finish computing the proof, retreiving it, prepare the
arguments for the `forgeBatch` Rollup transaction, and sending the result to
the TxManager. All the batch information moves between functions and
goroutines via the BatchInfo struct.
Finally, the TxManager contains a single goroutine that makes forgeBatch
ethereum transactions for the batches sent by the Pipeline, and keeps them in a
list to check them periodically. In the periodic checks, the ethereum
transaction is checked for successfulness, and it's only forgotten after a
number of confirmation blocks have passed after being successfully mined. At
any point if a transaction failure is detected, the TxManager can signal the
Coordinator to reset the Pipeline in order to reforge the failed batches.
The Coordinator goroutine acts as a manager. The synchronizer events (which
notify about new blocks and associated new state) that it receives are
broadcasted to the Pipeline and the TxManager. This allows the Coordinator,
Pipeline and TxManager to have a copy of the current hermez network state
required to perform their duties.
*/
package coordinator
import (
"context"
"fmt"
"math/big"
"os"
"sync"
"time"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/hermeznetwork/hermez-node/batchbuilder"
"github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/config"
"github.com/hermeznetwork/hermez-node/db/historydb"
"github.com/hermeznetwork/hermez-node/db/l2db"
"github.com/hermeznetwork/hermez-node/eth"
"github.com/hermeznetwork/hermez-node/log"
"github.com/hermeznetwork/hermez-node/prover"
"github.com/hermeznetwork/hermez-node/synchronizer"
"github.com/hermeznetwork/hermez-node/txprocessor"
"github.com/hermeznetwork/hermez-node/txselector"
"github.com/hermeznetwork/tracerr"
)
var (
errLastL1BatchNotSynced = fmt.Errorf("last L1Batch not synced yet")
errSkipBatchByPolicy = fmt.Errorf("skip batch by policy")
)
const (
queueLen = 16
longWaitDuration = 999 * time.Hour
zeroDuration = 0 * time.Second
)
// Config contains the Coordinator configuration
type Config struct {
// ForgerAddress is the address under which this coordinator is forging
ForgerAddress ethCommon.Address
// ConfirmBlocks is the number of confirmation blocks to wait for sent
// ethereum transactions before forgetting about them
ConfirmBlocks int64
// L1BatchTimeoutPerc is the portion of the range before the L1Batch
// timeout that will trigger a schedule to forge an L1Batch
L1BatchTimeoutPerc float64
// StartSlotBlocksDelay is the number of blocks of delay to wait before
// starting the pipeline when we reach a slot in which we can forge.
StartSlotBlocksDelay int64
// ScheduleBatchBlocksAheadCheck is the number of blocks ahead in which
// the forger address is checked to be allowed to forge (apart from
// checking the next block), used to decide when to stop scheduling new
// batches (by stopping the pipeline).
// For example, if we are at block 10 and ScheduleBatchBlocksAheadCheck
// is 5, even though at block 11 we canForge, the pipeline will be
// stopped if we can't forge at block 15.
// This value should be the expected number of blocks it takes between
// scheduling a batch and having it mined.
ScheduleBatchBlocksAheadCheck int64
// SendBatchBlocksMarginCheck is the number of margin blocks ahead in
// which the coordinator is also checked to be allowed to forge, apart
// from the next block; used to decide when to stop sending batches to
// the smart contract.
// For example, if we are at block 10 and SendBatchBlocksMarginCheck is
// 5, even though at block 11 we canForge, the batch will be discarded
// if we can't forge at block 15.
// This value should be the expected number of blocks it takes between
// sending a batch and having it mined.
SendBatchBlocksMarginCheck int64
// EthClientAttempts is the number of attempts to do an eth client RPC
// call before giving up
EthClientAttempts int
// ForgeRetryInterval is the waiting interval between calls forge a
// batch after an error
ForgeRetryInterval time.Duration
// ForgeDelay is the delay after which a batch is forged if the slot is
// already committed. If set to 0s, the coordinator will continuously
// forge at the maximum rate.
ForgeDelay time.Duration
// ForgeNoTxsDelay is the delay after which a batch is forged even if
// there are no txs to forge if the slot is already committed. If set
// to 0s, the coordinator will continuously forge even if the batches
// are empty.
ForgeNoTxsDelay time.Duration
// MustForgeAtSlotDeadline enables the coordinator to forge slots if
// the empty slots reach the slot deadline.
MustForgeAtSlotDeadline bool
// IgnoreSlotCommitment disables forcing the coordinator to forge a
// slot immediately when the slot is not committed. If set to false,
// the coordinator will immediately forge a batch at the beginning of
// a slot if it's the slot winner.
IgnoreSlotCommitment bool
// ForgeOncePerSlotIfTxs will make the coordinator forge at most one
// batch per slot, only if there are included txs in that batch, or
// pending l1UserTxs in the smart contract. Setting this parameter
// overrides `ForgeDelay`, `ForgeNoTxsDelay`, `MustForgeAtSlotDeadline`
// and `IgnoreSlotCommitment`.
ForgeOncePerSlotIfTxs bool
// SyncRetryInterval is the waiting interval between calls to the main
// handler of a synced block after an error
SyncRetryInterval time.Duration
// PurgeByExtDelInterval is the waiting interval between calls
// to the PurgeByExternalDelete function of the l2db which deletes
// pending txs externally marked by the column `external_delete`
PurgeByExtDelInterval time.Duration
// EthClientAttemptsDelay is delay between attempts do do an eth client
// RPC call
EthClientAttemptsDelay time.Duration
// EthTxResendTimeout is the timeout after which a non-mined ethereum
// transaction will be resent (reusing the nonce) with a newly
// calculated gas price
EthTxResendTimeout time.Duration
// EthNoReuseNonce disables reusing nonces of pending transactions for
// new replacement transactions
EthNoReuseNonce bool
// MaxGasPrice is the maximum gas price allowed for ethereum
// transactions
MaxGasPrice *big.Int
// GasPriceIncPerc is the percentage increase of gas price set in an
// ethereum transaction from the suggested gas price by the ehtereum
// node
GasPriceIncPerc int64
// TxManagerCheckInterval is the waiting interval between receipt
// checks of ethereum transactions in the TxManager
TxManagerCheckInterval time.Duration
// DebugBatchPath if set, specifies the path where batchInfo is stored
// in JSON in every step/update of the pipeline
DebugBatchPath string
Purger PurgerCfg
// VerifierIdx is the index of the verifier contract registered in the
// smart contract
VerifierIdx uint8
// ForgeBatchGasCost contains the cost of each action in the
// ForgeBatch transaction.
ForgeBatchGasCost config.ForgeBatchGasCost
TxProcessorConfig txprocessor.Config
}
func (c *Config) debugBatchStore(batchInfo *BatchInfo) {
if c.DebugBatchPath != "" {
if err := batchInfo.DebugStore(c.DebugBatchPath); err != nil {
log.Warnw("Error storing debug BatchInfo",
"path", c.DebugBatchPath, "err", err)
}
}
}
type fromBatch struct {
BatchNum common.BatchNum
ForgerAddr ethCommon.Address
StateRoot *big.Int
}
// Coordinator implements the Coordinator type
type Coordinator struct {
// State
pipelineNum int // Pipeline sequential number. The first pipeline is 1
pipelineFromBatch fromBatch // batch from which we started the pipeline
provers []prover.Client
consts common.SCConsts
vars common.SCVariables
stats synchronizer.Stats
started bool
cfg Config
historyDB *historydb.HistoryDB
l2DB *l2db.L2DB
txSelector *txselector.TxSelector
batchBuilder *batchbuilder.BatchBuilder
msgCh chan interface{}
ctx context.Context
wg sync.WaitGroup
cancel context.CancelFunc
// mutexL2DBUpdateDelete protects updates to the L2DB so that
// these two processes always happen exclusively:
// - Pipeline taking pending txs, running through the TxProcessor and
// marking selected txs as forging
// - Coordinator deleting pending txs that have been marked with
// `external_delete`.
// Without this mutex, the coordinator could delete a pending txs that
// has just been selected by the TxProcessor in the pipeline.
mutexL2DBUpdateDelete sync.Mutex
pipeline *Pipeline
lastNonFailedBatchNum common.BatchNum
purger *Purger
txManager *TxManager
}
// NewCoordinator creates a new Coordinator
func NewCoordinator(cfg Config,
historyDB *historydb.HistoryDB,
l2DB *l2db.L2DB,
txSelector *txselector.TxSelector,
batchBuilder *batchbuilder.BatchBuilder,
serverProofs []prover.Client,
ethClient eth.ClientInterface,
scConsts *common.SCConsts,
initSCVars *common.SCVariables,
) (*Coordinator, error) {
// nolint reason: hardcoded `1.0`, by design the percentage can't be over 100%
if cfg.L1BatchTimeoutPerc >= 1.0 { //nolint:gomnd
return nil, tracerr.Wrap(fmt.Errorf("invalid value for Config.L1BatchTimeoutPerc (%v >= 1.0)",
cfg.L1BatchTimeoutPerc))
}
if cfg.EthClientAttempts < 1 {
return nil, tracerr.Wrap(fmt.Errorf("invalid value for Config.EthClientAttempts (%v < 1)",
cfg.EthClientAttempts))
}
if cfg.DebugBatchPath != "" {
if err := os.MkdirAll(cfg.DebugBatchPath, 0744); err != nil {
return nil, tracerr.Wrap(err)
}
}
purger := Purger{
cfg: cfg.Purger,
lastPurgeBlock: 0,
lastPurgeBatch: 0,
lastInvalidateBlock: 0,
lastInvalidateBatch: 0,
}
ctx, cancel := context.WithCancel(context.Background())
c := Coordinator{
pipelineNum: 0,
pipelineFromBatch: fromBatch{
BatchNum: 0,
ForgerAddr: ethCommon.Address{},
StateRoot: big.NewInt(0),
},
provers: serverProofs,
consts: *scConsts,
vars: *initSCVars,
cfg: cfg,
historyDB: historyDB,
l2DB: l2DB,
txSelector: txSelector,
batchBuilder: batchBuilder,
purger: &purger,
msgCh: make(chan interface{}),
ctx: ctx,
// wg
cancel: cancel,
}
ctxTimeout, ctxTimeoutCancel := context.WithTimeout(ctx, 1*time.Second)
defer ctxTimeoutCancel()
txManager, err := NewTxManager(ctxTimeout, &cfg, ethClient, l2DB, &c,
scConsts, initSCVars)
if err != nil {
return nil, tracerr.Wrap(err)
}
c.txManager = txManager
// Set Eth LastBlockNum to -1 in stats so that stats.Synced() is
// guaranteed to return false before it's updated with a real stats
c.stats.Eth.LastBlock.Num = -1
return &c, nil
}
// TxSelector returns the inner TxSelector
func (c *Coordinator) TxSelector() *txselector.TxSelector {
return c.txSelector
}
// BatchBuilder returns the inner BatchBuilder
func (c *Coordinator) BatchBuilder() *batchbuilder.BatchBuilder {
return c.batchBuilder
}
func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) {
c.pipelineNum++
return NewPipeline(ctx, c.cfg, c.pipelineNum, c.historyDB, c.l2DB, c.txSelector,
c.batchBuilder, &c.mutexL2DBUpdateDelete, c.purger, c, c.txManager,
c.provers, &c.consts)
}
// MsgSyncBlock indicates an update to the Synchronizer stats
type MsgSyncBlock struct {
Stats synchronizer.Stats
Batches []common.BatchData
// Vars contains each Smart Contract variables if they are updated, or
// nil if they haven't changed.
Vars common.SCVariablesPtr
}
// MsgSyncReorg indicates a reorg
type MsgSyncReorg struct {
Stats synchronizer.Stats
Vars common.SCVariablesPtr
}
// MsgStopPipeline indicates a signal to reset the pipeline
type MsgStopPipeline struct {
Reason string
// FailedBatchNum indicates the first batchNum that failed in the
// pipeline. If FailedBatchNum is 0, it should be ignored.
FailedBatchNum common.BatchNum
}
// SendMsg is a thread safe method to pass a message to the Coordinator
func (c *Coordinator) SendMsg(ctx context.Context, msg interface{}) {
select {
case c.msgCh <- msg:
case <-ctx.Done():
}
}
func updateSCVars(vars *common.SCVariables, update common.SCVariablesPtr) {
if update.Rollup != nil {
vars.Rollup = *update.Rollup
}
if update.Auction != nil {
vars.Auction = *update.Auction
}
if update.WDelayer != nil {
vars.WDelayer = *update.WDelayer
}
}
func (c *Coordinator) syncSCVars(vars common.SCVariablesPtr) {
updateSCVars(&c.vars, vars)
}
func canForge(auctionConstants *common.AuctionConstants, auctionVars *common.AuctionVariables,
currentSlot *common.Slot, nextSlot *common.Slot, addr ethCommon.Address, blockNum int64,
mustForgeAtDeadline bool) bool {
if blockNum < auctionConstants.GenesisBlockNum {
log.Infow("canForge: requested blockNum is < genesis", "blockNum", blockNum,
"genesis", auctionConstants.GenesisBlockNum)
return false
}
var slot *common.Slot
if currentSlot.StartBlock <= blockNum && blockNum <= currentSlot.EndBlock {
slot = currentSlot
} else if nextSlot.StartBlock <= blockNum && blockNum <= nextSlot.EndBlock {
slot = nextSlot
} else {
log.Warnw("canForge: requested blockNum is outside current and next slot",
"blockNum", blockNum, "currentSlot", currentSlot,
"nextSlot", nextSlot,
)
return false
}
anyoneForge := false
if !slot.ForgerCommitment &&
auctionConstants.RelativeBlock(blockNum) >= int64(auctionVars.SlotDeadline) {
log.Debugw("canForge: anyone can forge in the current slot (slotDeadline passed)",
"block", blockNum)
anyoneForge = true
}
if slot.Forger == addr || (anyoneForge && mustForgeAtDeadline) {
return true
}
log.Debugw("canForge: can't forge", "slot.Forger", slot.Forger)
return false
}
func (c *Coordinator) canForgeAt(blockNum int64) bool {
return canForge(&c.consts.Auction, &c.vars.Auction,
&c.stats.Sync.Auction.CurrentSlot, &c.stats.Sync.Auction.NextSlot,
c.cfg.ForgerAddress, blockNum, c.cfg.MustForgeAtSlotDeadline)
}
func (c *Coordinator) canForge() bool {
blockNum := c.stats.Eth.LastBlock.Num + 1
return canForge(&c.consts.Auction, &c.vars.Auction,
&c.stats.Sync.Auction.CurrentSlot, &c.stats.Sync.Auction.NextSlot,
c.cfg.ForgerAddress, blockNum, c.cfg.MustForgeAtSlotDeadline)
}
func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) error {
nextBlock := c.stats.Eth.LastBlock.Num + 1
canForge := c.canForgeAt(nextBlock)
if c.cfg.ScheduleBatchBlocksAheadCheck != 0 && canForge {
canForge = c.canForgeAt(nextBlock + c.cfg.ScheduleBatchBlocksAheadCheck)
}
if c.pipeline == nil {
relativeBlock := c.consts.Auction.RelativeBlock(nextBlock)
if canForge && relativeBlock < c.cfg.StartSlotBlocksDelay {
log.Debugf("Coordinator: delaying pipeline start due to "+
"relativeBlock (%v) < cfg.StartSlotBlocksDelay (%v)",
relativeBlock, c.cfg.StartSlotBlocksDelay)
} else if canForge {
log.Infow("Coordinator: forging state begin", "block",
stats.Eth.LastBlock.Num+1, "batch", stats.Sync.LastBatch.BatchNum)
fromBatch := fromBatch{
BatchNum: stats.Sync.LastBatch.BatchNum,
ForgerAddr: stats.Sync.LastBatch.ForgerAddr,
StateRoot: stats.Sync.LastBatch.StateRoot,
}
if c.lastNonFailedBatchNum > fromBatch.BatchNum {
fromBatch.BatchNum = c.lastNonFailedBatchNum
fromBatch.ForgerAddr = c.cfg.ForgerAddress
fromBatch.StateRoot = big.NewInt(0)
}
// Before starting the pipeline make sure we reset any
// l2tx from the pool that was forged in a batch that
// didn't end up being mined. We are already doing
// this in handleStopPipeline, but we do it again as a
// failsafe in case the last synced batchnum is
// different than in the previous call to l2DB.Reorg,
// or in case the node was restarted when there was a
// started batch that included l2txs but was not mined.
if err := c.l2DB.Reorg(fromBatch.BatchNum); err != nil {
return tracerr.Wrap(err)
}
var err error
if c.pipeline, err = c.newPipeline(ctx); err != nil {
return tracerr.Wrap(err)
}
c.pipelineFromBatch = fromBatch
// Start the pipeline
if err := c.pipeline.Start(fromBatch.BatchNum, stats, &c.vars); err != nil {
c.pipeline = nil
return tracerr.Wrap(err)
}
}
} else {
if !canForge {
log.Infow("Coordinator: forging state end", "block", stats.Eth.LastBlock.Num+1)
c.pipeline.Stop(c.ctx)
c.pipeline = nil
}
}
if c.pipeline == nil {
if _, err := c.purger.InvalidateMaybe(c.l2DB, c.txSelector.LocalAccountsDB(),
stats.Sync.LastBlock.Num, int64(stats.Sync.LastBatch.BatchNum)); err != nil {
return tracerr.Wrap(err)
}
if _, err := c.purger.PurgeMaybe(c.l2DB, stats.Sync.LastBlock.Num,
int64(stats.Sync.LastBatch.BatchNum)); err != nil {
return tracerr.Wrap(err)
}
}
return nil
}
func (c *Coordinator) handleMsgSyncBlock(ctx context.Context, msg *MsgSyncBlock) error {
c.stats = msg.Stats
c.syncSCVars(msg.Vars)
c.txManager.SetSyncStatsVars(ctx, &msg.Stats, &msg.Vars)
if c.pipeline != nil {
c.pipeline.SetSyncStatsVars(ctx, &msg.Stats, &msg.Vars)
}
if !c.stats.Synced() {
return nil
}
return c.syncStats(ctx, &c.stats)
}
func (c *Coordinator) handleReorg(ctx context.Context, msg *MsgSyncReorg) error {
c.stats = msg.Stats
c.syncSCVars(msg.Vars)
c.txManager.SetSyncStatsVars(ctx, &msg.Stats, &msg.Vars)
if c.pipeline != nil {
c.pipeline.SetSyncStatsVars(ctx, &msg.Stats, &msg.Vars)
}
if c.stats.Sync.LastBatch.ForgerAddr != c.cfg.ForgerAddress &&
(c.stats.Sync.LastBatch.StateRoot == nil || c.pipelineFromBatch.StateRoot == nil ||
c.stats.Sync.LastBatch.StateRoot.Cmp(c.pipelineFromBatch.StateRoot) != 0) {
// There's been a reorg and the batch state root from which the
// pipeline was started has changed (probably because it was in
// a block that was discarded), and it was sent by a different
// coordinator than us. That batch may never be in the main
// chain, so we stop the pipeline (it will be started again
// once the node is in sync).
log.Infow("Coordinator.handleReorg StopPipeline sync.LastBatch.ForgerAddr != cfg.ForgerAddr "+
"& sync.LastBatch.StateRoot != pipelineFromBatch.StateRoot",
"sync.LastBatch.StateRoot", c.stats.Sync.LastBatch.StateRoot,
"pipelineFromBatch.StateRoot", c.pipelineFromBatch.StateRoot)
c.txManager.DiscardPipeline(ctx, c.pipelineNum)
if err := c.handleStopPipeline(ctx, "reorg", 0); err != nil {
return tracerr.Wrap(err)
}
}
return nil
}
// handleStopPipeline handles stopping the pipeline. If failedBatchNum is 0,
// the next pipeline will start from the last state of the synchronizer,
// otherwise, it will state from failedBatchNum-1.
func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string,
failedBatchNum common.BatchNum) error {
batchNum := c.stats.Sync.LastBatch.BatchNum
if failedBatchNum != 0 {
batchNum = failedBatchNum - 1
}
if c.pipeline != nil {
c.pipeline.Stop(c.ctx)
c.pipeline = nil
}
if err := c.l2DB.Reorg(batchNum); err != nil {
return tracerr.Wrap(err)
}
c.lastNonFailedBatchNum = batchNum
return nil
}
func (c *Coordinator) handleMsg(ctx context.Context, msg interface{}) error {
switch msg := msg.(type) {
case MsgSyncBlock:
if err := c.handleMsgSyncBlock(ctx, &msg); err != nil {
return tracerr.Wrap(fmt.Errorf("Coordinator.handleMsgSyncBlock error: %w", err))
}
case MsgSyncReorg:
if err := c.handleReorg(ctx, &msg); err != nil {
return tracerr.Wrap(fmt.Errorf("Coordinator.handleReorg error: %w", err))
}
case MsgStopPipeline:
log.Infow("Coordinator received MsgStopPipeline", "reason", msg.Reason)
if err := c.handleStopPipeline(ctx, msg.Reason, msg.FailedBatchNum); err != nil {
return tracerr.Wrap(fmt.Errorf("Coordinator.handleStopPipeline: %w", err))
}
default:
log.Fatalw("Coordinator Unexpected Coordinator msg of type %T: %+v", msg, msg)
}
return nil
}
// Start the coordinator
func (c *Coordinator) Start() {
if c.started {
log.Fatal("Coordinator already started")
}
c.started = true
c.wg.Add(1)
go func() {
c.txManager.Run(c.ctx)
c.wg.Done()
}()
c.wg.Add(1)
go func() {
timer := time.NewTimer(longWaitDuration)
for {
select {
case <-c.ctx.Done():
log.Info("Coordinator done")
c.wg.Done()
return
case msg := <-c.msgCh:
if err := c.handleMsg(c.ctx, msg); c.ctx.Err() != nil {
continue
} else if err != nil {
log.Errorw("Coordinator.handleMsg", "err", err)
if !timer.Stop() {
<-timer.C
}
timer.Reset(c.cfg.SyncRetryInterval)
continue
}
case <-timer.C:
timer.Reset(longWaitDuration)
if !c.stats.Synced() {
continue
}
if err := c.syncStats(c.ctx, &c.stats); c.ctx.Err() != nil {
continue
} else if err != nil {
log.Errorw("Coordinator.syncStats", "err", err)
if !timer.Stop() {
<-timer.C
}
timer.Reset(c.cfg.SyncRetryInterval)
continue
}
}
}
}()
c.wg.Add(1)
go func() {
for {
select {
case <-c.ctx.Done():
log.Info("Coordinator L2DB.PurgeByExternalDelete loop done")
c.wg.Done()
return
case <-time.After(c.cfg.PurgeByExtDelInterval):
c.mutexL2DBUpdateDelete.Lock()
if err := c.l2DB.PurgeByExternalDelete(); err != nil {
log.Errorw("L2DB.PurgeByExternalDelete", "err", err)
}
c.mutexL2DBUpdateDelete.Unlock()
}
}
}()
}
const stopCtxTimeout = 200 * time.Millisecond
// Stop the coordinator
func (c *Coordinator) Stop() {
if !c.started {
log.Fatal("Coordinator already stopped")
}
c.started = false
log.Infow("Stopping Coordinator...")
c.cancel()
c.wg.Wait()
if c.pipeline != nil {
ctx, cancel := context.WithTimeout(context.Background(), stopCtxTimeout)
defer cancel()
c.pipeline.Stop(ctx)
c.pipeline = nil
}
}