You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

166 lines
4.1 KiB

  1. package pebble
  2. import (
  3. "github.com/cockroachdb/pebble"
  4. "github.com/iden3/go-merkletree/db"
  5. log "github.com/sirupsen/logrus"
  6. )
  7. // Storage implements the db.Storage interface
  8. type Storage struct {
  9. pdb *pebble.DB
  10. prefix []byte
  11. }
  12. // StorageTx implements the db.Tx interface
  13. type StorageTx struct {
  14. *Storage
  15. batch *pebble.Batch
  16. }
  17. // NewPebbleStorage returns a new Storage
  18. func NewPebbleStorage(path string, errorIfMissing bool) (*Storage, error) {
  19. o := &pebble.Options{
  20. ErrorIfNotExists: errorIfMissing,
  21. }
  22. rdb, err := pebble.Open(path, o)
  23. if err != nil {
  24. return nil, err
  25. }
  26. return &Storage{rdb, []byte{}}, nil
  27. }
  28. // WithPrefix implements the method WithPrefix of the interface db.Storage
  29. func (p *Storage) WithPrefix(prefix []byte) db.Storage {
  30. return &Storage{p.pdb, db.Concat(p.prefix, prefix)}
  31. }
  32. // NewTx implements the method NewTx of the interface db.Storage
  33. func (p *Storage) NewTx() (db.Tx, error) {
  34. return &StorageTx{p, p.pdb.NewIndexedBatch()}, nil
  35. }
  36. // Get retreives a value from a key in the db.Storage
  37. func (p *Storage) Get(key []byte) ([]byte, error) {
  38. v, closer, err := p.pdb.Get(db.Concat(p.prefix, key[:]))
  39. if err == pebble.ErrNotFound {
  40. return nil, db.ErrNotFound
  41. }
  42. if err != nil {
  43. return nil, err
  44. }
  45. err = closer.Close()
  46. return v, err
  47. }
  48. //nolint:lll
  49. // https://github.com/cockroachdb/pebble/pull/923/files#diff-c2ade2f386c41794d5ebc57ee49b57a5fca8082e03255e5bff13977cbc061287R39
  50. func keyUpperBound(b []byte) []byte {
  51. end := make([]byte, len(b))
  52. copy(end, b)
  53. for i := len(end) - 1; i >= 0; i-- {
  54. end[i] = end[i] + 1
  55. if end[i] != 0 {
  56. return end[:i+1]
  57. }
  58. }
  59. return nil // no upper-bound
  60. }
  61. func prefixIterOptions(prefix []byte) *pebble.IterOptions {
  62. return &pebble.IterOptions{
  63. LowerBound: prefix,
  64. UpperBound: keyUpperBound(prefix),
  65. }
  66. }
  67. // Iterate implements the method Iterate of the interface db.Storage
  68. func (p *Storage) Iterate(f func([]byte, []byte) (bool, error)) (err error) {
  69. // NewIter already provides a point-in-time view of the current DB
  70. // state, but if is used for long term (is not the case), should use an
  71. // iterator over an snapshot:
  72. // snapshot := p.pdb.NewSnapshot()
  73. // defer snapshot.Close()
  74. // iter := snapshot.NewIter(nil)
  75. iter := p.pdb.NewIter(prefixIterOptions(p.prefix))
  76. defer func() {
  77. err1 := iter.Close()
  78. if err != nil {
  79. return
  80. }
  81. err = err1
  82. }()
  83. for iter.First(); iter.Valid(); iter.Next() {
  84. localKey := iter.Key()[len(p.prefix):]
  85. if cont, err := f(localKey, iter.Value()); err != nil {
  86. return err
  87. } else if !cont {
  88. break
  89. }
  90. }
  91. return iter.Error()
  92. }
  93. // Get retreives a value from a key in the interface db.Tx
  94. func (tx *StorageTx) Get(key []byte) ([]byte, error) {
  95. var err error
  96. fullkey := db.Concat(tx.prefix, key)
  97. v, closer, err := tx.batch.Get(fullkey)
  98. if err == pebble.ErrNotFound {
  99. return nil, db.ErrNotFound
  100. }
  101. if err != nil {
  102. return nil, err
  103. }
  104. err = closer.Close()
  105. return v, err
  106. }
  107. // Put saves a key:value into the db.Storage
  108. func (tx *StorageTx) Put(k, v []byte) error {
  109. return tx.batch.Set(db.Concat(tx.prefix, k[:]), v, nil)
  110. }
  111. // Add implements the method Add of the interface db.Tx
  112. func (tx *StorageTx) Add(atx db.Tx) error {
  113. patx := atx.(*StorageTx)
  114. return tx.batch.Apply(patx.batch, nil)
  115. }
  116. // Commit implements the method Commit of the interface db.Tx
  117. func (tx *StorageTx) Commit() error {
  118. return tx.batch.Commit(nil)
  119. }
  120. // Close implements the method Close of the interface db.Tx
  121. func (tx *StorageTx) Close() {
  122. _ = tx.batch.Close()
  123. }
  124. // Close implements the method Close of the interface db.Storage
  125. func (p *Storage) Close() {
  126. if err := p.pdb.Close(); err != nil {
  127. panic(err)
  128. }
  129. log.Info("Database closed")
  130. }
  131. // Pebble is an extra method that returns the *pebble.DB
  132. func (p *Storage) Pebble() *pebble.DB {
  133. return p.pdb
  134. }
  135. // List implements the method List of the interface db.Storage
  136. func (p *Storage) List(limit int) ([]db.KV, error) {
  137. ret := []db.KV{}
  138. err := p.Iterate(func(key []byte, value []byte) (bool, error) {
  139. ret = append(ret, db.KV{K: db.Clone(key), V: db.Clone(value)})
  140. if len(ret) == limit {
  141. return false, nil
  142. }
  143. return true, nil
  144. })
  145. return ret, err
  146. }