From c6c50f8f6a694ccf9ad232a5f0928cb3d4f50ee1 Mon Sep 17 00:00:00 2001 From: Eduard S Date: Tue, 15 Sep 2020 12:47:00 +0200 Subject: [PATCH] Integrate test.Client in coordinator_test --- coordinator/batch.go | 27 --------------- coordinator/batch_test.go | 27 --------------- coordinator/coordinator.go | 58 +++++++++++++++++++++------------ coordinator/coordinator_test.go | 29 ++++++++++++++--- coordinator/proofpool.go | 33 +++++++++++++++++-- go.mod | 3 +- go.sum | 10 ++++++ 7 files changed, 104 insertions(+), 83 deletions(-) delete mode 100644 coordinator/batch_test.go diff --git a/coordinator/batch.go b/coordinator/batch.go index 8f8ecf7..1bbf30c 100644 --- a/coordinator/batch.go +++ b/coordinator/batch.go @@ -52,30 +52,3 @@ func (bi *BatchInfo) SetServerProof(serverProof ServerProofInterface) { func (bi *BatchInfo) SetProof(proof *Proof) { bi.proof = proof } - -// // BatchQueue implements a FIFO queue of BatchInfo -// type BatchQueue struct { -// queue []*BatchInfo -// } -// -// // NewBatchQueue returns a new *BatchQueue -// func NewBatchQueue() *BatchQueue { -// return &BatchQueue{ -// queue: []*BatchInfo{}, -// } -// } -// -// // Push adds the given BatchInfo to the BatchQueue -// func (bq *BatchQueue) Push(b *BatchInfo) { -// bq.queue = append(bq.queue, b) -// } -// -// // Pop pops the first BatchInfo from the BatchQueue -// func (bq *BatchQueue) Pop() *BatchInfo { -// if len(bq.queue) == 0 { -// return nil -// } -// b := bq.queue[0] -// bq.queue = bq.queue[1:] -// return b -// } diff --git a/coordinator/batch_test.go b/coordinator/batch_test.go deleted file mode 100644 index 0198103..0000000 --- a/coordinator/batch_test.go +++ /dev/null @@ -1,27 +0,0 @@ -package coordinator - -// import ( -// "testing" -// -// "github.com/hermeznetwork/hermez-node/common" -// "github.com/stretchr/testify/assert" -// ) -// -// func TestBatchQueue(t *testing.T) { -// bq := BatchQueue{} -// -// bq.Push(&BatchInfo{ -// batchNum: 0, -// }) -// bq.Push(&BatchInfo{ -// batchNum: 2, -// }) -// bq.Push(&BatchInfo{ -// batchNum: 1, -// }) -// -// assert.Equal(t, common.BatchNum(0), bq.Pop().batchNum) -// assert.Equal(t, common.BatchNum(2), bq.Pop().batchNum) -// assert.Equal(t, common.BatchNum(1), bq.Pop().batchNum) -// assert.Nil(t, bq.Pop()) -// } diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 2c41ad8..2edfe46 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -2,6 +2,7 @@ package coordinator import ( "fmt" + "sync" ethCommon "github.com/ethereum/go-ethereum/common" "github.com/hermeznetwork/hermez-node/batchbuilder" @@ -14,6 +15,8 @@ import ( "github.com/iden3/go-merkletree/db/memory" ) +var errTODO = fmt.Errorf("TODO") + // ErrStop is returned when the function is stopped asynchronously via the stop // channel. It doesn't indicate an error. var ErrStop = fmt.Errorf("Stopped") @@ -26,6 +29,7 @@ type Config struct { // Coordinator implements the Coordinator type type Coordinator struct { forging bool + rw *sync.RWMutex isForgeSeq bool // WIP just for testing while implementing config Config @@ -48,7 +52,7 @@ func NewCoordinator(conf Config, txsel *txselector.TxSelector, bb *batchbuilder.BatchBuilder, serverProofs []ServerProofInterface, - ethClient *eth.Client) *Coordinator { // once synchronizer is ready, synchronizer.Synchronizer will be passed as parameter here + ethClient eth.ClientInterface) *Coordinator { // once synchronizer is ready, synchronizer.Synchronizer will be passed as parameter here serverProofPool := NewServerProofPool(len(serverProofs)) for _, serverProof := range serverProofs { serverProofPool.Add(serverProof) @@ -61,6 +65,7 @@ func NewCoordinator(conf Config, batchBuilder: bb, ethClient: ethClient, ethTxStore: memory.NewMemoryStorage(), + rw: &sync.RWMutex{}, } return &c } @@ -68,27 +73,28 @@ func NewCoordinator(conf Config, // ForgeLoopFn is the function ran in a loop that checks if it's time to forge // and forges a batch if so and sends it to outBatchCh. Returns true if it's // the coordinator turn to forge. -func (c *Coordinator) ForgeLoopFn(outBatchCh chan *BatchInfo, stopCh chan bool) (bool, error) { +func (c *Coordinator) ForgeLoopFn(outBatchCh chan *BatchInfo, stopCh chan bool) (forgetime bool, err error) { if !c.isForgeSequence() { if c.forging { - log.Info("stop forging") + log.Info("ForgeLoopFn: forging state end") c.forging = false } - log.Debug("not in forge time") + log.Debug("ForgeLoopFn: not in forge time") return false, nil } - log.Debug("forge time") + log.Debug("ForgeLoopFn: forge time") if !c.forging { - log.Info("start forging") + // Start pipeline from a batchNum state taken from synchronizer + log.Info("ForgeLoopFn: forging state begin") // c.batchNum = c.hdb.GetLastBatchNum() // uncomment when HistoryDB is ready err := c.txsel.Reset(c.batchNum) if err != nil { - log.Errorw("TxSelector.Reset", "error", err) + log.Errorw("ForgeLoopFn: TxSelector.Reset", "error", err) return true, err } err = c.batchBuilder.Reset(c.batchNum, true) if err != nil { - log.Errorw("BatchBuilder.Reset", "error", err) + log.Errorw("ForgeLoopFn: BatchBuilder.Reset", "error", err) return true, err } // c.batchQueue = NewBatchQueue() @@ -100,22 +106,27 @@ func (c *Coordinator) ForgeLoopFn(outBatchCh chan *BatchInfo, stopCh chan bool) // if c.synchronizer.Reorg(): _ = c.handleReorg() - // 0. If there's an available server proof: Start pipeline for batchNum = batchNum + 1. - // non-blocking call, returns nil if a server proof is - // not available, or non-nil otherwise. + // 0. Wait for an available server proof + // blocking call serverProof, err := c.serverProofPool.Get(stopCh) if err != nil { return true, err } - log.Debugw("got serverProof", "server", serverProof) + defer func() { + if !forgetime || err != nil { + c.serverProofPool.Add(serverProof) + } + }() - log.Debugw("start forge") + log.Debugw("ForgeLoopFn: using serverProof", "server", serverProof) + log.Debugw("ForgeLoopFn: forge start") + // forge for batchNum = batchNum + 1. batchInfo, err := c.forge(serverProof) if err != nil { log.Errorw("forge", "error", err) return true, err } - log.Debugw("end forge", "batchNum", batchInfo.batchNum) + log.Debugw("ForgeLoopFn: forge end", "batchNum", batchInfo.batchNum) outBatchCh <- batchInfo return true, nil } @@ -126,14 +137,14 @@ func (c *Coordinator) ForgeLoopFn(outBatchCh chan *BatchInfo, stopCh chan bool) func (c *Coordinator) GetProofCallForgeLoopFn(inBatchCh, outBatchCh chan *BatchInfo, stopCh chan bool) error { select { case <-stopCh: - log.Info("forgeLoopFn stopped") + log.Info("GetProofCallForgeLoopFn: forgeLoopFn stopped") return ErrStop case batchInfo := <-inBatchCh: - log.Debugw("start getProofCallForge", "batchNum", batchInfo.batchNum) + log.Debugw("GetProofCallForgeLoopFn: getProofCallForge start", "batchNum", batchInfo.batchNum) if err := c.getProofCallForge(batchInfo, stopCh); err != nil { return err } - log.Debugw("end getProofCallForge", "batchNum", batchInfo.batchNum) + log.Debugw("GetProofCallForgeLoopFn: getProofCallForge end", "batchNum", batchInfo.batchNum) outBatchCh <- batchInfo } return nil @@ -145,14 +156,14 @@ func (c *Coordinator) GetProofCallForgeLoopFn(inBatchCh, outBatchCh chan *BatchI func (c *Coordinator) ForgeCallConfirmLoopFn(inBatchCh chan *BatchInfo, stopCh chan bool) error { select { case <-stopCh: - log.Info("forgeConfirmLoopFn stopped") + log.Info("ForgeCallConfirmLoopFn: forgeConfirmLoopFn stopped") return ErrStop case batchInfo := <-inBatchCh: - log.Debugw("start forgeCallConfirm", "batchNum", batchInfo.batchNum) + log.Debugw("ForgeCallConfirmLoopFn: forgeCallConfirm start", "batchNum", batchInfo.batchNum) if err := c.forgeCallConfirm(batchInfo); err != nil { return err } - log.Debugw("end forgeCallConfirm", "batchNum", batchInfo.batchNum) + log.Debugw("ForgeCallConfirmLoopFn: forgeCallConfirm end", "batchNum", batchInfo.batchNum) } return nil } @@ -226,6 +237,8 @@ func (c *Coordinator) forge(serverProof ServerProofInterface) (*BatchInfo, error func (c *Coordinator) getProofCallForge(batchInfo *BatchInfo, stopCh chan bool) error { serverProof := batchInfo.serverProof proof, err := serverProof.GetProof(stopCh) // blocking call, until not resolved don't continue. Returns when the proof server has calculated the proof + c.serverProofPool.Add(serverProof) + batchInfo.serverProof = nil if err != nil { return err } @@ -266,6 +279,8 @@ func (c *Coordinator) handleReorg() error { // isForgeSequence returns true if the node is the Forger in the current ethereum block func (c *Coordinator) isForgeSequence() bool { + c.rw.RLock() + defer c.rw.RUnlock() return c.isForgeSeq // TODO } @@ -282,5 +297,6 @@ func (c *Coordinator) shouldL1L2Batch() bool { } func (c *Coordinator) prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs { - return nil // TODO + // TODO + return ð.RollupForgeBatchArgs{} } diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index 1f678cf..7f07f97 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -10,8 +10,8 @@ import ( "github.com/hermeznetwork/hermez-node/db/historydb" "github.com/hermeznetwork/hermez-node/db/l2db" "github.com/hermeznetwork/hermez-node/db/statedb" - "github.com/hermeznetwork/hermez-node/eth" "github.com/hermeznetwork/hermez-node/log" + "github.com/hermeznetwork/hermez-node/test" "github.com/hermeznetwork/hermez-node/txselector" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -77,7 +77,7 @@ func (cn *CoordNode) Start() { } else if err != nil { log.Errorw("CoordNode ForgeLoopFn", "error", err) } else if !forge { - time.Sleep(500 * time.Millisecond) + time.Sleep(200 * time.Millisecond) } } } @@ -121,13 +121,28 @@ func (cn *CoordNode) Stop() { cn.stopForgeCallConfirm <- true } +type timer struct { + time int64 +} + +func (t *timer) Time() int64 { + currentTime := t.time + t.time++ + return currentTime +} + func TestCoordinator(t *testing.T) { txsel, bb := newTestModules(t) conf := Config{} hdb := &historydb.HistoryDB{} serverProofs := []ServerProofInterface{&ServerProof{}, &ServerProof{}} - ethClient := ð.Client{} + + var timer timer + ethClientSetup := test.NewClientSetupExample() + addr := ethClientSetup.AuctionVariables.BootCoordinator + ethClient := test.NewClient(true, &timer, addr, ethClientSetup) + c := NewCoordinator(conf, hdb, txsel, bb, serverProofs, ethClient) cn := NewCoordNode(c) cn.Start() @@ -135,18 +150,24 @@ func TestCoordinator(t *testing.T) { // simulate forgeSequence time log.Info("simulate entering in forge time") + c.rw.Lock() c.isForgeSeq = true + c.rw.Unlock() time.Sleep(1 * time.Second) // simulate going out from forgeSequence log.Info("simulate going out from forge time") + c.rw.Lock() c.isForgeSeq = false + c.rw.Unlock() time.Sleep(1 * time.Second) // simulate entering forgeSequence time again log.Info("simulate entering in forge time again") + c.rw.Lock() c.isForgeSeq = true - time.Sleep(1 * time.Second) + c.rw.Unlock() + time.Sleep(2 * time.Second) // simulate stopping forgerLoop by channel log.Info("simulate stopping forgerLoop by closing coordinator stopch") diff --git a/coordinator/proofpool.go b/coordinator/proofpool.go index 64f39f6..41f7745 100644 --- a/coordinator/proofpool.go +++ b/coordinator/proofpool.go @@ -1,10 +1,13 @@ package coordinator import ( + "time" + "github.com/hermeznetwork/hermez-node/common" "github.com/hermeznetwork/hermez-node/log" ) +// ServerProofInterface is the interface to a ServerProof that calculates zk proofs type ServerProofInterface interface { CalculateProof(zkInputs *common.ZKInputs) error GetProof(stopCh chan bool) (*Proof, error) @@ -17,6 +20,7 @@ type ServerProof struct { Available bool } +// NewServerProof creates a new ServerProof func NewServerProof(URL string) *ServerProof { return &ServerProof{URL: URL} } @@ -24,12 +28,33 @@ func NewServerProof(URL string) *ServerProof { // CalculateProof sends the *common.ZKInputs to the ServerProof to compute the // Proof func (p *ServerProof) CalculateProof(zkInputs *common.ZKInputs) error { - return nil + return errTODO } // GetProof retreives the Proof from the ServerProof func (p *ServerProof) GetProof(stopCh chan bool) (*Proof, error) { - return nil, nil + return nil, errTODO +} + +// ServerProofMock is a mock ServerProof to be used in tests. It doesn't calculate anything +type ServerProofMock struct { +} + +// CalculateProof sends the *common.ZKInputs to the ServerProof to compute the +// Proof +func (p *ServerProofMock) CalculateProof(zkInputs *common.ZKInputs) error { + return nil +} + +// GetProof retreives the Proof from the ServerProof +func (p *ServerProofMock) GetProof(stopCh chan bool) (*Proof, error) { + // Simulate a delay + select { + case <-time.After(200 * time.Millisecond): //nolint:gomnd + return &Proof{}, nil + case <-stopCh: + return nil, ErrStop + } } // ServerProofPool contains the multiple ServerProof @@ -37,17 +62,19 @@ type ServerProofPool struct { pool chan ServerProofInterface } +// NewServerProofPool creates a new pool of ServerProofs. func NewServerProofPool(maxServerProofs int) *ServerProofPool { return &ServerProofPool{ pool: make(chan ServerProofInterface, maxServerProofs), } } +// Add a ServerProof to the pool func (p *ServerProofPool) Add(serverProof ServerProofInterface) { p.pool <- serverProof } -// Get returns the available ServerProof +// Get returns the next available ServerProof func (p *ServerProofPool) Get(stopCh chan bool) (ServerProofInterface, error) { select { case <-stopCh: diff --git a/go.mod b/go.mod index 6c8766d..8e34fbc 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/stretchr/testify v1.6.1 github.com/urfave/cli v1.22.1 github.com/urfave/cli/v2 v2.2.0 - go.uber.org/zap v1.13.0 + go.uber.org/multierr v1.6.0 // indirect + go.uber.org/zap v1.16.0 gopkg.in/go-playground/validator.v9 v9.29.1 ) diff --git a/go.sum b/go.sum index 3a2391e..2a27b50 100644 --- a/go.sum +++ b/go.sum @@ -601,13 +601,23 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= +go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= +go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= +go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.13.0 h1:nR6NoDBgAf67s68NhaXbsojM+2gxp3S1hWkHDl27pVU= go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= +go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM= +go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=