You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

532 lines
17 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. package coordinator
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "math/big"
  7. "sync"
  8. "time"
  9. "github.com/hermeznetwork/hermez-node/batchbuilder"
  10. "github.com/hermeznetwork/hermez-node/common"
  11. "github.com/hermeznetwork/hermez-node/db/historydb"
  12. "github.com/hermeznetwork/hermez-node/db/l2db"
  13. "github.com/hermeznetwork/hermez-node/eth"
  14. "github.com/hermeznetwork/hermez-node/log"
  15. "github.com/hermeznetwork/hermez-node/prover"
  16. "github.com/hermeznetwork/hermez-node/synchronizer"
  17. "github.com/hermeznetwork/hermez-node/txselector"
  18. "github.com/hermeznetwork/tracerr"
  19. )
  20. type statsVars struct {
  21. Stats synchronizer.Stats
  22. Vars synchronizer.SCVariablesPtr
  23. }
  24. type state struct {
  25. batchNum common.BatchNum
  26. lastScheduledL1BatchBlockNum int64
  27. lastForgeL1TxsNum int64
  28. }
  29. // Pipeline manages the forging of batches with parallel server proofs
  30. type Pipeline struct {
  31. num int
  32. cfg Config
  33. consts synchronizer.SCConsts
  34. // state
  35. state state
  36. // batchNum common.BatchNum
  37. // lastScheduledL1BatchBlockNum int64
  38. // lastForgeL1TxsNum int64
  39. started bool
  40. rw sync.RWMutex
  41. errAtBatchNum common.BatchNum
  42. proversPool *ProversPool
  43. provers []prover.Client
  44. coord *Coordinator
  45. txManager *TxManager
  46. historyDB *historydb.HistoryDB
  47. l2DB *l2db.L2DB
  48. txSelector *txselector.TxSelector
  49. batchBuilder *batchbuilder.BatchBuilder
  50. purger *Purger
  51. stats synchronizer.Stats
  52. vars synchronizer.SCVariables
  53. statsVarsCh chan statsVars
  54. ctx context.Context
  55. wg sync.WaitGroup
  56. cancel context.CancelFunc
  57. }
  58. func (p *Pipeline) setErrAtBatchNum(batchNum common.BatchNum) {
  59. p.rw.Lock()
  60. defer p.rw.Unlock()
  61. p.errAtBatchNum = batchNum
  62. }
  63. func (p *Pipeline) getErrAtBatchNum() common.BatchNum {
  64. p.rw.RLock()
  65. defer p.rw.RUnlock()
  66. return p.errAtBatchNum
  67. }
  68. // NewPipeline creates a new Pipeline
  69. func NewPipeline(ctx context.Context,
  70. cfg Config,
  71. num int, // Pipeline sequential number
  72. historyDB *historydb.HistoryDB,
  73. l2DB *l2db.L2DB,
  74. txSelector *txselector.TxSelector,
  75. batchBuilder *batchbuilder.BatchBuilder,
  76. purger *Purger,
  77. coord *Coordinator,
  78. txManager *TxManager,
  79. provers []prover.Client,
  80. scConsts *synchronizer.SCConsts,
  81. ) (*Pipeline, error) {
  82. proversPool := NewProversPool(len(provers))
  83. proversPoolSize := 0
  84. for _, prover := range provers {
  85. if err := prover.WaitReady(ctx); err != nil {
  86. log.Errorw("prover.WaitReady", "err", err)
  87. } else {
  88. proversPool.Add(ctx, prover)
  89. proversPoolSize++
  90. }
  91. }
  92. if proversPoolSize == 0 {
  93. return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool"))
  94. }
  95. return &Pipeline{
  96. num: num,
  97. cfg: cfg,
  98. historyDB: historyDB,
  99. l2DB: l2DB,
  100. txSelector: txSelector,
  101. batchBuilder: batchBuilder,
  102. provers: provers,
  103. proversPool: proversPool,
  104. purger: purger,
  105. coord: coord,
  106. txManager: txManager,
  107. consts: *scConsts,
  108. statsVarsCh: make(chan statsVars, queueLen),
  109. }, nil
  110. }
  111. // SetSyncStatsVars is a thread safe method to sets the synchronizer Stats
  112. func (p *Pipeline) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) {
  113. select {
  114. case p.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars}:
  115. case <-ctx.Done():
  116. }
  117. }
  118. // reset pipeline state
  119. func (p *Pipeline) reset(batchNum common.BatchNum,
  120. stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
  121. p.state = state{
  122. batchNum: batchNum,
  123. lastForgeL1TxsNum: stats.Sync.LastForgeL1TxsNum,
  124. lastScheduledL1BatchBlockNum: 0,
  125. }
  126. p.stats = *stats
  127. p.vars = *vars
  128. // Reset the StateDB in TxSelector and BatchBuilder from the
  129. // synchronizer only if the checkpoint we reset from either:
  130. // a. Doesn't exist in the TxSelector/BatchBuilder
  131. // b. The batch has already been synced by the synchronizer and has a
  132. // different MTRoot than the BatchBuilder
  133. // Otherwise, reset from the local checkpoint.
  134. // First attempt to reset from local checkpoint if such checkpoint exists
  135. existsTxSelector, err := p.txSelector.LocalAccountsDB().CheckpointExists(p.state.batchNum)
  136. if err != nil {
  137. return tracerr.Wrap(err)
  138. }
  139. fromSynchronizerTxSelector := !existsTxSelector
  140. if err := p.txSelector.Reset(p.state.batchNum, fromSynchronizerTxSelector); err != nil {
  141. return tracerr.Wrap(err)
  142. }
  143. existsBatchBuilder, err := p.batchBuilder.LocalStateDB().CheckpointExists(p.state.batchNum)
  144. if err != nil {
  145. return tracerr.Wrap(err)
  146. }
  147. fromSynchronizerBatchBuilder := !existsBatchBuilder
  148. if err := p.batchBuilder.Reset(p.state.batchNum, fromSynchronizerBatchBuilder); err != nil {
  149. return tracerr.Wrap(err)
  150. }
  151. // After reset, check that if the batch exists in the historyDB, the
  152. // stateRoot matches with the local one, if not, force a reset from
  153. // synchronizer
  154. batch, err := p.historyDB.GetBatch(p.state.batchNum)
  155. if tracerr.Unwrap(err) == sql.ErrNoRows {
  156. // nothing to do
  157. } else if err != nil {
  158. return tracerr.Wrap(err)
  159. } else {
  160. localStateRoot := p.batchBuilder.LocalStateDB().MT.Root().BigInt()
  161. if batch.StateRoot.Cmp(localStateRoot) != 0 {
  162. log.Debugw("localStateRoot (%v) != historyDB stateRoot (%v). "+
  163. "Forcing reset from Synchronizer", localStateRoot, batch.StateRoot)
  164. // StateRoot from synchronizer doesn't match StateRoot
  165. // from batchBuilder, force a reset from synchronizer
  166. if err := p.txSelector.Reset(p.state.batchNum, true); err != nil {
  167. return tracerr.Wrap(err)
  168. }
  169. if err := p.batchBuilder.Reset(p.state.batchNum, true); err != nil {
  170. return tracerr.Wrap(err)
  171. }
  172. }
  173. }
  174. return nil
  175. }
  176. func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) {
  177. updateSCVars(&p.vars, vars)
  178. }
  179. // handleForgeBatch calls p.forgeBatch to forge the batch and get the zkInputs,
  180. // and then waits for an available proof server and sends the zkInputs to it so
  181. // that the proof computation begins.
  182. func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
  183. batchInfo, err := p.forgeBatch(batchNum)
  184. if ctx.Err() != nil {
  185. return nil, ctx.Err()
  186. } else if err != nil {
  187. if tracerr.Unwrap(err) == errLastL1BatchNotSynced {
  188. log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err,
  189. "lastForgeL1TxsNum", p.state.lastForgeL1TxsNum,
  190. "syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum)
  191. } else {
  192. log.Errorw("forgeBatch", "err", err)
  193. }
  194. return nil, err
  195. }
  196. // 6. Wait for an available server proof (blocking call)
  197. serverProof, err := p.proversPool.Get(ctx)
  198. if ctx.Err() != nil {
  199. return nil, ctx.Err()
  200. } else if err != nil {
  201. log.Errorw("proversPool.Get", "err", err)
  202. return nil, err
  203. }
  204. batchInfo.ServerProof = serverProof
  205. if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil {
  206. return nil, ctx.Err()
  207. } else if err != nil {
  208. log.Errorw("sendServerProof", "err", err)
  209. batchInfo.ServerProof = nil
  210. p.proversPool.Add(ctx, serverProof)
  211. return nil, err
  212. }
  213. return batchInfo, nil
  214. }
  215. // Start the forging pipeline
  216. func (p *Pipeline) Start(batchNum common.BatchNum,
  217. stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
  218. if p.started {
  219. log.Fatal("Pipeline already started")
  220. }
  221. p.started = true
  222. if err := p.reset(batchNum, stats, vars); err != nil {
  223. return tracerr.Wrap(err)
  224. }
  225. p.ctx, p.cancel = context.WithCancel(context.Background())
  226. queueSize := 1
  227. batchChSentServerProof := make(chan *BatchInfo, queueSize)
  228. p.wg.Add(1)
  229. go func() {
  230. waitDuration := zeroDuration
  231. for {
  232. select {
  233. case <-p.ctx.Done():
  234. log.Info("Pipeline forgeBatch loop done")
  235. p.wg.Done()
  236. return
  237. case statsVars := <-p.statsVarsCh:
  238. p.stats = statsVars.Stats
  239. p.syncSCVars(statsVars.Vars)
  240. case <-time.After(waitDuration):
  241. // Once errAtBatchNum != 0, we stop forging
  242. // batches because there's been an error and we
  243. // wait for the pipeline to be stopped.
  244. if p.getErrAtBatchNum() != 0 {
  245. waitDuration = p.cfg.ForgeRetryInterval
  246. continue
  247. }
  248. batchNum = p.state.batchNum + 1
  249. batchInfo, err := p.handleForgeBatch(p.ctx, batchNum)
  250. if p.ctx.Err() != nil {
  251. continue
  252. } else if tracerr.Unwrap(err) == errLastL1BatchNotSynced {
  253. waitDuration = p.cfg.ForgeRetryInterval
  254. continue
  255. } else if err != nil {
  256. p.setErrAtBatchNum(batchNum)
  257. waitDuration = p.cfg.ForgeRetryInterval
  258. p.coord.SendMsg(p.ctx, MsgStopPipeline{
  259. Reason: fmt.Sprintf(
  260. "Pipeline.handleForgBatch: %v", err),
  261. FailedBatchNum: batchNum,
  262. })
  263. continue
  264. }
  265. p.state.batchNum = batchNum
  266. select {
  267. case batchChSentServerProof <- batchInfo:
  268. case <-p.ctx.Done():
  269. }
  270. }
  271. }
  272. }()
  273. p.wg.Add(1)
  274. go func() {
  275. for {
  276. select {
  277. case <-p.ctx.Done():
  278. log.Info("Pipeline waitServerProofSendEth loop done")
  279. p.wg.Done()
  280. return
  281. case batchInfo := <-batchChSentServerProof:
  282. // Once errAtBatchNum != 0, we stop forging
  283. // batches because there's been an error and we
  284. // wait for the pipeline to be stopped.
  285. if p.getErrAtBatchNum() != 0 {
  286. continue
  287. }
  288. err := p.waitServerProof(p.ctx, batchInfo)
  289. if p.ctx.Err() != nil {
  290. continue
  291. } else if err != nil {
  292. log.Errorw("waitServerProof", "err", err)
  293. p.setErrAtBatchNum(batchInfo.BatchNum)
  294. p.coord.SendMsg(p.ctx, MsgStopPipeline{
  295. Reason: fmt.Sprintf(
  296. "Pipeline.waitServerProof: %v", err),
  297. FailedBatchNum: batchInfo.BatchNum,
  298. })
  299. continue
  300. }
  301. // We are done with this serverProof, add it back to the pool
  302. p.proversPool.Add(p.ctx, batchInfo.ServerProof)
  303. // batchInfo.ServerProof = nil
  304. p.txManager.AddBatch(p.ctx, batchInfo)
  305. }
  306. }
  307. }()
  308. return nil
  309. }
  310. // Stop the forging pipeline
  311. func (p *Pipeline) Stop(ctx context.Context) {
  312. if !p.started {
  313. log.Fatal("Pipeline already stopped")
  314. }
  315. p.started = false
  316. log.Info("Stopping Pipeline...")
  317. p.cancel()
  318. p.wg.Wait()
  319. for _, prover := range p.provers {
  320. if err := prover.Cancel(ctx); ctx.Err() != nil {
  321. continue
  322. } else if err != nil {
  323. log.Errorw("prover.Cancel", "err", err)
  324. }
  325. }
  326. }
  327. // sendServerProof sends the circuit inputs to the proof server
  328. func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) error {
  329. p.cfg.debugBatchStore(batchInfo)
  330. // 7. Call the selected idle server proof with BatchBuilder output,
  331. // save server proof info for batchNum
  332. if err := batchInfo.ServerProof.CalculateProof(ctx, batchInfo.ZKInputs); err != nil {
  333. return tracerr.Wrap(err)
  334. }
  335. return nil
  336. }
  337. // forgeBatch forges the batchNum batch.
  338. func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) {
  339. // remove transactions from the pool that have been there for too long
  340. _, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
  341. p.stats.Sync.LastBlock.Num, int64(batchNum))
  342. if err != nil {
  343. return nil, tracerr.Wrap(err)
  344. }
  345. _, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum))
  346. if err != nil {
  347. return nil, tracerr.Wrap(err)
  348. }
  349. // Structure to accumulate data and metadata of the batch
  350. batchInfo = &BatchInfo{PipelineNum: p.num, BatchNum: batchNum}
  351. batchInfo.Debug.StartTimestamp = time.Now()
  352. batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1
  353. selectionCfg := &txselector.SelectionConfig{
  354. MaxL1UserTxs: common.RollupConstMaxL1UserTx,
  355. TxProcessorConfig: p.cfg.TxProcessorConfig,
  356. }
  357. var poolL2Txs []common.PoolL2Tx
  358. var discardedL2Txs []common.PoolL2Tx
  359. var l1UserTxsExtra, l1CoordTxs []common.L1Tx
  360. var auths [][]byte
  361. var coordIdxs []common.Idx
  362. // TODO: If there are no txs and we are behind the timeout, skip
  363. // forging a batch and return a particular error that can be handleded
  364. // in the loop where handleForgeBatch is called to retry after an
  365. // interval
  366. // 1. Decide if we forge L2Tx or L1+L2Tx
  367. if p.shouldL1L2Batch(batchInfo) {
  368. batchInfo.L1Batch = true
  369. if p.state.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum {
  370. return nil, tracerr.Wrap(errLastL1BatchNotSynced)
  371. }
  372. // 2a: L1+L2 txs
  373. l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.state.lastForgeL1TxsNum + 1)
  374. if err != nil {
  375. return nil, tracerr.Wrap(err)
  376. }
  377. coordIdxs, auths, l1UserTxsExtra, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
  378. p.txSelector.GetL1L2TxSelection(selectionCfg, l1UserTxs)
  379. if err != nil {
  380. return nil, tracerr.Wrap(err)
  381. }
  382. p.state.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
  383. p.state.lastForgeL1TxsNum++
  384. } else {
  385. // 2b: only L2 txs
  386. coordIdxs, auths, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
  387. p.txSelector.GetL2TxSelection(selectionCfg)
  388. if err != nil {
  389. return nil, tracerr.Wrap(err)
  390. }
  391. l1UserTxsExtra = nil
  392. }
  393. // 3. Save metadata from TxSelector output for BatchNum
  394. batchInfo.L1UserTxsExtra = l1UserTxsExtra
  395. batchInfo.L1CoordTxs = l1CoordTxs
  396. batchInfo.L1CoordinatorTxsAuths = auths
  397. batchInfo.CoordIdxs = coordIdxs
  398. batchInfo.VerifierIdx = p.cfg.VerifierIdx
  399. if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum); err != nil {
  400. return nil, tracerr.Wrap(err)
  401. }
  402. if err := p.l2DB.UpdateTxsInfo(discardedL2Txs); err != nil {
  403. return nil, tracerr.Wrap(err)
  404. }
  405. // Invalidate transactions that become invalid beause of
  406. // the poolL2Txs selected. Will mark as invalid the txs that have a
  407. // (fromIdx, nonce) which already appears in the selected txs (includes
  408. // all the nonces smaller than the current one)
  409. err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum)
  410. if err != nil {
  411. return nil, tracerr.Wrap(err)
  412. }
  413. // 4. Call BatchBuilder with TxSelector output
  414. configBatch := &batchbuilder.ConfigBatch{
  415. TxProcessorConfig: p.cfg.TxProcessorConfig,
  416. }
  417. zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxsExtra,
  418. l1CoordTxs, poolL2Txs)
  419. if err != nil {
  420. return nil, tracerr.Wrap(err)
  421. }
  422. l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a big uggly, find a better way
  423. if err != nil {
  424. return nil, tracerr.Wrap(err)
  425. }
  426. batchInfo.L2Txs = l2Txs
  427. // 5. Save metadata from BatchBuilder output for BatchNum
  428. batchInfo.ZKInputs = zkInputs
  429. batchInfo.Debug.Status = StatusForged
  430. p.cfg.debugBatchStore(batchInfo)
  431. log.Infow("Pipeline: batch forged internally", "batch", batchInfo.BatchNum)
  432. return batchInfo, nil
  433. }
  434. // waitServerProof gets the generated zkProof & sends it to the SmartContract
  435. func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) error {
  436. proof, pubInputs, err := batchInfo.ServerProof.GetProof(ctx) // blocking call, until not resolved don't continue. Returns when the proof server has calculated the proof
  437. if err != nil {
  438. return tracerr.Wrap(err)
  439. }
  440. batchInfo.Proof = proof
  441. batchInfo.PublicInputs = pubInputs
  442. batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo)
  443. batchInfo.Debug.Status = StatusProof
  444. p.cfg.debugBatchStore(batchInfo)
  445. log.Infow("Pipeline: batch proof calculated", "batch", batchInfo.BatchNum)
  446. return nil
  447. }
  448. func (p *Pipeline) shouldL1L2Batch(batchInfo *BatchInfo) bool {
  449. // Take the lastL1BatchBlockNum as the biggest between the last
  450. // scheduled one, and the synchronized one.
  451. lastL1BatchBlockNum := p.state.lastScheduledL1BatchBlockNum
  452. if p.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum {
  453. lastL1BatchBlockNum = p.stats.Sync.LastL1BatchBlock
  454. }
  455. // Set Debug information
  456. batchInfo.Debug.LastScheduledL1BatchBlockNum = p.state.lastScheduledL1BatchBlockNum
  457. batchInfo.Debug.LastL1BatchBlock = p.stats.Sync.LastL1BatchBlock
  458. batchInfo.Debug.LastL1BatchBlockDelta = p.stats.Eth.LastBlock.Num + 1 - lastL1BatchBlockNum
  459. batchInfo.Debug.L1BatchBlockScheduleDeadline =
  460. int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1) * p.cfg.L1BatchTimeoutPerc)
  461. // Return true if we have passed the l1BatchTimeoutPerc portion of the
  462. // range before the l1batch timeout.
  463. return p.stats.Eth.LastBlock.Num+1-lastL1BatchBlockNum >=
  464. int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1)*p.cfg.L1BatchTimeoutPerc)
  465. }
  466. func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs {
  467. proof := batchInfo.Proof
  468. zki := batchInfo.ZKInputs
  469. return &eth.RollupForgeBatchArgs{
  470. NewLastIdx: int64(zki.Metadata.NewLastIdxRaw),
  471. NewStRoot: zki.Metadata.NewStateRootRaw.BigInt(),
  472. NewExitRoot: zki.Metadata.NewExitRootRaw.BigInt(),
  473. L1UserTxs: batchInfo.L1UserTxsExtra,
  474. L1CoordinatorTxs: batchInfo.L1CoordTxs,
  475. L1CoordinatorTxsAuths: batchInfo.L1CoordinatorTxsAuths,
  476. L2TxsData: batchInfo.L2Txs,
  477. FeeIdxCoordinator: batchInfo.CoordIdxs,
  478. // Circuit selector
  479. VerifierIdx: batchInfo.VerifierIdx,
  480. L1Batch: batchInfo.L1Batch,
  481. ProofA: [2]*big.Int{proof.PiA[0], proof.PiA[1]},
  482. // Implementation of the verifier need a swap on the proofB vector
  483. ProofB: [2][2]*big.Int{
  484. {proof.PiB[0][1], proof.PiB[0][0]},
  485. {proof.PiB[1][1], proof.PiB[1][0]},
  486. },
  487. ProofC: [2]*big.Int{proof.PiC[0], proof.PiC[1]},
  488. }
  489. }