You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

164 lines
4.0 KiB

  1. package pebble
  2. import (
  3. "github.com/cockroachdb/pebble"
  4. "github.com/iden3/go-merkletree/db"
  5. )
  6. // Storage implements the db.Storage interface
  7. type Storage struct {
  8. pdb *pebble.DB
  9. prefix []byte
  10. }
  11. // StorageTx implements the db.Tx interface
  12. type StorageTx struct {
  13. *Storage
  14. batch *pebble.Batch
  15. }
  16. // NewPebbleStorage returns a new Storage
  17. func NewPebbleStorage(path string, errorIfMissing bool) (*Storage, error) {
  18. o := &pebble.Options{
  19. ErrorIfNotExists: errorIfMissing,
  20. }
  21. rdb, err := pebble.Open(path, o)
  22. if err != nil {
  23. return nil, err
  24. }
  25. return &Storage{rdb, []byte{}}, nil
  26. }
  27. // WithPrefix implements the method WithPrefix of the interface db.Storage
  28. func (p *Storage) WithPrefix(prefix []byte) db.Storage {
  29. return &Storage{p.pdb, db.Concat(p.prefix, prefix)}
  30. }
  31. // NewTx implements the method NewTx of the interface db.Storage
  32. func (p *Storage) NewTx() (db.Tx, error) {
  33. return &StorageTx{p, p.pdb.NewIndexedBatch()}, nil
  34. }
  35. // Get retreives a value from a key in the db.Storage
  36. func (p *Storage) Get(key []byte) ([]byte, error) {
  37. v, closer, err := p.pdb.Get(db.Concat(p.prefix, key[:]))
  38. if err == pebble.ErrNotFound {
  39. return nil, db.ErrNotFound
  40. }
  41. if err != nil {
  42. return nil, err
  43. }
  44. err = closer.Close()
  45. return v, err
  46. }
  47. //nolint:lll
  48. // https://github.com/cockroachdb/pebble/pull/923/files#diff-c2ade2f386c41794d5ebc57ee49b57a5fca8082e03255e5bff13977cbc061287R39
  49. func keyUpperBound(b []byte) []byte {
  50. end := make([]byte, len(b))
  51. copy(end, b)
  52. for i := len(end) - 1; i >= 0; i-- {
  53. end[i] = end[i] + 1
  54. if end[i] != 0 {
  55. return end[:i+1]
  56. }
  57. }
  58. return nil // no upper-bound
  59. }
  60. func prefixIterOptions(prefix []byte) *pebble.IterOptions {
  61. return &pebble.IterOptions{
  62. LowerBound: prefix,
  63. UpperBound: keyUpperBound(prefix),
  64. }
  65. }
  66. // Iterate implements the method Iterate of the interface db.Storage
  67. func (p *Storage) Iterate(f func([]byte, []byte) (bool, error)) (err error) {
  68. // NewIter already provides a point-in-time view of the current DB
  69. // state, but if is used for long term (is not the case), should use an
  70. // iterator over an snapshot:
  71. // snapshot := p.pdb.NewSnapshot()
  72. // defer snapshot.Close()
  73. // iter := snapshot.NewIter(nil)
  74. iter := p.pdb.NewIter(prefixIterOptions(p.prefix))
  75. defer func() {
  76. err1 := iter.Close()
  77. if err != nil {
  78. return
  79. }
  80. err = err1
  81. }()
  82. for iter.First(); iter.Valid(); iter.Next() {
  83. localKey := iter.Key()[len(p.prefix):]
  84. if cont, err := f(localKey, iter.Value()); err != nil {
  85. return err
  86. } else if !cont {
  87. break
  88. }
  89. }
  90. return iter.Error()
  91. }
  92. // Get retreives a value from a key in the interface db.Tx
  93. func (tx *StorageTx) Get(key []byte) ([]byte, error) {
  94. var err error
  95. fullkey := db.Concat(tx.prefix, key)
  96. v, closer, err := tx.batch.Get(fullkey)
  97. if err == pebble.ErrNotFound {
  98. return nil, db.ErrNotFound
  99. }
  100. if err != nil {
  101. return nil, err
  102. }
  103. err = closer.Close()
  104. return v, err
  105. }
  106. // Put saves a key:value into the db.Storage
  107. func (tx *StorageTx) Put(k, v []byte) error {
  108. return tx.batch.Set(db.Concat(tx.prefix, k[:]), v, nil)
  109. }
  110. // Add implements the method Add of the interface db.Tx
  111. func (tx *StorageTx) Add(atx db.Tx) error {
  112. patx := atx.(*StorageTx)
  113. return tx.batch.Apply(patx.batch, nil)
  114. }
  115. // Commit implements the method Commit of the interface db.Tx
  116. func (tx *StorageTx) Commit() error {
  117. return tx.batch.Commit(nil)
  118. }
  119. // Close implements the method Close of the interface db.Tx
  120. func (tx *StorageTx) Close() {
  121. _ = tx.batch.Close()
  122. }
  123. // Close implements the method Close of the interface db.Storage
  124. func (p *Storage) Close() {
  125. if err := p.pdb.Close(); err != nil {
  126. panic(err)
  127. }
  128. }
  129. // Pebble is an extra method that returns the *pebble.DB
  130. func (p *Storage) Pebble() *pebble.DB {
  131. return p.pdb
  132. }
  133. // List implements the method List of the interface db.Storage
  134. func (p *Storage) List(limit int) ([]db.KV, error) {
  135. ret := []db.KV{}
  136. err := p.Iterate(func(key []byte, value []byte) (bool, error) {
  137. ret = append(ret, db.KV{K: db.Clone(key), V: db.Clone(value)})
  138. if len(ret) == limit {
  139. return false, nil
  140. }
  141. return true, nil
  142. })
  143. return ret, err
  144. }