You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

239 lines
4.7 KiB

  1. // Copyright (c) 2013, Suryandaru Triandana <syndtr@gmail.com>
  2. // All rights reserved.
  3. //
  4. // Use of this source code is governed by a BSD-style license that can be
  5. // found in the LICENSE file.
  6. package leveldb
  7. import (
  8. "errors"
  9. "sync/atomic"
  10. "time"
  11. "github.com/syndtr/goleveldb/leveldb/journal"
  12. "github.com/syndtr/goleveldb/leveldb/memdb"
  13. "github.com/syndtr/goleveldb/leveldb/storage"
  14. )
  15. var (
  16. errHasFrozenMem = errors.New("has frozen mem")
  17. )
  18. type memDB struct {
  19. db *DB
  20. *memdb.DB
  21. ref int32
  22. }
  23. func (m *memDB) getref() int32 {
  24. return atomic.LoadInt32(&m.ref)
  25. }
  26. func (m *memDB) incref() {
  27. atomic.AddInt32(&m.ref, 1)
  28. }
  29. func (m *memDB) decref() {
  30. if ref := atomic.AddInt32(&m.ref, -1); ref == 0 {
  31. // Only put back memdb with std capacity.
  32. if m.Capacity() == m.db.s.o.GetWriteBuffer() {
  33. m.Reset()
  34. m.db.mpoolPut(m.DB)
  35. }
  36. m.db = nil
  37. m.DB = nil
  38. } else if ref < 0 {
  39. panic("negative memdb ref")
  40. }
  41. }
  42. // Get latest sequence number.
  43. func (db *DB) getSeq() uint64 {
  44. return atomic.LoadUint64(&db.seq)
  45. }
  46. // Atomically adds delta to seq.
  47. func (db *DB) addSeq(delta uint64) {
  48. atomic.AddUint64(&db.seq, delta)
  49. }
  50. func (db *DB) setSeq(seq uint64) {
  51. atomic.StoreUint64(&db.seq, seq)
  52. }
  53. func (db *DB) sampleSeek(ikey internalKey) {
  54. v := db.s.version()
  55. if v.sampleSeek(ikey) {
  56. // Trigger table compaction.
  57. db.compTrigger(db.tcompCmdC)
  58. }
  59. v.release()
  60. }
  61. func (db *DB) mpoolPut(mem *memdb.DB) {
  62. if !db.isClosed() {
  63. select {
  64. case db.memPool <- mem:
  65. default:
  66. }
  67. }
  68. }
  69. func (db *DB) mpoolGet(n int) *memDB {
  70. var mdb *memdb.DB
  71. select {
  72. case mdb = <-db.memPool:
  73. default:
  74. }
  75. if mdb == nil || mdb.Capacity() < n {
  76. mdb = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n))
  77. }
  78. return &memDB{
  79. db: db,
  80. DB: mdb,
  81. }
  82. }
  83. func (db *DB) mpoolDrain() {
  84. ticker := time.NewTicker(30 * time.Second)
  85. for {
  86. select {
  87. case <-ticker.C:
  88. select {
  89. case <-db.memPool:
  90. default:
  91. }
  92. case <-db.closeC:
  93. ticker.Stop()
  94. // Make sure the pool is drained.
  95. select {
  96. case <-db.memPool:
  97. case <-time.After(time.Second):
  98. }
  99. close(db.memPool)
  100. return
  101. }
  102. }
  103. }
  104. // Create new memdb and froze the old one; need external synchronization.
  105. // newMem only called synchronously by the writer.
  106. func (db *DB) newMem(n int) (mem *memDB, err error) {
  107. fd := storage.FileDesc{Type: storage.TypeJournal, Num: db.s.allocFileNum()}
  108. w, err := db.s.stor.Create(fd)
  109. if err != nil {
  110. db.s.reuseFileNum(fd.Num)
  111. return
  112. }
  113. db.memMu.Lock()
  114. defer db.memMu.Unlock()
  115. if db.frozenMem != nil {
  116. return nil, errHasFrozenMem
  117. }
  118. if db.journal == nil {
  119. db.journal = journal.NewWriter(w)
  120. } else {
  121. db.journal.Reset(w)
  122. db.journalWriter.Close()
  123. db.frozenJournalFd = db.journalFd
  124. }
  125. db.journalWriter = w
  126. db.journalFd = fd
  127. db.frozenMem = db.mem
  128. mem = db.mpoolGet(n)
  129. mem.incref() // for self
  130. mem.incref() // for caller
  131. db.mem = mem
  132. // The seq only incremented by the writer. And whoever called newMem
  133. // should hold write lock, so no need additional synchronization here.
  134. db.frozenSeq = db.seq
  135. return
  136. }
  137. // Get all memdbs.
  138. func (db *DB) getMems() (e, f *memDB) {
  139. db.memMu.RLock()
  140. defer db.memMu.RUnlock()
  141. if db.mem != nil {
  142. db.mem.incref()
  143. } else if !db.isClosed() {
  144. panic("nil effective mem")
  145. }
  146. if db.frozenMem != nil {
  147. db.frozenMem.incref()
  148. }
  149. return db.mem, db.frozenMem
  150. }
  151. // Get effective memdb.
  152. func (db *DB) getEffectiveMem() *memDB {
  153. db.memMu.RLock()
  154. defer db.memMu.RUnlock()
  155. if db.mem != nil {
  156. db.mem.incref()
  157. } else if !db.isClosed() {
  158. panic("nil effective mem")
  159. }
  160. return db.mem
  161. }
  162. // Check whether we has frozen memdb.
  163. func (db *DB) hasFrozenMem() bool {
  164. db.memMu.RLock()
  165. defer db.memMu.RUnlock()
  166. return db.frozenMem != nil
  167. }
  168. // Get frozen memdb.
  169. func (db *DB) getFrozenMem() *memDB {
  170. db.memMu.RLock()
  171. defer db.memMu.RUnlock()
  172. if db.frozenMem != nil {
  173. db.frozenMem.incref()
  174. }
  175. return db.frozenMem
  176. }
  177. // Drop frozen memdb; assume that frozen memdb isn't nil.
  178. func (db *DB) dropFrozenMem() {
  179. db.memMu.Lock()
  180. if err := db.s.stor.Remove(db.frozenJournalFd); err != nil {
  181. db.logf("journal@remove removing @%d %q", db.frozenJournalFd.Num, err)
  182. } else {
  183. db.logf("journal@remove removed @%d", db.frozenJournalFd.Num)
  184. }
  185. db.frozenJournalFd = storage.FileDesc{}
  186. db.frozenMem.decref()
  187. db.frozenMem = nil
  188. db.memMu.Unlock()
  189. }
  190. // Clear mems ptr; used by DB.Close().
  191. func (db *DB) clearMems() {
  192. db.memMu.Lock()
  193. db.mem = nil
  194. db.frozenMem = nil
  195. db.memMu.Unlock()
  196. }
  197. // Set closed flag; return true if not already closed.
  198. func (db *DB) setClosed() bool {
  199. return atomic.CompareAndSwapUint32(&db.closed, 0, 1)
  200. }
  201. // Check whether DB was closed.
  202. func (db *DB) isClosed() bool {
  203. return atomic.LoadUint32(&db.closed) != 0
  204. }
  205. // Check read ok status.
  206. func (db *DB) ok() error {
  207. if db.isClosed() {
  208. return ErrClosed
  209. }
  210. return nil
  211. }