You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

397 lines
13 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. /*
  2. Package l2db is responsible for storing and retrieving the data received by the coordinator through the api.
  3. Note that this data will be different for each coordinator in the network, as this represents the L2 information.
  4. The data managed by this package is fundamentally PoolL2Tx and AccountCreationAuth. All this data come from
  5. the API sent by clients and is used by the txselector to decide which transactions are selected to forge a batch.
  6. Some of the database tooling used in this package such as meddler and migration tools is explained in the db package.
  7. This package is split into different files following these ideas:
  8. - l2db.go: constructor and functions used by packages other than the api.
  9. - apiqueries.go: functions used by the API, the queries implemented in these functions use a semaphore
  10. to restrict the maximum concurrent connections to the database.
  11. - views.go: structs used to retrieve/store data from/to the database. When possible, the common structs are used, however
  12. most of the time there is no 1:1 relation between the struct fields and the tables of the schema, especially when joining tables.
  13. In some cases, some of the structs defined in this file also include custom Marshallers to easily match the expected api formats.
  14. */
  15. package l2db
  16. import (
  17. "fmt"
  18. "math/big"
  19. "time"
  20. ethCommon "github.com/ethereum/go-ethereum/common"
  21. "github.com/hermeznetwork/hermez-node/common"
  22. "github.com/hermeznetwork/hermez-node/db"
  23. "github.com/hermeznetwork/tracerr"
  24. "github.com/jmoiron/sqlx"
  25. //nolint:errcheck // driver for postgres DB
  26. _ "github.com/lib/pq"
  27. "github.com/russross/meddler"
  28. )
  29. // TODO(Edu): Check DB consistency while there's concurrent use from Coordinator/TxSelector & API
  30. // L2DB stores L2 txs and authorization registers received by the coordinator and keeps them until they are no longer relevant
  31. // due to them being forged or invalid after a safety period
  32. type L2DB struct {
  33. dbRead *sqlx.DB
  34. dbWrite *sqlx.DB
  35. safetyPeriod common.BatchNum
  36. ttl time.Duration
  37. maxTxs uint32 // limit of txs that are accepted in the pool
  38. minFeeUSD float64
  39. maxFeeUSD float64
  40. apiConnCon *db.APIConnectionController
  41. }
  42. // NewL2DB creates a L2DB.
  43. // To create it, it's needed db connection, safety period expressed in batches,
  44. // maxTxs that the DB should have and TTL (time to live) for pending txs.
  45. func NewL2DB(
  46. dbRead, dbWrite *sqlx.DB,
  47. safetyPeriod common.BatchNum,
  48. maxTxs uint32,
  49. minFeeUSD float64,
  50. maxFeeUSD float64,
  51. TTL time.Duration,
  52. apiConnCon *db.APIConnectionController,
  53. ) *L2DB {
  54. return &L2DB{
  55. dbRead: dbRead,
  56. dbWrite: dbWrite,
  57. safetyPeriod: safetyPeriod,
  58. ttl: TTL,
  59. maxTxs: maxTxs,
  60. minFeeUSD: minFeeUSD,
  61. maxFeeUSD: maxFeeUSD,
  62. apiConnCon: apiConnCon,
  63. }
  64. }
  65. // DB returns a pointer to the L2DB.db. This method should be used only for
  66. // internal testing purposes.
  67. func (l2db *L2DB) DB() *sqlx.DB {
  68. return l2db.dbWrite
  69. }
  70. // MinFeeUSD returns the minimum fee in USD that is required to accept txs into
  71. // the pool
  72. func (l2db *L2DB) MinFeeUSD() float64 {
  73. return l2db.minFeeUSD
  74. }
  75. // AddAccountCreationAuth inserts an account creation authorization into the DB
  76. func (l2db *L2DB) AddAccountCreationAuth(auth *common.AccountCreationAuth) error {
  77. _, err := l2db.dbWrite.Exec(
  78. `INSERT INTO account_creation_auth (eth_addr, bjj, signature)
  79. VALUES ($1, $2, $3);`,
  80. auth.EthAddr, auth.BJJ, auth.Signature,
  81. )
  82. return tracerr.Wrap(err)
  83. }
  84. // AddManyAccountCreationAuth inserts a batch of accounts creation authorization
  85. // if not exist into the DB
  86. func (l2db *L2DB) AddManyAccountCreationAuth(auths []common.AccountCreationAuth) error {
  87. _, err := sqlx.NamedExec(l2db.dbWrite,
  88. `INSERT INTO account_creation_auth (eth_addr, bjj, signature)
  89. VALUES (:ethaddr, :bjj, :signature)
  90. ON CONFLICT (eth_addr) DO NOTHING`, auths)
  91. return tracerr.Wrap(err)
  92. }
  93. // GetAccountCreationAuth returns an account creation authorization from the DB
  94. func (l2db *L2DB) GetAccountCreationAuth(addr ethCommon.Address) (*common.AccountCreationAuth, error) {
  95. auth := new(common.AccountCreationAuth)
  96. return auth, tracerr.Wrap(meddler.QueryRow(
  97. l2db.dbRead, auth,
  98. "SELECT * FROM account_creation_auth WHERE eth_addr = $1;",
  99. addr,
  100. ))
  101. }
  102. // UpdateTxsInfo updates the parameter Info of the pool transactions
  103. func (l2db *L2DB) UpdateTxsInfo(txs []common.PoolL2Tx) error {
  104. if len(txs) == 0 {
  105. return nil
  106. }
  107. type txUpdate struct {
  108. ID common.TxID `db:"id"`
  109. Info string `db:"info"`
  110. }
  111. txUpdates := make([]txUpdate, len(txs))
  112. for i := range txs {
  113. txUpdates[i] = txUpdate{ID: txs[i].TxID, Info: txs[i].Info}
  114. }
  115. const query string = `
  116. UPDATE tx_pool SET
  117. info = tx_update.info
  118. FROM (VALUES
  119. (NULL::::BYTEA, NULL::::VARCHAR),
  120. (:id, :info)
  121. ) as tx_update (id, info)
  122. WHERE tx_pool.tx_id = tx_update.id;
  123. `
  124. if len(txUpdates) > 0 {
  125. if _, err := sqlx.NamedExec(l2db.dbWrite, query, txUpdates); err != nil {
  126. return tracerr.Wrap(err)
  127. }
  128. }
  129. return nil
  130. }
  131. // NewPoolL2TxWriteFromPoolL2Tx creates a new PoolL2TxWrite from a PoolL2Tx
  132. func NewPoolL2TxWriteFromPoolL2Tx(tx *common.PoolL2Tx) *PoolL2TxWrite {
  133. // transform tx from *common.PoolL2Tx to PoolL2TxWrite
  134. insertTx := &PoolL2TxWrite{
  135. TxID: tx.TxID,
  136. FromIdx: tx.FromIdx,
  137. TokenID: tx.TokenID,
  138. Amount: tx.Amount,
  139. Fee: tx.Fee,
  140. Nonce: tx.Nonce,
  141. State: common.PoolL2TxStatePending,
  142. Signature: tx.Signature,
  143. RqAmount: tx.RqAmount,
  144. Type: tx.Type,
  145. }
  146. if tx.ToIdx != 0 {
  147. insertTx.ToIdx = &tx.ToIdx
  148. }
  149. nilAddr := ethCommon.BigToAddress(big.NewInt(0))
  150. if tx.ToEthAddr != nilAddr {
  151. insertTx.ToEthAddr = &tx.ToEthAddr
  152. }
  153. if tx.RqFromIdx != 0 {
  154. insertTx.RqFromIdx = &tx.RqFromIdx
  155. }
  156. if tx.RqToIdx != 0 { // if true, all Rq... fields must be different to nil
  157. insertTx.RqToIdx = &tx.RqToIdx
  158. insertTx.RqTokenID = &tx.RqTokenID
  159. insertTx.RqFee = &tx.RqFee
  160. insertTx.RqNonce = &tx.RqNonce
  161. }
  162. if tx.RqToEthAddr != nilAddr {
  163. insertTx.RqToEthAddr = &tx.RqToEthAddr
  164. }
  165. if tx.ToBJJ != common.EmptyBJJComp {
  166. insertTx.ToBJJ = &tx.ToBJJ
  167. }
  168. if tx.RqToBJJ != common.EmptyBJJComp {
  169. insertTx.RqToBJJ = &tx.RqToBJJ
  170. }
  171. f := new(big.Float).SetInt(tx.Amount)
  172. amountF, _ := f.Float64()
  173. insertTx.AmountFloat = amountF
  174. return insertTx
  175. }
  176. // AddTxTest inserts a tx into the L2DB. This is useful for test purposes,
  177. // but in production txs will only be inserted through the API
  178. func (l2db *L2DB) AddTxTest(tx *common.PoolL2Tx) error {
  179. insertTx := NewPoolL2TxWriteFromPoolL2Tx(tx)
  180. // insert tx
  181. return tracerr.Wrap(meddler.Insert(l2db.dbWrite, "tx_pool", insertTx))
  182. }
  // selectPoolTxCommon is the shared SELECT ... FROM prefix of the queries that
  // load a common.PoolL2Tx. It joins tx_pool with token in order to compute the
  // fee_usd column and to read token.usd_update. The string deliberately ends
  // with a space so that callers can append their own WHERE clause directly.
  const selectPoolTxCommon = `SELECT tx_pool.tx_id, from_idx, to_idx, tx_pool.to_eth_addr,
  tx_pool.to_bjj, tx_pool.token_id, tx_pool.amount, tx_pool.fee, tx_pool.nonce,
  tx_pool.state, tx_pool.info, tx_pool.signature, tx_pool.timestamp, rq_from_idx,
  rq_to_idx, tx_pool.rq_to_eth_addr, tx_pool.rq_to_bjj, tx_pool.rq_token_id, tx_pool.rq_amount,
  tx_pool.rq_fee, tx_pool.rq_nonce, tx_pool.tx_type,
  (fee_percentage(tx_pool.fee::NUMERIC) * token.usd * tx_pool.amount_f) /
  (10.0 ^ token.decimals::NUMERIC) AS fee_usd, token.usd_update
  FROM tx_pool INNER JOIN token ON tx_pool.token_id = token.token_id `
  192. // GetTx return the specified Tx in common.PoolL2Tx format
  193. func (l2db *L2DB) GetTx(txID common.TxID) (*common.PoolL2Tx, error) {
  194. tx := new(common.PoolL2Tx)
  195. return tx, tracerr.Wrap(meddler.QueryRow(
  196. l2db.dbRead, tx,
  197. selectPoolTxCommon+"WHERE tx_id = $1;",
  198. txID,
  199. ))
  200. }
  201. // GetPendingTxs return all the pending txs of the L2DB, that have a non NULL AbsoluteFee
  202. func (l2db *L2DB) GetPendingTxs() ([]common.PoolL2Tx, error) {
  203. var txs []*common.PoolL2Tx
  204. err := meddler.QueryAll(
  205. l2db.dbRead, &txs,
  206. selectPoolTxCommon+"WHERE state = $1 AND NOT external_delete;",
  207. common.PoolL2TxStatePending,
  208. )
  209. return db.SlicePtrsToSlice(txs).([]common.PoolL2Tx), tracerr.Wrap(err)
  210. }
  211. // StartForging updates the state of the transactions that will begin the forging process.
  212. // The state of the txs referenced by txIDs will be changed from Pending -> Forging
  213. func (l2db *L2DB) StartForging(txIDs []common.TxID, batchNum common.BatchNum) error {
  214. if len(txIDs) == 0 {
  215. return nil
  216. }
  217. query, args, err := sqlx.In(
  218. `UPDATE tx_pool
  219. SET state = ?, batch_num = ?
  220. WHERE state = ? AND tx_id IN (?);`,
  221. common.PoolL2TxStateForging,
  222. batchNum,
  223. common.PoolL2TxStatePending,
  224. txIDs,
  225. )
  226. if err != nil {
  227. return tracerr.Wrap(err)
  228. }
  229. query = l2db.dbWrite.Rebind(query)
  230. _, err = l2db.dbWrite.Exec(query, args...)
  231. return tracerr.Wrap(err)
  232. }
  233. // DoneForging updates the state of the transactions that have been forged
  234. // so the state of the txs referenced by txIDs will be changed from Forging -> Forged
  235. func (l2db *L2DB) DoneForging(txIDs []common.TxID, batchNum common.BatchNum) error {
  236. if len(txIDs) == 0 {
  237. return nil
  238. }
  239. query, args, err := sqlx.In(
  240. `UPDATE tx_pool
  241. SET state = ?, batch_num = ?
  242. WHERE state = ? AND tx_id IN (?);`,
  243. common.PoolL2TxStateForged,
  244. batchNum,
  245. common.PoolL2TxStateForging,
  246. txIDs,
  247. )
  248. if err != nil {
  249. return tracerr.Wrap(err)
  250. }
  251. query = l2db.dbWrite.Rebind(query)
  252. _, err = l2db.dbWrite.Exec(query, args...)
  253. return tracerr.Wrap(err)
  254. }
  255. // InvalidateTxs updates the state of the transactions that are invalid.
  256. // The state of the txs referenced by txIDs will be changed from * -> Invalid
  257. func (l2db *L2DB) InvalidateTxs(txIDs []common.TxID, batchNum common.BatchNum) error {
  258. if len(txIDs) == 0 {
  259. return nil
  260. }
  261. query, args, err := sqlx.In(
  262. `UPDATE tx_pool
  263. SET state = ?, batch_num = ?
  264. WHERE tx_id IN (?);`,
  265. common.PoolL2TxStateInvalid,
  266. batchNum,
  267. txIDs,
  268. )
  269. if err != nil {
  270. return tracerr.Wrap(err)
  271. }
  272. query = l2db.dbWrite.Rebind(query)
  273. _, err = l2db.dbWrite.Exec(query, args...)
  274. return tracerr.Wrap(err)
  275. }
  276. // GetPendingUniqueFromIdxs returns from all the pending transactions, the set
  277. // of unique FromIdx
  278. func (l2db *L2DB) GetPendingUniqueFromIdxs() ([]common.Idx, error) {
  279. var idxs []common.Idx
  280. rows, err := l2db.dbRead.Query(`SELECT DISTINCT from_idx FROM tx_pool
  281. WHERE state = $1;`, common.PoolL2TxStatePending)
  282. if err != nil {
  283. return nil, tracerr.Wrap(err)
  284. }
  285. defer db.RowsClose(rows)
  286. var idx common.Idx
  287. for rows.Next() {
  288. err = rows.Scan(&idx)
  289. if err != nil {
  290. return nil, tracerr.Wrap(err)
  291. }
  292. idxs = append(idxs, idx)
  293. }
  294. return idxs, nil
  295. }
  296. var invalidateOldNoncesQuery = fmt.Sprintf(`
  297. UPDATE tx_pool SET
  298. state = '%s',
  299. batch_num = %%d
  300. FROM (VALUES
  301. (NULL::::BIGINT, NULL::::BIGINT),
  302. (:idx, :nonce)
  303. ) as updated_acc (idx, nonce)
  304. WHERE tx_pool.state = '%s' AND
  305. tx_pool.from_idx = updated_acc.idx AND
  306. tx_pool.nonce < updated_acc.nonce;
  307. `, common.PoolL2TxStateInvalid, common.PoolL2TxStatePending)
  308. // InvalidateOldNonces invalidate txs with nonces that are smaller or equal than their
  309. // respective accounts nonces. The state of the affected txs will be changed
  310. // from Pending to Invalid
  311. func (l2db *L2DB) InvalidateOldNonces(updatedAccounts []common.IdxNonce, batchNum common.BatchNum) (err error) {
  312. if len(updatedAccounts) == 0 {
  313. return nil
  314. }
  315. // Fill the batch_num in the query with Sprintf because we are using a
  316. // named query which works with slices, and doesn't handle an extra
  317. // individual argument.
  318. query := fmt.Sprintf(invalidateOldNoncesQuery, batchNum)
  319. if _, err := sqlx.NamedExec(l2db.dbWrite, query, updatedAccounts); err != nil {
  320. return tracerr.Wrap(err)
  321. }
  322. return nil
  323. }
  324. // Reorg updates the state of txs that were updated in a batch that has been discarted due to a blockchain reorg.
  325. // The state of the affected txs can change form Forged -> Pending or from Invalid -> Pending
  326. func (l2db *L2DB) Reorg(lastValidBatch common.BatchNum) error {
  327. _, err := l2db.dbWrite.Exec(
  328. `UPDATE tx_pool SET batch_num = NULL, state = $1
  329. WHERE (state = $2 OR state = $3 OR state = $4) AND batch_num > $5`,
  330. common.PoolL2TxStatePending,
  331. common.PoolL2TxStateForging,
  332. common.PoolL2TxStateForged,
  333. common.PoolL2TxStateInvalid,
  334. lastValidBatch,
  335. )
  336. return tracerr.Wrap(err)
  337. }
  338. // Purge deletes transactions that have been forged or marked as invalid for longer than the safety period
  339. // it also deletes pending txs that have been in the L2DB for longer than the ttl if maxTxs has been exceeded
  340. func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) (err error) {
  341. now := time.Now().UTC().Unix()
  342. _, err = l2db.dbWrite.Exec(
  343. `DELETE FROM tx_pool WHERE (
  344. batch_num < $1 AND (state = $2 OR state = $3)
  345. ) OR (
  346. (SELECT count(*) FROM tx_pool WHERE state = $4) > $5
  347. AND timestamp < $6 AND state = $4
  348. );`,
  349. currentBatchNum-l2db.safetyPeriod,
  350. common.PoolL2TxStateForged,
  351. common.PoolL2TxStateInvalid,
  352. common.PoolL2TxStatePending,
  353. l2db.maxTxs,
  354. time.Unix(now-int64(l2db.ttl.Seconds()), 0),
  355. )
  356. return tracerr.Wrap(err)
  357. }
  358. // PurgeByExternalDelete deletes all pending transactions marked with true in
  359. // the `external_delete` column. An external process can set this column to
  360. // true to instruct the coordinator to delete the tx when possible.
  361. func (l2db *L2DB) PurgeByExternalDelete() error {
  362. _, err := l2db.dbWrite.Exec(
  363. `DELETE from tx_pool WHERE (external_delete = true AND state = $1);`,
  364. common.PoolL2TxStatePending,
  365. )
  366. return tracerr.Wrap(err)
  367. }