Update test ethclient and coordinator

This commit is contained in:
Eduard S
2020-09-17 10:19:02 +02:00
parent d71871c8b7
commit 0fc4930652
11 changed files with 347 additions and 74 deletions

View File

@@ -1,6 +1,7 @@
package coordinator
import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/hermeznetwork/hermez-node/common"
)
@@ -18,6 +19,7 @@ type BatchInfo struct {
L1OperatorTxs []*common.L1Tx
L2Txs []*common.PoolL2Tx
// FeesInfo
ethTx *types.Transaction
}
// NewBatchInfo creates a new BatchInfo with the given batchNum &
@@ -52,3 +54,8 @@ func (bi *BatchInfo) SetServerProof(serverProof ServerProofInterface) {
func (bi *BatchInfo) SetProof(proof *Proof) {
bi.proof = proof
}
// SetEthTx sets the ethTx to the BatchInfo data structure
func (bi *BatchInfo) SetEthTx(ethTx *types.Transaction) {
bi.ethTx = ethTx
}

View File

@@ -1,18 +1,18 @@
package coordinator
import (
"context"
"fmt"
"sync"
"time"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/hermeznetwork/hermez-node/batchbuilder"
"github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/db/historydb"
"github.com/hermeznetwork/hermez-node/eth"
"github.com/hermeznetwork/hermez-node/log"
"github.com/hermeznetwork/hermez-node/txselector"
kvdb "github.com/iden3/go-merkletree/db"
"github.com/iden3/go-merkletree/db/memory"
)
var errTODO = fmt.Errorf("TODO")
@@ -28,9 +28,9 @@ type Config struct {
// Coordinator implements the Coordinator type
type Coordinator struct {
forging bool
rw *sync.RWMutex
isForgeSeq bool // WIP just for testing while implementing
forging bool
// rw *sync.RWMutex
// isForgeSeq bool // WIP just for testing while implementing
config Config
@@ -42,8 +42,9 @@ type Coordinator struct {
txsel *txselector.TxSelector
batchBuilder *batchbuilder.BatchBuilder
ethClient eth.ClientInterface
ethTxStore kvdb.Storage
ethClient eth.ClientInterface
ethTxs []*types.Transaction
// ethTxStore kvdb.Storage
}
// NewCoordinator creates a new Coordinator
@@ -64,17 +65,30 @@ func NewCoordinator(conf Config,
txsel: txsel,
batchBuilder: bb,
ethClient: ethClient,
ethTxStore: memory.NewMemoryStorage(),
rw: &sync.RWMutex{},
ethTxs: make([]*types.Transaction, 0),
// ethTxStore: memory.NewMemoryStorage(),
// rw: &sync.RWMutex{},
}
return &c
}
// TODO(Edu): Change the current design of the coordinator structur:
// - Move Start and Stop functions (from node/node.go) here
// - Add concept of StartPipeline, StopPipeline, that spawns and stops the goroutines
// - Add a Manager that calls StartPipeline and StopPipeline, checks when it's time to forge, schedules new batches, etc.
// - Add a TxMonitor that monitors successful ForgeBatch ethereum transactions and waits for N blocks of confirmation, and reports back errors to the Manager.
// ForgeLoopFn is the function ran in a loop that checks if it's time to forge
// and forges a batch if so and sends it to outBatchCh. Returns true if it's
// the coordinator turn to forge.
func (c *Coordinator) ForgeLoopFn(outBatchCh chan *BatchInfo, stopCh chan bool) (forgetime bool, err error) {
if !c.isForgeSequence() {
// TODO: Move the logic to check if it's forge time or not outside the pipeline
isForgeSequence, err := c.isForgeSequence()
if err != nil {
return false, err
}
if !isForgeSequence {
if c.forging {
log.Info("ForgeLoopFn: forging state end")
c.forging = false
@@ -106,6 +120,12 @@ func (c *Coordinator) ForgeLoopFn(outBatchCh chan *BatchInfo, stopCh chan bool)
// if c.synchronizer.Reorg():
_ = c.handleReorg()
defer func() {
if err == ErrStop {
log.Info("ForgeLoopFn: forgeLoopFn stopped")
}
}()
// 0. Wait for an available server proof
// blocking call
serverProof, err := c.serverProofPool.Get(stopCh)
@@ -134,10 +154,14 @@ func (c *Coordinator) ForgeLoopFn(outBatchCh chan *BatchInfo, stopCh chan bool)
// GetProofCallForgeLoopFn is the function ran in a loop that gets a forged
// batch via inBatchCh, waits for the proof server to finish, calls the ForgeBatch
// function in the Rollup Smart Contract, and sends the batch to outBatchCh.
func (c *Coordinator) GetProofCallForgeLoopFn(inBatchCh, outBatchCh chan *BatchInfo, stopCh chan bool) error {
func (c *Coordinator) GetProofCallForgeLoopFn(inBatchCh, outBatchCh chan *BatchInfo, stopCh chan bool) (err error) {
defer func() {
if err == ErrStop {
log.Info("GetProofCallForgeLoopFn: forgeLoopFn stopped")
}
}()
select {
case <-stopCh:
log.Info("GetProofCallForgeLoopFn: forgeLoopFn stopped")
return ErrStop
case batchInfo := <-inBatchCh:
log.Debugw("GetProofCallForgeLoopFn: getProofCallForge start", "batchNum", batchInfo.batchNum)
@@ -153,14 +177,18 @@ func (c *Coordinator) GetProofCallForgeLoopFn(inBatchCh, outBatchCh chan *BatchI
// ForgeCallConfirmLoopFn is the function ran in a loop that gets a batch that
// has been sent to the Rollup Smart Contract via inBatchCh and waits for the
// ethereum transaction confirmation.
func (c *Coordinator) ForgeCallConfirmLoopFn(inBatchCh chan *BatchInfo, stopCh chan bool) error {
func (c *Coordinator) ForgeCallConfirmLoopFn(inBatchCh chan *BatchInfo, stopCh chan bool) (err error) {
defer func() {
if err == ErrStop {
log.Info("ForgeCallConfirmLoopFn: forgeConfirmLoopFn stopped")
}
}()
select {
case <-stopCh:
log.Info("ForgeCallConfirmLoopFn: forgeConfirmLoopFn stopped")
return ErrStop
case batchInfo := <-inBatchCh:
log.Debugw("ForgeCallConfirmLoopFn: forgeCallConfirm start", "batchNum", batchInfo.batchNum)
if err := c.forgeCallConfirm(batchInfo); err != nil {
if err := c.forgeCallConfirm(batchInfo, stopCh); err != nil {
return err
}
log.Debugw("ForgeCallConfirmLoopFn: forgeCallConfirm end", "batchNum", batchInfo.batchNum)
@@ -244,13 +272,15 @@ func (c *Coordinator) getProofCallForge(batchInfo *BatchInfo, stopCh chan bool)
}
batchInfo.SetProof(proof)
forgeBatchArgs := c.prepareForgeBatchArgs(batchInfo)
_, err = c.ethClient.RollupForgeBatch(forgeBatchArgs)
ethTx, err := c.ethClient.RollupForgeBatch(forgeBatchArgs)
if err != nil {
return err
}
// TODO: Move this to the next step (forgeCallConfirm)
log.Debugf("ethClient ForgeCall sent, batchNum: %d", c.batchNum)
batchInfo.SetEthTx(ethTx)
// TODO once tx data type is defined, store ethTx (returned by ForgeCall)
// TODO(FUTURE) once tx data type is defined, store ethTx (returned by ForgeCall)
// TBD if use ethTxStore as a disk k-v database, or use a Queue
// tx, err := c.ethTxStore.NewTx()
// if err != nil {
@@ -264,13 +294,46 @@ func (c *Coordinator) getProofCallForge(batchInfo *BatchInfo, stopCh chan bool)
return nil
}
func (c *Coordinator) forgeCallConfirm(batchInfo *BatchInfo) error {
func (c *Coordinator) forgeCallConfirm(batchInfo *BatchInfo, stopCh chan bool) error {
// TODO strategy of this sequence TBD
// confirm eth txs and mark them as accepted sequence
// IDEA: Keep an array in Coordinator with the list of sent ethTx.
// Here, loop over them and only delete them once the number of
// confirmed blocks is over a configured value. If the tx is rejected,
// return error.
// ethTx := ethTxStore.GetFirstPending()
// waitForAccepted(ethTx) // blocking call, returns once the ethTx is mined
// ethTxStore.MarkAccepted(ethTx)
return nil
txID := batchInfo.ethTx.Hash()
// TODO: Follow EthereumClient.waitReceipt logic
count := 0
// TODO: Define this waitTime in the config
waitTime := 100 * time.Millisecond //nolint:gomnd
select {
case <-time.After(waitTime):
receipt, err := c.ethClient.EthTransactionReceipt(context.TODO(), txID)
if err != nil {
return err
}
if receipt != nil {
if receipt.Status == types.ReceiptStatusFailed {
return fmt.Errorf("receipt status is failed")
} else if receipt.Status == types.ReceiptStatusSuccessful {
return nil
}
}
// TODO: Call go-ethereum:
// if err == nil && receipt == nil :
// `func (ec *Client) TransactionByHash(ctx context.Context, hash common.Hash) (tx *types.Transaction, isPending bool, err error) {`
count++
if time.Duration(count)*waitTime > 60*time.Second {
log.Warnw("Waiting for ethTx receipt for more than 60 seconds", "tx", batchInfo.ethTx)
// TODO: Decide if we resend the Tx with higher gas price
}
case <-stopCh:
return ErrStop
}
return fmt.Errorf("timeout")
}
func (c *Coordinator) handleReorg() error {
@@ -278,10 +341,17 @@ func (c *Coordinator) handleReorg() error {
}
// isForgeSequence returns true if the node is the Forger in the current ethereum block
func (c *Coordinator) isForgeSequence() bool {
c.rw.RLock()
defer c.rw.RUnlock()
return c.isForgeSeq // TODO
func (c *Coordinator) isForgeSequence() (bool, error) {
// TODO: Consider checking if we can forge by quering the Synchronizer instead of using ethClient
blockNum, err := c.ethClient.EthCurrentBlock()
if err != nil {
return false, err
}
addr, err := c.ethClient.EthAddress()
if err != nil {
return false, err
}
return c.ethClient.AuctionCanForge(*addr, blockNum+1)
}
func (c *Coordinator) purgeRemoveByTimeout() error {

View File

@@ -2,10 +2,12 @@ package coordinator
import (
"io/ioutil"
"math/big"
"os"
"testing"
"time"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/hermeznetwork/hermez-node/batchbuilder"
dbUtils "github.com/hermeznetwork/hermez-node/db"
"github.com/hermeznetwork/hermez-node/db/historydb"
@@ -65,8 +67,9 @@ func (cn *CoordNode) Start() {
cn.stopForge = make(chan bool)
cn.stopGetProofCallForge = make(chan bool)
cn.stopForgeCallConfirm = make(chan bool)
batchCh0 := make(chan *BatchInfo)
batchCh1 := make(chan *BatchInfo)
queueSize := 8
batchCh0 := make(chan *BatchInfo, queueSize)
batchCh1 := make(chan *BatchInfo, queueSize)
go func() {
for {
@@ -78,6 +81,7 @@ func (cn *CoordNode) Start() {
return
} else if err != nil {
log.Errorw("CoordNode ForgeLoopFn", "error", err)
time.Sleep(200 * time.Millisecond) // Avoid overflowing log with errors
} else if !forge {
time.Sleep(200 * time.Millisecond)
}
@@ -133,17 +137,38 @@ func (t *timer) Time() int64 {
return currentTime
}
func waitForSlot(t *testing.T, c *test.Client, slot int64) {
for {
blockNum, err := c.EthCurrentBlock()
require.Nil(t, err)
nextBlockSlot, err := c.AuctionGetSlotNumber(blockNum + 1)
require.Nil(t, err)
if nextBlockSlot == slot {
break
}
c.CtlMineBlock()
}
}
func TestCoordinator(t *testing.T) {
txsel, bb := newTestModules(t)
conf := Config{}
hdb := &historydb.HistoryDB{}
serverProofs := []ServerProofInterface{&ServerProof{}, &ServerProof{}}
serverProofs := []ServerProofInterface{&ServerProofMock{}, &ServerProofMock{}}
var timer timer
ethClientSetup := test.NewClientSetupExample()
addr := ethClientSetup.AuctionVariables.BootCoordinator
ethClient := test.NewClient(true, &timer, addr, ethClientSetup)
addr := ethCommon.HexToAddress("0xc344E203a046Da13b0B4467EB7B3629D0C99F6E6")
ethClient := test.NewClient(true, &timer, &addr, ethClientSetup)
// Bid for slot 2 and 4
_, err := ethClient.AuctionRegisterCoordinator(addr, "https://foo.bar")
require.Nil(t, err)
_, err = ethClient.AuctionBid(2, big.NewInt(9999), addr)
require.Nil(t, err)
_, err = ethClient.AuctionBid(4, big.NewInt(9999), addr)
require.Nil(t, err)
c := NewCoordinator(conf, hdb, txsel, bb, serverProofs, ethClient)
cn := NewCoordNode(c)
@@ -151,24 +176,18 @@ func TestCoordinator(t *testing.T) {
time.Sleep(1 * time.Second)
// simulate forgeSequence time
waitForSlot(t, ethClient, 2)
log.Info("simulate entering in forge time")
c.rw.Lock()
c.isForgeSeq = true
c.rw.Unlock()
time.Sleep(1 * time.Second)
// simulate going out from forgeSequence
waitForSlot(t, ethClient, 3)
log.Info("simulate going out from forge time")
c.rw.Lock()
c.isForgeSeq = false
c.rw.Unlock()
time.Sleep(1 * time.Second)
// simulate entering forgeSequence time again
waitForSlot(t, ethClient, 4)
log.Info("simulate entering in forge time again")
c.rw.Lock()
c.isForgeSeq = true
c.rw.Unlock()
time.Sleep(2 * time.Second)
// simulate stopping forgerLoop by channel

View File

@@ -28,11 +28,13 @@ func NewServerProof(URL string) *ServerProof {
// CalculateProof sends the *common.ZKInputs to the ServerProof to compute the
// Proof
func (p *ServerProof) CalculateProof(zkInputs *common.ZKInputs) error {
log.Error("TODO")
return errTODO
}
// GetProof retreives the Proof from the ServerProof
func (p *ServerProof) GetProof(stopCh chan bool) (*Proof, error) {
log.Error("TODO")
return nil, errTODO
}