package coordinator

import (
	"context"
	"fmt"
	"math/big"
	"sync"
	"time"

	"github.com/hermeznetwork/hermez-node/batchbuilder"
	"github.com/hermeznetwork/hermez-node/common"
	"github.com/hermeznetwork/hermez-node/db/historydb"
	"github.com/hermeznetwork/hermez-node/db/l2db"
	"github.com/hermeznetwork/hermez-node/eth"
	"github.com/hermeznetwork/hermez-node/log"
	"github.com/hermeznetwork/hermez-node/prover"
	"github.com/hermeznetwork/hermez-node/synchronizer"
	"github.com/hermeznetwork/hermez-node/txselector"
	"github.com/hermeznetwork/tracerr"
)
type statsVars struct {
	Stats synchronizer.Stats
	Vars  synchronizer.SCVariablesPtr
}

// Pipeline manages the forging of batches with parallel server proofs
type Pipeline struct {
	cfg    Config
	consts synchronizer.SCConsts

	// state
	batchNum                     common.BatchNum
	lastScheduledL1BatchBlockNum int64
	lastForgeL1TxsNum            int64
	started                      bool

	proversPool  *ProversPool
	provers      []prover.Client
	txManager    *TxManager
	historyDB    *historydb.HistoryDB
	l2DB         *l2db.L2DB
	txSelector   *txselector.TxSelector
	batchBuilder *batchbuilder.BatchBuilder
	purger       *Purger
	stats        synchronizer.Stats
	vars         synchronizer.SCVariables
	statsVarsCh  chan statsVars

	ctx    context.Context
	wg     sync.WaitGroup
	cancel context.CancelFunc
}
// NewPipeline creates a new Pipeline
func NewPipeline(ctx context.Context,
	cfg Config,
	historyDB *historydb.HistoryDB,
	l2DB *l2db.L2DB,
	txSelector *txselector.TxSelector,
	batchBuilder *batchbuilder.BatchBuilder,
	purger *Purger,
	txManager *TxManager,
	provers []prover.Client,
	scConsts *synchronizer.SCConsts,
) (*Pipeline, error) {
	proversPool := NewProversPool(len(provers))
	proversPoolSize := 0
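	// Only provers that report ready within ctx are added to the pool;
	// unavailable provers are logged and skipped.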
	for _, prover := range provers {
		if err := prover.WaitReady(ctx); err != nil {
			log.Errorw("prover.WaitReady", "err", err)
		} else {
			proversPool.Add(ctx, prover)
			proversPoolSize++
		}
	}
	if proversPoolSize == 0 {
		return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool"))
	}
	return &Pipeline{
		cfg:          cfg,
		historyDB:    historyDB,
		l2DB:         l2DB,
		txSelector:   txSelector,
		batchBuilder: batchBuilder,
		provers:      provers,
		proversPool:  proversPool,
		purger:       purger,
		txManager:    txManager,
		consts:       *scConsts,
		statsVarsCh:  make(chan statsVars, queueLen),
	}, nil
}
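// Typical lifecycle (a sketch; the surrounding coordinator wiring is assumed,
// not shown in this file):
//
//	pipeline, err := NewPipeline(ctx, cfg, historyDB, l2DB, txSelector,
//		batchBuilder, purger, txManager, provers, &scConsts)
//	if err != nil { /* handle */ }
//	if err := pipeline.Start(batchNum, &stats, &vars); err != nil { /* handle */ }
//	// on every newly synchronized block: pipeline.SetSyncStatsVars(ctx, &stats, &vars)
//	// on shutdown: pipeline.Stop(ctx)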
// SetSyncStatsVars is a thread-safe method to set the synchronizer Stats and
// pass any updated SCVariables to the pipeline
func (p *Pipeline) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) {
	select {
	case p.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars}:
	case <-ctx.Done():
	}
}
// reset pipeline state
func (p *Pipeline) reset(batchNum common.BatchNum,
	stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
	p.batchNum = batchNum
	p.lastForgeL1TxsNum = stats.Sync.LastForgeL1TxsNum
	p.stats = *stats
	p.vars = *vars
	p.lastScheduledL1BatchBlockNum = 0
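	// Roll back the TxSelector and BatchBuilder local state to batchNum (the
	// boolean asks the BatchBuilder to reset from the synchronizer state).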
	err := p.txSelector.Reset(p.batchNum)
	if err != nil {
		return tracerr.Wrap(err)
	}
	err = p.batchBuilder.Reset(p.batchNum, true)
	if err != nil {
		return tracerr.Wrap(err)
	}
	return nil
}
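// syncSCVars updates the in-memory smart-contract variables, overwriting only
// the fields that the synchronizer reports as changed (non-nil pointers)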
func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) {
	if vars.Rollup != nil {
		p.vars.Rollup = *vars.Rollup
	}
	if vars.Auction != nil {
		p.vars.Auction = *vars.Auction
	}
	if vars.WDelayer != nil {
		p.vars.WDelayer = *vars.WDelayer
	}
}
// handleForgeBatch calls p.forgeBatch to forge the batch and get the zkInputs,
// and then waits for an available proof server and sends the zkInputs to it so
// that the proof computation begins.
func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
	batchInfo, err := p.forgeBatch(batchNum)
	if ctx.Err() != nil {
		return nil, ctx.Err()
	} else if err != nil {
		if tracerr.Unwrap(err) == errLastL1BatchNotSynced {
			log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err,
				"lastForgeL1TxsNum", p.lastForgeL1TxsNum,
				"syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum)
		} else {
			log.Errorw("forgeBatch", "err", err)
		}
		return nil, err
	}
	// 6. Wait for an available server proof (blocking call)
	serverProof, err := p.proversPool.Get(ctx)
	if ctx.Err() != nil {
		return nil, ctx.Err()
	} else if err != nil {
		log.Errorw("proversPool.Get", "err", err)
		return nil, err
	}
	batchInfo.ServerProof = serverProof
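	// If sending the inputs fails, return the prover to the pool so that it
	// is not leaked.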
	if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil {
		return nil, ctx.Err()
	} else if err != nil {
		log.Errorw("sendServerProof", "err", err)
		batchInfo.ServerProof = nil
		p.proversPool.Add(ctx, serverProof)
		return nil, err
	}
	return batchInfo, nil
}
// Start the forging pipeline
func (p *Pipeline) Start(batchNum common.BatchNum,
	stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
	if p.started {
		log.Fatal("Pipeline already started")
	}
	p.started = true

	if err := p.reset(batchNum, stats, vars); err != nil {
		return tracerr.Wrap(err)
	}
	p.ctx, p.cancel = context.WithCancel(context.Background())

	queueSize := 1
	batchChSentServerProof := make(chan *BatchInfo, queueSize)
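	// Goroutine 1: the forging loop.  Each iteration either consumes updated
	// stats/vars from the synchronizer, or forges batch p.batchNum+1 and hands
	// the resulting BatchInfo (whose proof computation has already started) to
	// the second goroutine via batchChSentServerProof.  waitDuration throttles
	// the loop: it starts at zeroDuration and is set to SyncRetryInterval
	// after a failed forge attempt.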
	p.wg.Add(1)
	go func() {
		waitDuration := zeroDuration
		for {
			select {
			case <-p.ctx.Done():
				log.Info("Pipeline forgeBatch loop done")
				p.wg.Done()
				return
			case statsVars := <-p.statsVarsCh:
				p.stats = statsVars.Stats
				p.syncSCVars(statsVars.Vars)
			case <-time.After(waitDuration):
				batchNum = p.batchNum + 1
				batchInfo, err := p.handleForgeBatch(p.ctx, batchNum)
				if p.ctx.Err() != nil {
					continue
				} else if err != nil {
					waitDuration = p.cfg.SyncRetryInterval
					continue
				}
				p.batchNum = batchNum
				select {
				case batchChSentServerProof <- batchInfo:
				case <-p.ctx.Done():
				}
			}
		}
	}()
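	// Goroutine 2: for each batch whose zkInputs were sent to a prover, wait
	// for the proof to be computed, return the prover to the pool, and hand
	// the proven batch to the TxManager, which will send the forgeBatch
	// transaction to Ethereum.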
	p.wg.Add(1)
	go func() {
		for {
			select {
			case <-p.ctx.Done():
				log.Info("Pipeline waitServerProofSendEth loop done")
				p.wg.Done()
				return
			case batchInfo := <-batchChSentServerProof:
				err := p.waitServerProof(p.ctx, batchInfo)
				// We are done with this serverProof, add it back to the pool
				p.proversPool.Add(p.ctx, batchInfo.ServerProof)
				batchInfo.ServerProof = nil
				if p.ctx.Err() != nil {
					continue
				} else if err != nil {
					log.Errorw("waitServerProof", "err", err)
					continue
				}
				p.txManager.AddBatch(p.ctx, batchInfo)
			}
		}
	}()
	return nil
}
// Stop the forging pipeline
func (p *Pipeline) Stop(ctx context.Context) {
	if !p.started {
		log.Fatal("Pipeline already stopped")
	}
	p.started = false
	log.Info("Stopping Pipeline...")
	p.cancel()
	p.wg.Wait()
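	// Ask every prover to cancel any proof computation that may still be in
	// flight.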
	for _, prover := range p.provers {
		if err := prover.Cancel(ctx); ctx.Err() != nil {
			continue
		} else if err != nil {
			log.Errorw("prover.Cancel", "err", err)
		}
	}
}
// sendServerProof sends the circuit inputs to the proof server
func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) error {
	p.cfg.debugBatchStore(batchInfo)

	// 7. Call the selected idle server proof with BatchBuilder output,
	// save server proof info for batchNum
	if err := batchInfo.ServerProof.CalculateProof(ctx, batchInfo.ZKInputs); err != nil {
		return tracerr.Wrap(err)
	}
	return nil
}
// forgeBatch forges the batchNum batch.
func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) {
	// remove transactions from the pool that have been there for too long
	_, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
		p.stats.Sync.LastBlock.Num, int64(batchNum))
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
	_, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum))
	if err != nil {
		return nil, tracerr.Wrap(err)
	}

	batchInfo = &BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch
	batchInfo.Debug.StartTimestamp = time.Now()
	batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1

	selectionCfg := &txselector.SelectionConfig{
		MaxL1UserTxs:      common.RollupConstMaxL1UserTx,
		TxProcessorConfig: p.cfg.TxProcessorConfig,
	}

	var poolL2Txs []common.PoolL2Tx
	var discardedL2Txs []common.PoolL2Tx
	var l1UserTxsExtra, l1CoordTxs []common.L1Tx
	var auths [][]byte
	var coordIdxs []common.Idx

	// 1. Decide if we forge L2Tx or L1+L2Tx
	if p.shouldL1L2Batch(batchInfo) {
		batchInfo.L1Batch = true
		defer func() {
			// If there's no error, update the parameters related
			// to the last L1Batch forged
			if err == nil {
				p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
				p.lastForgeL1TxsNum++
			}
		}()
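		// An L1 batch can only be forged once the synchronizer has
		// caught up with the last L1 batch that we scheduled; otherwise
		// the set of unforged L1UserTxs read below could be stale (the
		// caller logs this case as "scheduled L1Batch too early").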
		if p.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum {
			return nil, tracerr.Wrap(errLastL1BatchNotSynced)
		}
		// 2a: L1+L2 txs
		l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum + 1)
		if err != nil {
			return nil, tracerr.Wrap(err)
		}
		coordIdxs, auths, l1UserTxsExtra, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
			p.txSelector.GetL1L2TxSelection(selectionCfg, l1UserTxs)
		if err != nil {
			return nil, tracerr.Wrap(err)
		}
	} else {
		// 2b: only L2 txs
		coordIdxs, auths, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
			p.txSelector.GetL2TxSelection(selectionCfg)
		if err != nil {
			return nil, tracerr.Wrap(err)
		}
		l1UserTxsExtra = nil
	}

	// 3. Save metadata from TxSelector output for BatchNum
	batchInfo.L1UserTxsExtra = l1UserTxsExtra
	batchInfo.L1CoordTxs = l1CoordTxs
	batchInfo.L1CoordinatorTxsAuths = auths
	batchInfo.CoordIdxs = coordIdxs
	batchInfo.VerifierIdx = p.cfg.VerifierIdx

	if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum); err != nil {
		return nil, tracerr.Wrap(err)
	}
	if err := p.l2DB.UpdateTxsInfo(discardedL2Txs); err != nil {
		return nil, tracerr.Wrap(err)
	}
	// Invalidate transactions that become invalid because of the poolL2Txs
	// selected.  This marks as invalid the txs that have a (fromIdx, nonce)
	// which already appears in the selected txs (including all the nonces
	// smaller than the current one)
	err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum)
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
	// 4. Call BatchBuilder with TxSelector output
	configBatch := &batchbuilder.ConfigBatch{
		TxProcessorConfig: p.cfg.TxProcessorConfig,
	}
	zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxsExtra,
		l1CoordTxs, poolL2Txs)
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
	l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a bit ugly; find a better way
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
	batchInfo.L2Txs = l2Txs

	// 5. Save metadata from BatchBuilder output for BatchNum
	batchInfo.ZKInputs = zkInputs
	batchInfo.Debug.Status = StatusForged
	p.cfg.debugBatchStore(batchInfo)
	log.Infow("Pipeline: batch forged internally", "batch", batchInfo.BatchNum)

	return batchInfo, nil
}
// waitServerProof gets the generated zkProof & sends it to the SmartContract
func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) error {
	// Blocking call: returns once the proof server has calculated the proof
	proof, pubInputs, err := batchInfo.ServerProof.GetProof(ctx)
	if err != nil {
		return tracerr.Wrap(err)
	}
	batchInfo.Proof = proof
	batchInfo.PublicInputs = pubInputs
	batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo)
	batchInfo.Debug.Status = StatusProof
	p.cfg.debugBatchStore(batchInfo)
	log.Infow("Pipeline: batch proof calculated", "batch", batchInfo.BatchNum)
	return nil
}
func (p *Pipeline) shouldL1L2Batch(batchInfo *BatchInfo) bool {
	// Take the lastL1BatchBlockNum as the biggest between the last
	// scheduled one, and the synchronized one.
	lastL1BatchBlockNum := p.lastScheduledL1BatchBlockNum
	if p.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum {
		lastL1BatchBlockNum = p.stats.Sync.LastL1BatchBlock
	}
	// Set Debug information
	batchInfo.Debug.LastScheduledL1BatchBlockNum = p.lastScheduledL1BatchBlockNum
	batchInfo.Debug.LastL1BatchBlock = p.stats.Sync.LastL1BatchBlock
	batchInfo.Debug.LastL1BatchBlockDelta = p.stats.Eth.LastBlock.Num + 1 - lastL1BatchBlockNum
	batchInfo.Debug.L1BatchBlockScheduleDeadline =
		int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1) * p.cfg.L1BatchTimeoutPerc)
	// Return true if we have passed the l1BatchTimeoutPerc portion of the
	// range before the l1batch timeout.
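	// For example, with ForgeL1L2BatchTimeout = 10 and L1BatchTimeoutPerc =
	// 0.5, an L1 batch is scheduled once int64(float64(10-1)*0.5) = 4 or
	// more blocks have passed since the last L1 batch.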
	return p.stats.Eth.LastBlock.Num+1-lastL1BatchBlockNum >=
		int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1)*p.cfg.L1BatchTimeoutPerc)
}
func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs {
	proof := batchInfo.Proof
	zki := batchInfo.ZKInputs
	return &eth.RollupForgeBatchArgs{
		NewLastIdx:            int64(zki.Metadata.NewLastIdxRaw),
		NewStRoot:             zki.Metadata.NewStateRootRaw.BigInt(),
		NewExitRoot:           zki.Metadata.NewExitRootRaw.BigInt(),
		L1UserTxs:             batchInfo.L1UserTxsExtra,
		L1CoordinatorTxs:      batchInfo.L1CoordTxs,
		L1CoordinatorTxsAuths: batchInfo.L1CoordinatorTxsAuths,
		L2TxsData:             batchInfo.L2Txs,
		FeeIdxCoordinator:     batchInfo.CoordIdxs,
		// Circuit selector
		VerifierIdx: batchInfo.VerifierIdx,
		L1Batch:     batchInfo.L1Batch,
		ProofA:      [2]*big.Int{proof.PiA[0], proof.PiA[1]},
		// The verifier implementation needs the proofB coordinates
		// swapped with respect to the prover's output
		ProofB: [2][2]*big.Int{
			{proof.PiB[0][1], proof.PiB[0][0]},
			{proof.PiB[1][1], proof.PiB[1][0]},
		},
		ProofC: [2]*big.Int{proof.PiC[0], proof.PiC[1]},
	}
}