package coordinator

import (
    "context"
    "fmt"
    "math/big"
    "sync"
    "time"

    "github.com/hermeznetwork/hermez-node/batchbuilder"
    "github.com/hermeznetwork/hermez-node/common"
    "github.com/hermeznetwork/hermez-node/db/historydb"
    "github.com/hermeznetwork/hermez-node/db/l2db"
    "github.com/hermeznetwork/hermez-node/eth"
    "github.com/hermeznetwork/hermez-node/log"
    "github.com/hermeznetwork/hermez-node/prover"
    "github.com/hermeznetwork/hermez-node/synchronizer"
    "github.com/hermeznetwork/hermez-node/txselector"
    "github.com/hermeznetwork/tracerr"
)
type statsVars struct {
    Stats synchronizer.Stats
    Vars  synchronizer.SCVariablesPtr
}

type state struct {
    batchNum                     common.BatchNum
    lastScheduledL1BatchBlockNum int64
    lastForgeL1TxsNum            int64
}
// Pipeline manages the forging of batches with parallel server proofs
type Pipeline struct {
    num    int
    cfg    Config
    consts synchronizer.SCConsts

    // state
    state   state
    started bool

    proversPool  *ProversPool
    provers      []prover.Client
    txManager    *TxManager
    historyDB    *historydb.HistoryDB
    l2DB         *l2db.L2DB
    txSelector   *txselector.TxSelector
    batchBuilder *batchbuilder.BatchBuilder
    purger       *Purger

    stats       synchronizer.Stats
    vars        synchronizer.SCVariables
    statsVarsCh chan statsVars

    ctx    context.Context
    wg     sync.WaitGroup
    cancel context.CancelFunc
}
// NewPipeline creates a new Pipeline
func NewPipeline(ctx context.Context,
    cfg Config,
    num int, // Pipeline sequential number
    historyDB *historydb.HistoryDB,
    l2DB *l2db.L2DB,
    txSelector *txselector.TxSelector,
    batchBuilder *batchbuilder.BatchBuilder,
    purger *Purger,
    txManager *TxManager,
    provers []prover.Client,
    scConsts *synchronizer.SCConsts,
) (*Pipeline, error) {
    proversPool := NewProversPool(len(provers))
    proversPoolSize := 0
    for _, prover := range provers {
        if err := prover.WaitReady(ctx); err != nil {
            log.Errorw("prover.WaitReady", "err", err)
        } else {
            proversPool.Add(ctx, prover)
            proversPoolSize++
        }
    }
    if proversPoolSize == 0 {
        return nil, tracerr.Wrap(fmt.Errorf("no provers in the pool"))
    }
    return &Pipeline{
        num:          num,
        cfg:          cfg,
        historyDB:    historyDB,
        l2DB:         l2DB,
        txSelector:   txSelector,
        batchBuilder: batchBuilder,
        provers:      provers,
        proversPool:  proversPool,
        purger:       purger,
        txManager:    txManager,
        consts:       *scConsts,
        statsVarsCh:  make(chan statsVars, queueLen),
    }, nil
}
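// Illustrative usage sketch (hypothetical caller code, not part of this
// package; the prover client constructor, its arguments, and the
// surrounding variables are assumptions):
//
//	provers := []prover.Client{
//		prover.NewProofServerClient("http://localhost:3000", time.Second),
//	}
//	pipeline, err := NewPipeline(ctx, cfg, 0, historyDB, l2DB, txSelector,
//		batchBuilder, purger, txManager, provers, &scConsts)
//	if err != nil {
//		log.Fatal(err)
//	}
//	if err := pipeline.Start(batchNum, &stats, &vars); err != nil {
//		log.Fatal(err)
//	}
//	defer pipeline.Stop(ctx)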
// SetSyncStatsVars is a thread-safe method to set the synchronizer Stats
// and SCVariables of the Pipeline
func (p *Pipeline) SetSyncStatsVars(ctx context.Context, stats *synchronizer.Stats,
    vars *synchronizer.SCVariablesPtr) {
    select {
    case p.statsVarsCh <- statsVars{Stats: *stats, Vars: *vars}:
    case <-ctx.Done():
    }
}
// reset pipeline state
func (p *Pipeline) reset(batchNum common.BatchNum,
    stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
    p.state = state{
        batchNum:                     batchNum,
        lastForgeL1TxsNum:            stats.Sync.LastForgeL1TxsNum,
        lastScheduledL1BatchBlockNum: 0,
    }
    p.stats = *stats
    p.vars = *vars

    err := p.txSelector.Reset(p.state.batchNum)
    if err != nil {
        return tracerr.Wrap(err)
    }
    err = p.batchBuilder.Reset(p.state.batchNum, true)
    if err != nil {
        return tracerr.Wrap(err)
    }
    return nil
}

func (p *Pipeline) syncSCVars(vars synchronizer.SCVariablesPtr) {
    updateSCVars(&p.vars, vars)
}
// handleForgeBatch calls p.forgeBatch to forge the batch and get the zkInputs,
// and then waits for an available proof server and sends the zkInputs to it so
// that the proof computation begins.
func (p *Pipeline) handleForgeBatch(ctx context.Context,
    batchNum common.BatchNum) (*BatchInfo, error) {
    batchInfo, err := p.forgeBatch(batchNum)
    if ctx.Err() != nil {
        return nil, ctx.Err()
    } else if err != nil {
        if tracerr.Unwrap(err) == errLastL1BatchNotSynced {
            log.Warnw("forgeBatch: scheduled L1Batch too early", "err", err,
                "lastForgeL1TxsNum", p.state.lastForgeL1TxsNum,
                "syncLastForgeL1TxsNum", p.stats.Sync.LastForgeL1TxsNum)
        } else {
            log.Errorw("forgeBatch", "err", err)
        }
        return nil, err
    }
    // 6. Wait for an available server proof (blocking call)
    serverProof, err := p.proversPool.Get(ctx)
    if ctx.Err() != nil {
        return nil, ctx.Err()
    } else if err != nil {
        log.Errorw("proversPool.Get", "err", err)
        return nil, err
    }
    batchInfo.ServerProof = serverProof
    if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil {
        return nil, ctx.Err()
    } else if err != nil {
        log.Errorw("sendServerProof", "err", err)
        batchInfo.ServerProof = nil
        p.proversPool.Add(ctx, serverProof)
        return nil, err
    }
    return batchInfo, nil
}
// Start the forging pipeline
func (p *Pipeline) Start(batchNum common.BatchNum,
    stats *synchronizer.Stats, vars *synchronizer.SCVariables) error {
    if p.started {
        log.Fatal("Pipeline already started")
    }
    p.started = true

    if err := p.reset(batchNum, stats, vars); err != nil {
        return tracerr.Wrap(err)
    }
    p.ctx, p.cancel = context.WithCancel(context.Background())

    queueSize := 1
    batchChSentServerProof := make(chan *BatchInfo, queueSize)

    p.wg.Add(1)
    go func() {
        waitDuration := zeroDuration
        for {
            select {
            case <-p.ctx.Done():
                log.Info("Pipeline forgeBatch loop done")
                p.wg.Done()
                return
            case statsVars := <-p.statsVarsCh:
                p.stats = statsVars.Stats
                p.syncSCVars(statsVars.Vars)
            case <-time.After(waitDuration):
                batchNum = p.state.batchNum + 1
                batchInfo, err := p.handleForgeBatch(p.ctx, batchNum)
                if p.ctx.Err() != nil {
                    continue
                } else if err != nil {
                    waitDuration = p.cfg.SyncRetryInterval
                    continue
                }
                p.state.batchNum = batchNum
                select {
                case batchChSentServerProof <- batchInfo:
                case <-p.ctx.Done():
                }
            }
        }
    }()

    p.wg.Add(1)
    go func() {
        for {
            select {
            case <-p.ctx.Done():
                log.Info("Pipeline waitServerProofSendEth loop done")
                p.wg.Done()
                return
            case batchInfo := <-batchChSentServerProof:
                err := p.waitServerProof(p.ctx, batchInfo)
                // We are done with this serverProof, add it back to the pool
                p.proversPool.Add(p.ctx, batchInfo.ServerProof)
                batchInfo.ServerProof = nil
                if p.ctx.Err() != nil {
                    continue
                } else if err != nil {
                    log.Errorw("waitServerProof", "err", err)
                    continue
                }
                p.txManager.AddBatch(p.ctx, batchInfo)
            }
        }
    }()
    return nil
}
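// Design note: Start runs two goroutines connected by the
// batchChSentServerProof channel. The first forges batches and dispatches
// proof jobs to the provers pool; the second waits for finished proofs and
// hands the proven batches to the TxManager. Both exit when p.ctx is
// canceled by Stop.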
// Stop the forging pipeline
func (p *Pipeline) Stop(ctx context.Context) {
    if !p.started {
        log.Fatal("Pipeline already stopped")
    }
    p.started = false
    log.Info("Stopping Pipeline...")
    p.cancel()
    p.wg.Wait()
    for _, prover := range p.provers {
        if err := prover.Cancel(ctx); ctx.Err() != nil {
            continue
        } else if err != nil {
            log.Errorw("prover.Cancel", "err", err)
        }
    }
}
// sendServerProof sends the circuit inputs to the proof server
func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) error {
    p.cfg.debugBatchStore(batchInfo)
    // 7. Call the selected idle server proof with BatchBuilder output,
    // save server proof info for batchNum
    if err := batchInfo.ServerProof.CalculateProof(ctx, batchInfo.ZKInputs); err != nil {
        return tracerr.Wrap(err)
    }
    return nil
}
// forgeBatch forges the batchNum batch.
func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (batchInfo *BatchInfo, err error) {
    // remove transactions from the pool that have been there for too long
    _, err = p.purger.InvalidateMaybe(p.l2DB, p.txSelector.LocalAccountsDB(),
        p.stats.Sync.LastBlock.Num, int64(batchNum))
    if err != nil {
        return nil, tracerr.Wrap(err)
    }
    _, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum))
    if err != nil {
        return nil, tracerr.Wrap(err)
    }

    // Structure to accumulate data and metadata of the batch
    batchInfo = &BatchInfo{PipelineNum: p.num, BatchNum: batchNum}
    batchInfo.Debug.StartTimestamp = time.Now()
    batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1

    selectionCfg := &txselector.SelectionConfig{
        MaxL1UserTxs:      common.RollupConstMaxL1UserTx,
        TxProcessorConfig: p.cfg.TxProcessorConfig,
    }

    var poolL2Txs []common.PoolL2Tx
    var discardedL2Txs []common.PoolL2Tx
    var l1UserTxsExtra, l1CoordTxs []common.L1Tx
    var auths [][]byte
    var coordIdxs []common.Idx

    // 1. Decide if we forge L2Tx or L1+L2Tx
    if p.shouldL1L2Batch(batchInfo) {
        batchInfo.L1Batch = true
        defer func() {
            // If there's no error, update the parameters related
            // to the last L1Batch forged
            if err == nil {
                p.state.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
                p.state.lastForgeL1TxsNum++
            }
        }()
        if p.state.lastForgeL1TxsNum != p.stats.Sync.LastForgeL1TxsNum {
            return nil, tracerr.Wrap(errLastL1BatchNotSynced)
        }
        // 2a: L1+L2 txs
        l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.state.lastForgeL1TxsNum + 1)
        if err != nil {
            return nil, tracerr.Wrap(err)
        }
        coordIdxs, auths, l1UserTxsExtra, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
            p.txSelector.GetL1L2TxSelection(selectionCfg, l1UserTxs)
        if err != nil {
            return nil, tracerr.Wrap(err)
        }
    } else {
        // 2b: only L2 txs
        coordIdxs, auths, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
            p.txSelector.GetL2TxSelection(selectionCfg)
        if err != nil {
            return nil, tracerr.Wrap(err)
        }
        l1UserTxsExtra = nil
    }
    // 3. Save metadata from TxSelector output for BatchNum
    batchInfo.L1UserTxsExtra = l1UserTxsExtra
    batchInfo.L1CoordTxs = l1CoordTxs
    batchInfo.L1CoordinatorTxsAuths = auths
    batchInfo.CoordIdxs = coordIdxs
    batchInfo.VerifierIdx = p.cfg.VerifierIdx

    if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs),
        batchInfo.BatchNum); err != nil {
        return nil, tracerr.Wrap(err)
    }
    if err := p.l2DB.UpdateTxsInfo(discardedL2Txs); err != nil {
        return nil, tracerr.Wrap(err)
    }

    // Invalidate transactions that become invalid because of the poolL2Txs
    // selected. Mark as invalid the txs that have a (fromIdx, nonce) which
    // already appears in the selected txs, including all the nonces smaller
    // than the current one (e.g. if a selected tx has (fromIdx=10, nonce=5),
    // pending pool txs from fromIdx 10 with nonce <= 5 are invalidated).
    err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum)
    if err != nil {
        return nil, tracerr.Wrap(err)
    }
    // 4. Call BatchBuilder with TxSelector output
    configBatch := &batchbuilder.ConfigBatch{
        TxProcessorConfig: p.cfg.TxProcessorConfig,
    }
    zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxsExtra,
        l1CoordTxs, poolL2Txs)
    if err != nil {
        return nil, tracerr.Wrap(err)
    }
    l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a bit ugly, find a better way
    if err != nil {
        return nil, tracerr.Wrap(err)
    }
    batchInfo.L2Txs = l2Txs

    // 5. Save metadata from BatchBuilder output for BatchNum
    batchInfo.ZKInputs = zkInputs
    batchInfo.Debug.Status = StatusForged
    p.cfg.debugBatchStore(batchInfo)
    log.Infow("Pipeline: batch forged internally", "batch", batchInfo.BatchNum)

    return batchInfo, nil
}
// waitServerProof gets the generated zkProof & sends it to the SmartContract
func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) error {
    // blocking call: returns once the proof server has calculated the proof
    proof, pubInputs, err := batchInfo.ServerProof.GetProof(ctx)
    if err != nil {
        return tracerr.Wrap(err)
    }
    batchInfo.Proof = proof
    batchInfo.PublicInputs = pubInputs
    batchInfo.ForgeBatchArgs = prepareForgeBatchArgs(batchInfo)
    batchInfo.Debug.Status = StatusProof
    p.cfg.debugBatchStore(batchInfo)
    log.Infow("Pipeline: batch proof calculated", "batch", batchInfo.BatchNum)
    return nil
}
func (p *Pipeline) shouldL1L2Batch(batchInfo *BatchInfo) bool {
    // Take the lastL1BatchBlockNum as the biggest between the last
    // scheduled one, and the synchronized one.
    lastL1BatchBlockNum := p.state.lastScheduledL1BatchBlockNum
    if p.stats.Sync.LastL1BatchBlock > lastL1BatchBlockNum {
        lastL1BatchBlockNum = p.stats.Sync.LastL1BatchBlock
    }
    // Set Debug information
    batchInfo.Debug.LastScheduledL1BatchBlockNum = p.state.lastScheduledL1BatchBlockNum
    batchInfo.Debug.LastL1BatchBlock = p.stats.Sync.LastL1BatchBlock
    batchInfo.Debug.LastL1BatchBlockDelta = p.stats.Eth.LastBlock.Num + 1 - lastL1BatchBlockNum
    batchInfo.Debug.L1BatchBlockScheduleDeadline =
        int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1) * p.cfg.L1BatchTimeoutPerc)
    // Return true if we have passed the l1BatchTimeoutPerc portion of the
    // range before the l1batch timeout.
    return p.stats.Eth.LastBlock.Num+1-lastL1BatchBlockNum >=
        int64(float64(p.vars.Rollup.ForgeL1L2BatchTimeout-1)*p.cfg.L1BatchTimeoutPerc)
}
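// Worked example (illustrative values, not taken from any real config):
// with ForgeL1L2BatchTimeout = 10 and L1BatchTimeoutPerc = 0.5, the
// deadline is int64(float64(10-1)*0.5) = 4, so an L1+L2 batch is scheduled
// as soon as the next block is at least 4 blocks past the last L1 batch.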
func prepareForgeBatchArgs(batchInfo *BatchInfo) *eth.RollupForgeBatchArgs {
    proof := batchInfo.Proof
    zki := batchInfo.ZKInputs
    return &eth.RollupForgeBatchArgs{
        NewLastIdx:            int64(zki.Metadata.NewLastIdxRaw),
        NewStRoot:             zki.Metadata.NewStateRootRaw.BigInt(),
        NewExitRoot:           zki.Metadata.NewExitRootRaw.BigInt(),
        L1UserTxs:             batchInfo.L1UserTxsExtra,
        L1CoordinatorTxs:      batchInfo.L1CoordTxs,
        L1CoordinatorTxsAuths: batchInfo.L1CoordinatorTxsAuths,
        L2TxsData:             batchInfo.L2Txs,
        FeeIdxCoordinator:     batchInfo.CoordIdxs,
        // Circuit selector
        VerifierIdx: batchInfo.VerifierIdx,
        L1Batch:     batchInfo.L1Batch,
        ProofA:      [2]*big.Int{proof.PiA[0], proof.PiA[1]},
        // The verifier implementation needs the proofB coordinates swapped
        ProofB: [2][2]*big.Int{
            {proof.PiB[0][1], proof.PiB[0][0]},
            {proof.PiB[1][1], proof.PiB[1][0]},
        },
        ProofC: [2]*big.Int{proof.PiC[0], proof.PiC[1]},
    }
}