You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

444 lines
11 KiB

4 years ago
  1. package historydb
  2. import (
  3. "database/sql"
  4. "errors"
  5. "fmt"
  6. ethCommon "github.com/ethereum/go-ethereum/common"
  7. "github.com/gobuffalo/packr/v2"
  8. "github.com/hermeznetwork/hermez-node/common"
  9. "github.com/hermeznetwork/hermez-node/db"
  10. "github.com/hermeznetwork/hermez-node/log"
  11. "github.com/iden3/go-iden3-crypto/babyjub"
  12. "github.com/jmoiron/sqlx"
  13. //nolint:errcheck // driver for postgres DB
  14. _ "github.com/lib/pq"
  15. migrate "github.com/rubenv/sql-migrate"
  16. "github.com/russross/meddler"
  17. )
  18. // TODO(Edu): Document here how HistoryDB is kept consistent
  19. // HistoryDB persist the historic of the rollup
  20. type HistoryDB struct {
  21. db *sqlx.DB
  22. }
  23. // NewHistoryDB initialize the DB
  24. func NewHistoryDB(port int, host, user, password, dbname string) (*HistoryDB, error) {
  25. // Connect to DB
  26. psqlconn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", host, port, user, password, dbname)
  27. hdb, err := sqlx.Connect("postgres", psqlconn)
  28. if err != nil {
  29. return nil, err
  30. }
  31. // Init meddler
  32. db.InitMeddler()
  33. meddler.Default = meddler.PostgreSQL
  34. // Run DB migrations
  35. migrations := &migrate.PackrMigrationSource{
  36. Box: packr.New("history-migrations", "./migrations"),
  37. }
  38. nMigrations, err := migrate.Exec(hdb.DB, "postgres", migrations, migrate.Up)
  39. if err != nil {
  40. return nil, err
  41. }
  42. log.Debug("HistoryDB applied ", nMigrations, " migrations for ", dbname, " database")
  43. return &HistoryDB{hdb}, nil
  44. }
  45. // AddBlock insert a block into the DB
  46. func (hdb *HistoryDB) AddBlock(block *common.Block) error {
  47. return meddler.Insert(hdb.db, "block", block)
  48. }
  49. // AddBlocks inserts blocks into the DB
  50. func (hdb *HistoryDB) AddBlocks(blocks []common.Block) error {
  51. return db.BulkInsert(
  52. hdb.db,
  53. `INSERT INTO block (
  54. eth_block_num,
  55. timestamp,
  56. hash
  57. ) VALUES %s;`,
  58. blocks[:],
  59. )
  60. }
  61. // GetBlock retrieve a block from the DB, given a block number
  62. func (hdb *HistoryDB) GetBlock(blockNum int64) (*common.Block, error) {
  63. block := &common.Block{}
  64. err := meddler.QueryRow(
  65. hdb.db, block,
  66. "SELECT * FROM block WHERE eth_block_num = $1;", blockNum,
  67. )
  68. return block, err
  69. }
  70. // GetBlocks retrieve blocks from the DB, given a range of block numbers defined by from and to
  71. func (hdb *HistoryDB) GetBlocks(from, to int64) ([]*common.Block, error) {
  72. var blocks []*common.Block
  73. err := meddler.QueryAll(
  74. hdb.db, &blocks,
  75. "SELECT * FROM block WHERE $1 <= eth_block_num AND eth_block_num < $2",
  76. from, to,
  77. )
  78. return blocks, err
  79. }
  80. // GetLastBlock retrieve the block with the highest block number from the DB
  81. func (hdb *HistoryDB) GetLastBlock() (*common.Block, error) {
  82. block := &common.Block{}
  83. err := meddler.QueryRow(
  84. hdb.db, block, "SELECT * FROM block ORDER BY eth_block_num DESC LIMIT 1;",
  85. )
  86. return block, err
  87. }
  88. // AddBatches insert Bids into the DB
  89. func (hdb *HistoryDB) AddBatches(batches []common.Batch) error {
  90. return db.BulkInsert(
  91. hdb.db,
  92. `INSERT INTO batch (
  93. batch_num,
  94. eth_block_num,
  95. forger_addr,
  96. fees_collected,
  97. state_root,
  98. num_accounts,
  99. exit_root,
  100. forge_l1_txs_num,
  101. slot_num
  102. ) VALUES %s;`,
  103. batches[:],
  104. )
  105. }
  106. // GetBatches retrieve batches from the DB, given a range of batch numbers defined by from and to
  107. func (hdb *HistoryDB) GetBatches(from, to common.BatchNum) ([]*common.Batch, error) {
  108. var batches []*common.Batch
  109. err := meddler.QueryAll(
  110. hdb.db, &batches,
  111. "SELECT * FROM batch WHERE $1 <= batch_num AND batch_num < $2",
  112. from, to,
  113. )
  114. return batches, err
  115. }
  116. // GetLastBatchNum returns the BatchNum of the latest forged batch
  117. func (hdb *HistoryDB) GetLastBatchNum() (common.BatchNum, error) {
  118. row := hdb.db.QueryRow("SELECT batch_num FROM batch ORDER BY batch_num DESC LIMIT 1;")
  119. var batchNum common.BatchNum
  120. return batchNum, row.Scan(&batchNum)
  121. }
  122. // GetLastL1TxsNum returns the greatest ForgeL1TxsNum in the DB. If there's no
  123. // batch in the DB (nil, nil) is returned.
  124. func (hdb *HistoryDB) GetLastL1TxsNum() (*int64, error) {
  125. row := hdb.db.QueryRow("SELECT MAX(forge_l1_txs_num) FROM batch;")
  126. lastL1TxsNum := new(int64)
  127. return lastL1TxsNum, row.Scan(&lastL1TxsNum)
  128. }
  129. // Reorg deletes all the information that was added into the DB after the lastValidBlock
  130. func (hdb *HistoryDB) Reorg(lastValidBlock int64) error {
  131. _, err := hdb.db.Exec("DELETE FROM block WHERE eth_block_num > $1;", lastValidBlock)
  132. return err
  133. }
  134. // SyncRollup stores all the data that can be changed / added on a block in the Rollup SC
  135. func (hdb *HistoryDB) SyncRollup(
  136. blockNum uint64,
  137. l1txs []common.L1Tx,
  138. l2txs []common.L2Tx,
  139. registeredAccounts []common.Account,
  140. exitTree common.ExitInfo,
  141. withdrawals common.ExitInfo,
  142. registeredTokens []common.Token,
  143. batches []common.Batch,
  144. vars *common.RollupVars,
  145. ) error {
  146. // TODO: make all in a single DB commit
  147. if err := hdb.AddBatches(batches); err != nil {
  148. return err
  149. }
  150. return nil
  151. }
  152. // SyncPoD stores all the data that can be changed / added on a block in the PoD SC
  153. func (hdb *HistoryDB) SyncPoD(
  154. blockNum uint64,
  155. bids []common.Bid,
  156. coordinators []common.Coordinator,
  157. vars *common.AuctionVars,
  158. ) error {
  159. return nil
  160. }
  161. // addBids insert Bids into the DB
  162. func (hdb *HistoryDB) addBids(bids []common.Bid) error {
  163. // TODO: check the coordinator info
  164. return db.BulkInsert(
  165. hdb.db,
  166. "INSERT INTO bid (slot_num, forger_addr, bid_value, eth_block_num) VALUES %s",
  167. bids[:],
  168. )
  169. }
  170. // GetBids return the bids
  171. func (hdb *HistoryDB) GetBids() ([]*common.Bid, error) {
  172. var bids []*common.Bid
  173. err := meddler.QueryAll(
  174. hdb.db, &bids,
  175. "SELECT * FROM bid;",
  176. )
  177. return bids, err
  178. }
  179. // AddToken insert a token into the DB
  180. func (hdb *HistoryDB) AddToken(token *common.Token) error {
  181. return meddler.Insert(hdb.db, "token", token)
  182. }
  183. // AddTokens insert tokens into the DB
  184. func (hdb *HistoryDB) AddTokens(tokens []common.Token) error {
  185. return db.BulkInsert(
  186. hdb.db,
  187. `INSERT INTO token (
  188. token_id,
  189. eth_block_num,
  190. eth_addr,
  191. name,
  192. symbol,
  193. decimals,
  194. usd,
  195. usd_update
  196. ) VALUES %s;`,
  197. tokens[:],
  198. )
  199. }
  200. // UpdateTokenValue updates the USD value of a token
  201. func (hdb *HistoryDB) UpdateTokenValue(tokenID common.TokenID, value float64) error {
  202. _, err := hdb.db.Exec(
  203. "UPDATE token SET usd = $1 WHERE token_id = $2;",
  204. value, tokenID,
  205. )
  206. return err
  207. }
  208. // GetTokens returns a list of tokens from the DB
  209. func (hdb *HistoryDB) GetTokens() ([]*common.Token, error) {
  210. var tokens []*common.Token
  211. err := meddler.QueryAll(
  212. hdb.db, &tokens,
  213. "SELECT * FROM token ORDER BY token_id;",
  214. )
  215. return tokens, err
  216. }
  217. // AddAccounts insert accounts into the DB
  218. func (hdb *HistoryDB) AddAccounts(accounts []common.Account) error {
  219. return db.BulkInsert(
  220. hdb.db,
  221. `INSERT INTO account (
  222. idx,
  223. token_id,
  224. batch_num,
  225. bjj,
  226. eth_addr
  227. ) VALUES %s;`,
  228. accounts[:],
  229. )
  230. }
  231. // GetAccounts returns a list of accounts from the DB
  232. func (hdb *HistoryDB) GetAccounts() ([]*common.Account, error) {
  233. var accs []*common.Account
  234. err := meddler.QueryAll(
  235. hdb.db, &accs,
  236. "SELECT * FROM account ORDER BY idx;",
  237. )
  238. return accs, err
  239. }
  240. // AddL1Txs inserts L1 txs to the DB
  241. func (hdb *HistoryDB) AddL1Txs(l1txs []common.L1Tx) error {
  242. txs := []common.Tx{}
  243. for _, tx := range l1txs {
  244. txs = append(txs, *tx.Tx())
  245. }
  246. return hdb.AddTxs(txs)
  247. }
  248. // AddL2Txs inserts L2 txs to the DB
  249. func (hdb *HistoryDB) AddL2Txs(l2txs []common.L2Tx) error {
  250. txs := []common.Tx{}
  251. for _, tx := range l2txs {
  252. txs = append(txs, *tx.Tx())
  253. }
  254. return hdb.AddTxs(txs)
  255. }
  256. // AddTxs insert L1 txs into the DB
  257. func (hdb *HistoryDB) AddTxs(txs []common.Tx) error {
  258. return db.BulkInsert(
  259. hdb.db,
  260. `INSERT INTO tx (
  261. is_l1,
  262. id,
  263. type,
  264. position,
  265. from_idx,
  266. to_idx,
  267. amount,
  268. amount_f,
  269. token_id,
  270. amount_usd,
  271. batch_num,
  272. eth_block_num,
  273. to_forge_l1_txs_num,
  274. user_origin,
  275. from_eth_addr,
  276. from_bjj,
  277. load_amount,
  278. load_amount_f,
  279. load_amount_usd,
  280. fee,
  281. fee_usd,
  282. nonce
  283. ) VALUES %s;`,
  284. txs[:],
  285. )
  286. }
  287. // GetTxs returns a list of txs from the DB
  288. func (hdb *HistoryDB) GetTxs() ([]*common.Tx, error) {
  289. var txs []*common.Tx
  290. err := meddler.QueryAll(
  291. hdb.db, &txs,
  292. `SELECT * FROM tx
  293. ORDER BY (batch_num, position) ASC`,
  294. )
  295. return txs, err
  296. }
  297. // GetHistoryTxs returns a list of txs from the DB using the HistoryTx struct
  298. func (hdb *HistoryDB) GetHistoryTxs(
  299. ethAddr *ethCommon.Address, bjj *babyjub.PublicKey,
  300. tokenID, idx, batchNum *uint, txType *common.TxType,
  301. offset, limit *uint, last bool,
  302. ) ([]*HistoryTx, int, error) {
  303. if ethAddr != nil && bjj != nil {
  304. return nil, 0, errors.New("ethAddr and bjj are incompatible")
  305. }
  306. var query string
  307. var args []interface{}
  308. queryStr := `SELECT tx.*, tx.amount_f * token.usd AS current_usd,
  309. token.symbol, token.usd_update, block.timestamp, count(*) OVER() AS total_items FROM tx
  310. INNER JOIN token ON tx.token_id = token.token_id
  311. INNER JOIN block ON tx.eth_block_num = block.eth_block_num `
  312. // Apply filters
  313. nextIsAnd := false
  314. // ethAddr filter
  315. if ethAddr != nil {
  316. queryStr = `WITH acc AS
  317. (select idx from account where eth_addr = ?) ` + queryStr
  318. queryStr += ", acc WHERE (tx.from_idx IN(acc.idx) OR tx.to_idx IN(acc.idx)) "
  319. nextIsAnd = true
  320. args = append(args, ethAddr)
  321. } else if bjj != nil { // bjj filter
  322. queryStr = `WITH acc AS
  323. (select idx from account where bjj = ?) ` + queryStr
  324. queryStr += ", acc WHERE (tx.from_idx IN(acc.idx) OR tx.to_idx IN(acc.idx)) "
  325. nextIsAnd = true
  326. args = append(args, bjj)
  327. }
  328. // tokenID filter
  329. if tokenID != nil {
  330. if nextIsAnd {
  331. queryStr += "AND "
  332. } else {
  333. queryStr += "WHERE "
  334. }
  335. queryStr += "tx.token_id = ? "
  336. args = append(args, tokenID)
  337. nextIsAnd = true
  338. }
  339. // idx filter
  340. if idx != nil {
  341. if nextIsAnd {
  342. queryStr += "AND "
  343. } else {
  344. queryStr += "WHERE "
  345. }
  346. queryStr += "(tx.from_idx = ? OR tx.to_idx = ?) "
  347. args = append(args, idx, idx)
  348. nextIsAnd = true
  349. }
  350. // batchNum filter
  351. if batchNum != nil {
  352. if nextIsAnd {
  353. queryStr += "AND "
  354. } else {
  355. queryStr += "WHERE "
  356. }
  357. queryStr += "tx.batch_num = ? "
  358. args = append(args, batchNum)
  359. nextIsAnd = true
  360. }
  361. // txType filter
  362. if txType != nil {
  363. if nextIsAnd {
  364. queryStr += "AND "
  365. } else {
  366. queryStr += "WHERE "
  367. }
  368. queryStr += "tx.type = ? "
  369. args = append(args, txType)
  370. // nextIsAnd = true
  371. }
  372. // pagination
  373. if last {
  374. queryStr += "ORDER BY (batch_num, position) DESC NULLS FIRST "
  375. } else {
  376. queryStr += "ORDER BY (batch_num, position) ASC NULLS LAST "
  377. queryStr += fmt.Sprintf("OFFSET %d ", *offset)
  378. }
  379. queryStr += fmt.Sprintf("LIMIT %d ", *limit)
  380. query = hdb.db.Rebind(queryStr)
  381. // log.Debug(query)
  382. txs := []*HistoryTx{}
  383. if err := meddler.QueryAll(hdb.db, &txs, query, args...); err != nil {
  384. return nil, 0, err
  385. }
  386. if len(txs) == 0 {
  387. return nil, 0, sql.ErrNoRows
  388. } else if last {
  389. tmp := []*HistoryTx{}
  390. for i := len(txs) - 1; i >= 0; i-- {
  391. tmp = append(tmp, txs[i])
  392. }
  393. txs = tmp
  394. }
  395. return txs, txs[0].TotalItems, nil
  396. }
  397. // GetTx returns a tx from the DB
  398. func (hdb *HistoryDB) GetTx(txID common.TxID) (*common.Tx, error) {
  399. tx := new(common.Tx)
  400. return tx, meddler.QueryRow(
  401. hdb.db, tx,
  402. "SELECT * FROM tx WHERE id = $1;",
  403. txID,
  404. )
  405. }
  406. // Close frees the resources used by HistoryDB
  407. func (hdb *HistoryDB) Close() error {
  408. return hdb.db.Close()
  409. }