package coordinator

import (
	"context"
	"fmt"
	"math/big"
	"sync"
	"time"

	"github.com/hermeznetwork/hermez-node/batchbuilder"
	"github.com/hermeznetwork/hermez-node/common"
	"github.com/hermeznetwork/hermez-node/db/historydb"
	"github.com/hermeznetwork/hermez-node/db/l2db"
	"github.com/hermeznetwork/hermez-node/eth"
	"github.com/hermeznetwork/hermez-node/log"
	"github.com/hermeznetwork/hermez-node/prover"
	"github.com/hermeznetwork/hermez-node/synchronizer"
	"github.com/hermeznetwork/hermez-node/txselector"
	"github.com/hermeznetwork/tracerr"
)

type statsVars struct {
	Stats synchronizer.Stats
	Vars  synchronizer.SCVariablesPtr
}

// Pipeline manages the forging of batches with parallel server proofs
type Pipeline struct {
	num    int
	cfg    Config
	consts synchronizer.SCConsts

	// state
	batchNum                     common.BatchNum
	lastScheduledL1BatchBlockNum int64
	lastForgeL1TxsNum            int64
	started                      bool

	proversPool  *ProversPool
	provers      []prover.Client
	txManager    *TxManager
	historyDB    *historydb.HistoryDB
	l2DB         *l2db.L2DB
	txSelector   *txselector.TxSelector
	batchBuilder *batchbuilder.BatchBuilder
	purger       *Purger

	stats       synchronizer.Stats
	vars        synchronizer.SCVariables
	statsVarsCh chan statsVars

	ctx    context.Context
	wg     sync.WaitGroup
	cancel context.CancelFunc
}

// NewPipeline creates a new Pipeline
func NewPipeline(ctx context.Context,
	cfg Config,
	num int, // Pipeline sequential number
	historyDB *historydb.HistoryDB,
	l2DB *l2db.L2DB,
	txSelector *txselector.TxSelector,
	batchBuilder *batchbuilder.BatchBuilder,
	purger *Purger,
	txManager *TxManager,
	provers []prover.Client,
	scConsts *synchronizer.SCConsts,
) (*Pipeline, error) {
	proversPool := NewProversPool(len(provers))
	proversPoolSize := 0
	for _, prover := range provers {
		if err := prover.WaitReady(ctx); err != nil {
			log.Errorw("prover.WaitReady", "err", err)
		} else {
			proversPool.Add(ctx, prover)
			proversPoolSize++
		}
	}
	if proversPoolSize == 0 {
		return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool"))
	}
	return &Pipeline{
		num:          num,
		cfg:          cfg,
		historyDB:    historyDB,
		l2DB:         l2DB,
		txSelector:   txSelector,
		batchBuilder: batchBuilder,
		provers:      provers,
		proversPool:  proversPool,
		purger:       purger,
		txManager:    txManager,
		consts:       *scConsts,
		statsVarsCh:  make(chan statsVars, queueLen),
	}, nil
}
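
// A construction sketch, assuming the caller has already wired up cfg,
// historyDB, l2DB, txSelector, batchBuilder, purger, txManager, provers and
// scConsts from the surrounding coordinator setup. NewPipeline only fails if
// none of the provers becomes ready:
//
//	pipeline, err := NewPipeline(ctx, cfg, 0, historyDB, l2DB, txSelector,
//		batchBuilder, purger, txManager, provers, &scConsts)
//	if err != nil {
//		return tracerr.Wrap(err) // e.g. "no provers in the pool"
//	}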
// SetSyncStatsVars is a thread safe method to set the synchronizer Stats
// and SCVariables
func (p *Pipeline) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats, vars *synchronizer.SCVariablesPtr) {
	select {
	case p.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars}:
	case <-ctx.Done():
	}
}
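
// The select above makes the update non-blocking with respect to shutdown:
// if statsVarsCh is full and the caller's ctx gets cancelled, the send is
// abandoned instead of blocking forever. A caller sketch, assuming stats and
// vars come from a synchronizer update:
//
//	pipeline.SetSyncStatsVars(ctx, &syncStats, &syncVars)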
// reset pipeline state
func (p *Pipeline) reset(batchNum common.BatchNum,
	stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
	p.batchNum = batchNum
	p.lastForgeL1TxsNum = stats.Sync.LastForgeL1TxsNum
	p.stats = *stats
	p.vars = *vars
	p.lastScheduledL1BatchBlockNum = 0

	err := p.txSelector.Reset(p.batchNum)
	if err != nil {
		return tracerr.Wrap(err)
	}
	err = p.batchBuilder.Reset(p.batchNum, true)
	if err != nil {
		return tracerr.Wrap(err)
	}
	return nil
}

func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) {
	updateSCVars(&p.vars, vars)
}

// handleForgeBatch calls p.forgeBatch to forge the batch and get the zkInputs,
// and then waits for an available proof server and sends the zkInputs to it so
// that the proof computation begins.
func (p *Pipeline) handleForgeBatch(ctx context.Context, batchNum common.BatchNum) (*BatchInfo, error) {
	batchInfo, err := p.forgeBatch(batchNum)
	if ctx.Err() != nil {
		return nil, ctx.Err()
	} else if err != nil {
		if tracerr.Unwrap(err) == errLastL1BatchNotSynced {
			log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err,
				"lastForgeL1TxsNum", p.lastForgeL1TxsNum,
				"syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum)
		} else {
			log.Errorw("forgeBatch", "err", err)
		}
		return nil, err
	}
	// 6. Wait for an available server proof (blocking call)
	serverProof, err := p.proversPool.Get(ctx)
	if ctx.Err() != nil {
		return nil, ctx.Err()
	} else if err != nil {
		log.Errorw("proversPool.Get", "err", err)
		return nil, err
	}
	batchInfo.ServerProof = serverProof
	if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil {
		return nil, ctx.Err()
	} else if err != nil {
		log.Errorw("sendServerProof", "err", err)
		batchInfo.ServerProof = nil
		p.proversPool.Add(ctx, serverProof)
		return nil, err
	}
	return batchInfo, nil
}
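
// Note the error-handling order used throughout this file: ctx.Err() is
// checked before err, because during a shutdown the cancelled context tends
// to surface as wrapped errors from whichever call was in flight, and those
// should not be reported as real failures. Also note that once a server
// proof has been reserved, the failure path returns it to the pool via
// proversPool.Add so provers are not leaked.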
// Start the forging pipeline
func (p *Pipeline) Start(batchNum common.BatchNum,
	stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
	if p.started {
		log.Fatal("Pipeline already started")
	}
	p.started = true

	if err := p.reset(batchNum, stats, vars); err != nil {
		return tracerr.Wrap(err)
	}
	p.ctx, p.cancel = context.WithCancel(context.Background())

	queueSize := 1
	batchChSentServerProof := make(chan *BatchInfo, queueSize)

	p.wg.Add(1)
	go func() {
		waitDuration := zeroDuration
		for {
			select {
			case <-p.ctx.Done():
				log.Info("Pipeline forgeBatch loop done")
				p.wg.Done()
				return
			case statsVars := <-p.statsVarsCh:
				p.stats = statsVars.Stats
				p.syncSCVars(statsVars.Vars)
			case <-time.After(waitDuration):
				batchNum = p.batchNum + 1
				batchInfo, err := p.handleForgeBatch(p.ctx, batchNum)
				if p.ctx.Err() != nil {
					continue
				} else if err != nil {
					waitDuration = p.cfg.SyncRetryInterval
					continue
				}
				p.batchNum = batchNum
				select {
				case batchChSentServerProof <- batchInfo:
				case <-p.ctx.Done():
				}
			}
		}
	}()

	p.wg.Add(1)
	go func() {
		for {
			select {
			case <-p.ctx.Done():
				log.Info("Pipeline waitServerProofSendEth loop done")
				p.wg.Done()
				return
			case batchInfo := <-batchChSentServerProof:
				err := p.waitServerProof(p.ctx, batchInfo)
				// We are done with this serverProof, add it back to the pool
				p.proversPool.Add(p.ctx, batchInfo.ServerProof)
				batchInfo.ServerProof = nil
				if p.ctx.Err() != nil {
					continue
				} else if err != nil {
					log.Errorw("waitServerProof", "err", err)
					continue
				}
				p.txManager.AddBatch(p.ctx, batchInfo)
			}
		}
	}()
	return nil
}
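
// Lifecycle sketch, assuming batchNum, stats and vars come from the
// synchronizer:
//
//	if err := pipeline.Start(batchNum, stats, vars); err != nil {
//		return tracerr.Wrap(err)
//	}
//	// ... feed updates via pipeline.SetSyncStatsVars while it runs ...
//	pipeline.Stop(ctx)
//
// Start launches two goroutines: the forgeBatch loop, which builds each
// batch and reserves a prover for it, and the waitServerProofSendEth loop,
// which waits for each proof and hands the finished BatchInfo to the
// TxManager. They communicate through batchChSentServerProof, and Stop tears
// both down by cancelling p.ctx and waiting on p.wg.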
// Stop the forging pipeline
func (p *Pipeline) Stop(ctx context.Context) {
	if !p.started {
		log.Fatal("Pipeline already stopped")
	}
	p.started = false
	log.Info("Stopping Pipeline...")
	p.cancel()
	p.wg.Wait()
	for _, prover := range p.provers {
		if err := prover.Cancel(ctx); ctx.Err() != nil {
			continue
		} else if err != nil {
			log.Errorw("prover.Cancel", "err", err)
		}
	}
}

// sendServerProof sends the circuit inputs to the proof server
func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) error {
	p.cfg.debugBatchStore(batchInfo)

	// 7. Call the selected idle server proof with BatchBuilder output,
	// save server proof info for batchNum
	if err := batchInfo.ServerProof.CalculateProof(ctx, batchInfo.ZKInputs); err != nil {
		return tracerr.Wrap(err)
	}
	return nil
}
// forgeBatch forges the batchNum batch.
func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) {
	// remove transactions from the pool that have been there for too long
	_, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
		p.stats.Sync.LastBlock.Num, int64(batchNum))
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
	_, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum))
	if err != nil {
		return nil, tracerr.Wrap(err)
	}

	// Structure to accumulate data and metadata of the batch
	batchInfo = &BatchInfo{PipelineNum: p.num, BatchNum: batchNum}
	batchInfo.Debug.StartTimestamp = time.Now()
	batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1

	selectionCfg := &txselector.SelectionConfig{
		MaxL1UserTxs:      common.RollupConstMaxL1UserTx,
		TxProcessorConfig: p.cfg.TxProcessorConfig,
	}

	var poolL2Txs []common.PoolL2Tx
	var discardedL2Txs []common.PoolL2Tx
	var l1UserTxsExtra, l1CoordTxs []common.L1Tx
	var auths [][]byte
	var coordIdxs []common.Idx

	// 1. Decide if we forge L2Tx or L1+L2Tx
	if p.shouldL1L2Batch(batchInfo) {
		batchInfo.L1Batch = true
		defer func() {
			// If there's no error, update the parameters related
			// to the last L1Batch forged
			if err == nil {
				p.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
				p.lastForgeL1TxsNum++
			}
		}()
		if p.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum {
			return nil, tracerr.Wrap(errLastL1BatchNotSynced)
		}
		// 2a: L1+L2 txs
		l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.lastForgeL1TxsNum + 1)
		if err != nil {
			return nil, tracerr.Wrap(err)
		}
		coordIdxs, auths, l1UserTxsExtra, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
			p.txSelector.GetL1L2TxSelection(selectionCfg, l1UserTxs)
		if err != nil {
			return nil, tracerr.Wrap(err)
		}
	} else {
		// 2b: only L2 txs
		coordIdxs, auths, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
			p.txSelector.GetL2TxSelection(selectionCfg)
		if err != nil {
			return nil, tracerr.Wrap(err)
		}
		l1UserTxsExtra = nil
	}

	// 3. Save metadata from TxSelector output for BatchNum
	batchInfo.L1UserTxsExtra = l1UserTxsExtra
	batchInfo.L1CoordTxs = l1CoordTxs
	batchInfo.L1CoordinatorTxsAuths = auths
	batchInfo.CoordIdxs = coordIdxs
	batchInfo.VerifierIdx = p.cfg.VerifierIdx

	if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum); err != nil {
		return nil, tracerr.Wrap(err)
	}
	if err := p.l2DB.UpdateTxsInfo(discardedL2Txs); err != nil {
		return nil, tracerr.Wrap(err)
	}
	// Invalidate transactions that become invalid because of
	// the poolL2Txs selected. Will mark as invalid the txs that have a
	// (fromIdx, nonce) which already appears in the selected txs (includes
	// all the nonces smaller than the current one). For example, if a
	// selected tx has (fromIdx=256, nonce=5), any pool tx from idx 256 with
	// nonce <= 5 is invalidated.
	err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum)
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
	// 4. Call BatchBuilder with TxSelector output
	configBatch := &batchbuilder.ConfigBatch{
		TxProcessorConfig: p.cfg.TxProcessorConfig,
	}
	zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxsExtra,
		l1CoordTxs, poolL2Txs)
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
	l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a bit ugly, find a better way
	if err != nil {
		return nil, tracerr.Wrap(err)
	}
	batchInfo.L2Txs = l2Txs

	// 5. Save metadata from BatchBuilder output for BatchNum
	batchInfo.ZKInputs = zkInputs
	batchInfo.Debug.Status = StatusForged
	p.cfg.debugBatchStore(batchInfo)
	log.Infow("Pipeline: batch forged internally", "batch", batchInfo.BatchNum)
	return batchInfo, nil
}
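
// The numbered step comments continue across functions: steps 1-5 happen in
// forgeBatch above, step 6 (waiting for a free server proof) in
// handleForgeBatch, and step 7 (sending the zkInputs to the prover) in
// sendServerProof.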
// waitServerProof waits for the generated zkProof and prepares the arguments
// needed to forge the batch in the rollup smart contract
func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) error {
	// Blocking call: returns once the proof server has calculated the proof
	proof, pubInputs, err := batchInfo.ServerProof.GetProof(ctx)
	if err != nil {
		return tracerr.Wrap(err)
	}
	batchInfo.Proof = proof
	batchInfo.PublicInputs = pubInputs
	batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo)
	batchInfo.Debug.Status = StatusProof
	p.cfg.debugBatchStore(batchInfo)
	log.Infow("Pipeline: batch proof calculated", "batch", batchInfo.BatchNum)
	return nil
}
func (p *Pipeline) shouldL1L2Batch(batchInfo *BatchInfo) bool {
	// Take lastL1BatchBlockNum as the bigger of the last scheduled one
	// and the synchronized one.
	lastL1BatchBlockNum := p.lastScheduledL1BatchBlockNum
	if p.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum {
		lastL1BatchBlockNum = p.stats.Sync.LastL1BatchBlock
	}
	// Set Debug information
	batchInfo.Debug.LastScheduledL1BatchBlockNum = p.lastScheduledL1BatchBlockNum
	batchInfo.Debug.LastL1BatchBlock = p.stats.Sync.LastL1BatchBlock
	batchInfo.Debug.LastL1BatchBlockDelta = p.stats.Eth.LastBlock.Num + 1 - lastL1BatchBlockNum
	batchInfo.Debug.L1BatchBlockScheduleDeadline =
		int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1) * p.cfg.L1BatchTimeoutPerc)
	// Return true if we have passed the l1BatchTimeoutPerc portion of the
	// range before the l1batch timeout.
	return p.stats.Eth.LastBlock.Num+1-lastL1BatchBlockNum >=
		int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1)*p.cfg.L1BatchTimeoutPerc)
}
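
// A worked example with illustrative numbers: if ForgeL1L2BatchTimeout is 10
// blocks and L1BatchTimeoutPerc is 0.6, the schedule deadline is
// int64(float64(10-1)*0.6) = 5 blocks, so an L1+L2 batch is scheduled once
// the next block is at least 5 blocks past the last L1 batch, comfortably
// before the 10 block timeout.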
func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs {
	proof := batchInfo.Proof
	zki := batchInfo.ZKInputs
	return &eth.RollupForgeBatchArgs{
		NewLastIdx:            int64(zki.Metadata.NewLastIdxRaw),
		NewStRoot:             zki.Metadata.NewStateRootRaw.BigInt(),
		NewExitRoot:           zki.Metadata.NewExitRootRaw.BigInt(),
		L1UserTxs:             batchInfo.L1UserTxsExtra,
		L1CoordinatorTxs:      batchInfo.L1CoordTxs,
		L1CoordinatorTxsAuths: batchInfo.L1CoordinatorTxsAuths,
		L2TxsData:             batchInfo.L2Txs,
		FeeIdxCoordinator:     batchInfo.CoordIdxs,
		// Circuit selector
		VerifierIdx: batchInfo.VerifierIdx,
		L1Batch:     batchInfo.L1Batch,
		ProofA:      [2]*big.Int{proof.PiA[0], proof.PiA[1]},
		// The implementation of the verifier needs a swap on the proofB vector
		ProofB: [2][2]*big.Int{
			{proof.PiB[0][1], proof.PiB[0][0]},
			{proof.PiB[1][1], proof.PiB[1][0]},
		},
		ProofC: [2]*big.Int{proof.PiC[0], proof.PiC[1]},
	}
}
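
// The ProofB index swap above reflects how the Ethereum pairing precompile
// expects G2 points: each Fp2 coordinate is encoded with its two components
// in the reverse order of what the proof server returns. Assuming
// proof.PiB = [[x0, x1], [y0, y1]], the encoding sent on-chain is:
//
//	ProofB[0] = {x1, x0}
//	ProofB[1] = {y1, y0}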