You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

186 lines
4.3 KiB

  1. package leveldb
  2. import (
  3. "encoding/json"
  4. log "github.com/sirupsen/logrus"
  5. "github.com/syndtr/goleveldb/leveldb"
  6. "github.com/syndtr/goleveldb/leveldb/errors"
  7. "github.com/syndtr/goleveldb/leveldb/opt"
  8. "github.com/syndtr/goleveldb/leveldb/util"
  9. "github.com/iden3/go-merkletree/db"
  10. )
  11. // LevelDbStorage implements the db.Storage interface
  12. type LevelDbStorage struct {
  13. ldb *leveldb.DB
  14. prefix []byte
  15. }
  16. // LevelDbStorageTx implements the db.Tx interface
  17. type LevelDbStorageTx struct {
  18. *LevelDbStorage
  19. cache db.KvMap
  20. }
  21. // NewLevelStorage returns a new LevelDbStorage
  22. func NewLevelDbStorage(path string, errorIfMissing bool) (*LevelDbStorage, error) {
  23. o := &opt.Options{
  24. ErrorIfMissing: errorIfMissing,
  25. }
  26. ldb, err := leveldb.OpenFile(path, o)
  27. if err != nil {
  28. return nil, err
  29. }
  30. return &LevelDbStorage{ldb, []byte{}}, nil
  31. }
  32. type storageInfo struct {
  33. KeyCount int
  34. ClaimCount int
  35. }
  36. // Info implements the method Info of the interface db.Storage
  37. func (l *LevelDbStorage) Info() string {
  38. snapshot, err := l.ldb.GetSnapshot()
  39. if err != nil {
  40. return err.Error()
  41. }
  42. keycount := 0
  43. claimcount := 0
  44. iter := snapshot.NewIterator(nil, nil)
  45. for iter.Next() {
  46. if iter.Value()[0] == byte(1) {
  47. claimcount++
  48. }
  49. keycount++
  50. }
  51. iter.Release()
  52. if err := iter.Error(); err != nil {
  53. return err.Error()
  54. }
  55. json, _ := json.MarshalIndent(
  56. storageInfo{
  57. KeyCount: keycount,
  58. ClaimCount: claimcount,
  59. },
  60. "", " ",
  61. )
  62. return string(json)
  63. }
  64. // WithPrefix implements the method WithPrefix of the interface db.Storage
  65. func (l *LevelDbStorage) WithPrefix(prefix []byte) db.Storage {
  66. return &LevelDbStorage{l.ldb, db.Concat(l.prefix, prefix)}
  67. }
  68. // NewTx implements the method NewTx of the interface db.Storage
  69. func (l *LevelDbStorage) NewTx() (db.Tx, error) {
  70. return &LevelDbStorageTx{l, make(db.KvMap)}, nil
  71. }
  72. // Get retreives a value from a key in the db.Storage
  73. func (l *LevelDbStorage) Get(key []byte) ([]byte, error) {
  74. v, err := l.ldb.Get(db.Concat(l.prefix, key[:]), nil)
  75. if err == errors.ErrNotFound {
  76. return nil, db.ErrNotFound
  77. }
  78. return v, err
  79. }
  80. // Iterate implements the method Iterate of the interface db.Storage
  81. func (l *LevelDbStorage) Iterate(f func([]byte, []byte) (bool, error)) error {
  82. // FIXME: Use the prefix!
  83. snapshot, err := l.ldb.GetSnapshot()
  84. if err != nil {
  85. return err
  86. }
  87. iter := snapshot.NewIterator(util.BytesPrefix(l.prefix), nil)
  88. defer iter.Release()
  89. for iter.Next() {
  90. localKey := iter.Key()[len(l.prefix):]
  91. if cont, err := f(localKey, iter.Value()); err != nil {
  92. return err
  93. } else if !cont {
  94. break
  95. }
  96. }
  97. iter.Release()
  98. return iter.Error()
  99. }
  100. // Get retreives a value from a key in the interface db.Tx
  101. func (l *LevelDbStorageTx) Get(key []byte) ([]byte, error) {
  102. var err error
  103. fullkey := db.Concat(l.prefix, key)
  104. if value, ok := l.cache.Get(fullkey); ok {
  105. return value, nil
  106. }
  107. value, err := l.ldb.Get(fullkey, nil)
  108. if err == errors.ErrNotFound {
  109. return nil, db.ErrNotFound
  110. }
  111. return value, err
  112. }
  113. // Insert saves a key:value into the db.Storage
  114. func (tx *LevelDbStorageTx) Put(k, v []byte) {
  115. tx.cache.Put(db.Concat(tx.prefix, k[:]), v)
  116. }
  117. // Add implements the method Add of the interface db.Tx
  118. func (tx *LevelDbStorageTx) Add(atx db.Tx) {
  119. ldbtx := atx.(*LevelDbStorageTx)
  120. for _, v := range ldbtx.cache {
  121. tx.cache.Put(v.K, v.V)
  122. }
  123. }
  124. // Commit implements the method Commit of the interface db.Tx
  125. func (l *LevelDbStorageTx) Commit() error {
  126. var batch leveldb.Batch
  127. for _, v := range l.cache {
  128. batch.Put(v.K, v.V)
  129. }
  130. l.cache = nil
  131. return l.ldb.Write(&batch, nil)
  132. }
  133. // Close implements the method Close of the interface db.Tx
  134. func (l *LevelDbStorageTx) Close() {
  135. l.cache = nil
  136. }
  137. // Close implements the method Close of the interface db.Storage
  138. func (l *LevelDbStorage) Close() {
  139. if err := l.ldb.Close(); err != nil {
  140. panic(err)
  141. }
  142. log.Info("Database closed")
  143. }
  144. // LevelDB is an extra method that returns the *leveldb.DB
  145. func (l *LevelDbStorage) LevelDB() *leveldb.DB {
  146. return l.ldb
  147. }
  148. // List implements the method List of the interface db.Storage
  149. func (l *LevelDbStorage) List(limit int) ([]db.KV, error) {
  150. ret := []db.KV{}
  151. err := l.Iterate(func(key []byte, value []byte) (bool, error) {
  152. ret = append(ret, db.KV{K: db.Clone(key), V: db.Clone(value)})
  153. if len(ret) == limit {
  154. return false, nil
  155. }
  156. return true, nil
  157. })
  158. return ret, err
  159. }