|
|
package bolt
import ( "fmt" "io" "os" "sort" "strings" "time" "unsafe" )
// txid represents the internal transaction identifier.
type txid uint64
// Tx represents a read-only or read/write transaction on the database.
// Read-only transactions can be used for retrieving values for keys and creating cursors.
// Read/write transactions can create and remove buckets and create and remove keys.
//
// IMPORTANT: You must commit or rollback transactions when you are done with
// them. Pages can not be reclaimed by the writer until no more transactions
// are using them. A long running read transaction can cause the database to
// quickly grow.
type Tx struct { writable bool managed bool db *DB meta *meta root Bucket pages map[pgid]*page stats TxStats commitHandlers []func()
// WriteFlag specifies the flag for write-related methods like WriteTo().
// Tx opens the database file with the specified flag to copy the data.
//
// By default, the flag is unset, which works well for mostly in-memory
// workloads. For databases that are much larger than available RAM,
// set the flag to syscall.O_DIRECT to avoid trashing the page cache.
WriteFlag int }
// init initializes the transaction.
func (tx *Tx) init(db *DB) { tx.db = db tx.pages = nil
// Copy the meta page since it can be changed by the writer.
tx.meta = &meta{} db.meta().copy(tx.meta)
// Copy over the root bucket.
tx.root = newBucket(tx) tx.root.bucket = &bucket{} *tx.root.bucket = tx.meta.root
// Increment the transaction id and add a page cache for writable transactions.
if tx.writable { tx.pages = make(map[pgid]*page) tx.meta.txid += txid(1) } }
// ID returns the transaction id.
func (tx *Tx) ID() int { return int(tx.meta.txid) }
// DB returns a reference to the database that created the transaction.
func (tx *Tx) DB() *DB { return tx.db }
// Size returns current database size in bytes as seen by this transaction.
func (tx *Tx) Size() int64 { return int64(tx.meta.pgid) * int64(tx.db.pageSize) }
// Writable returns whether the transaction can perform write operations.
func (tx *Tx) Writable() bool { return tx.writable }
// Cursor creates a cursor associated with the root bucket.
// All items in the cursor will return a nil value because all root bucket keys point to buckets.
// The cursor is only valid as long as the transaction is open.
// Do not use a cursor after the transaction is closed.
func (tx *Tx) Cursor() *Cursor { return tx.root.Cursor() }
// Stats retrieves a copy of the current transaction statistics.
func (tx *Tx) Stats() TxStats { return tx.stats }
// Bucket retrieves a bucket by name.
// Returns nil if the bucket does not exist.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) Bucket(name []byte) *Bucket { return tx.root.Bucket(name) }
// CreateBucket creates a new bucket.
// Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) { return tx.root.CreateBucket(name) }
// CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
// Returns an error if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) { return tx.root.CreateBucketIfNotExists(name) }
// DeleteBucket deletes a bucket.
// Returns an error if the bucket cannot be found or if the key represents a non-bucket value.
func (tx *Tx) DeleteBucket(name []byte) error { return tx.root.DeleteBucket(name) }
// ForEach executes a function for each bucket in the root.
// If the provided function returns an error then the iteration is stopped and
// the error is returned to the caller.
func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error { return tx.root.ForEach(func(k, v []byte) error { return fn(k, tx.root.Bucket(k)) }) }
// OnCommit adds a handler function to be executed after the transaction successfully commits.
func (tx *Tx) OnCommit(fn func()) { tx.commitHandlers = append(tx.commitHandlers, fn) }
// Commit writes all changes to disk and updates the meta page.
// Returns an error if a disk write error occurs, or if Commit is
// called on a read-only transaction.
func (tx *Tx) Commit() error { _assert(!tx.managed, "managed tx commit not allowed") if tx.db == nil { return ErrTxClosed } else if !tx.writable { return ErrTxNotWritable }
// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
// Rebalance nodes which have had deletions.
var startTime = time.Now() tx.root.rebalance() if tx.stats.Rebalance > 0 { tx.stats.RebalanceTime += time.Since(startTime) }
// spill data onto dirty pages.
startTime = time.Now() if err := tx.root.spill(); err != nil { tx.rollback() return err } tx.stats.SpillTime += time.Since(startTime)
// Free the old root bucket.
tx.meta.root.root = tx.root.root
// Free the old freelist because commit writes out a fresh freelist.
if tx.meta.freelist != pgidNoFreelist { tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist)) }
if !tx.db.NoFreelistSync { err := tx.commitFreelist() if err != nil { return err } } else { tx.meta.freelist = pgidNoFreelist }
// Write dirty pages to disk.
startTime = time.Now() if err := tx.write(); err != nil { tx.rollback() return err }
// If strict mode is enabled then perform a consistency check.
// Only the first consistency error is reported in the panic.
if tx.db.StrictMode { ch := tx.Check() var errs []string for { err, ok := <-ch if !ok { break } errs = append(errs, err.Error()) } if len(errs) > 0 { panic("check fail: " + strings.Join(errs, "\n")) } }
// Write meta to disk.
if err := tx.writeMeta(); err != nil { tx.rollback() return err } tx.stats.WriteTime += time.Since(startTime)
// Finalize the transaction.
tx.close()
// Execute commit handlers now that the locks have been removed.
for _, fn := range tx.commitHandlers { fn() }
return nil }
func (tx *Tx) commitFreelist() error { // Allocate new pages for the new free list. This will overestimate
// the size of the freelist but not underestimate the size (which would be bad).
opgid := tx.meta.pgid p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1) if err != nil { tx.rollback() return err } if err := tx.db.freelist.write(p); err != nil { tx.rollback() return err } tx.meta.freelist = p.id // If the high water mark has moved up then attempt to grow the database.
if tx.meta.pgid > opgid { if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil { tx.rollback() return err } }
return nil }
// Rollback closes the transaction and ignores all previous updates. Read-only
// transactions must be rolled back and not committed.
func (tx *Tx) Rollback() error { _assert(!tx.managed, "managed tx rollback not allowed") if tx.db == nil { return ErrTxClosed } tx.rollback() return nil }
func (tx *Tx) rollback() { if tx.db == nil { return } if tx.writable { tx.db.freelist.rollback(tx.meta.txid) tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist)) } tx.close() }
func (tx *Tx) close() { if tx.db == nil { return } if tx.writable { // Grab freelist stats.
var freelistFreeN = tx.db.freelist.free_count() var freelistPendingN = tx.db.freelist.pending_count() var freelistAlloc = tx.db.freelist.size()
// Remove transaction ref & writer lock.
tx.db.rwtx = nil tx.db.rwlock.Unlock()
// Merge statistics.
tx.db.statlock.Lock() tx.db.stats.FreePageN = freelistFreeN tx.db.stats.PendingPageN = freelistPendingN tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize tx.db.stats.FreelistInuse = freelistAlloc tx.db.stats.TxStats.add(&tx.stats) tx.db.statlock.Unlock() } else { tx.db.removeTx(tx) }
// Clear all references.
tx.db = nil tx.meta = nil tx.root = Bucket{tx: tx} tx.pages = nil }
// Copy writes the entire database to a writer.
// This function exists for backwards compatibility. Use WriteTo() instead.
func (tx *Tx) Copy(w io.Writer) error { _, err := tx.WriteTo(w) return err }
// WriteTo writes the entire database to a writer.
// If err == nil then exactly tx.Size() bytes will be written into the writer.
func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) { // Attempt to open reader with WriteFlag
f, err := os.OpenFile(tx.db.path, os.O_RDONLY|tx.WriteFlag, 0) if err != nil { return 0, err } defer func() { if cerr := f.Close(); err == nil { err = cerr } }()
// Generate a meta page. We use the same page data for both meta pages.
buf := make([]byte, tx.db.pageSize) page := (*page)(unsafe.Pointer(&buf[0])) page.flags = metaPageFlag *page.meta() = *tx.meta
// Write meta 0.
page.id = 0 page.meta().checksum = page.meta().sum64() nn, err := w.Write(buf) n += int64(nn) if err != nil { return n, fmt.Errorf("meta 0 copy: %s", err) }
// Write meta 1 with a lower transaction id.
page.id = 1 page.meta().txid -= 1 page.meta().checksum = page.meta().sum64() nn, err = w.Write(buf) n += int64(nn) if err != nil { return n, fmt.Errorf("meta 1 copy: %s", err) }
// Move past the meta pages in the file.
if _, err := f.Seek(int64(tx.db.pageSize*2), io.SeekStart); err != nil { return n, fmt.Errorf("seek: %s", err) }
// Copy data pages.
wn, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2)) n += wn if err != nil { return n, err }
return n, nil }
// CopyFile copies the entire database to file at the given path.
// A reader transaction is maintained during the copy so it is safe to continue
// using the database while a copy is in progress.
func (tx *Tx) CopyFile(path string, mode os.FileMode) error { f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, mode) if err != nil { return err }
err = tx.Copy(f) if err != nil { _ = f.Close() return err } return f.Close() }
// Check performs several consistency checks on the database for this transaction.
// An error is returned if any inconsistency is found.
//
// It can be safely run concurrently on a writable transaction. However, this
// incurs a high cost for large databases and databases with a lot of subbuckets
// because of caching. This overhead can be removed if running on a read-only
// transaction, however, it is not safe to execute other writer transactions at
// the same time.
func (tx *Tx) Check() <-chan error { ch := make(chan error) go tx.check(ch) return ch }
func (tx *Tx) check(ch chan error) { // Force loading free list if opened in ReadOnly mode.
tx.db.loadFreelist()
// Check if any pages are double freed.
freed := make(map[pgid]bool) all := make([]pgid, tx.db.freelist.count()) tx.db.freelist.copyall(all) for _, id := range all { if freed[id] { ch <- fmt.Errorf("page %d: already freed", id) } freed[id] = true }
// Track every reachable page.
reachable := make(map[pgid]*page) reachable[0] = tx.page(0) // meta0
reachable[1] = tx.page(1) // meta1
if tx.meta.freelist != pgidNoFreelist { for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ { reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist) } }
// Recursively check buckets.
tx.checkBucket(&tx.root, reachable, freed, ch)
// Ensure all pages below high water mark are either reachable or freed.
for i := pgid(0); i < tx.meta.pgid; i++ { _, isReachable := reachable[i] if !isReachable && !freed[i] { ch <- fmt.Errorf("page %d: unreachable unfreed", int(i)) } }
// Close the channel to signal completion.
close(ch) }
func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bool, ch chan error) { // Ignore inline buckets.
if b.root == 0 { return }
// Check every page used by this bucket.
b.tx.forEachPage(b.root, 0, func(p *page, _ int) { if p.id > tx.meta.pgid { ch <- fmt.Errorf("page %d: out of bounds: %d", int(p.id), int(b.tx.meta.pgid)) }
// Ensure each page is only referenced once.
for i := pgid(0); i <= pgid(p.overflow); i++ { var id = p.id + i if _, ok := reachable[id]; ok { ch <- fmt.Errorf("page %d: multiple references", int(id)) } reachable[id] = p }
// We should only encounter un-freed leaf and branch pages.
if freed[p.id] { ch <- fmt.Errorf("page %d: reachable freed", int(p.id)) } else if (p.flags&branchPageFlag) == 0 && (p.flags&leafPageFlag) == 0 { ch <- fmt.Errorf("page %d: invalid type: %s", int(p.id), p.typ()) } })
// Check each bucket within this bucket.
_ = b.ForEach(func(k, v []byte) error { if child := b.Bucket(k); child != nil { tx.checkBucket(child, reachable, freed, ch) } return nil }) }
// allocate returns a contiguous block of memory starting at a given page.
func (tx *Tx) allocate(count int) (*page, error) { p, err := tx.db.allocate(tx.meta.txid, count) if err != nil { return nil, err }
// Save to our page cache.
tx.pages[p.id] = p
// Update statistics.
tx.stats.PageCount += count tx.stats.PageAlloc += count * tx.db.pageSize
return p, nil }
// write writes any dirty pages to disk.
func (tx *Tx) write() error { // Sort pages by id.
pages := make(pages, 0, len(tx.pages)) for _, p := range tx.pages { pages = append(pages, p) } // Clear out page cache early.
tx.pages = make(map[pgid]*page) sort.Sort(pages)
// Write pages to disk in order.
for _, p := range pages { size := (int(p.overflow) + 1) * tx.db.pageSize offset := int64(p.id) * int64(tx.db.pageSize)
// Write out page in "max allocation" sized chunks.
ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p)) for { // Limit our write to our max allocation size.
sz := size if sz > maxAllocSize-1 { sz = maxAllocSize - 1 }
// Write chunk to disk.
buf := ptr[:sz] if _, err := tx.db.ops.writeAt(buf, offset); err != nil { return err }
// Update statistics.
tx.stats.Write++
// Exit inner for loop if we've written all the chunks.
size -= sz if size == 0 { break }
// Otherwise move offset forward and move pointer to next chunk.
offset += int64(sz) ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz])) } }
// Ignore file sync if flag is set on DB.
if !tx.db.NoSync || IgnoreNoSync { if err := fdatasync(tx.db); err != nil { return err } }
// Put small pages back to page pool.
for _, p := range pages { // Ignore page sizes over 1 page.
// These are allocated using make() instead of the page pool.
if int(p.overflow) != 0 { continue }
buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:tx.db.pageSize]
// See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1
for i := range buf { buf[i] = 0 } tx.db.pagePool.Put(buf) }
return nil }
// writeMeta writes the meta to the disk.
func (tx *Tx) writeMeta() error { // Create a temporary buffer for the meta page.
buf := make([]byte, tx.db.pageSize) p := tx.db.pageInBuffer(buf, 0) tx.meta.write(p)
// Write the meta page to file.
if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil { return err } if !tx.db.NoSync || IgnoreNoSync { if err := fdatasync(tx.db); err != nil { return err } }
// Update statistics.
tx.stats.Write++
return nil }
// page returns a reference to the page with a given id.
// If page has been written to then a temporary buffered page is returned.
func (tx *Tx) page(id pgid) *page { // Check the dirty pages first.
if tx.pages != nil { if p, ok := tx.pages[id]; ok { return p } }
// Otherwise return directly from the mmap.
return tx.db.page(id) }
// forEachPage iterates over every page within a given page and executes a function.
func (tx *Tx) forEachPage(pgid pgid, depth int, fn func(*page, int)) { p := tx.page(pgid)
// Execute function.
fn(p, depth)
// Recursively loop over children.
if (p.flags & branchPageFlag) != 0 { for i := 0; i < int(p.count); i++ { elem := p.branchPageElement(uint16(i)) tx.forEachPage(elem.pgid, depth+1, fn) } } }
// Page returns page information for a given page number.
// This is only safe for concurrent use when used by a writable transaction.
func (tx *Tx) Page(id int) (*PageInfo, error) { if tx.db == nil { return nil, ErrTxClosed } else if pgid(id) >= tx.meta.pgid { return nil, nil }
// Build the page info.
p := tx.db.page(pgid(id)) info := &PageInfo{ ID: id, Count: int(p.count), OverflowCount: int(p.overflow), }
// Determine the type (or if it's free).
if tx.db.freelist.freed(pgid(id)) { info.Type = "free" } else { info.Type = p.typ() }
return info, nil }
// TxStats represents statistics about the actions performed by the transaction.
type TxStats struct { // Page statistics.
PageCount int // number of page allocations
PageAlloc int // total bytes allocated
// Cursor statistics.
CursorCount int // number of cursors created
// Node statistics
NodeCount int // number of node allocations
NodeDeref int // number of node dereferences
// Rebalance statistics.
Rebalance int // number of node rebalances
RebalanceTime time.Duration // total time spent rebalancing
// Split/Spill statistics.
Split int // number of nodes split
Spill int // number of nodes spilled
SpillTime time.Duration // total time spent spilling
// Write statistics.
Write int // number of writes performed
WriteTime time.Duration // total time spent writing to disk
}
func (s *TxStats) add(other *TxStats) { s.PageCount += other.PageCount s.PageAlloc += other.PageAlloc s.CursorCount += other.CursorCount s.NodeCount += other.NodeCount s.NodeDeref += other.NodeDeref s.Rebalance += other.Rebalance s.RebalanceTime += other.RebalanceTime s.Split += other.Split s.Spill += other.Spill s.SpillTime += other.SpillTime s.Write += other.Write s.WriteTime += other.WriteTime }
// Sub calculates and returns the difference between two sets of transaction stats.
// This is useful when obtaining stats at two different points and time and
// you need the performance counters that occurred within that time span.
func (s *TxStats) Sub(other *TxStats) TxStats { var diff TxStats diff.PageCount = s.PageCount - other.PageCount diff.PageAlloc = s.PageAlloc - other.PageAlloc diff.CursorCount = s.CursorCount - other.CursorCount diff.NodeCount = s.NodeCount - other.NodeCount diff.NodeDeref = s.NodeDeref - other.NodeDeref diff.Rebalance = s.Rebalance - other.Rebalance diff.RebalanceTime = s.RebalanceTime - other.RebalanceTime diff.Split = s.Split - other.Split diff.Spill = s.Spill - other.Spill diff.SpillTime = s.SpillTime - other.SpillTime diff.Write = s.Write - other.Write diff.WriteTime = s.WriteTime - other.WriteTime return diff }
|