You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

186 lines
4.6 KiB

  1. package pebble
  2. import (
  3. "encoding/json"
  4. "github.com/cockroachdb/pebble"
  5. "github.com/iden3/go-merkletree/db"
  6. log "github.com/sirupsen/logrus"
  7. )
  8. // PebbleStorage implements the db.Storage interface
  9. type PebbleStorage struct {
  10. pdb *pebble.DB
  11. prefix []byte
  12. }
  13. // PebbleStorageTx implements the db.Tx interface
  14. type PebbleStorageTx struct {
  15. *PebbleStorage
  16. batch *pebble.Batch
  17. }
  18. // NewPebbleStorage returns a new PebbleStorage
  19. func NewPebbleStorage(path string, errorIfMissing bool) (*PebbleStorage, error) {
  20. o := &pebble.Options{
  21. ErrorIfNotExists: errorIfMissing,
  22. }
  23. rdb, err := pebble.Open(path, o)
  24. if err != nil {
  25. return nil, err
  26. }
  27. return &PebbleStorage{rdb, []byte{}}, nil
  28. }
  29. type storageInfo struct {
  30. KeyCount int
  31. ClaimCount int
  32. }
  33. // Info implements the method Info of the interface db.Storage
  34. func (p *PebbleStorage) Info() string {
  35. keycount := 0
  36. claimcount := 0
  37. err := p.Iterate(func(key []byte, value []byte) (bool, error) {
  38. if value[0] == byte(1) {
  39. claimcount++
  40. }
  41. keycount++
  42. return true, nil
  43. })
  44. if err != nil {
  45. return err.Error()
  46. }
  47. json, _ := json.MarshalIndent(
  48. storageInfo{
  49. KeyCount: keycount,
  50. ClaimCount: claimcount,
  51. },
  52. "", " ",
  53. )
  54. return string(json)
  55. }
  56. // WithPrefix implements the method WithPrefix of the interface db.Storage
  57. func (p *PebbleStorage) WithPrefix(prefix []byte) db.Storage {
  58. return &PebbleStorage{p.pdb, db.Concat(p.prefix, prefix)}
  59. }
  60. // NewTx implements the method NewTx of the interface db.Storage
  61. func (p *PebbleStorage) NewTx() (db.Tx, error) {
  62. return &PebbleStorageTx{p, p.pdb.NewIndexedBatch()}, nil
  63. }
  64. // Get retreives a value from a key in the db.Storage
  65. func (p *PebbleStorage) Get(key []byte) ([]byte, error) {
  66. v, closer, err := p.pdb.Get(db.Concat(p.prefix, key[:]))
  67. if err == pebble.ErrNotFound {
  68. return nil, db.ErrNotFound
  69. }
  70. closer.Close()
  71. return v, err
  72. }
  73. // https://github.com/cockroachdb/pebble/pull/923/files#diff-c2ade2f386c41794d5ebc57ee49b57a5fca8082e03255e5bff13977cbc061287R39
  74. func keyUpperBound(b []byte) []byte {
  75. end := make([]byte, len(b))
  76. copy(end, b)
  77. for i := len(end) - 1; i >= 0; i-- {
  78. end[i] = end[i] + 1
  79. if end[i] != 0 {
  80. return end[:i+1]
  81. }
  82. }
  83. return nil // no upper-bound
  84. }
  85. func prefixIterOptions(prefix []byte) *pebble.IterOptions {
  86. return &pebble.IterOptions{
  87. LowerBound: prefix,
  88. UpperBound: keyUpperBound(prefix),
  89. }
  90. }
  91. // Iterate implements the method Iterate of the interface db.Storage
  92. func (p *PebbleStorage) Iterate(f func([]byte, []byte) (bool, error)) error {
  93. // NewIter already provides a point-in-time view of the current DB
  94. // state, but if is used for long term (is not the case), should use an
  95. // iterator over an snapshot:
  96. // snapshot := p.pdb.NewSnapshot()
  97. // defer snapshot.Close()
  98. // iter := snapshot.NewIter(nil)
  99. iter := p.pdb.NewIter(prefixIterOptions(p.prefix))
  100. defer iter.Close()
  101. for iter.First(); iter.Valid(); iter.Next() {
  102. localKey := iter.Key()[len(p.prefix):]
  103. if cont, err := f(localKey, iter.Value()); err != nil {
  104. return err
  105. } else if !cont {
  106. break
  107. }
  108. }
  109. return iter.Error()
  110. }
  111. // Get retreives a value from a key in the interface db.Tx
  112. func (tx *PebbleStorageTx) Get(key []byte) ([]byte, error) {
  113. var err error
  114. fullkey := db.Concat(tx.prefix, key)
  115. v, closer, err := tx.batch.Get(fullkey)
  116. if err == pebble.ErrNotFound {
  117. return nil, db.ErrNotFound
  118. }
  119. closer.Close()
  120. return v, err
  121. }
  122. // Put saves a key:value into the db.Storage
  123. func (tx *PebbleStorageTx) Put(k, v []byte) error {
  124. return tx.batch.Set(db.Concat(tx.prefix, k[:]), v, nil)
  125. }
  126. // Add implements the method Add of the interface db.Tx
  127. func (tx *PebbleStorageTx) Add(atx db.Tx) error {
  128. patx := atx.(*PebbleStorageTx)
  129. return tx.batch.Apply(patx.batch, nil)
  130. }
  131. // Commit implements the method Commit of the interface db.Tx
  132. func (tx *PebbleStorageTx) Commit() error {
  133. return tx.batch.Commit(nil)
  134. }
  135. // Close implements the method Close of the interface db.Tx
  136. func (tx *PebbleStorageTx) Close() {
  137. _ = tx.batch.Close()
  138. }
  139. // Close implements the method Close of the interface db.Storage
  140. func (p *PebbleStorage) Close() {
  141. if err := p.pdb.Close(); err != nil {
  142. panic(err)
  143. }
  144. log.Info("Database closed")
  145. }
  146. // Pebble is an extra method that returns the *pebble.DB
  147. func (p *PebbleStorage) Pebble() *pebble.DB {
  148. return p.pdb
  149. }
  150. // List implements the method List of the interface db.Storage
  151. func (p *PebbleStorage) List(limit int) ([]db.KV, error) {
  152. ret := []db.KV{}
  153. err := p.Iterate(func(key []byte, value []byte) (bool, error) {
  154. ret = append(ret, db.KV{K: db.Clone(key), V: db.Clone(value)})
  155. if len(ret) == limit {
  156. return false, nil
  157. }
  158. return true, nil
  159. })
  160. return ret, err
  161. }