mirror of
https://github.com/arnaucube/hermez-node.git
synced 2026-02-07 11:26:44 +01:00
WIP5
This commit is contained in:
@@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"math/big"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -47,7 +46,7 @@ type Config struct {
|
||||
// starting the pipeline when we reach a slot in which we can forge.
|
||||
StartSlotBlocksDelay int64
|
||||
// ScheduleBatchBlocksAheadCheck is the number of blocks ahead in which
|
||||
// the forger address is checked to be allowed to forge (appart from
|
||||
// the forger address is checked to be allowed to forge (apart from
|
||||
// checking the next block), used to decide when to stop scheduling new
|
||||
// batches (by stopping the pipeline).
|
||||
// For example, if we are at block 10 and ScheduleBatchBlocksAheadCheck
|
||||
@@ -57,7 +56,7 @@ type Config struct {
|
||||
// scheduling a batch and having it mined.
|
||||
ScheduleBatchBlocksAheadCheck int64
|
||||
// SendBatchBlocksMarginCheck is the number of margin blocks ahead in
|
||||
// which the coordinator is also checked to be allowed to forge, appart
|
||||
// which the coordinator is also checked to be allowed to forge, apart
|
||||
// from the next block; used to decide when to stop sending batches to
|
||||
// the smart contract.
|
||||
// For example, if we are at block 10 and SendBatchBlocksMarginCheck is
|
||||
@@ -139,7 +138,8 @@ type Coordinator struct {
|
||||
wg sync.WaitGroup
|
||||
cancel context.CancelFunc
|
||||
|
||||
pipeline *Pipeline
|
||||
pipeline *Pipeline
|
||||
lastNonFailedBatchNum common.BatchNum
|
||||
|
||||
purger *Purger
|
||||
txManager *TxManager
|
||||
@@ -233,7 +233,7 @@ func (c *Coordinator) BatchBuilder() *batchbuilder.BatchBuilder {
|
||||
func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) {
|
||||
c.pipelineNum++
|
||||
return NewPipeline(ctx, c.cfg, c.pipelineNum, c.historyDB, c.l2DB, c.txSelector,
|
||||
c.batchBuilder, c.purger, c.txManager, c.provers, &c.consts)
|
||||
c.batchBuilder, c.purger, c, c.txManager, c.provers, &c.consts)
|
||||
}
|
||||
|
||||
// MsgSyncBlock indicates an update to the Synchronizer stats
|
||||
@@ -254,6 +254,9 @@ type MsgSyncReorg struct {
|
||||
// MsgStopPipeline indicates a signal to reset the pipeline
|
||||
type MsgStopPipeline struct {
|
||||
Reason string
|
||||
// FailedBatchNum indicates the first batchNum that faile in the
|
||||
// pipeline. If FailedBatchNum is 0, it should be ignored.
|
||||
FailedBatchNum common.BatchNum
|
||||
}
|
||||
|
||||
// SendMsg is a thread safe method to pass a message to the Coordinator
|
||||
@@ -342,6 +345,9 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
|
||||
log.Infow("Coordinator: forging state begin", "block",
|
||||
stats.Eth.LastBlock.Num+1, "batch", stats.Sync.LastBatch.BatchNum)
|
||||
batchNum := stats.Sync.LastBatch.BatchNum
|
||||
if c.lastNonFailedBatchNum > batchNum {
|
||||
batchNum = c.lastNonFailedBatchNum
|
||||
}
|
||||
var err error
|
||||
if c.pipeline, err = c.newPipeline(ctx); err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
@@ -367,19 +373,17 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
|
||||
// return err
|
||||
// }
|
||||
// }
|
||||
if c.purger.CanInvalidate(stats.Sync.LastBlock.Num, int64(stats.Sync.LastBatch.BatchNum)) {
|
||||
if err := c.txSelector.Reset(stats.Sync.LastBatch.BatchNum); err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
}
|
||||
_, err := c.purger.InvalidateMaybe(c.l2DB, c.txSelector.LocalAccountsDB(),
|
||||
stats.Sync.LastBlock.Num, int64(stats.Sync.LastBatch.BatchNum))
|
||||
if err != nil {
|
||||
// if c.purger.CanInvalidate(stats.Sync.LastBlock.Num, int64(stats.Sync.LastBatch.BatchNum)) {
|
||||
// if err := c.txSelector.Reset(stats.Sync.LastBatch.BatchNum); err != nil {
|
||||
// return tracerr.Wrap(err)
|
||||
// }
|
||||
// }
|
||||
if _, err := c.purger.InvalidateMaybe(c.l2DB, c.txSelector.LocalAccountsDB(),
|
||||
stats.Sync.LastBlock.Num, int64(stats.Sync.LastBatch.BatchNum)); err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
_, err = c.purger.PurgeMaybe(c.l2DB, stats.Sync.LastBlock.Num,
|
||||
int64(stats.Sync.LastBatch.BatchNum))
|
||||
if err != nil {
|
||||
if _, err := c.purger.PurgeMaybe(c.l2DB, stats.Sync.LastBlock.Num,
|
||||
int64(stats.Sync.LastBatch.BatchNum)); err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
}
|
||||
@@ -419,24 +423,29 @@ func (c *Coordinator) handleReorg(ctx context.Context, msg *MsgSyncReorg) error
|
||||
"sync.LastBatch.StateRoot", c.stats.Sync.LastBatch.StateRoot,
|
||||
"pipelineFromBatch.StateRoot", c.pipelineFromBatch.StateRoot)
|
||||
c.txManager.DiscardPipeline(ctx, c.pipelineNum)
|
||||
if err := c.handleStopPipeline(ctx, "reorg"); err != nil {
|
||||
if err := c.handleStopPipeline(ctx, "reorg", 0); err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string) error {
|
||||
// handleStopPipeline handles stopping the pipeline. If failedBatchNum is 0,
|
||||
// the next pipeline will start from the last state of the synchronizer,
|
||||
// otherwise, it will state from failedBatchNum-1.
|
||||
func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string, failedBatchNum common.BatchNum) error {
|
||||
batchNum := c.stats.Sync.LastBatch.BatchNum
|
||||
if failedBatchNum != 0 {
|
||||
batchNum = failedBatchNum - 1
|
||||
}
|
||||
if c.pipeline != nil {
|
||||
c.pipeline.Stop(c.ctx)
|
||||
c.pipeline = nil
|
||||
}
|
||||
if err := c.l2DB.Reorg(c.stats.Sync.LastBatch.BatchNum); err != nil {
|
||||
if err := c.l2DB.Reorg(batchNum); err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
if strings.Contains(reason, common.AuctionErrMsgCannotForge) { //nolint:staticcheck
|
||||
// TODO: Check that we are in a slot in which we can't forge
|
||||
}
|
||||
c.lastNonFailedBatchNum = batchNum
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -452,7 +461,7 @@ func (c *Coordinator) handleMsg(ctx context.Context, msg interface{}) error {
|
||||
}
|
||||
case MsgStopPipeline:
|
||||
log.Infow("Coordinator received MsgStopPipeline", "reason", msg.Reason)
|
||||
if err := c.handleStopPipeline(ctx, msg.Reason); err != nil {
|
||||
if err := c.handleStopPipeline(ctx, msg.Reason, msg.FailedBatchNum); err != nil {
|
||||
return tracerr.Wrap(fmt.Errorf("Coordinator.handleStopPipeline: %w", err))
|
||||
}
|
||||
default:
|
||||
|
||||
@@ -2,6 +2,7 @@ package coordinator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"sync"
|
||||
@@ -41,10 +42,13 @@ type Pipeline struct {
|
||||
// batchNum common.BatchNum
|
||||
// lastScheduledL1BatchBlockNum int64
|
||||
// lastForgeL1TxsNum int64
|
||||
started bool
|
||||
started bool
|
||||
rw sync.RWMutex
|
||||
errAtBatchNum common.BatchNum
|
||||
|
||||
proversPool *ProversPool
|
||||
provers []prover.Client
|
||||
coord *Coordinator
|
||||
txManager *TxManager
|
||||
historyDB *historydb.HistoryDB
|
||||
l2DB *l2db.L2DB
|
||||
@@ -61,6 +65,18 @@ type Pipeline struct {
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (p *Pipeline) setErrAtBatchNum(batchNum common.BatchNum) {
|
||||
p.rw.Lock()
|
||||
defer p.rw.Unlock()
|
||||
p.errAtBatchNum = batchNum
|
||||
}
|
||||
|
||||
func (p *Pipeline) getErrAtBatchNum() common.BatchNum {
|
||||
p.rw.RLock()
|
||||
defer p.rw.RUnlock()
|
||||
return p.errAtBatchNum
|
||||
}
|
||||
|
||||
// NewPipeline creates a new Pipeline
|
||||
func NewPipeline(ctx context.Context,
|
||||
cfg Config,
|
||||
@@ -70,6 +86,7 @@ func NewPipeline(ctx context.Context,
|
||||
txSelector *txselector.TxSelector,
|
||||
batchBuilder *batchbuilder.BatchBuilder,
|
||||
purger *Purger,
|
||||
coord *Coordinator,
|
||||
txManager *TxManager,
|
||||
provers []prover.Client,
|
||||
scConsts *synchronizer.SCConsts,
|
||||
@@ -97,6 +114,7 @@ func NewPipeline(ctx context.Context,
|
||||
provers: provers,
|
||||
proversPool: proversPool,
|
||||
purger: purger,
|
||||
coord: coord,
|
||||
txManager: txManager,
|
||||
consts: *scConsts,
|
||||
statsVarsCh: make(chan statsVars, queueLen),
|
||||
@@ -122,14 +140,54 @@ func (p *Pipeline) reset(batchNum common.BatchNum,
|
||||
p.stats = *stats
|
||||
p.vars = *vars
|
||||
|
||||
err := p.txSelector.Reset(p.state.batchNum)
|
||||
// Reset the StateDB in TxSelector and BatchBuilder from the
|
||||
// synchronizer only if the checkpoint we reset from either:
|
||||
// a. Doesn't exist in the TxSelector/BatchBuilder
|
||||
// b. The batch has already been synced by the synchronizer and has a
|
||||
// different MTRoot than the BatchBuilder
|
||||
// Otherwise, reset from the local checkpoint.
|
||||
|
||||
// First attempt to reset from local checkpoint if such checkpoint exists
|
||||
existsTxSelector, err := p.txSelector.LocalAccountsDB().CheckpointExists(p.state.batchNum)
|
||||
if err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
err = p.batchBuilder.Reset(p.state.batchNum, true)
|
||||
fromSynchronizerTxSelector := !existsTxSelector
|
||||
if err := p.txSelector.Reset(p.state.batchNum, fromSynchronizerTxSelector); err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
existsBatchBuilder, err := p.batchBuilder.LocalStateDB().CheckpointExists(p.state.batchNum)
|
||||
if err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
fromSynchronizerBatchBuilder := !existsBatchBuilder
|
||||
if err := p.batchBuilder.Reset(p.state.batchNum, fromSynchronizerBatchBuilder); err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
|
||||
// After reset, check that if the batch exists in the historyDB, the
|
||||
// stateRoot matches with the local one, if not, force a reset from
|
||||
// synchronizer
|
||||
batch, err := p.historyDB.GetBatch(p.state.batchNum)
|
||||
if tracerr.Unwrap(err) == sql.ErrNoRows {
|
||||
// nothing to do
|
||||
} else if err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
} else {
|
||||
localStateRoot := p.batchBuilder.LocalStateDB().MT.Root().BigInt()
|
||||
if batch.StateRoot.Cmp(localStateRoot) != 0 {
|
||||
log.Debugw("localStateRoot (%v) != historyDB stateRoot (%v). "+
|
||||
"Forcing reset from Synchronizer", localStateRoot, batch.StateRoot)
|
||||
// StateRoot from synchronizer doesn't match StateRoot
|
||||
// from batchBuilder, force a reset from synchronizer
|
||||
if err := p.txSelector.Reset(p.state.batchNum, true); err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
if err := p.batchBuilder.Reset(p.state.batchNum, true); err != nil {
|
||||
return tracerr.Wrap(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -203,14 +261,31 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
|
||||
p.stats = statsVars.Stats
|
||||
p.syncSCVars(statsVars.Vars)
|
||||
case <-time.After(waitDuration):
|
||||
// Once errAtBatchNum != 0, we stop forging
|
||||
// batches because there's been an error and we
|
||||
// wait for the pipeline to be stopped.
|
||||
if p.getErrAtBatchNum() != 0 {
|
||||
waitDuration = p.cfg.ForgeRetryInterval
|
||||
continue
|
||||
}
|
||||
batchNum = p.state.batchNum + 1
|
||||
batchInfo, err := p.handleForgeBatch(p.ctx, batchNum)
|
||||
if p.ctx.Err() != nil {
|
||||
continue
|
||||
} else if tracerr.Unwrap(err) == errLastL1BatchNotSynced {
|
||||
waitDuration = p.cfg.ForgeRetryInterval
|
||||
continue
|
||||
} else if err != nil {
|
||||
waitDuration = p.cfg.SyncRetryInterval
|
||||
p.setErrAtBatchNum(batchNum)
|
||||
waitDuration = p.cfg.ForgeRetryInterval
|
||||
p.coord.SendMsg(p.ctx, MsgStopPipeline{
|
||||
Reason: fmt.Sprintf(
|
||||
"Pipeline.handleForgBatch: %v", err),
|
||||
FailedBatchNum: batchNum,
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
p.state.batchNum = batchNum
|
||||
select {
|
||||
case batchChSentServerProof <- batchInfo:
|
||||
@@ -229,16 +304,28 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
|
||||
p.wg.Done()
|
||||
return
|
||||
case batchInfo := <-batchChSentServerProof:
|
||||
// Once errAtBatchNum != 0, we stop forging
|
||||
// batches because there's been an error and we
|
||||
// wait for the pipeline to be stopped.
|
||||
if p.getErrAtBatchNum() != 0 {
|
||||
continue
|
||||
}
|
||||
err := p.waitServerProof(p.ctx, batchInfo)
|
||||
// We are done with this serverProof, add it back to the pool
|
||||
p.proversPool.Add(p.ctx, batchInfo.ServerProof)
|
||||
batchInfo.ServerProof = nil
|
||||
if p.ctx.Err() != nil {
|
||||
continue
|
||||
} else if err != nil {
|
||||
log.Errorw("waitServerProof", "err", err)
|
||||
p.setErrAtBatchNum(batchInfo.BatchNum)
|
||||
p.coord.SendMsg(p.ctx, MsgStopPipeline{
|
||||
Reason: fmt.Sprintf(
|
||||
"Pipeline.waitServerProof: %v", err),
|
||||
FailedBatchNum: batchInfo.BatchNum,
|
||||
})
|
||||
continue
|
||||
}
|
||||
// We are done with this serverProof, add it back to the pool
|
||||
p.proversPool.Add(p.ctx, batchInfo.ServerProof)
|
||||
p.txManager.AddBatch(p.ctx, batchInfo)
|
||||
}
|
||||
}
|
||||
@@ -304,17 +391,14 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
|
||||
var auths [][]byte
|
||||
var coordIdxs []common.Idx
|
||||
|
||||
// TODO: If there are no txs and we are behind the timeout, skip
|
||||
// forging a batch and return a particular error that can be handleded
|
||||
// in the loop where handleForgeBatch is called to retry after an
|
||||
// interval
|
||||
|
||||
// 1. Decide if we forge L2Tx or L1+L2Tx
|
||||
if p.shouldL1L2Batch(batchInfo) {
|
||||
batchInfo.L1Batch = true
|
||||
defer func() {
|
||||
// If there's no error, update the parameters related
|
||||
// to the last L1Batch forged
|
||||
if err == nil {
|
||||
p.state.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
|
||||
p.state.lastForgeL1TxsNum++
|
||||
}
|
||||
}()
|
||||
if p.state.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum {
|
||||
return nil, tracerr.Wrap(errLastL1BatchNotSynced)
|
||||
}
|
||||
@@ -328,6 +412,9 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
|
||||
if err != nil {
|
||||
return nil, tracerr.Wrap(err)
|
||||
}
|
||||
|
||||
p.state.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
|
||||
p.state.lastForgeL1TxsNum++
|
||||
} else {
|
||||
// 2b: only L2 txs
|
||||
coordIdxs, auths, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
|
||||
|
||||
@@ -183,7 +183,8 @@ func (t *TxManager) shouldSendRollupForgeBatch(batchInfo *BatchInfo) error {
|
||||
func addPerc(v *big.Int, p int64) *big.Int {
|
||||
r := new(big.Int).Set(v)
|
||||
r.Mul(r, big.NewInt(p))
|
||||
r.Div(r, big.NewInt(100))
|
||||
// nolint reason: to calculate percetnages we divide by 100
|
||||
r.Div(r, big.NewInt(100)) //nolit:gomnd
|
||||
return r.Add(v, r)
|
||||
}
|
||||
|
||||
@@ -352,12 +353,14 @@ func (t *TxManager) handleReceipt(ctx context.Context, batchInfo *BatchInfo) (*i
|
||||
// TODO:
|
||||
// - After sending a message: CancelPipeline, stop all consecutive pending Batches (transactions)
|
||||
|
||||
// Queue of BatchInfos
|
||||
type Queue struct {
|
||||
list []*BatchInfo
|
||||
// nonceByBatchNum map[common.BatchNum]uint64
|
||||
next int
|
||||
}
|
||||
|
||||
// NewQueue returns a new queue
|
||||
func NewQueue() Queue {
|
||||
return Queue{
|
||||
list: make([]*BatchInfo, 0),
|
||||
@@ -366,10 +369,12 @@ func NewQueue() Queue {
|
||||
}
|
||||
}
|
||||
|
||||
// Len is the length of the queue
|
||||
func (q *Queue) Len() int {
|
||||
return len(q.list)
|
||||
}
|
||||
|
||||
// At returns the BatchInfo at position (or nil if position is out of bounds)
|
||||
func (q *Queue) At(position int) *BatchInfo {
|
||||
if position >= len(q.list) {
|
||||
return nil
|
||||
@@ -377,6 +382,7 @@ func (q *Queue) At(position int) *BatchInfo {
|
||||
return q.list[position]
|
||||
}
|
||||
|
||||
// Next returns the next BatchInfo (or nil if queue is empty)
|
||||
func (q *Queue) Next() (int, *BatchInfo) {
|
||||
if len(q.list) == 0 {
|
||||
return 0, nil
|
||||
@@ -385,6 +391,7 @@ func (q *Queue) Next() (int, *BatchInfo) {
|
||||
return q.next, q.list[q.next]
|
||||
}
|
||||
|
||||
// Remove removes the BatchInfo at position
|
||||
func (q *Queue) Remove(position int) {
|
||||
// batchInfo := q.list[position]
|
||||
// delete(q.nonceByBatchNum, batchInfo.BatchNum)
|
||||
@@ -396,6 +403,7 @@ func (q *Queue) Remove(position int) {
|
||||
}
|
||||
}
|
||||
|
||||
// Push adds a new BatchInfo
|
||||
func (q *Queue) Push(batchInfo *BatchInfo) {
|
||||
q.list = append(q.list, batchInfo)
|
||||
// q.nonceByBatchNum[batchInfo.BatchNum] = batchInfo.EthTx.Nonce()
|
||||
@@ -517,8 +525,8 @@ func (t *TxManager) Run(ctx context.Context) {
|
||||
Reason: fmt.Sprintf("forgeBatch resend: %v", err)})
|
||||
continue
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if confirm != nil && *confirm >= t.cfg.ConfirmBlocks {
|
||||
log.Debugw("TxManager: forgeBatch tx confirmed",
|
||||
"tx", batchInfo.EthTx.Hash(), "batch", batchInfo.BatchNum)
|
||||
|
||||
Reference in New Issue
Block a user