You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

569 lines
18 KiB

package coordinator
import (
"context"
"fmt"
"io/ioutil"
"math/big"
"os"
"sync"
"testing"
"time"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/hermeznetwork/hermez-node/batchbuilder"
"github.com/hermeznetwork/hermez-node/common"
dbUtils "github.com/hermeznetwork/hermez-node/db"
"github.com/hermeznetwork/hermez-node/db/historydb"
"github.com/hermeznetwork/hermez-node/db/l2db"
"github.com/hermeznetwork/hermez-node/db/statedb"
"github.com/hermeznetwork/hermez-node/eth"
"github.com/hermeznetwork/hermez-node/log"
"github.com/hermeznetwork/hermez-node/prover"
"github.com/hermeznetwork/hermez-node/synchronizer"
"github.com/hermeznetwork/hermez-node/test"
"github.com/hermeznetwork/hermez-node/txprocessor"
"github.com/hermeznetwork/hermez-node/txselector"
"github.com/hermeznetwork/tracerr"
"github.com/iden3/go-iden3-crypto/babyjub"
"github.com/iden3/go-merkletree/db/pebble"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var deleteme = []string{}
func pebbleMakeCheckpoint(source, dest string) error {
// Remove dest folder (if it exists) before doing the checkpoint
if _, err := os.Stat(dest); !os.IsNotExist(err) {
err := os.RemoveAll(dest)
if err != nil {
return tracerr.Wrap(err)
}
} else if err != nil && !os.IsNotExist(err) {
return tracerr.Wrap(err)
}
sto, err := pebble.NewPebbleStorage(source, false)
if err != nil {
return tracerr.Wrap(err)
}
defer func() {
errClose := sto.Pebble().Close()
if errClose != nil {
log.Errorw("Pebble.Close", "err", errClose)
}
}()
// execute Checkpoint
err = sto.Pebble().Checkpoint(dest)
if err != nil {
return tracerr.Wrap(err)
}
return nil
}
func TestMain(m *testing.M) {
exitVal := m.Run()
for _, dir := range deleteme {
if err := os.RemoveAll(dir); err != nil {
panic(err)
}
}
os.Exit(exitVal)
}
var syncDBPath string
var txSelDBPath string
var batchBuilderDBPath string
type modules struct {
historyDB *historydb.HistoryDB
l2DB *l2db.L2DB
txSelector *txselector.TxSelector
batchBuilder *batchbuilder.BatchBuilder
stateDB *statedb.StateDB
}
var maxL1Txs uint64 = 256
var maxTxs uint64 = 376
var nLevels uint32 = 32 //nolint:deadcode,unused
var maxFeeTxs uint32 = 64 //nolint:deadcode,varcheck
var chainID uint16 = 0
func newTestModules(t *testing.T) modules {
var err error
syncDBPath, err = ioutil.TempDir("", "tmpSyncDB")
require.NoError(t, err)
deleteme = append(deleteme, syncDBPath)
syncStateDB, err := statedb.NewStateDB(statedb.Config{Path: syncDBPath, Keep: 128,
Type: statedb.TypeSynchronizer, NLevels: 48})
assert.NoError(t, err)
pass := os.Getenv("POSTGRES_PASS")
db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez")
require.NoError(t, err)
test.WipeDB(db)
l2DB := l2db.NewL2DB(db, db, 10, 100, 0.0, 24*time.Hour, nil)
historyDB := historydb.NewHistoryDB(db, db, nil)
txSelDBPath, err = ioutil.TempDir("", "tmpTxSelDB")
require.NoError(t, err)
deleteme = append(deleteme, txSelDBPath)
var bjj babyjub.PublicKeyComp
err = bjj.UnmarshalText([]byte("c433f7a696b7aa3a5224efb3993baf0ccd9e92eecee0c29a3f6c8208a9e81d9e"))
require.NoError(t, err)
coordAccount := &txselector.CoordAccount{ // TODO TMP
Addr: ethCommon.HexToAddress("0xc58d29fA6e86E4FAe04DDcEd660d45BCf3Cb2370"),
BJJ: bjj,
AccountCreationAuth: nil,
}
txSelector, err := txselector.NewTxSelector(coordAccount, txSelDBPath, syncStateDB, l2DB)
assert.NoError(t, err)
batchBuilderDBPath, err = ioutil.TempDir("", "tmpBatchBuilderDB")
require.NoError(t, err)
deleteme = append(deleteme, batchBuilderDBPath)
batchBuilder, err := batchbuilder.NewBatchBuilder(batchBuilderDBPath, syncStateDB, 0,
uint64(nLevels))
assert.NoError(t, err)
return modules{
historyDB: historyDB,
l2DB: l2DB,
txSelector: txSelector,
batchBuilder: batchBuilder,
stateDB: syncStateDB,
}
}
type timer struct {
time int64
}
func (t *timer) Time() int64 {
currentTime := t.time
t.time++
return currentTime
}
var bidder = ethCommon.HexToAddress("0x6b175474e89094c44da98b954eedeac495271d0f")
var forger = ethCommon.HexToAddress("0xc344E203a046Da13b0B4467EB7B3629D0C99F6E6")
func newTestCoordinator(t *testing.T, forgerAddr ethCommon.Address, ethClient *test.Client,
ethClientSetup *test.ClientSetup, modules modules) *Coordinator {
debugBatchPath, err := ioutil.TempDir("", "tmpDebugBatch")
require.NoError(t, err)
deleteme = append(deleteme, debugBatchPath)
conf := Config{
ForgerAddress: forgerAddr,
ConfirmBlocks: 5,
L1BatchTimeoutPerc: 0.5,
EthClientAttempts: 5,
SyncRetryInterval: 400 * time.Microsecond,
EthClientAttemptsDelay: 100 * time.Millisecond,
TxManagerCheckInterval: 300 * time.Millisecond,
DebugBatchPath: debugBatchPath,
Purger: PurgerCfg{
PurgeBatchDelay: 10,
PurgeBlockDelay: 10,
InvalidateBatchDelay: 4,
InvalidateBlockDelay: 4,
},
TxProcessorConfig: txprocessor.Config{
NLevels: nLevels,
MaxFeeTx: maxFeeTxs,
MaxTx: uint32(maxTxs),
MaxL1Tx: uint32(maxL1Txs),
ChainID: chainID,
},
VerifierIdx: 0,
}
serverProofs := []prover.Client{
&prover.MockClient{Delay: 300 * time.Millisecond},
&prover.MockClient{Delay: 400 * time.Millisecond},
}
scConsts := &synchronizer.SCConsts{
Rollup: *ethClientSetup.RollupConstants,
Auction: *ethClientSetup.AuctionConstants,
WDelayer: *ethClientSetup.WDelayerConstants,
}
initSCVars := &synchronizer.SCVariables{
Rollup: *ethClientSetup.RollupVariables,
Auction: *ethClientSetup.AuctionVariables,
WDelayer: *ethClientSetup.WDelayerVariables,
}
coord, err := NewCoordinator(conf, modules.historyDB, modules.l2DB, modules.txSelector,
modules.batchBuilder, serverProofs, ethClient, scConsts, initSCVars)
require.NoError(t, err)
return coord
}
func newTestSynchronizer(t *testing.T, ethClient *test.Client, ethClientSetup *test.ClientSetup,
modules modules) *synchronizer.Synchronizer {
sync, err := synchronizer.NewSynchronizer(ethClient, modules.historyDB, modules.stateDB,
synchronizer.Config{
StatsRefreshPeriod: 0 * time.Second,
})
require.NoError(t, err)
return sync
}
// TestCoordinatorFlow is a test where the coordinator is stared (which means
// that goroutines are spawned), and ethereum blocks are mined via the
// test.Client to simulate starting and stopping forging times. This test
// works without a synchronizer, and no l2txs are inserted in the pool, so all
// the batches are forged empty. The purpose of this test is to manually
// observe via the logs that nothing crashes and that the coordinator starts
// and stops forging at the right blocks.
func TestCoordinatorFlow(t *testing.T) {
if os.Getenv("TEST_COORD_FLOW") == "" {
return
}
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
var timer timer
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
modules := newTestModules(t)
coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules)
// Bid for slot 2 and 4
_, err := ethClient.AuctionSetCoordinator(forger, "https://foo.bar")
require.NoError(t, err)
bid, ok := new(big.Int).SetString("11000000000000000000", 10)
if !ok {
panic("bad bid")
}
_, err = ethClient.AuctionBidSimple(3, bid)
require.NoError(t, err)
_, err = ethClient.AuctionBidSimple(5, bid)
require.NoError(t, err)
coord.Start()
ctx := context.Background()
time.Sleep(1 * time.Second)
waitForSlot := func(slot int64) {
for {
blockNum, err := ethClient.EthLastBlock()
require.NoError(t, err)
nextBlockSlot, err := ethClient.AuctionGetSlotNumber(blockNum + 1)
require.NoError(t, err)
if nextBlockSlot == slot {
break
}
ethClient.CtlMineBlock()
time.Sleep(100 * time.Millisecond)
var stats synchronizer.Stats
stats.Eth.LastBlock = *ethClient.CtlLastBlock()
stats.Sync.LastBlock = stats.Eth.LastBlock
stats.Eth.LastBatchNum = ethClient.CtlLastForgedBatch()
stats.Sync.LastBatch.BatchNum = common.BatchNum(stats.Eth.LastBatchNum)
canForge, err := ethClient.AuctionCanForge(forger, blockNum+1)
require.NoError(t, err)
var slot common.Slot
slotNum := ethClientSetup.AuctionConstants.SlotNum(blockNum + 1)
slot.StartBlock = ethClientSetup.AuctionConstants.GenesisBlockNum +
(slotNum)*int64(ethClientSetup.AuctionConstants.BlocksPerSlot)
slot.EndBlock = ethClientSetup.AuctionConstants.GenesisBlockNum +
(slotNum+1)*int64(ethClientSetup.AuctionConstants.BlocksPerSlot)
if canForge {
slot.Forger = forger
}
stats.Sync.Auction.CurrentSlot = slot
// Copy stateDB to synchronizer if there was a new batch
source := fmt.Sprintf("%v/BatchNum%v", batchBuilderDBPath, stats.Sync.LastBatch)
dest := fmt.Sprintf("%v/BatchNum%v", syncDBPath, stats.Sync.LastBatch)
if stats.Sync.LastBatch.BatchNum != 0 {
if _, err := os.Stat(dest); os.IsNotExist(err) {
log.Infow("Making pebble checkpoint for sync",
"source", source, "dest", dest)
err = pebbleMakeCheckpoint(source, dest)
require.NoError(t, err)
}
}
coord.SendMsg(ctx, MsgSyncBlock{
Stats: stats,
})
}
}
// NOTE: With the current test, the coordinator will enter in forge
// time before the bidded slot because no one else is forging in the
// other slots before the slot deadline.
// simulate forgeSequence time
waitForSlot(2)
log.Info("~~~ simulate entering in forge time")
time.Sleep(1 * time.Second)
// simulate going out from forgeSequence
waitForSlot(3)
log.Info("~~~ simulate going out from forge time")
time.Sleep(1 * time.Second)
// simulate entering forgeSequence time again
waitForSlot(4)
log.Info("~~~ simulate entering in forge time again")
time.Sleep(2 * time.Second)
// simulate stopping forgerLoop by channel
log.Info("~~~ simulate stopping forgerLoop by closing coordinator stopch")
coord.Stop()
time.Sleep(1 * time.Second)
}
func TestCoordinatorStartStop(t *testing.T) {
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
var timer timer
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
modules := newTestModules(t)
coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules)
coord.Start()
coord.Stop()
}
func TestCoordCanForge(t *testing.T) {
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
bootForger := ethClientSetup.AuctionVariables.BootCoordinator
var timer timer
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
modules := newTestModules(t)
coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules)
_, err := ethClient.AuctionSetCoordinator(forger, "https://foo.bar")
require.NoError(t, err)
bid, ok := new(big.Int).SetString("12000000000000000000", 10)
if !ok {
panic("bad bid")
}
_, err = ethClient.AuctionBidSimple(3, bid)
require.NoError(t, err)
modules2 := newTestModules(t)
bootCoord := newTestCoordinator(t, bootForger, ethClient, ethClientSetup, modules2)
assert.Equal(t, forger, coord.cfg.ForgerAddress)
assert.Equal(t, bootForger, bootCoord.cfg.ForgerAddress)
ethBootCoord, err := ethClient.AuctionGetBootCoordinator()
require.NoError(t, err)
assert.Equal(t, &bootForger, ethBootCoord)
var stats synchronizer.Stats
slots := [4]common.Slot{}
for i := 0; i < 4; i++ {
slots[i].StartBlock = ethClientSetup.AuctionConstants.GenesisBlockNum +
int64(i)*int64(ethClientSetup.AuctionConstants.BlocksPerSlot)
slots[i].EndBlock = ethClientSetup.AuctionConstants.GenesisBlockNum +
int64(i+1)*int64(ethClientSetup.AuctionConstants.BlocksPerSlot)
// Only slot 3 has Coordinator winner, the rest are BootCoordinator
if i == 3 {
slots[i].Forger = forger
} else {
slots[i].Forger = bootForger
}
}
// Slot 0. No bid, so the winner is the boot coordinator
stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum
stats.Sync.LastBlock = stats.Eth.LastBlock
stats.Sync.Auction.CurrentSlot = slots[0]
coord.stats = stats
bootCoord.stats = stats
assert.Equal(t, false, coord.canForge())
assert.Equal(t, true, bootCoord.canForge())
// Slot 0. No bid, and we reach the deadline, so anyone can forge
stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum +
int64(ethClientSetup.AuctionVariables.SlotDeadline)
stats.Sync.LastBlock = stats.Eth.LastBlock
stats.Sync.Auction.CurrentSlot = slots[0]
coord.stats = stats
bootCoord.stats = stats
assert.Equal(t, true, coord.canForge())
assert.Equal(t, true, bootCoord.canForge())
// Slot 3. coordinator bid, so the winner is the coordinator
stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum +
3*int64(ethClientSetup.AuctionConstants.BlocksPerSlot)
stats.Sync.LastBlock = stats.Eth.LastBlock
stats.Sync.Auction.CurrentSlot = slots[3]
coord.stats = stats
bootCoord.stats = stats
assert.Equal(t, true, coord.canForge())
assert.Equal(t, false, bootCoord.canForge())
}
func TestCoordHandleMsgSyncBlock(t *testing.T) {
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
bootForger := ethClientSetup.AuctionVariables.BootCoordinator
var timer timer
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
modules := newTestModules(t)
coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules)
_, err := ethClient.AuctionSetCoordinator(forger, "https://foo.bar")
require.NoError(t, err)
bid, ok := new(big.Int).SetString("11000000000000000000", 10)
if !ok {
panic("bad bid")
}
_, err = ethClient.AuctionBidSimple(3, bid)
require.NoError(t, err)
slots := [4]common.Slot{}
for i := 0; i < 4; i++ {
slots[i].StartBlock = ethClientSetup.AuctionConstants.GenesisBlockNum +
int64(i)*int64(ethClientSetup.AuctionConstants.BlocksPerSlot)
slots[i].EndBlock = ethClientSetup.AuctionConstants.GenesisBlockNum +
int64(i+1)*int64(ethClientSetup.AuctionConstants.BlocksPerSlot)
// Only slot 3 has Coordinator winner, the rest are BootCoordinator
if i == 3 {
slots[i].Forger = forger
} else {
slots[i].Forger = bootForger
}
}
var msg MsgSyncBlock
coord.stats = msg.Stats
ctx := context.Background()
// Slot 0. No bid, so the winner is the boot coordinator
// pipelineStarted: false -> false
coord.stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum
coord.stats.Sync.LastBlock = coord.stats.Eth.LastBlock
coord.stats.Sync.Auction.CurrentSlot = slots[0]
assert.Equal(t, false, coord.canForge())
msg.Stats = coord.stats
require.NoError(t, coord.handleMsgSyncBlock(ctx, &msg))
assert.Nil(t, coord.pipeline)
// Slot 0. No bid, and we reach the deadline, so anyone can forge
// pipelineStarted: false -> true
coord.stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum +
int64(ethClientSetup.AuctionVariables.SlotDeadline)
coord.stats.Sync.LastBlock = coord.stats.Eth.LastBlock
coord.stats.Sync.Auction.CurrentSlot = slots[0]
assert.Equal(t, true, coord.canForge())
msg.Stats = coord.stats
require.NoError(t, coord.handleMsgSyncBlock(ctx, &msg))
assert.NotNil(t, coord.pipeline)
// Slot 0. No bid, and we reach the deadline, so anyone can forge
// pipelineStarted: true -> true
coord.stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum +
int64(ethClientSetup.AuctionVariables.SlotDeadline) + 1
coord.stats.Sync.LastBlock = coord.stats.Eth.LastBlock
coord.stats.Sync.Auction.CurrentSlot = slots[0]
assert.Equal(t, true, coord.canForge())
msg.Stats = coord.stats
require.NoError(t, coord.handleMsgSyncBlock(ctx, &msg))
assert.NotNil(t, coord.pipeline)
// Slot 1. No bid, so the winner is the boot coordinator
// pipelineStarted: true -> false
coord.stats.Eth.LastBlock.Num = ethClientSetup.AuctionConstants.GenesisBlockNum +
1*int64(ethClientSetup.AuctionConstants.BlocksPerSlot)
coord.stats.Sync.LastBlock = coord.stats.Eth.LastBlock
coord.stats.Sync.Auction.CurrentSlot = slots[1]
assert.Equal(t, false, coord.canForge())
msg.Stats = coord.stats
require.NoError(t, coord.handleMsgSyncBlock(ctx, &msg))
assert.Nil(t, coord.pipeline)
}
// ethAddTokens adds the tokens from the blocks to the blockchain
func ethAddTokens(blocks []common.BlockData, client *test.Client) {
for _, block := range blocks {
for _, token := range block.Rollup.AddedTokens {
consts := eth.ERC20Consts{
Name: fmt.Sprintf("Token %d", token.TokenID),
Symbol: fmt.Sprintf("TK%d", token.TokenID),
Decimals: 18,
}
// tokenConsts[token.TokenID] = consts
client.CtlAddERC20(token.EthAddr, consts)
}
}
}
func TestCoordinatorStress(t *testing.T) {
if os.Getenv("TEST_COORD_STRESS") == "" {
return
}
log.Info("Begin Test Coord Stress")
ethClientSetup := test.NewClientSetupExample()
ethClientSetup.ChainID = big.NewInt(int64(chainID))
var timer timer
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
modules := newTestModules(t)
coord := newTestCoordinator(t, forger, ethClient, ethClientSetup, modules)
syn := newTestSynchronizer(t, ethClient, ethClientSetup, modules)
coord.Start()
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
// Synchronizer loop
wg.Add(1)
go func() {
for {
blockData, _, err := syn.Sync(ctx, nil)
if ctx.Err() != nil {
wg.Done()
return
}
require.NoError(t, err)
if blockData != nil {
stats := syn.Stats()
coord.SendMsg(ctx, MsgSyncBlock{
Stats: *stats,
Batches: blockData.Rollup.Batches,
Vars: synchronizer.SCVariablesPtr{
Rollup: blockData.Rollup.Vars,
Auction: blockData.Auction.Vars,
WDelayer: blockData.WDelayer.Vars,
},
})
} else {
time.Sleep(100 * time.Millisecond)
}
}
}()
// Blockchain mining loop
wg.Add(1)
go func() {
for {
select {
case <-ctx.Done():
wg.Done()
return
case <-time.After(1 * time.Second):
ethClient.CtlMineBlock()
}
}
}()
time.Sleep(600 * time.Second)
cancel()
wg.Wait()
coord.Stop()
}
// TODO: Test Reorg
// TODO: Test TxMonitor
// TODO: Test forgeBatch
// TODO: Test waitServerProof
// TODO: Test handleReorg