You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

293 lines
8.4 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package l2db
  2. import (
  3. "fmt"
  4. "time"
  5. ethCommon "github.com/ethereum/go-ethereum/common"
  6. "github.com/gobuffalo/packr/v2"
  7. "github.com/hermeznetwork/hermez-node/common"
  8. "github.com/hermeznetwork/hermez-node/db"
  9. "github.com/hermeznetwork/hermez-node/log"
  10. "github.com/jmoiron/sqlx"
  11. //nolint:errcheck // driver for postgres DB
  12. _ "github.com/lib/pq"
  13. migrate "github.com/rubenv/sql-migrate"
  14. "github.com/russross/meddler"
  15. )
  16. // TODO(Edu): Check DB consistency while there's concurrent use from Coordinator/TxSelector & API
  17. // L2DB stores L2 txs and authorization registers received by the coordinator and keeps them until they are no longer relevant
  18. // due to them being forged or invalid after a safety period
  19. type L2DB struct {
  20. db *sqlx.DB
  21. safetyPeriod common.BatchNum
  22. ttl time.Duration
  23. maxTxs uint32
  24. }
  25. // NewL2DB creates a L2DB.
  26. // To create it, it's needed postgres configuration, safety period expressed in batches,
  27. // maxTxs that the DB should have and TTL (time to live) for pending txs.
  28. func NewL2DB(
  29. port int, host, user, password, dbname string,
  30. safetyPeriod common.BatchNum,
  31. maxTxs uint32,
  32. TTL time.Duration,
  33. ) (*L2DB, error) {
  34. // init meddler
  35. db.InitMeddler()
  36. meddler.Default = meddler.PostgreSQL
  37. // Stablish DB connection
  38. psqlconn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", host, port, user, password, dbname)
  39. db, err := sqlx.Connect("postgres", psqlconn)
  40. if err != nil {
  41. return nil, err
  42. }
  43. // Run DB migrations
  44. migrations := &migrate.PackrMigrationSource{
  45. Box: packr.New("l2db-migrations", "./migrations"),
  46. }
  47. nMigrations, err := migrate.Exec(db.DB, "postgres", migrations, migrate.Up)
  48. if err != nil {
  49. return nil, err
  50. }
  51. log.Debug("L2DB applied ", nMigrations, " migrations for ", dbname, " database")
  52. return &L2DB{
  53. db: db,
  54. safetyPeriod: safetyPeriod,
  55. ttl: TTL,
  56. maxTxs: maxTxs,
  57. }, nil
  58. }
  59. // DB returns a pointer to the L2DB.db. This method should be used only for
  60. // internal testing purposes.
  61. func (l2db *L2DB) DB() *sqlx.DB {
  62. return l2db.db
  63. }
  64. // AddAccountCreationAuth inserts an account creation authorization into the DB
  65. func (l2db *L2DB) AddAccountCreationAuth(auth *common.AccountCreationAuth) error {
  66. return meddler.Insert(l2db.db, "account_creation_auth", auth)
  67. }
  68. // GetAccountCreationAuth returns an account creation authorization into the DB
  69. func (l2db *L2DB) GetAccountCreationAuth(addr ethCommon.Address) (*common.AccountCreationAuth, error) {
  70. auth := new(common.AccountCreationAuth)
  71. return auth, meddler.QueryRow(
  72. l2db.db, auth,
  73. "SELECT * FROM account_creation_auth WHERE eth_addr = $1;",
  74. addr,
  75. )
  76. }
  77. // AddTx inserts a tx into the L2DB
  78. func (l2db *L2DB) AddTx(tx *common.PoolL2Tx) error {
  79. return meddler.Insert(l2db.db, "tx_pool", tx)
  80. }
  81. // GetTx return the specified Tx
  82. func (l2db *L2DB) GetTx(txID common.TxID) (*common.PoolL2Tx, error) {
  83. tx := new(common.PoolL2Tx)
  84. return tx, meddler.QueryRow(
  85. l2db.db, tx,
  86. "SELECT * FROM tx_pool WHERE tx_id = $1;",
  87. txID,
  88. )
  89. }
  90. // GetPendingTxs return all the pending txs of the L2DB
  91. func (l2db *L2DB) GetPendingTxs() ([]*common.PoolL2Tx, error) {
  92. var txs []*common.PoolL2Tx
  93. err := meddler.QueryAll(
  94. l2db.db, &txs,
  95. "SELECT * FROM tx_pool WHERE state = $1",
  96. common.PoolL2TxStatePending,
  97. )
  98. return txs, err
  99. }
  100. // StartForging updates the state of the transactions that will begin the forging process.
  101. // The state of the txs referenced by txIDs will be changed from Pending -> Forging
  102. func (l2db *L2DB) StartForging(txIDs []common.TxID, batchNum common.BatchNum) error {
  103. query, args, err := sqlx.In(
  104. `UPDATE tx_pool
  105. SET state = ?, batch_num = ?
  106. WHERE state = ? AND tx_id IN (?);`,
  107. common.PoolL2TxStateForging,
  108. batchNum,
  109. common.PoolL2TxStatePending,
  110. txIDs,
  111. )
  112. if err != nil {
  113. return err
  114. }
  115. query = l2db.db.Rebind(query)
  116. _, err = l2db.db.Exec(query, args...)
  117. return err
  118. }
  119. // DoneForging updates the state of the transactions that have been forged
  120. // so the state of the txs referenced by txIDs will be changed from Forging -> Forged
  121. func (l2db *L2DB) DoneForging(txIDs []common.TxID, batchNum common.BatchNum) error {
  122. query, args, err := sqlx.In(
  123. `UPDATE tx_pool
  124. SET state = ?, batch_num = ?
  125. WHERE state = ? AND tx_id IN (?);`,
  126. common.PoolL2TxStateForged,
  127. batchNum,
  128. common.PoolL2TxStateForging,
  129. txIDs,
  130. )
  131. if err != nil {
  132. return err
  133. }
  134. query = l2db.db.Rebind(query)
  135. _, err = l2db.db.Exec(query, args...)
  136. return err
  137. }
  138. // InvalidateTxs updates the state of the transactions that are invalid.
  139. // The state of the txs referenced by txIDs will be changed from * -> Invalid
  140. func (l2db *L2DB) InvalidateTxs(txIDs []common.TxID, batchNum common.BatchNum) error {
  141. query, args, err := sqlx.In(
  142. `UPDATE tx_pool
  143. SET state = ?, batch_num = ?
  144. WHERE tx_id IN (?);`,
  145. common.PoolL2TxStateInvalid,
  146. batchNum,
  147. txIDs,
  148. )
  149. if err != nil {
  150. return err
  151. }
  152. query = l2db.db.Rebind(query)
  153. _, err = l2db.db.Exec(query, args...)
  154. return err
  155. }
  156. // CheckNonces invalidate txs with nonces that are smaller or equal than their respective accounts nonces.
  157. // The state of the affected txs will be changed from Pending -> Invalid
  158. func (l2db *L2DB) CheckNonces(updatedAccounts []common.Account, batchNum common.BatchNum) error {
  159. txn, err := l2db.db.Begin()
  160. if err != nil {
  161. return err
  162. }
  163. defer func() {
  164. // Rollback the transaction if there was an error.
  165. if err != nil {
  166. err = txn.Rollback()
  167. }
  168. }()
  169. for i := 0; i < len(updatedAccounts); i++ {
  170. _, err = txn.Exec(
  171. `UPDATE tx_pool
  172. SET state = $1, batch_num = $2
  173. WHERE state = $3 AND from_idx = $4 AND nonce <= $5;`,
  174. common.PoolL2TxStateInvalid,
  175. batchNum,
  176. common.PoolL2TxStatePending,
  177. updatedAccounts[i].Idx,
  178. updatedAccounts[i].Nonce,
  179. )
  180. if err != nil {
  181. return err
  182. }
  183. }
  184. return txn.Commit()
  185. }
  186. // UpdateTxValue updates the absolute fee and value of txs given a token list that include their price in USD
  187. func (l2db *L2DB) UpdateTxValue(tokens []common.Token) error {
  188. // WARNING: this is very slow and should be optimized
  189. txn, err := l2db.db.Begin()
  190. if err != nil {
  191. return err
  192. }
  193. defer func() {
  194. // Rollback the transaction if there was an error.
  195. if err != nil {
  196. err = txn.Rollback()
  197. }
  198. }()
  199. now := time.Now()
  200. for i := 0; i < len(tokens); i++ {
  201. _, err = txn.Exec(
  202. `UPDATE tx_pool
  203. SET usd_update = $1, value_usd = amount_f * $2, fee_usd = $2 * amount_f * CASE
  204. WHEN fee = 0 THEN 0
  205. WHEN fee >= 1 AND fee <= 32 THEN POWER(10,-24+(fee::float/2))
  206. WHEN fee >= 33 AND fee <= 223 THEN POWER(10,-8+(0.041666666666667*(fee::float-32)))
  207. WHEN fee >= 224 AND fee <= 255 THEN POWER(10,fee-224) END
  208. WHERE token_id = $3;`,
  209. now,
  210. tokens[i].USD,
  211. tokens[i].TokenID,
  212. )
  213. if err != nil {
  214. return err
  215. }
  216. }
  217. return txn.Commit()
  218. }
  219. // Reorg updates the state of txs that were updated in a batch that has been discarted due to a blockchain reorg.
  220. // The state of the affected txs can change form Forged -> Pending or from Invalid -> Pending
  221. func (l2db *L2DB) Reorg(lastValidBatch common.BatchNum) error {
  222. _, err := l2db.db.Exec(
  223. `UPDATE tx_pool SET batch_num = NULL, state = $1
  224. WHERE (state = $2 OR state = $3) AND batch_num > $4`,
  225. common.PoolL2TxStatePending,
  226. common.PoolL2TxStateForged,
  227. common.PoolL2TxStateInvalid,
  228. lastValidBatch,
  229. )
  230. return err
  231. }
  232. // Purge deletes transactions that have been forged or marked as invalid for longer than the safety period
  233. // it also deletes txs that has been in the L2DB for longer than the ttl if maxTxs has been exceeded
  234. func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) error {
  235. txn, err := l2db.db.Begin()
  236. if err != nil {
  237. return err
  238. }
  239. defer func() {
  240. // Rollback the transaction if there was an error.
  241. if err != nil {
  242. err = txn.Rollback()
  243. }
  244. }()
  245. // Delete pending txs that have been in the pool after the TTL if maxTxs is reached
  246. now := time.Now().UTC().Unix()
  247. _, err = txn.Exec(
  248. `DELETE FROM tx_pool WHERE (SELECT count(*) FROM tx_pool) > $1 AND timestamp < $2`,
  249. l2db.maxTxs,
  250. time.Unix(now-int64(l2db.ttl.Seconds()), 0),
  251. )
  252. if err != nil {
  253. return err
  254. }
  255. // Delete txs that have been marked as forged / invalid after the safety period
  256. _, err = txn.Exec(
  257. `DELETE FROM tx_pool
  258. WHERE batch_num < $1 AND (state = $2 OR state = $3)`,
  259. currentBatchNum-l2db.safetyPeriod,
  260. common.PoolL2TxStateForged,
  261. common.PoolL2TxStateInvalid,
  262. )
  263. if err != nil {
  264. return err
  265. }
  266. return txn.Commit()
  267. }
  268. // Close frees the resources used by the L2DB
  269. func (l2db *L2DB) Close() error {
  270. return l2db.db.Close()
  271. }