Advance coordinator implementation

- Common
	- Move ErrTODO and ErrDone to the common package for use wherever needed.
- Coordinator
	- Move prover types to prover package
	- Handle reorgs, stopping the pipeline when necessary
	- Handle Ethereum transaction errors by stopping the pipeline
	- In case of an Ethereum transaction revert, check for known revert causes
	  (more causes can be added to handle more cases; see the sketch after
	  this list)
	- Fix skipped transactions in TxManager confirmation logic
	- Cancel ongoing proofs and wait for provers to be ready
	- Connect L2DB to:
		- purge l2txs that have timed out
		- mark l2txs with their corresponding states
	- Connect HistoryDB to query L1UserTxs to forge in an L1Batch
- L2DB
	- Skip update functions when the input slices are empty, to avoid building
	  a query with no values, which results in an SQL error (see the sketch
	  after this list)
- StateDB
	- In LocalStateDB, fix Reset when mt == nil
- Prover (new package)
	- Rename the interface to Prover
	- Rename the mock struct to Mock
	- Extend the Prover interface methods to provide everything required by
	  the coordinator (a hedged interface sketch follows this list)
	- Begin implementing the HTTP client code required to interact with the
	  proof server (not yet tested)
- Synchronizer
	- Add LastForgeL1TxsNum to Stats
- Test/Client
	- Update the Auction logic to track slots in which there is no forge
	  before the deadline, following the Solidity implementation (a simplified
	  sketch follows this list)
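
The revert-cause check can be sketched as a lookup against a list of known
reasons. A minimal sketch; checkRevertCause and the cause strings are
illustrative placeholders, not the actual coordinator code or contract
messages:

package coordinator

import (
	"fmt"
	"strings"
)

// knownRevertCauses lists revert reasons the coordinator knows how to react
// to. The strings here are illustrative placeholders.
var knownRevertCauses = []string{
	"auction/cannot-forge",
	"rollup/invalid-proof",
}

// checkRevertCause classifies a revert reason so the caller can decide how to
// react, e.g. stop the pipeline or drop the transaction.
func checkRevertCause(reason string) error {
	for _, cause := range knownRevertCauses {
		if strings.Contains(reason, cause) {
			return fmt.Errorf("tx reverted with known cause: %v", cause)
		}
	}
	return fmt.Errorf("tx reverted with unknown cause: %q", reason)
}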
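
The L2DB guard is an early return before the bulk query is built. A minimal
sketch, with markForged as an assumed stand-in for one of the update functions
and illustrative table and column names:

package l2db

import (
	"database/sql"
	"fmt"
	"strings"
)

// markForged marks a set of pool txs as forged in batchNum. With an empty
// txIDs slice the generated IN (...) clause would be invalid SQL, so the
// function returns early instead of executing a query with no values.
func markForged(db *sql.DB, txIDs []string, batchNum int64) error {
	if len(txIDs) == 0 {
		return nil // nothing to update: skip the query entirely
	}
	placeholders := make([]string, len(txIDs))
	args := []interface{}{batchNum}
	for i, id := range txIDs {
		placeholders[i] = fmt.Sprintf("$%d", i+2)
		args = append(args, id)
	}
	query := fmt.Sprintf(
		"UPDATE tx_pool SET state = 'fged', batch_num = $1 WHERE tx_id IN (%s)",
		strings.Join(placeholders, ", "))
	_, err := db.Exec(query, args...)
	return err
}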
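
A hedged sketch of the shape the extended Prover interface might take; the
method set and the Proof and ZKInputs types are assumptions for illustration,
not copied from the prover package:

package prover

import (
	"context"
	"math/big"
)

// Proof stands in for the proof object returned by the proof server.
type Proof struct {
	PiA [2]*big.Int
	PiB [3][2]*big.Int
	PiC [2]*big.Int
}

// ZKInputs stands in for the circuit inputs built by the coordinator.
type ZKInputs struct{}

// Client gives the coordinator everything it needs to drive a proof server:
// start a proof, collect the result, cancel it, and wait until the prover is
// ready again.
type Client interface {
	CalculateProof(ctx context.Context, zkInputs *ZKInputs) error
	GetProof(ctx context.Context) (*Proof, error)
	Cancel(ctx context.Context) error
	WaitReady(ctx context.Context) error
}

Taking a context in every method is what lets the coordinator cancel an
ongoing proof and wait for readiness when the pipeline stops.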
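
The updated Auction rule can be summarized as: the slot winner may always
forge, while anyone else may forge only once the slot's deadline has passed
with no batch forged in the slot. A simplified sketch with assumed names:

package coordinator

// canForgeAfterDeadline sketches the deadline rule from the Solidity
// implementation (simplified): the slot winner may always forge; any other
// coordinator may forge only once the deadline block has passed with no
// batch forged in the slot.
func canForgeAfterDeadline(isSlotWinner bool, blockNum, slotStartBlock,
	deadlineBlocks int64, forgedInSlot bool) bool {
	if isSlotWinner {
		return true
	}
	return !forgedInSlot && blockNum >= slotStartBlock+deadlineBlocks
}
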
Author: Eduard S
Date:   2020-12-01 18:05:46 +01:00
Parent: 16d04de489
Commit: 482c94d374
14 changed files with 735 additions and 270 deletions


@@ -1,6 +1,8 @@
package coordinator

import (
"context"
"fmt"
"io/ioutil"
"math/big"
"os"
@@ -10,18 +12,53 @@ import (
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/hermeznetwork/hermez-node/batchbuilder"
dbUtils "github.com/hermeznetwork/hermez-node/db"
"github.com/hermeznetwork/hermez-node/db/historydb"
"github.com/hermeznetwork/hermez-node/db/l2db"
"github.com/hermeznetwork/hermez-node/db/statedb"
"github.com/hermeznetwork/hermez-node/log"
"github.com/hermeznetwork/hermez-node/prover"
"github.com/hermeznetwork/hermez-node/synchronizer"
"github.com/hermeznetwork/hermez-node/test"
"github.com/hermeznetwork/hermez-node/txselector"
"github.com/hermeznetwork/tracerr"
"github.com/iden3/go-merkletree/db/pebble"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
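
// deleteme collects the temporary directories created by the tests so that
// TestMain can remove them once the test run finishes.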
var deleteme = []string{}

func pebbleMakeCheckpoint(source, dest string) error {
// Remove dest folder (if it exists) before doing the checkpoint
if _, err := os.Stat(dest); !os.IsNotExist(err) {
err := os.RemoveAll(dest)
if err != nil {
return tracerr.Wrap(err)
}
} else if err != nil && !os.IsNotExist(err) {
return tracerr.Wrap(err)
}
sto, err := pebble.NewPebbleStorage(source, false)
if err != nil {
return tracerr.Wrap(err)
}
defer func() {
errClose := sto.Pebble().Close()
if errClose != nil {
log.Errorw("Pebble.Close", "err", errClose)
}
}()
// execute Checkpoint
err = sto.Pebble().Checkpoint(dest)
if err != nil {
return tracerr.Wrap(err)
}
return nil
}

func TestMain(m *testing.M) {
exitVal := m.Run()
for _, dir := range deleteme {
@@ -32,10 +69,16 @@ func TestMain(m *testing.M) {
os.Exit(exitVal)
}

func newTestModules(t *testing.T) (*txselector.TxSelector, *batchbuilder.BatchBuilder) { // FUTURE once Synchronizer is ready, should return it also
var syncDBPath string
var txSelDBPath string
var batchBuilderDBPath string
func newTestModules(t *testing.T) (*historydb.HistoryDB, *l2db.L2DB,
*txselector.TxSelector, *batchbuilder.BatchBuilder) { // FUTURE once Synchronizer is ready, should return it also
nLevels := 32
syncDBPath, err := ioutil.TempDir("", "tmpSyncDB")
var err error
syncDBPath, err = ioutil.TempDir("", "tmpSyncDB")
require.Nil(t, err)
deleteme = append(deleteme, syncDBPath)
syncSdb, err := statedb.NewStateDB(syncDBPath, statedb.TypeSynchronizer, nLevels)
@@ -45,22 +88,23 @@ func newTestModules(t *testing.T) (*txselector.TxSelector, *batchbuilder.BatchBu
db, err := dbUtils.InitSQLDB(5432, "localhost", "hermez", pass, "hermez")
require.Nil(t, err)
l2DB := l2db.NewL2DB(db, 10, 100, 24*time.Hour)
historyDB := historydb.NewHistoryDB(db)
txselDir, err := ioutil.TempDir("", "tmpTxSelDB")
txSelDBPath, err = ioutil.TempDir("", "tmpTxSelDB")
require.Nil(t, err)
deleteme = append(deleteme, txselDir)
txsel, err := txselector.NewTxSelector(txselDir, syncSdb, l2DB, 10, 10, 10)
deleteme = append(deleteme, txSelDBPath)
txsel, err := txselector.NewTxSelector(txSelDBPath, syncSdb, l2DB, 10, 10, 10)
assert.Nil(t, err)
bbDir, err := ioutil.TempDir("", "tmpBatchBuilderDB")
batchBuilderDBPath, err = ioutil.TempDir("", "tmpBatchBuilderDB")
require.Nil(t, err)
deleteme = append(deleteme, bbDir)
bb, err := batchbuilder.NewBatchBuilder(bbDir, syncSdb, nil, 0, uint64(nLevels))
deleteme = append(deleteme, batchBuilderDBPath)
bb, err := batchbuilder.NewBatchBuilder(batchBuilderDBPath, syncSdb, nil, 0, uint64(nLevels))
assert.Nil(t, err)
// l1Txs, coordinatorL1Txs, poolL2Txs := test.GenerateTestTxsFromSet(t, test.SetTest0)
return txsel, bb
return historyDB, l2DB, txsel, bb
}

type timer struct {
@@ -77,7 +121,7 @@ var bidder = ethCommon.HexToAddress("0x6b175474e89094c44da98b954eedeac495271d0f"
var forger = ethCommon.HexToAddress("0xc344E203a046Da13b0B4467EB7B3629D0C99F6E6")
func newTestCoordinator(t *testing.T, forgerAddr ethCommon.Address, ethClient *test.Client, ethClientSetup *test.ClientSetup) *Coordinator {
txsel, bb := newTestModules(t)
historyDB, l2DB, txsel, bb := newTestModules(t)
debugBatchPath, err := ioutil.TempDir("", "tmpDebugBatch")
require.Nil(t, err)
@@ -89,10 +133,10 @@ func newTestCoordinator(t *testing.T, forgerAddr ethCommon.Address, ethClient *t
L1BatchTimeoutPerc: 0.5,
EthClientAttempts: 5,
EthClientAttemptsDelay: 100 * time.Millisecond,
TxManagerCheckInterval: 500 * time.Millisecond,
TxManagerCheckInterval: 300 * time.Millisecond,
DebugBatchPath: debugBatchPath,
}
serverProofs := []ServerProofInterface{&ServerProofMock{}, &ServerProofMock{}}
serverProofs := []prover.Client{&prover.MockClient{}, &prover.MockClient{}}
scConsts := &synchronizer.SCConsts{
Rollup: *ethClientSetup.RollupConstants,
@@ -104,7 +148,8 @@ func newTestCoordinator(t *testing.T, forgerAddr ethCommon.Address, ethClient *t
Auction: *ethClientSetup.AuctionVariables,
WDelayer: *ethClientSetup.WDelayerVariables,
}
coord, err := NewCoordinator(conf, nil, txsel, bb, serverProofs, ethClient, scConsts, initSCVars)
coord, err := NewCoordinator(conf, historyDB, l2DB, txsel, bb, serverProofs,
ethClient, scConsts, initSCVars)
require.Nil(t, err)
return coord
}
@@ -142,13 +187,26 @@ func TestCoordinatorFlow(t *testing.T) {
time.Sleep(100 * time.Millisecond)
var stats synchronizer.Stats
stats.Eth.LastBlock = *ethClient.CtlLastBlock()
stats.Sync.LastBlock = *ethClient.CtlLastBlock()
stats.Sync.LastBlock = stats.Eth.LastBlock
stats.Eth.LastBatch = ethClient.CtlLastForgedBatch()
stats.Sync.LastBatch = stats.Eth.LastBatch
canForge, err := ethClient.AuctionCanForge(forger, blockNum+1)
require.Nil(t, err)
if canForge {
// fmt.Println("DBG canForge")
stats.Sync.Auction.CurrentSlot.Forger = forger
}
// Copy stateDB to synchronizer if there was a new batch
source := fmt.Sprintf("%v/BatchNum%v", batchBuilderDBPath, stats.Sync.LastBatch)
dest := fmt.Sprintf("%v/BatchNum%v", syncDBPath, stats.Sync.LastBatch)
if stats.Sync.LastBatch != 0 {
if _, err := os.Stat(dest); os.IsNotExist(err) {
log.Infow("Making pebble checkpoint for sync",
"source", source, "dest", dest)
err = pebbleMakeCheckpoint(source, dest)
require.NoError(t, err)
}
}
coord.SendMsg(MsgSyncStats{
Stats: stats,
})
@@ -247,6 +305,7 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
require.Nil(t, err)
var stats synchronizer.Stats
ctx := context.Background()
// Slot 0. No bid, so the winner is the boot coordinator
// pipelineStarted: false -> false
@@ -254,7 +313,7 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
stats.Sync.LastBlock = stats.Eth.LastBlock
stats.Sync.Auction.CurrentSlot.Forger = bootForger
assert.Equal(t, false, coord.canForge(&stats))
require.Nil(t, coord.handleMsgSyncStats(&stats))
require.Nil(t, coord.handleMsgSyncStats(ctx, &stats))
assert.Nil(t, coord.pipeline)
// Slot 0. No bid, and we reach the deadline, so anyone can forge
@@ -264,7 +323,7 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
stats.Sync.LastBlock = stats.Eth.LastBlock
stats.Sync.Auction.CurrentSlot.Forger = bootForger
assert.Equal(t, true, coord.canForge(&stats))
require.Nil(t, coord.handleMsgSyncStats(&stats))
require.Nil(t, coord.handleMsgSyncStats(ctx, &stats))
assert.NotNil(t, coord.pipeline)
// Slot 0. No bid, and we reach the deadline, so anyone can forge
@@ -274,7 +333,7 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
stats.Sync.LastBlock = stats.Eth.LastBlock
stats.Sync.Auction.CurrentSlot.Forger = bootForger
assert.Equal(t, true, coord.canForge(&stats))
require.Nil(t, coord.handleMsgSyncStats(&stats))
require.Nil(t, coord.handleMsgSyncStats(ctx, &stats))
assert.NotNil(t, coord.pipeline)
// Slot 0. No bid, so the winner is the boot coordinator
@@ -284,7 +343,7 @@ func TestCoordHandleMsgSyncStats(t *testing.T) {
stats.Sync.LastBlock = stats.Eth.LastBlock
stats.Sync.Auction.CurrentSlot.Forger = bootForger
assert.Equal(t, false, coord.canForge(&stats))
require.Nil(t, coord.handleMsgSyncStats(&stats))
require.Nil(t, coord.handleMsgSyncStats(ctx, &stats))
assert.Nil(t, coord.pipeline)
}
@@ -292,9 +351,11 @@ func TestPipelineShouldL1L2Batch(t *testing.T) {
ethClientSetup := test.NewClientSetupExample()
var timer timer
ctx := context.Background()
ethClient := test.NewClient(true, &timer, &bidder, ethClientSetup)
coord := newTestCoordinator(t, forger, ethClient, ethClientSetup)
pipeline := coord.newPipeline()
pipeline, err := coord.newPipeline(ctx)
require.NoError(t, err)
pipeline.vars = coord.vars
// Check that the parameters are the ones we expect and use in this test
@@ -354,3 +415,6 @@ func TestPipelineShouldL1L2Batch(t *testing.T) {
// TODO: Test Reorg
// TODO: Test Pipeline
// TODO: Test TxMonitor
// TODO: Test forgeSendServerProof
// TODO: Test waitServerProof
// TODO: Test handleReorg