You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

459 lines
12 KiB

  1. // Package kvdb provides a key-value database with Checkpoints & Resets system
  2. package kvdb
  3. import (
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "path"
  8. "sort"
  9. "strings"
  10. "sync"
  11. "github.com/hermeznetwork/hermez-node/common"
  12. "github.com/hermeznetwork/hermez-node/log"
  13. "github.com/hermeznetwork/tracerr"
  14. "github.com/iden3/go-merkletree/db"
  15. "github.com/iden3/go-merkletree/db/pebble"
  16. )
  17. const (
  18. // PathBatchNum defines the subpath of the Batch Checkpoint in the
  19. // subpath of the KVDB
  20. PathBatchNum = "BatchNum"
  21. // PathCurrent defines the subpath of the current Batch in the subpath
  22. // of the KVDB
  23. PathCurrent = "current"
  24. )
  25. var (
  26. // KeyCurrentBatch is used as key in the db to store the current BatchNum
  27. KeyCurrentBatch = []byte("k:currentbatch")
  28. // keyCurrentIdx is used as key in the db to store the CurrentIdx
  29. keyCurrentIdx = []byte("k:idx")
  30. )
  31. // KVDB represents the Key-Value DB object
  32. type KVDB struct {
  33. path string
  34. db *pebble.Storage
  35. // CurrentIdx holds the current Idx that the BatchBuilder is using
  36. CurrentIdx common.Idx
  37. CurrentBatch common.BatchNum
  38. keep int
  39. m sync.Mutex
  40. }
  41. // NewKVDB creates a new KVDB, allowing to use an in-memory or in-disk storage.
  42. // Checkpoints older than the value defined by `keep` will be deleted.
  43. func NewKVDB(pathDB string, keep int) (*KVDB, error) {
  44. var sto *pebble.Storage
  45. var err error
  46. sto, err = pebble.NewPebbleStorage(path.Join(pathDB, PathCurrent), false)
  47. if err != nil {
  48. return nil, tracerr.Wrap(err)
  49. }
  50. kvdb := &KVDB{
  51. path: pathDB,
  52. db: sto,
  53. keep: keep,
  54. }
  55. // load currentBatch
  56. kvdb.CurrentBatch, err = kvdb.GetCurrentBatch()
  57. if err != nil {
  58. return nil, tracerr.Wrap(err)
  59. }
  60. // make reset (get checkpoint) at currentBatch
  61. err = kvdb.reset(kvdb.CurrentBatch, true)
  62. if err != nil {
  63. return nil, tracerr.Wrap(err)
  64. }
  65. return kvdb, nil
  66. }
  67. // DB returns the *pebble.Storage from the KVDB
  68. func (kvdb *KVDB) DB() *pebble.Storage {
  69. return kvdb.db
  70. }
  71. // StorageWithPrefix returns the db.Storage with the given prefix from the
  72. // current KVDB
  73. func (kvdb *KVDB) StorageWithPrefix(prefix []byte) db.Storage {
  74. return kvdb.db.WithPrefix(prefix)
  75. }
  76. // Reset resets the KVDB to the checkpoint at the given batchNum. Reset does
  77. // not delete the checkpoints between old current and the new current, those
  78. // checkpoints will remain in the storage, and eventually will be deleted when
  79. // MakeCheckpoint overwrites them.
  80. func (kvdb *KVDB) Reset(batchNum common.BatchNum) error {
  81. return kvdb.reset(batchNum, true)
  82. }
  83. // reset resets the KVDB to the checkpoint at the given batchNum. Reset does
  84. // not delete the checkpoints between old current and the new current, those
  85. // checkpoints will remain in the storage, and eventually will be deleted when
  86. // MakeCheckpoint overwrites them. `closeCurrent` will close the currently
  87. // opened db before doing the reset.
  88. func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
  89. currentPath := path.Join(kvdb.path, PathCurrent)
  90. if closeCurrent {
  91. if err := kvdb.db.Pebble().Close(); err != nil {
  92. return tracerr.Wrap(err)
  93. }
  94. }
  95. // remove 'current'
  96. err := os.RemoveAll(currentPath)
  97. if err != nil {
  98. return tracerr.Wrap(err)
  99. }
  100. // remove all checkpoints > batchNum
  101. list, err := kvdb.ListCheckpoints()
  102. if err != nil {
  103. return tracerr.Wrap(err)
  104. }
  105. // Find first batch that is greater than batchNum, and delete
  106. // everything after that
  107. start := 0
  108. for ; start < len(list); start++ {
  109. if common.BatchNum(list[start]) > batchNum {
  110. break
  111. }
  112. }
  113. for _, bn := range list[start:] {
  114. if err := kvdb.DeleteCheckpoint(common.BatchNum(bn)); err != nil {
  115. return tracerr.Wrap(err)
  116. }
  117. }
  118. if batchNum == 0 {
  119. // if batchNum == 0, open the new fresh 'current'
  120. sto, err := pebble.NewPebbleStorage(currentPath, false)
  121. if err != nil {
  122. return tracerr.Wrap(err)
  123. }
  124. kvdb.db = sto
  125. kvdb.CurrentIdx = common.RollupConstReservedIDx // 255
  126. kvdb.CurrentBatch = 0
  127. return nil
  128. }
  129. // copy 'BatchNumX' to 'current'
  130. if err := kvdb.MakeCheckpointFromTo(batchNum, currentPath); err != nil {
  131. return tracerr.Wrap(err)
  132. }
  133. // open the new 'current'
  134. sto, err := pebble.NewPebbleStorage(currentPath, false)
  135. if err != nil {
  136. return tracerr.Wrap(err)
  137. }
  138. kvdb.db = sto
  139. // get currentBatch num
  140. kvdb.CurrentBatch, err = kvdb.GetCurrentBatch()
  141. if err != nil {
  142. return tracerr.Wrap(err)
  143. }
  144. // idx is obtained from the statedb reset
  145. kvdb.CurrentIdx, err = kvdb.GetCurrentIdx()
  146. if err != nil {
  147. return tracerr.Wrap(err)
  148. }
  149. return nil
  150. }
  151. // ResetFromSynchronizer performs a reset in the KVDB getting the state from
  152. // synchronizerKVDB for the given batchNum.
  153. func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKVDB *KVDB) error {
  154. if synchronizerKVDB == nil {
  155. return tracerr.Wrap(fmt.Errorf("synchronizerKVDB can not be nil"))
  156. }
  157. currentPath := path.Join(kvdb.path, PathCurrent)
  158. if err := kvdb.db.Pebble().Close(); err != nil {
  159. return tracerr.Wrap(err)
  160. }
  161. // remove 'current'
  162. err := os.RemoveAll(currentPath)
  163. if err != nil {
  164. return tracerr.Wrap(err)
  165. }
  166. // remove all checkpoints
  167. list, err := kvdb.ListCheckpoints()
  168. if err != nil {
  169. return tracerr.Wrap(err)
  170. }
  171. for _, bn := range list {
  172. if err := kvdb.DeleteCheckpoint(common.BatchNum(bn)); err != nil {
  173. return tracerr.Wrap(err)
  174. }
  175. }
  176. if batchNum == 0 {
  177. // if batchNum == 0, open the new fresh 'current'
  178. sto, err := pebble.NewPebbleStorage(currentPath, false)
  179. if err != nil {
  180. return tracerr.Wrap(err)
  181. }
  182. kvdb.db = sto
  183. kvdb.CurrentIdx = common.RollupConstReservedIDx // 255
  184. kvdb.CurrentBatch = 0
  185. return nil
  186. }
  187. checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
  188. // copy synchronizer'BatchNumX' to 'BatchNumX'
  189. if err := synchronizerKVDB.MakeCheckpointFromTo(batchNum, checkpointPath); err != nil {
  190. return tracerr.Wrap(err)
  191. }
  192. // copy 'BatchNumX' to 'current'
  193. err = kvdb.MakeCheckpointFromTo(batchNum, currentPath)
  194. if err != nil {
  195. return tracerr.Wrap(err)
  196. }
  197. // open the new 'current'
  198. sto, err := pebble.NewPebbleStorage(currentPath, false)
  199. if err != nil {
  200. return tracerr.Wrap(err)
  201. }
  202. kvdb.db = sto
  203. // get currentBatch num
  204. kvdb.CurrentBatch, err = kvdb.GetCurrentBatch()
  205. if err != nil {
  206. return tracerr.Wrap(err)
  207. }
  208. // get currentIdx
  209. kvdb.CurrentIdx, err = kvdb.GetCurrentIdx()
  210. if err != nil {
  211. return tracerr.Wrap(err)
  212. }
  213. return nil
  214. }
  215. // GetCurrentBatch returns the current BatchNum stored in the KVDB
  216. func (kvdb *KVDB) GetCurrentBatch() (common.BatchNum, error) {
  217. cbBytes, err := kvdb.db.Get(KeyCurrentBatch)
  218. if tracerr.Unwrap(err) == db.ErrNotFound {
  219. return 0, nil
  220. }
  221. if err != nil {
  222. return 0, tracerr.Wrap(err)
  223. }
  224. return common.BatchNumFromBytes(cbBytes)
  225. }
  226. // setCurrentBatch stores the current BatchNum in the KVDB
  227. func (kvdb *KVDB) setCurrentBatch() error {
  228. tx, err := kvdb.db.NewTx()
  229. if err != nil {
  230. return tracerr.Wrap(err)
  231. }
  232. err = tx.Put(KeyCurrentBatch, kvdb.CurrentBatch.Bytes())
  233. if err != nil {
  234. return tracerr.Wrap(err)
  235. }
  236. if err := tx.Commit(); err != nil {
  237. return tracerr.Wrap(err)
  238. }
  239. return nil
  240. }
  241. // GetCurrentIdx returns the stored Idx from the KVDB, which is the last Idx
  242. // used for an Account in the KVDB.
  243. func (kvdb *KVDB) GetCurrentIdx() (common.Idx, error) {
  244. idxBytes, err := kvdb.db.Get(keyCurrentIdx)
  245. if tracerr.Unwrap(err) == db.ErrNotFound {
  246. return common.RollupConstReservedIDx, nil // 255, nil
  247. }
  248. if err != nil {
  249. return 0, tracerr.Wrap(err)
  250. }
  251. return common.IdxFromBytes(idxBytes[:])
  252. }
  253. // SetCurrentIdx stores Idx in the KVDB
  254. func (kvdb *KVDB) SetCurrentIdx(idx common.Idx) error {
  255. kvdb.CurrentIdx = idx
  256. tx, err := kvdb.db.NewTx()
  257. if err != nil {
  258. return tracerr.Wrap(err)
  259. }
  260. idxBytes, err := idx.Bytes()
  261. if err != nil {
  262. return tracerr.Wrap(err)
  263. }
  264. err = tx.Put(keyCurrentIdx, idxBytes[:])
  265. if err != nil {
  266. return tracerr.Wrap(err)
  267. }
  268. if err := tx.Commit(); err != nil {
  269. return tracerr.Wrap(err)
  270. }
  271. return nil
  272. }
  273. // MakeCheckpoint does a checkpoint at the given batchNum in the defined path.
  274. // Internally this advances & stores the current BatchNum, and then stores a
  275. // Checkpoint of the current state of the KVDB.
  276. func (kvdb *KVDB) MakeCheckpoint() error {
  277. // advance currentBatch
  278. kvdb.CurrentBatch++
  279. checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, kvdb.CurrentBatch))
  280. if err := kvdb.setCurrentBatch(); err != nil {
  281. return tracerr.Wrap(err)
  282. }
  283. // if checkpoint BatchNum already exist in disk, delete it
  284. if _, err := os.Stat(checkpointPath); !os.IsNotExist(err) {
  285. err := os.RemoveAll(checkpointPath)
  286. if err != nil {
  287. return tracerr.Wrap(err)
  288. }
  289. } else if err != nil && !os.IsNotExist(err) {
  290. return tracerr.Wrap(err)
  291. }
  292. // execute Checkpoint
  293. if err := kvdb.db.Pebble().Checkpoint(checkpointPath); err != nil {
  294. return tracerr.Wrap(err)
  295. }
  296. // delete old checkpoints
  297. if err := kvdb.deleteOldCheckpoints(); err != nil {
  298. return tracerr.Wrap(err)
  299. }
  300. return nil
  301. }
  302. // DeleteCheckpoint removes if exist the checkpoint of the given batchNum
  303. func (kvdb *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error {
  304. checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
  305. if _, err := os.Stat(checkpointPath); os.IsNotExist(err) {
  306. return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum))
  307. }
  308. return os.RemoveAll(checkpointPath)
  309. }
  310. // ListCheckpoints returns the list of batchNums of the checkpoints, sorted.
  311. // If there's a gap between the list of checkpoints, an error is returned.
  312. func (kvdb *KVDB) ListCheckpoints() ([]int, error) {
  313. files, err := ioutil.ReadDir(kvdb.path)
  314. if err != nil {
  315. return nil, tracerr.Wrap(err)
  316. }
  317. checkpoints := []int{}
  318. var checkpoint int
  319. pattern := fmt.Sprintf("%s%%d", PathBatchNum)
  320. for _, file := range files {
  321. fileName := file.Name()
  322. if file.IsDir() && strings.HasPrefix(fileName, PathBatchNum) {
  323. if _, err := fmt.Sscanf(fileName, pattern, &checkpoint); err != nil {
  324. return nil, tracerr.Wrap(err)
  325. }
  326. checkpoints = append(checkpoints, checkpoint)
  327. }
  328. }
  329. sort.Ints(checkpoints)
  330. if len(checkpoints) > 0 {
  331. first := checkpoints[0]
  332. for _, checkpoint := range checkpoints[1:] {
  333. first++
  334. if checkpoint != first {
  335. log.Errorw("GAP", "checkpoints", checkpoints)
  336. return nil, tracerr.Wrap(fmt.Errorf("checkpoint gap at %v", checkpoint))
  337. }
  338. }
  339. }
  340. return checkpoints, nil
  341. }
  342. // deleteOldCheckpoints deletes old checkpoints when there are more than
  343. // `s.keep` checkpoints
  344. func (kvdb *KVDB) deleteOldCheckpoints() error {
  345. list, err := kvdb.ListCheckpoints()
  346. if err != nil {
  347. return tracerr.Wrap(err)
  348. }
  349. if len(list) > kvdb.keep {
  350. for _, checkpoint := range list[:len(list)-kvdb.keep] {
  351. if err := kvdb.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil {
  352. return tracerr.Wrap(err)
  353. }
  354. }
  355. }
  356. return nil
  357. }
  358. // MakeCheckpointFromTo makes a checkpoint from the current db at fromBatchNum
  359. // to the dest folder. This method is locking, so it can be called from
  360. // multiple places at the same time.
  361. func (kvdb *KVDB) MakeCheckpointFromTo(fromBatchNum common.BatchNum, dest string) error {
  362. source := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, fromBatchNum))
  363. if _, err := os.Stat(source); os.IsNotExist(err) {
  364. // if kvdb does not have checkpoint at batchNum, return err
  365. return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" does not exist", source))
  366. }
  367. // By locking we allow calling MakeCheckpointFromTo from multiple
  368. // places at the same time for the same stateDB. This allows the
  369. // synchronizer to do a reset to a batchNum at the same time as the
  370. // pipeline is doing a txSelector.Reset and batchBuilder.Reset from
  371. // synchronizer to the same batchNum
  372. kvdb.m.Lock()
  373. defer kvdb.m.Unlock()
  374. return pebbleMakeCheckpoint(source, dest)
  375. }
  376. func pebbleMakeCheckpoint(source, dest string) error {
  377. // Remove dest folder (if it exists) before doing the checkpoint
  378. if _, err := os.Stat(dest); !os.IsNotExist(err) {
  379. err := os.RemoveAll(dest)
  380. if err != nil {
  381. return tracerr.Wrap(err)
  382. }
  383. } else if err != nil && !os.IsNotExist(err) {
  384. return tracerr.Wrap(err)
  385. }
  386. sto, err := pebble.NewPebbleStorage(source, false)
  387. if err != nil {
  388. return tracerr.Wrap(err)
  389. }
  390. defer func() {
  391. errClose := sto.Pebble().Close()
  392. if errClose != nil {
  393. log.Errorw("Pebble.Close", "err", errClose)
  394. }
  395. }()
  396. // execute Checkpoint
  397. err = sto.Pebble().Checkpoint(dest)
  398. if err != nil {
  399. return tracerr.Wrap(err)
  400. }
  401. return nil
  402. }
  403. // Close the DB
  404. func (kvdb *KVDB) Close() {
  405. kvdb.db.Close()
  406. }