Browse Source

Merge pull request #349 from hermeznetwork/feature/integration23

Make coordinator more responsive
feature/sql-semaphore1
arnau 4 years ago
committed by GitHub
parent
commit
872fc6a37e
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 273 additions and 228 deletions
  1. +3
    -3
      api/swagger.yml
  2. +9
    -9
      cli/node/cfg.buidler.toml
  3. +40
    -0
      cli/node/coordcfg.buidler.toml
  4. +5
    -5
      common/ethwdelayer.go
  5. +5
    -2
      config/config.go
  6. +78
    -55
      coordinator/coordinator.go
  7. +1
    -1
      db/migrations/0001.sql
  8. +4
    -1
      eth/auction.go
  9. +79
    -60
      node/node.go
  10. +39
    -77
      synchronizer/synchronizer.go
  11. +5
    -10
      synchronizer/synchronizer_test.go
  12. +5
    -5
      test/ethclient.go

+ 3
- 3
api/swagger.yml

@ -2837,9 +2837,9 @@ components:
- $ref: '#/components/schemas/EthBlockNum'
- description: The time that everyone needs to wait until a withdrawal of the funds is allowed, in seconds.
- example: 539573849
emergencyModeStartingTime:
emergencyModeStartingBlock:
type: integer
description: Second (since unix epoch) in which the emergency mode has been activated.
description: Block number in which the emergency mode has been activated.
example: 10
emergencyMode:
type: boolean
@ -2851,7 +2851,7 @@ components:
- hermezGovernanceAddress
- emergencyCouncilAddress
- withdrawalDelay
- emergencyModeStartingTime
- emergencyModeStartingBlock
- emergencyMode
StateMetrics:
type: object

+ 9
- 9
cli/node/cfg.buidler.toml

@ -25,9 +25,9 @@ SyncLoopInterval = "1s"
StatsRefreshPeriod = "1s"
[Synchronizer.StartBlockNum]
Rollup = 6
Auction = 3
WDelayer = 7
Rollup = 19
Auction = 17
WDelayer = 15
[SmartContracts]
Rollup = "0x8EEaea23686c319133a7cC110b840d1591d9AeE0"
@ -56,12 +56,12 @@ TokenHEZName = "Hermez Network Token"
SlotDeadline = 20
[Synchronizer.InitialVariables.WDelayer]
# HermezRollupAddress =
HermezGovernanceAddress = "0x0000000000000000000000000000000000000001"
EmergencyCouncilAddress = "0x0000000000000000000000000000000000000001"
WithdrawalDelay = 60
EmergencyModeStartingTime = 0
EmergencyMode = false
# HermezRollupAddress =
HermezGovernanceAddress = "0x0000000000000000000000000000000000000001"
EmergencyCouncilAddress = "0x0000000000000000000000000000000000000001"
WithdrawalDelay = 60
EmergencyModeStartingTime = 0
EmergencyMode = false
[Synchronizer.InitialVariables.Rollup]
FeeAddToken = "10"

+ 40
- 0
cli/node/coordcfg.buidler.toml

@ -0,0 +1,40 @@
ForgerAddress = "0x6BB84Cc84D4A34467aD12a2039A312f7029e2071"
ConfirmBlocks = 10
L1BatchTimeoutPerc = 0.6
ProofServerPollInterval = "1s"
SyncRetryInterval = "1s"
[L2DB]
SafetyPeriod = 10
MaxTxs = 512
TTL = "24h"
PurgeBatchDelay = 10
InvalidateBatchDelay = 20
PurgeBlockDelay = 10
InvalidateBlockDelay = 20
[TxSelector]
Path = "/tmp/iden3-test/hermez/txselector"
[BatchBuilder]
Path = "/tmp/iden3-test/hermez/batchbuilder"
[[ServerProofs]]
URL = "http://localhost:3000"
[EthClient]
CallGasLimit = 300000
DeployGasLimit = 1000000
GasPriceDiv = 100
ReceiptTimeout = "60s"
ReceiptLoopInterval = "500ms"
CheckLoopInterval = "500ms"
Attempts = 8
AttemptsDelay = "200ms"
[API]
Coordinator = true
[Debug]
BatchPath = "/tmp/iden3-test/hermez/batchesdebug"

+ 5
- 5
common/ethwdelayer.go

