Browse Source

Integrate test.Client in coordinator_test

feature/sql-semaphore1
Eduard S 4 years ago
parent
commit
c6c50f8f6a
7 changed files with 104 additions and 83 deletions
  1. +0
    -27
      coordinator/batch.go
  2. +0
    -27
      coordinator/batch_test.go
  3. +37
    -21
      coordinator/coordinator.go
  4. +25
    -4
      coordinator/coordinator_test.go
  5. +30
    -3
      coordinator/proofpool.go
  6. +2
    -1
      go.mod
  7. +10
    -0
      go.sum

+ 0
- 27
coordinator/batch.go

@ -52,30 +52,3 @@ func (bi *BatchInfo) SetServerProof(serverProof ServerProofInterface) {
func (bi *BatchInfo) SetProof(proof *Proof) {
bi.proof = proof
}
// // BatchQueue implements a FIFO queue of BatchInfo
// type BatchQueue struct {
// queue []*BatchInfo
// }
//
// // NewBatchQueue returns a new *BatchQueue
// func NewBatchQueue() *BatchQueue {
// return &BatchQueue{
// queue: []*BatchInfo{},
// }
// }
//
// // Push adds the given BatchInfo to the BatchQueue
// func (bq *BatchQueue) Push(b *BatchInfo) {
// bq.queue = append(bq.queue, b)
// }
//
// // Pop pops the first BatchInfo from the BatchQueue
// func (bq *BatchQueue) Pop() *BatchInfo {
// if len(bq.queue) == 0 {
// return nil
// }
// b := bq.queue[0]
// bq.queue = bq.queue[1:]
// return b
// }

+ 0
- 27
coordinator/batch_test.go

@ -1,27 +0,0 @@
package coordinator
// import (
// "testing"
//
// "github.com/hermeznetwork/hermez-node/common"
// "github.com/stretchr/testify/assert"
// )
//
// func TestBatchQueue(t *testing.T) {
// bq := BatchQueue{}
//
// bq.Push(&BatchInfo{
// batchNum: 0,
// })
// bq.Push(&BatchInfo{
// batchNum: 2,
// })
// bq.Push(&BatchInfo{
// batchNum: 1,
// })
//
// assert.Equal(t, common.BatchNum(0), bq.Pop().batchNum)
// assert.Equal(t, common.BatchNum(2), bq.Pop().batchNum)
// assert.Equal(t, common.BatchNum(1), bq.Pop().batchNum)
// assert.Nil(t, bq.Pop())
// }

+ 37
- 21
coordinator/coordinator.go

@ -2,6 +2,7 @@ package coordinator
import (
"fmt"
"sync"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/hermeznetwork/hermez-node/batchbuilder"
@ -14,6 +15,8 @@ import (
"github.com/iden3/go-merkletree/db/memory"
)
var errTODO = fmt.Errorf("TODO")
// ErrStop is returned when the function is stopped asynchronously via the stop
// channel. It doesn't indicate an error.
var ErrStop = fmt.Errorf("Stopped")
@ -26,6 +29,7 @@ type Config struct {
// Coordinator implements the Coordinator type
type Coordinator struct {
forging bool
rw *sync.RWMutex
isForgeSeq bool // WIP just for testing while implementing
config Config
@ -48,7 +52,7 @@ func NewCoordinator(conf Config,
txsel *txselector.TxSelector,
bb *batchbuilder.BatchBuilder,
serverProofs []ServerProofInterface,
ethClient *eth.Client) *Coordinator { // once synchronizer is ready, synchronizer.Synchronizer will be passed as parameter here
ethClient eth.ClientInterface) *Coordinator { // once synchronizer is ready, synchronizer.Synchronizer will be passed as parameter here
serverProofPool := NewServerProofPool(len(serverProofs))
for _, serverProof := range serverProofs {
serverProofPool.Add(serverProof)
@ -61,6 +65,7 @@ func NewCoordinator(conf Config,
batchBuilder: bb,
ethClient: ethClient,
ethTxStore: memory.NewMemoryStorage(),
rw: &sync.RWMutex{},
}
return &c
}
@ -68,27 +73,28 @@ func NewCoordinator(conf Config,
// ForgeLoopFn is the function ran in a loop that checks if it's time to forge
// and forges a batch if so and sends it to outBatchCh. Returns true if it's
// the coordinator turn to forge.
func (c *Coordinator) ForgeLoopFn(outBatchCh chan *BatchInfo, stopCh chan bool) (bool, error) {
func (c *Coordinator) ForgeLoopFn(outBatchCh chan *BatchInfo, stopCh chan bool) (forgetime bool, err error) {
if !c.isForgeSequence() {
if c.forging {
log.Info("stop forging")
log.Info("ForgeLoopFn: forging state end")
c.forging = false
}
log.Debug("not in forge time")
log.Debug("ForgeLoopFn: not in forge time")
return false, nil
}
log.Debug("forge time")
log.Debug("ForgeLoopFn: forge time")
if !c.forging {
log.Info("start forging")
// Start pipeline from a batchNum state taken from synchronizer
log.Info("ForgeLoopFn: forging state begin")
// c.batchNum = c.hdb.GetLastBatchNum() // uncomment when HistoryDB is ready
err := c.txsel.Reset(c.batchNum)
if err != nil {
log.Errorw("TxSelector.Reset", "error", err)
log.Errorw("ForgeLoopFn: TxSelector.Reset", "error", err)
return true, err
}
err = c.batchBuilder.Reset(c.batchNum, true)
if err != nil {
log.Errorw("BatchBuilder.Reset", "error", err)
log.Errorw("ForgeLoopFn: BatchBuilder.Reset", "error", err)
return true, err
}
// c.batchQueue = NewBatchQueue()
@ -100,22 +106,27 @@ func (c *Coordinator) ForgeLoopFn(outBatchCh chan *BatchInfo, stopCh chan bool)
// if c.synchronizer.Reorg():
_ = c.handleReorg()
// 0. If there's an available server proof: Start pipeline for batchNum = batchNum + 1.
// non-blocking call, returns nil if a server proof is
// not available, or non-nil otherwise.
// 0. Wait for an available server proof
// blocking call
serverProof, err := c.serverProofPool.Get(stopCh)
if err != nil {
return true, err
}
log.Debugw("got serverProof", "server", serverProof)
defer func() {
if !forgetime || err != nil {
c.serverProofPool.Add(serverProof)
}
}()
log.Debugw("start forge")
log.Debugw("ForgeLoopFn: using serverProof", "server", serverProof)
log.Debugw("ForgeLoopFn: forge start")
// forge for batchNum = batchNum + 1.
batchInfo, err := c.forge(serverProof)
if err != nil {
log.Errorw("forge", "error", err)
return true, err
}
log.Debugw("end forge", "batchNum", batchInfo.batchNum)
log.Debugw("ForgeLoopFn: forge end", "batchNum", batchInfo.batchNum)
outBatchCh <- batchInfo
return true, nil
}
@ -126,14 +137,14 @@ func (c *Coordinator) ForgeLoopFn(outBatchCh chan *BatchInfo, stopCh chan bool)
func (c *Coordinator) GetProofCallForgeLoopFn(inBatchCh, outBatchCh chan *BatchInfo, stopCh chan bool) error {
select {
case <-stopCh:
log.Info("forgeLoopFn stopped")
log.Info("GetProofCallForgeLoopFn: forgeLoopFn stopped")
return ErrStop
case batchInfo := <-inBatchCh:
log.Debugw("start getProofCallForge", "batchNum", batchInfo.batchNum)
log.Debugw("GetProofCallForgeLoopFn: getProofCallForge start", "batchNum", batchInfo.batchNum)
if err := c.getProofCallForge(batchInfo, stopCh); err != nil {
return err
}
log.Debugw("end getProofCallForge", "batchNum", batchInfo.batchNum)
log.Debugw("GetProofCallForgeLoopFn: getProofCallForge end", "batchNum", batchInfo.batchNum)
outBatchCh <- batchInfo
}
return nil
@ -145,14 +156,14 @@ func (c *Coordinator) GetProofCallForgeLoopFn(inBatchCh, outBatchCh chan *BatchI
func (c *Coordinator) ForgeCallConfirmLoopFn(inBatchCh chan *BatchInfo, stopCh chan bool) error {
select {
case <-stopCh:
log.Info("forgeConfirmLoopFn stopped")
log.Info("ForgeCallConfirmLoopFn: forgeConfirmLoopFn stopped")
return ErrStop
case batchInfo := <-inBatchCh:
log.Debugw("start forgeCallConfirm", "batchNum", batchInfo.batchNum)
log.Debugw("ForgeCallConfirmLoopFn: forgeCallConfirm start", "batchNum", batchInfo.batchNum)
if err := c.forgeCallConfirm(batchInfo); err != nil {
return err
}
log.Debugw("end forgeCallConfirm", "batchNum", batchInfo.batchNum)
log.Debugw("ForgeCallConfirmLoopFn: forgeCallConfirm end", "batchNum", batchInfo.batchNum)
}
return nil
}
@ -226,6 +237,8 @@ func (c *Coordinator) forge(serverProof ServerProofInterface) (*BatchInfo, error
func (c *Coordinator) getProofCallForge(batchInfo *BatchInfo, stopCh chan bool) error {
serverProof := batchInfo.serverProof
proof, err := serverProof.GetProof(stopCh) // blocking call, until not resolved don't continue. Returns when the proof server has calculated the proof
c.serverProofPool.Add(serverProof)
batchInfo.serverProof = nil
if err != nil {
return err
}
@ -266,6 +279,8 @@ func (c *Coordinator) handleReorg() error {
// isForgeSequence returns true if the node is the Forger in the current ethereum block
func (c *Coordinator) isForgeSequence() bool {
c.rw.RLock()
defer c.rw.RUnlock()
return c.isForgeSeq // TODO
}
@ -282,5 +297,6 @@ func (c *Coordinator) shouldL1L2Batch() bool {
}
func (c *Coordinator) prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs {
return nil // TODO
// TODO
return &eth.RollupForgeBatchArgs{}
}

+ 25
- 4
coordinator/coordinator_test.go

@ -10,8 +10,8 @@ import (
"github.com/hermeznetwork/hermez-node/db/historydb"
"github.com/hermeznetwork/hermez-node/db/l2db"
"github.com/hermeznetwork/hermez-node/db/statedb"
"github.com/hermeznetwork/hermez-node/eth"
"github.com/hermeznetwork/hermez-node/log"
"github.com/hermeznetwork/hermez-node/test"
"github.com/hermeznetwork/hermez-node/txselector"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -77,7 +77,7 @@ func (cn *CoordNode) Start() {
} else if err != nil {
log.Errorw("CoordNode ForgeLoopFn", "error", err)
} else if !forge {
time.Sleep(500 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
}
}
}
@ -121,13 +121,28 @@ func (cn *CoordNode) Stop() {
cn.stopForgeCallConfirm <- true
}
type timer struct {
time int64
}
func (t *timer) Time() int64 {
currentTime := t.time
t.time++
return currentTime
}
func TestCoordinator(t *testing.T) {
txsel, bb := newTestModules(t)
conf := Config{}
hdb := &historydb.HistoryDB{}
serverProofs := []ServerProofInterface{&ServerProof{}, &ServerProof{}}
ethClient := &eth.Client{}
var timer timer
ethClientSetup := test.NewClientSetupExample()
addr := ethClientSetup.AuctionVariables.BootCoordinator
ethClient := test.NewClient(true, &timer, addr, ethClientSetup)
c := NewCoordinator(conf, hdb, txsel, bb, serverProofs, ethClient)
cn := NewCoordNode(c)
cn.Start()
@ -135,18 +150,24 @@ func TestCoordinator(t *testing.T) {
// simulate forgeSequence time
log.Info("simulate entering in forge time")
c.rw.Lock()
c.isForgeSeq = true
c.rw.Unlock()
time.Sleep(1 * time.Second)
// simulate going out from forgeSequence
log.Info("simulate going out from forge time")
c.rw.Lock()
c.isForgeSeq = false
c.rw.Unlock()
time.Sleep(1 * time.Second)
// simulate entering forgeSequence time again
log.Info("simulate entering in forge time again")
c.rw.Lock()
c.isForgeSeq = true
time.Sleep(1 * time.Second)
c.rw.Unlock()
time.Sleep(2 * time.Second)
// simulate stopping forgerLoop by channel
log.Info("simulate stopping forgerLoop by closing coordinator stopch")

+ 30
- 3
coordinator/proofpool.go

@ -1,10 +1,13 @@
package coordinator
import (
"time"
"github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/log"
)
// ServerProofInterface is the interface to a ServerProof that calculates zk proofs
type ServerProofInterface interface {
CalculateProof(zkInputs *common.ZKInputs) error
GetProof(stopCh chan bool) (*Proof, error)
@ -17,6 +20,7 @@ type ServerProof struct {
Available bool
}
// NewServerProof creates a new ServerProof
func NewServerProof(URL string) *ServerProof {
return &ServerProof{URL: URL}
}
@ -24,12 +28,33 @@ func NewServerProof(URL string) *ServerProof {
// CalculateProof sends the *common.ZKInputs to the ServerProof to compute the
// Proof
func (p *ServerProof) CalculateProof(zkInputs *common.ZKInputs) error {
return nil
return errTODO
}
// GetProof retreives the Proof from the ServerProof
func (p *ServerProof) GetProof(stopCh chan bool) (*Proof, error) {
return nil, nil
return nil, errTODO
}
// ServerProofMock is a mock ServerProof to be used in tests. It doesn't calculate anything
type ServerProofMock struct {
}
// CalculateProof sends the *common.ZKInputs to the ServerProof to compute the
// Proof
func (p *ServerProofMock) CalculateProof(zkInputs *common.ZKInputs) error {
return nil
}
// GetProof retreives the Proof from the ServerProof
func (p *ServerProofMock) GetProof(stopCh chan bool) (*Proof, error) {
// Simulate a delay
select {
case <-time.After(200 * time.Millisecond): //nolint:gomnd
return &Proof{}, nil
case <-stopCh:
return nil, ErrStop
}
}
// ServerProofPool contains the multiple ServerProof
@ -37,17 +62,19 @@ type ServerProofPool struct {
pool chan ServerProofInterface
}
// NewServerProofPool creates a new pool of ServerProofs.
func NewServerProofPool(maxServerProofs int) *ServerProofPool {
return &ServerProofPool{
pool: make(chan ServerProofInterface, maxServerProofs),
}
}
// Add a ServerProof to the pool
func (p *ServerProofPool) Add(serverProof ServerProofInterface) {
p.pool <- serverProof
}
// Get returns the available ServerProof
// Get returns the next available ServerProof
func (p *ServerProofPool) Get(stopCh chan bool) (ServerProofInterface, error) {
select {
case <-stopCh:

+ 2
- 1
go.mod

@ -20,6 +20,7 @@ require (
github.com/stretchr/testify v1.6.1
github.com/urfave/cli v1.22.1
github.com/urfave/cli/v2 v2.2.0
go.uber.org/zap v1.13.0
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.16.0
gopkg.in/go-playground/validator.v9 v9.29.1
)

+ 10
- 0
go.sum

@ -601,13 +601,23 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.13.0 h1:nR6NoDBgAf67s68NhaXbsojM+2gxp3S1hWkHDl27pVU=
go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM=
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=

Loading…
Cancel
Save