You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

193 lines
4.7 KiB

  1. package pebble
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "github.com/cockroachdb/pebble"
  6. "github.com/iden3/go-merkletree/db"
  7. log "github.com/sirupsen/logrus"
  8. )
  9. // PebbleStorage implements the db.Storage interface
  10. type PebbleStorage struct {
  11. pdb *pebble.DB
  12. prefix []byte
  13. }
  14. // PebbleStorageTx implements the db.Tx interface
  15. type PebbleStorageTx struct {
  16. // FUTURE currently Tx is using the same strategy than in MemoryDB and
  17. // LevelDB, in next iteration can be moved to Pebble Batch strategy
  18. *PebbleStorage
  19. cache db.KvMap
  20. }
  21. // NewPebbleStorage returns a new PebbleStorage
  22. func NewPebbleStorage(path string, errorIfMissing bool) (*PebbleStorage, error) {
  23. o := &pebble.Options{
  24. ErrorIfNotExists: errorIfMissing,
  25. }
  26. rdb, err := pebble.Open(path, o)
  27. if err != nil {
  28. return nil, err
  29. }
  30. return &PebbleStorage{rdb, []byte{}}, nil
  31. }
  32. type storageInfo struct {
  33. KeyCount int
  34. ClaimCount int
  35. }
  36. // Info implements the method Info of the interface db.Storage
  37. func (p *PebbleStorage) Info() string {
  38. keycount := 0
  39. claimcount := 0
  40. err := p.Iterate(func(key []byte, value []byte) (bool, error) {
  41. if value[0] == byte(1) {
  42. claimcount++
  43. }
  44. keycount++
  45. return true, nil
  46. })
  47. if err != nil {
  48. return err.Error()
  49. }
  50. json, _ := json.MarshalIndent(
  51. storageInfo{
  52. KeyCount: keycount,
  53. ClaimCount: claimcount,
  54. },
  55. "", " ",
  56. )
  57. return string(json)
  58. }
  59. // WithPrefix implements the method WithPrefix of the interface db.Storage
  60. func (p *PebbleStorage) WithPrefix(prefix []byte) db.Storage {
  61. return &PebbleStorage{p.pdb, db.Concat(p.prefix, prefix)}
  62. }
  63. // NewTx implements the method NewTx of the interface db.Storage
  64. func (p *PebbleStorage) NewTx() (db.Tx, error) {
  65. return &PebbleStorageTx{p, make(db.KvMap)}, nil
  66. }
  67. // Get retreives a value from a key in the db.Storage
  68. func (p *PebbleStorage) Get(key []byte) ([]byte, error) {
  69. v, closer, err := p.pdb.Get(db.Concat(p.prefix, key[:]))
  70. if err == pebble.ErrNotFound {
  71. return nil, db.ErrNotFound
  72. }
  73. closer.Close()
  74. return v, err
  75. }
  76. // Iterate implements the method Iterate of the interface db.Storage
  77. func (p *PebbleStorage) Iterate(f func([]byte, []byte) (bool, error)) error {
  78. // NewIter already provides a point-in-time view of the current DB
  79. // state, but if is used for long term (is not the case), should use an
  80. // iterator over an snapshot:
  81. // snapshot := p.pdb.NewSnapshot()
  82. // defer snapshot.Close()
  83. // iter := snapshot.NewIter(nil)
  84. iter := p.pdb.NewIter(nil)
  85. defer iter.Close()
  86. iter.First() // move the iterator to the first key/value pair
  87. if len(iter.Key()) < len(p.prefix) || !bytes.Equal(iter.Key()[:len(p.prefix)], p.prefix) {
  88. } else {
  89. localKey := iter.Key()[len(p.prefix):]
  90. if _, err := f(localKey, iter.Value()); err != nil {
  91. return err
  92. }
  93. }
  94. for iter.Next() {
  95. if len(iter.Key()) < len(p.prefix) || !bytes.Equal(iter.Key()[:len(p.prefix)], p.prefix) {
  96. continue
  97. }
  98. localKey := iter.Key()[len(p.prefix):]
  99. if cont, err := f(localKey, iter.Value()); err != nil {
  100. return err
  101. } else if !cont {
  102. break
  103. }
  104. }
  105. return iter.Error()
  106. }
  107. // Get retreives a value from a key in the interface db.Tx
  108. func (tx *PebbleStorageTx) Get(key []byte) ([]byte, error) {
  109. var err error
  110. fullkey := db.Concat(tx.prefix, key)
  111. if value, ok := tx.cache.Get(fullkey); ok {
  112. return value, nil
  113. }
  114. value, closer, err := tx.pdb.Get(fullkey)
  115. if err == pebble.ErrNotFound {
  116. return nil, db.ErrNotFound
  117. }
  118. closer.Close()
  119. return value, err
  120. }
  121. // Put saves a key:value into the db.Storage
  122. func (tx *PebbleStorageTx) Put(k, v []byte) {
  123. tx.cache.Put(db.Concat(tx.prefix, k[:]), v)
  124. }
  125. // Add implements the method Add of the interface db.Tx
  126. func (tx *PebbleStorageTx) Add(atx db.Tx) {
  127. ldbtx := atx.(*PebbleStorageTx)
  128. for _, v := range ldbtx.cache {
  129. tx.cache.Put(v.K, v.V)
  130. }
  131. }
  132. // Commit implements the method Commit of the interface db.Tx
  133. func (tx *PebbleStorageTx) Commit() error {
  134. batch := tx.PebbleStorage.pdb.NewBatch()
  135. for _, v := range tx.cache {
  136. batch.Set(v.K, v.V, nil)
  137. }
  138. tx.cache = nil
  139. return batch.Commit(nil)
  140. }
  141. // Close implements the method Close of the interface db.Tx
  142. func (tx *PebbleStorageTx) Close() {
  143. tx.cache = nil
  144. }
  145. // Close implements the method Close of the interface db.Storage
  146. func (p *PebbleStorage) Close() {
  147. if err := p.pdb.Close(); err != nil {
  148. panic(err)
  149. }
  150. log.Info("Database closed")
  151. }
  152. // Pebble is an extra method that returns the *pebble.DB
  153. func (p *PebbleStorage) Pebble() *pebble.DB {
  154. return p.pdb
  155. }
  156. // List implements the method List of the interface db.Storage
  157. func (p *PebbleStorage) List(limit int) ([]db.KV, error) {
  158. ret := []db.KV{}
  159. err := p.Iterate(func(key []byte, value []byte) (bool, error) {
  160. ret = append(ret, db.KV{K: db.Clone(key), V: db.Clone(value)})
  161. if len(ret) == limit {
  162. return false, nil
  163. }
  164. return true, nil
  165. })
  166. return ret, err
  167. }