You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

179 lines
4.4 KiB

  1. package pebble
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "github.com/cockroachdb/pebble"
  6. "github.com/iden3/go-merkletree/db"
  7. log "github.com/sirupsen/logrus"
  8. )
  9. // PebbleStorage implements the db.Storage interface
  10. type PebbleStorage struct {
  11. pdb *pebble.DB
  12. prefix []byte
  13. }
  14. // PebbleStorageTx implements the db.Tx interface
  15. type PebbleStorageTx struct {
  16. *PebbleStorage
  17. batch *pebble.Batch
  18. }
  19. // NewPebbleStorage returns a new PebbleStorage
  20. func NewPebbleStorage(path string, errorIfMissing bool) (*PebbleStorage, error) {
  21. o := &pebble.Options{
  22. ErrorIfNotExists: errorIfMissing,
  23. }
  24. rdb, err := pebble.Open(path, o)
  25. if err != nil {
  26. return nil, err
  27. }
  28. return &PebbleStorage{rdb, []byte{}}, nil
  29. }
  30. type storageInfo struct {
  31. KeyCount int
  32. ClaimCount int
  33. }
  34. // Info implements the method Info of the interface db.Storage
  35. func (p *PebbleStorage) Info() string {
  36. keycount := 0
  37. claimcount := 0
  38. err := p.Iterate(func(key []byte, value []byte) (bool, error) {
  39. if value[0] == byte(1) {
  40. claimcount++
  41. }
  42. keycount++
  43. return true, nil
  44. })
  45. if err != nil {
  46. return err.Error()
  47. }
  48. json, _ := json.MarshalIndent(
  49. storageInfo{
  50. KeyCount: keycount,
  51. ClaimCount: claimcount,
  52. },
  53. "", " ",
  54. )
  55. return string(json)
  56. }
  57. // WithPrefix implements the method WithPrefix of the interface db.Storage
  58. func (p *PebbleStorage) WithPrefix(prefix []byte) db.Storage {
  59. return &PebbleStorage{p.pdb, db.Concat(p.prefix, prefix)}
  60. }
  61. // NewTx implements the method NewTx of the interface db.Storage
  62. func (p *PebbleStorage) NewTx() (db.Tx, error) {
  63. return &PebbleStorageTx{p, p.pdb.NewIndexedBatch()}, nil
  64. }
  65. // Get retreives a value from a key in the db.Storage
  66. func (p *PebbleStorage) Get(key []byte) ([]byte, error) {
  67. v, closer, err := p.pdb.Get(db.Concat(p.prefix, key[:]))
  68. if err == pebble.ErrNotFound {
  69. return nil, db.ErrNotFound
  70. }
  71. closer.Close()
  72. return v, err
  73. }
  74. // Iterate implements the method Iterate of the interface db.Storage
  75. func (p *PebbleStorage) Iterate(f func([]byte, []byte) (bool, error)) error {
  76. // NewIter already provides a point-in-time view of the current DB
  77. // state, but if is used for long term (is not the case), should use an
  78. // iterator over an snapshot:
  79. // snapshot := p.pdb.NewSnapshot()
  80. // defer snapshot.Close()
  81. // iter := snapshot.NewIter(nil)
  82. iter := p.pdb.NewIter(nil)
  83. defer iter.Close()
  84. iter.First() // move the iterator to the first key/value pair
  85. if len(iter.Key()) < len(p.prefix) || !bytes.Equal(iter.Key()[:len(p.prefix)], p.prefix) {
  86. } else {
  87. localKey := iter.Key()[len(p.prefix):]
  88. if _, err := f(localKey, iter.Value()); err != nil {
  89. return err
  90. }
  91. }
  92. for iter.Next() {
  93. if len(iter.Key()) < len(p.prefix) || !bytes.Equal(iter.Key()[:len(p.prefix)], p.prefix) {
  94. continue
  95. }
  96. localKey := iter.Key()[len(p.prefix):]
  97. if cont, err := f(localKey, iter.Value()); err != nil {
  98. return err
  99. } else if !cont {
  100. break
  101. }
  102. }
  103. return iter.Error()
  104. }
  105. // Get retreives a value from a key in the interface db.Tx
  106. func (tx *PebbleStorageTx) Get(key []byte) ([]byte, error) {
  107. var err error
  108. fullkey := db.Concat(tx.prefix, key)
  109. v, closer, err := tx.batch.Get(fullkey)
  110. if err == pebble.ErrNotFound {
  111. return nil, db.ErrNotFound
  112. }
  113. closer.Close()
  114. return v, err
  115. }
  116. // Put saves a key:value into the db.Storage
  117. func (tx *PebbleStorageTx) Put(k, v []byte) error {
  118. return tx.batch.Set(db.Concat(tx.prefix, k[:]), v, nil)
  119. }
  120. // Add implements the method Add of the interface db.Tx
  121. func (tx *PebbleStorageTx) Add(atx db.Tx) error {
  122. patx := atx.(*PebbleStorageTx)
  123. return tx.batch.Apply(patx.batch, nil)
  124. }
  125. // Commit implements the method Commit of the interface db.Tx
  126. func (tx *PebbleStorageTx) Commit() error {
  127. return tx.batch.Commit(nil)
  128. }
  129. // Close implements the method Close of the interface db.Tx
  130. func (tx *PebbleStorageTx) Close() {
  131. _ = tx.batch.Close()
  132. }
  133. // Close implements the method Close of the interface db.Storage
  134. func (p *PebbleStorage) Close() {
  135. if err := p.pdb.Close(); err != nil {
  136. panic(err)
  137. }
  138. log.Info("Database closed")
  139. }
  140. // Pebble is an extra method that returns the *pebble.DB
  141. func (p *PebbleStorage) Pebble() *pebble.DB {
  142. return p.pdb
  143. }
  144. // List implements the method List of the interface db.Storage
  145. func (p *PebbleStorage) List(limit int) ([]db.KV, error) {
  146. ret := []db.KV{}
  147. err := p.Iterate(func(key []byte, value []byte) (bool, error) {
  148. ret = append(ret, db.KV{K: db.Clone(key), V: db.Clone(value)})
  149. if len(ret) == limit {
  150. return false, nil
  151. }
  152. return true, nil
  153. })
  154. return ret, err
  155. }