You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

405 lines
11 KiB

  1. // Package kvdb provides a key-value database with Checkpoints & Resets system
  2. package kvdb
  3. import (
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "path"
  8. "sort"
  9. "strings"
  10. "github.com/hermeznetwork/hermez-node/common"
  11. "github.com/hermeznetwork/hermez-node/log"
  12. "github.com/hermeznetwork/tracerr"
  13. "github.com/iden3/go-merkletree/db"
  14. "github.com/iden3/go-merkletree/db/pebble"
  15. )
const (
	// PathBatchNum defines the subpath of the Batch Checkpoint in the
	// subpath of the KVDB. Checkpoint directories are named by appending
	// the decimal batch number to this prefix.
	PathBatchNum = "BatchNum"
	// PathCurrent defines the subpath of the current Batch in the subpath
	// of the KVDB. It is the directory where the working db is opened.
	PathCurrent = "current"
)
var (
	// KeyCurrentBatch is used as key in the db to store the current BatchNum
	KeyCurrentBatch = []byte("k:currentbatch")
	// keyCurrentIdx is used as key in the db to store the CurrentIdx
	keyCurrentIdx = []byte("k:idx")
)
// KVDB represents the Key-Value DB object
type KVDB struct {
	// path is the root directory holding the 'current' db and the
	// per-batch checkpoint directories
	path string
	// db is the storage opened at the 'current' subpath
	db *pebble.Storage
	// CurrentIdx holds the current Idx that the BatchBuilder is using
	CurrentIdx common.Idx
	// CurrentBatch holds the BatchNum of the current state; it is advanced
	// by MakeCheckpoint and reloaded from the db on reset
	CurrentBatch common.BatchNum
	// keep is the number of most recent checkpoints that are retained when
	// old checkpoints are deleted
	keep int
}
  39. // NewKVDB creates a new KVDB, allowing to use an in-memory or in-disk storage.
  40. // Checkpoints older than the value defined by `keep` will be deleted.
  41. func NewKVDB(pathDB string, keep int) (*KVDB, error) {
  42. var sto *pebble.Storage
  43. var err error
  44. sto, err = pebble.NewPebbleStorage(path.Join(pathDB, PathCurrent), false)
  45. if err != nil {
  46. return nil, tracerr.Wrap(err)
  47. }
  48. kvdb := &KVDB{
  49. path: pathDB,
  50. db: sto,
  51. keep: keep,
  52. }
  53. // load currentBatch
  54. kvdb.CurrentBatch, err = kvdb.GetCurrentBatch()
  55. if err != nil {
  56. return nil, tracerr.Wrap(err)
  57. }
  58. // make reset (get checkpoint) at currentBatch
  59. err = kvdb.reset(kvdb.CurrentBatch, false)
  60. if err != nil {
  61. return nil, tracerr.Wrap(err)
  62. }
  63. return kvdb, nil
  64. }
  65. // DB returns the *pebble.Storage from the KVDB
  66. func (kvdb *KVDB) DB() *pebble.Storage {
  67. return kvdb.db
  68. }
  69. // StorageWithPrefix returns the db.Storage with the given prefix from the
  70. // current KVDB
  71. func (kvdb *KVDB) StorageWithPrefix(prefix []byte) db.Storage {
  72. return kvdb.db.WithPrefix(prefix)
  73. }
  74. // Reset resets the KVDB to the checkpoint at the given batchNum. Reset does
  75. // not delete the checkpoints between old current and the new current, those
  76. // checkpoints will remain in the storage, and eventually will be deleted when
  77. // MakeCheckpoint overwrites them.
  78. func (kvdb *KVDB) Reset(batchNum common.BatchNum) error {
  79. return kvdb.reset(batchNum, true)
  80. }
  81. // reset resets the KVDB to the checkpoint at the given batchNum. Reset does
  82. // not delete the checkpoints between old current and the new current, those
  83. // checkpoints will remain in the storage, and eventually will be deleted when
  84. // MakeCheckpoint overwrites them. `closeCurrent` will close the currently
  85. // opened db before doing the reset.
  86. func (kvdb *KVDB) reset(batchNum common.BatchNum, closeCurrent bool) error {
  87. currentPath := path.Join(kvdb.path, PathCurrent)
  88. if closeCurrent {
  89. if err := kvdb.db.Pebble().Close(); err != nil {
  90. return tracerr.Wrap(err)
  91. }
  92. }
  93. // remove 'current'
  94. err := os.RemoveAll(currentPath)
  95. if err != nil {
  96. return tracerr.Wrap(err)
  97. }
  98. // remove all checkpoints > batchNum
  99. for i := batchNum + 1; i <= kvdb.CurrentBatch; i++ {
  100. if err := kvdb.DeleteCheckpoint(i); err != nil {
  101. return tracerr.Wrap(err)
  102. }
  103. }
  104. if batchNum == 0 {
  105. // if batchNum == 0, open the new fresh 'current'
  106. sto, err := pebble.NewPebbleStorage(currentPath, false)
  107. if err != nil {
  108. return tracerr.Wrap(err)
  109. }
  110. kvdb.db = sto
  111. kvdb.CurrentIdx = 255
  112. kvdb.CurrentBatch = batchNum
  113. return nil
  114. }
  115. checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
  116. // copy 'BatchNumX' to 'current'
  117. err = pebbleMakeCheckpoint(checkpointPath, currentPath)
  118. if err != nil {
  119. return tracerr.Wrap(err)
  120. }
  121. // open the new 'current'
  122. sto, err := pebble.NewPebbleStorage(currentPath, false)
  123. if err != nil {
  124. return tracerr.Wrap(err)
  125. }
  126. kvdb.db = sto
  127. // get currentBatch num
  128. kvdb.CurrentBatch, err = kvdb.GetCurrentBatch()
  129. if err != nil {
  130. return tracerr.Wrap(err)
  131. }
  132. // idx is obtained from the statedb reset
  133. kvdb.CurrentIdx, err = kvdb.GetCurrentIdx()
  134. if err != nil {
  135. return tracerr.Wrap(err)
  136. }
  137. return nil
  138. }
  139. // ResetFromSynchronizer performs a reset in the KVDB getting the state from
  140. // synchronizerKVDB for the given batchNum.
  141. func (kvdb *KVDB) ResetFromSynchronizer(batchNum common.BatchNum, synchronizerKVDB *KVDB) error {
  142. if synchronizerKVDB == nil {
  143. return tracerr.Wrap(fmt.Errorf("synchronizerKVDB can not be nil"))
  144. }
  145. if batchNum == 0 {
  146. kvdb.CurrentIdx = 0
  147. return nil
  148. }
  149. synchronizerCheckpointPath := path.Join(synchronizerKVDB.path,
  150. fmt.Sprintf("%s%d", PathBatchNum, batchNum))
  151. checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
  152. currentPath := path.Join(kvdb.path, PathCurrent)
  153. // use checkpoint from synchronizerKVDB
  154. if _, err := os.Stat(synchronizerCheckpointPath); os.IsNotExist(err) {
  155. // if synchronizerKVDB does not have checkpoint at batchNum, return err
  156. return tracerr.Wrap(fmt.Errorf("Checkpoint \"%v\" not exist in Synchronizer",
  157. synchronizerCheckpointPath))
  158. }
  159. if err := kvdb.db.Pebble().Close(); err != nil {
  160. return tracerr.Wrap(err)
  161. }
  162. // remove 'current'
  163. err := os.RemoveAll(currentPath)
  164. if err != nil {
  165. return tracerr.Wrap(err)
  166. }
  167. // copy synchronizer'BatchNumX' to 'current'
  168. err = pebbleMakeCheckpoint(synchronizerCheckpointPath, currentPath)
  169. if err != nil {
  170. return tracerr.Wrap(err)
  171. }
  172. // copy synchronizer'BatchNumX' to 'BatchNumX'
  173. err = pebbleMakeCheckpoint(synchronizerCheckpointPath, checkpointPath)
  174. if err != nil {
  175. return tracerr.Wrap(err)
  176. }
  177. // open the new 'current'
  178. sto, err := pebble.NewPebbleStorage(currentPath, false)
  179. if err != nil {
  180. return tracerr.Wrap(err)
  181. }
  182. kvdb.db = sto
  183. // get currentBatch num
  184. kvdb.CurrentBatch, err = kvdb.GetCurrentBatch()
  185. if err != nil {
  186. return tracerr.Wrap(err)
  187. }
  188. return nil
  189. }
  190. // GetCurrentBatch returns the current BatchNum stored in the KVDB
  191. func (kvdb *KVDB) GetCurrentBatch() (common.BatchNum, error) {
  192. cbBytes, err := kvdb.db.Get(KeyCurrentBatch)
  193. if tracerr.Unwrap(err) == db.ErrNotFound {
  194. return 0, nil
  195. }
  196. if err != nil {
  197. return 0, tracerr.Wrap(err)
  198. }
  199. return common.BatchNumFromBytes(cbBytes)
  200. }
  201. // setCurrentBatch stores the current BatchNum in the KVDB
  202. func (kvdb *KVDB) setCurrentBatch() error {
  203. tx, err := kvdb.db.NewTx()
  204. if err != nil {
  205. return tracerr.Wrap(err)
  206. }
  207. err = tx.Put(KeyCurrentBatch, kvdb.CurrentBatch.Bytes())
  208. if err != nil {
  209. return tracerr.Wrap(err)
  210. }
  211. if err := tx.Commit(); err != nil {
  212. return tracerr.Wrap(err)
  213. }
  214. return nil
  215. }
  216. // GetCurrentIdx returns the stored Idx from the KVDB, which is the last Idx
  217. // used for an Account in the KVDB.
  218. func (kvdb *KVDB) GetCurrentIdx() (common.Idx, error) {
  219. idxBytes, err := kvdb.db.Get(keyCurrentIdx)
  220. if tracerr.Unwrap(err) == db.ErrNotFound {
  221. return 0, nil
  222. }
  223. if err != nil {
  224. return 0, tracerr.Wrap(err)
  225. }
  226. return common.IdxFromBytes(idxBytes[:])
  227. }
  228. // SetCurrentIdx stores Idx in the KVDB
  229. func (kvdb *KVDB) SetCurrentIdx(idx common.Idx) error {
  230. kvdb.CurrentIdx = idx
  231. tx, err := kvdb.db.NewTx()
  232. if err != nil {
  233. return tracerr.Wrap(err)
  234. }
  235. idxBytes, err := idx.Bytes()
  236. if err != nil {
  237. return tracerr.Wrap(err)
  238. }
  239. err = tx.Put(keyCurrentIdx, idxBytes[:])
  240. if err != nil {
  241. return tracerr.Wrap(err)
  242. }
  243. if err := tx.Commit(); err != nil {
  244. return tracerr.Wrap(err)
  245. }
  246. return nil
  247. }
  248. // MakeCheckpoint does a checkpoint at the given batchNum in the defined path.
  249. // Internally this advances & stores the current BatchNum, and then stores a
  250. // Checkpoint of the current state of the KVDB.
  251. func (kvdb *KVDB) MakeCheckpoint() error {
  252. // advance currentBatch
  253. kvdb.CurrentBatch++
  254. log.Debugw("Making KVDB checkpoint", "batch", kvdb.CurrentBatch)
  255. checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, kvdb.CurrentBatch))
  256. if err := kvdb.setCurrentBatch(); err != nil {
  257. return tracerr.Wrap(err)
  258. }
  259. // if checkpoint BatchNum already exist in disk, delete it
  260. if _, err := os.Stat(checkpointPath); !os.IsNotExist(err) {
  261. err := os.RemoveAll(checkpointPath)
  262. if err != nil {
  263. return tracerr.Wrap(err)
  264. }
  265. } else if err != nil && !os.IsNotExist(err) {
  266. return tracerr.Wrap(err)
  267. }
  268. // execute Checkpoint
  269. if err := kvdb.db.Pebble().Checkpoint(checkpointPath); err != nil {
  270. return tracerr.Wrap(err)
  271. }
  272. // delete old checkpoints
  273. if err := kvdb.deleteOldCheckpoints(); err != nil {
  274. return tracerr.Wrap(err)
  275. }
  276. return nil
  277. }
  278. // DeleteCheckpoint removes if exist the checkpoint of the given batchNum
  279. func (kvdb *KVDB) DeleteCheckpoint(batchNum common.BatchNum) error {
  280. checkpointPath := path.Join(kvdb.path, fmt.Sprintf("%s%d", PathBatchNum, batchNum))
  281. if _, err := os.Stat(checkpointPath); os.IsNotExist(err) {
  282. return tracerr.Wrap(fmt.Errorf("Checkpoint with batchNum %d does not exist in DB", batchNum))
  283. }
  284. return os.RemoveAll(checkpointPath)
  285. }
  286. // ListCheckpoints returns the list of batchNums of the checkpoints, sorted.
  287. // If there's a gap between the list of checkpoints, an error is returned.
  288. func (kvdb *KVDB) ListCheckpoints() ([]int, error) {
  289. files, err := ioutil.ReadDir(kvdb.path)
  290. if err != nil {
  291. return nil, tracerr.Wrap(err)
  292. }
  293. checkpoints := []int{}
  294. var checkpoint int
  295. pattern := fmt.Sprintf("%s%%d", PathBatchNum)
  296. for _, file := range files {
  297. fileName := file.Name()
  298. if file.IsDir() && strings.HasPrefix(fileName, PathBatchNum) {
  299. if _, err := fmt.Sscanf(fileName, pattern, &checkpoint); err != nil {
  300. return nil, tracerr.Wrap(err)
  301. }
  302. checkpoints = append(checkpoints, checkpoint)
  303. }
  304. }
  305. sort.Ints(checkpoints)
  306. if len(checkpoints) > 0 {
  307. first := checkpoints[0]
  308. for _, checkpoint := range checkpoints[1:] {
  309. first++
  310. if checkpoint != first {
  311. return nil, tracerr.Wrap(fmt.Errorf("checkpoint gap at %v", checkpoint))
  312. }
  313. }
  314. }
  315. return checkpoints, nil
  316. }
  317. // deleteOldCheckpoints deletes old checkpoints when there are more than
  318. // `s.keep` checkpoints
  319. func (kvdb *KVDB) deleteOldCheckpoints() error {
  320. list, err := kvdb.ListCheckpoints()
  321. if err != nil {
  322. return tracerr.Wrap(err)
  323. }
  324. if len(list) > kvdb.keep {
  325. for _, checkpoint := range list[:len(list)-kvdb.keep] {
  326. if err := kvdb.DeleteCheckpoint(common.BatchNum(checkpoint)); err != nil {
  327. return tracerr.Wrap(err)
  328. }
  329. }
  330. }
  331. return nil
  332. }
  333. func pebbleMakeCheckpoint(source, dest string) error {
  334. // Remove dest folder (if it exists) before doing the checkpoint
  335. if _, err := os.Stat(dest); !os.IsNotExist(err) {
  336. err := os.RemoveAll(dest)
  337. if err != nil {
  338. return tracerr.Wrap(err)
  339. }
  340. } else if err != nil && !os.IsNotExist(err) {
  341. return tracerr.Wrap(err)
  342. }
  343. sto, err := pebble.NewPebbleStorage(source, false)
  344. if err != nil {
  345. return tracerr.Wrap(err)
  346. }
  347. defer func() {
  348. errClose := sto.Pebble().Close()
  349. if errClose != nil {
  350. log.Errorw("Pebble.Close", "err", errClose)
  351. }
  352. }()
  353. // execute Checkpoint
  354. err = sto.Pebble().Checkpoint(dest)
  355. if err != nil {
  356. return tracerr.Wrap(err)
  357. }
  358. return nil
  359. }