@ -0,0 +1,428 @@ |
|||
package coordinator |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"math/big" |
|||
"sync" |
|||
"time" |
|||
|
|||
"github.com/hermeznetwork/hermez-node/batchbuilder" |
|||
"github.com/hermeznetwork/hermez-node/common" |
|||
"github.com/hermeznetwork/hermez-node/db/historydb" |
|||
"github.com/hermeznetwork/hermez-node/db/l2db" |
|||
"github.com/hermeznetwork/hermez-node/eth" |
|||
"github.com/hermeznetwork/hermez-node/log" |
|||
"github.com/hermeznetwork/hermez-node/prover" |
|||
"github.com/hermeznetwork/hermez-node/synchronizer" |
|||
"github.com/hermeznetwork/hermez-node/txselector" |
|||
"github.com/hermeznetwork/tracerr" |
|||
) |
|||
|
|||
type statsVars struct { |
|||
Stats synchronizer.Stats |
|||
Vars synchronizer.SCVariablesPtr |
|||
} |
|||
|
|||
// Pipeline manages the forging of batches with parallel server proofs
|
|||
type Pipeline struct { |
|||
cfg Config |
|||
consts synchronizer.SCConsts |
|||
|
|||
// state
|
|||
batchNum common.BatchNum |
|||
lastScheduledL1BatchBlockNum int64 |
|||
lastForgeL1TxsNum int64 |
|||
started bool |
|||
|
|||
proversPool *ProversPool |
|||
provers []prover.Client |
|||
txManager *TxManager |
|||
historyDB *historydb.HistoryDB |
|||
l2DB *l2db.L2DB |
|||
txSelector *txselector.TxSelector |
|||
batchBuilder *batchbuilder.BatchBuilder |
|||
purger *Purger |
|||
|
|||
stats synchronizer.Stats |
|||
vars synchronizer.SCVariables |
|||
statsVarsCh chan statsVars |
|||
|
|||
ctx context.Context |
|||
wg sync.WaitGroup |
|||
cancel context.CancelFunc |
|||
} |
|||
|
|||
// NewPipeline creates a new Pipeline
|
|||
func NewPipeline(ctx context.Context, |
|||
cfg Config, |
|||
historyDB *historydb.HistoryDB, |
|||
l2DB *l2db.L2DB, |
|||
txSelector *txselector.TxSelector, |
|||
batchBuilder *batchbuilder.BatchBuilder, |
|||
purger *Purger, |
|||
txManager *TxManager, |
|||
provers []prover.Client, |
|||
scConsts *synchronizer.SCConsts, |
|||
) (*Pipeline, error) { |
|||
proversPool := NewProversPool(len(provers)) |
|||
proversPoolSize := 0 |
|||
for _, prover := range provers { |
|||
if err := prover.WaitReady(ctx); err != nil { |
|||
log.Errorw("prover.WaitReady", "err", err) |
|||
} else { |
|||
proversPool.Add(prover) |
|||
proversPoolSize++ |
|||
} |
|||
} |
|||
if proversPoolSize == 0 { |
|||
return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool")) |
|||
} |
|||
return &Pipeline{ |
|||
cfg: cfg, |
|||
historyDB: historyDB, |
|||
l2DB: l2DB, |
|||
txSelector: txSelector, |
|||
batchBuilder: batchBuilder, |
|||
provers: provers, |
|||
proversPool: proversPool, |
|||
purger: purger, |
|||
txManager: txManager, |
|||
consts: *scConsts, |
|||
statsVarsCh: make(chan statsVars, queueLen), |
|||
}, nil |
|||
} |
|||
|
|||
// SetSyncStatsVars is a thread safe method to sets the synchronizer Stats
|
|||
func (p *Pipeline) SetSyncStatsVars(stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) { |
|||
p.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars} |
|||
} |
|||
|
|||
// reset pipeline state
|
|||
func (p *Pipeline) reset(batchNum common.BatchNum, |
|||
stats *synchronizer.Stats, vars *synchronizer.SCVariables) error { |
|||
p.batchNum = batchNum |
|||
p.lastForgeL1TxsNum = stats.Sync.LastForgeL1TxsNum |
|||
p.stats = *stats |
|||
p.vars = *vars |
|||
p.lastScheduledL1BatchBlockNum = 0 |
|||
|
|||
err := p.txSelector.Reset(p.batchNum) |
|||
if err != nil { |
|||
return tracerr.Wrap(err) |
|||
} |
|||
err = p.batchBuilder.Reset(p.batchNum, true) |
|||
if err != nil { |
|||
return tracerr.Wrap(err) |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) { |
|||
if vars.Rollup != nil { |
|||
p.vars.Rollup = *vars.Rollup |
|||
} |
|||
if vars.Auction != nil { |
|||
p.vars.Auction = *vars.Auction |
|||
} |
|||
if vars.WDelayer != nil { |
|||
p.vars.WDelayer = *vars.WDelayer |
|||
} |
|||
} |
|||
|
|||
// handleForgeBatch calls p.forgeBatch to forge the batch and get the zkInputs,
|
|||
// and then waits for an available proof server and sends the zkInputs to it so
|
|||
// that the proof computation begins.
|
|||
func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) { |
|||
batchInfo, err := p.forgeBatch(batchNum) |
|||
if ctx.Err() != nil { |
|||
return nil, ctx.Err() |
|||
} else if err != nil { |
|||
if tracerr.Unwrap(err) == errLastL1BatchNotSynced { |
|||
log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err, |
|||
"lastForgeL1TxsNum", p.lastForgeL1TxsNum, |
|||
"syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum) |
|||
} else { |
|||
log.Errorw("forgeBatch", "err", err) |
|||
} |
|||
return nil, err |
|||
} |
|||
// 6. Wait for an available server proof (blocking call)
|
|||
serverProof, err := p.proversPool.Get(ctx) |
|||
if ctx.Err() != nil { |
|||
return nil, ctx.Err() |
|||
} else if err != nil { |
|||
log.Errorw("proversPool.Get", "err", err) |
|||
return nil, err |
|||
} |
|||
batchInfo.ServerProof = serverProof |
|||
if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil { |
|||
return nil, ctx.Err() |
|||
} else if err != nil { |
|||
log.Errorw("sendServerProof", "err", err) |
|||
batchInfo.ServerProof = nil |
|||
p.proversPool.Add(serverProof) |
|||
return nil, err |
|||
} |
|||
return batchInfo, nil |
|||
} |
|||
|
|||
// Start the forging pipeline
|
|||
func (p *Pipeline) Start(batchNum common.BatchNum, |
|||
stats *synchronizer.Stats, vars *synchronizer.SCVariables) error { |
|||
if p.started { |
|||
log.Fatal("Pipeline already started") |
|||
} |
|||
p.started = true |
|||
|
|||
if err := p.reset(batchNum, stats, vars); err != nil { |
|||
return tracerr.Wrap(err) |
|||
} |
|||
p.ctx, p.cancel = context.WithCancel(context.Background()) |
|||
|
|||
queueSize := 1 |
|||
batchChSentServerProof := make(chan *BatchInfo, queueSize) |
|||
|
|||
p.wg.Add(1) |
|||
go func() { |
|||
waitDuration := zeroDuration |
|||
for { |
|||
select { |
|||
case <-p.ctx.Done(): |
|||
log.Info("Pipeline forgeBatch loop done") |
|||
p.wg.Done() |
|||
return |
|||
case statsVars := <-p.statsVarsCh: |
|||
p.stats = statsVars.Stats |
|||
p.syncSCVars(statsVars.Vars) |
|||
case <-time.After(waitDuration): |
|||
batchNum = p.batchNum + 1 |
|||
if batchInfo, err := p.handleForgeBatch(p.ctx, batchNum); err != nil { |
|||
waitDuration = p.cfg.SyncRetryInterval |
|||
continue |
|||
} else { |
|||
p.batchNum = batchNum |
|||
batchChSentServerProof <- batchInfo |
|||
} |
|||
} |
|||
} |
|||
}() |
|||
|
|||
p.wg.Add(1) |
|||
go func() { |
|||
for { |
|||
select { |
|||
case <-p.ctx.Done(): |
|||
log.Info("Pipeline waitServerProofSendEth loop done") |
|||
p.wg.Done() |
|||
return |
|||
case batchInfo := <-batchChSentServerProof: |
|||
err := p.waitServerProof(p.ctx, batchInfo) |
|||
// We are done with this serverProof, add it back to the pool
|
|||
p.proversPool.Add(batchInfo.ServerProof) |
|||
batchInfo.ServerProof = nil |
|||
if p.ctx.Err() != nil { |
|||
continue |
|||
} |
|||
if err != nil { |
|||
log.Errorw("waitServerProof", "err", err) |
|||
continue |
|||
} |
|||
p.txManager.AddBatch(batchInfo) |
|||
} |
|||
} |
|||
}() |
|||
return nil |
|||
} |
|||
|
|||
// Stop the forging pipeline
|
|||
func (p *Pipeline) Stop(ctx context.Context) { |
|||
if !p.started { |
|||
log.Fatal("Pipeline already stopped") |
|||
} |
|||
p.started = false |
|||
log.Info("Stopping Pipeline...") |
|||
p.cancel() |
|||
p.wg.Wait() |
|||
for _, prover := range p.provers { |
|||
if err := prover.Cancel(ctx); ctx.Err() != nil { |
|||
continue |
|||
} else if err != nil { |
|||
log.Errorw("prover.Cancel", "err", err) |
|||
} |
|||
} |
|||
} |
|||
|
|||
// sendServerProof sends the circuit inputs to the proof server
|
|||
func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) error { |
|||
p.cfg.debugBatchStore(batchInfo) |
|||
|
|||
// 7. Call the selected idle server proof with BatchBuilder output,
|
|||
// save server proof info for batchNum
|
|||
if err := batchInfo.ServerProof.CalculateProof(ctx, batchInfo.ZKInputs); err != nil { |
|||
return tracerr.Wrap(err) |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
// forgeBatch forges the batchNum batch.
|
|||
func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) { |
|||
// remove transactions from the pool that have been there for too long
|
|||
_, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(), |
|||
p.stats.Sync.LastBlock.Num, int64(batchNum)) |
|||
if err != nil { |
|||
return nil, tracerr.Wrap(err) |
|||
} |
|||
_, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum)) |
|||
if err != nil { |
|||
return nil, tracerr.Wrap(err) |
|||
} |
|||
|
|||
batchInfo = &BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch
|
|||
batchInfo.Debug.StartTimestamp = time.Now() |
|||
batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1 |
|||
|
|||
selectionCfg := &txselector.SelectionConfig{ |
|||
MaxL1UserTxs: common.RollupConstMaxL1UserTx, |
|||
TxProcessorConfig: p.cfg.TxProcessorConfig, |
|||
} |
|||
|
|||
var poolL2Txs []common.PoolL2Tx |
|||
var l1UserTxsExtra, l1CoordTxs []common.L1Tx |
|||
var auths [][]byte |
|||
var coordIdxs []common.Idx |
|||
|
|||
// 1. Decide if we forge L2Tx or L1+L2Tx
|
|||
if p.shouldL1L2Batch(batchInfo) { |
|||
batchInfo.L1Batch = true |
|||
defer func() { |
|||
// If there's no error, update the parameters related
|
|||
// to the last L1Batch forged
|
|||
if err == nil { |
|||
p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1 |
|||
p.lastForgeL1TxsNum++ |
|||
} |
|||
}() |
|||
if p.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum { |
|||
return nil, tracerr.Wrap(errLastL1BatchNotSynced) |
|||
} |
|||
// 2a: L1+L2 txs
|
|||
l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum + 1) |
|||
if err != nil { |
|||
return nil, tracerr.Wrap(err) |
|||
} |
|||
coordIdxs, auths, l1UserTxsExtra, l1CoordTxs, poolL2Txs, err = |
|||
p.txSelector.GetL1L2TxSelection(selectionCfg, l1UserTxs) |
|||
if err != nil { |
|||
return nil, tracerr.Wrap(err) |
|||
} |
|||
} else { |
|||
// 2b: only L2 txs
|
|||
coordIdxs, auths, l1CoordTxs, poolL2Txs, err = |
|||
p.txSelector.GetL2TxSelection(selectionCfg) |
|||
if err != nil { |
|||
return nil, tracerr.Wrap(err) |
|||
} |
|||
l1UserTxsExtra = nil |
|||
} |
|||
|
|||
// 3. Save metadata from TxSelector output for BatchNum
|
|||
batchInfo.L1UserTxsExtra = l1UserTxsExtra |
|||
batchInfo.L1CoordTxs = l1CoordTxs |
|||
batchInfo.L1CoordinatorTxsAuths = auths |
|||
batchInfo.CoordIdxs = coordIdxs |
|||
batchInfo.VerifierIdx = p.cfg.VerifierIdx |
|||
|
|||
if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum); err != nil { |
|||
return nil, tracerr.Wrap(err) |
|||
} |
|||
|
|||
// Invalidate transactions that become invalid beause of
|
|||
// the poolL2Txs selected. Will mark as invalid the txs that have a
|
|||
// (fromIdx, nonce) which already appears in the selected txs (includes
|
|||
// all the nonces smaller than the current one)
|
|||
err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum) |
|||
if err != nil { |
|||
return nil, tracerr.Wrap(err) |
|||
} |
|||
|
|||
// 4. Call BatchBuilder with TxSelector output
|
|||
configBatch := &batchbuilder.ConfigBatch{ |
|||
ForgerAddress: p.cfg.ForgerAddress, |
|||
TxProcessorConfig: p.cfg.TxProcessorConfig, |
|||
} |
|||
zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxsExtra, |
|||
l1CoordTxs, poolL2Txs, nil) |
|||
if err != nil { |
|||
return nil, tracerr.Wrap(err) |
|||
} |
|||
l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a big uggly, find a better way
|
|||
if err != nil { |
|||
return nil, tracerr.Wrap(err) |
|||
} |
|||
batchInfo.L2Txs = l2Txs |
|||
|
|||
// 5. Save metadata from BatchBuilder output for BatchNum
|
|||
batchInfo.ZKInputs = zkInputs |
|||
batchInfo.Debug.Status = StatusForged |
|||
p.cfg.debugBatchStore(batchInfo) |
|||
|
|||
return batchInfo, nil |
|||
} |
|||
|
|||
// waitServerProof gets the generated zkProof & sends it to the SmartContract
|
|||
func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) error { |
|||
proof, pubInputs, err := batchInfo.ServerProof.GetProof(ctx) // blocking call, until not resolved don't continue. Returns when the proof server has calculated the proof
|
|||
if err != nil { |
|||
return tracerr.Wrap(err) |
|||
} |
|||
batchInfo.Proof = proof |
|||
batchInfo.PublicInputs = pubInputs |
|||
batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo) |
|||
batchInfo.Debug.Status = StatusProof |
|||
p.cfg.debugBatchStore(batchInfo) |
|||
return nil |
|||
} |
|||
|
|||
func (p *Pipeline) shouldL1L2Batch(batchInfo *BatchInfo) bool { |
|||
// Take the lastL1BatchBlockNum as the biggest between the last
|
|||
// scheduled one, and the synchronized one.
|
|||
lastL1BatchBlockNum := p.lastScheduledL1BatchBlockNum |
|||
if p.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum { |
|||
lastL1BatchBlockNum = p.stats.Sync.LastL1BatchBlock |
|||
} |
|||
// Set Debug information
|
|||
batchInfo.Debug.LastScheduledL1BatchBlockNum = p.lastScheduledL1BatchBlockNum |
|||
batchInfo.Debug.LastL1BatchBlock = p.stats.Sync.LastL1BatchBlock |
|||
batchInfo.Debug.LastL1BatchBlockDelta = p.stats.Eth.LastBlock.Num + 1 - lastL1BatchBlockNum |
|||
batchInfo.Debug.L1BatchBlockScheduleDeadline = |
|||
int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1) * p.cfg.L1BatchTimeoutPerc) |
|||
// Return true if we have passed the l1BatchTimeoutPerc portion of the
|
|||
// range before the l1batch timeout.
|
|||
return p.stats.Eth.LastBlock.Num+1-lastL1BatchBlockNum >= |
|||
int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1)*p.cfg.L1BatchTimeoutPerc) |
|||
} |
|||
|
|||
func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs { |
|||
proof := batchInfo.Proof |
|||
zki := batchInfo.ZKInputs |
|||
return ð.RollupForgeBatchArgs{ |
|||
NewLastIdx: int64(zki.Metadata.NewLastIdxRaw), |
|||
NewStRoot: zki.Metadata.NewStateRootRaw.BigInt(), |
|||
NewExitRoot: zki.Metadata.NewExitRootRaw.BigInt(), |
|||
L1UserTxs: batchInfo.L1UserTxsExtra, |
|||
L1CoordinatorTxs: batchInfo.L1CoordTxs, |
|||
L1CoordinatorTxsAuths: batchInfo.L1CoordinatorTxsAuths, |
|||
L2TxsData: batchInfo.L2Txs, |
|||
FeeIdxCoordinator: batchInfo.CoordIdxs, |
|||
// Circuit selector
|
|||
VerifierIdx: batchInfo.VerifierIdx, |
|||
L1Batch: batchInfo.L1Batch, |
|||
ProofA: [2]*big.Int{proof.PiA[0], proof.PiA[1]}, |
|||
ProofB: [2][2]*big.Int{ |
|||
{proof.PiB[0][0], proof.PiB[0][1]}, |
|||
{proof.PiB[1][0], proof.PiB[1][1]}, |
|||
}, |
|||
ProofC: [2]*big.Int{proof.PiC[0], proof.PiC[1]}, |
|||
} |
|||
} |
@ -0,0 +1,297 @@ |
|||
package coordinator |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"io/ioutil" |
|||
"math/big" |
|||
"os" |
|||
"testing" |
|||
|
|||
ethKeystore "github.com/ethereum/go-ethereum/accounts/keystore" |
|||
ethCommon "github.com/ethereum/go-ethereum/common" |
|||
"github.com/ethereum/go-ethereum/crypto" |
|||
"github.com/ethereum/go-ethereum/ethclient" |
|||
"github.com/hermeznetwork/hermez-node/common" |
|||
"github.com/hermeznetwork/hermez-node/db/historydb" |
|||
"github.com/hermeznetwork/hermez-node/db/statedb" |
|||
"github.com/hermeznetwork/hermez-node/eth" |
|||
"github.com/hermeznetwork/hermez-node/prover" |
|||
"github.com/hermeznetwork/hermez-node/synchronizer" |
|||
"github.com/hermeznetwork/hermez-node/test" |
|||
"github.com/hermeznetwork/hermez-node/test/til" |
|||
"github.com/iden3/go-merkletree" |
|||
"github.com/stretchr/testify/assert" |
|||
"github.com/stretchr/testify/require" |
|||
) |
|||
|
|||
func TestPipelineShouldL1L2Batch(t *testing.T) { |
|||
ethClientSetup := test.NewClientSetupExample() |
|||
ethClientSetup.ChainID = big.NewInt(int64(chainID)) |
|||
|
|||
var timer timer |
|||
ctx := context.Background() |
|||
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup) |
|||
modules := newTestModules(t) |
|||
var stats synchronizer.Stats |
|||
coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules) |
|||
pipeline, err := coord.newPipeline(ctx) |
|||
require.NoError(t, err) |
|||
pipeline.vars = coord.vars |
|||
|
|||
// Check that the parameters are the ones we expect and use in this test
|
|||
require.Equal(t, 0.5, pipeline.cfg.L1BatchTimeoutPerc) |
|||
require.Equal(t, int64(10), ethClientSetup.RollupVariables.ForgeL1L2BatchTimeout) |
|||
l1BatchTimeoutPerc := pipeline.cfg.L1BatchTimeoutPerc |
|||
l1BatchTimeout := ethClientSetup.RollupVariables.ForgeL1L2BatchTimeout |
|||
|
|||
startBlock := int64(100) |
|||
// Empty batchInfo to pass to shouldL1L2Batch() which sets debug information
|
|||
batchInfo := BatchInfo{} |
|||
|
|||
//
|
|||
// No scheduled L1Batch
|
|||
//
|
|||
|
|||
// Last L1Batch was a long time ago
|
|||
stats.Eth.LastBlock.Num = startBlock |
|||
stats.Sync.LastBlock = stats.Eth.LastBlock |
|||
stats.Sync.LastL1BatchBlock = 0 |
|||
pipeline.stats = stats |
|||
assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo)) |
|||
|
|||
stats.Sync.LastL1BatchBlock = startBlock |
|||
|
|||
// We are are one block before the timeout range * 0.5
|
|||
stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) - 1 |
|||
stats.Sync.LastBlock = stats.Eth.LastBlock |
|||
pipeline.stats = stats |
|||
assert.Equal(t, false, pipeline.shouldL1L2Batch(&batchInfo)) |
|||
|
|||
// We are are at timeout range * 0.5
|
|||
stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) |
|||
stats.Sync.LastBlock = stats.Eth.LastBlock |
|||
pipeline.stats = stats |
|||
assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo)) |
|||
|
|||
//
|
|||
// Scheduled L1Batch
|
|||
//
|
|||
pipeline.lastScheduledL1BatchBlockNum = startBlock |
|||
stats.Sync.LastL1BatchBlock = startBlock - 10 |
|||
|
|||
// We are are one block before the timeout range * 0.5
|
|||
stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) - 1 |
|||
stats.Sync.LastBlock = stats.Eth.LastBlock |
|||
pipeline.stats = stats |
|||
assert.Equal(t, false, pipeline.shouldL1L2Batch(&batchInfo)) |
|||
|
|||
// We are are at timeout range * 0.5
|
|||
stats.Eth.LastBlock.Num = startBlock - 1 + int64(float64(l1BatchTimeout-1)*l1BatchTimeoutPerc) |
|||
stats.Sync.LastBlock = stats.Eth.LastBlock |
|||
pipeline.stats = stats |
|||
assert.Equal(t, true, pipeline.shouldL1L2Batch(&batchInfo)) |
|||
} |
|||
|
|||
const testTokensLen = 3 |
|||
const testUsersLen = 4 |
|||
|
|||
func preloadSync(t *testing.T, ethClient *test.Client, sync *synchronizer.Synchronizer, |
|||
historyDB *historydb.HistoryDB, stateDB *statedb.StateDB) *til.Context { |
|||
// Create a set with `testTokensLen` tokens and for each token
|
|||
// `testUsersLen` accounts.
|
|||
var set []til.Instruction |
|||
// set = append(set, til.Instruction{Typ: "Blockchain"})
|
|||
for tokenID := 1; tokenID < testTokensLen; tokenID++ { |
|||
set = append(set, til.Instruction{ |
|||
Typ: til.TypeAddToken, |
|||
TokenID: common.TokenID(tokenID), |
|||
}) |
|||
} |
|||
depositAmount, ok := new(big.Int).SetString("10225000000000000000000000000000000", 10) |
|||
require.True(t, ok) |
|||
for tokenID := 0; tokenID < testTokensLen; tokenID++ { |
|||
for user := 0; user < testUsersLen; user++ { |
|||
set = append(set, til.Instruction{ |
|||
Typ: common.TxTypeCreateAccountDeposit, |
|||
TokenID: common.TokenID(tokenID), |
|||
DepositAmount: depositAmount, |
|||
From: fmt.Sprintf("User%d", user), |
|||
}) |
|||
} |
|||
} |
|||
set = append(set, til.Instruction{Typ: til.TypeNewBatchL1}) |
|||
set = append(set, til.Instruction{Typ: til.TypeNewBatchL1}) |
|||
set = append(set, til.Instruction{Typ: til.TypeNewBlock}) |
|||
|
|||
tc := til.NewContext(chainID, common.RollupConstMaxL1UserTx) |
|||
blocks, err := tc.GenerateBlocksFromInstructions(set) |
|||
require.NoError(t, err) |
|||
require.NotNil(t, blocks) |
|||
|
|||
ethAddTokens(blocks, ethClient) |
|||
err = ethClient.CtlAddBlocks(blocks) |
|||
require.NoError(t, err) |
|||
|
|||
ctx := context.Background() |
|||
for { |
|||
syncBlock, discards, err := sync.Sync2(ctx, nil) |
|||
require.NoError(t, err) |
|||
require.Nil(t, discards) |
|||
if syncBlock == nil { |
|||
break |
|||
} |
|||
} |
|||
dbTokens, err := historyDB.GetAllTokens() |
|||
require.Nil(t, err) |
|||
require.Equal(t, testTokensLen, len(dbTokens)) |
|||
|
|||
dbAccounts, err := historyDB.GetAllAccounts() |
|||
require.Nil(t, err) |
|||
require.Equal(t, testTokensLen*testUsersLen, len(dbAccounts)) |
|||
|
|||
sdbAccounts, err := stateDB.GetAccounts() |
|||
require.Nil(t, err) |
|||
require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts)) |
|||
|
|||
return tc |
|||
} |
|||
|
|||
func TestPipelineForgeBatchWithTxs(t *testing.T) { |
|||
ethClientSetup := test.NewClientSetupExample() |
|||
ethClientSetup.ChainID = big.NewInt(int64(chainID)) |
|||
|
|||
var timer timer |
|||
ctx := context.Background() |
|||
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup) |
|||
modules := newTestModules(t) |
|||
coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules) |
|||
sync := newTestSynchronizer(t, ethClient, ethClientSetup, modules) |
|||
|
|||
// preload the synchronier (via the test ethClient) some tokens and
|
|||
// users with positive balances
|
|||
tilCtx := preloadSync(t, ethClient, sync, modules.historyDB, modules.stateDB) |
|||
syncStats := sync.Stats() |
|||
batchNum := common.BatchNum(syncStats.Sync.LastBatch) |
|||
syncSCVars := sync.SCVars() |
|||
|
|||
pipeline, err := coord.newPipeline(ctx) |
|||
require.NoError(t, err) |
|||
|
|||
// Insert some l2txs in the Pool
|
|||
setPool := ` |
|||
Type: PoolL2 |
|||
|
|||
PoolTransfer(0) User0-User1: 100 (126) |
|||
PoolTransfer(0) User1-User2: 200 (126) |
|||
PoolTransfer(0) User2-User3: 300 (126) |
|||
` |
|||
l2txs, err := tilCtx.GeneratePoolL2Txs(setPool) |
|||
require.NoError(t, err) |
|||
for _, tx := range l2txs { |
|||
err := modules.l2DB.AddTxTest(&tx) //nolint:gosec
|
|||
require.NoError(t, err) |
|||
} |
|||
|
|||
err = pipeline.reset(batchNum, syncStats, &synchronizer.SCVariables{ |
|||
Rollup: *syncSCVars.Rollup, |
|||
Auction: *syncSCVars.Auction, |
|||
WDelayer: *syncSCVars.WDelayer, |
|||
}) |
|||
require.NoError(t, err) |
|||
// Sanity check
|
|||
sdbAccounts, err := pipeline.txSelector.LocalAccountsDB().GetAccounts() |
|||
require.Nil(t, err) |
|||
require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts)) |
|||
|
|||
// Sanity check
|
|||
sdbAccounts, err = pipeline.batchBuilder.LocalStateDB().GetAccounts() |
|||
require.Nil(t, err) |
|||
require.Equal(t, testTokensLen*testUsersLen, len(sdbAccounts)) |
|||
|
|||
// Sanity check
|
|||
require.Equal(t, modules.stateDB.MT.Root(), |
|||
pipeline.batchBuilder.LocalStateDB().MT.Root()) |
|||
|
|||
batchNum++ |
|||
|
|||
batchInfo, err := pipeline.forgeBatch(batchNum) |
|||
require.NoError(t, err) |
|||
assert.Equal(t, 3, len(batchInfo.L2Txs)) |
|||
|
|||
batchNum++ |
|||
batchInfo, err = pipeline.forgeBatch(batchNum) |
|||
require.NoError(t, err) |
|||
assert.Equal(t, 0, len(batchInfo.L2Txs)) |
|||
} |
|||
|
|||
func TestEthRollupForgeBatch(t *testing.T) { |
|||
if os.Getenv("TEST_ROLLUP_FORGE_BATCH") == "" { |
|||
return |
|||
} |
|||
const web3URL = "http://localhost:8545" |
|||
const password = "test" |
|||
addr := ethCommon.HexToAddress("0xb4124ceb3451635dacedd11767f004d8a28c6ee7") |
|||
sk, err := crypto.HexToECDSA( |
|||
"a8a54b2d8197bc0b19bb8a084031be71835580a01e70a45a13babd16c9bc1563") |
|||
require.NoError(t, err) |
|||
rollupAddr := ethCommon.HexToAddress("0x8EEaea23686c319133a7cC110b840d1591d9AeE0") |
|||
pathKeystore, err := ioutil.TempDir("", "tmpKeystore") |
|||
require.NoError(t, err) |
|||
deleteme = append(deleteme, pathKeystore) |
|||
ctx := context.Background() |
|||
batchInfo := &BatchInfo{} |
|||
proofClient := &prover.MockClient{} |
|||
chainID := uint16(0) |
|||
|
|||
ethClient, err := ethclient.Dial(web3URL) |
|||
require.NoError(t, err) |
|||
ethCfg := eth.EthereumConfig{ |
|||
CallGasLimit: 300000, |
|||
GasPriceDiv: 100, |
|||
} |
|||
scryptN := ethKeystore.LightScryptN |
|||
scryptP := ethKeystore.LightScryptP |
|||
keyStore := ethKeystore.NewKeyStore(pathKeystore, |
|||
scryptN, scryptP) |
|||
account, err := keyStore.ImportECDSA(sk, password) |
|||
require.NoError(t, err) |
|||
require.Equal(t, account.Address, addr) |
|||
err = keyStore.Unlock(account, password) |
|||
require.NoError(t, err) |
|||
|
|||
client, err := eth.NewClient(ethClient, &account, keyStore, ð.ClientConfig{ |
|||
Ethereum: ethCfg, |
|||
Rollup: eth.RollupConfig{ |
|||
Address: rollupAddr, |
|||
}, |
|||
Auction: eth.AuctionConfig{ |
|||
Address: ethCommon.Address{}, |
|||
TokenHEZ: eth.TokenConfig{ |
|||
Address: ethCommon.Address{}, |
|||
Name: "HEZ", |
|||
}, |
|||
}, |
|||
WDelayer: eth.WDelayerConfig{ |
|||
Address: ethCommon.Address{}, |
|||
}, |
|||
}) |
|||
require.NoError(t, err) |
|||
|
|||
zkInputs := common.NewZKInputs(chainID, 100, 24, 512, 32, big.NewInt(1)) |
|||
zkInputs.Metadata.NewStateRootRaw = &merkletree.Hash{1} |
|||
zkInputs.Metadata.NewExitRootRaw = &merkletree.Hash{2} |
|||
batchInfo.ZKInputs = zkInputs |
|||
err = proofClient.CalculateProof(ctx, batchInfo.ZKInputs) |
|||
require.NoError(t, err) |
|||
|
|||
proof, pubInputs, err := proofClient.GetProof(ctx) |
|||
require.NoError(t, err) |
|||
batchInfo.Proof = proof |
|||
batchInfo.PublicInputs = pubInputs |
|||
|
|||
batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo) |
|||
_, err = client.RollupForgeBatch(batchInfo.ForgeBatchArgs) |
|||
require.NoError(t, err) |
|||
batchInfo.Proof = proof |
|||
} |
@ -0,0 +1,207 @@ |
|||
package coordinator |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"strings" |
|||
"time" |
|||
|
|||
"github.com/ethereum/go-ethereum/core/types" |
|||
"github.com/hermeznetwork/hermez-node/common" |
|||
"github.com/hermeznetwork/hermez-node/db/l2db" |
|||
"github.com/hermeznetwork/hermez-node/eth" |
|||
"github.com/hermeznetwork/hermez-node/log" |
|||
"github.com/hermeznetwork/tracerr" |
|||
) |
|||
|
|||
// TxManager handles everything related to ethereum transactions: It makes the
|
|||
// call to forge, waits for transaction confirmation, and keeps checking them
|
|||
// until a number of confirmed blocks have passed.
|
|||
type TxManager struct { |
|||
cfg Config |
|||
ethClient eth.ClientInterface |
|||
l2DB *l2db.L2DB // Used only to mark forged txs as forged in the L2DB
|
|||
coord *Coordinator // Used only to send messages to stop the pipeline
|
|||
batchCh chan *BatchInfo |
|||
lastBlockCh chan int64 |
|||
queue []*BatchInfo |
|||
lastBlock int64 |
|||
// lastConfirmedBatch stores the last BatchNum that who's forge call was confirmed
|
|||
lastConfirmedBatch common.BatchNum |
|||
} |
|||
|
|||
// NewTxManager creates a new TxManager
|
|||
func NewTxManager(cfg *Config, ethClient eth.ClientInterface, l2DB *l2db.L2DB, |
|||
coord *Coordinator) *TxManager { |
|||
return &TxManager{ |
|||
cfg: *cfg, |
|||
ethClient: ethClient, |
|||
l2DB: l2DB, |
|||
coord: coord, |
|||
batchCh: make(chan *BatchInfo, queueLen), |
|||
lastBlockCh: make(chan int64, queueLen), |
|||
lastBlock: -1, |
|||
} |
|||
} |
|||
|
|||
// AddBatch is a thread safe method to pass a new batch TxManager to be sent to
|
|||
// the smart contract via the forge call
|
|||
func (t *TxManager) AddBatch(batchInfo *BatchInfo) { |
|||
t.batchCh <- batchInfo |
|||
} |
|||
|
|||
// SetLastBlock is a thread safe method to pass the lastBlock to the TxManager
|
|||
func (t *TxManager) SetLastBlock(lastBlock int64) { |
|||
t.lastBlockCh <- lastBlock |
|||
} |
|||
|
|||
func (t *TxManager) callRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo) error { |
|||
batchInfo.Debug.Status = StatusSent |
|||
batchInfo.Debug.SendBlockNum = t.lastBlock + 1 |
|||
batchInfo.Debug.SendTimestamp = time.Now() |
|||
batchInfo.Debug.StartToSendDelay = batchInfo.Debug.SendTimestamp.Sub( |
|||
batchInfo.Debug.StartTimestamp).Seconds() |
|||
var ethTx *types.Transaction |
|||
var err error |
|||
for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ { |
|||
ethTx, err = t.ethClient.RollupForgeBatch(batchInfo.ForgeBatchArgs) |
|||
if err != nil { |
|||
if strings.Contains(err.Error(), common.AuctionErrMsgCannotForge) { |
|||
log.Debugw("TxManager ethClient.RollupForgeBatch", "err", err, |
|||
"block", t.lastBlock+1) |
|||
return tracerr.Wrap(err) |
|||
} |
|||
log.Errorw("TxManager ethClient.RollupForgeBatch", |
|||
"attempt", attempt, "err", err, "block", t.lastBlock+1, |
|||
"batchNum", batchInfo.BatchNum) |
|||
} else { |
|||
break |
|||
} |
|||
select { |
|||
case <-ctx.Done(): |
|||
return tracerr.Wrap(common.ErrDone) |
|||
case <-time.After(t.cfg.EthClientAttemptsDelay): |
|||
} |
|||
} |
|||
if err != nil { |
|||
return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.RollupForgeBatch: %w", err)) |
|||
} |
|||
batchInfo.EthTx = ethTx |
|||
log.Infow("TxManager ethClient.RollupForgeBatch", "batch", batchInfo.BatchNum, "tx", ethTx.Hash().Hex()) |
|||
t.cfg.debugBatchStore(batchInfo) |
|||
if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs), batchInfo.BatchNum); err != nil { |
|||
return tracerr.Wrap(err) |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (t *TxManager) checkEthTransactionReceipt(ctx context.Context, batchInfo *BatchInfo) error { |
|||
txHash := batchInfo.EthTx.Hash() |
|||
var receipt *types.Receipt |
|||
var err error |
|||
for attempt := 0; attempt < t.cfg.EthClientAttempts; attempt++ { |
|||
receipt, err = t.ethClient.EthTransactionReceipt(ctx, txHash) |
|||
if ctx.Err() != nil { |
|||
continue |
|||
} |
|||
if err != nil { |
|||
log.Errorw("TxManager ethClient.EthTransactionReceipt", |
|||
"attempt", attempt, "err", err) |
|||
} else { |
|||
break |
|||
} |
|||
select { |
|||
case <-ctx.Done(): |
|||
return tracerr.Wrap(common.ErrDone) |
|||
case <-time.After(t.cfg.EthClientAttemptsDelay): |
|||
} |
|||
} |
|||
if err != nil { |
|||
return tracerr.Wrap(fmt.Errorf("reached max attempts for ethClient.EthTransactionReceipt: %w", err)) |
|||
} |
|||
batchInfo.Receipt = receipt |
|||
t.cfg.debugBatchStore(batchInfo) |
|||
return nil |
|||
} |
|||
|
|||
func (t *TxManager) handleReceipt(batchInfo *BatchInfo) (*int64, error) { |
|||
receipt := batchInfo.Receipt |
|||
if receipt != nil { |
|||
if receipt.Status == types.ReceiptStatusFailed { |
|||
batchInfo.Debug.Status = StatusFailed |
|||
t.cfg.debugBatchStore(batchInfo) |
|||
log.Errorw("TxManager receipt status is failed", "receipt", receipt) |
|||
return nil, tracerr.Wrap(fmt.Errorf("ethereum transaction receipt statis is failed")) |
|||
} else if receipt.Status == types.ReceiptStatusSuccessful { |
|||
batchInfo.Debug.Status = StatusMined |
|||
batchInfo.Debug.MineBlockNum = receipt.BlockNumber.Int64() |
|||
batchInfo.Debug.StartToMineBlocksDelay = batchInfo.Debug.MineBlockNum - |
|||
batchInfo.Debug.StartBlockNum |
|||
t.cfg.debugBatchStore(batchInfo) |
|||
if batchInfo.BatchNum > t.lastConfirmedBatch { |
|||
t.lastConfirmedBatch = batchInfo.BatchNum |
|||
} |
|||
confirm := t.lastBlock - receipt.BlockNumber.Int64() |
|||
return &confirm, nil |
|||
} |
|||
} |
|||
return nil, nil |
|||
} |
|||
|
|||
// Run the TxManager
|
|||
func (t *TxManager) Run(ctx context.Context) { |
|||
next := 0 |
|||
waitDuration := longWaitDuration |
|||
|
|||
for { |
|||
select { |
|||
case <-ctx.Done(): |
|||
log.Info("TxManager done") |
|||
return |
|||
case lastBlock := <-t.lastBlockCh: |
|||
t.lastBlock = lastBlock |
|||
case batchInfo := <-t.batchCh: |
|||
if err := t.callRollupForgeBatch(ctx, batchInfo); ctx.Err() != nil { |
|||
continue |
|||
} else if err != nil { |
|||
t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch call: %v", err)}) |
|||
continue |
|||
} |
|||
log.Debugf("ethClient ForgeCall sent, batchNum: %d", batchInfo.BatchNum) |
|||
t.queue = append(t.queue, batchInfo) |
|||
waitDuration = t.cfg.TxManagerCheckInterval |
|||
case <-time.After(waitDuration): |
|||
if len(t.queue) == 0 { |
|||
continue |
|||
} |
|||
current := next |
|||
next = (current + 1) % len(t.queue) |
|||
batchInfo := t.queue[current] |
|||
if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil { |
|||
continue |
|||
} else if err != nil { //nolint:staticcheck
|
|||
// We can't get the receipt for the
|
|||
// transaction, so we can't confirm if it was
|
|||
// mined
|
|||
t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch receipt: %v", err)}) |
|||
} |
|||
|
|||
confirm, err := t.handleReceipt(batchInfo) |
|||
if err != nil { //nolint:staticcheck
|
|||
// Transaction was rejected
|
|||
t.coord.SendMsg(MsgStopPipeline{Reason: fmt.Sprintf("forgeBatch reject: %v", err)}) |
|||
} |
|||
if confirm != nil && *confirm >= t.cfg.ConfirmBlocks { |
|||
log.Debugw("TxManager tx for RollupForgeBatch confirmed", |
|||
"batch", batchInfo.BatchNum) |
|||
t.queue = append(t.queue[:current], t.queue[current+1:]...) |
|||
if len(t.queue) == 0 { |
|||
waitDuration = longWaitDuration |
|||
next = 0 |
|||
} else { |
|||
next = current % len(t.queue) |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |