Browse Source

Merge pull request #414 from hermeznetwork/feature/syncforgercommitment

Sync ForgerCommitment and use it in coord
feature/sql-semaphore1
arnau 4 years ago
committed by GitHub
parent
commit
e787651ba3
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 303 additions and 55 deletions
  1. +8
    -7
      common/bid.go
  2. +1
    -1
      coordinator/batch.go
  3. +4
    -3
      coordinator/coordinator.go
  4. +8
    -1
      coordinator/coordinator_test.go
  5. +9
    -5
      db/historydb/historydb.go
  6. +64
    -0
      db/historydb/historydb_test.go
  7. +20
    -2
      prover/prover.go
  8. +29
    -27
      synchronizer/synchronizer.go
  9. +153
    -7
      synchronizer/synchronizer_test.go
  10. +6
    -1
      test/ethclient.go
  11. +1
    -1
      test/proofserver/proofserver.go

+ 8
- 7
common/bid.go

@ -26,13 +26,14 @@ type BidCoordinator struct {
// Slot contains relevant information of a slot
type Slot struct {
SlotNum int64
DefaultSlotBid *big.Int
StartBlock int64
EndBlock int64
BatchesLen int
BidValue *big.Int
BootCoord bool
SlotNum int64
DefaultSlotBid *big.Int
StartBlock int64
EndBlock int64
ForgerCommitment bool
// BatchesLen int
BidValue *big.Int
BootCoord bool
// Bidder, Forer and URL correspond to the winner of the slot (which is
// not always the highest bidder). These are the values of the
// coordinator that is able to forge exclusively before the deadline.

+ 1
- 1
coordinator/batch.go

@ -47,7 +47,7 @@ type BatchInfo struct {
// DebugStore is a debug function to store the BatchInfo as a json text file in
// storePath
func (b *BatchInfo) DebugStore(storePath string) error {
batchJSON, err := json.Marshal(b)
batchJSON, err := json.MarshalIndent(b, "", " ")
if err != nil {
return tracerr.Wrap(err)
}

+ 4
- 3
coordinator/coordinator.go

@ -190,9 +190,10 @@ func (c *Coordinator) syncSCVars(vars synchronizer.SCVariablesPtr) {
func (c *Coordinator) canForge(stats *synchronizer.Stats) bool {
anyoneForge := false
if stats.Sync.Auction.CurrentSlot.BatchesLen == 0 &&
c.consts.Auction.RelativeBlock(stats.Eth.LastBlock.Num+1) > int64(c.vars.Auction.SlotDeadline) {
log.Debug("Coordinator: anyone can forge in the current slot (slotDeadline passed)")
if !stats.Sync.Auction.CurrentSlot.ForgerCommitment &&
c.consts.Auction.RelativeBlock(stats.Eth.LastBlock.Num+1) >= int64(c.vars.Auction.SlotDeadline) {
log.Debugw("Coordinator: anyone can forge in the current slot (slotDeadline passed)",
"block", stats.Eth.LastBlock.Num)
anyoneForge = true
}
if stats.Sync.Auction.CurrentSlot.Forger == c.cfg.ForgerAddress || anyoneForge {

+ 8
- 1
coordinator/coordinator_test.go

@ -213,6 +213,7 @@ func TestCoordinatorFlow(t *testing.T) {
return
}
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
var timer timer
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
modules := newTestModules(t)
@ -294,6 +295,7 @@ func TestCoordinatorFlow(t *testing.T) {
func TestCoordinatorStartStop(t *testing.T) {
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
var timer timer
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
modules := newTestModules(t)
@ -304,6 +306,7 @@ func TestCoordinatorStartStop(t *testing.T) {
func TestCoordCanForge(t *testing.T) {
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
bootForger := ethClientSetup.AuctionVariables.BootCoordinator
var timer timer
@ -356,6 +359,7 @@ func TestCoordCanForge(t *testing.T) {
func TestCoordHandleMsgSyncBlock(t *testing.T) {
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
bootForger := ethClientSetup.AuctionVariables.BootCoordinator
var timer timer
@ -417,6 +421,7 @@ func TestCoordHandleMsgSyncBlock(t *testing.T) {
func TestPipelineShouldL1L2Batch(t *testing.T) {
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
var timer timer
ctx := context.Background()
@ -562,6 +567,7 @@ func preloadSync(t *testing.T, ethClient *test.Client, sync *synchronizer.Synchr
func TestPipeline1(t *testing.T) {
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
var timer timer
ctx := context.Background()
@ -646,6 +652,7 @@ func TestCoordinatorStress(t *testing.T) {
}
log.Info("Begin Test Coord Stress")
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
var timer timer
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
modules := newTestModules(t)
@ -691,7 +698,7 @@ func TestCoordinatorStress(t *testing.T) {
case <-ctx.Done():
wg.Done()
return
case <-time.After(100 * time.Millisecond):
case <-time.After(1 * time.Second):
ethClient.CtlMineBlock()
}
}

+ 9
- 5
db/historydb/historydb.go

@ -293,11 +293,15 @@ func (hdb *HistoryDB) GetBatches(from, to common.BatchNum) ([]common.Batch, erro
return db.SlicePtrsToSlice(batches).([]common.Batch), tracerr.Wrap(err)
}
// GetBatchesLen retrieve number of batches from the DB, given a slotNum
func (hdb *HistoryDB) GetBatchesLen(slotNum int64) (int, error) {
row := hdb.db.QueryRow("SELECT COUNT(*) FROM batch WHERE slot_num = $1;", slotNum)
var batchesLen int
return batchesLen, tracerr.Wrap(row.Scan(&batchesLen))
// GetFirstBatchBlockNumBySlot returns the ethereum block number of the first
// batch within a slot
func (hdb *HistoryDB) GetFirstBatchBlockNumBySlot(slotNum int64) (int64, error) {
row := hdb.db.QueryRow(
`SELECT eth_block_num FROM batch
WHERE slot_num = $1 ORDER BY batch_num ASC LIMIT 1;`, slotNum,
)
var blockNum int64
return blockNum, tracerr.Wrap(row.Scan(&blockNum))
}
// GetLastBatchNum returns the BatchNum of the latest forged batch

+ 64
- 0
db/historydb/historydb_test.go

@ -1051,6 +1051,70 @@ func TestGetLastTxsPosition(t *testing.T) {
assert.Equal(t, sql.ErrNoRows.Error(), err.Error())
}
func TestGetFirstBatchBlockNumBySlot(t *testing.T) {
test.WipeDB(historyDB.DB())
set := `
Type: Blockchain
// Slot = 0
> block // 2
> block // 3
> block // 4
> block // 5
// Slot = 1
> block // 6
> block // 7
> batch
> block // 8
> block // 9
// Slot = 2
> batch
> block // 10
> block // 11
> block // 12
> block // 13
`
tc := til.NewContext(uint16(0), common.RollupConstMaxL1UserTx)
blocks, err := tc.GenerateBlocks(set)
assert.NoError(t, err)
tilCfgExtra := til.ConfigExtra{
CoordUser: "A",
}
err = tc.FillBlocksExtra(blocks, &tilCfgExtra)
require.NoError(t, err)
for i := range blocks {
for j := range blocks[i].Rollup.Batches {
blocks[i].Rollup.Batches[j].Batch.SlotNum = int64(i) / 4
}
}
// Add all blocks
for i := range blocks {
err = historyDB.AddBlockSCData(&blocks[i])
require.NoError(t, err)
}
_, err = historyDB.GetFirstBatchBlockNumBySlot(0)
require.Equal(t, sql.ErrNoRows, tracerr.Unwrap(err))
bn1, err := historyDB.GetFirstBatchBlockNumBySlot(1)
require.NoError(t, err)
assert.Equal(t, int64(8), bn1)
bn2, err := historyDB.GetFirstBatchBlockNumBySlot(2)
require.NoError(t, err)
assert.Equal(t, int64(10), bn2)
}
// setTestBlocks WARNING: this will delete the blocks and recreate them
func setTestBlocks(from, to int64) []common.Block {
test.WipeDB(historyDB.DB())

+ 20
- 2
prover/prover.go

@ -288,7 +288,8 @@ func (p *ProofServerClient) WaitReady(ctx context.Context) error {
// MockClient is a mock ServerProof to be used in tests. It doesn't calculate anything
type MockClient struct {
Delay time.Duration
counter int64
Delay time.Duration
}
// CalculateProof sends the *common.ZKInputs to the ServerProof to compute the
@ -302,7 +303,24 @@ func (p *MockClient) GetProof(ctx context.Context) (*Proof, []*big.Int, error) {
// Simulate a delay
select {
case <-time.After(p.Delay): //nolint:gomnd
return &Proof{}, []*big.Int{big.NewInt(1234)}, nil //nolint:gomnd
i := p.counter * 100 //nolint:gomnd
p.counter++
return &Proof{
PiA: [3]*big.Int{
big.NewInt(i), big.NewInt(i + 1), big.NewInt(1), //nolint:gomnd
},
PiB: [3][2]*big.Int{
{big.NewInt(i + 2), big.NewInt(i + 3)}, //nolint:gomnd
{big.NewInt(i + 4), big.NewInt(i + 5)}, //nolint:gomnd
{big.NewInt(1), big.NewInt(0)}, //nolint:gomnd
},
PiC: [3]*big.Int{
big.NewInt(i + 6), big.NewInt(i + 7), big.NewInt(1), //nolint:gomnd
},
Protocol: "groth",
},
[]*big.Int{big.NewInt(i + 42)}, //nolint:gomnd
nil
case <-ctx.Done():
return nil, nil, tracerr.Wrap(common.ErrDone)
}

+ 29
- 27
synchronizer/synchronizer.go

@ -177,17 +177,12 @@ type SCConsts struct {
// Config is the Synchronizer configuration
type Config struct {
// StartBlockNum StartBlockNum
// InitialVariables SCVariables
StatsRefreshPeriod time.Duration
}
// Synchronizer implements the Synchronizer type
type Synchronizer struct {
ethClient eth.ClientInterface
// auctionConstants common.AuctionConstants
// rollupConstants common.RollupConstants
// wDelayerConstants common.WDelayerConstants
ethClient eth.ClientInterface
consts SCConsts
historyDB *historydb.HistoryDB
stateDB *statedb.StateDB
@ -196,8 +191,6 @@ type Synchronizer struct {
startBlockNum int64
vars SCVariables
stats *StatsHolder
// firstSavedBlock *common.Block
// mux sync.Mutex
}
// NewSynchronizer creates a new Synchronizer
@ -286,24 +279,28 @@ func (s *Synchronizer) SCVars() SCVariablesPtr {
}
}
func (s *Synchronizer) updateCurrentSlotIfSync(batchesLen int) error {
// firstBatchBlockNum is the blockNum of first batch in that block, if any
func (s *Synchronizer) updateCurrentSlotIfSync(reset bool, firstBatchBlockNum *int64) error {
slot := common.Slot{
SlotNum: s.stats.Sync.Auction.CurrentSlot.SlotNum,
BatchesLen: int(s.stats.Sync.Auction.CurrentSlot.BatchesLen),
SlotNum: s.stats.Sync.Auction.CurrentSlot.SlotNum,
ForgerCommitment: s.stats.Sync.Auction.CurrentSlot.ForgerCommitment,
}
// We want the next block because the current one is already mined
blockNum := s.stats.Sync.LastBlock.Num + 1
slotNum := s.consts.Auction.SlotNum(blockNum)
if batchesLen == -1 {
dbBatchesLen, err := s.historyDB.GetBatchesLen(slotNum)
if err != nil {
return tracerr.Wrap(fmt.Errorf("historyDB.GetBatchesLen: %w", err))
if reset {
dbFirstBatchBlockNum, err := s.historyDB.GetFirstBatchBlockNumBySlot(slotNum)
if err != nil && tracerr.Unwrap(err) != sql.ErrNoRows {
return tracerr.Wrap(fmt.Errorf("historyDB.GetFirstBatchBySlot: %w", err))
} else if tracerr.Unwrap(err) == sql.ErrNoRows {
firstBatchBlockNum = nil
} else {
firstBatchBlockNum = &dbFirstBatchBlockNum
}
slot.BatchesLen = dbBatchesLen
slot.ForgerCommitment = false
} else if slotNum > slot.SlotNum {
slot.BatchesLen = batchesLen
} else {
slot.BatchesLen += batchesLen
// We are in a new slotNum, start from default values
slot.ForgerCommitment = false
}
slot.SlotNum = slotNum
slot.StartBlock, slot.EndBlock = s.consts.Auction.SlotBlocks(slot.SlotNum)
@ -333,6 +330,11 @@ func (s *Synchronizer) updateCurrentSlotIfSync(batchesLen int) error {
slot.URL = s.vars.Auction.BootCoordinatorURL
}
}
if firstBatchBlockNum != nil &&
s.consts.Auction.RelativeBlock(*firstBatchBlockNum) <
int64(s.vars.Auction.SlotDeadline) {
slot.ForgerCommitment = true
}
// TODO: Remove this SANITY CHECK once this code is tested enough
// BEGIN SANITY CHECK
@ -499,11 +501,6 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block)
WDelayer: *wDelayerData,
}
// log.Debugw("Sync()", "block", blockData)
// err = s.historyDB.AddBlock(blockData.Block)
// if err != nil {
// return err
// }
err = s.historyDB.AddBlockSCData(&blockData)
if err != nil {
return nil, nil, tracerr.Wrap(err)
@ -522,9 +519,14 @@ func (s *Synchronizer) Sync2(ctx context.Context, lastSavedBlock *common.Block)
}
}
s.stats.UpdateSync(ethBlock,
&rollupData.Batches[batchesLen-1].Batch.BatchNum, lastL1BatchBlock, lastForgeL1TxsNum)
&rollupData.Batches[batchesLen-1].Batch.BatchNum,
lastL1BatchBlock, lastForgeL1TxsNum)
}
var firstBatchBlockNum *int64
if len(rollupData.Batches) > 0 {
firstBatchBlockNum = &rollupData.Batches[0].Batch.EthBlockNum
}
if err := s.updateCurrentSlotIfSync(len(rollupData.Batches)); err != nil {
if err := s.updateCurrentSlotIfSync(false, firstBatchBlockNum); err != nil {
return nil, nil, tracerr.Wrap(err)
}
@ -674,7 +676,7 @@ func (s *Synchronizer) resetState(block *common.Block) error {
s.stats.UpdateSync(block, &batchNum, &lastL1BatchBlockNum, lastForgeL1TxsNum)
if err := s.updateCurrentSlotIfSync(-1); err != nil {
if err := s.updateCurrentSlotIfSync(true, nil); err != nil {
return tracerr.Wrap(err)
}
return nil

+ 153
- 7
synchronizer/synchronizer_test.go

@ -271,18 +271,25 @@ func ethAddTokens(blocks []common.BlockData, client *test.Client) {
}
}
func TestSync(t *testing.T) {
//
// Setup
//
var chainID uint16 = 0
var deleteme = []string{}
func TestMain(m *testing.M) {
exitVal := m.Run()
for _, dir := range deleteme {
if err := os.RemoveAll(dir); err != nil {
panic(err)
}
}
os.Exit(exitVal)
}
ctx := context.Background()
func newTestModules(t *testing.T) (*statedb.StateDB, *historydb.HistoryDB) {
// Int State DB
dir, err := ioutil.TempDir("", "tmpdb")
require.NoError(t, err)
defer assert.Nil(t, os.RemoveAll(dir))
deleteme = append(deleteme, dir)
chainID := uint16(0)
stateDB, err := statedb.NewStateDB(dir, statedb.TypeSynchronizer, 32, chainID)
require.NoError(t, err)
@ -294,9 +301,20 @@ func TestSync(t *testing.T) {
// Clear DB
test.WipeDB(historyDB.DB())
return stateDB, historyDB
}
func TestSyncGeneral(t *testing.T) {
//
// Setup
//
stateDB, historyDB := newTestModules(t)
// Init eth client
var timer timer
clientSetup := test.NewClientSetupExample()
clientSetup.ChainID = big.NewInt(int64(chainID))
bootCoordAddr := clientSetup.AuctionVariables.BootCoordinator
client := test.NewClient(true, &timer, &ethCommon.Address{}, clientSetup)
@ -306,6 +324,8 @@ func TestSync(t *testing.T) {
})
require.NoError(t, err)
ctx := context.Background()
//
// First Sync from an initial state
//
@ -659,3 +679,129 @@ func TestSync(t *testing.T) {
assert.Equal(t, 2, len(dbAccounts))
assertEqualAccountsHistoryDBStateDB(t, dbAccounts, sdbAccounts)
}
func TestSyncForgerCommitment(t *testing.T) {
stateDB, historyDB := newTestModules(t)
// Init eth client
var timer timer
clientSetup := test.NewClientSetupExample()
clientSetup.ChainID = big.NewInt(int64(chainID))
clientSetup.AuctionConstants.GenesisBlockNum = 2
clientSetup.AuctionConstants.BlocksPerSlot = 4
clientSetup.AuctionVariables.SlotDeadline = 2
bootCoordAddr := clientSetup.AuctionVariables.BootCoordinator
client := test.NewClient(true, &timer, &ethCommon.Address{}, clientSetup)
// Create Synchronizer
s, err := NewSynchronizer(client, historyDB, stateDB, Config{
StatsRefreshPeriod: 0 * time.Second,
})
require.NoError(t, err)
ctx := context.Background()
set := `
Type: Blockchain
// Slot = 0
> block // 2
> block // 3
> block // 4
> block // 5
// Slot = 1
> block // 6
> batch
> block // 7
> block // 8
> block // 9
// Slot = 2
> block // 10
> block // 11
> batch
> block // 12
> block // 13
`
// For each block, true when the slot that belongs to the following
// block has forgerCommitment
commitment := map[int64]bool{
2: false,
3: false,
4: false,
5: false,
6: false,
7: true,
8: true,
9: false,
10: false,
11: false,
12: false,
13: false,
}
tc := til.NewContext(chainID, common.RollupConstMaxL1UserTx)
blocks, err := tc.GenerateBlocks(set)
assert.NoError(t, err)
tilCfgExtra := til.ConfigExtra{
BootCoordAddr: bootCoordAddr,
CoordUser: "A",
}
err = tc.FillBlocksExtra(blocks, &tilCfgExtra)
require.NoError(t, err)
// for i := range blocks {
// for j := range blocks[i].Rollup.Batches {
// blocks[i].Rollup.Batches[j].Batch.SlotNum = int64(i) / 4
// }
// }
// be in sync
for {
syncBlock, discards, err := s.Sync2(ctx, nil)
require.NoError(t, err)
require.Nil(t, discards)
if syncBlock == nil {
break
}
}
stats := s.Stats()
require.Equal(t, int64(1), stats.Sync.LastBlock.Num)
// Store ForgerComitmnent observed at every block by the live synchronizer
syncCommitment := map[int64]bool{}
// Store ForgerComitmnent observed at every block by a syncrhonizer that is restarted
syncRestartedCommitment := map[int64]bool{}
for _, block := range blocks {
// Add block data to the smart contracts
err = client.CtlAddBlocks([]common.BlockData{block})
require.NoError(t, err)
syncBlock, discards, err := s.Sync2(ctx, nil)
require.NoError(t, err)
require.Nil(t, discards)
if syncBlock == nil {
break
}
stats := s.Stats()
require.True(t, stats.Synced())
syncCommitment[syncBlock.Block.Num] = stats.Sync.Auction.CurrentSlot.ForgerCommitment
s2, err := NewSynchronizer(client, historyDB, stateDB, Config{
StatsRefreshPeriod: 0 * time.Second,
})
require.NoError(t, err)
stats = s2.Stats()
require.True(t, stats.Synced())
syncRestartedCommitment[syncBlock.Block.Num] = stats.Sync.Auction.CurrentSlot.ForgerCommitment
}
assert.Equal(t, commitment, syncCommitment)
assert.Equal(t, commitment, syncRestartedCommitment)
}

+ 6
- 1
test/ethclient.go

@ -399,6 +399,8 @@ type Client struct {
forgeBatchArgsPending map[ethCommon.Hash]*batch
forgeBatchArgs map[ethCommon.Hash]*batch
startBlock int64
}
// NewClient returns a new test Client that implements the eth.IClient
@ -480,7 +482,10 @@ func NewClient(l bool, timer Timer, addr *ethCommon.Address, setup *ClientSetup)
maxBlockNum: blockNum,
}
for i := int64(1); i < setup.AuctionConstants.GenesisBlockNum+1; i++ {
if c.startBlock == 0 {
c.startBlock = 2
}
for i := int64(1); i < c.startBlock; i++ {
c.CtlMineBlock()
}

+ 1
- 1
test/proofserver/proofserver.go

@ -180,7 +180,7 @@ func (s *Mock) runProver(ctx context.Context) {
"pi_a": ["%v", "%v", "1"],
"pi_b": [["%v", "%v"],["%v", "%v"],["1", "0"]],
"pi_c": ["%v", "%v", "1"],
"protocol": "groth16"
"protocol": "groth"
}`, i, i+1, i+2, i+3, i+4, i+5, i+6, i+7) //nolint:gomnd
s.pubData = fmt.Sprintf(`[
"%v"

Loading…
Cancel
Save