Compare commits

...

3 Commits

Author SHA1 Message Date
Eduard S
672d08c671 Calculate ForgeBatch gasLimit with parametrized formula
Add the following config parameters at
`Coordinator.EthClient.ForgeBatchGasCost`:
- `Fixed`
- `L1UserTx`
- `L1CoordTx`
- `L2Tx`
Which are the costs associated to a ForgeBatch transaction, split
into different parts to be used in the formula to compute the gasLimit.
2021-02-25 15:30:50 +01:00
Eduard S
9b7b333acf Reorg l2db before starting pipeline 2021-02-25 15:30:38 +01:00
Eduard S
2a5992d218 Fix missing timer reset in TxManager
Also, replace usage of time.Duration by time.NewTimer, because the later allows
replacing timers by stopping them before so that we never leak resources.
2021-02-25 15:30:26 +01:00
7 changed files with 81 additions and 37 deletions

View File

@@ -104,6 +104,12 @@ GasPriceIncPerc = 10
Path = "/tmp/iden3-test/hermez/ethkeystore" Path = "/tmp/iden3-test/hermez/ethkeystore"
Password = "yourpasswordhere" Password = "yourpasswordhere"
[Coordinator.EthClient.ForgeBatchGasCost]
Fixed = 500000
L1UserTx = 8000
L1CoordTx = 9000
L2Tx = 1
[Coordinator.API] [Coordinator.API]
Coordinator = true Coordinator = true

View File

