You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

444 lines
12 KiB

  1. // Package kvdb provides a key-value database with Checkpoints & Resets system
  2. package kvdb
  3. import (
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "path"
  8. "sort"
  9. "strings"
  10. "github.com/hermeznetwork/hermez-node/common"
  11. "github.com/hermeznetwork/hermez-node/log"
  12. "github.com/hermeznetwork/tracerr"
  13. "github.com/iden3/go-merkletree/db"
  14. "github.com/iden3/go-merkletree/db/pebble"
  15. )
  16. const (
  17. // PathBatchNum defines the subpath of the Batch Checkpoint in the
  18. // subpath of the KVDB
  19. PathBatchNum = "BatchNum"
  20. // PathCurrent defines the subpath of the current Batch in the subpath
  21. // of the KVDB
  22. PathCurrent = "current"
  23. )
  24. var (
  25. // KeyCurrentBatch is used as key in the db to store the current BatchNum
  26. KeyCurrentBatch = []byte("k:currentbatch")
  27. // keyCurrentIdx is used as key in the db to store the CurrentIdx
  28. keyCurrentIdx = []byte("k:idx")
  29. )
  30. // KVDB represents the Key-Value DB object
  31. type KVDB struct {
  32. path string
  33. db *pebble.Storage
  34. // CurrentIdx holds the current Idx that the BatchBuilder is using
  35. CurrentIdx common.Idx
  36. CurrentBatch common.BatchNum
  37. keep int
  38. }
  39. // NewKVDB creates a new KVDB, allowing to use an in-memory or in-disk storage.
  40. // Checkpoints older than the value defined by `keep` will be deleted.
  41. func NewKVDB(pathDB string, keep int) (*KVDB, error) {
  42. var sto *pebble.Storage
  43. var err error
  44. sto, err = pebble.NewPebbleStorage(path.Join(pathDB, PathCurrent), false)
  45. if err != nil {
  46. return nil, tracerr.Wrap(err)
  47. }
  48. kvdb := &KVDB{
  49. path: pathDB,
  50. db: sto,
  51. keep: keep,
  52. }
  53. // load currentBatch
  54. kvdb.CurrentBatch, err = kvdb.GetCurrentBatch()
  55. if err != nil {
  56. return nil, tracerr.Wrap(err)
  57. }
  58. // make reset (get checkpoint) at currentBatch
  59. err = kvdb.reset(kvdb.CurrentBatch, true)
  60. if err != nil {
  61. return nil, tracerr.Wrap(err)
  62. }
  63. return kvdb, nil
  64. }
  65. // DB returns the *pebble.Storage from the KVDB
  66. func (kvdb *KVDB) DB() *pebble.Storage {
  67. return kvdb.db
  68. }
  69. // StorageWithPrefix returns the db.Storage with the given prefix from the
  70. // current KVDB
  71. func (kvdb *KVDB) StorageWithPrefix(prefix []byte) db.Storage {
  72. return kvdb.db.WithPrefix(prefix)
  73. }
  74. // Reset resets the KVDB to the checkpoint at the given batchNum. Reset does
  75. // not delete the checkpoints between old current and the new current, those
  76. // checkpoints will remain in the storage, and eventually will be deleted when
  77. // MakeCheckpoint overwrites them.
  78. func (kvdb *KVDB) Reset(batchNum common.BatchNum) error {
  79. return kvdb.reset(batchNum, true)
  80. }
  81. // reset resets the KVDB to the checkpoint at the given batchNum. Reset does
  82. // not delete the checkpoints between old current and the new current, those
  83. // checkpoints will remain in the storage, and eventually will be deleted when
  84. // MakeCheckpoint overwrites them. `closeCurrent` will close the currently
  85. // opened db before doing the reset.
  86. func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
  87. currentPath := path.Join(kvdb.path, PathCurrent)
  88. if closeCurrent {
  89. if err := kvdb.db.Pebble().Close(); err != nil {
  90. return tracerr.Wrap(err)
  91. }
  92. }
  93. // remove 'current'
  94. err := os.RemoveAll(currentPath)
  95. if err != nil {
  96. return tracerr.Wrap(err)
  97. }
  98. // remove all checkpoints > batchNum
  99. list, err := kvdb.ListCheckpoints()
  100. if err != nil {
  101. return tracerr.Wrap(err)
  102. }
  103. // Find first batch that is greater than batchNum, and delete
  104. // everything after that
  105. start := 0
  106. for ; start < len(list); start++ {
  107. if common.BatchNum(list[start]) > batchNum {
  108. break
  109. }
  110. }
  111. for _, bn := range list[start:] {
  112. if err := kvdb.DeleteCheckpoint(common.BatchNum(bn)); err != nil {
  113. return tracerr.Wrap(err)
  114. }
  115. }
  116. if batchNum == 0 {
  117. // if batchNum == 0, open the new fresh 'current'
  118. sto, err := pebble.NewPebbleStorage(currentPath, false)
  119. if err != nil {
  120. return tracerr.Wrap(err)
  121. }
  122. kvdb.db = sto
  123. kvdb.CurrentIdx = 255
  124. kvdb.CurrentBatch = 0
  125. return nil
  126. }
  127. checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
  128. // copy 'BatchNumX' to 'current'
  129. err = pebbleMakeCheckpoint(checkpointPath, currentPath)
  130. if err != nil {
  131. return tracerr.Wrap(err)
  132. }
  133. // open the new 'current'
  134. sto, err := pebble.NewPebbleStorage(currentPath, false)
  135. if err != nil {
  136. return tracerr.Wrap(err)
  137. }
  138. kvdb.db = sto
  139. // get currentBatch num
  140. kvdb.CurrentBatch, err = kvdb.GetCurrentBatch()
  141. if err != nil {
  142. return tracerr.Wrap(err)
  143. }
  144. // idx is obtained from the statedb reset
  145. kvdb.CurrentIdx, err = kvdb.GetCurrentIdx()
  146. if err != nil {
  147. return tracerr.Wrap(err)
  148. }
  149. return nil
  150. }
  151. // ResetFromSynchronizer performs a reset in the KVDB getting the state from
  152. // synchronizerKVDB for the given batchNum.
  153. func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKVDB *KVDB) error {
  154. if synchronizerKVDB == nil {
  155. return tracerr.Wrap(fmt.Errorf("synchronizerKVDB can not be nil"))
  156. }
  157. currentPath := path.Join(kvdb.path, PathCurrent)
  158. if err := kvdb.db.Pebble().Close(); err != nil {
  159. return tracerr.Wrap(err)
  160. }
  161. // remove 'current'
  162. err := os.RemoveAll(currentPath)
  163. if err != nil {
  164. return tracerr.Wrap(err)
  165. }
  166. // remove all checkpoints
  167. list, err := kvdb.ListCheckpoints()
  168. if err != nil {
  169. return tracerr.Wrap(err)
  170. }
  171. for _, bn := range list {
  172. if err := kvdb.DeleteCheckpoint(common.BatchNum(bn)); err != nil {
  173. return tracerr.Wrap(err)
  174. }
  175. }
  176. if batchNum == 0 {
  177. // if batchNum == 0, open the new fresh 'current'
  178. sto, err := pebble.NewPebbleStorage(currentPath, false)
  179. if err != nil {
  180. return tracerr.Wrap(err)
  181. }
  182. kvdb.db = sto
  183. kvdb.CurrentIdx = 255
  184. kvdb.CurrentBatch = 0
  185. return nil
  186. }
  187. checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
  188. // use checkpoint from synchronizerKVDB
  189. synchronizerCheckpointPath := path.Join(synchronizerKVDB.path,
  190. fmt.Sprintf("%s%d", PathBatchNum, batchNum))
  191. if _, err := os.Stat(synchronizerCheckpointPath); os.IsNotExist(err) {
  192. // if synchronizerKVDB does not have checkpoint at batchNum, return err
  193. return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" not exist in Synchronizer",
  194. synchronizerCheckpointPath))
  195. }
  196. // copy synchronizer'BatchNumX' to 'BatchNumX'
  197. err = pebbleMakeCheckpoint(synchronizerCheckpointPath, checkpointPath)
  198. if err != nil {
  199. return tracerr.Wrap(err)
  200. }
  201. // copy 'BatchNumX' to 'current'
  202. err = pebbleMakeCheckpoint(checkpointPath, currentPath)
  203. if err != nil {
  204. return tracerr.Wrap(err)
  205. }
  206. // open the new 'current'
  207. sto, err := pebble.NewPebbleStorage(currentPath, false)
  208. if err != nil {
  209. return tracerr.Wrap(err)
  210. }
  211. kvdb.db = sto
  212. // get currentBatch num
  213. kvdb.CurrentBatch, err = kvdb.GetCurrentBatch()
  214. if err != nil {
  215. return tracerr.Wrap(err)
  216. }
  217. // get currentIdx
  218. kvdb.CurrentIdx, err = kvdb.GetCurrentIdx()
  219. if err != nil {
  220. return tracerr.Wrap(err)
  221. }
  222. return nil
  223. }
  224. // GetCurrentBatch returns the current BatchNum stored in the KVDB
  225. func (kvdb *KVDB) GetCurrentBatch() (common.BatchNum, error) {
  226. cbBytes, err := kvdb.db.Get(KeyCurrentBatch)
  227. if tracerr.Unwrap(err) == db.ErrNotFound {
  228. return 0, nil
  229. }
  230. if err != nil {
  231. return 0, tracerr.Wrap(err)
  232. }
  233. return common.BatchNumFromBytes(cbBytes)
  234. }
  235. // setCurrentBatch stores the current BatchNum in the KVDB
  236. func (kvdb *KVDB) setCurrentBatch() error {
  237. tx, err := kvdb.db.NewTx()
  238. if err != nil {
  239. return tracerr.Wrap(err)
  240. }
  241. err = tx.Put(KeyCurrentBatch, kvdb.CurrentBatch.Bytes())
  242. if err != nil {
  243. return tracerr.Wrap(err)
  244. }
  245. if err := tx.Commit(); err != nil {
  246. return tracerr.Wrap(err)
  247. }
  248. return nil
  249. }
  250. // GetCurrentIdx returns the stored Idx from the KVDB, which is the last Idx
  251. // used for an Account in the KVDB.
  252. func (kvdb *KVDB) GetCurrentIdx() (common.Idx, error) {
  253. idxBytes, err := kvdb.db.Get(keyCurrentIdx)
  254. if tracerr.Unwrap(err) == db.ErrNotFound {
  255. return 0, nil
  256. }
  257. if err != nil {
  258. return 0, tracerr.Wrap(err)
  259. }
  260. return common.IdxFromBytes(idxBytes[:])
  261. }
  262. // SetCurrentIdx stores Idx in the KVDB
  263. func (kvdb *KVDB) SetCurrentIdx(idx common.Idx) error {
  264. kvdb.CurrentIdx = idx
  265. tx, err := kvdb.db.NewTx()
  266. if err != nil {
  267. return tracerr.Wrap(err)
  268. }
  269. idxBytes, err := idx.Bytes()
  270. if err != nil {
  271. return tracerr.Wrap(err)
  272. }
  273. err = tx.Put(keyCurrentIdx, idxBytes[:])
  274. if err != nil {
  275. return tracerr.Wrap(err)
  276. }
  277. if err := tx.Commit(); err != nil {
  278. return tracerr.Wrap(err)
  279. }
  280. return nil
  281. }
  282. // MakeCheckpoint does a checkpoint at the given batchNum in the defined path.
  283. // Internally this advances & stores the current BatchNum, and then stores a
  284. // Checkpoint of the current state of the KVDB.
  285. func (kvdb *KVDB) MakeCheckpoint() error {
  286. // advance currentBatch
  287. kvdb.CurrentBatch++
  288. checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, kvdb.CurrentBatch))
  289. if err := kvdb.setCurrentBatch(); err != nil {
  290. return tracerr.Wrap(err)
  291. }
  292. // if checkpoint BatchNum already exist in disk, delete it
  293. if _, err := os.Stat(checkpointPath); !os.IsNotExist(err) {
  294. err := os.RemoveAll(checkpointPath)
  295. if err != nil {
  296. return tracerr.Wrap(err)
  297. }
  298. } else if err != nil && !os.IsNotExist(err) {
  299. return tracerr.Wrap(err)
  300. }
  301. // execute Checkpoint
  302. if err := kvdb.db.Pebble().Checkpoint(checkpointPath); err != nil {
  303. return tracerr.Wrap(err)
  304. }
  305. // delete old checkpoints
  306. if err := kvdb.deleteOldCheckpoints(); err != nil {
  307. return tracerr.Wrap(err)
  308. }
  309. return nil
  310. }
  311. // DeleteCheckpoint removes if exist the checkpoint of the given batchNum
  312. func (kvdb *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error {
  313. checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
  314. if _, err := os.Stat(checkpointPath); os.IsNotExist(err) {
  315. return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum))
  316. }
  317. return os.RemoveAll(checkpointPath)
  318. }
  319. // ListCheckpoints returns the list of batchNums of the checkpoints, sorted.
  320. // If there's a gap between the list of checkpoints, an error is returned.
  321. func (kvdb *KVDB) ListCheckpoints() ([]int, error) {
  322. files, err := ioutil.ReadDir(kvdb.path)
  323. if err != nil {
  324. return nil, tracerr.Wrap(err)
  325. }
  326. checkpoints := []int{}
  327. var checkpoint int
  328. pattern := fmt.Sprintf("%s%%d", PathBatchNum)
  329. for _, file := range files {
  330. fileName := file.Name()
  331. if file.IsDir() && strings.HasPrefix(fileName, PathBatchNum) {
  332. if _, err := fmt.Sscanf(fileName, pattern, &checkpoint); err != nil {
  333. return nil, tracerr.Wrap(err)
  334. }
  335. checkpoints = append(checkpoints, checkpoint)
  336. }
  337. }
  338. sort.Ints(checkpoints)
  339. if len(checkpoints) > 0 {
  340. first := checkpoints[0]
  341. for _, checkpoint := range checkpoints[1:] {
  342. first++
  343. if checkpoint != first {
  344. log.Errorw("GAP", "checkpoints", checkpoints)
  345. return nil, tracerr.Wrap(fmt.Errorf("checkpoint gap at %v", checkpoint))
  346. }
  347. }
  348. }
  349. return checkpoints, nil
  350. }
  351. // deleteOldCheckpoints deletes old checkpoints when there are more than
  352. // `s.keep` checkpoints
  353. func (kvdb *KVDB) deleteOldCheckpoints() error {
  354. list, err := kvdb.ListCheckpoints()
  355. if err != nil {
  356. return tracerr.Wrap(err)
  357. }
  358. if len(list) > kvdb.keep {
  359. for _, checkpoint := range list[:len(list)-kvdb.keep] {
  360. if err := kvdb.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil {
  361. return tracerr.Wrap(err)
  362. }
  363. }
  364. }
  365. return nil
  366. }
  367. func pebbleMakeCheckpoint(source, dest string) error {
  368. // Remove dest folder (if it exists) before doing the checkpoint
  369. if _, err := os.Stat(dest); !os.IsNotExist(err) {
  370. err := os.RemoveAll(dest)
  371. if err != nil {
  372. return tracerr.Wrap(err)
  373. }
  374. } else if err != nil && !os.IsNotExist(err) {
  375. return tracerr.Wrap(err)
  376. }
  377. sto, err := pebble.NewPebbleStorage(source, false)
  378. if err != nil {
  379. return tracerr.Wrap(err)
  380. }
  381. defer func() {
  382. errClose := sto.Pebble().Close()
  383. if errClose != nil {
  384. log.Errorw("Pebble.Close", "err", errClose)
  385. }
  386. }()
  387. // execute Checkpoint
  388. err = sto.Pebble().Checkpoint(dest)
  389. if err != nil {
  390. return tracerr.Wrap(err)
  391. }
  392. return nil
  393. }