@ -30,11 +30,11 @@ type WDelayerEscapeHatchWithdrawal struct {
type WDelayerVariables struct {
EthBlockNum int64 `json:"ethereumBlockNum" meddler:"eth_block_num"`
// HermezRollupAddress ethCommon.Address `json:"hermezRollupAddress" meddler:"rollup_address"`
HermezGovernanceAddress ethCommon.Address `json:"hermezGovernanceAddress" meddler:"gov_address" validate:"required"`
EmergencyCouncilAddress ethCommon.Address `json:"emergencyCouncilAddress" meddler:"emg_address" validate:"required"`
WithdrawalDelay uint64 `json:"withdrawalDelay" meddler:"withdrawal_delay" validate:"required"`
EmergencyModeStartingTime uint64 `json:"emergencyModeStartingTime" meddler:"emergency_start_time"`
EmergencyMode bool `json:"emergencyMode" meddler:"emergency_mode"`
HermezGovernanceAddress ethCommon.Address `json:"hermezGovernanceAddress" meddler:"gov_address" validate:"required"`
EmergencyCouncilAddress ethCommon.Address `json:"emergencyCouncilAddress" meddler:"emg_address" validate:"required"`
WithdrawalDelay uint64 `json:"withdrawalDelay" meddler:"withdrawal_delay" validate:"required"`
EmergencyModeStartingBlock int64 `json:"emergencyModeStartingBlock" meddler:"emergency_start_block"`
EmergencyMode bool `json:"emergencyMode" meddler:"emergency_mode"`
}
// Copy returns a deep copy of the Variables

+ 5
- 2
config/config.go

@ -15,7 +15,7 @@ import (
// Duration is a wrapper type that parses time duration from text.
type Duration struct {
time.Duration
time.Duration `validate:"required"`
}
// UnmarshalText unmarshalls time duration from text.
@ -46,7 +46,10 @@ type Coordinator struct {
// ProofServerPollInterval is the waiting interval between polling the
// ProofServer while waiting for a particular status
ProofServerPollInterval Duration `validate:"required"`
L2DB struct {
// SyncRetryInterval is the waiting interval between calls to the main
// handler of a synced block after an error
SyncRetryInterval Duration `validate:"required"`
L2DB struct {
SafetyPeriod common.BatchNum `validate:"required"`
MaxTxs uint32 `validate:"required"`
TTL Duration `validate:"required"`

+ 78
- 55
coordinator/coordinator.go

@ -36,6 +36,9 @@ type Config struct {
// EthClientAttempts is the number of attempts to do an eth client RPC
// call before giving up
EthClientAttempts int
// SyncRetryInterval is the waiting interval between calls to the main
// handler of a synced block after an error
SyncRetryInterval time.Duration
// EthClientAttemptsDelay is delay between attempts do do an eth client
// RPC call
EthClientAttemptsDelay time.Duration
@ -64,6 +67,7 @@ type Coordinator struct {
provers []prover.Client
consts synchronizer.SCConsts
vars synchronizer.SCVariables
stats *synchronizer.Stats
started bool
cfg Config
@ -150,14 +154,9 @@ func (c *Coordinator) newPipeline(ctx context.Context) (*Pipeline, error) {
type MsgSyncBlock struct {
Stats synchronizer.Stats
Batches []common.BatchData
}
// MsgSyncSCVars indicates an update to Smart Contract Vars
// TODO: Move this to MsgSyncBlock and remove MsgSyncSCVars
type MsgSyncSCVars struct {
Rollup *common.RollupVariables
Auction *common.AuctionVariables
WDelayer *common.WDelayerVariables
// Vars contains each Smart Contract variables if they are updated, or
// nil if they haven't changed.
Vars synchronizer.SCVariablesPtr
}
// MsgSyncReorg indicates a reorg
@ -175,15 +174,15 @@ func (c *Coordinator) SendMsg(msg interface{}) {
c.msgCh <- msg
}
func (c *Coordinator) handleMsgSyncSCVars(msg *MsgSyncSCVars) {
if msg.Rollup != nil {
c.vars.Rollup = *msg.Rollup
func (c *Coordinator) syncSCVars(vars synchronizer.SCVariablesPtr) {
if vars.Rollup != nil {
c.vars.Rollup = *vars.Rollup
}
if msg.Auction != nil {
c.vars.Auction = *msg.Auction
if vars.Auction != nil {
c.vars.Auction = *vars.Auction
}
if msg.WDelayer != nil {
c.vars.WDelayer = *msg.WDelayer
if vars.WDelayer != nil {
c.vars.WDelayer = *vars.WDelayer
}
}
@ -200,12 +199,7 @@ func (c *Coordinator) canForge(stats *synchronizer.Stats) bool {
return false
}
func (c *Coordinator) handleMsgSyncBlock(ctx context.Context, msg *MsgSyncBlock) error {
stats := &msg.Stats
// batches := msg.Batches
if !stats.Synced() {
return nil
}
func (c *Coordinator) syncStats(ctx context.Context, stats *synchronizer.Stats) error {
c.txManager.SetLastBlock(stats.Eth.LastBlock.Num)
canForge := c.canForge(stats)
@ -260,6 +254,16 @@ func (c *Coordinator) handleMsgSyncBlock(ctx context.Context, msg *MsgSyncBlock)
return nil
}
func (c *Coordinator) handleMsgSyncBlock(ctx context.Context, msg *MsgSyncBlock) error {
c.stats = &msg.Stats
// batches := msg.Batches
if !c.stats.Synced() {
return nil
}
c.syncSCVars(msg.Vars)
return c.syncStats(ctx, c.stats)
}
func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string) error {
if c.pipeline != nil {
c.pipeline.Stop(c.ctx)
@ -271,6 +275,33 @@ func (c *Coordinator) handleStopPipeline(ctx context.Context, reason string) err
return nil
}
func (c *Coordinator) handleMsg(ctx context.Context, msg interface{}) error {
switch msg := msg.(type) {
case MsgSyncBlock:
if err := c.handleMsgSyncBlock(ctx, &msg); common.IsErrDone(err) {
return nil
} else if err != nil {
return tracerr.Wrap(fmt.Errorf("Coordinator.handleMsgSyncBlock error: %w", err))
}
case MsgSyncReorg:
if err := c.handleReorg(ctx, &msg.Stats); common.IsErrDone(err) {
return nil
} else if err != nil {
return tracerr.Wrap(fmt.Errorf("Coordinator.handleReorg error: %w", err))
}
case MsgStopPipeline:
log.Infow("Coordinator received MsgStopPipeline", "reason", msg.Reason)
if err := c.handleStopPipeline(ctx, msg.Reason); common.IsErrDone(err) {
return nil
} else if err != nil {
return tracerr.Wrap(fmt.Errorf("Coordinator.handleStopPipeline: %w", err))
}
default:
log.Fatalw("Coordinator Unexpected Coordinator msg of type %T: %+v", msg, msg)
}
return nil
}
// Start the coordinator
func (c *Coordinator) Start() {
if c.started {
@ -285,6 +316,7 @@ func (c *Coordinator) Start() {
c.wg.Add(1)
go func() {
waitDuration := time.Duration(longWaitDuration)
for {
select {
case <-c.ctx.Done():
@ -292,33 +324,23 @@ func (c *Coordinator) Start() {
c.wg.Done()
return
case msg := <-c.msgCh:
switch msg := msg.(type) {
case MsgSyncBlock:
if err := c.handleMsgSyncBlock(c.ctx, &msg); common.IsErrDone(err) {
continue
} else if err != nil {
log.Errorw("Coordinator.handleMsgSyncBlock error", "err", err)
continue
}
case MsgSyncReorg:
if err := c.handleReorg(c.ctx, &msg.Stats); common.IsErrDone(err) {
continue
} else if err != nil {
log.Errorw("Coordinator.handleReorg error", "err", err)
continue
}
case MsgStopPipeline:
log.Infow("Coordinator received MsgStopPipeline", "reason", msg.Reason)
if err := c.handleStopPipeline(c.ctx, msg.Reason); common.IsErrDone(err) {
continue
} else if err != nil {
log.Errorw("Coordinator.handleStopPipeline", "err", err)
}
case MsgSyncSCVars:
c.handleMsgSyncSCVars(&msg)
default:
log.Fatalw("Coordinator Unexpected Coordinator msg of type %T: %+v", msg, msg)
if err := c.handleMsg(c.ctx, msg); err != nil {
log.Errorw("Coordinator.handleMsg", "err", err)
waitDuration = time.Duration(c.cfg.SyncRetryInterval)
continue
}
waitDuration = time.Duration(longWaitDuration)
case <-time.After(waitDuration):
if c.stats == nil {
waitDuration = time.Duration(longWaitDuration)
continue
}
if err := c.syncStats(c.ctx, c.stats); err != nil {
log.Errorw("Coordinator.syncStats", "err", err)
waitDuration = time.Duration(c.cfg.SyncRetryInterval)
continue
}
waitDuration = time.Duration(longWaitDuration)
}
}
}()
@ -344,19 +366,20 @@ func (c *Coordinator) Stop() {
}
func (c *Coordinator) handleReorg(ctx context.Context, stats *synchronizer.Stats) error {
if common.BatchNum(stats.Sync.LastBatch) < c.pipelineBatchNum {
c.stats = stats
if common.BatchNum(c.stats.Sync.LastBatch) < c.pipelineBatchNum {
// There's been a reorg and the batch from which the pipeline
// was started was in a block that was discarded. The batch
// may not be in the main chain, so we stop the pipeline as a
// precaution (it will be started again once the node is in
// sync).
log.Infow("Coordinator.handleReorg StopPipeline sync.LastBatch < c.pipelineBatchNum",
"sync.LastBatch", stats.Sync.LastBatch,
"sync.LastBatch", c.stats.Sync.LastBatch,
"c.pipelineBatchNum", c.pipelineBatchNum)
if err := c.handleStopPipeline(ctx, "reorg"); err != nil {
return tracerr.Wrap(err)
}
if err := c.l2DB.Reorg(common.BatchNum(stats.Sync.LastBatch)); err != nil {
if err := c.l2DB.Reorg(common.BatchNum(c.stats.Sync.LastBatch)); err != nil {
return tracerr.Wrap(err)
}
}
@ -481,12 +504,12 @@ func (t *TxManager) handleReceipt(batchInfo *BatchInfo) (*int64, error) {
return nil, nil
}
const longWaitTime = 999 * time.Hour
const longWaitDuration = 999 * time.Hour
// Run the TxManager
func (t *TxManager) Run(ctx context.Context) {
next := 0
waitTime := time.Duration(longWaitTime)
waitDuration := time.Duration(longWaitDuration)
for {
select {
case <-ctx.Done():
@ -503,8 +526,8 @@ func (t *TxManager) Run(ctx context.Context) {
}
log.Debugf("ethClient ForgeCall sent, batchNum: %d", batchInfo.BatchNum)
t.queue = append(t.queue, batchInfo)
waitTime = t.cfg.TxManagerCheckInterval
case <-time.After(waitTime):
waitDuration = t.cfg.TxManagerCheckInterval
case <-time.After(waitDuration):
if len(t.queue) == 0 {
continue
}
@ -531,7 +554,7 @@ func (t *TxManager) Run(ctx context.Context) {
"batch", batchInfo.BatchNum)
t.queue = append(t.queue[:current], t.queue[current+1:]...)
if len(t.queue) == 0 {
waitTime = longWaitTime
waitDuration = longWaitDuration
next = 0
} else {
next = current % len(t.queue)

+ 1
- 1
db/migrations/0001.sql

@ -583,7 +583,7 @@ CREATE TABLE wdelayer_vars (
gov_address BYTEA NOT NULL,
emg_address BYTEA NOT NULL,
withdrawal_delay BIGINT NOT NULL,
emergency_start_time BIGINT NOT NULL,
emergency_start_block BIGINT NOT NULL,
emergency_mode BOOLEAN NOT NULL
);

+ 4
- 1
eth/auction.go

@ -651,7 +651,10 @@ func (c *AuctionClient) AuctionConstants() (auctionConstants *common.AuctionCons
return tracerr.Wrap(err)
}
auctionConstants.TokenHEZ, err = c.auction.TokenHEZ(c.opts)
return tracerr.Wrap(err)
if err != nil {
return tracerr.Wrap(err)
}
return nil
}); err != nil {
return nil, tracerr.Wrap(err)
}

+ 79
- 60
node/node.go

@ -127,12 +127,7 @@ func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node,
if err != nil {
return nil, tracerr.Wrap(err)
}
varsRollup, varsAuction, varsWDelayer := sync.SCVars()
initSCVars := synchronizer.SCVariables{
Rollup: *varsRollup,
Auction: *varsAuction,
WDelayer: *varsWDelayer,
}
initSCVars := sync.SCVars()
scConsts := synchronizer.SCConsts{
Rollup: *sync.RollupConstants(),
@ -174,6 +169,7 @@ func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node,
ForgerAddress: coordCfg.ForgerAddress,
ConfirmBlocks: coordCfg.ConfirmBlocks,
L1BatchTimeoutPerc: coordCfg.L1BatchTimeoutPerc,
SyncRetryInterval: coordCfg.SyncRetryInterval.Duration,
EthClientAttempts: coordCfg.EthClient.Attempts,
EthClientAttemptsDelay: coordCfg.EthClient.AttemptsDelay.Duration,
TxManagerCheckInterval: coordCfg.EthClient.CheckLoopInterval.Duration,
@ -192,7 +188,11 @@ func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node,
serverProofs,
client,
&scConsts,
&initSCVars,
&synchronizer.SCVariables{
Rollup: *initSCVars.Rollup,
Auction: *initSCVars.Auction,
WDelayer: *initSCVars.WDelayer,
},
)
if err != nil {
return nil, tracerr.Wrap(err)
@ -230,9 +230,9 @@ func NewNode(mode Mode, cfg *config.Node, coordCfg *config.Coordinator) (*Node,
if err != nil {
return nil, tracerr.Wrap(err)
}
nodeAPI.api.SetRollupVariables(initSCVars.Rollup)
nodeAPI.api.SetAuctionVariables(initSCVars.Auction)
nodeAPI.api.SetWDelayerVariables(initSCVars.WDelayer)
nodeAPI.api.SetRollupVariables(*initSCVars.Rollup)
nodeAPI.api.SetAuctionVariables(*initSCVars.Auction)
nodeAPI.api.SetWDelayerVariables(*initSCVars.WDelayer)
}
var debugAPI *debugapi.DebugAPI
if cfg.Debug.APIAddress != "" {
@ -326,6 +326,59 @@ func (a *NodeAPI) Run(ctx context.Context) error {
return nil
}
func (n *Node) handleNewBlock(stats *synchronizer.Stats, vars synchronizer.SCVariablesPtr,
batches []common.BatchData) {
if n.mode == ModeCoordinator {
n.coord.SendMsg(coordinator.MsgSyncBlock{
Stats: *stats,
Batches: batches,
Vars: synchronizer.SCVariablesPtr{
Rollup: vars.Rollup,
Auction: vars.Auction,
WDelayer: vars.WDelayer,
},
})
}
if n.nodeAPI != nil {
if vars.Rollup != nil {
n.nodeAPI.api.SetRollupVariables(*vars.Rollup)
}
if vars.Auction != nil {
n.nodeAPI.api.SetAuctionVariables(*vars.Auction)
}
if vars.WDelayer != nil {
n.nodeAPI.api.SetWDelayerVariables(*vars.WDelayer)
}
if stats.Synced() {
if err := n.nodeAPI.api.UpdateNetworkInfo(
stats.Eth.LastBlock, stats.Sync.LastBlock,
common.BatchNum(stats.Eth.LastBatch),
stats.Sync.Auction.CurrentSlot.SlotNum,
); err != nil {
log.Errorw("API.UpdateNetworkInfo", "err", err)
}
}
}
}
func (n *Node) handleReorg(stats *synchronizer.Stats) {
if n.mode == ModeCoordinator {
n.coord.SendMsg(coordinator.MsgSyncReorg{
Stats: *stats,
})
}
if n.nodeAPI != nil {
vars := n.sync.SCVars()
n.nodeAPI.api.SetRollupVariables(*vars.Rollup)
n.nodeAPI.api.SetAuctionVariables(*vars.Auction)
n.nodeAPI.api.SetWDelayerVariables(*vars.WDelayer)
n.nodeAPI.api.UpdateNetworkInfoBlock(
stats.Eth.LastBlock, stats.Sync.LastBlock,
)
}
}
// TODO(Edu): Consider keeping the `lastBlock` inside synchronizer so that we
// don't have to pass it around.
func (n *Node) syncLoopFn(lastBlock *common.Block) (*common.Block, time.Duration) {
@ -338,59 +391,15 @@ func (n *Node) syncLoopFn(lastBlock *common.Block) (*common.Block, time.Duration
} else if discarded != nil {
// case: reorg
log.Infow("Synchronizer.Sync reorg", "discarded", *discarded)
if n.mode == ModeCoordinator {
n.coord.SendMsg(coordinator.MsgSyncReorg{
Stats: *stats,
})
}
if n.nodeAPI != nil {
rollup, auction, wDelayer := n.sync.SCVars()
n.nodeAPI.api.SetRollupVariables(*rollup)
n.nodeAPI.api.SetAuctionVariables(*auction)
n.nodeAPI.api.SetWDelayerVariables(*wDelayer)
n.nodeAPI.api.UpdateNetworkInfoBlock(
stats.Eth.LastBlock, stats.Sync.LastBlock,
)
}
n.handleReorg(stats)
return nil, time.Duration(0)
} else if blockData != nil {
// case: new block
if n.mode == ModeCoordinator {
if stats.Synced() && (blockData.Rollup.Vars != nil ||
blockData.Auction.Vars != nil ||
blockData.WDelayer.Vars != nil) {
n.coord.SendMsg(coordinator.MsgSyncSCVars{
Rollup: blockData.Rollup.Vars,
Auction: blockData.Auction.Vars,
WDelayer: blockData.WDelayer.Vars,
})
}
n.coord.SendMsg(coordinator.MsgSyncBlock{
Stats: *stats,
Batches: blockData.Rollup.Batches,
})
}
if n.nodeAPI != nil {
if blockData.Rollup.Vars != nil {
n.nodeAPI.api.SetRollupVariables(*blockData.Rollup.Vars)
}
if blockData.Auction.Vars != nil {
n.nodeAPI.api.SetAuctionVariables(*blockData.Auction.Vars)
}
if blockData.WDelayer.Vars != nil {
n.nodeAPI.api.SetWDelayerVariables(*blockData.WDelayer.Vars)
}
if stats.Synced() {
if err := n.nodeAPI.api.UpdateNetworkInfo(
stats.Eth.LastBlock, stats.Sync.LastBlock,
common.BatchNum(stats.Eth.LastBatch),
stats.Sync.Auction.CurrentSlot.SlotNum,
); err != nil {
log.Errorw("API.UpdateNetworkInfo", "err", err)
}
}
}
n.handleNewBlock(stats, synchronizer.SCVariablesPtr{
Rollup: blockData.Rollup.Vars,
Auction: blockData.Auction.Vars,
WDelayer: blockData.WDelayer.Vars,
}, blockData.Rollup.Batches)
return &blockData.Block, time.Duration(0)
} else {
// case: no block
@ -401,6 +410,16 @@ func (n *Node) syncLoopFn(lastBlock *common.Block) (*common.Block, time.Duration
// StartSynchronizer starts the synchronizer
func (n *Node) StartSynchronizer() {
log.Info("Starting Synchronizer...")
// Trigger a manual call to handleNewBlock with the loaded state of the
// synchronizer in order to quickly activate the API and Coordinator
// and avoid waiting for the next block. Without this, the API and
// Coordinator will not react until the following block (starting from
// the last synced one) is synchronized
stats := n.sync.Stats()
vars := n.sync.SCVars()
n.handleNewBlock(stats, vars, []common.BatchData{})
n.wg.Add(1)
go func() {
var lastBlock *common.Block

+ 39
- 77
synchronizer/synchronizer.go

@ -67,7 +67,7 @@ type Stats struct {
// Synced returns true if the Synchronizer is up to date with the last ethereum block
func (s *Stats) Synced() bool {
return s.Eth.LastBlock == s.Sync.LastBlock
return s.Eth.LastBlock.Num == s.Sync.LastBlock.Num
}
// TODO(Edu): Consider removing all the mutexes from StatsHolder, make
@ -185,6 +185,14 @@ type SCVariables struct {
WDelayer common.WDelayerVariables `validate:"required"`
}
// SCVariablesPtr joins all the smart contract variables as pointers in a single
// struct
type SCVariablesPtr struct {
Rollup *common.RollupVariables `validate:"required"`
Auction *common.AuctionVariables `validate:"required"`
WDelayer *common.WDelayerVariables `validate:"required"`
}
// SCConsts joins all the smart contract constants in a single struct
type SCConsts struct {
Rollup common.RollupConstants
@ -221,27 +229,27 @@ func NewSynchronizer(ethClient eth.ClientInterface, historyDB *historydb.History
stateDB *statedb.StateDB, cfg Config) (*Synchronizer, error) {
auctionConstants, err := ethClient.AuctionConstants()
if err != nil {
log.Errorw("NewSynchronizer ethClient.AuctionConstants()", "err", err)
return nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(fmt.Errorf("NewSynchronizer ethClient.AuctionConstants(): %w",
err))
}
rollupConstants, err := ethClient.RollupConstants()
if err != nil {
log.Errorw("NewSynchronizer ethClient.RollupConstants()", "err", err)
return nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(fmt.Errorf("NewSynchronizer ethClient.RollupConstants(): %w",
err))
}
wDelayerConstants, err := ethClient.WDelayerConstants()
if err != nil {
log.Errorw("NewSynchronizer ethClient.WDelayerConstants()", "err", err)
return nil, tracerr.Wrap(err)
return nil, tracerr.Wrap(fmt.Errorf("NewSynchronizer ethClient.WDelayerConstants(): %w",
err))
}
// Set startBlockNum to the minimum between Auction, Rollup and
// WDelayer StartBlockNum
startBlockNum := cfg.StartBlockNum.Auction
if startBlockNum < cfg.StartBlockNum.Rollup {
if cfg.StartBlockNum.Rollup < startBlockNum {
startBlockNum = cfg.StartBlockNum.Rollup
}
if startBlockNum < cfg.StartBlockNum.WDelayer {
if cfg.StartBlockNum.WDelayer < startBlockNum {
startBlockNum = cfg.StartBlockNum.WDelayer
}
stats := NewStatsHolder(startBlockNum, cfg.StatsRefreshPeriod)
@ -283,8 +291,12 @@ func (s *Synchronizer) WDelayerConstants() *common.WDelayerConstants {
}
// SCVars returns a copy of the Smart Contract Variables
func (s *Synchronizer) SCVars() (*common.RollupVariables, *common.AuctionVariables, *common.WDelayerVariables) {
return s.vars.Rollup.Copy(), s.vars.Auction.Copy(), s.vars.WDelayer.Copy()
func (s *Synchronizer) SCVars() SCVariablesPtr {
return SCVariablesPtr{
Rollup: s.vars.Rollup.Copy(),
Auction: s.vars.Auction.Copy(),
WDelayer: s.vars.WDelayer.Copy(),
}
}
func (s *Synchronizer) updateCurrentSlotIfSync(batchesLen int) error {
@ -297,17 +309,13 @@ func (s *Synchronizer) updateCurrentSlotIfSync(batchesLen int) error {
slotNum := s.consts.Auction.SlotNum(blockNum)
if batchesLen == -1 {
dbBatchesLen, err := s.historyDB.GetBatchesLen(slotNum)
// fmt.Printf("DBG -1 from: %v, to: %v, len: %v\n", from, to, dbBatchesLen)
if err != nil {
log.Errorw("historyDB.GetBatchesLen", "err", err)
return tracerr.Wrap(err)
return tracerr.Wrap(fmt.Errorf("historyDB.GetBatchesLen: %w", err))
}
slot.BatchesLen = dbBatchesLen
} else if slotNum > slot.SlotNum {
// fmt.Printf("DBG batchesLen Reset len: %v (%v %v)\n", batchesLen, slotNum, slot.SlotNum)
slot.BatchesLen = batchesLen
} else {
// fmt.Printf("DBG batchesLen add len: %v: %v\n", batchesLen, slot.BatchesLen+batchesLen)
slot.BatchesLen += batchesLen
}
slot.SlotNum = slotNum
@ -321,7 +329,7 @@ func (s *Synchronizer) updateCurrentSlotIfSync(batchesLen int) error {
if tracerr.Unwrap(err) == sql.ErrNoRows {
slot.BootCoord = true
slot.Forger = s.vars.Auction.BootCoordinator
slot.URL = "???"
slot.URL = s.vars.Auction.BootCoordinatorURL
} else if err == nil {
slot.BidValue = bidCoord.BidValue
slot.DefaultSlotBid = bidCoord.DefaultSlotSetBid[slot.SlotNum%6]
@ -335,7 +343,7 @@ func (s *Synchronizer) updateCurrentSlotIfSync(batchesLen int) error {
} else {
slot.BootCoord = true
slot.Forger = s.vars.Auction.BootCoordinator
slot.URL = "???"
slot.URL = s.vars.Auction.BootCoordinatorURL
}
}
@ -417,6 +425,11 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block)
}
if lastSavedBlock != nil {
nextBlockNum = lastSavedBlock.Num + 1
if lastSavedBlock.Num < s.startBlockNum {
return nil, nil, tracerr.Wrap(
fmt.Errorf("lastSavedBlock (%v) < startBlockNum (%v)",
lastSavedBlock.Num, s.startBlockNum))
}
}
ethBlock, err := s.ethClient.EthBlockByNumber(ctx, nextBlockNum)
@ -554,14 +567,12 @@ func (s *Synchronizer) reorg(uncleBlock *common.Block) (int64, error) {
for blockNum >= s.startBlockNum {
ethBlock, err := s.ethClient.EthBlockByNumber(context.Background(), blockNum)
if err != nil {
log.Errorw("ethClient.EthBlockByNumber", "err", err)
return 0, tracerr.Wrap(err)
return 0, tracerr.Wrap(fmt.Errorf("ethClient.EthBlockByNumber: %w", err))
}
block, err = s.historyDB.GetBlock(blockNum)
if err != nil {
log.Errorw("historyDB.GetBlock", "err", err)
return 0, tracerr.Wrap(err)
return 0, tracerr.Wrap(fmt.Errorf("historyDB.GetBlock: %w", err))
}
if block.Hash == ethBlock.Hash {
log.Debugf("Found valid block: %v", blockNum)
@ -595,8 +606,7 @@ func (s *Synchronizer) resetState(block *common.Block) error {
wDelayer = &s.cfg.InitialVariables.WDelayer
log.Info("Setting initial SCVars in HistoryDB")
if err = s.historyDB.SetInitialSCVars(rollup, auction, wDelayer); err != nil {
log.Errorw("historyDB.SetInitialSCVars", "err", err)
return tracerr.Wrap(err)
return tracerr.Wrap(fmt.Errorf("historyDB.SetInitialSCVars: %w", err))
}
}
s.vars.Rollup = *rollup
@ -605,8 +615,7 @@ func (s *Synchronizer) resetState(block *common.Block) error {
batchNum, err := s.historyDB.GetLastBatchNum()
if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows {
log.Errorw("historyDB.GetLastBatchNum", "err", err)
return tracerr.Wrap(err)
return tracerr.Wrap(fmt.Errorf("historyDB.GetLastBatchNum: %w", err))
}
if tracerr.Unwrap(err) == sql.ErrNoRows {
batchNum = 0
@ -614,8 +623,7 @@ func (s *Synchronizer) resetState(block *common.Block) error {
lastL1BatchBlockNum, err := s.historyDB.GetLastL1BatchBlockNum()
if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows {
log.Errorw("historyDB.GetLastL1BatchBlockNum", "err", err)
return tracerr.Wrap(err)
return tracerr.Wrap(fmt.Errorf("historyDB.GetLastL1BatchBlockNum: %w", err))
}
if tracerr.Unwrap(err) == sql.ErrNoRows {
lastL1BatchBlockNum = 0
@ -623,8 +631,7 @@ func (s *Synchronizer) resetState(block *common.Block) error {
lastForgeL1TxsNum, err := s.historyDB.GetLastL1TxsNum()
if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows {
log.Errorw("historyDB.GetLastL1BatchBlockNum", "err", err)
return tracerr.Wrap(err)
return tracerr.Wrap(fmt.Errorf("historyDB.GetLastL1BatchBlockNum: %w", err))
}
if tracerr.Unwrap(err) == sql.ErrNoRows || lastForgeL1TxsNum == nil {
n := int64(-1)
@ -633,8 +640,7 @@ func (s *Synchronizer) resetState(block *common.Block) error {
err = s.stateDB.Reset(batchNum)
if err != nil {
log.Errorw("stateDB.Reset", "err", err)
return tracerr.Wrap(err)
return tracerr.Wrap(fmt.Errorf("stateDB.Reset: %w", err))
}
s.stats.UpdateSync(block, &batchNum, &lastL1BatchBlockNum, lastForgeL1TxsNum)
@ -645,51 +651,6 @@ func (s *Synchronizer) resetState(block *common.Block) error {
return nil
}
// TODO: Figure out who will use the Status output, and only return what's strictly need
/*
// Status returns current status values from the Synchronizer
func (s *Synchronizer) Status() (*common.SyncStatus, error) {
// Avoid possible inconsistencies
s.mux.Lock()
defer s.mux.Unlock()
var status *common.SyncStatus
// TODO: Join all queries to the DB into a single transaction so that
// we can remove the mutex locking here:
// - HistoryDB.GetLastBlock
// - HistoryDB.GetLastBatchNum
// - HistoryDB.GetCurrentForgerAddr
// - HistoryDB.GetNextForgerAddr
// Get latest block in History DB
lastSavedBlock, err := s.historyDB.GetLastBlock()
if err != nil {
return nil, err
}
status.CurrentBlock = lastSavedBlock.EthBlockNum
// Get latest batch in History DB
lastSavedBatch, err := s.historyDB.GetLastBatchNum()
if err != nil && err != sql.ErrNoRows {
return nil, err
}
status.CurrentBatch = lastSavedBatch
// Get latest blockNum in blockchain
latestBlockNum, err := s.ethClient.EthLastBlock()
if err != nil {
return nil, err
}
// TODO: Get CurrentForgerAddr & NextForgerAddr from the Auction SC / Or from the HistoryDB
// Check if Synchronizer is synchronized
status.Synchronized = status.CurrentBlock == latestBlockNum
return status, nil
}
*/
// rollupSync retreives all the Rollup Smart Contract Data that happened at
// ethBlock.blockNum with ethBlock.Hash.
func (s *Synchronizer) rollupSync(ethBlock *common.Block) (*common.RollupData, error) {
@ -1130,6 +1091,7 @@ func (s *Synchronizer) wdelayerSync(ethBlock *common.Block) (*common.WDelayerDat
for range wDelayerEvents.EmergencyModeEnabled {
s.vars.WDelayer.EmergencyMode = true
s.vars.WDelayer.EmergencyModeStartingBlock = blockNum
varsUpdate = true
}
for _, evt := range wDelayerEvents.NewWithdrawalDelay {

+ 5
- 10
synchronizer/synchronizer_test.go

@ -354,11 +354,6 @@ func TestSync(t *testing.T) {
//
// First Sync from an initial state
//
var vars struct {
Rollup *common.RollupVariables
Auction *common.AuctionVariables
WDelayer *common.WDelayerVariables
}
stats := s.Stats()
assert.Equal(t, false, stats.Synced())
@ -375,7 +370,7 @@ func TestSync(t *testing.T) {
assert.Equal(t, int64(1), stats.Eth.FirstBlockNum)
assert.Equal(t, int64(1), stats.Eth.LastBlock.Num)
assert.Equal(t, int64(1), stats.Sync.LastBlock.Num)
vars.Rollup, vars.Auction, vars.WDelayer = s.SCVars()
vars := s.SCVars()
assert.Equal(t, clientSetup.RollupVariables, vars.Rollup)
assert.Equal(t, clientSetup.AuctionVariables, vars.Auction)
assert.Equal(t, clientSetup.WDelayerVariables, vars.WDelayer)
@ -524,7 +519,7 @@ func TestSync(t *testing.T) {
assert.Equal(t, int64(1), stats.Eth.FirstBlockNum)
assert.Equal(t, int64(4), stats.Eth.LastBlock.Num)
assert.Equal(t, int64(4), stats.Sync.LastBlock.Num)
vars.Rollup, vars.Auction, vars.WDelayer = s.SCVars()
vars = s.SCVars()
assert.Equal(t, clientSetup.RollupVariables, vars.Rollup)
assert.Equal(t, clientSetup.AuctionVariables, vars.Auction)
assert.Equal(t, clientSetup.WDelayerVariables, vars.WDelayer)
@ -575,7 +570,7 @@ func TestSync(t *testing.T) {
assert.Equal(t, int64(1), stats.Eth.FirstBlockNum)
assert.Equal(t, int64(5), stats.Eth.LastBlock.Num)
assert.Equal(t, int64(5), stats.Sync.LastBlock.Num)
vars.Rollup, vars.Auction, vars.WDelayer = s.SCVars()
vars = s.SCVars()
assert.NotEqual(t, clientSetup.RollupVariables, vars.Rollup)
assert.NotEqual(t, clientSetup.AuctionVariables, vars.Auction)
assert.NotEqual(t, clientSetup.WDelayerVariables, vars.WDelayer)
@ -649,7 +644,7 @@ func TestSync(t *testing.T) {
stats = s.Stats()
assert.Equal(t, false, stats.Synced())
assert.Equal(t, int64(6), stats.Eth.LastBlock.Num)
vars.Rollup, vars.Auction, vars.WDelayer = s.SCVars()
vars = s.SCVars()
assert.Equal(t, clientSetup.RollupVariables, vars.Rollup)
assert.Equal(t, clientSetup.AuctionVariables, vars.Auction)
assert.Equal(t, clientSetup.WDelayerVariables, vars.WDelayer)
@ -688,7 +683,7 @@ func TestSync(t *testing.T) {
assert.Equal(t, false, stats.Synced())
}
vars.Rollup, vars.Auction, vars.WDelayer = s.SCVars()
vars = s.SCVars()
assert.Equal(t, clientSetup.RollupVariables, vars.Rollup)
assert.Equal(t, clientSetup.AuctionVariables, vars.Auction)
assert.Equal(t, clientSetup.WDelayerVariables, vars.WDelayer)

+ 5
- 5
test/ethclient.go

@ -333,11 +333,11 @@ func NewClientSetupExample() *ClientSetup {
HermezRollup: auctionConstants.HermezRollup,
}
wDelayerVariables := &common.WDelayerVariables{
HermezGovernanceAddress: ethCommon.HexToAddress("0xcfD0d163AE6432a72682323E2C3A5a69e6B37D12"),
EmergencyCouncilAddress: ethCommon.HexToAddress("0x2730700932a4FDB97B9268A3Ca29f97Ea5fd7EA0"),
WithdrawalDelay: 60,
EmergencyModeStartingTime: 0,
EmergencyMode: false,
HermezGovernanceAddress: ethCommon.HexToAddress("0xcfD0d163AE6432a72682323E2C3A5a69e6B37D12"),
EmergencyCouncilAddress: ethCommon.HexToAddress("0x2730700932a4FDB97B9268A3Ca29f97Ea5fd7EA0"),
WithdrawalDelay: 60,
EmergencyModeStartingBlock: 0,
EmergencyMode: false,
}
return &ClientSetup{
RollupConstants: rollupConstants,

Loading…
Cancel
Save