You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

536 lines
14 KiB

  1. // Package kvdb provides a key-value database with Checkpoints & Resets system
  2. package kvdb
  3. import (
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "path"
  8. "sort"
  9. "strings"
  10. "sync"
  11. "github.com/hermeznetwork/hermez-node/common"
  12. "github.com/hermeznetwork/hermez-node/log"
  13. "github.com/hermeznetwork/tracerr"
  14. "github.com/iden3/go-merkletree/db"
  15. "github.com/iden3/go-merkletree/db/pebble"
  16. )
  17. const (
  18. // PathBatchNum defines the subpath of the Batch Checkpoint in the
  19. // subpath of the KVDB
  20. PathBatchNum = "BatchNum"
  21. // PathCurrent defines the subpath of the current Batch in the subpath
  22. // of the KVDB
  23. PathCurrent = "current"
  24. // PathLast defines the subpath of the last Batch in the subpath
  25. // of the StateDB
  26. PathLast = "last"
  27. )
  28. var (
  29. // KeyCurrentBatch is used as key in the db to store the current BatchNum
  30. KeyCurrentBatch = []byte("k:currentbatch")
  31. // keyCurrentIdx is used as key in the db to store the CurrentIdx
  32. keyCurrentIdx = []byte("k:idx")
  33. )
  34. // KVDB represents the Key-Value DB object
  35. type KVDB struct {
  36. path string
  37. db *pebble.Storage
  38. // CurrentIdx holds the current Idx that the BatchBuilder is using
  39. CurrentIdx common.Idx
  40. CurrentBatch common.BatchNum
  41. keep int
  42. m sync.Mutex
  43. last *Last
  44. }
  45. // Last is a consistent view to the last batch of the stateDB that can
  46. // be queried concurrently.
  47. type Last struct {
  48. db *pebble.Storage
  49. path string
  50. rw sync.RWMutex
  51. }
  52. func (k *Last) setNew() error {
  53. k.rw.Lock()
  54. defer k.rw.Unlock()
  55. if k.db != nil {
  56. k.db.Close()
  57. }
  58. lastPath := path.Join(k.path, PathLast)
  59. err := os.RemoveAll(lastPath)
  60. if err != nil {
  61. return tracerr.Wrap(err)
  62. }
  63. db, err := pebble.NewPebbleStorage(path.Join(k.path, lastPath), false)
  64. if err != nil {
  65. return tracerr.Wrap(err)
  66. }
  67. k.db = db
  68. return nil
  69. }
  70. func (k *Last) set(kvdb *KVDB, batchNum common.BatchNum) error {
  71. k.rw.Lock()
  72. defer k.rw.Unlock()
  73. if k.db != nil {
  74. k.db.Close()
  75. }
  76. lastPath := path.Join(k.path, PathLast)
  77. if err := kvdb.MakeCheckpointFromTo(batchNum, lastPath); err != nil {
  78. return tracerr.Wrap(err)
  79. }
  80. db, err := pebble.NewPebbleStorage(lastPath, false)
  81. if err != nil {
  82. return tracerr.Wrap(err)
  83. }
  84. k.db = db
  85. return nil
  86. }
  87. func (k *Last) close() {
  88. k.rw.Lock()
  89. defer k.rw.Unlock()
  90. k.db.Close()
  91. }
  92. // NewKVDB creates a new KVDB, allowing to use an in-memory or in-disk storage.
  93. // Checkpoints older than the value defined by `keep` will be deleted.
  94. func NewKVDB(pathDB string, keep int) (*KVDB, error) {
  95. var sto *pebble.Storage
  96. var err error
  97. sto, err = pebble.NewPebbleStorage(path.Join(pathDB, PathCurrent), false)
  98. if err != nil {
  99. return nil, tracerr.Wrap(err)
  100. }
  101. kvdb := &KVDB{
  102. path: pathDB,
  103. db: sto,
  104. keep: keep,
  105. last: &Last{
  106. path: pathDB,
  107. },
  108. }
  109. // load currentBatch
  110. kvdb.CurrentBatch, err = kvdb.GetCurrentBatch()
  111. if err != nil {
  112. return nil, tracerr.Wrap(err)
  113. }
  114. // make reset (get checkpoint) at currentBatch
  115. err = kvdb.reset(kvdb.CurrentBatch, true)
  116. if err != nil {
  117. return nil, tracerr.Wrap(err)
  118. }
  119. return kvdb, nil
  120. }
  121. // LastRead is a thread-safe method to query the last KVDB
  122. func (kvdb *KVDB) LastRead(fn func(db *pebble.Storage) error) error {
  123. kvdb.last.rw.RLock()
  124. defer kvdb.last.rw.RUnlock()
  125. return fn(kvdb.last.db)
  126. }
  127. // DB returns the *pebble.Storage from the KVDB
  128. func (kvdb *KVDB) DB() *pebble.Storage {
  129. return kvdb.db
  130. }
  131. // StorageWithPrefix returns the db.Storage with the given prefix from the
  132. // current KVDB
  133. func (kvdb *KVDB) StorageWithPrefix(prefix []byte) db.Storage {
  134. return kvdb.db.WithPrefix(prefix)
  135. }
  136. // Reset resets the KVDB to the checkpoint at the given batchNum. Reset does
  137. // not delete the checkpoints between old current and the new current, those
  138. // checkpoints will remain in the storage, and eventually will be deleted when
  139. // MakeCheckpoint overwrites them.
  140. func (kvdb *KVDB) Reset(batchNum common.BatchNum) error {
  141. return kvdb.reset(batchNum, true)
  142. }
  143. // reset resets the KVDB to the checkpoint at the given batchNum. Reset does
  144. // not delete the checkpoints between old current and the new current, those
  145. // checkpoints will remain in the storage, and eventually will be deleted when
  146. // MakeCheckpoint overwrites them. `closeCurrent` will close the currently
  147. // opened db before doing the reset.
  148. func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
  149. currentPath := path.Join(kvdb.path, PathCurrent)
  150. if closeCurrent {
  151. if err := kvdb.db.Pebble().Close(); err != nil {
  152. return tracerr.Wrap(err)
  153. }
  154. }
  155. // remove 'current'
  156. err := os.RemoveAll(currentPath)
  157. if err != nil {
  158. return tracerr.Wrap(err)
  159. }
  160. // remove all checkpoints > batchNum
  161. list, err := kvdb.ListCheckpoints()
  162. if err != nil {
  163. return tracerr.Wrap(err)
  164. }
  165. // Find first batch that is greater than batchNum, and delete
  166. // everything after that
  167. start := 0
  168. for ; start < len(list); start++ {
  169. if common.BatchNum(list[start]) > batchNum {
  170. break
  171. }
  172. }
  173. for _, bn := range list[start:] {
  174. if err := kvdb.DeleteCheckpoint(common.BatchNum(bn)); err != nil {
  175. return tracerr.Wrap(err)
  176. }
  177. }
  178. if batchNum == 0 {
  179. // if batchNum == 0, open the new fresh 'current'
  180. sto, err := pebble.NewPebbleStorage(currentPath, false)
  181. if err != nil {
  182. return tracerr.Wrap(err)
  183. }
  184. kvdb.db = sto
  185. kvdb.CurrentIdx = common.RollupConstReservedIDx // 255
  186. kvdb.CurrentBatch = 0
  187. if err := kvdb.last.setNew(); err != nil {
  188. return tracerr.Wrap(err)
  189. }
  190. return nil
  191. }
  192. // copy 'batchNum' to 'current'
  193. if err := kvdb.MakeCheckpointFromTo(batchNum, currentPath); err != nil {
  194. return tracerr.Wrap(err)
  195. }
  196. // copy 'batchNum' to 'last'
  197. if err := kvdb.last.set(kvdb, batchNum); err != nil {
  198. return tracerr.Wrap(err)
  199. }
  200. // open the new 'current'
  201. sto, err := pebble.NewPebbleStorage(currentPath, false)
  202. if err != nil {
  203. return tracerr.Wrap(err)
  204. }
  205. kvdb.db = sto
  206. // get currentBatch num
  207. kvdb.CurrentBatch, err = kvdb.GetCurrentBatch()
  208. if err != nil {
  209. return tracerr.Wrap(err)
  210. }
  211. // idx is obtained from the statedb reset
  212. kvdb.CurrentIdx, err = kvdb.GetCurrentIdx()
  213. if err != nil {
  214. return tracerr.Wrap(err)
  215. }
  216. return nil
  217. }
  218. // ResetFromSynchronizer performs a reset in the KVDB getting the state from
  219. // synchronizerKVDB for the given batchNum.
  220. func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKVDB *KVDB) error {
  221. if synchronizerKVDB == nil {
  222. return tracerr.Wrap(fmt.Errorf("synchronizerKVDB can not be nil"))
  223. }
  224. currentPath := path.Join(kvdb.path, PathCurrent)
  225. if err := kvdb.db.Pebble().Close(); err != nil {
  226. return tracerr.Wrap(err)
  227. }
  228. // remove 'current'
  229. err := os.RemoveAll(currentPath)
  230. if err != nil {
  231. return tracerr.Wrap(err)
  232. }
  233. // remove all checkpoints
  234. list, err := kvdb.ListCheckpoints()
  235. if err != nil {
  236. return tracerr.Wrap(err)
  237. }
  238. for _, bn := range list {
  239. if err := kvdb.DeleteCheckpoint(common.BatchNum(bn)); err != nil {
  240. return tracerr.Wrap(err)
  241. }
  242. }
  243. if batchNum == 0 {
  244. // if batchNum == 0, open the new fresh 'current'
  245. sto, err := pebble.NewPebbleStorage(currentPath, false)
  246. if err != nil {
  247. return tracerr.Wrap(err)
  248. }
  249. kvdb.db = sto
  250. kvdb.CurrentIdx = common.RollupConstReservedIDx // 255
  251. kvdb.CurrentBatch = 0
  252. return nil
  253. }
  254. checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
  255. // copy synchronizer'BatchNumX' to 'BatchNumX'
  256. if err := synchronizerKVDB.MakeCheckpointFromTo(batchNum, checkpointPath); err != nil {
  257. return tracerr.Wrap(err)
  258. }
  259. // copy 'BatchNumX' to 'current'
  260. err = kvdb.MakeCheckpointFromTo(batchNum, currentPath)
  261. if err != nil {
  262. return tracerr.Wrap(err)
  263. }
  264. // open the new 'current'
  265. sto, err := pebble.NewPebbleStorage(currentPath, false)
  266. if err != nil {
  267. return tracerr.Wrap(err)
  268. }
  269. kvdb.db = sto
  270. // get currentBatch num
  271. kvdb.CurrentBatch, err = kvdb.GetCurrentBatch()
  272. if err != nil {
  273. return tracerr.Wrap(err)
  274. }
  275. // get currentIdx
  276. kvdb.CurrentIdx, err = kvdb.GetCurrentIdx()
  277. if err != nil {
  278. return tracerr.Wrap(err)
  279. }
  280. return nil
  281. }
  282. // GetCurrentBatch returns the current BatchNum stored in the KVDB
  283. func (kvdb *KVDB) GetCurrentBatch() (common.BatchNum, error) {
  284. cbBytes, err := kvdb.db.Get(KeyCurrentBatch)
  285. if tracerr.Unwrap(err) == db.ErrNotFound {
  286. return 0, nil
  287. }
  288. if err != nil {
  289. return 0, tracerr.Wrap(err)
  290. }
  291. return common.BatchNumFromBytes(cbBytes)
  292. }
  293. // setCurrentBatch stores the current BatchNum in the KVDB
  294. func (kvdb *KVDB) setCurrentBatch() error {
  295. tx, err := kvdb.db.NewTx()
  296. if err != nil {
  297. return tracerr.Wrap(err)
  298. }
  299. err = tx.Put(KeyCurrentBatch, kvdb.CurrentBatch.Bytes())
  300. if err != nil {
  301. return tracerr.Wrap(err)
  302. }
  303. if err := tx.Commit(); err != nil {
  304. return tracerr.Wrap(err)
  305. }
  306. return nil
  307. }
  308. // GetCurrentIdx returns the stored Idx from the KVDB, which is the last Idx
  309. // used for an Account in the KVDB.
  310. func (kvdb *KVDB) GetCurrentIdx() (common.Idx, error) {
  311. idxBytes, err := kvdb.db.Get(keyCurrentIdx)
  312. if tracerr.Unwrap(err) == db.ErrNotFound {
  313. return common.RollupConstReservedIDx, nil // 255, nil
  314. }
  315. if err != nil {
  316. return 0, tracerr.Wrap(err)
  317. }
  318. return common.IdxFromBytes(idxBytes[:])
  319. }
  320. // SetCurrentIdx stores Idx in the KVDB
  321. func (kvdb *KVDB) SetCurrentIdx(idx common.Idx) error {
  322. kvdb.CurrentIdx = idx
  323. tx, err := kvdb.db.NewTx()
  324. if err != nil {
  325. return tracerr.Wrap(err)
  326. }
  327. idxBytes, err := idx.Bytes()
  328. if err != nil {
  329. return tracerr.Wrap(err)
  330. }
  331. err = tx.Put(keyCurrentIdx, idxBytes[:])
  332. if err != nil {
  333. return tracerr.Wrap(err)
  334. }
  335. if err := tx.Commit(); err != nil {
  336. return tracerr.Wrap(err)
  337. }
  338. return nil
  339. }
  340. // MakeCheckpoint does a checkpoint at the given batchNum in the defined path.
  341. // Internally this advances & stores the current BatchNum, and then stores a
  342. // Checkpoint of the current state of the KVDB.
  343. func (kvdb *KVDB) MakeCheckpoint() error {
  344. // advance currentBatch
  345. kvdb.CurrentBatch++
  346. checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, kvdb.CurrentBatch))
  347. if err := kvdb.setCurrentBatch(); err != nil {
  348. return tracerr.Wrap(err)
  349. }
  350. // if checkpoint BatchNum already exist in disk, delete it
  351. if _, err := os.Stat(checkpointPath); !os.IsNotExist(err) {
  352. err := os.RemoveAll(checkpointPath)
  353. if err != nil {
  354. return tracerr.Wrap(err)
  355. }
  356. } else if err != nil && !os.IsNotExist(err) {
  357. return tracerr.Wrap(err)
  358. }
  359. // execute Checkpoint
  360. if err := kvdb.db.Pebble().Checkpoint(checkpointPath); err != nil {
  361. return tracerr.Wrap(err)
  362. }
  363. // copy 'CurrentBatch' to 'last'
  364. if err := kvdb.last.set(kvdb, kvdb.CurrentBatch); err != nil {
  365. return tracerr.Wrap(err)
  366. }
  367. // delete old checkpoints
  368. if err := kvdb.deleteOldCheckpoints(); err != nil {
  369. return tracerr.Wrap(err)
  370. }
  371. return nil
  372. }
  373. // DeleteCheckpoint removes if exist the checkpoint of the given batchNum
  374. func (kvdb *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error {
  375. checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
  376. if _, err := os.Stat(checkpointPath); os.IsNotExist(err) {
  377. return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum))
  378. }
  379. return os.RemoveAll(checkpointPath)
  380. }
  381. // ListCheckpoints returns the list of batchNums of the checkpoints, sorted.
  382. // If there's a gap between the list of checkpoints, an error is returned.
  383. func (kvdb *KVDB) ListCheckpoints() ([]int, error) {
  384. files, err := ioutil.ReadDir(kvdb.path)
  385. if err != nil {
  386. return nil, tracerr.Wrap(err)
  387. }
  388. checkpoints := []int{}
  389. var checkpoint int
  390. pattern := fmt.Sprintf("%s%%d", PathBatchNum)
  391. for _, file := range files {
  392. fileName := file.Name()
  393. if file.IsDir() && strings.HasPrefix(fileName, PathBatchNum) {
  394. if _, err := fmt.Sscanf(fileName, pattern, &checkpoint); err != nil {
  395. return nil, tracerr.Wrap(err)
  396. }
  397. checkpoints = append(checkpoints, checkpoint)
  398. }
  399. }
  400. sort.Ints(checkpoints)
  401. if len(checkpoints) > 0 {
  402. first := checkpoints[0]
  403. for _, checkpoint := range checkpoints[1:] {
  404. first++
  405. if checkpoint != first {
  406. log.Errorw("GAP", "checkpoints", checkpoints)
  407. return nil, tracerr.Wrap(fmt.Errorf("checkpoint gap at %v", checkpoint))
  408. }
  409. }
  410. }
  411. return checkpoints, nil
  412. }
  413. // deleteOldCheckpoints deletes old checkpoints when there are more than
  414. // `s.keep` checkpoints
  415. func (kvdb *KVDB) deleteOldCheckpoints() error {
  416. list, err := kvdb.ListCheckpoints()
  417. if err != nil {
  418. return tracerr.Wrap(err)
  419. }
  420. if len(list) > kvdb.keep {
  421. for _, checkpoint := range list[:len(list)-kvdb.keep] {
  422. if err := kvdb.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil {
  423. return tracerr.Wrap(err)
  424. }
  425. }
  426. }
  427. return nil
  428. }
  429. // MakeCheckpointFromTo makes a checkpoint from the current db at fromBatchNum
  430. // to the dest folder. This method is locking, so it can be called from
  431. // multiple places at the same time.
  432. func (kvdb *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) error {
  433. source := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, fromBatchNum))
  434. if _, err := os.Stat(source); os.IsNotExist(err) {
  435. // if kvdb does not have checkpoint at batchNum, return err
  436. return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" does not exist", source))
  437. }
  438. // By locking we allow calling MakeCheckpointFromTo from multiple
  439. // places at the same time for the same stateDB. This allows the
  440. // synchronizer to do a reset to a batchNum at the same time as the
  441. // pipeline is doing a txSelector.Reset and batchBuilder.Reset from
  442. // synchronizer to the same batchNum
  443. kvdb.m.Lock()
  444. defer kvdb.m.Unlock()
  445. return pebbleMakeCheckpoint(source, dest)
  446. }
  447. func pebbleMakeCheckpoint(source, dest string) error {
  448. // Remove dest folder (if it exists) before doing the checkpoint
  449. if _, err := os.Stat(dest); !os.IsNotExist(err) {
  450. err := os.RemoveAll(dest)
  451. if err != nil {
  452. return tracerr.Wrap(err)
  453. }
  454. } else if err != nil && !os.IsNotExist(err) {
  455. return tracerr.Wrap(err)
  456. }
  457. sto, err := pebble.NewPebbleStorage(source, false)
  458. if err != nil {
  459. return tracerr.Wrap(err)
  460. }
  461. defer func() {
  462. errClose := sto.Pebble().Close()
  463. if errClose != nil {
  464. log.Errorw("Pebble.Close", "err", errClose)
  465. }
  466. }()
  467. // execute Checkpoint
  468. err = sto.Pebble().Checkpoint(dest)
  469. if err != nil {
  470. return tracerr.Wrap(err)
  471. }
  472. return nil
  473. }
  474. // Close the DB
  475. func (kvdb *KVDB) Close() {
  476. kvdb.db.Close()
  477. kvdb.last.close()
  478. }