You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

433 lines
13 KiB

3 years ago
  1. package coordinator
  2. import (
  3. "context"
  4. "fmt"
  5. "math/big"
  6. "sync"
  7. "time"
  8. "github.com/hermeznetwork/hermez-node/batchbuilder"
  9. "github.com/hermeznetwork/hermez-node/common"
  10. "github.com/hermeznetwork/hermez-node/db/historydb"
  11. "github.com/hermeznetwork/hermez-node/db/l2db"
  12. "github.com/hermeznetwork/hermez-node/eth"
  13. "github.com/hermeznetwork/hermez-node/log"
  14. "github.com/hermeznetwork/hermez-node/prover"
  15. "github.com/hermeznetwork/hermez-node/synchronizer"
  16. "github.com/hermeznetwork/hermez-node/txselector"
  17. "github.com/hermeznetwork/tracerr"
  18. )
  19. type statsVars struct {
  20. Stats synchronizer.Stats
  21. Vars synchronizer.SCVariablesPtr
  22. }
  23. // Pipeline manages the forging of batches with parallel server proofs
  24. type Pipeline struct {
  25. cfg Config
  26. consts synchronizer.SCConsts
  27. // state
  28. batchNum common.BatchNum
  29. lastScheduledL1BatchBlockNum int64
  30. lastForgeL1TxsNum int64
  31. started bool
  32. proversPool *ProversPool
  33. provers []prover.Client
  34. txManager *TxManager
  35. historyDB *historydb.HistoryDB
  36. l2DB *l2db.L2DB
  37. txSelector *txselector.TxSelector
  38. batchBuilder *batchbuilder.BatchBuilder
  39. purger *Purger
  40. stats synchronizer.Stats
  41. vars synchronizer.SCVariables
  42. statsVarsCh chan statsVars
  43. ctx context.Context
  44. wg sync.WaitGroup
  45. cancel context.CancelFunc
  46. }
  47. // NewPipeline creates a new Pipeline
  48. func NewPipeline(ctx context.Context,
  49. cfg Config,
  50. historyDB *historydb.HistoryDB,
  51. l2DB *l2db.L2DB,
  52. txSelector *txselector.TxSelector,
  53. batchBuilder *batchbuilder.BatchBuilder,
  54. purger *Purger,
  55. txManager *TxManager,
  56. provers []prover.Client,
  57. scConsts *synchronizer.SCConsts,
  58. ) (*Pipeline, error) {
  59. proversPool := NewProversPool(len(provers))
  60. proversPoolSize := 0
  61. for _, prover := range provers {
  62. if err := prover.WaitReady(ctx); err != nil {
  63. log.Errorw("prover.WaitReady", "err", err)
  64. } else {
  65. proversPool.Add(ctx, prover)
  66. proversPoolSize++
  67. }
  68. }
  69. if proversPoolSize == 0 {
  70. return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool"))
  71. }
  72. return &Pipeline{
  73. cfg: cfg,
  74. historyDB: historyDB,
  75. l2DB: l2DB,
  76. txSelector: txSelector,
  77. batchBuilder: batchBuilder,
  78. provers: provers,
  79. proversPool: proversPool,
  80. purger: purger,
  81. txManager: txManager,
  82. consts: *scConsts,
  83. statsVarsCh: make(chan statsVars, queueLen),
  84. }, nil
  85. }
  86. // SetSyncStatsVars is a thread safe method to sets the synchronizer Stats
  87. func (p *Pipeline) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) {
  88. select {
  89. case p.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars}:
  90. case <-ctx.Done():
  91. }
  92. }
  93. // reset pipeline state
  94. func (p *Pipeline) reset(batchNum common.BatchNum,
  95. stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
  96. p.batchNum = batchNum
  97. p.lastForgeL1TxsNum = stats.Sync.LastForgeL1TxsNum
  98. p.stats = *stats
  99. p.vars = *vars
  100. p.lastScheduledL1BatchBlockNum = 0
  101. err := p.txSelector.Reset(p.batchNum)
  102. if err != nil {
  103. return tracerr.Wrap(err)
  104. }
  105. err = p.batchBuilder.Reset(p.batchNum, true)
  106. if err != nil {
  107. return tracerr.Wrap(err)
  108. }
  109. return nil
  110. }
  111. func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) {
  112. updateSCVars(&p.vars, vars)
  113. }
  114. // handleForgeBatch calls p.forgeBatch to forge the batch and get the zkInputs,
  115. // and then waits for an available proof server and sends the zkInputs to it so
  116. // that the proof computation begins.
  117. func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
  118. batchInfo, err := p.forgeBatch(batchNum)
  119. if ctx.Err() != nil {
  120. return nil, ctx.Err()
  121. } else if err != nil {
  122. if tracerr.Unwrap(err) == errLastL1BatchNotSynced {
  123. log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err,
  124. "lastForgeL1TxsNum", p.lastForgeL1TxsNum,
  125. "syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum)
  126. } else {
  127. log.Errorw("forgeBatch", "err", err)
  128. }
  129. return nil, err
  130. }
  131. // 6. Wait for an available server proof (blocking call)
  132. serverProof, err := p.proversPool.Get(ctx)
  133. if ctx.Err() != nil {
  134. return nil, ctx.Err()
  135. } else if err != nil {
  136. log.Errorw("proversPool.Get", "err", err)
  137. return nil, err
  138. }
  139. batchInfo.ServerProof = serverProof
  140. if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil {
  141. return nil, ctx.Err()
  142. } else if err != nil {
  143. log.Errorw("sendServerProof", "err", err)
  144. batchInfo.ServerProof = nil
  145. p.proversPool.Add(ctx, serverProof)
  146. return nil, err
  147. }
  148. return batchInfo, nil
  149. }
  150. // Start the forging pipeline
  151. func (p *Pipeline) Start(batchNum common.BatchNum,
  152. stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
  153. if p.started {
  154. log.Fatal("Pipeline already started")
  155. }
  156. p.started = true
  157. if err := p.reset(batchNum, stats, vars); err != nil {
  158. return tracerr.Wrap(err)
  159. }
  160. p.ctx, p.cancel = context.WithCancel(context.Background())
  161. queueSize := 1
  162. batchChSentServerProof := make(chan *BatchInfo, queueSize)
  163. p.wg.Add(1)
  164. go func() {
  165. waitDuration := zeroDuration
  166. for {
  167. select {
  168. case <-p.ctx.Done():
  169. log.Info("Pipeline forgeBatch loop done")
  170. p.wg.Done()
  171. return
  172. case statsVars := <-p.statsVarsCh:
  173. p.stats = statsVars.Stats
  174. p.syncSCVars(statsVars.Vars)
  175. case <-time.After(waitDuration):
  176. batchNum = p.batchNum + 1
  177. batchInfo, err := p.handleForgeBatch(p.ctx, batchNum)
  178. if p.ctx.Err() != nil {
  179. continue
  180. } else if err != nil {
  181. waitDuration = p.cfg.SyncRetryInterval
  182. continue
  183. }
  184. p.batchNum = batchNum
  185. select {
  186. case batchChSentServerProof <- batchInfo:
  187. case <-p.ctx.Done():
  188. }
  189. }
  190. }
  191. }()
  192. p.wg.Add(1)
  193. go func() {
  194. for {
  195. select {
  196. case <-p.ctx.Done():
  197. log.Info("Pipeline waitServerProofSendEth loop done")
  198. p.wg.Done()
  199. return
  200. case batchInfo := <-batchChSentServerProof:
  201. err := p.waitServerProof(p.ctx, batchInfo)
  202. // We are done with this serverProof, add it back to the pool
  203. p.proversPool.Add(p.ctx, batchInfo.ServerProof)
  204. batchInfo.ServerProof = nil
  205. if p.ctx.Err() != nil {
  206. continue
  207. } else if err != nil {
  208. log.Errorw("waitServerProof", "err", err)
  209. continue
  210. }
  211. p.txManager.AddBatch(p.ctx, batchInfo)
  212. }
  213. }
  214. }()
  215. return nil
  216. }
  217. // Stop the forging pipeline
  218. func (p *Pipeline) Stop(ctx context.Context) {
  219. if !p.started {
  220. log.Fatal("Pipeline already stopped")
  221. }
  222. p.started = false
  223. log.Info("Stopping Pipeline...")
  224. p.cancel()
  225. p.wg.Wait()
  226. for _, prover := range p.provers {
  227. if err := prover.Cancel(ctx); ctx.Err() != nil {
  228. continue
  229. } else if err != nil {
  230. log.Errorw("prover.Cancel", "err", err)
  231. }
  232. }
  233. }
  234. // sendServerProof sends the circuit inputs to the proof server
  235. func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) error {
  236. p.cfg.debugBatchStore(batchInfo)
  237. // 7. Call the selected idle server proof with BatchBuilder output,
  238. // save server proof info for batchNum
  239. if err := batchInfo.ServerProof.CalculateProof(ctx, batchInfo.ZKInputs); err != nil {
  240. return tracerr.Wrap(err)
  241. }
  242. return nil
  243. }
  244. // forgeBatch forges the batchNum batch.
  245. func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) {
  246. // remove transactions from the pool that have been there for too long
  247. _, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
  248. p.stats.Sync.LastBlock.Num, int64(batchNum))
  249. if err != nil {
  250. return nil, tracerr.Wrap(err)
  251. }
  252. _, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum))
  253. if err != nil {
  254. return nil, tracerr.Wrap(err)
  255. }
  256. batchInfo = &BatchInfo{BatchNum: batchNum} // to accumulate metadata of the batch
  257. batchInfo.Debug.StartTimestamp = time.Now()
  258. batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1
  259. selectionCfg := &txselector.SelectionConfig{
  260. MaxL1UserTxs: common.RollupConstMaxL1UserTx,
  261. TxProcessorConfig: p.cfg.TxProcessorConfig,
  262. }
  263. var poolL2Txs []common.PoolL2Tx
  264. var discardedL2Txs []common.PoolL2Tx
  265. var l1UserTxsExtra, l1CoordTxs []common.L1Tx
  266. var auths [][]byte
  267. var coordIdxs []common.Idx
  268. // 1. Decide if we forge L2Tx or L1+L2Tx
  269. if p.shouldL1L2Batch(batchInfo) {
  270. batchInfo.L1Batch = true
  271. defer func() {
  272. // If there's no error, update the parameters related
  273. // to the last L1Batch forged
  274. if err == nil {
  275. p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
  276. p.lastForgeL1TxsNum++
  277. }
  278. }()
  279. if p.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum {
  280. return nil, tracerr.Wrap(errLastL1BatchNotSynced)
  281. }
  282. // 2a: L1+L2 txs
  283. l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum + 1)
  284. if err != nil {
  285. return nil, tracerr.Wrap(err)
  286. }
  287. coordIdxs, auths, l1UserTxsExtra, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
  288. p.txSelector.GetL1L2TxSelection(selectionCfg, l1UserTxs)
  289. if err != nil {
  290. return nil, tracerr.Wrap(err)
  291. }
  292. } else {
  293. // 2b: only L2 txs
  294. coordIdxs, auths, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
  295. p.txSelector.GetL2TxSelection(selectionCfg)
  296. if err != nil {
  297. return nil, tracerr.Wrap(err)
  298. }
  299. l1UserTxsExtra = nil
  300. }
  301. // 3. Save metadata from TxSelector output for BatchNum
  302. batchInfo.L1UserTxsExtra = l1UserTxsExtra
  303. batchInfo.L1CoordTxs = l1CoordTxs
  304. batchInfo.L1CoordinatorTxsAuths = auths
  305. batchInfo.CoordIdxs = coordIdxs
  306. batchInfo.VerifierIdx = p.cfg.VerifierIdx
  307. if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum); err != nil {
  308. return nil, tracerr.Wrap(err)
  309. }
  310. if err := p.l2DB.UpdateTxsInfo(discardedL2Txs); err != nil {
  311. return nil, tracerr.Wrap(err)
  312. }
  313. // Invalidate transactions that become invalid beause of
  314. // the poolL2Txs selected. Will mark as invalid the txs that have a
  315. // (fromIdx, nonce) which already appears in the selected txs (includes
  316. // all the nonces smaller than the current one)
  317. err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum)
  318. if err != nil {
  319. return nil, tracerr.Wrap(err)
  320. }
  321. // 4. Call BatchBuilder with TxSelector output
  322. configBatch := &batchbuilder.ConfigBatch{
  323. TxProcessorConfig: p.cfg.TxProcessorConfig,
  324. }
  325. zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxsExtra,
  326. l1CoordTxs, poolL2Txs)
  327. if err != nil {
  328. return nil, tracerr.Wrap(err)
  329. }
  330. l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a big uggly, find a better way
  331. if err != nil {
  332. return nil, tracerr.Wrap(err)
  333. }
  334. batchInfo.L2Txs = l2Txs
  335. // 5. Save metadata from BatchBuilder output for BatchNum
  336. batchInfo.ZKInputs = zkInputs
  337. batchInfo.Debug.Status = StatusForged
  338. p.cfg.debugBatchStore(batchInfo)
  339. log.Infow("Pipeline: batch forged internally", "batch", batchInfo.BatchNum)
  340. return batchInfo, nil
  341. }
  342. // waitServerProof gets the generated zkProof & sends it to the SmartContract
  343. func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) error {
  344. proof, pubInputs, err := batchInfo.ServerProof.GetProof(ctx) // blocking call, until not resolved don't continue. Returns when the proof server has calculated the proof
  345. if err != nil {
  346. return tracerr.Wrap(err)
  347. }
  348. batchInfo.Proof = proof
  349. batchInfo.PublicInputs = pubInputs
  350. batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo)
  351. batchInfo.Debug.Status = StatusProof
  352. p.cfg.debugBatchStore(batchInfo)
  353. log.Infow("Pipeline: batch proof calculated", "batch", batchInfo.BatchNum)
  354. return nil
  355. }
  356. func (p *Pipeline) shouldL1L2Batch(batchInfo *BatchInfo) bool {
  357. // Take the lastL1BatchBlockNum as the biggest between the last
  358. // scheduled one, and the synchronized one.
  359. lastL1BatchBlockNum := p.lastScheduledL1BatchBlockNum
  360. if p.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum {
  361. lastL1BatchBlockNum = p.stats.Sync.LastL1BatchBlock
  362. }
  363. // Set Debug information
  364. batchInfo.Debug.LastScheduledL1BatchBlockNum = p.lastScheduledL1BatchBlockNum
  365. batchInfo.Debug.LastL1BatchBlock = p.stats.Sync.LastL1BatchBlock
  366. batchInfo.Debug.LastL1BatchBlockDelta = p.stats.Eth.LastBlock.Num + 1 - lastL1BatchBlockNum
  367. batchInfo.Debug.L1BatchBlockScheduleDeadline =
  368. int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1) * p.cfg.L1BatchTimeoutPerc)
  369. // Return true if we have passed the l1BatchTimeoutPerc portion of the
  370. // range before the l1batch timeout.
  371. return p.stats.Eth.LastBlock.Num+1-lastL1BatchBlockNum >=
  372. int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1)*p.cfg.L1BatchTimeoutPerc)
  373. }
  374. func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs {
  375. proof := batchInfo.Proof
  376. zki := batchInfo.ZKInputs
  377. return &eth.RollupForgeBatchArgs{
  378. NewLastIdx: int64(zki.Metadata.NewLastIdxRaw),
  379. NewStRoot: zki.Metadata.NewStateRootRaw.BigInt(),
  380. NewExitRoot: zki.Metadata.NewExitRootRaw.BigInt(),
  381. L1UserTxs: batchInfo.L1UserTxsExtra,
  382. L1CoordinatorTxs: batchInfo.L1CoordTxs,
  383. L1CoordinatorTxsAuths: batchInfo.L1CoordinatorTxsAuths,
  384. L2TxsData: batchInfo.L2Txs,
  385. FeeIdxCoordinator: batchInfo.CoordIdxs,
  386. // Circuit selector
  387. VerifierIdx: batchInfo.VerifierIdx,
  388. L1Batch: batchInfo.L1Batch,
  389. ProofA: [2]*big.Int{proof.PiA[0], proof.PiA[1]},
  390. // Implementation of the verifier need a swap on the proofB vector
  391. ProofB: [2][2]*big.Int{
  392. {proof.PiB[0][1], proof.PiB[0][0]},
  393. {proof.PiB[1][1], proof.PiB[1][0]},
  394. },
  395. ProofC: [2]*big.Int{proof.PiC[0], proof.PiC[1]},
  396. }
  397. }