You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

198 lines
4.6 KiB

  1. package pebble
  2. import (
  3. "encoding/json"
  4. "github.com/cockroachdb/pebble"
  5. "github.com/iden3/go-merkletree/db"
  6. log "github.com/sirupsen/logrus"
  7. )
  8. // Storage implements the db.Storage interface
  9. type Storage struct {
  10. pdb *pebble.DB
  11. prefix []byte
  12. }
  13. // StorageTx implements the db.Tx interface
  14. type StorageTx struct {
  15. *Storage
  16. batch *pebble.Batch
  17. }
  18. // NewPebbleStorage returns a new Storage
  19. func NewPebbleStorage(path string, errorIfMissing bool) (*Storage, error) {
  20. o := &pebble.Options{
  21. ErrorIfNotExists: errorIfMissing,
  22. }
  23. rdb, err := pebble.Open(path, o)
  24. if err != nil {
  25. return nil, err
  26. }
  27. return &Storage{rdb, []byte{}}, nil
  28. }
  29. type storageInfo struct {
  30. KeyCount int
  31. ClaimCount int
  32. }
  33. // Info implements the method Info of the interface db.Storage
  34. func (p *Storage) Info() string {
  35. keycount := 0
  36. claimcount := 0
  37. err := p.Iterate(func(key []byte, value []byte) (bool, error) {
  38. if value[0] == byte(1) {
  39. claimcount++
  40. }
  41. keycount++
  42. return true, nil
  43. })
  44. if err != nil {
  45. return err.Error()
  46. }
  47. json, _ := json.MarshalIndent(
  48. storageInfo{
  49. KeyCount: keycount,
  50. ClaimCount: claimcount,
  51. },
  52. "", " ",
  53. )
  54. return string(json)
  55. }
  56. // WithPrefix implements the method WithPrefix of the interface db.Storage
  57. func (p *Storage) WithPrefix(prefix []byte) db.Storage {
  58. return &Storage{p.pdb, db.Concat(p.prefix, prefix)}
  59. }
  60. // NewTx implements the method NewTx of the interface db.Storage
  61. func (p *Storage) NewTx() (db.Tx, error) {
  62. return &StorageTx{p, p.pdb.NewIndexedBatch()}, nil
  63. }
  64. // Get retreives a value from a key in the db.Storage
  65. func (p *Storage) Get(key []byte) ([]byte, error) {
  66. v, closer, err := p.pdb.Get(db.Concat(p.prefix, key[:]))
  67. if err == pebble.ErrNotFound {
  68. return nil, db.ErrNotFound
  69. }
  70. if err != nil {
  71. return nil, err
  72. }
  73. err = closer.Close()
  74. return v, err
  75. }
  76. //nolint:lll
  77. // https://github.com/cockroachdb/pebble/pull/923/files#diff-c2ade2f386c41794d5ebc57ee49b57a5fca8082e03255e5bff13977cbc061287R39
  78. func keyUpperBound(b []byte) []byte {
  79. end := make([]byte, len(b))
  80. copy(end, b)
  81. for i := len(end) - 1; i >= 0; i-- {
  82. end[i] = end[i] + 1
  83. if end[i] != 0 {
  84. return end[:i+1]
  85. }
  86. }
  87. return nil // no upper-bound
  88. }
  89. func prefixIterOptions(prefix []byte) *pebble.IterOptions {
  90. return &pebble.IterOptions{
  91. LowerBound: prefix,
  92. UpperBound: keyUpperBound(prefix),
  93. }
  94. }
  95. // Iterate implements the method Iterate of the interface db.Storage
  96. func (p *Storage) Iterate(f func([]byte, []byte) (bool, error)) (err error) {
  97. // NewIter already provides a point-in-time view of the current DB
  98. // state, but if is used for long term (is not the case), should use an
  99. // iterator over an snapshot:
  100. // snapshot := p.pdb.NewSnapshot()
  101. // defer snapshot.Close()
  102. // iter := snapshot.NewIter(nil)
  103. iter := p.pdb.NewIter(prefixIterOptions(p.prefix))
  104. defer func() {
  105. err1 := iter.Close()
  106. if err != nil {
  107. return
  108. }
  109. err = err1
  110. }()
  111. for iter.First(); iter.Valid(); iter.Next() {
  112. localKey := iter.Key()[len(p.prefix):]
  113. if cont, err := f(localKey, iter.Value()); err != nil {
  114. return err
  115. } else if !cont {
  116. break
  117. }
  118. }
  119. return iter.Error()
  120. }
  121. // Get retreives a value from a key in the interface db.Tx
  122. func (tx *StorageTx) Get(key []byte) ([]byte, error) {
  123. var err error
  124. fullkey := db.Concat(tx.prefix, key)
  125. v, closer, err := tx.batch.Get(fullkey)
  126. if err == pebble.ErrNotFound {
  127. return nil, db.ErrNotFound
  128. }
  129. if err != nil {
  130. return nil, err
  131. }
  132. err = closer.Close()
  133. return v, err
  134. }
  135. // Put saves a key:value into the db.Storage
  136. func (tx *StorageTx) Put(k, v []byte) error {
  137. return tx.batch.Set(db.Concat(tx.prefix, k[:]), v, nil)
  138. }
  139. // Add implements the method Add of the interface db.Tx
  140. func (tx *StorageTx) Add(atx db.Tx) error {
  141. patx := atx.(*StorageTx)
  142. return tx.batch.Apply(patx.batch, nil)
  143. }
  144. // Commit implements the method Commit of the interface db.Tx
  145. func (tx *StorageTx) Commit() error {
  146. return tx.batch.Commit(nil)
  147. }
  148. // Close implements the method Close of the interface db.Tx
  149. func (tx *StorageTx) Close() {
  150. _ = tx.batch.Close()
  151. }
  152. // Close implements the method Close of the interface db.Storage
  153. func (p *Storage) Close() {
  154. if err := p.pdb.Close(); err != nil {
  155. panic(err)
  156. }
  157. log.Info("Database closed")
  158. }
  159. // Pebble is an extra method that returns the *pebble.DB
  160. func (p *Storage) Pebble() *pebble.DB {
  161. return p.pdb
  162. }
  163. // List implements the method List of the interface db.Storage
  164. func (p *Storage) List(limit int) ([]db.KV, error) {
  165. ret := []db.KV{}
  166. err := p.Iterate(func(key []byte, value []byte) (bool, error) {
  167. ret = append(ret, db.KV{K: db.Clone(key), V: db.Clone(value)})
  168. if len(ret) == limit {
  169. return false, nil
  170. }
  171. return true, nil
  172. })
  173. return ret, err
  174. }