Browse Source

Merge pull request #645 from hermeznetwork/feature/newpolicy

Add config parameter  ForgeOncePerSlotIfTxs
feature/update-smart-contracts
arnau 3 years ago
committed by GitHub
parent
commit
334eecc99e
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 149 additions and 94 deletions
  1. +10
    -4
      config/config.go
  2. +1
    -1
      coordinator/batch.go
  3. +8
    -4
      coordinator/coordinator.go
  4. +126
    -82
      coordinator/pipeline.go
  5. +2
    -2
      coordinator/pipeline_test.go
  6. +1
    -1
      coordinator/txmanager.go
  7. +1
    -0
      node/node.go

+ 10
- 4
config/config.go

@ -112,11 +112,17 @@ type Coordinator struct {
// MustForgeAtSlotDeadline enables the coordinator to forge slots if
// the empty slots reach the slot deadline.
MustForgeAtSlotDeadline bool
// IgnoreSlotCommitment IgnoreSlotCommitment disables forcing the
// coordinator to forge a slot immediately when the slot is not
// committed. If set to false, the coordinator will immediately forge
// a batch at the beginning of a slot if it's the slot winner.
// IgnoreSlotCommitment disables forcing the coordinator to forge a
// slot immediately when the slot is not committed. If set to false,
// the coordinator will immediately forge a batch at the beginning of a
// slot if it's the slot winner.
IgnoreSlotCommitment bool
// ForgeOncePerSlotIfTxs will make the coordinator forge at most one
// batch per slot, only if there are included txs in that batch, or
// pending l1UserTxs in the smart contract. Setting this parameter
// overrides `ForgeDelay`, `ForgeNoTxsDelay`, `MustForgeAtSlotDeadline`
// and `IgnoreSlotCommitment`.
ForgeOncePerSlotIfTxs bool
// SyncRetryInterval is the waiting interval between calls to the main
// handler of a synced block after an error
SyncRetryInterval Duration `validate:"required"`

+ 1
- 1
coordinator/batch.go

@ -85,7 +85,7 @@ type BatchInfo struct {
PublicInputs []*big.Int
L1Batch bool
VerifierIdx uint8
L1UserTxsExtra []common.L1Tx
L1UserTxs []common.L1Tx
L1CoordTxs []common.L1Tx
L1CoordinatorTxsAuths [][]byte
L2Txs []common.L2Tx

+ 8
- 4
coordinator/coordinator.go

@ -24,10 +24,8 @@ import (
)
var (
errLastL1BatchNotSynced = fmt.Errorf("last L1Batch not synced yet")
errForgeNoTxsBeforeDelay = fmt.Errorf(
"no txs to forge and we haven't reached the forge no txs delay")
errForgeBeforeDelay = fmt.Errorf("we haven't reached the forge delay")
errLastL1BatchNotSynced = fmt.Errorf("last L1Batch not synced yet")
errSkipBatchByPolicy = fmt.Errorf("skip batch by policy")
)
const (
@ -92,6 +90,12 @@ type Config struct {
// the coordinator will immediately forge a batch at the beginning of
// a slot if it's the slot winner.
IgnoreSlotCommitment bool
// ForgeOncePerSlotIfTxs will make the coordinator forge at most one
// batch per slot, only if there are included txs in that batch, or
// pending l1UserTxs in the smart contract. Setting this parameter
// overrides `ForgeDelay`, `ForgeNoTxsDelay`, `MustForgeAtSlotDeadline`
// and `IgnoreSlotCommitment`.
ForgeOncePerSlotIfTxs bool
// SyncRetryInterval is the waiting interval between calls to the main
// handler of a synced block after an error
SyncRetryInterval time.Duration

+ 126
- 82
coordinator/pipeline.go

@ -224,8 +224,9 @@ func (p *Pipeline) handleForgeBatch(ctx context.Context,
// 2. Forge the batch internally (make a selection of txs and prepare
// all the smart contract arguments)
var skipReason *string
p.mutexL2DBUpdateDelete.Lock()
batchInfo, err = p.forgeBatch(batchNum)
batchInfo, skipReason, err = p.forgeBatch(batchNum)
p.mutexL2DBUpdateDelete.Unlock()
if ctx.Err() != nil {
return nil, ctx.Err()
@ -234,13 +235,13 @@ func (p *Pipeline) handleForgeBatch(ctx context.Context,
log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err,
"lastForgeL1TxsNum", p.state.lastForgeL1TxsNum,
"syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum)
} else if tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay ||
tracerr.Unwrap(err) == errForgeBeforeDelay {
// no log
} else {
log.Errorw("forgeBatch", "err", err)
}
return nil, tracerr.Wrap(err)
} else if skipReason != nil {
log.Debugw("skipping batch", "batch", batchNum, "reason", *skipReason)
return nil, tracerr.Wrap(errSkipBatchByPolicy)
}
// 3. Send the ZKInputs to the proof server
@ -295,8 +296,7 @@ func (p *Pipeline) Start(batchNum common.BatchNum,
if p.ctx.Err() != nil {
continue
} else if tracerr.Unwrap(err) == errLastL1BatchNotSynced ||
tracerr.Unwrap(err) == errForgeNoTxsBeforeDelay ||
tracerr.Unwrap(err) == errForgeBeforeDelay {
tracerr.Unwrap(err) == errSkipBatchByPolicy {
continue
} else if err != nil {
p.setErrAtBatchNum(batchNum)
@ -389,25 +389,109 @@ func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) er
return nil
}
// check if we reach the ForgeDelay or not before batch forging
func (p *Pipeline) preForgeBatchCheck(slotCommitted bool, now time.Time) error {
if slotCommitted && now.Sub(p.lastForgeTime) < p.cfg.ForgeDelay {
return errForgeBeforeDelay
// slotCommitted returns true if the current slot has already been committed
func (p *Pipeline) slotCommitted() bool {
// Synchronizer has synchronized a batch in the current slot (setting
// CurrentSlot.ForgerCommitment) or the pipeline has already
// internally-forged a batch in the current slot
return p.stats.Sync.Auction.CurrentSlot.ForgerCommitment ||
p.stats.Sync.Auction.CurrentSlot.SlotNum == p.state.lastSlotForged
}
// forgePolicySkipPreSelection is called before doing a tx selection in a batch to
// determine by policy if we should forge the batch or not. Returns true and
// the reason when the forging of the batch must be skipped.
func (p *Pipeline) forgePolicySkipPreSelection(now time.Time) (bool, string) {
// Check if the slot is not yet fulfilled
slotCommitted := p.slotCommitted()
if p.cfg.ForgeOncePerSlotIfTxs {
if slotCommitted {
return true, "cfg.ForgeOncePerSlotIfTxs = true and slot already committed"
}
return false, ""
}
return nil
// Determine if we must commit the slot
if !p.cfg.IgnoreSlotCommitment && !slotCommitted {
return false, ""
}
// If we haven't reached the ForgeDelay, skip forging the batch
if now.Sub(p.lastForgeTime) < p.cfg.ForgeDelay {
return true, "we haven't reached the forge delay"
}
return false, ""
}
// forgePolicySkipPostSelection is called after doing a tx selection in a batch to
// determine by policy if we should forge the batch or not. Returns true and
// the reason when the forging of the batch must be skipped.
func (p *Pipeline) forgePolicySkipPostSelection(now time.Time, l1UserTxsExtra, l1CoordTxs []common.L1Tx,
poolL2Txs []common.PoolL2Tx, batchInfo *BatchInfo) (bool, string, error) {
// Check if the slot is not yet fulfilled
slotCommitted := p.slotCommitted()
pendingTxs := true
if len(l1UserTxsExtra) == 0 && len(l1CoordTxs) == 0 && len(poolL2Txs) == 0 {
if batchInfo.L1Batch {
// Query the number of unforged L1UserTxs
// (either in a open queue or in a frozen
// not-yet-forged queue).
count, err := p.historyDB.GetUnforgedL1UserTxsCount()
if err != nil {
return false, "", err
}
// If there are future L1UserTxs, we forge a
// batch to advance the queues to be able to
// forge the L1UserTxs in the future.
// Otherwise, skip.
if count == 0 {
pendingTxs = false
}
} else {
pendingTxs = false
}
}
if p.cfg.ForgeOncePerSlotIfTxs {
if slotCommitted {
return true, "cfg.ForgeOncePerSlotIfTxs = true and slot already committed",
nil
}
if pendingTxs {
return false, "", nil
}
return true, "cfg.ForgeOncePerSlotIfTxs = true and no pending txs",
nil
}
// Determine if we must commit the slot
if !p.cfg.IgnoreSlotCommitment && !slotCommitted {
return false, "", nil
}
// check if there is no txs to forge, no l1UserTxs in the open queue to
// freeze and we haven't reached the ForgeNoTxsDelay
if now.Sub(p.lastForgeTime) < p.cfg.ForgeNoTxsDelay {
if !pendingTxs {
return true, "no txs to forge and we haven't reached the forge no txs delay",
nil
}
}
return false, "", nil
}
// forgeBatch forges the batchNum batch.
func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) {
func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo,
skipReason *string, err error) {
// remove transactions from the pool that have been there for too long
_, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
p.stats.Sync.LastBlock.Num, int64(batchNum))
if err != nil {
return nil, tracerr.Wrap(err)
return nil, nil, tracerr.Wrap(err)
}
_, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum))
if err != nil {
return nil, tracerr.Wrap(err)
return nil, nil, tracerr.Wrap(err)
}
// Structure to accumulate data and metadata of the batch
now := time.Now()
@ -417,53 +501,48 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
var poolL2Txs []common.PoolL2Tx
var discardedL2Txs []common.PoolL2Tx
var l1UserTxsExtra, l1CoordTxs []common.L1Tx
var l1UserTxs, l1CoordTxs []common.L1Tx
var auths [][]byte
var coordIdxs []common.Idx
// Check if the slot is not yet fulfilled
slotCommitted := p.cfg.IgnoreSlotCommitment
if p.stats.Sync.Auction.CurrentSlot.ForgerCommitment ||
p.stats.Sync.Auction.CurrentSlot.SlotNum == p.state.lastSlotForged {
slotCommitted = true
}
// If we haven't reached the ForgeDelay, skip forging the batch
if err = p.preForgeBatchCheck(slotCommitted, now); err != nil {
return nil, tracerr.Wrap(err)
if skip, reason := p.forgePolicySkipPreSelection(now); skip {
return nil, &reason, nil
}
// 1. Decide if we forge L2Tx or L1+L2Tx
if p.shouldL1L2Batch(batchInfo) {
batchInfo.L1Batch = true
if p.state.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum {
return nil, tracerr.Wrap(errLastL1BatchNotSynced)
return nil, nil, tracerr.Wrap(errLastL1BatchNotSynced)
}
// 2a: L1+L2 txs
l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.state.lastForgeL1TxsNum + 1)
_l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.state.lastForgeL1TxsNum + 1)
if err != nil {
return nil, tracerr.Wrap(err)
return nil, nil, tracerr.Wrap(err)
}
coordIdxs, auths, l1UserTxsExtra, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
p.txSelector.GetL1L2TxSelection(p.cfg.TxProcessorConfig, l1UserTxs)
coordIdxs, auths, l1UserTxs, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
p.txSelector.GetL1L2TxSelection(p.cfg.TxProcessorConfig, _l1UserTxs)
if err != nil {
return nil, tracerr.Wrap(err)
return nil, nil, tracerr.Wrap(err)
}
} else {
// 2b: only L2 txs
coordIdxs, auths, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
p.txSelector.GetL2TxSelection(p.cfg.TxProcessorConfig)
if err != nil {
return nil, tracerr.Wrap(err)
return nil, nil, tracerr.Wrap(err)
}
l1UserTxsExtra = nil
l1UserTxs = nil
}
// If there are no txs to forge, no l1UserTxs in the open queue to
// freeze, and we haven't reached the ForgeNoTxsDelay, skip forging the
// batch.
if err = p.postForgeBatchCheck(slotCommitted, now, l1UserTxsExtra, l1CoordTxs, poolL2Txs, batchInfo); err != nil {
return nil, tracerr.Wrap(err)
if skip, reason, err := p.forgePolicySkipPostSelection(now,
l1UserTxs, l1CoordTxs, poolL2Txs, batchInfo); err != nil {
return nil, nil, tracerr.Wrap(err)
} else if skip {
if err := p.txSelector.Reset(batchInfo.BatchNum-1, false); err != nil {
return nil, nil, tracerr.Wrap(err)
}
return nil, &reason, tracerr.Wrap(err)
}
if batchInfo.L1Batch {
@ -472,7 +551,7 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
}
// 3. Save metadata from TxSelector output for BatchNum
batchInfo.L1UserTxsExtra = l1UserTxsExtra
batchInfo.L1UserTxs = l1UserTxs
batchInfo.L1CoordTxs = l1CoordTxs
batchInfo.L1CoordinatorTxsAuths = auths
batchInfo.CoordIdxs = coordIdxs
@ -480,10 +559,10 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs),
batchInfo.BatchNum); err != nil {
return nil, tracerr.Wrap(err)
return nil, nil, tracerr.Wrap(err)
}
if err := p.l2DB.UpdateTxsInfo(discardedL2Txs); err != nil {
return nil, tracerr.Wrap(err)
return nil, nil, tracerr.Wrap(err)
}
// Invalidate transactions that become invalid because of
@ -492,21 +571,21 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
// all the nonces smaller than the current one)
err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum)
if err != nil {
return nil, tracerr.Wrap(err)
return nil, nil, tracerr.Wrap(err)
}
// 4. Call BatchBuilder with TxSelector output
configBatch := &batchbuilder.ConfigBatch{
TxProcessorConfig: p.cfg.TxProcessorConfig,
}
zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxsExtra,
zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxs,
l1CoordTxs, poolL2Txs)
if err != nil {
return nil, tracerr.Wrap(err)
return nil, nil, tracerr.Wrap(err)
}
l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a big uggly, find a better way
if err != nil {
return nil, tracerr.Wrap(err)
return nil, nil, tracerr.Wrap(err)
}
batchInfo.L2Txs = l2Txs
@ -518,42 +597,7 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, e
p.state.lastSlotForged = p.stats.Sync.Auction.CurrentSlot.SlotNum
return batchInfo, nil
}
// check if there is no txs to forge, no l1UserTxs in the open queue to freeze and we haven't reached the ForgeNoTxsDelay
func (p *Pipeline) postForgeBatchCheck(slotCommitted bool, now time.Time, l1UserTxsExtra, l1CoordTxs []common.L1Tx,
poolL2Txs []common.PoolL2Tx, batchInfo *BatchInfo) error {
if slotCommitted && now.Sub(p.lastForgeTime) < p.cfg.ForgeNoTxsDelay {
noTxs := false
if len(l1UserTxsExtra) == 0 && len(l1CoordTxs) == 0 && len(poolL2Txs) == 0 {
if batchInfo.L1Batch {
// Query the number of unforged L1UserTxs
// (either in a open queue or in a frozen
// not-yet-forged queue).
count, err := p.historyDB.GetUnforgedL1UserTxsCount()
if err != nil {
return err
}
// If there are future L1UserTxs, we forge a
// batch to advance the queues to be able to
// forge the L1UserTxs in the future.
// Otherwise, skip.
if count == 0 {
noTxs = true
}
} else {
noTxs = true
}
}
if noTxs {
if err := p.txSelector.Reset(batchInfo.BatchNum-1, false); err != nil {
return err
}
return errForgeNoTxsBeforeDelay
}
}
return nil
return batchInfo, nil, nil
}
// waitServerProof gets the generated zkProof & sends it to the SmartContract
@ -598,7 +642,7 @@ func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs {
NewLastIdx: int64(zki.Metadata.NewLastIdxRaw),
NewStRoot: zki.Metadata.NewStateRootRaw.BigInt(),
NewExitRoot: zki.Metadata.NewExitRootRaw.BigInt(),
L1UserTxs: batchInfo.L1UserTxsExtra,
L1UserTxs: batchInfo.L1UserTxs,
L1CoordinatorTxs: batchInfo.L1CoordTxs,
L1CoordinatorTxsAuths: batchInfo.L1CoordinatorTxsAuths,
L2TxsData: batchInfo.L2Txs,

+ 2
- 2
coordinator/pipeline_test.go

@ -224,12 +224,12 @@ PoolTransfer(0) User2-User3: 300 (126)
batchNum++
batchInfo, err := pipeline.forgeBatch(batchNum)
batchInfo, _, err := pipeline.forgeBatch(batchNum)
require.NoError(t, err)
assert.Equal(t, 3, len(batchInfo.L2Txs))
batchNum++
batchInfo, err = pipeline.forgeBatch(batchNum)
batchInfo, _, err = pipeline.forgeBatch(batchNum)
require.NoError(t, err)
assert.Equal(t, 0, len(batchInfo.L2Txs))
}

