From 8267d007c9732d6dbfbeddc3ac3cec25e800a7be Mon Sep 17 00:00:00 2001 From: Eduard S Date: Wed, 23 Dec 2020 14:37:30 +0100 Subject: [PATCH] Sync ForgerCommitment and use it in coord Previously the coordinator was erroneously using Slot.BatchesLen to determine when anyone can forge. The correct behaviour is implmenented with the boolean flag `ForgerCommitment`, that is set to true only when there's a batch before the deadline within a slot. Delete Slot.BatchesLen, and the synchronization code of this value from the Synchronizer, as this is not needed --- common/bid.go | 15 +-- coordinator/batch.go | 2 +- coordinator/coordinator.go | 7 +- coordinator/coordinator_test.go | 9 +- db/historydb/historydb.go | 14 ++- db/historydb/historydb_test.go | 64 ++++++++++++ prover/prover.go | 22 +++- synchronizer/synchronizer.go | 56 ++++++----- synchronizer/synchronizer_test.go | 160 ++++++++++++++++++++++++++++-- test/ethclient.go | 7 +- test/proofserver/proofserver.go | 2 +- 11 files changed, 303 insertions(+), 55 deletions(-) diff --git a/common/bid.go b/common/bid.go index b006c28..e756d91 100644 --- a/common/bid.go +++ b/common/bid.go @@ -26,13 +26,14 @@ type BidCoordinator struct { // Slot contains relevant information of a slot type Slot struct { - SlotNum int64 - DefaultSlotBid *big.Int - StartBlock int64 - EndBlock int64 - BatchesLen int - BidValue *big.Int - BootCoord bool + SlotNum int64 + DefaultSlotBid *big.Int + StartBlock int64 + EndBlock int64 + ForgerCommitment bool + // BatchesLen int + BidValue *big.Int + BootCoord bool // Bidder, Forer and URL correspond to the winner of the slot (which is // not always the highest bidder). These are the values of the // coordinator that is able to forge exclusively before the deadline. diff --git a/coordinator/batch.go b/coordinator/batch.go index 2bba88a..a611e16 100644 --- a/coordinator/batch.go +++ b/coordinator/batch.go @@ -47,7 +47,7 @@ type BatchInfo struct { // DebugStore is a debug function to store the BatchInfo as a json text file in // storePath func (b *BatchInfo) DebugStore(storePath string) error { - batchJSON, err := json.Marshal(b) + batchJSON, err := json.MarshalIndent(b, "", " ") if err != nil { return tracerr.Wrap(err) } diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 897ab72..4729b32 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -190,9 +190,10 @@ func (c *Coordinator) syncSCVars(vars synchronizer.SCVariablesPtr) { func (c *Coordinator) canForge(stats *synchronizer.Stats) bool { anyoneForge := false - if stats.Sync.Auction.CurrentSlot.BatchesLen == 0 && - c.consts.Auction.RelativeBlock(stats.Eth.LastBlock.Num+1) > int64(c.vars.Auction.SlotDeadline) { - log.Debug("Coordinator: anyone can forge in the current slot (slotDeadline passed)") + if !stats.Sync.Auction.CurrentSlot.ForgerCommitment && + c.consts.Auction.RelativeBlock(stats.Eth.LastBlock.Num+1) >= int64(c.vars.Auction.SlotDeadline) { + log.Debugw("Coordinator: anyone can forge in the current slot (slotDeadline passed)", + "block", stats.Eth.LastBlock.Num) anyoneForge = true } if stats.Sync.Auction.CurrentSlot.Forger == c.cfg.ForgerAddress || anyoneForge { diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index 42d1739..c961686 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -213,6 +213,7 @@ func TestCoordinatorFlow(t *testing.T) { return } ethClientSetup := test.NewClientSetupExample() + ethClientSetup.ChainID = big.NewInt(int64(chainID)) var timer timer ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup) modules := newTestModules(t) @@ -294,6 +295,7 @@ func TestCoordinatorFlow(t *testing.T) { func TestCoordinatorStartStop(t *testing.T) { ethClientSetup := test.NewClientSetupExample() + ethClientSetup.ChainID = big.NewInt(int64(chainID)) var timer timer ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup) modules := newTestModules(t) @@ -304,6 +306,7 @@ func TestCoordinatorStartStop(t *testing.T) { func TestCoordCanForge(t *testing.T) { ethClientSetup := test.NewClientSetupExample() + ethClientSetup.ChainID = big.NewInt(int64(chainID)) bootForger := ethClientSetup.AuctionVariables.BootCoordinator var timer timer @@ -356,6 +359,7 @@ func TestCoordCanForge(t *testing.T) { func TestCoordHandleMsgSyncBlock(t *testing.T) { ethClientSetup := test.NewClientSetupExample() + ethClientSetup.ChainID = big.NewInt(int64(chainID)) bootForger := ethClientSetup.AuctionVariables.BootCoordinator var timer timer @@ -417,6 +421,7 @@ func TestCoordHandleMsgSyncBlock(t *testing.T) { func TestPipelineShouldL1L2Batch(t *testing.T) { ethClientSetup := test.NewClientSetupExample() + ethClientSetup.ChainID = big.NewInt(int64(chainID)) var timer timer ctx := context.Background() @@ -562,6 +567,7 @@ func preloadSync(t *testing.T, ethClient *test.Client, sync *synchronizer.Synchr func TestPipeline1(t *testing.T) { ethClientSetup := test.NewClientSetupExample() + ethClientSetup.ChainID = big.NewInt(int64(chainID)) var timer timer ctx := context.Background() @@ -646,6 +652,7 @@ func TestCoordinatorStress(t *testing.T) { } log.Info("Begin Test Coord Stress") ethClientSetup := test.NewClientSetupExample() + ethClientSetup.ChainID = big.NewInt(int64(chainID)) var timer timer ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup) modules := newTestModules(t) @@ -691,7 +698,7 @@ func TestCoordinatorStress(t *testing.T) { case <-ctx.Done(): wg.Done() return - case <-time.After(100 * time.Millisecond): + case <-time.After(1 * time.Second): ethClient.CtlMineBlock() } } diff --git a/db/historydb/historydb.go b/db/historydb/historydb.go index 4fa8e42..799d04a 100644 --- a/db/historydb/historydb.go +++ b/db/historydb/historydb.go @@ -293,11 +293,15 @@ func (hdb *HistoryDB) GetBatches(from, to common.BatchNum) ([]common.Batch, erro return db.SlicePtrsToSlice(batches).([]common.Batch), tracerr.Wrap(err) } -// GetBatchesLen retrieve number of batches from the DB, given a slotNum -func (hdb *HistoryDB) GetBatchesLen(slotNum int64) (int, error) { - row := hdb.db.QueryRow("SELECT COUNT(*) FROM batch WHERE slot_num = $1;", slotNum) - var batchesLen int - return batchesLen, tracerr.Wrap(row.Scan(&batchesLen)) +// GetFirstBatchBlockNumBySlot returns the ethereum block number of the first +// batch within a slot +func (hdb *HistoryDB) GetFirstBatchBlockNumBySlot(slotNum int64) (int64, error) { + row := hdb.db.QueryRow( + `SELECT eth_block_num FROM batch + WHERE slot_num = $1 ORDER BY batch_num ASC LIMIT 1;`, slotNum, + ) + var blockNum int64 + return blockNum, tracerr.Wrap(row.Scan(&blockNum)) } // GetLastBatchNum returns the BatchNum of the latest forged batch diff --git a/db/historydb/historydb_test.go b/db/historydb/historydb_test.go index a480bb7..f5dbc4d 100644 --- a/db/historydb/historydb_test.go +++ b/db/historydb/historydb_test.go @@ -1051,6 +1051,70 @@ func TestGetLastTxsPosition(t *testing.T) { assert.Equal(t, sql.ErrNoRows.Error(), err.Error()) } +func TestGetFirstBatchBlockNumBySlot(t *testing.T) { + test.WipeDB(historyDB.DB()) + + set := ` + Type: Blockchain + + // Slot = 0 + + > block // 2 + > block // 3 + > block // 4 + > block // 5 + + // Slot = 1 + + > block // 6 + > block // 7 + > batch + > block // 8 + > block // 9 + + // Slot = 2 + + > batch + > block // 10 + > block // 11 + > block // 12 + > block // 13 + + ` + tc := til.NewContext(uint16(0), common.RollupConstMaxL1UserTx) + blocks, err := tc.GenerateBlocks(set) + assert.NoError(t, err) + + tilCfgExtra := til.ConfigExtra{ + CoordUser: "A", + } + err = tc.FillBlocksExtra(blocks, &tilCfgExtra) + require.NoError(t, err) + + for i := range blocks { + for j := range blocks[i].Rollup.Batches { + blocks[i].Rollup.Batches[j].Batch.SlotNum = int64(i) / 4 + } + } + + // Add all blocks + for i := range blocks { + err = historyDB.AddBlockSCData(&blocks[i]) + require.NoError(t, err) + } + + _, err = historyDB.GetFirstBatchBlockNumBySlot(0) + require.Equal(t, sql.ErrNoRows, tracerr.Unwrap(err)) + + bn1, err := historyDB.GetFirstBatchBlockNumBySlot(1) + require.NoError(t, err) + assert.Equal(t, int64(8), bn1) + + bn2, err := historyDB.GetFirstBatchBlockNumBySlot(2) + require.NoError(t, err) + assert.Equal(t, int64(10), bn2) +} + // setTestBlocks WARNING: this will delete the blocks and recreate them func setTestBlocks(from, to int64) []common.Block { test.WipeDB(historyDB.DB()) diff --git a/prover/prover.go b/prover/prover.go index effa429..3d8c6ba 100644 --- a/prover/prover.go +++ b/prover/prover.go @@ -288,7 +288,8 @@ func (p *ProofServerClient) WaitReady(ctx context.Context) error { // MockClient is a mock ServerProof to be used in tests. It doesn't calculate anything type MockClient struct { - Delay time.Duration + counter int64 + Delay time.Duration } // CalculateProof sends the *common.ZKInputs to the ServerProof to compute the @@ -302,7 +303,24 @@ func (p *MockClient) GetProof(ctx context.Context) (*Proof, []*big.Int, error) { // Simulate a delay select { case <-time.After(p.Delay): //nolint:gomnd - return &Proof{}, []*big.Int{big.NewInt(1234)}, nil //nolint:gomnd + i := p.counter * 100 //nolint:gomnd + p.counter++ + return &Proof{ + PiA: [3]*big.Int{ + big.NewInt(i), big.NewInt(i + 1), big.NewInt(1), //nolint:gomnd + }, + PiB: [3][2]*big.Int{ + {big.NewInt(i + 2), big.NewInt(i + 3)}, //nolint:gomnd + {big.NewInt(i + 4), big.NewInt(i + 5)}, //nolint:gomnd + {big.NewInt(1), big.NewInt(0)}, //nolint:gomnd + }, + PiC: [3]*big.Int{ + big.NewInt(i + 6), big.NewInt(i + 7), big.NewInt(1), //nolint:gomnd + }, + Protocol: "groth", + }, + []*big.Int{big.NewInt(i + 42)}, //nolint:gomnd + nil case <-ctx.Done(): return nil, nil, tracerr.Wrap(common.ErrDone) } diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index ebce7b1..b2e4f4c 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -177,17 +177,12 @@ type SCConsts struct { // Config is the Synchronizer configuration type Config struct { - // StartBlockNum StartBlockNum - // InitialVariables SCVariables StatsRefreshPeriod time.Duration } // Synchronizer implements the Synchronizer type type Synchronizer struct { - ethClient eth.ClientInterface - // auctionConstants common.AuctionConstants - // rollupConstants common.RollupConstants - // wDelayerConstants common.WDelayerConstants + ethClient eth.ClientInterface consts SCConsts historyDB *historydb.HistoryDB stateDB *statedb.StateDB @@ -196,8 +191,6 @@ type Synchronizer struct { startBlockNum int64 vars SCVariables stats *StatsHolder - // firstSavedBlock *common.Block - // mux sync.Mutex } // NewSynchronizer creates a new Synchronizer @@ -286,24 +279,28 @@ func (s *Synchronizer) SCVars() SCVariablesPtr { } } -func (s *Synchronizer) updateCurrentSlotIfSync(batchesLen int) error { +// firstBatchBlockNum is the blockNum of first batch in that block, if any +func (s *Synchronizer) updateCurrentSlotIfSync(reset bool, firstBatchBlockNum *int64) error { slot := common.Slot{ - SlotNum: s.stats.Sync.Auction.CurrentSlot.SlotNum, - BatchesLen: int(s.stats.Sync.Auction.CurrentSlot.BatchesLen), + SlotNum: s.stats.Sync.Auction.CurrentSlot.SlotNum, + ForgerCommitment: s.stats.Sync.Auction.CurrentSlot.ForgerCommitment, } // We want the next block because the current one is already mined blockNum := s.stats.Sync.LastBlock.Num + 1 slotNum := s.consts.Auction.SlotNum(blockNum) - if batchesLen == -1 { - dbBatchesLen, err := s.historyDB.GetBatchesLen(slotNum) - if err != nil { - return tracerr.Wrap(fmt.Errorf("historyDB.GetBatchesLen: %w", err)) + if reset { + dbFirstBatchBlockNum, err := s.historyDB.GetFirstBatchBlockNumBySlot(slotNum) + if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows { + return tracerr.Wrap(fmt.Errorf("historyDB.GetFirstBatchBySlot: %w", err)) + } else if tracerr.Unwrap(err) == sql.ErrNoRows { + firstBatchBlockNum = nil + } else { + firstBatchBlockNum = &dbFirstBatchBlockNum } - slot.BatchesLen = dbBatchesLen + slot.ForgerCommitment = false } else if slotNum > slot.SlotNum { - slot.BatchesLen = batchesLen - } else { - slot.BatchesLen += batchesLen + // We are in a new slotNum, start from default values + slot.ForgerCommitment = false } slot.SlotNum = slotNum slot.StartBlock, slot.EndBlock = s.consts.Auction.SlotBlocks(slot.SlotNum) @@ -333,6 +330,11 @@ func (s *Synchronizer) updateCurrentSlotIfSync(batchesLen int) error { slot.URL = s.vars.Auction.BootCoordinatorURL } } + if firstBatchBlockNum != nil && + s.consts.Auction.RelativeBlock(*firstBatchBlockNum) < + int64(s.vars.Auction.SlotDeadline) { + slot.ForgerCommitment = true + } // TODO: Remove this SANITY CHECK once this code is tested enough // BEGIN SANITY CHECK @@ -499,11 +501,6 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block) WDelayer: *wDelayerData, } - // log.Debugw("Sync()", "block", blockData) - // err = s.historyDB.AddBlock(blockData.Block) - // if err != nil { - // return err - // } err = s.historyDB.AddBlockSCData(&blockData) if err != nil { return nil, nil, tracerr.Wrap(err) @@ -522,9 +519,14 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block) } } s.stats.UpdateSync(ethBlock, - &rollupData.Batches[batchesLen-1].Batch.BatchNum, lastL1BatchBlock, lastForgeL1TxsNum) + &rollupData.Batches[batchesLen-1].Batch.BatchNum, + lastL1BatchBlock, lastForgeL1TxsNum) + } + var firstBatchBlockNum *int64 + if len(rollupData.Batches) > 0 { + firstBatchBlockNum = &rollupData.Batches[0].Batch.EthBlockNum } - if err := s.updateCurrentSlotIfSync(len(rollupData.Batches)); err != nil { + if err := s.updateCurrentSlotIfSync(false, firstBatchBlockNum); err != nil { return nil, nil, tracerr.Wrap(err) } @@ -674,7 +676,7 @@ func (s *Synchronizer) resetState(block *common.Block) error { s.stats.UpdateSync(block, &batchNum, &lastL1BatchBlockNum, lastForgeL1TxsNum) - if err := s.updateCurrentSlotIfSync(-1); err != nil { + if err := s.updateCurrentSlotIfSync(true, nil); err != nil { return tracerr.Wrap(err) } return nil diff --git a/synchronizer/synchronizer_test.go b/synchronizer/synchronizer_test.go index e01c537..a0210e8 100644 --- a/synchronizer/synchronizer_test.go +++ b/synchronizer/synchronizer_test.go @@ -271,18 +271,25 @@ func ethAddTokens(blocks []common.BlockData, client *test.Client) { } } -func TestSync(t *testing.T) { - // - // Setup - // +var chainID uint16 = 0 +var deleteme = []string{} + +func TestMain(m *testing.M) { + exitVal := m.Run() + for _, dir := range deleteme { + if err := os.RemoveAll(dir); err != nil { + panic(err) + } + } + os.Exit(exitVal) +} - ctx := context.Background() +func newTestModules(t *testing.T) (*statedb.StateDB, *historydb.HistoryDB) { // Int State DB dir, err := ioutil.TempDir("", "tmpdb") require.NoError(t, err) - defer assert.Nil(t, os.RemoveAll(dir)) + deleteme = append(deleteme, dir) - chainID := uint16(0) stateDB, err := statedb.NewStateDB(dir, statedb.TypeSynchronizer, 32, chainID) require.NoError(t, err) @@ -294,9 +301,20 @@ func TestSync(t *testing.T) { // Clear DB test.WipeDB(historyDB.DB()) + return stateDB, historyDB +} + +func TestSyncGeneral(t *testing.T) { + // + // Setup + // + + stateDB, historyDB := newTestModules(t) + // Init eth client var timer timer clientSetup := test.NewClientSetupExample() + clientSetup.ChainID = big.NewInt(int64(chainID)) bootCoordAddr := clientSetup.AuctionVariables.BootCoordinator client := test.NewClient(true, &timer, ðCommon.Address{}, clientSetup) @@ -306,6 +324,8 @@ func TestSync(t *testing.T) { }) require.NoError(t, err) + ctx := context.Background() + // // First Sync from an initial state // @@ -659,3 +679,129 @@ func TestSync(t *testing.T) { assert.Equal(t, 2, len(dbAccounts)) assertEqualAccountsHistoryDBStateDB(t, dbAccounts, sdbAccounts) } + +func TestSyncForgerCommitment(t *testing.T) { + stateDB, historyDB := newTestModules(t) + + // Init eth client + var timer timer + clientSetup := test.NewClientSetupExample() + clientSetup.ChainID = big.NewInt(int64(chainID)) + clientSetup.AuctionConstants.GenesisBlockNum = 2 + clientSetup.AuctionConstants.BlocksPerSlot = 4 + clientSetup.AuctionVariables.SlotDeadline = 2 + bootCoordAddr := clientSetup.AuctionVariables.BootCoordinator + client := test.NewClient(true, &timer, ðCommon.Address{}, clientSetup) + + // Create Synchronizer + s, err := NewSynchronizer(client, historyDB, stateDB, Config{ + StatsRefreshPeriod: 0 * time.Second, + }) + require.NoError(t, err) + + ctx := context.Background() + + set := ` + Type: Blockchain + + // Slot = 0 + + > block // 2 + > block // 3 + > block // 4 + > block // 5 + + // Slot = 1 + + > block // 6 + > batch + > block // 7 + > block // 8 + > block // 9 + + // Slot = 2 + + > block // 10 + > block // 11 + > batch + > block // 12 + > block // 13 + + ` + // For each block, true when the slot that belongs to the following + // block has forgerCommitment + commitment := map[int64]bool{ + 2: false, + 3: false, + 4: false, + 5: false, + + 6: false, + 7: true, + 8: true, + 9: false, + + 10: false, + 11: false, + 12: false, + 13: false, + } + tc := til.NewContext(chainID, common.RollupConstMaxL1UserTx) + blocks, err := tc.GenerateBlocks(set) + assert.NoError(t, err) + + tilCfgExtra := til.ConfigExtra{ + BootCoordAddr: bootCoordAddr, + CoordUser: "A", + } + err = tc.FillBlocksExtra(blocks, &tilCfgExtra) + require.NoError(t, err) + + // for i := range blocks { + // for j := range blocks[i].Rollup.Batches { + // blocks[i].Rollup.Batches[j].Batch.SlotNum = int64(i) / 4 + // } + // } + + // be in sync + for { + syncBlock, discards, err := s.Sync2(ctx, nil) + require.NoError(t, err) + require.Nil(t, discards) + if syncBlock == nil { + break + } + } + stats := s.Stats() + require.Equal(t, int64(1), stats.Sync.LastBlock.Num) + + // Store ForgerComitmnent observed at every block by the live synchronizer + syncCommitment := map[int64]bool{} + // Store ForgerComitmnent observed at every block by a syncrhonizer that is restarted + syncRestartedCommitment := map[int64]bool{} + for _, block := range blocks { + // Add block data to the smart contracts + err = client.CtlAddBlocks([]common.BlockData{block}) + require.NoError(t, err) + + syncBlock, discards, err := s.Sync2(ctx, nil) + require.NoError(t, err) + require.Nil(t, discards) + if syncBlock == nil { + break + } + stats := s.Stats() + require.True(t, stats.Synced()) + syncCommitment[syncBlock.Block.Num] = stats.Sync.Auction.CurrentSlot.ForgerCommitment + + s2, err := NewSynchronizer(client, historyDB, stateDB, Config{ + StatsRefreshPeriod: 0 * time.Second, + }) + require.NoError(t, err) + stats = s2.Stats() + require.True(t, stats.Synced()) + syncRestartedCommitment[syncBlock.Block.Num] = stats.Sync.Auction.CurrentSlot.ForgerCommitment + } + assert.Equal(t, commitment, syncCommitment) + assert.Equal(t, commitment, syncRestartedCommitment) +} diff --git a/test/ethclient.go b/test/ethclient.go index 655f8cd..b767fec 100644 --- a/test/ethclient.go +++ b/test/ethclient.go @@ -399,6 +399,8 @@ type Client struct { forgeBatchArgsPending map[ethCommon.Hash]*batch forgeBatchArgs map[ethCommon.Hash]*batch + + startBlock int64 } // NewClient returns a new test Client that implements the eth.IClient @@ -480,7 +482,10 @@ func NewClient(l bool, timer Timer, addr *ethCommon.Address, setup *ClientSetup) maxBlockNum: blockNum, } - for i := int64(1); i < setup.AuctionConstants.GenesisBlockNum+1; i++ { + if c.startBlock == 0 { + c.startBlock = 2 + } + for i := int64(1); i < c.startBlock; i++ { c.CtlMineBlock() } diff --git a/test/proofserver/proofserver.go b/test/proofserver/proofserver.go index 0e81439..791e110 100644 --- a/test/proofserver/proofserver.go +++ b/test/proofserver/proofserver.go @@ -180,7 +180,7 @@ func (s *Mock) runProver(ctx context.Context) { "pi_a": ["%v", "%v", "1"], "pi_b": [["%v", "%v"],["%v", "%v"],["1", "0"]], "pi_c": ["%v", "%v", "1"], - "protocol": "groth16" + "protocol": "groth" }`, i, i+1, i+2, i+3, i+4, i+5, i+6, i+7) //nolint:gomnd s.pubData = fmt.Sprintf(`[ "%v"