@ -6,7 +6,9 @@ import (
ethCommon "github.com/ethereum/go-ethereum/common"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/hermeznetwork/hermez-node/batchbuilder"
"github.com/hermeznetwork/hermez-node/batchbuilder"
"github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/common"
"github.com/hermeznetwork/hermez-node/db/historydb"
"github.com/hermeznetwork/hermez-node/eth"
"github.com/hermeznetwork/hermez-node/eth"
"github.com/hermeznetwork/hermez-node/log"
"github.com/hermeznetwork/hermez-node/txselector"
"github.com/hermeznetwork/hermez-node/txselector"
kvdb "github.com/iden3/go-merkletree/db"
kvdb "github.com/iden3/go-merkletree/db"
"github.com/iden3/go-merkletree/db/memory"
"github.com/iden3/go-merkletree/db/memory"
@ -15,17 +17,26 @@ import (
// CoordinatorConfig contains the Coordinator configuration
// CoordinatorConfig contains the Coordinator configuration
type CoordinatorConfig struct {
type CoordinatorConfig struct {
ForgerAddress ethCommon . Address
ForgerAddress ethCommon . Address
LoopInterval time . Duration
}
}
// Coordinator implements the Coordinator type
// Coordinator implements the Coordinator type
type Coordinator struct {
type Coordinator struct {
// m sync.Mutex
stopch chan bool
stopforgerch chan bool
forging bool
isForgeSeq bool // WIP just for testing while implementing
config CoordinatorConfig
config CoordinatorConfig
batchNum uint64
batchNum common . BatchNum
batchQueue * BatchQueue
batchQueue * BatchQueue
serverProofPool ServerProofPool
serverProofPool ServerProofPool
// synchronizer *synchronizer.Synchronizer
// synchronizer *synchronizer.Synchronizer
hdb * historydb . HistoryDB
txsel * txselector . TxSelector
txsel * txselector . TxSelector
batchBuilder * batchbuilder . BatchBuilder
batchBuilder * batchbuilder . BatchBuilder
@ -34,48 +45,121 @@ type Coordinator struct {
}
}
// NewCoordinator creates a new Coordinator
// NewCoordinator creates a new Coordinator
func NewCoordinator ( ) * Coordinator { // once synchronizer is ready, synchronizer.Synchronizer will be passed as parameter here
var c * Coordinator
// c.ethClient = eth.NewClient() // TBD
c . ethTxStore = memory . NewMemoryStorage ( )
return c
func NewCoordinator ( conf CoordinatorConfig ,
hdb * historydb . HistoryDB ,
txsel * txselector . TxSelector ,
bb * batchbuilder . BatchBuilder ,
ethClient * eth . Client ) * Coordinator { // once synchronizer is ready, synchronizer.Synchronizer will be passed as parameter here
c := Coordinator {
config : conf ,
hdb : hdb ,
txsel : txsel ,
batchBuilder : bb ,
ethClient : ethClient ,
ethTxStore : memory . NewMemoryStorage ( ) ,
}
return & c
}
func ( c * Coordinator ) Stop ( ) {
log . Info ( "Stopping Coordinator" )
c . stopch <- true
}
}
// Start starts the Coordinator service
// Start starts the Coordinator service
func ( c * Coordinator ) Start ( ) {
func ( c * Coordinator ) Start ( ) {
// TODO TBD note: the sequences & loops & errors & logging & goroutines
// & channels approach still needs to be defined, the current code is a
// wip draft
// TBD: goroutines strategy
// if in Forge Sequence:
if c . isForgeSequence ( ) {
// c.batchNum = c.synchronizer.LastBatchNum()
_ = c . txsel . Reset ( c . batchNum )
_ = c . batchBuilder . Reset ( c . batchNum , true )
c . batchQueue = NewBatchQueue ( )
go func ( ) {
for {
_ = c . forgeSequence ( )
time . Sleep ( 1 * time . Second )
c . stopch = make ( chan bool ) // initialize channel
go func ( ) {
log . Info ( "Starting Coordinator" )
for {
select {
case <- c . stopch :
close ( c . stopforgerch )
log . Info ( "Coordinator stopped" )
return
case <- time . After ( c . config . LoopInterval ) :
if ! c . isForgeSequence ( ) {
if c . forging {
log . Info ( "forging stopped" )
c . forging = false
close ( c . stopforgerch )
}
log . Debug ( "not in forge time" )
continue
}
if ! c . forging {
log . Info ( "Start Forging" )
// c.batchNum = c.hdb.GetLastBatchNum() // uncomment when HistoryDB is ready
err := c . txsel . Reset ( c . batchNum )
if err != nil {
log . Error ( "forging err: " , err )
}
err = c . batchBuilder . Reset ( c . batchNum , true )
if err != nil {
log . Error ( "forging err: " , err )
}
c . batchQueue = NewBatchQueue ( )
c . forgerLoop ( )
c . forging = true
}
}
}
} ( )
}
// forgerLoop trigers goroutines for:
// - forgeSequence
// - proveSequence
// - forgeConfirmationSequence
func ( c * Coordinator ) forgerLoop ( ) {
c . stopforgerch = make ( chan bool ) // initialize channel
go func ( ) {
log . Info ( "forgeSequence started" )
for {
select {
case <- c . stopforgerch :
log . Info ( "forgeSequence stopped" )
return
case <- time . After ( c . config . LoopInterval ) :
if err := c . forgeSequence ( ) ; err != nil {
log . Error ( "forgeSequence err: " , err )
}
}
}
} ( )
go func ( ) {
for {
_ = c . proveSequence ( )
time . Sleep ( 1 * time . Second )
}
} ( )
go func ( ) {
log . Info ( "proveSequence started" )
for {
select {
case <- c . stopforgerch :
log . Info ( "proveSequence stopped" )
return
case <- time . After ( c . config . LoopInterval ) :
if err := c . proveSequence ( ) ; err != nil && err != common . ErrBatchQueueEmpty {
log . Error ( "proveSequence err: " , err )
}
}
}
} ( )
go func ( ) {
for {
_ = c . forgeConfirmationSequence ( )
time . Sleep ( 1 * time . Second )
}
} ( )
go func ( ) {
log . Info ( "forgeConfirmationSequence started" )
for {
select {
case <- c . stopforgerch :
log . Info ( "forgeConfirmationSequence stopped" )
return
case <- time . After ( c . config . LoopInterval ) :
if err := c . forgeConfirmationSequence ( ) ; err != nil {
log . Error ( "forgeConfirmationSequence err: " , err )
}
}
}
} ( )
}
}
} ( )
}
}
// forgeSequence
func ( c * Coordinator ) forgeSequence ( ) error {
func ( c * Coordinator ) forgeSequence ( ) error {
// TODO once synchronizer has this method ready:
// TODO once synchronizer has this method ready:
// If there's been a reorg, handle it
// If there's been a reorg, handle it
@ -104,8 +188,8 @@ func (c *Coordinator) forgeSequence() error {
// 1. Decide if we forge L2Tx or L1+L2Tx
// 1. Decide if we forge L2Tx or L1+L2Tx
if c . shouldL1L2Batch ( ) {
if c . shouldL1L2Batch ( ) {
// 2a: L1+L2 txs
// 2a: L1+L2 txs
// l1UserTxs, toForgeL1TxsNumber := c.synchronizer.GetNextL1UserTxs() // TODO once synchronizer is ready, uncomment
var l1UserTxs [ ] * common . L1Tx = nil // tmp, depends on synchronizer
// l1UserTxs, toForgeL1TxsNumber := c.hdb.GetNextL1UserTxs() // TODO once HistoryDB is ready, uncomment
var l1UserTxs [ ] * common . L1Tx = nil // tmp, depends on HistoryDB
l1UserTxsExtra , l1OperatorTxs , poolL2Txs , err = c . txsel . GetL1L2TxSelection ( c . batchNum , l1UserTxs ) // TODO once feesInfo is added to method return, add the var
l1UserTxsExtra , l1OperatorTxs , poolL2Txs , err = c . txsel . GetL1L2TxSelection ( c . batchNum , l1UserTxs ) // TODO once feesInfo is added to method return, add the var
if err != nil {
if err != nil {
return err
return err
@ -144,6 +228,7 @@ func (c *Coordinator) forgeSequence() error {
// 5. Save metadata from BatchBuilder output for BatchNum
// 5. Save metadata from BatchBuilder output for BatchNum
batchInfo . SetZKInputs ( zkInputs )
batchInfo . SetZKInputs ( zkInputs )
log . Debugf ( "Batch builded, batchNum: %d " , c . batchNum )
// 6. Call an idle server proof with BatchBuilder output, save server proof info for batchNum
// 6. Call an idle server proof with BatchBuilder output, save server proof info for batchNum
err = batchInfo . serverProof . CalculateProof ( zkInputs )
err = batchInfo . serverProof . CalculateProof ( zkInputs )
@ -160,6 +245,7 @@ func (c *Coordinator) proveSequence() error {
batchInfo := c . batchQueue . Pop ( )
batchInfo := c . batchQueue . Pop ( )
if batchInfo == nil {
if batchInfo == nil {
// no batches in queue, return
// no batches in queue, return
log . Debug ( "not batch to prove yet" )
return common . ErrBatchQueueEmpty
return common . ErrBatchQueueEmpty
}
}
serverProofInfo := batchInfo . serverProof
serverProofInfo := batchInfo . serverProof
@ -173,6 +259,8 @@ func (c *Coordinator) proveSequence() error {
if err != nil {
if err != nil {
return err
return err
}
}
log . Debugf ( "ethClient ForgeCall sent, batchNum: %d" , c . batchNum )
// TODO once tx data type is defined, store ethTx (returned by ForgeCall)
// TODO once tx data type is defined, store ethTx (returned by ForgeCall)
// TBD if use ethTxStore as a disk k-v database, or use a Queue
// TBD if use ethTxStore as a disk k-v database, or use a Queue
// tx, err := c.ethTxStore.NewTx()
// tx, err := c.ethTxStore.NewTx()
@ -202,8 +290,7 @@ func (c *Coordinator) handleReorg() error {
// isForgeSequence returns true if the node is the Forger in the current ethereum block
// isForgeSequence returns true if the node is the Forger in the current ethereum block
func ( c * Coordinator ) isForgeSequence ( ) bool {
func ( c * Coordinator ) isForgeSequence ( ) bool {
return false
return c . isForgeSeq
}
}
func ( c * Coordinator ) purgeRemoveByTimeout ( ) error {
func ( c * Coordinator ) purgeRemoveByTimeout ( ) error {