You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

435 lines
13 KiB

  1. package coordinator
  2. import (
  3. "context"
  4. "fmt"
  5. "math/big"
  6. "sync"
  7. "time"
  8. "github.com/hermeznetwork/hermez-node/batchbuilder"
  9. "github.com/hermeznetwork/hermez-node/common"
  10. "github.com/hermeznetwork/hermez-node/db/historydb"
  11. "github.com/hermeznetwork/hermez-node/db/l2db"
  12. "github.com/hermeznetwork/hermez-node/eth"
  13. "github.com/hermeznetwork/hermez-node/log"
  14. "github.com/hermeznetwork/hermez-node/prover"
  15. "github.com/hermeznetwork/hermez-node/synchronizer"
  16. "github.com/hermeznetwork/hermez-node/txselector"
  17. "github.com/hermeznetwork/tracerr"
  18. )
  19. type statsVars struct {
  20. Stats synchronizer.Stats
  21. Vars synchronizer.SCVariablesPtr
  22. }
  23. // Pipeline manages the forging of batches with parallel server proofs
  24. type Pipeline struct {
  25. cfg Config
  26. consts synchronizer.SCConsts
  27. // state
  28. batchNum common.BatchNum
  29. lastScheduledL1BatchBlockNum int64
  30. lastForgeL1TxsNum int64
  31. started bool
  32. proversPool *ProversPool
  33. provers []prover.Client
  34. txManager *TxManager
  35. historyDB *historydb.HistoryDB
  36. l2DB *l2db.L2DB
  37. txSelector *txselector.TxSelector
  38. batchBuilder *batchbuilder.BatchBuilder
  39. purger *Purger
  40. stats synchronizer.Stats
  41. vars synchronizer.SCVariables
  42. statsVarsCh chan statsVars
  43. ctx context.Context
  44. wg sync.WaitGroup
  45. cancel context.CancelFunc
  46. }
  47. // NewPipeline creates a new Pipeline
  48. func NewPipeline(ctx context.Context,
  49. cfg Config,
  50. historyDB *historydb.HistoryDB,
  51. l2DB *l2db.L2DB,
  52. txSelector *txselector.TxSelector,
  53. batchBuilder *batchbuilder.BatchBuilder,
  54. purger *Purger,
  55. txManager *TxManager,
  56. provers []prover.Client,
  57. scConsts *synchronizer.SCConsts,
  58. ) (*Pipeline, error) {
  59. proversPool := NewProversPool(len(provers))
  60. proversPoolSize := 0
  61. for _, prover := range provers {
  62. if err := prover.WaitReady(ctx); err != nil {
  63. log.Errorw("prover.WaitReady", "err", err)
  64. } else {
  65. proversPool.Add(ctx, prover)
  66. proversPoolSize++
  67. }
  68. }
  69. if proversPoolSize == 0 {
  70. return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool"))
  71. }
  72. return &Pipeline{
  73. cfg: cfg,
  74. historyDB: historyDB,
  75. l2DB: l2DB,
  76. txSelector: txSelector,
  77. batchBuilder: batchBuilder,
  78. provers: provers,
  79. proversPool: proversPool,
  80. purger: purger,
  81. txManager: txManager,
  82. consts: *scConsts,
  83. statsVarsCh: make(chan statsVars, queueLen),
  84. }, nil
  85. }
  86. // SetSyncStatsVars is a thread safe method to sets the synchronizer Stats
  87. func (p *Pipeline) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) {
  88. select {
  89. case p.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars}:
  90. case <-ctx.Done():
  91. }
  92. }
  93. // reset pipeline state
  94. func (p *Pipeline) reset(batchNum common.BatchNum,
  95. stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
  96. p.batchNum = batchNum
  97. p.lastForgeL1TxsNum = stats.Sync.LastForgeL1TxsNum
  98. p.stats = *stats
  99. p.vars = *vars
  100. p.lastScheduledL1BatchBlockNum = 0
  101. err := p.txSelector.Reset(p.batchNum)
  102. if err != nil {
  103. return tracerr.Wrap(err)
  104. }
  105. err = p.batchBuilder.Reset(p.batchNum, true)
  106. if err != nil {
  107. return tracerr.Wrap(err)
  108. }
  109. return nil
  110. }
  111. func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) {
  112. if vars.Rollup != nil {
  113. p.vars.Rollup = *vars.Rollup
  114. }
  115. if vars.Auction != nil {
  116. p.vars.Auction = *vars.Auction
  117. }
  118. if vars.WDelayer != nil {
  119. p.vars.WDelayer = *vars.WDelayer
  120. }
  121. }
  122. // handleForgeBatch calls p.forgeBatch to forge the batch and get the zkInputs,
  123. // and then waits for an available proof server and sends the zkInputs to it so
  124. // that the proof computation begins.
  125. func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
  126. batchInfo, err := p.forgeBatch(batchNum)
  127. if ctx.Err() != nil {
  128. return nil, ctx.Err()
  129. } else if err != nil {
  130. if tracerr.Unwrap(err) == errLastL1BatchNotSynced {
  131. log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err,
  132. "lastForgeL1TxsNum", p.lastForgeL1TxsNum,
  133. "syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum)
  134. } else {
  135. log.Errorw("forgeBatch", "err", err)
  136. }
  137. return nil, err
  138. }
  139. // 6. Wait for an available server proof (blocking call)
  140. serverProof, err := p.proversPool.Get(ctx)
  141. if ctx.Err() != nil {
  142. return nil, ctx.Err()
  143. } else if err != nil {
  144. log.Errorw("proversPool.Get", "err", err)
  145. return nil, err
  146. }
  147. batchInfo.ServerProof = serverProof
  148. if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil {
  149. return nil, ctx.Err()
  150. } else if err != nil {
  151. log.Errorw("sendServerProof", "err", err)
  152. batchInfo.ServerProof = nil
  153. p.proversPool.Add(ctx, serverProof)
  154. return nil, err
  155. }
  156. return batchInfo, nil
  157. }
  158. // Start the forging pipeline
  159. func (p *Pipeline) Start(batchNum common.BatchNum,
  160. stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
  161. if p.started {
  162. log.Fatal("Pipeline already started")
  163. }
  164. p.started = true
  165. if err := p.reset(batchNum, stats, vars); err != nil {
  166. return tracerr.Wrap(err)
  167. }
  168. p.ctx, p.cancel = context.WithCancel(context.Background())
  169. queueSize := 1
  170. batchChSentServerProof := make(chan *BatchInfo, queueSize)
  171. p.wg.Add(1)
  172. go func() {
  173. waitDuration := zeroDuration
  174. for {
  175. select {
  176. case <-p.ctx.Done():
  177. log.Info("Pipeline forgeBatch loop done")
  178. p.wg.Done()
  179. return
  180. case statsVars := <-p.statsVarsCh:
  181. p.stats = statsVars.Stats
  182. p.syncSCVars(statsVars.Vars)
  183. case <-time.After(waitDuration):
  184. batchNum = p.batchNum + 1
  185. if batchInfo, err := p.handleForgeBatch(p.ctx, batchNum); err != nil {
  186. waitDuration = p.cfg.SyncRetryInterval
  187. continue
  188. } else {
  189. p.batchNum = batchNum
  190. select {
  191. case batchChSentServerProof <- batchInfo:
  192. case <-p.ctx.Done():
  193. }
  194. }
  195. }
  196. }
  197. }()
  198. p.wg.Add(1)
  199. go func() {
  200. for {
  201. select {
  202. case <-p.ctx.Done():
  203. log.Info("Pipeline waitServerProofSendEth loop done")
  204. p.wg.Done()
  205. return
  206. case batchInfo := <-batchChSentServerProof:
  207. err := p.waitServerProof(p.ctx, batchInfo)
  208. // We are done with this serverProof, add it back to the pool
  209. p.proversPool.Add(p.ctx, batchInfo.ServerProof)
  210. batchInfo.ServerProof = nil
  211. if p.ctx.Err() != nil {
  212. continue
  213. } else if err != nil {
  214. log.Errorw("waitServerProof", "err", err)
  215. continue
  216. }
  217. p.txManager.AddBatch(p.ctx, batchInfo)
  218. }
  219. }
  220. }()
  221. return nil
  222. }
  223. // Stop the forging pipeline
  224. func (p *Pipeline) Stop(ctx context.Context) {
  225. if !p.started {
  226. log.Fatal("Pipeline already stopped")
  227. }
  228. p.started = false
  229. log.Info("Stopping Pipeline...")
  230. p.cancel()
  231. p.wg.Wait()
  232. for _, prover := range p.provers {
  233. if err := prover.Cancel(ctx); ctx.Err() != nil {
  234. continue
  235. } else if err != nil {
  236. log.Errorw("prover.Cancel", "err", err)
  237. }
  238. }
  239. }
  240. // sendServerProof sends the circuit inputs to the proof server
  241. func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) error {
  242. p.cfg.debugBatchStore(batchInfo)
  243. // 7. Call the selected idle server proof with BatchBuilder output,
  244. // save server proof info for batchNum
  245. if err := batchInfo.ServerProof.CalculateProof(ctx, batchInfo.ZKInputs); err != nil {
  246. return tracerr.Wrap(err)
  247. }
  248. return nil
  249. }
  250. // forgeBatch forges the batchNum batch.
  251. func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) {
  252. // remove transactions from the pool that have been there for too long
  253. _, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
  254. p.stats.Sync.LastBlock.Num, int64(batchNum))
  255. if err != nil {
  256. return nil, tracerr.Wrap(err)
  257. }
  258. _, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum))
  259. if err != nil {
  260. return nil, tracerr.Wrap(err)
  261. }
  262. batchInfo = &BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch
  263. batchInfo.Debug.StartTimestamp = time.Now()
  264. batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1
  265. selectionCfg := &txselector.SelectionConfig{
  266. MaxL1UserTxs: common.RollupConstMaxL1UserTx,
  267. TxProcessorConfig: p.cfg.TxProcessorConfig,
  268. }
  269. var poolL2Txs []common.PoolL2Tx
  270. var l1UserTxsExtra, l1CoordTxs []common.L1Tx
  271. var auths [][]byte
  272. var coordIdxs []common.Idx
  273. // 1. Decide if we forge L2Tx or L1+L2Tx
  274. if p.shouldL1L2Batch(batchInfo) {
  275. batchInfo.L1Batch = true
  276. defer func() {
  277. // If there's no error, update the parameters related
  278. // to the last L1Batch forged
  279. if err == nil {
  280. p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
  281. p.lastForgeL1TxsNum++
  282. }
  283. }()
  284. if p.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum {
  285. return nil, tracerr.Wrap(errLastL1BatchNotSynced)
  286. }
  287. // 2a: L1+L2 txs
  288. l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum + 1)
  289. if err != nil {
  290. return nil, tracerr.Wrap(err)
  291. }
  292. coordIdxs, auths, l1UserTxsExtra, l1CoordTxs, poolL2Txs, err =
  293. p.txSelector.GetL1L2TxSelection(selectionCfg, l1UserTxs)
  294. if err != nil {
  295. return nil, tracerr.Wrap(err)
  296. }
  297. } else {
  298. // 2b: only L2 txs
  299. coordIdxs, auths, l1CoordTxs, poolL2Txs, err =
  300. p.txSelector.GetL2TxSelection(selectionCfg)
  301. if err != nil {
  302. return nil, tracerr.Wrap(err)
  303. }
  304. l1UserTxsExtra = nil
  305. }
  306. // 3. Save metadata from TxSelector output for BatchNum
  307. batchInfo.L1UserTxsExtra = l1UserTxsExtra
  308. batchInfo.L1CoordTxs = l1CoordTxs
  309. batchInfo.L1CoordinatorTxsAuths = auths
  310. batchInfo.CoordIdxs = coordIdxs
  311. batchInfo.VerifierIdx = p.cfg.VerifierIdx
  312. if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum); err != nil {
  313. return nil, tracerr.Wrap(err)
  314. }
  315. // Invalidate transactions that become invalid beause of
  316. // the poolL2Txs selected. Will mark as invalid the txs that have a
  317. // (fromIdx, nonce) which already appears in the selected txs (includes
  318. // all the nonces smaller than the current one)
  319. err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum)
  320. if err != nil {
  321. return nil, tracerr.Wrap(err)
  322. }
  323. // 4. Call BatchBuilder with TxSelector output
  324. configBatch := &batchbuilder.ConfigBatch{
  325. TxProcessorConfig: p.cfg.TxProcessorConfig,
  326. }
  327. zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxsExtra,
  328. l1CoordTxs, poolL2Txs)
  329. if err != nil {
  330. return nil, tracerr.Wrap(err)
  331. }
  332. l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a big uggly, find a better way
  333. if err != nil {
  334. return nil, tracerr.Wrap(err)
  335. }
  336. batchInfo.L2Txs = l2Txs
  337. // 5. Save metadata from BatchBuilder output for BatchNum
  338. batchInfo.ZKInputs = zkInputs
  339. batchInfo.Debug.Status = StatusForged
  340. p.cfg.debugBatchStore(batchInfo)
  341. log.Infow("Pipeline: batch forged internally", "batch", batchInfo.BatchNum)
  342. return batchInfo, nil
  343. }
  344. // waitServerProof gets the generated zkProof & sends it to the SmartContract
  345. func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) error {
  346. proof, pubInputs, err := batchInfo.ServerProof.GetProof(ctx) // blocking call, until not resolved don't continue. Returns when the proof server has calculated the proof
  347. if err != nil {
  348. return tracerr.Wrap(err)
  349. }
  350. batchInfo.Proof = proof
  351. batchInfo.PublicInputs = pubInputs
  352. batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo)
  353. batchInfo.Debug.Status = StatusProof
  354. p.cfg.debugBatchStore(batchInfo)
  355. log.Infow("Pipeline: batch proof calculated", "batch", batchInfo.BatchNum)
  356. return nil
  357. }
  358. func (p *Pipeline) shouldL1L2Batch(batchInfo *BatchInfo) bool {
  359. // Take the lastL1BatchBlockNum as the biggest between the last
  360. // scheduled one, and the synchronized one.
  361. lastL1BatchBlockNum := p.lastScheduledL1BatchBlockNum
  362. if p.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum {
  363. lastL1BatchBlockNum = p.stats.Sync.LastL1BatchBlock
  364. }
  365. // Set Debug information
  366. batchInfo.Debug.LastScheduledL1BatchBlockNum = p.lastScheduledL1BatchBlockNum
  367. batchInfo.Debug.LastL1BatchBlock = p.stats.Sync.LastL1BatchBlock
  368. batchInfo.Debug.LastL1BatchBlockDelta = p.stats.Eth.LastBlock.Num + 1 - lastL1BatchBlockNum
  369. batchInfo.Debug.L1BatchBlockScheduleDeadline =
  370. int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1) * p.cfg.L1BatchTimeoutPerc)
  371. // Return true if we have passed the l1BatchTimeoutPerc portion of the
  372. // range before the l1batch timeout.
  373. return p.stats.Eth.LastBlock.Num+1-lastL1BatchBlockNum >=
  374. int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1)*p.cfg.L1BatchTimeoutPerc)
  375. }
  376. func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs {
  377. proof := batchInfo.Proof
  378. zki := batchInfo.ZKInputs
  379. return &eth.RollupForgeBatchArgs{
  380. NewLastIdx: int64(zki.Metadata.NewLastIdxRaw),
  381. NewStRoot: zki.Metadata.NewStateRootRaw.BigInt(),
  382. NewExitRoot: zki.Metadata.NewExitRootRaw.BigInt(),
  383. L1UserTxs: batchInfo.L1UserTxsExtra,
  384. L1CoordinatorTxs: batchInfo.L1CoordTxs,
  385. L1CoordinatorTxsAuths: batchInfo.L1CoordinatorTxsAuths,
  386. L2TxsData: batchInfo.L2Txs,
  387. FeeIdxCoordinator: batchInfo.CoordIdxs,
  388. // Circuit selector
  389. VerifierIdx: batchInfo.VerifierIdx,
  390. L1Batch: batchInfo.L1Batch,
  391. ProofA: [2]*big.Int{proof.PiA[0], proof.PiA[1]},
  392. // Implementation of the verifier need a swap on the proofB vector
  393. ProofB: [2][2]*big.Int{
  394. {proof.PiB[0][1], proof.PiB[0][0]},
  395. {proof.PiB[1][1], proof.PiB[1][0]},
  396. },
  397. ProofC: [2]*big.Int{proof.PiC[0], proof.PiC[1]},
  398. }
  399. }