You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

598 lines
18 KiB

  1. package statedb
  2. import (
  3. "errors"
  4. "fmt"
  5. "math/big"
  6. "os"
  7. "strconv"
  8. "github.com/hermeznetwork/hermez-node/common"
  9. "github.com/hermeznetwork/hermez-node/log"
  10. "github.com/hermeznetwork/tracerr"
  11. "github.com/iden3/go-merkletree"
  12. "github.com/iden3/go-merkletree/db"
  13. "github.com/iden3/go-merkletree/db/pebble"
  14. )
  15. // TODO(Edu): Document here how StateDB is kept consistent
  // Sentinel errors returned by the StateDB.
  var (
  	// ErrStateDBWithoutMT is returned when a method that needs a MerkleTree
  	// is called on a StateDB that was created without one.
  	ErrStateDBWithoutMT = errors.New("Can not call method to use MerkleTree in a StateDB without MerkleTree")

  	// ErrAccountAlreadyExists is returned by CreateAccount when the Account
  	// is already present.
  	ErrAccountAlreadyExists = errors.New("Can not CreateAccount because Account already exists")

  	// ErrToIdxNotFound is returned when the ToIdx can not be resolved from
  	// ToEthAddr or from ToEthAddr&ToBJJ.
  	ErrToIdxNotFound = errors.New("ToIdx can not be found")

  	// ErrGetIdxNoCase is returned when the Idx is requested with an
  	// incompatible combination of EthAddr & BJJ.
  	ErrGetIdxNoCase = errors.New("Can not get Idx due unexpected combination of ethereum Address & BabyJubJub PublicKey")
  )

  // Keys and key prefixes under which the StateDB stores its data in the db.
  var (
  	// KeyCurrentBatch is the db key that holds the current BatchNum.
  	KeyCurrentBatch = []byte("k:currentbatch")

  	// PrefixKeyIdx is the key prefix for idx entries.
  	PrefixKeyIdx = []byte("i:")

  	// PrefixKeyAccHash is the key prefix for account hash entries.
  	PrefixKeyAccHash = []byte("h:")

  	// PrefixKeyMT is the key prefix for the merkle tree entries.
  	PrefixKeyMT = []byte("m:")

  	// PrefixKeyAddr is the key prefix for address entries.
  	PrefixKeyAddr = []byte("a:")

  	// PrefixKeyAddrBJJ is the key prefix for address-babyjubjub entries.
  	PrefixKeyAddrBJJ = []byte("ab:")
  )
  // Subpaths appended to the StateDB base path.
  const (
  	// PathBatchNum is the subpath prefix of a Batch Checkpoint; the
  	// BatchNum is appended to it.
  	PathBatchNum = "/BatchNum"

  	// PathCurrent is the subpath of the current Batch.
  	PathCurrent = "/current"
  )

  // Kinds of StateDB, one per component that uses it.
  const (
  	// TypeSynchronizer defines a StateDB used by the Synchronizer, that
  	// generates the ExitTree when processing the txs.
  	TypeSynchronizer = "synchronizer"

  	// TypeTxSelector defines a StateDB used by the TxSelector, which
  	// computes neither the ExitTree nor the ZKInputs.
  	TypeTxSelector = "txselector"

  	// TypeBatchBuilder defines a StateDB used by the BatchBuilder, that
  	// generates the ExitTree and the ZKInput when processing the txs.
  	TypeBatchBuilder = "batchbuilder"
  )
  59. // TypeStateDB determines the type of StateDB. NewStateDB compares it against TypeSynchronizer, TypeTxSelector and TypeBatchBuilder to decide whether a MerkleTree is created.
  60. type TypeStateDB string
  61. // StateDB represents the StateDB object: a pebble storage plus an optional MerkleTree kept under the PrefixKeyMT prefix of that storage.
  62. type StateDB struct {
  63. path string // base path; PathCurrent and PathBatchNum+<BatchNum> are appended to it
  64. currentBatch common.BatchNum // BatchNum last loaded from, or stored under, KeyCurrentBatch
  65. db *pebble.PebbleStorage // storage opened at path+PathCurrent; replaced on every reset
  66. mt *merkletree.MerkleTree // nil unless typ is TypeSynchronizer or TypeBatchBuilder
  67. typ TypeStateDB // kind of StateDB, as passed to NewStateDB
  68. // idx holds the current Idx that the BatchBuilder is using
  69. idx common.Idx
  70. zki *common.ZKInputs
  71. // i is the current transaction index in the ZKInputs generation (zki)
  72. i int
  73. // AccumulatedFees contains the accumulated fees for each token (Coord
  74. // Idx) in the processed batch
  75. AccumulatedFees map[common.Idx]*big.Int
  76. }
  77. // NewStateDB creates a new StateDB, allowing to use an in-memory or in-disk
  78. // storage
  79. func NewStateDB(path string, typ TypeStateDB, nLevels int) (*StateDB, error) {
  80. var sto *pebble.PebbleStorage
  81. var err error
  82. sto, err = pebble.NewPebbleStorage(path+PathCurrent, false)
  83. if err != nil {
  84. return nil, tracerr.Wrap(err)
  85. }
  86. var mt *merkletree.MerkleTree = nil
  87. if typ == TypeSynchronizer || typ == TypeBatchBuilder {
  88. mt, err = merkletree.NewMerkleTree(sto.WithPrefix(PrefixKeyMT), nLevels)
  89. if err != nil {
  90. return nil, tracerr.Wrap(err)
  91. }
  92. }
  93. if typ == TypeTxSelector && nLevels != 0 {
  94. return nil, tracerr.Wrap(fmt.Errorf("invalid StateDB parameters: StateDB type==TypeStateDB can not have nLevels!=0"))
  95. }
  96. sdb := &StateDB{
  97. path: path,
  98. db: sto,
  99. mt: mt,
  100. typ: typ,
  101. }
  102. // load currentBatch
  103. sdb.currentBatch, err = sdb.GetCurrentBatch()
  104. if err != nil {
  105. return nil, tracerr.Wrap(err)
  106. }
  107. // make reset (get checkpoint) at currentBatch
  108. err = sdb.reset(sdb.currentBatch, false)
  109. if err != nil {
  110. return nil, tracerr.Wrap(err)
  111. }
  112. return sdb, nil
  113. }
  114. // DB returns the *pebble.PebbleStorage currently held by the StateDB. reset assigns a newly opened storage to the StateDB, so the returned value reflects the storage at the time of the call.
  115. func (s *StateDB) DB() *pebble.PebbleStorage {
  116. return s.db
  117. }
  118. // GetCurrentBatch returns the current BatchNum stored in the StateDB
  119. func (s *StateDB) GetCurrentBatch() (common.BatchNum, error) {
  120. cbBytes, err := s.db.Get(KeyCurrentBatch)
  121. if tracerr.Unwrap(err) == db.ErrNotFound {
  122. return 0, nil
  123. }
  124. if err != nil {
  125. return 0, tracerr.Wrap(err)
  126. }
  127. return common.BatchNumFromBytes(cbBytes)
  128. }
  129. // setCurrentBatch stores the current BatchNum in the StateDB
  130. func (s *StateDB) setCurrentBatch() error {
  131. tx, err := s.db.NewTx()
  132. if err != nil {
  133. return tracerr.Wrap(err)
  134. }
  135. err = tx.Put(KeyCurrentBatch, s.currentBatch.Bytes())
  136. if err != nil {
  137. return tracerr.Wrap(err)
  138. }
  139. if err := tx.Commit(); err != nil {
  140. return tracerr.Wrap(err)
  141. }
  142. return nil
  143. }
  144. // MakeCheckpoint does a checkpoint at the given batchNum in the defined path. Internally this advances & stores the current BatchNum, and then stores a Checkpoint of the current state of the StateDB.
  145. func (s *StateDB) MakeCheckpoint() error {
  146. // advance currentBatch
  147. s.currentBatch++
  148. log.Debugw("Making StateDB checkpoint", "batch", s.currentBatch, "type", s.typ)
  149. checkpointPath := s.path + PathBatchNum + strconv.Itoa(int(s.currentBatch))
  150. err := s.setCurrentBatch()
  151. if err != nil {
  152. return tracerr.Wrap(err)
  153. }
  154. // if checkpoint BatchNum already exist in disk, delete it
  155. if _, err := os.Stat(checkpointPath); !os.IsNotExist(err) {
  156. err := os.RemoveAll(checkpointPath)
  157. if err != nil {
  158. return tracerr.Wrap(err)
  159. }
  160. } else if err != nil && !os.IsNotExist(err) {
  161. return tracerr.Wrap(err)
  162. }
  163. // execute Checkpoint
  164. err = s.db.Pebble().Checkpoint(checkpointPath)
  165. if err != nil {
  166. return tracerr.Wrap(err)
  167. }
  168. return nil
  169. }
  170. // DeleteCheckpoint removes if exist the checkpoint of the given batchNum
  171. func (s *StateDB) DeleteCheckpoint(batchNum common.BatchNum) error {
  172. checkpointPath := s.path + PathBatchNum + strconv.Itoa(int(batchNum))
  173. if _, err := os.Stat(checkpointPath); os.IsNotExist(err) {
  174. return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum))
  175. }
  176. return os.RemoveAll(checkpointPath)
  177. }
  178. func pebbleMakeCheckpoint(source, dest string) error {
  179. // Remove dest folder (if it exists) before doing the checkpoint
  180. if _, err := os.Stat(dest); !os.IsNotExist(err) {
  181. err := os.RemoveAll(dest)
  182. if err != nil {
  183. return tracerr.Wrap(err)
  184. }
  185. } else if err != nil && !os.IsNotExist(err) {
  186. return tracerr.Wrap(err)
  187. }
  188. sto, err := pebble.NewPebbleStorage(source, false)
  189. if err != nil {
  190. return tracerr.Wrap(err)
  191. }
  192. defer func() {
  193. errClose := sto.Pebble().Close()
  194. if errClose != nil {
  195. log.Errorw("Pebble.Close", "err", errClose)
  196. }
  197. }()
  198. // execute Checkpoint
  199. err = sto.Pebble().Checkpoint(dest)
  200. if err != nil {
  201. return tracerr.Wrap(err)
  202. }
  203. return nil
  204. }
  205. // Reset resets the StateDB to the checkpoint at the given batchNum, closing
  206. // the currently opened db first (it calls reset with closeCurrent=true). Reset
  207. // does not delete the checkpoints between old current and the new current,
  208. // those checkpoints will remain in the storage, and eventually will be deleted when MakeCheckpoint overwrites them.
  209. func (s *StateDB) Reset(batchNum common.BatchNum) error {
  210. return s.reset(batchNum, true)
  211. }
  212. // reset resets the StateDB to the checkpoint at the given batchNum. Reset
  213. // does not delete the checkpoints between old current and the new current,
  214. // those checkpoints will remain in the storage, and eventually will be
  215. // deleted when MakeCheckpoint overwrites them. `closeCurrent` will close the
  216. // currently opened db before doing the reset.
  217. func (s *StateDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
  218. currentPath := s.path + PathCurrent
  219. if closeCurrent {
  220. if err := s.db.Pebble().Close(); err != nil {
  221. return tracerr.Wrap(err)
  222. }
  223. }
  224. // remove 'current'
  225. err := os.RemoveAll(currentPath)
  226. if err != nil {
  227. return tracerr.Wrap(err)
  228. }
  229. if batchNum == 0 {
  230. // if batchNum == 0, open the new fresh 'current'
  231. sto, err := pebble.NewPebbleStorage(currentPath, false)
  232. if err != nil {
  233. return tracerr.Wrap(err)
  234. }
  235. s.db = sto
  236. s.idx = 255
  237. s.currentBatch = batchNum
  238. if s.mt != nil {
  239. // open the MT for the current s.db
  240. mt, err := merkletree.NewMerkleTree(s.db.WithPrefix(PrefixKeyMT), s.mt.MaxLevels())
  241. if err != nil {
  242. return tracerr.Wrap(err)
  243. }
  244. s.mt = mt
  245. }
  246. return nil
  247. }
  248. checkpointPath := s.path + PathBatchNum + strconv.Itoa(int(batchNum))
  249. // copy 'BatchNumX' to 'current'
  250. err = pebbleMakeCheckpoint(checkpointPath, currentPath)
  251. if err != nil {
  252. return tracerr.Wrap(err)
  253. }
  254. // open the new 'current'
  255. sto, err := pebble.NewPebbleStorage(currentPath, false)
  256. if err != nil {
  257. return tracerr.Wrap(err)
  258. }
  259. s.db = sto
  260. // get currentBatch num
  261. s.currentBatch, err = s.GetCurrentBatch()
  262. if err != nil {
  263. return tracerr.Wrap(err)
  264. }
  265. // idx is obtained from the statedb reset
  266. s.idx, err = s.GetIdx()
  267. if err != nil {
  268. return tracerr.Wrap(err)
  269. }
  270. if s.mt != nil {
  271. // open the MT for the current s.db
  272. mt, err := merkletree.NewMerkleTree(s.db.WithPrefix(PrefixKeyMT), s.mt.MaxLevels())
  273. if err != nil {
  274. return tracerr.Wrap(err)
  275. }
  276. s.mt = mt
  277. }
  278. return nil
  279. }
  280. // GetAccount returns the account for the given Idx, read from the StateDB storage through getAccountInTreeDB.
  281. func (s *StateDB) GetAccount(idx common.Idx) (*common.Account, error) {
  282. return getAccountInTreeDB(s.db, idx)
  283. }
  284. // GetAccounts returns all the accounts in the db. Use for debugging pruposes
  285. // only.
  286. func (s *StateDB) GetAccounts() ([]common.Account, error) {
  287. idxDB := s.db.WithPrefix(PrefixKeyIdx)
  288. idxs := []common.Idx{}
  289. // NOTE: Current implementation of Iterate in the pebble interface is
  290. // not efficient, as it iterates over all keys. Improve it following
  291. // this example: https://github.com/cockroachdb/pebble/pull/923/files
  292. if err := idxDB.Iterate(func(k []byte, v []byte) (bool, error) {
  293. idx, err := common.IdxFromBytes(k)
  294. if err != nil {
  295. return false, tracerr.Wrap(err)
  296. }
  297. idxs = append(idxs, idx)
  298. return true, nil
  299. }); err != nil {
  300. return nil, tracerr.Wrap(err)
  301. }
  302. accs := []common.Account{}
  303. for i := range idxs {
  304. acc, err := s.GetAccount(idxs[i])
  305. if err != nil {
  306. return nil, tracerr.Wrap(err)
  307. }
  308. accs = append(accs, *acc)
  309. }
  310. return accs, nil
  311. }
  312. // getAccountInTreeDB is abstracted from StateDB to be used from StateDB and
  313. // from ExitTree. GetAccount returns the account for the given Idx
  314. func getAccountInTreeDB(sto db.Storage, idx common.Idx) (*common.Account, error) {
  315. idxBytes, err := idx.Bytes()
  316. if err != nil {
  317. return nil, tracerr.Wrap(err)
  318. }
  319. vBytes, err := sto.Get(append(PrefixKeyIdx, idxBytes[:]...))
  320. if err != nil {
  321. return nil, tracerr.Wrap(err)
  322. }
  323. accBytes, err := sto.Get(append(PrefixKeyAccHash, vBytes...))
  324. if err != nil {
  325. return nil, tracerr.Wrap(err)
  326. }
  327. var b [32 * common.NLeafElems]byte
  328. copy(b[:], accBytes)
  329. account, err := common.AccountFromBytes(b)
  330. if err != nil {
  331. return nil, tracerr.Wrap(err)
  332. }
  333. account.Idx = idx
  334. return account, nil
  335. }
  336. // CreateAccount creates a new Account in the StateDB for the given Idx. If
  337. // StateDB.mt==nil, MerkleTree is not affected, otherwise updates the
  338. // MerkleTree, returning a CircomProcessorProof.
  339. func (s *StateDB) CreateAccount(idx common.Idx, account *common.Account) (*merkletree.CircomProcessorProof, error) {
  340. cpp, err := createAccountInTreeDB(s.db, s.mt, idx, account)
  341. if err != nil {
  342. return cpp, tracerr.Wrap(err)
  343. }
  344. // store idx by EthAddr & BJJ
  345. err = s.setIdxByEthAddrBJJ(idx, account.EthAddr, account.PublicKey, account.TokenID)
  346. return cpp, tracerr.Wrap(err)
  347. }
  348. // createAccountInTreeDB is abstracted from StateDB to be used from StateDB and
  349. // from ExitTree. Creates a new Account in the StateDB for the given Idx. If
  350. // StateDB.mt==nil, MerkleTree is not affected, otherwise updates the
  351. // MerkleTree, returning a CircomProcessorProof.
  352. func createAccountInTreeDB(sto db.Storage, mt *merkletree.MerkleTree, idx common.Idx, account *common.Account) (*merkletree.CircomProcessorProof, error) {
  353. // store at the DB the key: v, and value: leaf.Bytes()
  354. v, err := account.HashValue()
  355. if err != nil {
  356. return nil, tracerr.Wrap(err)
  357. }
  358. accountBytes, err := account.Bytes()
  359. if err != nil {
  360. return nil, tracerr.Wrap(err)
  361. }
  362. // store the Leaf value
  363. tx, err := sto.NewTx()
  364. if err != nil {
  365. return nil, tracerr.Wrap(err)
  366. }
  367. idxBytes, err := idx.Bytes()
  368. if err != nil {
  369. return nil, tracerr.Wrap(err)
  370. }
  371. _, err = tx.Get(append(PrefixKeyIdx, idxBytes[:]...))
  372. if tracerr.Unwrap(err) != db.ErrNotFound {
  373. return nil, tracerr.Wrap(ErrAccountAlreadyExists)
  374. }
  375. err = tx.Put(append(PrefixKeyAccHash, v.Bytes()...), accountBytes[:])
  376. if err != nil {
  377. return nil, tracerr.Wrap(err)
  378. }
  379. err = tx.Put(append(PrefixKeyIdx, idxBytes[:]...), v.Bytes())
  380. if err != nil {
  381. return nil, tracerr.Wrap(err)
  382. }
  383. if err := tx.Commit(); err != nil {
  384. return nil, tracerr.Wrap(err)
  385. }
  386. if mt != nil {
  387. return mt.AddAndGetCircomProof(idx.BigInt(), v)
  388. }
  389. return nil, nil
  390. }
  391. // UpdateAccount updates the Account in the StateDB for the given Idx, delegating to updateAccountInTreeDB with the StateDB storage and MerkleTree. If
  392. // StateDB.mt==nil, MerkleTree is not affected, otherwise updates the
  393. // MerkleTree, returning a CircomProcessorProof.
  394. func (s *StateDB) UpdateAccount(idx common.Idx, account *common.Account) (*merkletree.CircomProcessorProof, error) {
  395. return updateAccountInTreeDB(s.db, s.mt, idx, account)
  396. }
  397. // updateAccountInTreeDB is abstracted from StateDB to be used from StateDB and
  398. // from ExitTree. Updates the Account in the StateDB for the given Idx. If
  399. // StateDB.mt==nil, MerkleTree is not affected, otherwise updates the
  400. // MerkleTree, returning a CircomProcessorProof.
  401. func updateAccountInTreeDB(sto db.Storage, mt *merkletree.MerkleTree, idx common.Idx, account *common.Account) (*merkletree.CircomProcessorProof, error) {
  402. // store at the DB the key: v, and value: account.Bytes()
  403. v, err := account.HashValue()
  404. if err != nil {
  405. return nil, tracerr.Wrap(err)
  406. }
  407. accountBytes, err := account.Bytes()
  408. if err != nil {
  409. return nil, tracerr.Wrap(err)
  410. }
  411. tx, err := sto.NewTx()
  412. if err != nil {
  413. return nil, tracerr.Wrap(err)
  414. }
  415. err = tx.Put(append(PrefixKeyAccHash, v.Bytes()...), accountBytes[:])
  416. if err != nil {
  417. return nil, tracerr.Wrap(err)
  418. }
  419. idxBytes, err := idx.Bytes()
  420. if err != nil {
  421. return nil, tracerr.Wrap(err)
  422. }
  423. err = tx.Put(append(PrefixKeyIdx, idxBytes[:]...), v.Bytes())
  424. if err != nil {
  425. return nil, tracerr.Wrap(err)
  426. }
  427. if err := tx.Commit(); err != nil {
  428. return nil, tracerr.Wrap(err)
  429. }
  430. if mt != nil {
  431. proof, err := mt.Update(idx.BigInt(), v)
  432. return proof, tracerr.Wrap(err)
  433. }
  434. return nil, nil
  435. }
  436. // MTGetProof returns the CircomVerifierProof for a given Idx
  437. func (s *StateDB) MTGetProof(idx common.Idx) (*merkletree.CircomVerifierProof, error) {
  438. if s.mt == nil {
  439. return nil, tracerr.Wrap(ErrStateDBWithoutMT)
  440. }
  441. return s.mt.GenerateCircomVerifierProof(idx.BigInt(), s.mt.Root())
  442. }
  443. // MTGetRoot returns the current root of the underlying Merkle Tree
  444. func (s *StateDB) MTGetRoot() *big.Int {
  445. return s.mt.Root().BigInt()
  446. }
  447. // MerkleTree returns the underlying StateDB merkle tree. It can be nil: NewStateDB only creates one for TypeSynchronizer and TypeBatchBuilder.
  448. func (s *StateDB) MerkleTree() *merkletree.MerkleTree {
  449. return s.mt
  450. }
  451. // LocalStateDB represents the local StateDB which allows to make copies from
  452. // the synchronizer StateDB, and is used by the tx-selector and the
  453. // batch-builder. NOTE(review): this comment used to say "LocalStateDB is an in-memory storage", but NewLocalStateDB builds it through NewStateDB, which opens a pebble storage under the given path — confirm which is accurate.
  454. type LocalStateDB struct {
  455. *StateDB
  456. synchronizerStateDB *StateDB // StateDB whose checkpoints Reset copies from when fromSynchronizer is true
  457. }
  458. // NewLocalStateDB returns a new LocalStateDB connected to the given
  459. // synchronizerDB
  460. func NewLocalStateDB(path string, synchronizerDB *StateDB, typ TypeStateDB, nLevels int) (*LocalStateDB, error) {
  461. s, err := NewStateDB(path, typ, nLevels)
  462. if err != nil {
  463. return nil, tracerr.Wrap(err)
  464. }
  465. return &LocalStateDB{
  466. s,
  467. synchronizerDB,
  468. }, nil
  469. }
  470. // Reset performs a reset in the LocaStateDB. If fromSynchronizer is true, it
  471. // gets the state from LocalStateDB.synchronizerStateDB for the given batchNum. If fromSynchronizer is false, get the state from LocalStateDB checkpoints.
  472. func (l *LocalStateDB) Reset(batchNum common.BatchNum, fromSynchronizer bool) error {
  473. if batchNum == 0 {
  474. l.idx = 0
  475. return nil
  476. }
  477. synchronizerCheckpointPath := l.synchronizerStateDB.path + PathBatchNum + strconv.Itoa(int(batchNum))
  478. checkpointPath := l.path + PathBatchNum + strconv.Itoa(int(batchNum))
  479. currentPath := l.path + PathCurrent
  480. if fromSynchronizer {
  481. // use checkpoint from SynchronizerStateDB
  482. if _, err := os.Stat(synchronizerCheckpointPath); os.IsNotExist(err) {
  483. // if synchronizerStateDB does not have checkpoint at batchNum, return err
  484. return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" not exist in Synchronizer",
  485. synchronizerCheckpointPath))
  486. }
  487. if err := l.db.Pebble().Close(); err != nil {
  488. return tracerr.Wrap(err)
  489. }
  490. // remove 'current'
  491. err := os.RemoveAll(currentPath)
  492. if err != nil {
  493. return tracerr.Wrap(err)
  494. }
  495. // copy synchronizer'BatchNumX' to 'current'
  496. err = pebbleMakeCheckpoint(synchronizerCheckpointPath, currentPath)
  497. if err != nil {
  498. return tracerr.Wrap(err)
  499. }
  500. // copy synchronizer'BatchNumX' to 'BatchNumX'
  501. err = pebbleMakeCheckpoint(synchronizerCheckpointPath, checkpointPath)
  502. if err != nil {
  503. return tracerr.Wrap(err)
  504. }
  505. // open the new 'current'
  506. sto, err := pebble.NewPebbleStorage(currentPath, false)
  507. if err != nil {
  508. return tracerr.Wrap(err)
  509. }
  510. l.db = sto
  511. // get currentBatch num
  512. l.currentBatch, err = l.GetCurrentBatch()
  513. if err != nil {
  514. return tracerr.Wrap(err)
  515. }
  516. // open the MT for the current s.db
  517. if l.mt != nil {
  518. mt, err := merkletree.NewMerkleTree(l.db.WithPrefix(PrefixKeyMT), l.mt.MaxLevels())
  519. if err != nil {
  520. return tracerr.Wrap(err)
  521. }
  522. l.mt = mt
  523. }
  524. return nil
  525. }
  526. // use checkpoint from LocalStateDB
  527. return l.StateDB.reset(batchNum, true)
  528. }