You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

347 lines
11 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package l2db
  2. import (
  3. "fmt"
  4. "math/big"
  5. "time"
  6. ethCommon "github.com/ethereum/go-ethereum/common"
  7. "github.com/hermeznetwork/hermez-node/common"
  8. "github.com/hermeznetwork/hermez-node/db"
  9. "github.com/hermeznetwork/tracerr"
  10. "github.com/jmoiron/sqlx"
  11. //nolint:errcheck // driver for postgres DB
  12. _ "github.com/lib/pq"
  13. "github.com/russross/meddler"
  14. )
  15. // TODO(Edu): Check DB consistency while there's concurrent use from Coordinator/TxSelector & API
  16. // L2DB stores L2 txs and authorization registers received by the coordinator and keeps them until they are no longer relevant
  17. // due to them being forged or invalid after a safety period
  18. type L2DB struct {
  19. db *sqlx.DB
  20. safetyPeriod common.BatchNum
  21. ttl time.Duration
  22. maxTxs uint32 // limit of txs that are accepted in the pool
  23. }
  24. // NewL2DB creates a L2DB.
  25. // To create it, it's needed db connection, safety period expressed in batches,
  26. // maxTxs that the DB should have and TTL (time to live) for pending txs.
  27. func NewL2DB(db *sqlx.DB, safetyPeriod common.BatchNum, maxTxs uint32, TTL time.Duration) *L2DB {
  28. return &L2DB{
  29. db: db,
  30. safetyPeriod: safetyPeriod,
  31. ttl: TTL,
  32. maxTxs: maxTxs,
  33. }
  34. }
  35. // DB returns a pointer to the L2DB.db. This method should be used only for
  36. // internal testing purposes.
  37. func (l2db *L2DB) DB() *sqlx.DB {
  38. return l2db.db
  39. }
  40. // AddAccountCreationAuth inserts an account creation authorization into the DB
  41. func (l2db *L2DB) AddAccountCreationAuth(auth *common.AccountCreationAuth) error {
  42. // return meddler.Insert(l2db.db, "account_creation_auth", auth)
  43. _, err := l2db.db.Exec(
  44. `INSERT INTO account_creation_auth (eth_addr, bjj, signature)
  45. VALUES ($1, $2, $3);`,
  46. auth.EthAddr, auth.BJJ, auth.Signature,
  47. )
  48. return tracerr.Wrap(err)
  49. }
  50. // GetAccountCreationAuth returns an account creation authorization from the DB
  51. func (l2db *L2DB) GetAccountCreationAuth(addr ethCommon.Address) (*common.AccountCreationAuth, error) {
  52. auth := new(common.AccountCreationAuth)
  53. return auth, tracerr.Wrap(meddler.QueryRow(
  54. l2db.db, auth,
  55. "SELECT * FROM account_creation_auth WHERE eth_addr = $1;",
  56. addr,
  57. ))
  58. }
  59. // GetAccountCreationAuthAPI returns an account creation authorization from the DB
  60. func (l2db *L2DB) GetAccountCreationAuthAPI(addr ethCommon.Address) (*AccountCreationAuthAPI, error) {
  61. auth := new(AccountCreationAuthAPI)
  62. return auth, tracerr.Wrap(meddler.QueryRow(
  63. l2db.db, auth,
  64. "SELECT * FROM account_creation_auth WHERE eth_addr = $1;",
  65. addr,
  66. ))
  67. }
  68. // AddTx inserts a tx to the pool
  69. func (l2db *L2DB) AddTx(tx *PoolL2TxWrite) error {
  70. row := l2db.db.QueryRow(
  71. "SELECT COUNT(*) FROM tx_pool WHERE state = $1;",
  72. common.PoolL2TxStatePending,
  73. )
  74. var totalTxs uint32
  75. if err := row.Scan(&totalTxs); err != nil {
  76. return tracerr.Wrap(err)
  77. }
  78. if totalTxs >= l2db.maxTxs {
  79. return tracerr.New(
  80. "The pool is at full capacity. More transactions are not accepted currently",
  81. )
  82. }
  83. return tracerr.Wrap(meddler.Insert(l2db.db, "tx_pool", tx))
  84. }
  85. // AddTxTest inserts a tx into the L2DB. This is useful for test purposes,
  86. // but in production txs will only be inserted through the API
  87. func (l2db *L2DB) AddTxTest(tx *common.PoolL2Tx) error {
  88. // transform tx from *common.PoolL2Tx to PoolL2TxWrite
  89. insertTx := &PoolL2TxWrite{
  90. TxID: tx.TxID,
  91. FromIdx: tx.FromIdx,
  92. TokenID: tx.TokenID,
  93. Amount: tx.Amount,
  94. Fee: tx.Fee,
  95. Nonce: tx.Nonce,
  96. State: common.PoolL2TxStatePending,
  97. Signature: tx.Signature,
  98. RqAmount: tx.RqAmount,
  99. Type: tx.Type,
  100. }
  101. if tx.ToIdx != 0 {
  102. insertTx.ToIdx = &tx.ToIdx
  103. }
  104. nilAddr := ethCommon.BigToAddress(big.NewInt(0))
  105. if tx.ToEthAddr != nilAddr {
  106. insertTx.ToEthAddr = &tx.ToEthAddr
  107. }
  108. if tx.RqFromIdx != 0 {
  109. insertTx.RqFromIdx = &tx.RqFromIdx
  110. }
  111. if tx.RqToIdx != 0 { // if true, all Rq... fields must be different to nil
  112. insertTx.RqToIdx = &tx.RqToIdx
  113. insertTx.RqTokenID = &tx.RqTokenID
  114. insertTx.RqFee = &tx.RqFee
  115. insertTx.RqNonce = &tx.RqNonce
  116. }
  117. if tx.RqToEthAddr != nilAddr {
  118. insertTx.RqToEthAddr = &tx.RqToEthAddr
  119. }
  120. if tx.ToBJJ != common.EmptyBJJComp {
  121. insertTx.ToBJJ = &tx.ToBJJ
  122. }
  123. if tx.RqToBJJ != common.EmptyBJJComp {
  124. insertTx.RqToBJJ = &tx.RqToBJJ
  125. }
  126. f := new(big.Float).SetInt(tx.Amount)
  127. amountF, _ := f.Float64()
  128. insertTx.AmountFloat = amountF
  129. // insert tx
  130. return tracerr.Wrap(meddler.Insert(l2db.db, "tx_pool", insertTx))
  131. }
  // selectPoolTxAPI is the SELECT ... FROM prefix shared by the queries that
  // read a pool tx in its API representation. It joins tx_pool with token and
  // renders the index columns through hez_idx. The string ends with a space so
  // that a WHERE clause can be appended directly.
  const selectPoolTxAPI = `SELECT tx_pool.tx_id, hez_idx(tx_pool.from_idx, token.symbol) AS from_idx, tx_pool.effective_from_eth_addr,
  tx_pool.effective_from_bjj, hez_idx(tx_pool.to_idx, token.symbol) AS to_idx, tx_pool.effective_to_eth_addr,
  tx_pool.effective_to_bjj, tx_pool.token_id, tx_pool.amount, tx_pool.fee, tx_pool.nonce,
  tx_pool.state, tx_pool.signature, tx_pool.timestamp, tx_pool.batch_num, hez_idx(tx_pool.rq_from_idx, token.symbol) AS rq_from_idx,
  hez_idx(tx_pool.rq_to_idx, token.symbol) AS rq_to_idx, tx_pool.rq_to_eth_addr, tx_pool.rq_to_bjj, tx_pool.rq_token_id, tx_pool.rq_amount,
  tx_pool.rq_fee, tx_pool.rq_nonce, tx_pool.tx_type,
  token.item_id AS token_item_id, token.eth_block_num, token.eth_addr, token.name, token.symbol, token.decimals, token.usd, token.usd_update
  FROM tx_pool INNER JOIN token ON tx_pool.token_id = token.token_id `
  // selectPoolTxCommon is the SELECT ... FROM prefix shared by the queries that
  // read a pool tx as a common.PoolL2Tx. It joins tx_pool with token and adds a
  // computed fee_usd column. The string ends with a space so that a WHERE
  // clause can be appended directly.
  const selectPoolTxCommon = `SELECT tx_pool.tx_id, from_idx, to_idx, tx_pool.to_eth_addr,
  tx_pool.to_bjj, tx_pool.token_id, tx_pool.amount, tx_pool.fee, tx_pool.nonce,
  tx_pool.state, tx_pool.signature, tx_pool.timestamp, rq_from_idx,
  rq_to_idx, tx_pool.rq_to_eth_addr, tx_pool.rq_to_bjj, tx_pool.rq_token_id, tx_pool.rq_amount,
  tx_pool.rq_fee, tx_pool.rq_nonce, tx_pool.tx_type,
  fee_percentage(tx_pool.fee::NUMERIC) * token.usd * tx_pool.amount_f AS fee_usd, token.usd_update
  FROM tx_pool INNER JOIN token ON tx_pool.token_id = token.token_id `
  149. // GetTx return the specified Tx in common.PoolL2Tx format
  150. func (l2db *L2DB) GetTx(txID common.TxID) (*common.PoolL2Tx, error) {
  151. tx := new(common.PoolL2Tx)
  152. return tx, tracerr.Wrap(meddler.QueryRow(
  153. l2db.db, tx,
  154. selectPoolTxCommon+"WHERE tx_id = $1;",
  155. txID,
  156. ))
  157. }
  158. // GetTxAPI return the specified Tx in PoolTxAPI format
  159. func (l2db *L2DB) GetTxAPI(txID common.TxID) (*PoolTxAPI, error) {
  160. tx := new(PoolTxAPI)
  161. return tx, tracerr.Wrap(meddler.QueryRow(
  162. l2db.db, tx,
  163. selectPoolTxAPI+"WHERE tx_id = $1;",
  164. txID,
  165. ))
  166. }
  167. // GetPendingTxs return all the pending txs of the L2DB, that have a non NULL AbsoluteFee
  168. func (l2db *L2DB) GetPendingTxs() ([]common.PoolL2Tx, error) {
  169. var txs []*common.PoolL2Tx
  170. err := meddler.QueryAll(
  171. l2db.db, &txs,
  172. selectPoolTxCommon+"WHERE state = $1",
  173. common.PoolL2TxStatePending,
  174. )
  175. return db.SlicePtrsToSlice(txs).([]common.PoolL2Tx), tracerr.Wrap(err)
  176. }
  177. // StartForging updates the state of the transactions that will begin the forging process.
  178. // The state of the txs referenced by txIDs will be changed from Pending -> Forging
  179. func (l2db *L2DB) StartForging(txIDs []common.TxID, batchNum common.BatchNum) error {
  180. if len(txIDs) == 0 {
  181. return nil
  182. }
  183. query, args, err := sqlx.In(
  184. `UPDATE tx_pool
  185. SET state = ?, batch_num = ?
  186. WHERE state = ? AND tx_id IN (?);`,
  187. common.PoolL2TxStateForging,
  188. batchNum,
  189. common.PoolL2TxStatePending,
  190. txIDs,
  191. )
  192. if err != nil {
  193. return tracerr.Wrap(err)
  194. }
  195. query = l2db.db.Rebind(query)
  196. _, err = l2db.db.Exec(query, args...)
  197. return tracerr.Wrap(err)
  198. }
  199. // DoneForging updates the state of the transactions that have been forged
  200. // so the state of the txs referenced by txIDs will be changed from Forging -> Forged
  201. func (l2db *L2DB) DoneForging(txIDs []common.TxID, batchNum common.BatchNum) error {
  202. if len(txIDs) == 0 {
  203. return nil
  204. }
  205. query, args, err := sqlx.In(
  206. `UPDATE tx_pool
  207. SET state = ?, batch_num = ?
  208. WHERE state = ? AND tx_id IN (?);`,
  209. common.PoolL2TxStateForged,
  210. batchNum,
  211. common.PoolL2TxStateForging,
  212. txIDs,
  213. )
  214. if err != nil {
  215. return tracerr.Wrap(err)
  216. }
  217. query = l2db.db.Rebind(query)
  218. _, err = l2db.db.Exec(query, args...)
  219. return tracerr.Wrap(err)
  220. }
  221. // InvalidateTxs updates the state of the transactions that are invalid.
  222. // The state of the txs referenced by txIDs will be changed from * -> Invalid
  223. func (l2db *L2DB) InvalidateTxs(txIDs []common.TxID, batchNum common.BatchNum) error {
  224. if len(txIDs) == 0 {
  225. return nil
  226. }
  227. query, args, err := sqlx.In(
  228. `UPDATE tx_pool
  229. SET state = ?, batch_num = ?
  230. WHERE tx_id IN (?);`,
  231. common.PoolL2TxStateInvalid,
  232. batchNum,
  233. txIDs,
  234. )
  235. if err != nil {
  236. return tracerr.Wrap(err)
  237. }
  238. query = l2db.db.Rebind(query)
  239. _, err = l2db.db.Exec(query, args...)
  240. return tracerr.Wrap(err)
  241. }
  242. // GetPendingUniqueFromIdxs returns from all the pending transactions, the set
  243. // of unique FromIdx
  244. func (l2db *L2DB) GetPendingUniqueFromIdxs() ([]common.Idx, error) {
  245. var idxs []common.Idx
  246. rows, err := l2db.db.Query(`SELECT DISTINCT from_idx FROM tx_pool
  247. WHERE state = $1;`, common.PoolL2TxStatePending)
  248. if err != nil {
  249. return nil, tracerr.Wrap(err)
  250. }
  251. defer db.RowsClose(rows)
  252. var idx common.Idx
  253. for rows.Next() {
  254. err = rows.Scan(&idx)
  255. if err != nil {
  256. return nil, tracerr.Wrap(err)
  257. }
  258. idxs = append(idxs, idx)
  259. }
  260. return idxs, nil
  261. }
  262. var invalidateOldNoncesQuery = fmt.Sprintf(`
  263. UPDATE tx_pool SET
  264. state = '%s',
  265. batch_num = %%d
  266. FROM (VALUES
  267. (NULL::::BIGINT, NULL::::BIGINT),
  268. (:idx, :nonce)
  269. ) as updated_acc (idx, nonce)
  270. WHERE tx_pool.state = '%s' AND
  271. tx_pool.from_idx = updated_acc.idx AND
  272. tx_pool.nonce < updated_acc.nonce;
  273. `, common.PoolL2TxStateInvalid, common.PoolL2TxStatePending)
  274. // InvalidateOldNonces invalidate txs with nonces that are smaller or equal than their
  275. // respective accounts nonces. The state of the affected txs will be changed
  276. // from Pending to Invalid
  277. func (l2db *L2DB) InvalidateOldNonces(updatedAccounts []common.IdxNonce, batchNum common.BatchNum) (err error) {
  278. if len(updatedAccounts) == 0 {
  279. return nil
  280. }
  281. // Fill the batch_num in the query with Sprintf because we are using a
  282. // named query which works with slices, and doens't handle an extra
  283. // individual argument.
  284. query := fmt.Sprintf(invalidateOldNoncesQuery, batchNum)
  285. if _, err := sqlx.NamedExec(l2db.db, query, updatedAccounts); err != nil {
  286. return tracerr.Wrap(err)
  287. }
  288. return nil
  289. }
  290. // Reorg updates the state of txs that were updated in a batch that has been discarted due to a blockchain reorg.
  291. // The state of the affected txs can change form Forged -> Pending or from Invalid -> Pending
  292. func (l2db *L2DB) Reorg(lastValidBatch common.BatchNum) error {
  293. _, err := l2db.db.Exec(
  294. `UPDATE tx_pool SET batch_num = NULL, state = $1
  295. WHERE (state = $2 OR state = $3) AND batch_num > $4`,
  296. common.PoolL2TxStatePending,
  297. common.PoolL2TxStateForged,
  298. common.PoolL2TxStateInvalid,
  299. lastValidBatch,
  300. )
  301. return tracerr.Wrap(err)
  302. }
  303. // Purge deletes transactions that have been forged or marked as invalid for longer than the safety period
  304. // it also deletes pending txs that have been in the L2DB for longer than the ttl if maxTxs has been exceeded
  305. func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) (err error) {
  306. now := time.Now().UTC().Unix()
  307. _, err = l2db.db.Exec(
  308. `DELETE FROM tx_pool WHERE (
  309. batch_num < $1 AND (state = $2 OR state = $3)
  310. ) OR (
  311. (SELECT count(*) FROM tx_pool WHERE state = $4) > $5
  312. AND timestamp < $6 AND state = $4
  313. );`,
  314. currentBatchNum-l2db.safetyPeriod,
  315. common.PoolL2TxStateForged,
  316. common.PoolL2TxStateInvalid,
  317. common.PoolL2TxStatePending,
  318. l2db.maxTxs,
  319. time.Unix(now-int64(l2db.ttl.Seconds()), 0),
  320. )
  321. return tracerr.Wrap(err)
  322. }