@@ -35,6 +35,15 @@ type ServerProof struct {
URL string `validate:"required"` URL string `validate:"required"`
} }
// ForgeBatchGasCost is the costs associated to a ForgeBatch transaction, split
// into different parts to be used in a formula.
type ForgeBatchGasCost struct {
Fixed uint64 `validate:"required"`
L1UserTx uint64 `validate:"required"`
L1CoordTx uint64 `validate:"required"`
L2Tx uint64 `validate:"required"`
}
// Coordinator is the coordinator specific configuration. // Coordinator is the coordinator specific configuration.
type Coordinator struct { type Coordinator struct {
// ForgerAddress is the address under which this coordinator is forging // ForgerAddress is the address under which this coordinator is forging
@@ -180,6 +189,9 @@ type Coordinator struct {
// Password used to decrypt the keys in the keystore // Password used to decrypt the keys in the keystore
Password string `validate:"required"` Password string `validate:"required"`
} `validate:"required"` } `validate:"required"`
// ForgeBatchGasCost contains the cost of each action in the
// ForgeBatch transaction.
ForgeBatchGasCost ForgeBatchGasCost `validate:"required"`
} `validate:"required"` } `validate:"required"`
API struct { API struct {
// Coordinator enables the coordinator API endpoints // Coordinator enables the coordinator API endpoints

View File

@@ -11,6 +11,7 @@ import (
ethCommon "github.com/ethereum/go-ethereum/common" ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/hermeznetwork/hermez-node/batchbuilder" "github.com/hermeznetwork/hermez-node/batchbuilder"
"github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/config"
"github.com/hermeznetwork/hermez-node/db/historydb" "github.com/hermeznetwork/hermez-node/db/historydb"
"github.com/hermeznetwork/hermez-node/db/l2db" "github.com/hermeznetwork/hermez-node/db/l2db"
"github.com/hermeznetwork/hermez-node/eth" "github.com/hermeznetwork/hermez-node/eth"
@@ -115,7 +116,10 @@ type Config struct {
Purger PurgerCfg Purger PurgerCfg
// VerifierIdx is the index of the verifier contract registered in the // VerifierIdx is the index of the verifier contract registered in the
// smart contract // smart contract
VerifierIdx uint8 VerifierIdx uint8
// ForgeBatchGasCost contains the cost of each action in the
// ForgeBatch transaction.
ForgeBatchGasCost config.ForgeBatchGasCost
TxProcessorConfig txprocessor.Config TxProcessorConfig txprocessor.Config
} }
@@ -383,11 +387,23 @@ func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats)
fromBatch.ForgerAddr = c.cfg.ForgerAddress fromBatch.ForgerAddr = c.cfg.ForgerAddress
fromBatch.StateRoot = big.NewInt(0) fromBatch.StateRoot = big.NewInt(0)
} }
// Before starting the pipeline make sure we reset any
// l2tx from the pool that was forged in a batch that
// didn't end up being mined. We are already doing
// this in handleStopPipeline, but we do it again as a
// failsafe in case the last synced batchnum is
// different than in the previous call to l2DB.Reorg,
// or in case the node was restarted when there was a
// started batch that included l2txs but was not mined.
if err := c.l2DB.Reorg(fromBatch.BatchNum); err != nil {
return tracerr.Wrap(err)
}
var err error var err error
if c.pipeline, err = c.newPipeline(ctx); err != nil { if c.pipeline, err = c.newPipeline(ctx); err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
c.pipelineFromBatch = fromBatch c.pipelineFromBatch = fromBatch
// Start the pipeline
if err := c.pipeline.Start(fromBatch.BatchNum, stats, &c.vars); err != nil { if err := c.pipeline.Start(fromBatch.BatchNum, stats, &c.vars); err != nil {
c.pipeline = nil c.pipeline = nil
return tracerr.Wrap(err) return tracerr.Wrap(err)
@@ -508,7 +524,7 @@ func (c *Coordinator) Start() {
c.wg.Add(1) c.wg.Add(1)
go func() { go func() {
waitCh := time.After(longWaitDuration) timer := time.NewTimer(longWaitDuration)
for { for {
select { select {
case <-c.ctx.Done(): case <-c.ctx.Done():
@@ -520,24 +536,27 @@ func (c *Coordinator) Start() {
continue continue
} else if err != nil { } else if err != nil {
log.Errorw("Coordinator.handleMsg", "err", err) log.Errorw("Coordinator.handleMsg", "err", err)
waitCh = time.After(c.cfg.SyncRetryInterval) if !timer.Stop() {
<-timer.C
}
timer.Reset(c.cfg.SyncRetryInterval)
continue continue
} }
waitCh = time.After(longWaitDuration) case <-timer.C:
case <-waitCh: timer.Reset(longWaitDuration)
if !c.stats.Synced() { if !c.stats.Synced() {
waitCh = time.After(longWaitDuration)
continue continue
} }
if err := c.syncStats(c.ctx, &c.stats); c.ctx.Err() != nil { if err := c.syncStats(c.ctx, &c.stats); c.ctx.Err() != nil {
waitCh = time.After(longWaitDuration)
continue continue
} else if err != nil { } else if err != nil {
log.Errorw("Coordinator.syncStats", "err", err) log.Errorw("Coordinator.syncStats", "err", err)
waitCh = time.After(c.cfg.SyncRetryInterval) if !timer.Stop() {
<-timer.C
}
timer.Reset(c.cfg.SyncRetryInterval)
continue continue
} }
waitCh = time.After(longWaitDuration)
} }
} }
}() }()

View File

@@ -271,7 +271,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
p.wg.Add(1) p.wg.Add(1)
go func() { go func() {
waitCh := time.After(zeroDuration) timer := time.NewTimer(zeroDuration)
for { for {
select { select {
case <-p.ctx.Done(): case <-p.ctx.Done():
@@ -281,23 +281,21 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
case statsVars := <-p.statsVarsCh: case statsVars := <-p.statsVarsCh:
p.stats = statsVars.Stats p.stats = statsVars.Stats
p.syncSCVars(statsVars.Vars) p.syncSCVars(statsVars.Vars)
case <-waitCh: case <-timer.C:
timer.Reset(p.cfg.ForgeRetryInterval)
// Once errAtBatchNum != 0, we stop forging // Once errAtBatchNum != 0, we stop forging
// batches because there's been an error and we // batches because there's been an error and we
// wait for the pipeline to be stopped. // wait for the pipeline to be stopped.
if p.getErrAtBatchNum() != 0 { if p.getErrAtBatchNum() != 0 {
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue continue
} }
batchNum = p.state.batchNum + 1 batchNum = p.state.batchNum + 1
batchInfo, err := p.handleForgeBatch(p.ctx, batchNum) batchInfo, err := p.handleForgeBatch(p.ctx, batchNum)
if p.ctx.Err() != nil { if p.ctx.Err() != nil {
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue continue
} else if tracerr.Unwrap(err) == errLastL1BatchNotSynced || } else if tracerr.Unwrap(err) == errLastL1BatchNotSynced ||
tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay || tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay ||
tracerr.Unwrap(err) == errForgeBeforeDelay { tracerr.Unwrap(err) == errForgeBeforeDelay {
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue continue
} else if err != nil { } else if err != nil {
p.setErrAtBatchNum(batchNum) p.setErrAtBatchNum(batchNum)
@@ -306,7 +304,6 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
"Pipeline.handleForgBatch: %v", err), "Pipeline.handleForgBatch: %v", err),
FailedBatchNum: batchNum, FailedBatchNum: batchNum,
}) })
waitCh = time.After(p.cfg.ForgeRetryInterval)
continue continue
} }
p.lastForgeTime = time.Now() p.lastForgeTime = time.Now()
@@ -316,7 +313,10 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
case batchChSentServerProof <- batchInfo: case batchChSentServerProof <- batchInfo:
case <-p.ctx.Done(): case <-p.ctx.Done():
} }
waitCh = time.After(zeroDuration) if !timer.Stop() {
<-timer.C
}
timer.Reset(zeroDuration)
} }
} }
}() }()

