You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

288 lines
8.1 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package l2db
  2. import (
  3. "fmt"
  4. "time"
  5. ethCommon "github.com/ethereum/go-ethereum/common"
  6. "github.com/gobuffalo/packr/v2"
  7. "github.com/hermeznetwork/hermez-node/common"
  8. "github.com/hermeznetwork/hermez-node/db"
  9. "github.com/jmoiron/sqlx"
  10. //nolint:errcheck // driver for postgres DB
  11. _ "github.com/lib/pq"
  12. migrate "github.com/rubenv/sql-migrate"
  13. "github.com/russross/meddler"
  14. )
  15. // L2DB stores L2 txs and authorization registers received by the coordinator and keeps them until they are no longer relevant
  16. // due to them being forged or invalid after a safety period
  17. type L2DB struct {
  18. db *sqlx.DB
  19. safetyPeriod common.BatchNum
  20. ttl time.Duration
  21. maxTxs uint32
  22. }
  23. // NewL2DB creates a L2DB.
  24. // To create it, it's needed postgres configuration, safety period expressed in batches,
  25. // maxTxs that the DB should have and TTL (time to live) for pending txs.
  26. func NewL2DB(
  27. port int, host, user, password, dbname string,
  28. safetyPeriod common.BatchNum,
  29. maxTxs uint32,
  30. TTL time.Duration,
  31. ) (*L2DB, error) {
  32. // init meddler
  33. db.InitMeddler()
  34. meddler.Default = meddler.PostgreSQL
  35. // Stablish DB connection
  36. psqlconn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", host, port, user, password, dbname)
  37. db, err := sqlx.Connect("postgres", psqlconn)
  38. if err != nil {
  39. return nil, err
  40. }
  41. // Run DB migrations
  42. migrations := &migrate.PackrMigrationSource{
  43. Box: packr.New("history-migrations", "./migrations"),
  44. }
  45. if _, err := migrate.Exec(db.DB, "postgres", migrations, migrate.Up); err != nil {
  46. return nil, err
  47. }
  48. return &L2DB{
  49. db: db,
  50. safetyPeriod: safetyPeriod,
  51. ttl: TTL,
  52. maxTxs: maxTxs,
  53. }, nil
  54. }
  55. // DB returns a pointer to the L2DB.db. This method should be used only for
  56. // internal testing purposes.
  57. func (l2db *L2DB) DB() *sqlx.DB {
  58. return l2db.db
  59. }
  60. // AddAccountCreationAuth inserts an account creation authorization into the DB
  61. func (l2db *L2DB) AddAccountCreationAuth(auth *common.AccountCreationAuth) error {
  62. return meddler.Insert(l2db.db, "account_creation_auth", auth)
  63. }
  64. // GetAccountCreationAuth returns an account creation authorization into the DB
  65. func (l2db *L2DB) GetAccountCreationAuth(addr ethCommon.Address) (*common.AccountCreationAuth, error) {
  66. auth := new(common.AccountCreationAuth)
  67. return auth, meddler.QueryRow(
  68. l2db.db, auth,
  69. "SELECT * FROM account_creation_auth WHERE eth_addr = $1;",
  70. addr,
  71. )
  72. }
  73. // AddTx inserts a tx into the L2DB
  74. func (l2db *L2DB) AddTx(tx *common.PoolL2Tx) error {
  75. return meddler.Insert(l2db.db, "tx_pool", tx)
  76. }
  77. // GetTx return the specified Tx
  78. func (l2db *L2DB) GetTx(txID common.TxID) (*common.PoolL2Tx, error) {
  79. tx := new(common.PoolL2Tx)
  80. return tx, meddler.QueryRow(
  81. l2db.db, tx,
  82. "SELECT * FROM tx_pool WHERE tx_id = $1;",
  83. txID,
  84. )
  85. }
  86. // GetPendingTxs return all the pending txs of the L2DB
  87. func (l2db *L2DB) GetPendingTxs() ([]*common.PoolL2Tx, error) {
  88. var txs []*common.PoolL2Tx
  89. err := meddler.QueryAll(
  90. l2db.db, &txs,
  91. "SELECT * FROM tx_pool WHERE state = $1",
  92. common.PoolL2TxStatePending,
  93. )
  94. return txs, err
  95. }
  96. // StartForging updates the state of the transactions that will begin the forging process.
  97. // The state of the txs referenced by txIDs will be changed from Pending -> Forging
  98. func (l2db *L2DB) StartForging(txIDs []common.TxID, batchNum common.BatchNum) error {
  99. query, args, err := sqlx.In(
  100. `UPDATE tx_pool
  101. SET state = ?, batch_num = ?
  102. WHERE state = ? AND tx_id IN (?);`,
  103. common.PoolL2TxStateForging,
  104. batchNum,
  105. common.PoolL2TxStatePending,
  106. txIDs,
  107. )
  108. if err != nil {
  109. return err
  110. }
  111. query = l2db.db.Rebind(query)
  112. _, err = l2db.db.Exec(query, args...)
  113. return err
  114. }
  115. // DoneForging updates the state of the transactions that have been forged
  116. // so the state of the txs referenced by txIDs will be changed from Forging -> Forged
  117. func (l2db *L2DB) DoneForging(txIDs []common.TxID, batchNum common.BatchNum) error {
  118. query, args, err := sqlx.In(
  119. `UPDATE tx_pool
  120. SET state = ?, batch_num = ?
  121. WHERE state = ? AND tx_id IN (?);`,
  122. common.PoolL2TxStateForged,
  123. batchNum,
  124. common.PoolL2TxStateForging,
  125. txIDs,
  126. )
  127. if err != nil {
  128. return err
  129. }
  130. query = l2db.db.Rebind(query)
  131. _, err = l2db.db.Exec(query, args...)
  132. return err
  133. }
  134. // InvalidateTxs updates the state of the transactions that are invalid.
  135. // The state of the txs referenced by txIDs will be changed from * -> Invalid
  136. func (l2db *L2DB) InvalidateTxs(txIDs []common.TxID, batchNum common.BatchNum) error {
  137. query, args, err := sqlx.In(
  138. `UPDATE tx_pool
  139. SET state = ?, batch_num = ?
  140. WHERE tx_id IN (?);`,
  141. common.PoolL2TxStateInvalid,
  142. batchNum,
  143. txIDs,
  144. )
  145. if err != nil {
  146. return err
  147. }
  148. query = l2db.db.Rebind(query)
  149. _, err = l2db.db.Exec(query, args...)
  150. return err
  151. }
  152. // CheckNonces invalidate txs with nonces that are smaller or equal than their respective accounts nonces.
  153. // The state of the affected txs will be changed from Pending -> Invalid
  154. func (l2db *L2DB) CheckNonces(updatedAccounts []common.Account, batchNum common.BatchNum) error {
  155. txn, err := l2db.db.Begin()
  156. if err != nil {
  157. return err
  158. }
  159. defer func() {
  160. // Rollback the transaction if there was an error.
  161. if err != nil {
  162. err = txn.Rollback()
  163. }
  164. }()
  165. for i := 0; i < len(updatedAccounts); i++ {
  166. _, err = txn.Exec(
  167. `UPDATE tx_pool
  168. SET state = $1, batch_num = $2
  169. WHERE state = $3 AND from_idx = $4 AND nonce <= $5;`,
  170. common.PoolL2TxStateInvalid,
  171. batchNum,
  172. common.PoolL2TxStatePending,
  173. updatedAccounts[i].Idx,
  174. updatedAccounts[i].Nonce,
  175. )
  176. if err != nil {
  177. return err
  178. }
  179. }
  180. return txn.Commit()
  181. }
  182. // UpdateTxValue updates the absolute fee and value of txs given a token list that include their price in USD
  183. func (l2db *L2DB) UpdateTxValue(tokens []common.Token) error {
  184. // WARNING: this is very slow and should be optimized
  185. txn, err := l2db.db.Begin()
  186. if err != nil {
  187. return err
  188. }
  189. defer func() {
  190. // Rollback the transaction if there was an error.
  191. if err != nil {
  192. err = txn.Rollback()
  193. }
  194. }()
  195. now := time.Now()
  196. for i := 0; i < len(tokens); i++ {
  197. _, err = txn.Exec(
  198. `UPDATE tx_pool
  199. SET usd_update = $1, value_usd = amount_f * $2, fee_usd = $2 * amount_f * CASE
  200. WHEN fee = 0 THEN 0
  201. WHEN fee >= 1 AND fee <= 32 THEN POWER(10,-24+(fee::float/2))
  202. WHEN fee >= 33 AND fee <= 223 THEN POWER(10,-8+(0.041666666666667*(fee::float-32)))
  203. WHEN fee >= 224 AND fee <= 255 THEN POWER(10,fee-224) END
  204. WHERE token_id = $3;`,
  205. now,
  206. tokens[i].USD,
  207. tokens[i].TokenID,
  208. )
  209. if err != nil {
  210. return err
  211. }
  212. }
  213. return txn.Commit()
  214. }
  215. // Reorg updates the state of txs that were updated in a batch that has been discarted due to a blockchain reorg.
  216. // The state of the affected txs can change form Forged -> Pending or from Invalid -> Pending
  217. func (l2db *L2DB) Reorg(lastValidBatch common.BatchNum) error {
  218. _, err := l2db.db.Exec(
  219. `UPDATE tx_pool SET batch_num = NULL, state = $1
  220. WHERE (state = $2 OR state = $3) AND batch_num > $4`,
  221. common.PoolL2TxStatePending,
  222. common.PoolL2TxStateForged,
  223. common.PoolL2TxStateInvalid,
  224. lastValidBatch,
  225. )
  226. return err
  227. }
  228. // Purge deletes transactions that have been forged or marked as invalid for longer than the safety period
  229. // it also deletes txs that has been in the L2DB for longer than the ttl if maxTxs has been exceeded
  230. func (l2db *L2DB) Purge(currentBatchNum common.BatchNum) error {
  231. txn, err := l2db.db.Begin()
  232. if err != nil {
  233. return err
  234. }
  235. defer func() {
  236. // Rollback the transaction if there was an error.
  237. if err != nil {
  238. err = txn.Rollback()
  239. }
  240. }()
  241. // Delete pending txs that have been in the pool after the TTL if maxTxs is reached
  242. now := time.Now().UTC().Unix()
  243. _, err = txn.Exec(
  244. `DELETE FROM tx_pool WHERE (SELECT count(*) FROM tx_pool) > $1 AND timestamp < $2`,
  245. l2db.maxTxs,
  246. time.Unix(now-int64(l2db.ttl.Seconds()), 0),
  247. )
  248. if err != nil {
  249. return err
  250. }
  251. // Delete txs that have been marked as forged / invalid after the safety period
  252. _, err = txn.Exec(
  253. `DELETE FROM tx_pool
  254. WHERE batch_num < $1 AND (state = $2 OR state = $3)`,
  255. currentBatchNum-l2db.safetyPeriod,
  256. common.PoolL2TxStateForged,
  257. common.PoolL2TxStateInvalid,
  258. )
  259. if err != nil {
  260. return err
  261. }
  262. return txn.Commit()
  263. }
  264. // Close frees the resources used by the L2DB
  265. func (l2db *L2DB) Close() error {
  266. return l2db.db.Close()
  267. }