You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

292 lines
9.2 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package l2db
  2. import (
  3. "errors"
  4. "math/big"
  5. "time"
  6. ethCommon "github.com/ethereum/go-ethereum/common"
  7. "github.com/hermeznetwork/hermez-node/common"
  8. "github.com/hermeznetwork/hermez-node/log"
  9. "github.com/iden3/go-iden3-crypto/babyjub"
  10. "github.com/jmoiron/sqlx"
  11. //nolint:errcheck // driver for postgres DB
  12. _ "github.com/lib/pq"
  13. "github.com/russross/meddler"
  14. )
  15. // TODO(Edu): Check DB consistency while there's concurrent use from Coordinator/TxSelector & API
  16. // L2DB stores L2 txs and authorization registers received by the coordinator and keeps them until they are no longer relevant
  17. // due to them being forged or invalid after a safety period
  18. type L2DB struct {
  19. db *sqlx.DB
  20. safetyPeriod common.BatchNum
  21. ttl time.Duration
  22. maxTxs uint32
  23. }
  24. // NewL2DB creates a L2DB.
  25. // To create it, it's needed db connection, safety period expressed in batches,
  26. // maxTxs that the DB should have and TTL (time to live) for pending txs.
  27. func NewL2DB(db *sqlx.DB, safetyPeriod common.BatchNum, maxTxs uint32, TTL time.Duration) *L2DB {
  28. return &L2DB{
  29. db: db,
  30. safetyPeriod: safetyPeriod,
  31. ttl: TTL,
  32. maxTxs: maxTxs,
  33. }
  34. }
  35. // DB returns a pointer to the L2DB.db. This method should be used only for
  36. // internal testing purposes.
  37. func (l2db *L2DB) DB() *sqlx.DB {
  38. return l2db.db
  39. }
  40. // AddAccountCreationAuth inserts an account creation authorization into the DB
  41. func (l2db *L2DB) AddAccountCreationAuth(auth *common.AccountCreationAuth) error {
  42. return meddler.Insert(l2db.db, "account_creation_auth", auth)
  43. }
  44. // GetAccountCreationAuth returns an account creation authorization into the DB
  45. func (l2db *L2DB) GetAccountCreationAuth(addr *ethCommon.Address) (*common.AccountCreationAuth, error) {
  46. if addr == nil {
  47. return nil, errors.New("addr cannot be nil")
  48. }
  49. auth := new(common.AccountCreationAuth)
  50. return auth, meddler.QueryRow(
  51. l2db.db, auth,
  52. "SELECT * FROM account_creation_auth WHERE eth_addr = $1;",
  53. addr,
  54. )
  55. }
  56. // AddTxTest inserts a tx into the L2DB. This is useful for test purposes,
  57. // but in production txs will only be inserted through the API (method TBD)
  58. func (l2db *L2DB) AddTxTest(tx *common.PoolL2Tx) error {
  59. type withouUSD struct {
  60. TxID common.TxID `meddler:"tx_id"`
  61. FromIdx common.Idx `meddler:"from_idx"`
  62. ToIdx *common.Idx `meddler:"to_idx"`
  63. ToEthAddr *ethCommon.Address `meddler:"to_eth_addr"`
  64. ToBJJ *babyjub.PublicKey `meddler:"to_bjj"`
  65. TokenID common.TokenID `meddler:"token_id"`
  66. Amount *big.Int `meddler:"amount,bigint"`
  67. AmountFloat float64 `meddler:"amount_f"`
  68. Fee common.FeeSelector `meddler:"fee"`
  69. Nonce common.Nonce `meddler:"nonce"`
  70. State common.PoolL2TxState `meddler:"state"`
  71. Signature *babyjub.Signature `meddler:"signature"`
  72. Timestamp time.Time `meddler:"timestamp,utctime"`
  73. BatchNum *common.BatchNum `meddler:"batch_num"`
  74. RqFromIdx *common.Idx `meddler:"rq_from_idx"`
  75. RqToIdx *common.Idx `meddler:"rq_to_idx"`
  76. RqToEthAddr *ethCommon.Address `meddler:"rq_to_eth_addr"`
  77. RqToBJJ *babyjub.PublicKey `meddler:"rq_to_bjj"`
  78. RqTokenID *common.TokenID `meddler:"rq_token_id"`
  79. RqAmount *big.Int `meddler:"rq_amount,bigintnull"`
  80. RqFee *common.FeeSelector `meddler:"rq_fee"`
  81. RqNonce *uint64 `meddler:"rq_nonce"`
  82. Type common.TxType `meddler:"tx_type"`
  83. }
  84. return meddler.Insert(l2db.db, "tx_pool", &withouUSD{
  85. TxID: tx.TxID,
  86. FromIdx: tx.FromIdx,
  87. ToIdx: tx.ToIdx,
  88. ToEthAddr: tx.ToEthAddr,
  89. ToBJJ: tx.ToBJJ,
  90. TokenID: tx.TokenID,
  91. Amount: tx.Amount,
  92. AmountFloat: tx.AmountFloat,
  93. Fee: tx.Fee,
  94. Nonce: tx.Nonce,
  95. State: tx.State,
  96. Signature: tx.Signature,
  97. Timestamp: tx.Timestamp,
  98. BatchNum: tx.BatchNum,
  99. RqFromIdx: tx.RqFromIdx,
  100. RqToIdx: tx.RqToIdx,
  101. RqToEthAddr: tx.RqToEthAddr,
  102. RqToBJJ: tx.RqToBJJ,
  103. RqTokenID: tx.RqTokenID,
  104. RqAmount: tx.RqAmount,
  105. RqFee: tx.RqFee,
  106. RqNonce: tx.RqNonce,
  107. Type: tx.Type,
  108. })
  109. }
  110. // selectPoolTx select part of queries to get common.PoolL2Tx
  111. const selectPoolTx = `SELECT tx_pool.*, token.usd * tx_pool.amount_f AS value_usd,
  112. fee_percentage(tx_pool.fee::NUMERIC) * token.usd * tx_pool.amount_f AS fee_usd, token.usd_update
  113. FROM tx_pool INNER JOIN token ON tx_pool.token_id = token.token_id `
  114. // GetTx return the specified Tx
  115. func (l2db *L2DB) GetTx(txID common.TxID) (*common.PoolL2Tx, error) {
  116. tx := new(common.PoolL2Tx)
  117. return tx, meddler.QueryRow(
  118. l2db.db, tx,
  119. selectPoolTx+"WHERE tx_id = $1;",
  120. txID,
  121. )
  122. }
  123. // GetPendingTxs return all the pending txs of the L2DB, that have a non NULL AbsoluteFee
  124. func (l2db *L2DB) GetPendingTxs() ([]*common.PoolL2Tx, error) {
  125. var txs []*common.PoolL2Tx
  126. err := meddler.QueryAll(
  127. l2db.db, &txs,
  128. selectPoolTx+"WHERE state = $1 AND token.usd IS NOT NULL",
  129. common.PoolL2TxStatePending,
  130. )
  131. return txs, err
  132. }
  133. // StartForging updates the state of the transactions that will begin the forging process.
  134. // The state of the txs referenced by txIDs will be changed from Pending -> Forging
  135. func (l2db *L2DB) StartForging(txIDs []common.TxID, batchNum common.BatchNum) error {
  136. query, args, err := sqlx.In(
  137. `UPDATE tx_pool
  138. SET state = ?, batch_num = ?
  139. WHERE state = ? AND tx_id IN (?);`,
  140. common.PoolL2TxStateForging,
  141. batchNum,
  142. common.PoolL2TxStatePending,
  143. txIDs,
  144. )
  145. if err != nil {
  146. return err
  147. }
  148. query = l2db.db.Rebind(query)
  149. _, err = l2db.db.Exec(query, args...)
  150. return err
  151. }
  152. // DoneForging updates the state of the transactions that have been forged
  153. // so the state of the txs referenced by txIDs will be changed from Forging -> Forged
  154. func (l2db *L2DB) DoneForging(txIDs []common.TxID, batchNum common.BatchNum) error {
  155. query, args, err := sqlx.In(
  156. `UPDATE tx_pool
  157. SET state = ?, batch_num = ?
  158. WHERE state = ? AND tx_id IN (?);`,
  159. common.PoolL2TxStateForged,
  160. batchNum,
  161. common.PoolL2TxStateForging,
  162. txIDs,
  163. )
  164. if err != nil {
  165. return err
  166. }
  167. query = l2db.db.Rebind(query)
  168. _, err = l2db.db.Exec(query, args...)
  169. return err
  170. }
  171. // InvalidateTxs updates the state of the transactions that are invalid.
  172. // The state of the txs referenced by txIDs will be changed from * -> Invalid
  173. func (l2db *L2DB) InvalidateTxs(txIDs []common.TxID, batchNum common.BatchNum) error {
  174. query, args, err := sqlx.In(
  175. `UPDATE tx_pool
  176. SET state = ?, batch_num = ?
  177. WHERE tx_id IN (?);`,
  178. common.PoolL2TxStateInvalid,
  179. batchNum,
  180. txIDs,
  181. )
  182. if err != nil {
  183. return err
  184. }
  185. query = l2db.db.Rebind(query)
  186. _, err = l2db.db.Exec(query, args...)
  187. return err
  188. }
  189. // CheckNonces invalidate txs with nonces that are smaller or equal than their respective accounts nonces.
  190. // The state of the affected txs will be changed from Pending -> Invalid
  191. func (l2db *L2DB) CheckNonces(updatedAccounts []common.Account, batchNum common.BatchNum) (err error) {
  192. txn, err := l2db.db.Begin()
  193. if err != nil {
  194. return err
  195. }
  196. defer func() {
  197. // Rollback the transaction if there was an error.
  198. if err != nil {
  199. errRollback := txn.Rollback()
  200. if errRollback != nil {
  201. log.Errorw("Rollback", "err", errRollback)
  202. }
  203. }
  204. }()
  205. for i := 0; i < len(updatedAccounts); i++ {
  206. _, err = txn.Exec(
  207. `UPDATE tx_pool
  208. SET state = $1, batch_num = $2
  209. WHERE state = $3 AND from_idx = $4 AND nonce <= $5;`,
  210. common.PoolL2TxStateInvalid,
  211. batchNum,
  212. common.PoolL2TxStatePending,
  213. updatedAccounts[i].Idx,
  214. updatedAccounts[i].Nonce,
  215. )
  216. if err != nil {
  217. return err
  218. }
  219. }
  220. return txn.Commit()
  221. }
  222. // Reorg updates the state of txs that were updated in a batch that has been discarted due to a blockchain reorg.
  223. // The state of the affected txs can change form Forged -> Pending or from Invalid -> Pending
  224. func (l2db *L2DB) Reorg(lastValidBatch common.BatchNum) error {
  225. _, err := l2db.db.Exec(
  226. `UPDATE tx_pool SET batch_num = NULL, state = $1
  227. WHERE (state = $2 OR state = $3) AND batch_num > $4`,
  228. common.PoolL2TxStatePending,
  229. common.PoolL2TxStateForged,
  230. common.PoolL2TxStateInvalid,
  231. lastValidBatch,
  232. )
  233. return err
  234. }
  235. // Purge deletes transactions that have been forged or marked as invalid for longer than the safety period
  236. // it also deletes txs that has been in the L2DB for longer than the ttl if maxTxs has been exceeded
  237. func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) (err error) {
  238. txn, err := l2db.db.Begin()
  239. if err != nil {
  240. return err
  241. }
  242. defer func() {
  243. // Rollback the transaction if there was an error.
  244. if err != nil {
  245. errRollback := txn.Rollback()
  246. if errRollback != nil {
  247. log.Errorw("Rollback", "err", errRollback)
  248. }
  249. }
  250. }()
  251. // Delete pending txs that have been in the pool after the TTL if maxTxs is reached
  252. now := time.Now().UTC().Unix()
  253. _, err = txn.Exec(
  254. `DELETE FROM tx_pool WHERE (SELECT count(*) FROM tx_pool) > $1 AND timestamp < $2`,
  255. l2db.maxTxs,
  256. time.Unix(now-int64(l2db.ttl.Seconds()), 0),
  257. )
  258. if err != nil {
  259. return err
  260. }
  261. // Delete txs that have been marked as forged / invalid after the safety period
  262. _, err = txn.Exec(
  263. `DELETE FROM tx_pool
  264. WHERE batch_num < $1 AND (state = $2 OR state = $3)`,
  265. currentBatchNum-l2db.safetyPeriod,
  266. common.PoolL2TxStateForged,
  267. common.PoolL2TxStateInvalid,
  268. )
  269. if err != nil {
  270. return err
  271. }
  272. return txn.Commit()
  273. }