+ 1
- 1
coordinator/txmanager.go

@ -147,7 +147,7 @@ func (t *TxManager) NewAuth(ctx context.Context, batchInfo *BatchInfo) (*bind.Tr
auth.Value = big.NewInt(0) // in wei
gasLimit := t.cfg.ForgeBatchGasCost.Fixed +
uint64(len(batchInfo.L1UserTxsExtra))*t.cfg.ForgeBatchGasCost.L1UserTx +
uint64(len(batchInfo.L1UserTxs))*t.cfg.ForgeBatchGasCost.L1UserTx +
uint64(len(batchInfo.L1CoordTxs))*t.cfg.ForgeBatchGasCost.L1CoordTx +
uint64(len(batchInfo.L2Txs))*t.cfg.ForgeBatchGasCost.L2Tx
auth.GasLimit = gasLimit

+ 1
- 0
node/node.go

@ -367,6 +367,7 @@ func NewNode(mode Mode, cfg *config.Node) (*Node, error) {
ForgeDelay: cfg.Coordinator.ForgeDelay.Duration,
MustForgeAtSlotDeadline: cfg.Coordinator.MustForgeAtSlotDeadline,
IgnoreSlotCommitment: cfg.Coordinator.IgnoreSlotCommitment,
ForgeOncePerSlotIfTxs: cfg.Coordinator.ForgeOncePerSlotIfTxs,
ForgeNoTxsDelay: cfg.Coordinator.ForgeNoTxsDelay.Duration,
SyncRetryInterval: cfg.Coordinator.SyncRetryInterval.Duration,
PurgeByExtDelInterval: cfg.Coordinator.PurgeByExtDelInterval.Duration,

Loading…
Cancel
Save