You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

322 lines
6.2 KiB

  1. /**
  2. * @file
  3. * @copyright defined in aergo/LICENSE.txt
  4. */
  5. package db
  6. import (
  7. "bytes"
  8. "fmt"
  9. "path/filepath"
  10. "github.com/syndtr/goleveldb/leveldb"
  11. "github.com/syndtr/goleveldb/leveldb/errors"
  12. "github.com/syndtr/goleveldb/leveldb/iterator"
  13. "github.com/syndtr/goleveldb/leveldb/opt"
  14. )
  15. // This function is always called first
  16. func init() {
  17. dbConstructor := func(dir string) (DB, error) {
  18. return newLevelDB(dir)
  19. }
  20. registorDBConstructor(LevelImpl, dbConstructor)
  21. }
  22. func newLevelDB(dir string) (DB, error) {
  23. dbPath := filepath.Join(dir, "data.db")
  24. db, err := leveldb.OpenFile(dbPath, nil)
  25. if err != nil {
  26. return nil, err
  27. }
  28. database := &levelDB{
  29. db: db,
  30. }
  31. return database, nil
  32. }
  33. //=========================================================
  34. // DB Implementation
  35. //=========================================================
  36. // Enforce database and transaction implements interfaces
  37. var _ DB = (*levelDB)(nil)
  38. type levelDB struct {
  39. db *leveldb.DB
  40. }
  41. func (db *levelDB) Type() string {
  42. return "leveldb"
  43. }
  44. func (db *levelDB) Set(key, value []byte) {
  45. key = convNilToBytes(key)
  46. value = convNilToBytes(value)
  47. err := db.db.Put(key, value, &opt.WriteOptions{Sync: true})
  48. if err != nil {
  49. panic(fmt.Sprintf("Database Error: %v", err))
  50. }
  51. }
  52. func (db *levelDB) Delete(key []byte) {
  53. key = convNilToBytes(key)
  54. err := db.db.Delete(key, &opt.WriteOptions{Sync: true})
  55. if err != nil {
  56. panic(fmt.Sprintf("Database Error: %v", err))
  57. }
  58. }
  59. func (db *levelDB) Get(key []byte) []byte {
  60. key = convNilToBytes(key)
  61. res, err := db.db.Get(key, nil)
  62. if err != nil {
  63. if err == errors.ErrNotFound {
  64. return []byte{}
  65. }
  66. panic(fmt.Sprintf("Database Error: %v", err))
  67. }
  68. return res
  69. }
  70. func (db *levelDB) Exist(key []byte) bool {
  71. res, _ := db.db.Has(key, nil)
  72. return res
  73. }
  74. func (db *levelDB) Close() {
  75. db.db.Close()
  76. }
  77. func (db *levelDB) NewTx() Transaction {
  78. batch := new(leveldb.Batch)
  79. return &levelTransaction{db, batch, false, false}
  80. }
  81. func (db *levelDB) NewBulk() Bulk {
  82. batch := new(leveldb.Batch)
  83. return &levelBulk{db, batch, false, false}
  84. }
  85. //=========================================================
  86. // Transaction Implementation
  87. //=========================================================
  88. type levelTransaction struct {
  89. db *levelDB
  90. tx *leveldb.Batch
  91. isDiscard bool
  92. isCommit bool
  93. }
  94. /*
  95. func (transaction *levelTransaction) Get(key []byte) []byte {
  96. panic(fmt.Sprintf("DO not support"))
  97. }
  98. */
  99. func (transaction *levelTransaction) Set(key, value []byte) {
  100. transaction.tx.Put(key, value)
  101. }
  102. func (transaction *levelTransaction) Delete(key []byte) {
  103. transaction.tx.Delete(key)
  104. }
  105. func (transaction *levelTransaction) Commit() {
  106. if transaction.isDiscard {
  107. panic("Commit after dicard tx is not allowed")
  108. } else if transaction.isCommit {
  109. panic("Commit occures two times")
  110. }
  111. err := transaction.db.db.Write(transaction.tx, &opt.WriteOptions{Sync: true})
  112. if err != nil {
  113. panic(fmt.Sprintf("Database Error: %v", err))
  114. }
  115. transaction.isCommit = true
  116. }
  117. func (transaction *levelTransaction) Discard() {
  118. transaction.isDiscard = true
  119. }
  120. //=========================================================
  121. // Bulk Implementation
  122. //=========================================================
  123. type levelBulk struct {
  124. db *levelDB
  125. tx *leveldb.Batch
  126. isDiscard bool
  127. isCommit bool
  128. }
  129. func (bulk *levelBulk) Set(key, value []byte) {
  130. bulk.tx.Put(key, value)
  131. }
  132. func (bulk *levelBulk) Delete(key []byte) {
  133. bulk.tx.Delete(key)
  134. }
  135. func (bulk *levelBulk) Flush() {
  136. // do the same behavior that a transaction commit does
  137. // db.write internally will handle large transaction
  138. if bulk.isDiscard {
  139. panic("Commit after dicard tx is not allowed")
  140. } else if bulk.isCommit {
  141. panic("Commit occures two times")
  142. }
  143. err := bulk.db.db.Write(bulk.tx, &opt.WriteOptions{Sync: true})
  144. if err != nil {
  145. panic(fmt.Sprintf("Database Error: %v", err))
  146. }
  147. bulk.isCommit = true
  148. }
  149. func (bulk *levelBulk) DiscardLast() {
  150. bulk.isDiscard = true
  151. }
  152. //=========================================================
  153. // Iterator Implementation
  154. //=========================================================
  155. type levelIterator struct {
  156. start []byte
  157. end []byte
  158. reverse bool
  159. iter iterator.Iterator
  160. isInvalid bool
  161. }
  162. func (db *levelDB) Iterator(start, end []byte) Iterator {
  163. var reverse bool
  164. // if end is bigger then start, then reverse order
  165. if bytes.Compare(start, end) == 1 {
  166. reverse = true
  167. } else {
  168. reverse = false
  169. }
  170. iter := db.db.NewIterator(nil, nil)
  171. if reverse {
  172. if start == nil {
  173. iter.Last()
  174. } else {
  175. valid := iter.Seek(start)
  176. if valid {
  177. soakey := iter.Key()
  178. if bytes.Compare(start, soakey) < 0 {
  179. iter.Prev()
  180. }
  181. } else {
  182. iter.Last()
  183. }
  184. }
  185. } else {
  186. if start == nil {
  187. iter.First()
  188. } else {
  189. iter.Seek(start)
  190. }
  191. }
  192. return &levelIterator{
  193. iter: iter,
  194. start: start,
  195. end: end,
  196. reverse: reverse,
  197. isInvalid: false,
  198. }
  199. }
  200. func (iter *levelIterator) Next() {
  201. if iter.Valid() {
  202. if iter.reverse {
  203. iter.iter.Prev()
  204. } else {
  205. iter.iter.Next()
  206. }
  207. } else {
  208. panic("Iterator is Invalid")
  209. }
  210. }
  211. func (iter *levelIterator) Valid() bool {
  212. // Once invalid, forever invalid.
  213. if iter.isInvalid {
  214. return false
  215. }
  216. // Panic on DB error. No way to recover.
  217. if err := iter.iter.Error(); err != nil {
  218. panic(err)
  219. }
  220. // If source is invalid, invalid.
  221. if !iter.iter.Valid() {
  222. iter.isInvalid = true
  223. return false
  224. }
  225. // If key is end or past it, invalid.
  226. var end = iter.end
  227. var key = iter.iter.Key()
  228. if iter.reverse {
  229. if end != nil && bytes.Compare(key, end) <= 0 {
  230. iter.isInvalid = true
  231. return false
  232. }
  233. } else {
  234. if end != nil && bytes.Compare(end, key) <= 0 {
  235. iter.isInvalid = true
  236. return false
  237. }
  238. }
  239. // Valid
  240. return true
  241. }
  242. func (iter *levelIterator) Key() (key []byte) {
  243. if !iter.Valid() {
  244. panic("Iterator is invalid")
  245. } else if err := iter.iter.Error(); err != nil {
  246. panic(err)
  247. }
  248. originalKey := iter.iter.Key()
  249. key = make([]byte, len(originalKey))
  250. copy(key, originalKey)
  251. return key
  252. }
  253. func (iter *levelIterator) Value() (value []byte) {
  254. if !iter.Valid() {
  255. panic("Iterator is invalid")
  256. } else if err := iter.iter.Error(); err != nil {
  257. panic(err)
  258. }
  259. originalValue := iter.iter.Value()
  260. value = make([]byte, len(originalValue))
  261. copy(value, originalValue)
  262. return value
  263. }