Browse Source

Merge pull request #474 from hermeznetwork/fix/coordinatorl1batch

Fix forging L1Batch too early
arnau 4 years ago
committed by GitHub
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 958 additions and 864 deletions
  1. +1
  2. +3
  3. +1
  4. +19
  5. +0
  6. +428
  7. +297
  8. +207
  9. +1
  10. +1

+ 1
- 0

@ -46,6 +46,7 @@ ForgerAddress = "0xb4124ceb3451635dacedd11767f004d8a28c6ee7" # Boot Coordinator
ConfirmBlocks = 10
L1BatchTimeoutPerc = 0.4
ProofServerPollInterval = "1s"
ForgeRetryInterval = "500ms"
SyncRetryInterval = "1s"

+ 3
- 0

@ -54,6 +54,9 @@ type Coordinator struct {
// ProofServerPollInterval is the waiting interval between polling the
// ProofServer while waiting for a particular status
ProofServerPollInterval Duration `validate:"required"`
// ForgeRetryInterval is the waiting interval between calls forge a
// batch after an error
ForgeRetryInterval Duration `validate:"required"`
// SyncRetryInterval is the waiting interval between calls to the main
// handler of a synced block after an error
SyncRetryInterval Duration `validate:"required"`

+ 1
- 1

@ -98,7 +98,7 @@ func (b *BatchInfo) DebugStore(storePath string) error {
// nolint reason: hardcoded 1_000_000 is the number of nanoseconds in a
// millisecond
filename := fmt.Sprintf("%08d-%v.%v.json", b.BatchNum,
filename := fmt.Sprintf("%08d-%v.%03d.json", b.BatchNum,
b.Debug.StartTimestamp.Unix(), b.Debug.StartTimestamp.Nanosecond()/1_000_000)
// nolint reason: 0640 allows rw to owner and r to group

+ 19
- 586

@ -3,14 +3,12 @@ package coordinator
import (
ethCommon ""
@ -24,7 +22,15 @@ import (
const queueLen = 16
var (
errLastL1BatchNotSynced = fmt.Errorf("last L1Batch not synced yet")
const (
queueLen = 16
longWaitDuration = 999 * time.Hour
zeroDuration = 0 * time.Second
// Config contains the Coordinator configuration
type Config struct {
@ -39,6 +45,9 @@ type Config struct {
// EthClientAttempts is the number of attempts to do an eth client RPC
// call before giving up
EthClientAttempts int
// ForgeRetryInterval is the waiting interval between calls forge a
// batch after an error
ForgeRetryInterval time.Duration
// SyncRetryInterval is the waiting interval between calls to the main
// handler of a synced block after an error
SyncRetryInterval time.Duration
@ -225,8 +234,7 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
if c.pipeline, err = c.newPipeline(ctx); err != nil {
return tracerr.Wrap(err)
if err := c.pipeline.Start(batchNum, stats.Sync.LastForgeL1TxsNum,
stats, &c.vars); err != nil {
if err := c.pipeline.Start(batchNum, stats, &c.vars); err != nil {
c.pipeline = nil
return tracerr.Wrap(err)
@ -348,7 +356,7 @@ func (c *Coordinator) Start() {
go func() {
waitDuration := time.Duration(longWaitDuration)
waitDuration := longWaitDuration
for {
select {
case <-c.ctx.Done():
@ -360,23 +368,23 @@ func (c *Coordinator) Start() {
} else if err != nil {
log.Errorw("Coordinator.handleMsg", "err", err)
waitDuration = time.Duration(c.cfg.SyncRetryInterval)
waitDuration = c.cfg.SyncRetryInterval
waitDuration = time.Duration(longWaitDuration)
waitDuration = longWaitDuration
case <-time.After(waitDuration):
if c.stats == nil {
waitDuration = time.Duration(longWaitDuration)
waitDuration = longWaitDuration
if err := c.syncStats(c.ctx, c.stats); c.ctx.Err() != nil {
} else if err != nil {
log.Errorw("Coordinator.syncStats", "err", err)
waitDuration = time.Duration(c.cfg.SyncRetryInterval)
waitDuration = c.cfg.SyncRetryInterval
waitDuration = time.Duration(longWaitDuration)
waitDuration = longWaitDuration
@ -400,578 +408,3 @@ func (c *Coordinator) Stop() {
c.pipeline = nil
// TxManager handles everything related to ethereum transactions: It makes the
// call to forge, waits for transaction confirmation, and keeps checking them
// until a number of confirmed blocks have passed.
type TxManager struct {
cfg Config
ethClient eth.ClientInterface
l2DB *l2db.L2DB // Used only to mark forged txs as forged in the L2DB
coord *Coordinator // Used only to send messages to stop the pipeline
batchCh chan *BatchInfo
lastBlockCh chan int64
queue []*BatchInfo
lastBlock int64
// lastConfirmedBatch stores the last BatchNum that who's forge call was confirmed
lastConfirmedBatch common.BatchNum
// NewTxManager creates a new TxManager
func NewTxManager(cfg *Config, ethClient eth.ClientInterface, l2DB *l2db.L2DB,
coord *Coordinator) *TxManager {
return &TxManager{
cfg: *cfg,
ethClient: ethClient,
l2DB: l2DB,
coord: coord,
batchCh: make(chan *BatchInfo, queueLen),
lastBlockCh: make(chan int64, queueLen),
lastBlock: -1,
// AddBatch is a thread safe method to pass a new batch TxManager to be sent to
// the smart contract via the forge call
func (t *TxManager) AddBatch(batchInfo *BatchInfo) {
t.batchCh <- batchInfo
// SetLastBlock is a thread safe method to pass the lastBlock to the TxManager
func (t *TxManager) SetLastBlock(lastBlock int64) {
t.lastBlockCh <- lastBlock
func (t *TxManager) rollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) error {
batchInfo.Debug.Status = StatusSent
batchInfo.Debug.SendBlockNum = t.lastBlock + 1
batchInfo.Debug.SendTimestamp = time.Now()
batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub(
var ethTx *types.Transaction
var err error
for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs)
if err != nil {
if strings.Contains(err.Error(), common.AuctionErrMsgCannotForge) {
log.Debugw("TxManager ethClient.RollupForgeBatch", "err", err,
"block", t.lastBlock+1)
return tracerr.Wrap(err)
log.Errorw("TxManager ethClient.RollupForgeBatch",
"attempt", attempt, "err", err, "block", t.lastBlock+1,
"batchNum", batchInfo.BatchNum)
} else {
select {
case <-ctx.Done():
return tracerr.Wrap(common.ErrDone)
case <-time.After(t.cfg.EthClientAttemptsDelay):
if err != nil {
return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err))
batchInfo.EthTx = ethTx
log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex())
if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil {
return tracerr.Wrap(err)
return nil
func (t *TxManager) ethTransactionReceipt(ctx context.Context, batchInfo *BatchInfo) error {
txHash := batchInfo.EthTx.Hash()
var receipt *types.Receipt
var err error
for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
receipt, err = t.ethClient.EthTransactionReceipt(ctx, txHash)
if ctx.Err() != nil {
if err != nil {
log.Errorw("TxManager ethClient.EthTransactionReceipt",
"attempt", attempt, "err", err)
} else {
select {
case <-ctx.Done():
return tracerr.Wrap(common.ErrDone)
case <-time.After(t.cfg.EthClientAttemptsDelay):
if err != nil {
return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.EthTransactionReceipt: %w", err))
batchInfo.Receipt = receipt
return nil
func (t *TxManager) handleReceipt(batchInfo *BatchInfo) (*int64, error) {
receipt := batchInfo.Receipt
if receipt != nil {
if receipt.Status == types.ReceiptStatusFailed {
batchInfo.Debug.Status = StatusFailed
log.Errorw("TxManager receipt status is failed", "receipt", receipt)
return nil, tracerr.Wrap(fmt.Errorf("ethereum transaction receipt statis is failed"))
} else if receipt.Status == types.ReceiptStatusSuccessful {
batchInfo.Debug.Status = StatusMined
batchInfo.Debug.MineBlockNum = receipt.BlockNumber.Int64()
batchInfo.Debug.StartToMineBlocksDelay = batchInfo.Debug.MineBlockNum -
if batchInfo.BatchNum > t.lastConfirmedBatch {
t.lastConfirmedBatch = batchInfo.BatchNum
confirm := t.lastBlock - receipt.BlockNumber.Int64()
return &confirm, nil
return nil, nil
const longWaitDuration = 999 * time.Hour
// Run the TxManager
func (t *TxManager) Run(ctx context.Context) {
next := 0
waitDuration := time.Duration(longWaitDuration)
for {
select {
case <-ctx.Done():
log.Info("TxManager done")
case lastBlock := <-t.lastBlockCh:
t.lastBlock = lastBlock
case batchInfo := <-t.batchCh:
if err := t.rollupForgeBatch(ctx, batchInfo); ctx.Err() != nil {
} else if err != nil {
t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch call: %v", err)})
log.Debugf("ethClient ForgeCall sent, batchNum: %d", batchInfo.BatchNum)
t.queue = append(t.queue, batchInfo)
waitDuration = t.cfg.TxManagerCheckInterval
case <-time.After(waitDuration):
if len(t.queue) == 0 {
current := next
next = (current + 1) % len(t.queue)
batchInfo := t.queue[current]
if err := t.ethTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
} else if err != nil { //nolint:staticcheck
// We can't get the receipt for the
// transaction, so we can't confirm if it was
// mined
t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch receipt: %v", err)})
confirm, err := t.handleReceipt(batchInfo)
if err != nil { //nolint:staticcheck
// Transaction was rejected
t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch reject: %v", err)})
if confirm != nil && *confirm >= t.cfg.ConfirmBlocks {
log.Debugw("TxManager tx for RollupForgeBatch confirmed",
"batch", batchInfo.BatchNum)
t.queue = append(t.queue[:current], t.queue[current+1:]...)
if len(t.queue) == 0 {
waitDuration = longWaitDuration
next = 0
} else {
next = current % len(t.queue)
type statsVars struct {
Stats synchronizer.Stats
Vars synchronizer.SCVariablesPtr
// Pipeline manages the forging of batches with parallel server proofs
type Pipeline struct {
cfg Config
consts synchronizer.SCConsts
// state
batchNum common.BatchNum
lastScheduledL1BatchBlockNum int64
lastForgeL1TxsNum int64
started bool
proversPool *ProversPool
provers []prover.Client
txManager *TxManager
historyDB *historydb.HistoryDB
l2DB *l2db.L2DB
txSelector *txselector.TxSelector
batchBuilder *batchbuilder.BatchBuilder
purger *Purger
stats synchronizer.Stats
vars synchronizer.SCVariables
statsVarsCh chan statsVars
ctx context.Context
wg sync.WaitGroup
cancel context.CancelFunc
// NewPipeline creates a new Pipeline
func NewPipeline(ctx context.Context,
cfg Config,
historyDB *historydb.HistoryDB,
l2DB *l2db.L2DB,
txSelector *txselector.TxSelector,
batchBuilder *batchbuilder.BatchBuilder,
purger *Purger,
txManager *TxManager,
provers []prover.Client,
scConsts *synchronizer.SCConsts,
) (*Pipeline, error) {
proversPool := NewProversPool(len(provers))
proversPoolSize := 0
for _, prover := range provers {
if err := prover.WaitReady(ctx); err != nil {
log.Errorw("prover.WaitReady", "err", err)
} else {
if proversPoolSize == 0 {
return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool"))
return &Pipeline{
cfg: cfg,
historyDB: historyDB,
l2DB: l2DB,
txSelector: txSelector,
batchBuilder: batchBuilder,
provers: provers,
proversPool: proversPool,
purger: purger,
txManager: txManager,
consts: *scConsts,
statsVarsCh: make(chan statsVars, queueLen),
}, nil
// SetSyncStatsVars is a thread safe method to sets the synchronizer Stats
func (p *Pipeline) SetSyncStatsVars(stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) {
p.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars}
// reset pipeline state
func (p *Pipeline) reset(batchNum common.BatchNum, lastForgeL1TxsNum int64,
stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
p.batchNum = batchNum
p.lastForgeL1TxsNum = lastForgeL1TxsNum
p.stats = *stats
p.vars = *vars
p.lastScheduledL1BatchBlockNum = 0
err := p.txSelector.Reset(p.batchNum)
if err != nil {
return tracerr.Wrap(err)
err = p.batchBuilder.Reset(p.batchNum, true)
if err != nil {
return tracerr.Wrap(err)
return nil
func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) {
if vars.Rollup != nil {
p.vars.Rollup = *vars.Rollup
if vars.Auction != nil {
p.vars.Auction = *vars.Auction
if vars.WDelayer != nil {
p.vars.WDelayer = *vars.WDelayer
// Start the forging pipeline
func (p *Pipeline) Start(batchNum common.BatchNum, lastForgeL1TxsNum int64,
stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
if p.started {
log.Fatal("Pipeline already started")
p.started = true
if err := p.reset(batchNum, lastForgeL1TxsNum, stats, vars); err != nil {
return tracerr.Wrap(err)
p.ctx, p.cancel = context.WithCancel(context.Background())
queueSize := 1
batchChSentServerProof := make(chan *BatchInfo, queueSize)
go func() {
for {
select {
case <-p.ctx.Done():
log.Info("Pipeline forgeBatch loop done")
case statsVars := <-p.statsVarsCh:
p.stats = statsVars.Stats
batchNum = p.batchNum + 1
batchInfo, err := p.forgeBatch(batchNum)
if p.ctx.Err() != nil {
} else if err != nil {
log.Errorw("forgeBatch", "err", err)
// 6. Wait for an available server proof (blocking call)
serverProof, err := p.proversPool.Get(p.ctx)
if p.ctx.Err() != nil {
} else if err != nil {
log.Errorw("proversPool.Get", "err", err)
batchInfo.ServerProof = serverProof
if err := p.sendServerProof(p.ctx, batchInfo); p.ctx.Err() != nil {
} else if err != nil {
log.Errorw("sendServerProof", "err", err)
batchInfo.ServerProof = nil
p.batchNum = batchNum
batchChSentServerProof <- batchInfo
go func() {
for {
select {
case <-p.ctx.Done():
log.Info("Pipeline waitServerProofSendEth loop done")
case batchInfo := <-batchChSentServerProof:
err := p.waitServerProof(p.ctx, batchInfo)
// We are done with this serverProof, add it back to the pool
batchInfo.ServerProof = nil
if p.ctx.Err() != nil {
if err != nil {
log.Errorw("waitServerProof", "err", err)
return nil
// Stop the forging pipeline
func (p *Pipeline) Stop(ctx context.Context) {
if !p.started {
log.Fatal("Pipeline already stopped")
p.started = false
log.Info("Stopping Pipeline...")
for _, prover := range p.provers {
if err := prover.Cancel(ctx); ctx.Err() != nil {
} else if err != nil {
log.Errorw("prover.Cancel", "err", err)
// sendServerProof sends the circuit inputs to the proof server
func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) error {
// 7. Call the selected idle server proof with BatchBuilder output,
// save server proof info for batchNum
if err := batchInfo.ServerProof.CalculateProof(ctx, batchInfo.ZKInputs); err != nil {
return tracerr.Wrap(err)
return nil
// forgeBatch the next batch.
func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (*BatchInfo, error) {
// remove transactions from the pool that have been there for too long
_, err := p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
p.stats.Sync.LastBlock.Num, int64(batchNum))
if err != nil {
return nil, tracerr.Wrap(err)
_, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum))
if err != nil {
return nil, tracerr.Wrap(err)
batchInfo := BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch
batchInfo.Debug.StartTimestamp = time.Now()
batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1
selectionCfg := &txselector.SelectionConfig{
MaxL1UserTxs: common.RollupConstMaxL1UserTx,
TxProcessorConfig: p.cfg.TxProcessorConfig,
var poolL2Txs []common.PoolL2Tx
// var feesInfo
var l1UserTxsExtra, l1CoordTxs []common.L1Tx
var auths [][]byte
var coordIdxs []common.Idx
// 1. Decide if we forge L2Tx or L1+L2Tx
if p.shouldL1L2Batch(&batchInfo) {
batchInfo.L1Batch = true
p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
// 2a: L1+L2 txs
l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum)
if err != nil {
return nil, tracerr.Wrap(err)
coordIdxs, auths, l1UserTxsExtra, l1CoordTxs, poolL2Txs, err =
p.txSelector.GetL1L2TxSelection(selectionCfg, l1UserTxs)
if err != nil {
return nil, tracerr.Wrap(err)
} else {
// 2b: only L2 txs
coordIdxs, auths, l1CoordTxs, poolL2Txs, err =
if err != nil {
return nil, tracerr.Wrap(err)
l1UserTxsExtra = nil
// 3. Save metadata from TxSelector output for BatchNum
batchInfo.L1UserTxsExtra = l1UserTxsExtra
batchInfo.L1CoordTxs = l1CoordTxs
batchInfo.L1CoordinatorTxsAuths = auths
batchInfo.CoordIdxs = coordIdxs
batchInfo.VerifierIdx = p.cfg.VerifierIdx
if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum); err != nil {
return nil, tracerr.Wrap(err)
// Invalidate transactions that become invalid beause of
// the poolL2Txs selected. Will mark as invalid the txs that have a
// (fromIdx, nonce) which already appears in the selected txs (includes
// all the nonces smaller than the current one)
err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum)
if err != nil {
return nil, tracerr.Wrap(err)
// 4. Call BatchBuilder with TxSelector output
configBatch := &batchbuilder.ConfigBatch{
ForgerAddress: p.cfg.ForgerAddress,
TxProcessorConfig: p.cfg.TxProcessorConfig,
zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxsExtra,
l1CoordTxs, poolL2Txs, nil)
if err != nil {
return nil, tracerr.Wrap(err)
l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a big uggly, find a better way
if err != nil {
return nil, tracerr.Wrap(err)
batchInfo.L2Txs = l2Txs
// 5. Save metadata from BatchBuilder output for BatchNum
batchInfo.ZKInputs = zkInputs
batchInfo.Debug.Status = StatusForged
return &batchInfo, nil
// waitServerProof gets the generated zkProof & sends it to the SmartContract
func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) error {
proof, pubInputs, err := batchInfo.ServerProof.GetProof(ctx) // blocking call, until not resolved don't continue. Returns when the proof server has calculated the proof
if err != nil {
return tracerr.Wrap(err)
batchInfo.Proof = proof
batchInfo.PublicInputs = pubInputs
batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo)
batchInfo.Debug.Status = StatusProof
return nil
func (p *Pipeline) shouldL1L2Batch(batchInfo *BatchInfo) bool {
// Take the lastL1BatchBlockNum as the biggest between the last
// scheduled one, and the synchronized one.
lastL1BatchBlockNum := p.lastScheduledL1BatchBlockNum
if p.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum {
lastL1BatchBlockNum = p.stats.Sync.LastL1BatchBlock
// Set Debug information
batchInfo.Debug.LastScheduledL1BatchBlockNum = p.lastScheduledL1BatchBlockNum
batchInfo.Debug.LastL1BatchBlock = p.stats.Sync.LastL1BatchBlock
batchInfo.Debug.LastL1BatchBlockDelta = p.stats.Eth.LastBlock.Num + 1 - lastL1BatchBlockNum
batchInfo.Debug.L1BatchBlockScheduleDeadline =
int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1) * p.cfg.L1BatchTimeoutPerc)
// Return true if we have passed the l1BatchTimeoutPerc portion of the
// range before the l1batch timeout.
return p.stats.Eth.LastBlock.Num+1-lastL1BatchBlockNum >=
func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs {
proof := batchInfo.Proof
zki := batchInfo.ZKInputs
return &eth.RollupForgeBatchArgs{
NewLastIdx: int64(zki.Metadata.NewLastIdxRaw),
NewStRoot: zki.Metadata.NewStateRootRaw.BigInt(),
NewExitRoot: zki.Metadata.NewExitRootRaw.BigInt(),
L1UserTxs: batchInfo.L1UserTxsExtra,
L1CoordinatorTxs: batchInfo.L1CoordTxs,
L1CoordinatorTxsAuths: batchInfo.L1CoordinatorTxsAuths,
L2TxsData: batchInfo.L2Txs,
FeeIdxCoordinator: batchInfo.CoordIdxs,
// Circuit selector
VerifierIdx: batchInfo.VerifierIdx,
L1Batch: batchInfo.L1Batch,
ProofA: [2]*big.Int{proof.PiA[0], proof.PiA[1]},
ProofB: [2][2]*big.Int{
{proof.PiB[0][0], proof.PiB[0][1]},
{proof.PiB[1][0], proof.PiB[1][1]},
ProofC: [2]*big.Int{proof.PiC[0], proof.PiC[1]},

+ 0
- 277

@ -10,10 +10,7 @@ import (
ethKeystore ""
ethCommon ""
dbUtils ""
@ -25,12 +22,10 @@ import (
@ -434,74 +429,6 @@ func TestCoordHandleMsgSyncBlock(t *testing.T) {
assert.Nil(t, coord.pipeline)
func TestPipelineShouldL1L2Batch(t *testing.T) {
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
var timer timer
ctx := context.Background()
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
modules := newTestModules(t)
var stats synchronizer.Stats
coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules)
pipeline, err := coord.newPipeline(ctx)
require.NoError(t, err)
pipeline.vars = coord.vars
// Check that the parameters are the ones we expect and use in this test
require.Equal(t, 0.5, pipeline.cfg.L1BatchTimeoutPerc)
require.Equal(t, int64(10), ethClientSetup.RollupVariables.ForgeL1L2BatchTimeout)
l1BatchTimeoutPerc := pipeline.cfg.L1BatchTimeoutPerc
l1BatchTimeout := ethClientSetup.RollupVariables.ForgeL1L2BatchTimeout
startBlock := int64(100)
// Empty batchInfo to pass to shouldL1L2Batch() which sets debug information
batchInfo := BatchInfo{}
// No scheduled L1Batch
// Last L1Batch was a long time ago
stats.Eth.LastBlock.Num = startBlock
stats.Sync.LastBlock = stats.Eth.LastBlock
stats.Sync.LastL1BatchBlock = 0
pipeline.stats = stats
assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo))
stats.Sync.LastL1BatchBlock = startBlock
// We are are one block before the timeout range * 0.5
stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) - 1
stats.Sync.LastBlock = stats.Eth.LastBlock
pipeline.stats = stats
assert.Equal(t, false, pipeline.shouldL1L2Batch(&batchInfo))
// We are are at timeout range * 0.5
stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc)
stats.Sync.LastBlock = stats.Eth.LastBlock
pipeline.stats = stats
assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo))
// Scheduled L1Batch
pipeline.lastScheduledL1BatchBlockNum = startBlock
stats.Sync.LastL1BatchBlock = startBlock - 10
// We are are one block before the timeout range * 0.5
stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) - 1
stats.Sync.LastBlock = stats.Eth.LastBlock
pipeline.stats = stats
assert.Equal(t, false, pipeline.shouldL1L2Batch(&batchInfo))
// We are are at timeout range * 0.5
stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc)
stats.Sync.LastBlock = stats.Eth.LastBlock
pipeline.stats = stats
assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo))
// ethAddTokens adds the tokens from the blocks to the blockchain
func ethAddTokens(blocks []common.BlockData, client *test.Client) {
for _, block := range blocks {
@ -517,138 +444,6 @@ func ethAddTokens(blocks []common.BlockData, client *test.Client) {
const testTokensLen = 3
const testUsersLen = 4
func preloadSync(t *testing.T, ethClient *test.Client, sync *synchronizer.Synchronizer,
historyDB *historydb.HistoryDB, stateDB *statedb.StateDB) *til.Context {
// Create a set with `testTokensLen` tokens and for each token
// `testUsersLen` accounts.
var set []til.Instruction
// set = append(set, til.Instruction{Typ: "Blockchain"})
for tokenID := 1; tokenID < testTokensLen; tokenID++ {
set = append(set, til.Instruction{
Typ: til.TypeAddToken,
TokenID: common.TokenID(tokenID),
depositAmount, ok := new(big.Int).SetString("10225000000000000000000000000000000", 10)
require.True(t, ok)
for tokenID := 0; tokenID < testTokensLen; tokenID++ {
for user := 0; user < testUsersLen; user++ {
set = append(set, til.Instruction{
Typ: common.TxTypeCreateAccountDeposit,
TokenID: common.TokenID(tokenID),
DepositAmount: depositAmount,
From: fmt.Sprintf("User%d", user),
set = append(set, til.Instruction{Typ: til.TypeNewBatchL1})
set = append(set, til.Instruction{Typ: til.TypeNewBatchL1})
set = append(set, til.Instruction{Typ: til.TypeNewBlock})
tc := til.NewContext(chainID, common.RollupConstMaxL1UserTx)
blocks, err := tc.GenerateBlocksFromInstructions(set)
require.NoError(t, err)
require.NotNil(t, blocks)
ethAddTokens(blocks, ethClient)
err = ethClient.CtlAddBlocks(blocks)
require.NoError(t, err)
ctx := context.Background()
for {
syncBlock, discards, err := sync.Sync2(ctx, nil)
require.NoError(t, err)
require.Nil(t, discards)
if syncBlock == nil {
dbTokens, err := historyDB.GetAllTokens()
require.Nil(t, err)
require.Equal(t, testTokensLen, len(dbTokens))
dbAccounts, err := historyDB.GetAllAccounts()
require.Nil(t, err)
require.Equal(t, testTokensLen*testUsersLen, len(dbAccounts))
sdbAccounts, err := stateDB.GetAccounts()
require.Nil(t, err)
require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts))
return tc
func TestPipeline1(t *testing.T) {
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
var timer timer
ctx := context.Background()
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
modules := newTestModules(t)
coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules)
sync := newTestSynchronizer(t, ethClient, ethClientSetup, modules)
// preload the synchronier (via the test ethClient) some tokens and
// users with positive balances
tilCtx := preloadSync(t, ethClient, sync, modules.historyDB, modules.stateDB)
syncStats := sync.Stats()
batchNum := common.BatchNum(syncStats.Sync.LastBatch)
syncSCVars := sync.SCVars()
pipeline, err := coord.newPipeline(ctx)
require.NoError(t, err)
// Insert some l2txs in the Pool
setPool := `
Type: PoolL2
PoolTransfer(0) User0-User1: 100 (126)
PoolTransfer(0) User1-User2: 200 (126)
PoolTransfer(0) User2-User3: 300 (126)
l2txs, err := tilCtx.GeneratePoolL2Txs(setPool)
require.NoError(t, err)
for _, tx := range l2txs {
err := modules.l2DB.AddTxTest(&tx) //nolint:gosec
require.NoError(t, err)
err = pipeline.reset(batchNum, syncStats.Sync.LastForgeL1TxsNum, syncStats, &synchronizer.SCVariables{
Rollup: *syncSCVars.Rollup,
Auction: *syncSCVars.Auction,
WDelayer: *syncSCVars.WDelayer,
require.NoError(t, err)
// Sanity check
sdbAccounts, err := pipeline.txSelector.LocalAccountsDB().GetAccounts()
require.Nil(t, err)
require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts))
// Sanity check
sdbAccounts, err = pipeline.batchBuilder.LocalStateDB().GetAccounts()
require.Nil(t, err)
require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts))
// Sanity check
require.Equal(t, modules.stateDB.MT.Root(),
batchInfo, err := pipeline.forgeBatch(batchNum)
require.NoError(t, err)
assert.Equal(t, 3, len(batchInfo.L2Txs))
batchInfo, err = pipeline.forgeBatch(batchNum)
require.NoError(t, err)
assert.Equal(t, 0, len(batchInfo.L2Txs))
func TestCoordinatorStress(t *testing.T) {
if os.Getenv("TEST_COORD_STRESS") == "" {
@ -714,79 +509,7 @@ func TestCoordinatorStress(t *testing.T) {
func TestRollupForgeBatch(t *testing.T) {
if os.Getenv("TEST_ROLLUP_FORGE_BATCH") == "" {
const web3URL = "http://localhost:8545"
const password = "test"
addr := ethCommon.HexToAddress("0xb4124ceb3451635dacedd11767f004d8a28c6ee7")
sk, err := crypto.HexToECDSA(
require.NoError(t, err)
rollupAddr := ethCommon.HexToAddress("0x8EEaea23686c319133a7cC110b840d1591d9AeE0")
pathKeystore, err := ioutil.TempDir("", "tmpKeystore")
require.NoError(t, err)
deleteme = append(deleteme, pathKeystore)
ctx := context.Background()
batchInfo := &BatchInfo{}
proofClient := &prover.MockClient{}
chainID := uint16(0)
ethClient, err := ethclient.Dial(web3URL)
require.NoError(t, err)
ethCfg := eth.EthereumConfig{
CallGasLimit: 300000,
GasPriceDiv: 100,
scryptN := ethKeystore.LightScryptN
scryptP := ethKeystore.LightScryptP
keyStore := ethKeystore.NewKeyStore(pathKeystore,
scryptN, scryptP)
account, err := keyStore.ImportECDSA(sk, password)
require.NoError(t, err)
require.Equal(t, account.Address, addr)
err = keyStore.Unlock(account, password)
require.NoError(t, err)
client, err := eth.NewClient(ethClient, &account, keyStore, &eth.ClientConfig{
Ethereum: ethCfg,
Rollup: eth.RollupConfig{
Address: rollupAddr,
Auction: eth.AuctionConfig{
Address: ethCommon.Address{},
TokenHEZ: eth.TokenConfig{
Address: ethCommon.Address{},
Name: "HEZ",
WDelayer: eth.WDelayerConfig{
Address: ethCommon.Address{},
require.NoError(t, err)
zkInputs := common.NewZKInputs(chainID, 100, 24, 512, 32, big.NewInt(1))
zkInputs.Metadata.NewStateRootRaw = &merkletree.Hash{1}
zkInputs.Metadata.NewExitRootRaw = &merkletree.Hash{2}
batchInfo.ZKInputs = zkInputs
err = proofClient.CalculateProof(ctx, batchInfo.ZKInputs)
require.NoError(t, err)
proof, pubInputs, err := proofClient.GetProof(ctx)
require.NoError(t, err)
batchInfo.Proof = proof
batchInfo.PublicInputs = pubInputs
batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo)
_, err = client.RollupForgeBatch(batchInfo.ForgeBatchArgs)
require.NoError(t, err)
batchInfo.Proof = proof
// TODO: Test Reorg
// TODO: Test Pipeline
// TODO: Test TxMonitor
// TODO: Test forgeBatch
// TODO: Test waitServerProof

+ 428
- 0

@ -0,0 +1,428 @@
package coordinator
import (
type statsVars struct {
Stats synchronizer.Stats
Vars synchronizer.SCVariablesPtr
// Pipeline manages the forging of batches with parallel server proofs
type Pipeline struct {
cfg Config
consts synchronizer.SCConsts
// state
batchNum common.BatchNum
lastScheduledL1BatchBlockNum int64
lastForgeL1TxsNum int64
started bool
proversPool *ProversPool
provers []prover.Client
txManager *TxManager
historyDB *historydb.HistoryDB
l2DB *l2db.L2DB
txSelector *txselector.TxSelector
batchBuilder *batchbuilder.BatchBuilder
purger *Purger
stats synchronizer.Stats
vars synchronizer.SCVariables
statsVarsCh chan statsVars
ctx context.Context
wg sync.WaitGroup
cancel context.CancelFunc
// NewPipeline creates a new Pipeline
func NewPipeline(ctx context.Context,
cfg Config,
historyDB *historydb.HistoryDB,
l2DB *l2db.L2DB,
txSelector *txselector.TxSelector,
batchBuilder *batchbuilder.BatchBuilder,
purger *Purger,
txManager *TxManager,
provers []prover.Client,
scConsts *synchronizer.SCConsts,
) (*Pipeline, error) {
proversPool := NewProversPool(len(provers))
proversPoolSize := 0
for _, prover := range provers {
if err := prover.WaitReady(ctx); err != nil {
log.Errorw("prover.WaitReady", "err", err)
} else {
if proversPoolSize == 0 {
return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool"))
return &Pipeline{
cfg: cfg,
historyDB: historyDB,
l2DB: l2DB,
txSelector: txSelector,
batchBuilder: batchBuilder,
provers: provers,
proversPool: proversPool,
purger: purger,
txManager: txManager,
consts: *scConsts,
statsVarsCh: make(chan statsVars, queueLen),
}, nil
// SetSyncStatsVars is a thread safe method to sets the synchronizer Stats
func (p *Pipeline) SetSyncStatsVars(stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) {
p.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars}
// reset pipeline state
func (p *Pipeline) reset(batchNum common.BatchNum,
stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
p.batchNum = batchNum
p.lastForgeL1TxsNum = stats.Sync.LastForgeL1TxsNum
p.stats = *stats
p.vars = *vars
p.lastScheduledL1BatchBlockNum = 0
err := p.txSelector.Reset(p.batchNum)
if err != nil {
return tracerr.Wrap(err)
err = p.batchBuilder.Reset(p.batchNum, true)
if err != nil {
return tracerr.Wrap(err)
return nil
func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) {
if vars.Rollup != nil {
p.vars.Rollup = *vars.Rollup
if vars.Auction != nil {
p.vars.Auction = *vars.Auction
if vars.WDelayer != nil {
p.vars.WDelayer = *vars.WDelayer
// handleForgeBatch calls p.forgeBatch to forge the batch and get the zkInputs,
// and then waits for an available proof server and sends the zkInputs to it so
// that the proof computation begins.
func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
batchInfo, err := p.forgeBatch(batchNum)
if ctx.Err() != nil {
return nil, ctx.Err()
} else if err != nil {
if tracerr.Unwrap(err) == errLastL1BatchNotSynced {
log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err,
"lastForgeL1TxsNum", p.lastForgeL1TxsNum,
"syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum)
} else {
log.Errorw("forgeBatch", "err", err)
return nil, err
// 6. Wait for an available server proof (blocking call)
serverProof, err := p.proversPool.Get(ctx)
if ctx.Err() != nil {
return nil, ctx.Err()
} else if err != nil {
log.Errorw("proversPool.Get", "err", err)
return nil, err
batchInfo.ServerProof = serverProof
if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil {
return nil, ctx.Err()
} else if err != nil {
log.Errorw("sendServerProof", "err", err)
batchInfo.ServerProof = nil
return nil, err
return batchInfo, nil
// Start the forging pipeline
func (p *Pipeline) Start(batchNum common.BatchNum,
stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
if p.started {
log.Fatal("Pipeline already started")
p.started = true
if err := p.reset(batchNum, stats, vars); err != nil {
return tracerr.Wrap(err)
p.ctx, p.cancel = context.WithCancel(context.Background())
queueSize := 1
batchChSentServerProof := make(chan *BatchInfo, queueSize)
go func() {
waitDuration := zeroDuration
for {
select {
case <-p.ctx.Done():
log.Info("Pipeline forgeBatch loop done")
case statsVars := <-p.statsVarsCh:
p.stats = statsVars.Stats
case <-time.After(waitDuration):
batchNum = p.batchNum + 1
if batchInfo, err := p.handleForgeBatch(p.ctx, batchNum); err != nil {
waitDuration = p.cfg.SyncRetryInterval
} else {
p.batchNum = batchNum
batchChSentServerProof <- batchInfo
go func() {
for {
select {
case <-p.ctx.Done():
log.Info("Pipeline waitServerProofSendEth loop done")
case batchInfo := <-batchChSentServerProof:
err := p.waitServerProof(p.ctx, batchInfo)
// We are done with this serverProof, add it back to the pool
batchInfo.ServerProof = nil
if p.ctx.Err() != nil {
if err != nil {
log.Errorw("waitServerProof", "err", err)
return nil
// Stop the forging pipeline
func (p *Pipeline) Stop(ctx context.Context) {
if !p.started {
log.Fatal("Pipeline already stopped")
p.started = false
log.Info("Stopping Pipeline...")
for _, prover := range p.provers {
if err := prover.Cancel(ctx); ctx.Err() != nil {
} else if err != nil {
log.Errorw("prover.Cancel", "err", err)
// sendServerProof sends the circuit inputs to the proof server
func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) error {
// 7. Call the selected idle server proof with BatchBuilder output,
// save server proof info for batchNum
if err := batchInfo.ServerProof.CalculateProof(ctx, batchInfo.ZKInputs); err != nil {
return tracerr.Wrap(err)
return nil
// forgeBatch forges the batchNum batch.
func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) {
// remove transactions from the pool that have been there for too long
_, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
p.stats.Sync.LastBlock.Num, int64(batchNum))
if err != nil {
return nil, tracerr.Wrap(err)
_, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum))
if err != nil {
return nil, tracerr.Wrap(err)
batchInfo = &BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch
batchInfo.Debug.StartTimestamp = time.Now()
batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1
selectionCfg := &txselector.SelectionConfig{
MaxL1UserTxs: common.RollupConstMaxL1UserTx,
TxProcessorConfig: p.cfg.TxProcessorConfig,
var poolL2Txs []common.PoolL2Tx
var l1UserTxsExtra, l1CoordTxs []common.L1Tx
var auths [][]byte
var coordIdxs []common.Idx
// 1. Decide if we forge L2Tx or L1+L2Tx
if p.shouldL1L2Batch(batchInfo) {
batchInfo.L1Batch = true
defer func() {
// If there's no error, update the parameters related
// to the last L1Batch forged
if err == nil {
p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
if p.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum {
return nil, tracerr.Wrap(errLastL1BatchNotSynced)
// 2a: L1+L2 txs
l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum + 1)
if err != nil {
return nil, tracerr.Wrap(err)
coordIdxs, auths, l1UserTxsExtra, l1CoordTxs, poolL2Txs, err =
p.txSelector.GetL1L2TxSelection(selectionCfg, l1UserTxs)
if err != nil {
return nil, tracerr.Wrap(err)
} else {
// 2b: only L2 txs
coordIdxs, auths, l1CoordTxs, poolL2Txs, err =
if err != nil {
return nil, tracerr.Wrap(err)
l1UserTxsExtra = nil
// 3. Save metadata from TxSelector output for BatchNum
batchInfo.L1UserTxsExtra = l1UserTxsExtra
batchInfo.L1CoordTxs = l1CoordTxs
batchInfo.L1CoordinatorTxsAuths = auths
batchInfo.CoordIdxs = coordIdxs
batchInfo.VerifierIdx = p.cfg.VerifierIdx
if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum); err != nil {
return nil, tracerr.Wrap(err)
// Invalidate transactions that become invalid beause of
// the poolL2Txs selected. Will mark as invalid the txs that have a
// (fromIdx, nonce) which already appears in the selected txs (includes
// all the nonces smaller than the current one)
err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum)
if err != nil {
return nil, tracerr.Wrap(err)
// 4. Call BatchBuilder with TxSelector output
configBatch := &batchbuilder.ConfigBatch{
ForgerAddress: p.cfg.ForgerAddress,
TxProcessorConfig: p.cfg.TxProcessorConfig,
zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxsExtra,
l1CoordTxs, poolL2Txs, nil)
if err != nil {
return nil, tracerr.Wrap(err)
l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a big uggly, find a better way
if err != nil {
return nil, tracerr.Wrap(err)
batchInfo.L2Txs = l2Txs
// 5. Save metadata from BatchBuilder output for BatchNum
batchInfo.ZKInputs = zkInputs
batchInfo.Debug.Status = StatusForged
return batchInfo, nil
// waitServerProof gets the generated zkProof & sends it to the SmartContract
func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) error {
proof, pubInputs, err := batchInfo.ServerProof.GetProof(ctx) // blocking call, until not resolved don't continue. Returns when the proof server has calculated the proof
if err != nil {
return tracerr.Wrap(err)
batchInfo.Proof = proof
batchInfo.PublicInputs = pubInputs
batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo)
batchInfo.Debug.Status = StatusProof
return nil
func (p *Pipeline) shouldL1L2Batch(batchInfo *BatchInfo) bool {
// Take the lastL1BatchBlockNum as the biggest between the last
// scheduled one, and the synchronized one.
lastL1BatchBlockNum := p.lastScheduledL1BatchBlockNum
if p.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum {
lastL1BatchBlockNum = p.stats.Sync.LastL1BatchBlock
// Set Debug information
batchInfo.Debug.LastScheduledL1BatchBlockNum = p.lastScheduledL1BatchBlockNum
batchInfo.Debug.LastL1BatchBlock = p.stats.Sync.LastL1BatchBlock
batchInfo.Debug.LastL1BatchBlockDelta = p.stats.Eth.LastBlock.Num + 1 - lastL1BatchBlockNum
batchInfo.Debug.L1BatchBlockScheduleDeadline =
int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1) * p.cfg.L1BatchTimeoutPerc)
// Return true if we have passed the l1BatchTimeoutPerc portion of the
// range before the l1batch timeout.
return p.stats.Eth.LastBlock.Num+1-lastL1BatchBlockNum >=
func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs {
proof := batchInfo.Proof
zki := batchInfo.ZKInputs
return &eth.RollupForgeBatchArgs{
NewLastIdx: int64(zki.Metadata.NewLastIdxRaw),
NewStRoot: zki.Metadata.NewStateRootRaw.BigInt(),
NewExitRoot: zki.Metadata.NewExitRootRaw.BigInt(),
L1UserTxs: batchInfo.L1UserTxsExtra,
L1CoordinatorTxs: batchInfo.L1CoordTxs,
L1CoordinatorTxsAuths: batchInfo.L1CoordinatorTxsAuths,
L2TxsData: batchInfo.L2Txs,
FeeIdxCoordinator: batchInfo.CoordIdxs,
// Circuit selector
VerifierIdx: batchInfo.VerifierIdx,
L1Batch: batchInfo.L1Batch,
ProofA: [2]*big.Int{proof.PiA[0], proof.PiA[1]},
ProofB: [2][2]*big.Int{
{proof.PiB[0][0], proof.PiB[0][1]},
{proof.PiB[1][0], proof.PiB[1][1]},
ProofC: [2]*big.Int{proof.PiC[0], proof.PiC[1]},

+ 297
- 0

@ -0,0 +1,297 @@
package coordinator
import (
ethKeystore ""
ethCommon ""
func TestPipelineShouldL1L2Batch(t *testing.T) {
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
var timer timer
ctx := context.Background()
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
modules := newTestModules(t)
var stats synchronizer.Stats
coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules)
pipeline, err := coord.newPipeline(ctx)
require.NoError(t, err)
pipeline.vars = coord.vars
// Check that the parameters are the ones we expect and use in this test
require.Equal(t, 0.5, pipeline.cfg.L1BatchTimeoutPerc)
require.Equal(t, int64(10), ethClientSetup.RollupVariables.ForgeL1L2BatchTimeout)
l1BatchTimeoutPerc := pipeline.cfg.L1BatchTimeoutPerc
l1BatchTimeout := ethClientSetup.RollupVariables.ForgeL1L2BatchTimeout
startBlock := int64(100)
// Empty batchInfo to pass to shouldL1L2Batch() which sets debug information
batchInfo := BatchInfo{}
// No scheduled L1Batch
// Last L1Batch was a long time ago
stats.Eth.LastBlock.Num = startBlock
stats.Sync.LastBlock = stats.Eth.LastBlock
stats.Sync.LastL1BatchBlock = 0
pipeline.stats = stats
assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo))
stats.Sync.LastL1BatchBlock = startBlock
// We are are one block before the timeout range * 0.5
stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) - 1
stats.Sync.LastBlock = stats.Eth.LastBlock
pipeline.stats = stats
assert.Equal(t, false, pipeline.shouldL1L2Batch(&batchInfo))
// We are are at timeout range * 0.5
stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc)
stats.Sync.LastBlock = stats.Eth.LastBlock
pipeline.stats = stats
assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo))
// Scheduled L1Batch
pipeline.lastScheduledL1BatchBlockNum = startBlock
stats.Sync.LastL1BatchBlock = startBlock - 10
// We are are one block before the timeout range * 0.5
stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) - 1
stats.Sync.LastBlock = stats.Eth.LastBlock
pipeline.stats = stats
assert.Equal(t, false, pipeline.shouldL1L2Batch(&batchInfo))
// We are are at timeout range * 0.5
stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc)
stats.Sync.LastBlock = stats.Eth.LastBlock
pipeline.stats = stats
assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo))
const testTokensLen = 3
const testUsersLen = 4
func preloadSync(t *testing.T, ethClient *test.Client, sync *synchronizer.Synchronizer,
historyDB *historydb.HistoryDB, stateDB *statedb.StateDB) *til.Context {
// Create a set with `testTokensLen` tokens and for each token
// `testUsersLen` accounts.
var set []til.Instruction
// set = append(set, til.Instruction{Typ: "Blockchain"})
for tokenID := 1; tokenID < testTokensLen; tokenID++ {
set = append(set, til.Instruction{
Typ: til.TypeAddToken,
TokenID: common.TokenID(tokenID),
depositAmount, ok := new(big.Int).SetString("10225000000000000000000000000000000", 10)
require.True(t, ok)
for tokenID := 0; tokenID < testTokensLen; tokenID++ {
for user := 0; user < testUsersLen; user++ {
set = append(set, til.Instruction{
Typ: common.TxTypeCreateAccountDeposit,
TokenID: common.TokenID(tokenID),
DepositAmount: depositAmount,
From: fmt.Sprintf("User%d", user),
set = append(set, til.Instruction{Typ: til.TypeNewBatchL1})
set = append(set, til.Instruction{Typ: til.TypeNewBatchL1})
set = append(set, til.Instruction{Typ: til.TypeNewBlock})
tc := til.NewContext(chainID, common.RollupConstMaxL1UserTx)
blocks, err := tc.GenerateBlocksFromInstructions(set)
require.NoError(t, err)
require.NotNil(t, blocks)
ethAddTokens(blocks, ethClient)
err = ethClient.CtlAddBlocks(blocks)
require.NoError(t, err)
ctx := context.Background()
for {
syncBlock, discards, err := sync.Sync2(ctx, nil)
require.NoError(t, err)
require.Nil(t, discards)
if syncBlock == nil {
dbTokens, err := historyDB.GetAllTokens()
require.Nil(t, err)
require.Equal(t, testTokensLen, len(dbTokens))
dbAccounts, err := historyDB.GetAllAccounts()
require.Nil(t, err)
require.Equal(t, testTokensLen*testUsersLen, len(dbAccounts))
sdbAccounts, err := stateDB.GetAccounts()
require.Nil(t, err)
require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts))
return tc
func TestPipelineForgeBatchWithTxs(t *testing.T) {
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
var timer timer
ctx := context.Background()
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
modules := newTestModules(t)
coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules)
sync := newTestSynchronizer(t, ethClient, ethClientSetup, modules)
// preload the synchronier (via the test ethClient) some tokens and
// users with positive balances
tilCtx := preloadSync(t, ethClient, sync, modules.historyDB, modules.stateDB)
syncStats := sync.Stats()
batchNum := common.BatchNum(syncStats.Sync.LastBatch)
syncSCVars := sync.SCVars()
pipeline, err := coord.newPipeline(ctx)
require.NoError(t, err)
// Insert some l2txs in the Pool
setPool := `
Type: PoolL2
PoolTransfer(0) User0-User1: 100 (126)
PoolTransfer(0) User1-User2: 200 (126)
PoolTransfer(0) User2-User3: 300 (126)
l2txs, err := tilCtx.GeneratePoolL2Txs(setPool)
require.NoError(t, err)
for _, tx := range l2txs {
err := modules.l2DB.AddTxTest(&tx) //nolint:gosec
require.NoError(t, err)
err = pipeline.reset(batchNum, syncStats, &synchronizer.SCVariables{
Rollup: *syncSCVars.Rollup,
Auction: *syncSCVars.Auction,
WDelayer: *syncSCVars.WDelayer,
require.NoError(t, err)
// Sanity check
sdbAccounts, err := pipeline.txSelector.LocalAccountsDB().GetAccounts()
require.Nil(t, err)
require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts))
// Sanity check
sdbAccounts, err = pipeline.batchBuilder.LocalStateDB().GetAccounts()
require.Nil(t, err)
require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts))
// Sanity check
require.Equal(t, modules.stateDB.MT.Root(),
batchInfo, err := pipeline.forgeBatch(batchNum)
require.NoError(t, err)
assert.Equal(t, 3, len(batchInfo.L2Txs))
batchInfo, err = pipeline.forgeBatch(batchNum)
require.NoError(t, err)
assert.Equal(t, 0, len(batchInfo.L2Txs))
func TestEthRollupForgeBatch(t *testing.T) {
if os.Getenv("TEST_ROLLUP_FORGE_BATCH") == "" {
const web3URL = "http://localhost:8545"
const password = "test"
addr := ethCommon.HexToAddress("0xb4124ceb3451635dacedd11767f004d8a28c6ee7")
sk, err := crypto.HexToECDSA(
require.NoError(t, err)
rollupAddr := ethCommon.HexToAddress("0x8EEaea23686c319133a7cC110b840d1591d9AeE0")
pathKeystore, err := ioutil.TempDir("", "tmpKeystore")
require.NoError(t, err)
deleteme = append(deleteme, pathKeystore)
ctx := context.Background()
batchInfo := &BatchInfo{}
proofClient := &prover.MockClient{}
chainID := uint16(0)
ethClient, err := ethclient.Dial(web3URL)
require.NoError(t, err)
ethCfg := eth.EthereumConfig{
CallGasLimit: 300000,
GasPriceDiv: 100,
scryptN := ethKeystore.LightScryptN
scryptP := ethKeystore.LightScryptP
keyStore := ethKeystore.NewKeyStore(pathKeystore,
scryptN, scryptP)
account, err := keyStore.ImportECDSA(sk, password)
require.NoError(t, err)
require.Equal(t, account.Address, addr)
err = keyStore.Unlock(account, password)
require.NoError(t, err)
client, err := eth.NewClient(ethClient, &account, keyStore, &eth.ClientConfig{
Ethereum: ethCfg,
Rollup: eth.RollupConfig{
Address: rollupAddr,
Auction: eth.AuctionConfig{
Address: ethCommon.Address{},
TokenHEZ: eth.TokenConfig{
Address: ethCommon.Address{},
Name: "HEZ",
WDelayer: eth.WDelayerConfig{
Address: ethCommon.Address{},
require.NoError(t, err)
zkInputs := common.NewZKInputs(chainID, 100, 24, 512, 32, big.NewInt(1))
zkInputs.Metadata.NewStateRootRaw = &merkletree.Hash{1}
zkInputs.Metadata.NewExitRootRaw = &merkletree.Hash{2}
batchInfo.ZKInputs = zkInputs
err = proofClient.CalculateProof(ctx, batchInfo.ZKInputs)
require.NoError(t, err)
proof, pubInputs, err := proofClient.GetProof(ctx)
require.NoError(t, err)
batchInfo.Proof = proof
batchInfo.PublicInputs = pubInputs
batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo)
_, err = client.RollupForgeBatch(batchInfo.ForgeBatchArgs)
require.NoError(t, err)
batchInfo.Proof = proof

+ 207
- 0

@ -0,0 +1,207 @@
package coordinator
import (
// TxManager handles everything related to ethereum transactions: It makes the
// call to forge, waits for transaction confirmation, and keeps checking them
// until a number of confirmed blocks have passed.
type TxManager struct {
cfg Config
ethClient eth.ClientInterface
l2DB *l2db.L2DB // Used only to mark forged txs as forged in the L2DB
coord *Coordinator // Used only to send messages to stop the pipeline
batchCh chan *BatchInfo
lastBlockCh chan int64
queue []*BatchInfo
lastBlock int64
// lastConfirmedBatch stores the last BatchNum that who's forge call was confirmed
lastConfirmedBatch common.BatchNum
// NewTxManager creates a new TxManager
func NewTxManager(cfg *Config, ethClient eth.ClientInterface, l2DB *l2db.L2DB,
coord *Coordinator) *TxManager {
return &TxManager{
cfg: *cfg,
ethClient: ethClient,
l2DB: l2DB,
coord: coord,
batchCh: make(chan *BatchInfo, queueLen),
lastBlockCh: make(chan int64, queueLen),
lastBlock: -1,
// AddBatch is a thread safe method to pass a new batch TxManager to be sent to
// the smart contract via the forge call
func (t *TxManager) AddBatch(batchInfo *BatchInfo) {
t.batchCh <- batchInfo
// SetLastBlock is a thread safe method to pass the lastBlock to the TxManager
func (t *TxManager) SetLastBlock(lastBlock int64) {
t.lastBlockCh <- lastBlock
func (t *TxManager) callRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) error {
batchInfo.Debug.Status = StatusSent
batchInfo.Debug.SendBlockNum = t.lastBlock + 1
batchInfo.Debug.SendTimestamp = time.Now()
batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub(
var ethTx *types.Transaction
var err error
for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs)
if err != nil {
if strings.Contains(err.Error(), common.AuctionErrMsgCannotForge) {
log.Debugw("TxManager ethClient.RollupForgeBatch", "err", err,
"block", t.lastBlock+1)
return tracerr.Wrap(err)
log.Errorw("TxManager ethClient.RollupForgeBatch",
"attempt", attempt, "err", err, "block", t.lastBlock+1,
"batchNum", batchInfo.BatchNum)
} else {
select {
case <-ctx.Done():
return tracerr.Wrap(common.ErrDone)
case <-time.After(t.cfg.EthClientAttemptsDelay):
if err != nil {
return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err))
batchInfo.EthTx = ethTx
log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex())
if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil {
return tracerr.Wrap(err)
return nil
func (t *TxManager) checkEthTransactionReceipt(ctx context.Context, batchInfo *BatchInfo) error {
txHash := batchInfo.EthTx.Hash()
var receipt *types.Receipt
var err error
for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ {
receipt, err = t.ethClient.EthTransactionReceipt(ctx, txHash)
if ctx.Err() != nil {
if err != nil {
log.Errorw("TxManager ethClient.EthTransactionReceipt",
"attempt", attempt, "err", err)
} else {
select {
case <-ctx.Done():
return tracerr.Wrap(common.ErrDone)
case <-time.After(t.cfg.EthClientAttemptsDelay):
if err != nil {
return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.EthTransactionReceipt: %w", err))
batchInfo.Receipt = receipt
return nil
func (t *TxManager) handleReceipt(batchInfo *BatchInfo) (*int64, error) {
receipt := batchInfo.Receipt
if receipt != nil {
if receipt.Status == types.ReceiptStatusFailed {
batchInfo.Debug.Status = StatusFailed
log.Errorw("TxManager receipt status is failed", "receipt", receipt)
return nil, tracerr.Wrap(fmt.Errorf("ethereum transaction receipt statis is failed"))
} else if receipt.Status == types.ReceiptStatusSuccessful {
batchInfo.Debug.Status = StatusMined
batchInfo.Debug.MineBlockNum = receipt.BlockNumber.Int64()
batchInfo.Debug.StartToMineBlocksDelay = batchInfo.Debug.MineBlockNum -
if batchInfo.BatchNum > t.lastConfirmedBatch {
t.lastConfirmedBatch = batchInfo.BatchNum
confirm := t.lastBlock - receipt.BlockNumber.Int64()
return &confirm, nil
return nil, nil
// Run the TxManager
func (t *TxManager) Run(ctx context.Context) {
next := 0
waitDuration := longWaitDuration
for {
select {
case <-ctx.Done():
log.Info("TxManager done")
case lastBlock := <-t.lastBlockCh:
t.lastBlock = lastBlock
case batchInfo := <-t.batchCh:
if err := t.callRollupForgeBatch(ctx, batchInfo); ctx.Err() != nil {
} else if err != nil {
t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch call: %v", err)})
log.Debugf("ethClient ForgeCall sent, batchNum: %d", batchInfo.BatchNum)
t.queue = append(t.queue, batchInfo)
waitDuration = t.cfg.TxManagerCheckInterval
case <-time.After(waitDuration):
if len(t.queue) == 0 {
current := next
next = (current + 1) % len(t.queue)
batchInfo := t.queue[current]
if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
} else if err != nil { //nolint:staticcheck
// We can't get the receipt for the
// transaction, so we can't confirm if it was
// mined
t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch receipt: %v", err)})
confirm, err := t.handleReceipt(batchInfo)
if err != nil { //nolint:staticcheck
// Transaction was rejected
t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch reject: %v", err)})
if confirm != nil && *confirm >= t.cfg.ConfirmBlocks {
log.Debugw("TxManager tx for RollupForgeBatch confirmed",
"batch", batchInfo.BatchNum)
t.queue = append(t.queue[:current], t.queue[current+1:]...)
if len(t.queue) == 0 {
waitDuration = longWaitDuration
next = 0
} else {
next = current % len(t.queue)

+ 1
- 0

@ -261,6 +261,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
ForgerAddress: cfg.Coordinator.ForgerAddress,
ConfirmBlocks: cfg.Coordinator.ConfirmBlocks,
L1BatchTimeoutPerc: cfg.Coordinator.L1BatchTimeoutPerc,
ForgeRetryInterval: cfg.Coordinator.ForgeRetryInterval.Duration,
SyncRetryInterval: cfg.Coordinator.SyncRetryInterval.Duration,
EthClientAttempts: cfg.Coordinator.EthClient.Attempts,
EthClientAttemptsDelay: cfg.Coordinator.EthClient.AttemptsDelay.Duration,

+ 1
- 0

@ -63,6 +63,7 @@ func NewStatsHolder(firstBlockNum int64, refreshPeriod time.Duration) *StatsHold
stats := Stats{}
stats.Eth.RefreshPeriod = refreshPeriod
stats.Eth.FirstBlockNum = firstBlockNum
stats.Sync.LastForgeL1TxsNum = -1
return &StatsHolder{Stats: stats}