View File

@@ -123,7 +123,7 @@ func (t *TxManager) syncSCVars(vars synchronizer.SCVariablesPtr) {
} }
// NewAuth generates a new auth object for an ethereum transaction // NewAuth generates a new auth object for an ethereum transaction
func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) { func (t *TxManager) NewAuth(ctx context.Context, batchInfo *BatchInfo) (*bind.TransactOpts, error) {
gasPrice, err := t.ethClient.EthSuggestGasPrice(ctx) gasPrice, err := t.ethClient.EthSuggestGasPrice(ctx)
if err != nil { if err != nil {
return nil, tracerr.Wrap(err) return nil, tracerr.Wrap(err)
@@ -143,15 +143,12 @@ func (t *TxManager) NewAuth(ctx context.Context) (*bind.TransactOpts, error) {
return nil, tracerr.Wrap(err) return nil, tracerr.Wrap(err)
} }
auth.Value = big.NewInt(0) // in wei auth.Value = big.NewInt(0) // in wei
// TODO: Calculate GasLimit based on the contents of the ForgeBatchArgs
// This requires a function that estimates the gas usage of the gasLimit := t.cfg.ForgeBatchGasCost.Fixed +
// forgeBatch call based on the contents of the ForgeBatch args: uint64(len(batchInfo.L1UserTxsExtra))*t.cfg.ForgeBatchGasCost.L1UserTx +
// - length of l2txs uint64(len(batchInfo.L1CoordTxs))*t.cfg.ForgeBatchGasCost.L1CoordTx +
// - length of l1Usertxs uint64(len(batchInfo.L2Txs))*t.cfg.ForgeBatchGasCost.L2Tx
// - length of l1CoordTxs with authorization signature auth.GasLimit = gasLimit
// - length of l1CoordTxs without authoriation signature
// - etc.
auth.GasLimit = 1000000
auth.GasPrice = gasPrice auth.GasPrice = gasPrice
auth.Nonce = nil auth.Nonce = nil
@@ -191,7 +188,7 @@ func addPerc(v *big.Int, p int64) *big.Int {
func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo, resend bool) error { func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchInfo, resend bool) error {
var ethTx *types.Transaction var ethTx *types.Transaction
var err error var err error
auth, err := t.NewAuth(ctx) auth, err := t.NewAuth(ctx, batchInfo)
if err != nil { if err != nil {
return tracerr.Wrap(err) return tracerr.Wrap(err)
} }
@@ -419,8 +416,6 @@ func (q *Queue) Push(batchInfo *BatchInfo) {
// Run the TxManager // Run the TxManager
func (t *TxManager) Run(ctx context.Context) { func (t *TxManager) Run(ctx context.Context) {
waitCh := time.After(longWaitDuration)
var statsVars statsVars var statsVars statsVars
select { select {
case statsVars = <-t.statsVarsCh: case statsVars = <-t.statsVarsCh:
@@ -431,6 +426,7 @@ func (t *TxManager) Run(ctx context.Context) {
log.Infow("TxManager: received initial statsVars", log.Infow("TxManager: received initial statsVars",
"block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatchNum) "block", t.stats.Eth.LastBlock.Num, "batch", t.stats.Eth.LastBatchNum)
timer := time.NewTimer(longWaitDuration)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@@ -474,13 +470,17 @@ func (t *TxManager) Run(ctx context.Context) {
continue continue
} }
t.queue.Push(batchInfo) t.queue.Push(batchInfo)
waitCh = time.After(t.cfg.TxManagerCheckInterval) if !timer.Stop() {
case <-waitCh: <-timer.C
}
timer.Reset(t.cfg.TxManagerCheckInterval)
case <-timer.C:
queuePosition, batchInfo := t.queue.Next() queuePosition, batchInfo := t.queue.Next()
if batchInfo == nil { if batchInfo == nil {
waitCh = time.After(longWaitDuration) timer.Reset(longWaitDuration)
continue continue
} }
timer.Reset(t.cfg.TxManagerCheckInterval)
if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil { if err := t.checkEthTransactionReceipt(ctx, batchInfo); ctx.Err() != nil {
continue continue
} else if err != nil { //nolint:staticcheck } else if err != nil { //nolint:staticcheck

View File

@@ -336,6 +336,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
PurgeBlockDelay: cfg.Coordinator.L2DB.PurgeBlockDelay, PurgeBlockDelay: cfg.Coordinator.L2DB.PurgeBlockDelay,
InvalidateBlockDelay: cfg.Coordinator.L2DB.InvalidateBlockDelay, InvalidateBlockDelay: cfg.Coordinator.L2DB.InvalidateBlockDelay,
}, },
ForgeBatchGasCost: cfg.Coordinator.EthClient.ForgeBatchGasCost,
VerifierIdx: uint8(verifierIdx), VerifierIdx: uint8(verifierIdx),
TxProcessorConfig: txProcessorCfg, TxProcessorConfig: txProcessorCfg,
}, },

View File

@@ -146,7 +146,7 @@ const longWaitDuration = 999 * time.Hour
// const provingDuration = 2 * time.Second // const provingDuration = 2 * time.Second
func (s *Mock) runProver(ctx context.Context) { func (s *Mock) runProver(ctx context.Context) {
waitCh := time.After(longWaitDuration) timer := time.NewTimer(longWaitDuration)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@@ -154,21 +154,27 @@ func (s *Mock) runProver(ctx context.Context) {
case msg := <-s.msgCh: case msg := <-s.msgCh:
switch msg.value { switch msg.value {
case "cancel": case "cancel":
waitCh = time.After(longWaitDuration) if !timer.Stop() {
<-timer.C
}
timer.Reset(longWaitDuration)
s.Lock() s.Lock()
if !s.status.IsReady() { if !s.status.IsReady() {
s.status = prover.StatusCodeAborted s.status = prover.StatusCodeAborted
} }
s.Unlock() s.Unlock()
case "prove": case "prove":
waitCh = time.After(s.provingDuration) if !timer.Stop() {
<-timer.C
}
timer.Reset(s.provingDuration)
s.Lock() s.Lock()
s.status = prover.StatusCodeBusy s.status = prover.StatusCodeBusy
s.Unlock() s.Unlock()
} }
msg.ackCh <- true msg.ackCh <- true
case <-waitCh: case <-timer.C:
waitCh = time.After(longWaitDuration) timer.Reset(longWaitDuration)
s.Lock() s.Lock()
if s.status != prover.StatusCodeBusy { if s.status != prover.StatusCodeBusy {
s.Unlock() s.Unlock()