You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

428 lines
13 KiB

  1. package coordinator
  2. import (
  3. "context"
  4. "fmt"
  5. "math/big"
  6. "sync"
  7. "time"
  8. "github.com/hermeznetwork/hermez-node/batchbuilder"
  9. "github.com/hermeznetwork/hermez-node/common"
  10. "github.com/hermeznetwork/hermez-node/db/historydb"
  11. "github.com/hermeznetwork/hermez-node/db/l2db"
  12. "github.com/hermeznetwork/hermez-node/eth"
  13. "github.com/hermeznetwork/hermez-node/log"
  14. "github.com/hermeznetwork/hermez-node/prover"
  15. "github.com/hermeznetwork/hermez-node/synchronizer"
  16. "github.com/hermeznetwork/hermez-node/txselector"
  17. "github.com/hermeznetwork/tracerr"
  18. )
  19. type statsVars struct {
  20. Stats synchronizer.Stats
  21. Vars synchronizer.SCVariablesPtr
  22. }
  23. // Pipeline manages the forging of batches with parallel server proofs
  24. type Pipeline struct {
  25. cfg Config
  26. consts synchronizer.SCConsts
  27. // state
  28. batchNum common.BatchNum
  29. lastScheduledL1BatchBlockNum int64
  30. lastForgeL1TxsNum int64
  31. started bool
  32. proversPool *ProversPool
  33. provers []prover.Client
  34. txManager *TxManager
  35. historyDB *historydb.HistoryDB
  36. l2DB *l2db.L2DB
  37. txSelector *txselector.TxSelector
  38. batchBuilder *batchbuilder.BatchBuilder
  39. purger *Purger
  40. stats synchronizer.Stats
  41. vars synchronizer.SCVariables
  42. statsVarsCh chan statsVars
  43. ctx context.Context
  44. wg sync.WaitGroup
  45. cancel context.CancelFunc
  46. }
  47. // NewPipeline creates a new Pipeline
  48. func NewPipeline(ctx context.Context,
  49. cfg Config,
  50. historyDB *historydb.HistoryDB,
  51. l2DB *l2db.L2DB,
  52. txSelector *txselector.TxSelector,
  53. batchBuilder *batchbuilder.BatchBuilder,
  54. purger *Purger,
  55. txManager *TxManager,
  56. provers []prover.Client,
  57. scConsts *synchronizer.SCConsts,
  58. ) (*Pipeline, error) {
  59. proversPool := NewProversPool(len(provers))
  60. proversPoolSize := 0
  61. for _, prover := range provers {
  62. if err := prover.WaitReady(ctx); err != nil {
  63. log.Errorw("prover.WaitReady", "err", err)
  64. } else {
  65. proversPool.Add(prover)
  66. proversPoolSize++
  67. }
  68. }
  69. if proversPoolSize == 0 {
  70. return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool"))
  71. }
  72. return &Pipeline{
  73. cfg: cfg,
  74. historyDB: historyDB,
  75. l2DB: l2DB,
  76. txSelector: txSelector,
  77. batchBuilder: batchBuilder,
  78. provers: provers,
  79. proversPool: proversPool,
  80. purger: purger,
  81. txManager: txManager,
  82. consts: *scConsts,
  83. statsVarsCh: make(chan statsVars, queueLen),
  84. }, nil
  85. }
  86. // SetSyncStatsVars is a thread safe method to sets the synchronizer Stats
  87. func (p *Pipeline) SetSyncStatsVars(stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) {
  88. p.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars}
  89. }
  90. // reset pipeline state
  91. func (p *Pipeline) reset(batchNum common.BatchNum,
  92. stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
  93. p.batchNum = batchNum
  94. p.lastForgeL1TxsNum = stats.Sync.LastForgeL1TxsNum
  95. p.stats = *stats
  96. p.vars = *vars
  97. p.lastScheduledL1BatchBlockNum = 0
  98. err := p.txSelector.Reset(p.batchNum)
  99. if err != nil {
  100. return tracerr.Wrap(err)
  101. }
  102. err = p.batchBuilder.Reset(p.batchNum, true)
  103. if err != nil {
  104. return tracerr.Wrap(err)
  105. }
  106. return nil
  107. }
  108. func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) {
  109. if vars.Rollup != nil {
  110. p.vars.Rollup = *vars.Rollup
  111. }
  112. if vars.Auction != nil {
  113. p.vars.Auction = *vars.Auction
  114. }
  115. if vars.WDelayer != nil {
  116. p.vars.WDelayer = *vars.WDelayer
  117. }
  118. }
  119. // handleForgeBatch calls p.forgeBatch to forge the batch and get the zkInputs,
  120. // and then waits for an available proof server and sends the zkInputs to it so
  121. // that the proof computation begins.
  122. func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
  123. batchInfo, err := p.forgeBatch(batchNum)
  124. if ctx.Err() != nil {
  125. return nil, ctx.Err()
  126. } else if err != nil {
  127. if tracerr.Unwrap(err) == errLastL1BatchNotSynced {
  128. log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err,
  129. "lastForgeL1TxsNum", p.lastForgeL1TxsNum,
  130. "syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum)
  131. } else {
  132. log.Errorw("forgeBatch", "err", err)
  133. }
  134. return nil, err
  135. }
  136. // 6. Wait for an available server proof (blocking call)
  137. serverProof, err := p.proversPool.Get(ctx)
  138. if ctx.Err() != nil {
  139. return nil, ctx.Err()
  140. } else if err != nil {
  141. log.Errorw("proversPool.Get", "err", err)
  142. return nil, err
  143. }
  144. batchInfo.ServerProof = serverProof
  145. if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil {
  146. return nil, ctx.Err()
  147. } else if err != nil {
  148. log.Errorw("sendServerProof", "err", err)
  149. batchInfo.ServerProof = nil
  150. p.proversPool.Add(serverProof)
  151. return nil, err
  152. }
  153. return batchInfo, nil
  154. }
  155. // Start the forging pipeline
  156. func (p *Pipeline) Start(batchNum common.BatchNum,
  157. stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
  158. if p.started {
  159. log.Fatal("Pipeline already started")
  160. }
  161. p.started = true
  162. if err := p.reset(batchNum, stats, vars); err != nil {
  163. return tracerr.Wrap(err)
  164. }
  165. p.ctx, p.cancel = context.WithCancel(context.Background())
  166. queueSize := 1
  167. batchChSentServerProof := make(chan *BatchInfo, queueSize)
  168. p.wg.Add(1)
  169. go func() {
  170. waitDuration := zeroDuration
  171. for {
  172. select {
  173. case <-p.ctx.Done():
  174. log.Info("Pipeline forgeBatch loop done")
  175. p.wg.Done()
  176. return
  177. case statsVars := <-p.statsVarsCh:
  178. p.stats = statsVars.Stats
  179. p.syncSCVars(statsVars.Vars)
  180. case <-time.After(waitDuration):
  181. batchNum = p.batchNum + 1
  182. if batchInfo, err := p.handleForgeBatch(p.ctx, batchNum); err != nil {
  183. waitDuration = p.cfg.SyncRetryInterval
  184. continue
  185. } else {
  186. p.batchNum = batchNum
  187. batchChSentServerProof <- batchInfo
  188. }
  189. }
  190. }
  191. }()
  192. p.wg.Add(1)
  193. go func() {
  194. for {
  195. select {
  196. case <-p.ctx.Done():
  197. log.Info("Pipeline waitServerProofSendEth loop done")
  198. p.wg.Done()
  199. return
  200. case batchInfo := <-batchChSentServerProof:
  201. err := p.waitServerProof(p.ctx, batchInfo)
  202. // We are done with this serverProof, add it back to the pool
  203. p.proversPool.Add(batchInfo.ServerProof)
  204. batchInfo.ServerProof = nil
  205. if p.ctx.Err() != nil {
  206. continue
  207. }
  208. if err != nil {
  209. log.Errorw("waitServerProof", "err", err)
  210. continue
  211. }
  212. p.txManager.AddBatch(batchInfo)
  213. }
  214. }
  215. }()
  216. return nil
  217. }
  218. // Stop the forging pipeline
  219. func (p *Pipeline) Stop(ctx context.Context) {
  220. if !p.started {
  221. log.Fatal("Pipeline already stopped")
  222. }
  223. p.started = false
  224. log.Info("Stopping Pipeline...")
  225. p.cancel()
  226. p.wg.Wait()
  227. for _, prover := range p.provers {
  228. if err := prover.Cancel(ctx); ctx.Err() != nil {
  229. continue
  230. } else if err != nil {
  231. log.Errorw("prover.Cancel", "err", err)
  232. }
  233. }
  234. }
  235. // sendServerProof sends the circuit inputs to the proof server
  236. func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) error {
  237. p.cfg.debugBatchStore(batchInfo)
  238. // 7. Call the selected idle server proof with BatchBuilder output,
  239. // save server proof info for batchNum
  240. if err := batchInfo.ServerProof.CalculateProof(ctx, batchInfo.ZKInputs); err != nil {
  241. return tracerr.Wrap(err)
  242. }
  243. return nil
  244. }
