You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

290 lines
8.2 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package l2db
  2. import (
  3. "fmt"
  4. "time"
  5. ethCommon "github.com/ethereum/go-ethereum/common"
  6. "github.com/gobuffalo/packr/v2"
  7. "github.com/hermeznetwork/hermez-node/common"
  8. "github.com/hermeznetwork/hermez-node/db"
  9. "github.com/jmoiron/sqlx"
  10. //nolint:errcheck // driver for postgres DB
  11. _ "github.com/lib/pq"
  12. migrate "github.com/rubenv/sql-migrate"
  13. "github.com/russross/meddler"
  14. )
  15. // TODO(Edu): Check DB consistency while there's concurrent use from Coordinator/TxSelector & API
  16. // L2DB stores L2 txs and authorization registers received by the coordinator and keeps them until they are no longer relevant
  17. // due to them being forged or invalid after a safety period
  18. type L2DB struct {
  19. db *sqlx.DB
  20. safetyPeriod common.BatchNum
  21. ttl time.Duration
  22. maxTxs uint32
  23. }
  24. // NewL2DB creates a L2DB.
  25. // To create it, it's needed postgres configuration, safety period expressed in batches,
  26. // maxTxs that the DB should have and TTL (time to live) for pending txs.
  27. func NewL2DB(
  28. port int, host, user, password, dbname string,
  29. safetyPeriod common.BatchNum,
  30. maxTxs uint32,
  31. TTL time.Duration,
  32. ) (*L2DB, error) {
  33. // init meddler
  34. db.InitMeddler()
  35. meddler.Default = meddler.PostgreSQL
  36. // Stablish DB connection
  37. psqlconn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", host, port, user, password, dbname)
  38. db, err := sqlx.Connect("postgres", psqlconn)
  39. if err != nil {
  40. return nil, err
  41. }
  42. // Run DB migrations
  43. migrations := &migrate.PackrMigrationSource{
  44. Box: packr.New("history-migrations", "./migrations"),
  45. }
  46. if _, err := migrate.Exec(db.DB, "postgres", migrations, migrate.Up); err != nil {
  47. return nil, err
  48. }
  49. return &L2DB{
  50. db: db,
  51. safetyPeriod: safetyPeriod,
  52. ttl: TTL,
  53. maxTxs: maxTxs,
  54. }, nil
  55. }
  56. // DB returns a pointer to the L2DB.db. This method should be used only for
  57. // internal testing purposes.
  58. func (l2db *L2DB) DB() *sqlx.DB {
  59. return l2db.db
  60. }
  61. // AddAccountCreationAuth inserts an account creation authorization into the DB
  62. func (l2db *L2DB) AddAccountCreationAuth(auth *common.AccountCreationAuth) error {
  63. return meddler.Insert(l2db.db, "account_creation_auth", auth)
  64. }
  65. // GetAccountCreationAuth returns an account creation authorization into the DB
  66. func (l2db *L2DB) GetAccountCreationAuth(addr ethCommon.Address) (*common.AccountCreationAuth, error) {
  67. auth := new(common.AccountCreationAuth)
  68. return auth, meddler.QueryRow(
  69. l2db.db, auth,
  70. "SELECT * FROM account_creation_auth WHERE eth_addr = $1;",
  71. addr,
  72. )
  73. }
  74. // AddTx inserts a tx into the L2DB
  75. func (l2db *L2DB) AddTx(tx *common.PoolL2Tx) error {
  76. return meddler.Insert(l2db.db, "tx_pool", tx)
  77. }
  78. // GetTx return the specified Tx
  79. func (l2db *L2DB) GetTx(txID common.TxID) (*common.PoolL2Tx, error) {
  80. tx := new(common.PoolL2Tx)
  81. return tx, meddler.QueryRow(
  82. l2db.db, tx,
  83. "SELECT * FROM tx_pool WHERE tx_id = $1;",
  84. txID,
  85. )
  86. }
  87. // GetPendingTxs return all the pending txs of the L2DB
  88. func (l2db *L2DB) GetPendingTxs() ([]*common.PoolL2Tx, error) {
  89. var txs []*common.PoolL2Tx
  90. err := meddler.QueryAll(
  91. l2db.db, &txs,
  92. "SELECT * FROM tx_pool WHERE state = $1",
  93. common.PoolL2TxStatePending,
  94. )
  95. return txs, err
  96. }
  97. // StartForging updates the state of the transactions that will begin the forging process.
  98. // The state of the txs referenced by txIDs will be changed from Pending -> Forging
  99. func (l2db *L2DB) StartForging(txIDs []common.TxID, batchNum common.BatchNum) error {
  100. query, args, err := sqlx.In(
  101. `UPDATE tx_pool
  102. SET state = ?, batch_num = ?
  103. WHERE state = ? AND tx_id IN (?);`,
  104. common.PoolL2TxStateForging,
  105. batchNum,
  106. common.PoolL2TxStatePending,
  107. txIDs,
  108. )
  109. if err != nil {
  110. return err
  111. }
  112. query = l2db.db.Rebind(query)
  113. _, err = l2db.db.Exec(query, args...)
  114. return err
  115. }
  116. // DoneForging updates the state of the transactions that have been forged
  117. // so the state of the txs referenced by txIDs will be changed from Forging -> Forged
  118. func (l2db *L2DB) DoneForging(txIDs []common.TxID, batchNum common.BatchNum) error {
  119. query, args, err := sqlx.In(
  120. `UPDATE tx_pool
  121. SET state = ?, batch_num = ?
  122. WHERE state = ? AND tx_id IN (?);`,
  123. common.PoolL2TxStateForged,
  124. batchNum,
  125. common.PoolL2TxStateForging,
  126. txIDs,
  127. )
  128. if err != nil {
  129. return err
  130. }
  131. query = l2db.db.Rebind(query)
  132. _, err = l2db.db.Exec(query, args...)
  133. return err
  134. }
  135. // InvalidateTxs updates the state of the transactions that are invalid.
  136. // The state of the txs referenced by txIDs will be changed from * -> Invalid
  137. func (l2db *L2DB) InvalidateTxs(txIDs []common.TxID, batchNum common.BatchNum) error {
  138. query, args, err := sqlx.In(
  139. `UPDATE tx_pool
  140. SET state = ?, batch_num = ?
  141. WHERE tx_id IN (?);`,
  142. common.PoolL2TxStateInvalid,
  143. batchNum,
  144. txIDs,
  145. )
  146. if err != nil {
  147. return err
  148. }
  149. query = l2db.db.Rebind(query)
  150. _, err = l2db.db.Exec(query, args...)
  151. return err
  152. }
  153. // CheckNonces invalidate txs with nonces that are smaller or equal than their respective accounts nonces.
  154. // The state of the affected txs will be changed from Pending -> Invalid
  155. func (l2db *L2DB) CheckNonces(updatedAccounts []common.Account, batchNum common.BatchNum) error {
  156. txn, err := l2db.db.Begin()
  157. if err != nil {
  158. return err
  159. }
  160. defer func() {
  161. // Rollback the transaction if there was an error.
  162. if err != nil {
  163. err = txn.Rollback()
  164. }
  165. }()
  166. for i := 0; i < len(updatedAccounts); i++ {
  167. _, err = txn.Exec(
  168. `UPDATE tx_pool
  169. SET state = $1, batch_num = $2
  170. WHERE state = $3 AND from_idx = $4 AND nonce <= $5;`,
  171. common.PoolL2TxStateInvalid,
  172. batchNum,
  173. common.PoolL2TxStatePending,
  174. updatedAccounts[i].Idx,
  175. updatedAccounts[i].Nonce,
  176. )
  177. if err != nil {
  178. return err
  179. }
  180. }
  181. return txn.Commit()
  182. }
  183. // UpdateTxValue updates the absolute fee and value of txs given a token list that include their price in USD
  184. func (l2db *L2DB) UpdateTxValue(tokens []common.Token) error {
  185. // WARNING: this is very slow and should be optimized
  186. txn, err := l2db.db.Begin()
  187. if err != nil {
  188. return err
  189. }
  190. defer func() {
  191. // Rollback the transaction if there was an error.
  192. if err != nil {
  193. err = txn.Rollback()
  194. }
  195. }()
  196. now := time.Now()
  197. for i := 0; i < len(tokens); i++ {
  198. _, err = txn.Exec(
  199. `UPDATE tx_pool
  200. SET usd_update = $1, value_usd = amount_f * $2, fee_usd = $2 * amount_f * CASE
  201. WHEN fee = 0 THEN 0
  202. WHEN fee >= 1 AND fee <= 32 THEN POWER(10,-24+(fee::float/2))
  203. WHEN fee >= 33 AND fee <= 223 THEN POWER(10,-8+(0.041666666666667*(fee::float-32)))
  204. WHEN fee >= 224 AND fee <= 255 THEN POWER(10,fee-224) END
  205. WHERE token_id = $3;`,
  206. now,
  207. tokens[i].USD,
  208. tokens[i].TokenID,
  209. )
  210. if err != nil {
  211. return err
  212. }
  213. }
  214. return txn.Commit()
  215. }
  216. // Reorg updates the state of txs that were updated in a batch that has been discarted due to a blockchain reorg.
  217. // The state of the affected txs can change form Forged -> Pending or from Invalid -> Pending
  218. func (l2db *L2DB) Reorg(lastValidBatch common.BatchNum) error {
  219. _, err := l2db.db.Exec(
  220. `UPDATE tx_pool SET batch_num = NULL, state = $1
  221. WHERE (state = $2 OR state = $3) AND batch_num > $4`,
  222. common.PoolL2TxStatePending,
  223. common.PoolL2TxStateForged,
  224. common.PoolL2TxStateInvalid,
  225. lastValidBatch,
  226. )
  227. return err
  228. }
  229. // Purge deletes transactions that have been forged or marked as invalid for longer than the safety period
  230. // it also deletes txs that has been in the L2DB for longer than the ttl if maxTxs has been exceeded
  231. func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) error {
  232. txn, err := l2db.db.Begin()
  233. if err != nil {
  234. return err
  235. }
  236. defer func() {
  237. // Rollback the transaction if there was an error.
  238. if err != nil {
  239. err = txn.Rollback()
  240. }
  241. }()
  242. // Delete pending txs that have been in the pool after the TTL if maxTxs is reached
  243. now := time.Now().UTC().Unix()
  244. _, err = txn.Exec(
  245. `DELETE FROM tx_pool WHERE (SELECT count(*) FROM tx_pool) > $1 AND timestamp < $2`,
  246. l2db.maxTxs,
  247. time.Unix(now-int64(l2db.ttl.Seconds()), 0),
  248. )
  249. if err != nil {
  250. return err
  251. }
  252. // Delete txs that have been marked as forged / invalid after the safety period
  253. _, err = txn.Exec(
  254. `DELETE FROM tx_pool
  255. WHERE batch_num < $1 AND (state = $2 OR state = $3)`,
  256. currentBatchNum-l2db.safetyPeriod,
  257. common.PoolL2TxStateForged,
  258. common.PoolL2TxStateInvalid,
  259. )
  260. if err != nil {
  261. return err
  262. }
  263. return txn.Commit()
  264. }
  265. // Close frees the resources used by the L2DB
  266. func (l2db *L2DB) Close() error {
  267. return l2db.db.Close()
  268. }