Browse Source

Sync ForgerCommitment and use it in coord

Previously the coordinator was erroneously using Slot.BatchesLen to determine
when anyone can forge.  The correct behaviour is implmenented with the boolean
flag `ForgerCommitment`, that is set to true only when there's a batch before
the deadline within a slot.

Delete Slot.BatchesLen, and the synchronization code of this value from the
Synchronizer, as this is not needed
feature/sql-semaphore1
Eduard S 3 years ago
parent
commit
8267d007c9
11 changed files with 303 additions and 55 deletions
  1. +8
    -7
      common/bid.go
  2. +1
    -1
      coordinator/batch.go
  3. +4
    -3
      coordinator/coordinator.go
  4. +8
    -1
      coordinator/coordinator_test.go
  5. +9
    -5
      db/historydb/historydb.go
  6. +64
    -0
      db/historydb/historydb_test.go
  7. +20
    -2
      prover/prover.go
  8. +29
    -27
      synchronizer/synchronizer.go
  9. +153
    -7
      synchronizer/synchronizer_test.go
  10. +6
    -1
      test/ethclient.go
  11. +1
    -1
      test/proofserver/proofserver.go

+ 8
- 7
common/bid.go

@ -26,13 +26,14 @@ type BidCoordinator struct {
// Slot contains relevant information of a slot
type Slot struct {
SlotNum int64
DefaultSlotBid *big.Int
StartBlock int64
EndBlock int64
BatchesLen int
BidValue *big.Int
BootCoord bool
SlotNum int64
DefaultSlotBid *big.Int
StartBlock int64
EndBlock int64
ForgerCommitment bool
// BatchesLen int
BidValue *big.Int
BootCoord bool
// Bidder, Forer and URL correspond to the winner of the slot (which is
// not always the highest bidder). These are the values of the
// coordinator that is able to forge exclusively before the deadline.

+ 1
- 1
coordinator/batch.go

@ -47,7 +47,7 @@ type BatchInfo struct {
// DebugStore is a debug function to store the BatchInfo as a json text file in
// storePath
func (b *BatchInfo) DebugStore(storePath string) error {
batchJSON, err := json.Marshal(b)
batchJSON, err := json.MarshalIndent(b, "", " ")
if err != nil {
return tracerr.Wrap(err)
}

+ 4
- 3
coordinator/coordinator.go

@ -190,9 +190,10 @@ func (c *Coordinator) syncSCVars(vars synchronizer.SCVariablesPtr) {
func (c *Coordinator) canForge(stats *synchronizer.Stats) bool {
anyoneForge := false
if stats.Sync.Auction.CurrentSlot.BatchesLen == 0 &&
c.consts.Auction.RelativeBlock(stats.Eth.LastBlock.Num+1) > int64(c.vars.Auction.SlotDeadline) {
log.Debug("Coordinator: anyone can forge in the current slot (slotDeadline passed)")
if !stats.Sync.Auction.CurrentSlot.ForgerCommitment &&
c.consts.Auction.RelativeBlock(stats.Eth.LastBlock.Num+1) >= int64(c.vars.Auction.SlotDeadline) {
log.Debugw("Coordinator: anyone can forge in the current slot (slotDeadline passed)",
"block", stats.Eth.LastBlock.Num)
anyoneForge = true
}
if stats.Sync.Auction.CurrentSlot.Forger == c.cfg.ForgerAddress || anyoneForge {

+ 8
- 1
coordinator/coordinator_test.go

@ -213,6 +213,7 @@ func TestCoordinatorFlow(t *testing.T) {
return
}
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
var timer timer
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
modules := newTestModules(t)
@ -294,6 +295,7 @@ func TestCoordinatorFlow(t *testing.T) {
func TestCoordinatorStartStop(t *testing.T) {
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
var timer timer
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
modules := newTestModules(t)
@ -304,6 +306,7 @@ func TestCoordinatorStartStop(t *testing.T) {
func TestCoordCanForge(t *testing.T) {
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
bootForger := ethClientSetup.AuctionVariables.BootCoordinator
var timer timer
@ -356,6 +359,7 @@ func TestCoordCanForge(t *testing.T) {
func TestCoordHandleMsgSyncBlock(t *testing.T) {
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
bootForger := ethClientSetup.AuctionVariables.BootCoordinator
var timer timer
@ -417,6 +421,7 @@ func TestCoordHandleMsgSyncBlock(t *testing.T) {
func TestPipelineShouldL1L2Batch(t *testing.T) {
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
var timer timer
ctx := context.Background()
@ -562,6 +567,7 @@ func preloadSync(t *testing.T, ethClient *test.Client, sync *synchronizer.Synchr
func TestPipeline1(t *testing.T) {
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
var timer timer
ctx := context.Background()
@ -646,6 +652,7 @@ func TestCoordinatorStress(t *testing.T) {
}
log.Info("Begin Test Coord Stress")
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
var timer timer
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
modules := newTestModules(t)
@ -691,7 +698,7 @@ func TestCoordinatorStress(t *testing.T) {
case <-ctx.Done():
wg.Done()
return
case <-time.After(100 * time.Millisecond):
case <-time.After(1 * time.Second):
ethClient.CtlMineBlock()
}
}

+ 9
- 5
db/historydb/historydb.go

@ -293,11 +293,15 @@ func (hdb *HistoryDB) GetBatches(from, to common.BatchNum) ([]common.Batch, erro
return db.SlicePtrsToSlice(batches).([]common.Batch), tracerr.Wrap(err)
}
// GetBatchesLen retrieve number of batches from the DB, given a slotNum
func (hdb *HistoryDB) GetBatchesLen(slotNum int64) (int, error) {
row := hdb.db.QueryRow("SELECT COUNT(*) FROM batch WHERE slot_num = $1;", slotNum)
var batchesLen int
return batchesLen, tracerr.Wrap(row.Scan(&batchesLen))
// GetFirstBatchBlockNumBySlot returns the ethereum block number of the first
// batch within a slot
func (hdb *HistoryDB) GetFirstBatchBlockNumBySlot(slotNum int64) (int64, error) {
row := hdb.db.QueryRow(
`SELECT eth_block_num FROM batch
WHERE slot_num = $1 ORDER BY batch_num ASC LIMIT 1;`, slotNum,
)
var blockNum int64
return blockNum, tracerr.Wrap(row.Scan(&blockNum))
}
// GetLastBatchNum returns the BatchNum of the latest forged batch

+ 64
- 0
db/historydb/historydb_test.go

@ -1051,6 +1051,70 @@ func TestGetLastTxsPosition(t *testing.T) {
assert.Equal(t, sql.ErrNoRows.Error(), err.Error())
}
func TestGetFirstBatchBlockNumBySlot(t *testing.T) {
test.WipeDB(historyDB.DB())
set := `
Type: Blockchain
// Slot = 0
> block // 2
> block // 3
> block // 4
> block // 5
// Slot = 1
> block // 6
> block // 7
> batch
> block // 8
> block // 9
// Slot = 2
> batch
> block // 10
> block // 11
> block // 12
> block // 13
`
tc := til.NewContext(uint16(0), common.RollupConstMaxL1UserTx)
blocks, err := tc.GenerateBlocks(set)
assert.NoError(t, err)
tilCfgExtra := til.ConfigExtra{
CoordUser: "A",
}
err = tc.FillBlocksExtra(blocks, &tilCfgExtra)
require.NoError(t, err)
for i := range blocks {
for j := range blocks[i].Rollup.Batches {
blocks[i].Rollup.Batches[j].Batch.SlotNum = int64(i) / 4
}
}
// Add all blocks
for i := range blocks {
err = historyDB.AddBlockSCData(&blocks[i])
require.NoError(t, err)
}
_, err = historyDB.GetFirstBatchBlockNumBySlot(0)
require.Equal(t, sql.ErrNoRows, tracerr.Unwrap(err))
bn1, err := historyDB.GetFirstBatchBlockNumBySlot(1)
require.NoError(t, err)
assert.Equal(t, int64(8), bn1)
bn2, err := historyDB.GetFirstBatchBlockNumBySlot(2)
require.NoError(t, err)
assert.Equal(t, int64(10), bn2)
}
// setTestBlocks WARNING: this will delete the blocks and recreate them
func setTestBlocks(from, to int64) []common.Block {
test.WipeDB(historyDB.DB())

+ 20
- 2
prover/prover.go

@ -288,7 +288,8 @@ func (p *ProofServerClient) WaitReady(ctx context.Context) error {
// MockClient is a mock ServerProof to be used in tests. It doesn't calculate anything
type MockClient struct {
Delay time.Duration
counter int64
Delay time.Duration
}
// CalculateProof sends the *common.ZKInputs to the ServerProof to compute the
@ -302,7 +303,24 @@ func (p *MockClient) GetProof(ctx context.Context) (*Proof, []*big.Int, error) {
// Simulate a delay
select {
case <-time.After(p.Delay): //nolint:gomnd
return &Proof{}, []*big.Int{big.NewInt(1234)}, nil //nolint:gomnd
i := p.counter * 100 //nolint:gomnd
p.counter++
return &Proof{
PiA: [3]*big.Int{
big.NewInt(i), big.NewInt(i + 1), big.NewInt(1), //nolint:gomnd
},
PiB: [3][2]*big.Int{
{big.NewInt(i + 2), big.NewInt(i + 3)}, //nolint:gomnd
{big.NewInt(i + 4), big.NewInt(i + 5)}, //nolint:gomnd
{big.NewInt(1), big.NewInt(0)}, //nolint:gomnd
},
PiC: [3]*big.Int{
big.NewInt(i + 6), big.NewInt(i + 7), big.NewInt(1), //nolint:gomnd
},
Protocol: "groth",
},
[]*big.Int{big.NewInt(i + 42)}, //nolint:gomnd
nil
case <-ctx.Done():
return nil, nil, tracerr.Wrap(common.ErrDone)
}

+ 29
- 27
synchronizer/synchronizer.go

@ -177,17 +177,12 @@ type SCConsts struct {
// Config is the Synchronizer configuration
type Config struct {
// StartBlockNum StartBlockNum
// InitialVariables SCVariables
StatsRefreshPeriod time.Duration
}
// Synchronizer implements the Synchronizer type
type Synchronizer struct {
ethClient eth.ClientInterface
// auctionConstants common.AuctionConstants
// rollupConstants common.RollupConstants
// wDelayerConstants common.WDelayerConstants
ethClient eth.ClientInterface
consts SCConsts
historyDB *historydb.HistoryDB
stateDB *statedb.StateDB
@ -196,8 +191,6 @@ type Synchronizer struct {
startBlockNum int64
vars SCVariables
stats *StatsHolder
// firstSavedBlock *common.Block
// mux sync.Mutex
}
// NewSynchronizer creates a new Synchronizer
@ -286,24 +279,28 @@ func (s *Synchronizer) SCVars() SCVariablesPtr {
}
}
func (s *Synchronizer) updateCurrentSlotIfSync(batchesLen int) error {
// firstBatchBlockNum is the blockNum of first batch in that block, if any
func (s *Synchronizer) updateCurrentSlotIfSync(reset bool, firstBatchBlockNum *int64) error {
slot := common.Slot{
SlotNum: s.stats.Sync.Auction.CurrentSlot.SlotNum,
BatchesLen: int(s.stats.Sync.Auction.CurrentSlot.BatchesLen),
SlotNum: s.stats.Sync.Auction.CurrentSlot.SlotNum,
ForgerCommitment: s.stats.Sync.Auction.CurrentSlot.ForgerCommitment,
}
// We want the next block because the current one is already mined
blockNum := s.stats.Sync.LastBlock.Num + 1
slotNum := s.consts.Auction.SlotNum(blockNum)
if batchesLen == -1 {
dbBatchesLen, err := s.historyDB.GetBatchesLen(slotNum)
if err != nil {
return tracerr.Wrap(fmt.Errorf("historyDB.GetBatchesLen: %w", err))
if reset {
dbFirstBatchBlockNum, err := s.historyDB.GetFirstBatchBlockNumBySlot(slotNum)
if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows {
return tracerr.Wrap(fmt.Errorf("historyDB.GetFirstBatchBySlot: %w", err))
} else if tracerr.Unwrap(err) == sql.ErrNoRows {
firstBatchBlockNum = nil
} else {
firstBatchBlockNum = &dbFirstBatchBlockNum
}
slot.BatchesLen = dbBatchesLen
slot.ForgerCommitment = false
} else if slotNum > slot.SlotNum {
slot.BatchesLen = batchesLen
} else {
slot.BatchesLen += batchesLen
// We are in a new slotNum, start from default values
slot.ForgerCommitment = false
}
slot.SlotNum = slotNum
slot.StartBlock, slot.EndBlock = s.consts.Auction.SlotBlocks(slot.SlotNum)
@ -333,6 +330,11 @@ func (s *Synchronizer) updateCurrentSlotIfSync(batchesLen int) error {
slot.URL = s.vars.Auction.BootCoordinatorURL
}
}
if firstBatchBlockNum != nil &&
s.consts.Auction.RelativeBlock(*firstBatchBlockNum) <
int64(s.vars.Auction.SlotDeadline) {
slot.ForgerCommitment = true
}
// TODO: Remove this SANITY CHECK once this code is tested enough
// BEGIN SANITY CHECK
@ -499,11 +501,6 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block)
WDelayer: *wDelayerData,
}
// log.Debugw("Sync()", "block", blockData)
// err = s.historyDB.AddBlock(blockData.Block)
// if err != nil {
// return err
// }
err = s.historyDB.AddBlockSCData(&blockData)
if err != nil {
return nil, nil, tracerr.Wrap(err)
@ -522,9 +519,14 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block)
}
}
s.stats.UpdateSync(ethBlock,
&rollupData.Batches[batchesLen-1].Batch.BatchNum, lastL1BatchBlock, lastForgeL1TxsNum)
&rollupData.Batches[batchesLen-1].Batch.BatchNum,
lastL1BatchBlock, lastForgeL1TxsNum)
}
var firstBatchBlockNum *int64
if len(rollupData.Batches) > 0 {
firstBatchBlockNum = &rollupData.Batches[0].Batch.EthBlockNum
}
if err := s.updateCurrentSlotIfSync(len(rollupData.Batches)); err != nil {
if err := s.updateCurrentSlotIfSync(false, firstBatchBlockNum); err != nil {
return nil, nil, tracerr.Wrap(err)
}
@ -674,7 +676,7 @@ func (s *Synchronizer) resetState(block *common.Block) error {
s.stats.UpdateSync(block, &batchNum, &lastL1BatchBlockNum, lastForgeL1TxsNum)
if err := s.updateCurrentSlotIfSync(-1); err != nil {
if err := s.updateCurrentSlotIfSync(true, nil); err != nil {
return tracerr.Wrap(err)
}
return nil

+ 153
- 7
synchronizer/synchronizer_test.go

@ -271,18 +271,25 @@ func ethAddTokens(blocks []common.BlockData, client *test.Client) {
}
}
func TestSync(t *testing.T) {
//
// Setup
//
var chainID uint16 = 0
var deleteme = []string{}
func TestMain(m *testing.M) {
exitVal := m.Run()
for _, dir := range deleteme {
if err := os.RemoveAll(dir); err != nil {
panic(err)
}
}
os.Exit(exitVal)
}
ctx := context.Background()
func newTestModules(t *testing.T) (*statedb.StateDB, *historydb.HistoryDB) {
// Int State DB
dir, err := ioutil.TempDir("", "tmpdb")
require.NoError(t, err)
defer assert.Nil(t, os.RemoveAll(dir))
deleteme = append(deleteme, dir)
chainID := uint16(0)
stateDB, err := statedb.NewStateDB(dir, statedb.TypeSynchronizer, 32, chainID)
require.NoError(t, err)
@ -294,9 +301,20 @@ func TestSync(t *testing.T) {
// Clear DB
test.WipeDB(historyDB.DB())
return stateDB, historyDB
}
func TestSyncGeneral(t *testing.T) {
//
// Setup
//
stateDB, historyDB := newTestModules(t)
// Init eth client
var timer timer
clientSetup := test.NewClientSetupExample()
clientSetup.ChainID = big.NewInt(int64(chainID))
bootCoordAddr := clientSetup.AuctionVariables.BootCoordinator
client := test.NewClient(true, &timer, &ethCommon.Address{}, clientSetup)
@ -306,6 +324,8 @@ func TestSync(t *testing.T) {
})
require.NoError(t, err)
ctx := context.Background()
//
// First Sync from an initial state
//
@ -659,3 +679,129 @@ func TestSync(t *testing.T) {
assert.Equal(t, 2, len(dbAccounts))
assertEqualAccountsHistoryDBStateDB(t, dbAccounts, sdbAccounts)
}
func TestSyncForgerCommitment(t *testing.T) {
stateDB, historyDB := newTestModules(t)
// Init eth client
var timer timer
clientSetup := test.NewClientSetupExample()
clientSetup.ChainID = big.NewInt(int64(chainID))
clientSetup.AuctionConstants.GenesisBlockNum = 2
clientSetup.AuctionConstants.BlocksPerSlot = 4
clientSetup.AuctionVariables.SlotDeadline = 2
bootCoordAddr := clientSetup.AuctionVariables.BootCoordinator
client := test.NewClient(true, &timer, &ethCommon.Address{}, clientSetup)
// Create Synchronizer
s, err := NewSynchronizer(client, historyDB, stateDB, Config{
StatsRefreshPeriod: 0 * time.Second,
})
require.NoError(t, err)
ctx := context.Background()
set := `
Type: Blockchain
// Slot = 0
> block // 2
> block // 3
> block // 4
> block // 5
// Slot = 1
> block // 6
> batch
> block // 7
> block // 8
> block // 9
// Slot = 2
> block // 10
> block // 11
> batch
> block // 12
> block // 13
`
// For each block, true when the slot that belongs to the following
// block has forgerCommitment
commitment := map[int64]bool{
2: false,
3: false,
4: false,
5: false,
6: false,
7: true,
8: true,
9: false,
10: false,
11: false,
12: false,
13: false,
}
tc := til.NewContext(chainID, common.RollupConstMaxL1UserTx)
blocks, err := tc.GenerateBlocks(set)
assert.NoError(t, err)
tilCfgExtra := til.ConfigExtra{
BootCoordAddr: bootCoordAddr,
CoordUser: "A",
}
err = tc.FillBlocksExtra(blocks, &tilCfgExtra)
require.NoError(t, err)
// for i := range blocks {
// for j := range blocks[i].Rollup.Batches {
// blocks[i].Rollup.Batches[j].Batch.SlotNum = int64(i) / 4
// }
// }
// be in sync
for {
syncBlock, discards, err := s.Sync2(ctx, nil)
require.NoError(t, err)
require.Nil(t, discards)
if syncBlock == nil {
break
}
}
stats := s.Stats()
require.Equal(t, int64(1), stats.Sync.LastBlock.Num)
// Store ForgerComitmnent observed at every block by the live synchronizer
syncCommitment := map[int64]bool{}
// Store ForgerComitmnent observed at every block by a syncrhonizer that is restarted
syncRestartedCommitment := map[int64]bool{}
for _, block := range blocks {
// Add block data to the smart contracts
err = client.CtlAddBlocks([]common.BlockData{block})
require.NoError(t, err)
syncBlock, discards, err := s.Sync2(ctx, nil)
require.NoError(t, err)
require.Nil(t, discards)
if syncBlock == nil {
break
}
stats := s.Stats()
require.True(t, stats.Synced())
syncCommitment[syncBlock.Block.Num] = stats.Sync.Auction.CurrentSlot.ForgerCommitment
s2, err := NewSynchronizer(client, historyDB, stateDB, Config{
StatsRefreshPeriod: 0 * time.Second,
})
require.NoError(t, err)
stats = s2.Stats()
require.True(t, stats.Synced())
syncRestartedCommitment[syncBlock.Block.Num] = stats.Sync.Auction.CurrentSlot.ForgerCommitment
}
assert.Equal(t, commitment, syncCommitment)
assert.Equal(t, commitment, syncRestartedCommitment)
}

+ 6
- 1
test/ethclient.go

@ -399,6 +399,8 @@ type Client struct {
forgeBatchArgsPending map[ethCommon.Hash]*batch
forgeBatchArgs map[ethCommon.Hash]*batch
startBlock int64
}
// NewClient returns a new test Client that implements the eth.IClient
@ -480,7 +482,10 @@ func NewClient(l bool, timer Timer, addr *ethCommon.Address, setup *ClientSetup)
maxBlockNum: blockNum,
}
for i := int64(1); i < setup.AuctionConstants.GenesisBlockNum+1; i++ {
if c.startBlock == 0 {
c.startBlock = 2
}
for i := int64(1); i < c.startBlock; i++ {
c.CtlMineBlock()
}

+ 1
- 1
test/proofserver/proofserver.go

@ -180,7 +180,7 @@ func (s *Mock) runProver(ctx context.Context) {
"pi_a": ["%v", "%v", "1"],
"pi_b": [["%v", "%v"],["%v", "%v"],["1", "0"]],
"pi_c": ["%v", "%v", "1"],
"protocol": "groth16"
"protocol": "groth"
}`, i, i+1, i+2, i+3, i+4, i+5, i+6, i+7) //nolint:gomnd
s.pubData = fmt.Sprintf(`[
"%v"

Loading…
Cancel
Save