// forgeBatch forges the batchNum batch.
//
// Steps, in order:
//   - let the purger invalidate/purge pool txs (e.g. those that have been in
//     the pool for too long);
//   - decide with shouldL1L2Batch whether this is an L1Batch (L1+L2 txs) or an
//     L2-only batch, and select the txs with the txSelector;
//   - mark the selected pool L2 txs as forging in this batch and invalidate
//     pool txs whose (fromIdx, nonce) is already covered by the selection;
//   - build the batch with the batchBuilder to obtain the zkInputs.
//
// The returned BatchInfo carries the selection results, the zkInputs and
// debug metadata; its ServerProof and Proof are not set here.
//
// For an L1Batch, a wrapped errLastL1BatchNotSynced is returned when
// p.lastForgeL1TxsNum differs from the synchronizer's LastForgeL1TxsNum. On
// success, a deferred function records p.stats.Eth.LastBlock.Num+1 as the last
// scheduled L1Batch block and increments p.lastForgeL1TxsNum.
//
// NOTE(review): the txSelector/batchBuilder state, the l2DB forging marks and
// the L1Batch counters are all updated here, before the caller obtains a
// prover and sends the proof. Nothing in this function rolls them back if that
// later step fails; confirm how the caller recovers.
func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) {
	// remove transactions from the pool that have been there for too long
	_, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
		p.stats.Sync.LastBlock.Num, int64(batchNum))
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
	_, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum))
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
	batchInfo = &BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch
	batchInfo.Debug.StartTimestamp = time.Now()
	batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1
	selectionCfg := &txselector.SelectionConfig{
		MaxL1UserTxs:      common.RollupConstMaxL1UserTx,
		TxProcessorConfig: p.cfg.TxProcessorConfig,
	}
	var poolL2Txs []common.PoolL2Tx
	var l1UserTxsExtra, l1CoordTxs []common.L1Tx
	var auths [][]byte
	var coordIdxs []common.Idx
	// 1. Decide if we forge L2Tx or L1+L2Tx
	if p.shouldL1L2Batch(batchInfo) {
		batchInfo.L1Batch = true
		defer func() {
			// If there's no error, update the parameters related
			// to the last L1Batch forged. This reads the named result
			// err, which every return statement below sets explicitly.
			if err == nil {
				p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
				p.lastForgeL1TxsNum++
			}
		}()
		// Do not schedule another L1Batch until the synchronizer's
		// LastForgeL1TxsNum matches the one tracked by the pipeline.
		if p.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum {
			return nil, tracerr.Wrap(errLastL1BatchNotSynced)
		}
		// 2a: L1+L2 txs
		// NOTE: `:=` declares a new err scoped to this if-block, shadowing
		// the named result. The deferred check above still sees the right
		// value because every return passes the error explicitly.
		l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum + 1)
		if err != nil {
			return nil, tracerr.Wrap(err)
		}
		coordIdxs, auths, l1UserTxsExtra, l1CoordTxs, poolL2Txs, err =
			p.txSelector.GetL1L2TxSelection(selectionCfg, l1UserTxs)
		if err != nil {
			return nil, tracerr.Wrap(err)
		}
	} else {
		// 2b: only L2 txs
		coordIdxs, auths, l1CoordTxs, poolL2Txs, err =
			p.txSelector.GetL2TxSelection(selectionCfg)
		if err != nil {
			return nil, tracerr.Wrap(err)
		}
		l1UserTxsExtra = nil
	}
	// 3. Save metadata from TxSelector output for BatchNum
	batchInfo.L1UserTxsExtra = l1UserTxsExtra
	batchInfo.L1CoordTxs = l1CoordTxs
	batchInfo.L1CoordinatorTxsAuths = auths
	batchInfo.CoordIdxs = coordIdxs
	batchInfo.VerifierIdx = p.cfg.VerifierIdx
	// Mark the selected pool L2 txs as being forged in this batch.
	if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum); err != nil {
		return nil, tracerr.Wrap(err)
	}
	// Invalidate transactions that become invalid because of
	// the poolL2Txs selected. Will mark as invalid the txs that have a
	// (fromIdx, nonce) which already appears in the selected txs (includes
	// all the nonces smaller than the current one)
	err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum)
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
	// 4. Call BatchBuilder with TxSelector output
	configBatch := &batchbuilder.ConfigBatch{
		ForgerAddress:     p.cfg.ForgerAddress,
		TxProcessorConfig: p.cfg.TxProcessorConfig,
	}
	zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxsExtra,
		l1CoordTxs, poolL2Txs, nil)
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
	l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a bit ugly, find a better way
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
	batchInfo.L2Txs = l2Txs
	// 5. Save metadata from BatchBuilder output for BatchNum
	batchInfo.ZKInputs = zkInputs
	batchInfo.Debug.Status = StatusForged
	p.cfg.debugBatchStore(batchInfo)
	return batchInfo, nil
}
  339. // waitServerProof gets the generated zkProof & sends it to the SmartContract
  340. func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) error {
  341. proof, pubInputs, err := batchInfo.ServerProof.GetProof(ctx) // blocking call, until not resolved don't continue. Returns when the proof server has calculated the proof
  342. if err != nil {
  343. return tracerr.Wrap(err)
  344. }
  345. batchInfo.Proof = proof
  346. batchInfo.PublicInputs = pubInputs
  347. batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo)
  348. batchInfo.Debug.Status = StatusProof
  349. p.cfg.debugBatchStore(batchInfo)
  350. return nil
  351. }
  352. func (p *Pipeline) shouldL1L2Batch(batchInfo *BatchInfo) bool {
  353. // Take the lastL1BatchBlockNum as the biggest between the last
  354. // scheduled one, and the synchronized one.
  355. lastL1BatchBlockNum := p.lastScheduledL1BatchBlockNum
  356. if p.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum {
  357. lastL1BatchBlockNum = p.stats.Sync.LastL1BatchBlock
  358. }
  359. // Set Debug information
  360. batchInfo.Debug.LastScheduledL1BatchBlockNum = p.lastScheduledL1BatchBlockNum
  361. batchInfo.Debug.LastL1BatchBlock = p.stats.Sync.LastL1BatchBlock
  362. batchInfo.Debug.LastL1BatchBlockDelta = p.stats.Eth.LastBlock.Num + 1 - lastL1BatchBlockNum
  363. batchInfo.Debug.L1BatchBlockScheduleDeadline =
  364. int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1) * p.cfg.L1BatchTimeoutPerc)
  365. // Return true if we have passed the l1BatchTimeoutPerc portion of the
  366. // range before the l1batch timeout.
  367. return p.stats.Eth.LastBlock.Num+1-lastL1BatchBlockNum >=
  368. int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1)*p.cfg.L1BatchTimeoutPerc)
  369. }
  370. func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs {
  371. proof := batchInfo.Proof
  372. zki := batchInfo.ZKInputs
  373. return &eth.RollupForgeBatchArgs{
  374. NewLastIdx: int64(zki.Metadata.NewLastIdxRaw),
  375. NewStRoot: zki.Metadata.NewStateRootRaw.BigInt(),
  376. NewExitRoot: zki.Metadata.NewExitRootRaw.BigInt(),
  377. L1UserTxs: batchInfo.L1UserTxsExtra,
  378. L1CoordinatorTxs: batchInfo.L1CoordTxs,
  379. L1CoordinatorTxsAuths: batchInfo.L1CoordinatorTxsAuths,
  380. L2TxsData: batchInfo.L2Txs,
  381. FeeIdxCoordinator: batchInfo.CoordIdxs,
  382. // Circuit selector
  383. VerifierIdx: batchInfo.VerifierIdx,
  384. L1Batch: batchInfo.L1Batch,
  385. ProofA: [2]*big.Int{proof.PiA[0], proof.PiA[1]},
  386. ProofB: [2][2]*big.Int{
  387. {proof.PiB[0][0], proof.PiB[0][1]},
  388. {proof.PiB[1][0], proof.PiB[1][1]},
  389. },
  390. ProofC: [2]*big.Int{proof.PiC[0], proof.PiC[1]},
  391. }
  392. }