b87ee2b51e
forgot to run tidy
2027 lines
68 KiB
Go
2027 lines
68 KiB
Go
// Copyright (c) 2015-2016 The btcsuite developers
|
|
// Use of this source code is governed by an ISC
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package ffldb
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"sort"
|
|
"sync"
|
|
|
|
"github.com/lbryio/lbcd/chaincfg/chainhash"
|
|
"github.com/lbryio/lbcd/database"
|
|
"github.com/lbryio/lbcd/database/internal/treap"
|
|
"github.com/lbryio/lbcd/wire"
|
|
btcutil "github.com/lbryio/lbcutil"
|
|
"github.com/syndtr/goleveldb/leveldb"
|
|
"github.com/syndtr/goleveldb/leveldb/comparer"
|
|
ldberrors "github.com/syndtr/goleveldb/leveldb/errors"
|
|
"github.com/syndtr/goleveldb/leveldb/filter"
|
|
"github.com/syndtr/goleveldb/leveldb/iterator"
|
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
|
"github.com/syndtr/goleveldb/leveldb/util"
|
|
)
|
|
|
|
const (
|
|
// metadataDbName is the name used for the metadata database.
|
|
metadataDbName = "metadata"
|
|
|
|
// blockHdrSize is the size of a block header. This is simply the
|
|
// constant from wire and is only provided here for convenience since
|
|
// wire.MaxBlockHeaderPayload is quite long.
|
|
blockHdrSize = wire.MaxBlockHeaderPayload
|
|
|
|
// blockHdrOffset defines the offsets into a block index row for the
|
|
// block header.
|
|
//
|
|
// The serialized block index row format is:
|
|
// <blocklocation><blockheader>
|
|
blockHdrOffset = blockLocSize
|
|
)
|
|
|
|
var (
	// byteOrder is the default byte order for the database and the block
	// files. Big endian is occasionally used instead where integers must
	// sort correctly as raw bytes.
	byteOrder = binary.LittleEndian

	// bucketIndexPrefix prefixes every entry stored in the bucket index.
	bucketIndexPrefix = []byte("bidx")

	// curBucketIDKeyName is the key under which the current bucket ID
	// counter is tracked.
	curBucketIDKeyName = []byte("bidx-cbid")

	// metadataBucketID identifies the top-level metadata bucket: the
	// value 0 as an unsigned big-endian uint32.
	metadataBucketID = [4]byte{}

	// blockIdxBucketID identifies the internal block metadata bucket: the
	// value 1 as an unsigned big-endian uint32.
	blockIdxBucketID = [4]byte{0x00, 0x00, 0x00, 0x01}

	// blockIdxBucketName is the name of the bucket used internally to
	// track block metadata.
	blockIdxBucketName = []byte("ffldb-blockidx")

	// writeLocKeyName is the key under which the current write file
	// location is stored.
	writeLocKeyName = []byte("ffldb-writeloc")
)
|
|
|
|
// Error descriptions shared by several code paths.
const (
	// errDbNotOpenStr is the description paired with the
	// database.ErrDbNotOpen error code.
	errDbNotOpenStr = "database is not open"

	// errTxClosedStr is the description paired with the
	// database.ErrTxClosed error code.
	errTxClosedStr = "database tx is closed"
)
|
|
|
|
// bulkFetchData is allows a block location to be specified along with the
|
|
// index it was requested from. This in turn allows the bulk data loading
|
|
// functions to sort the data accesses based on the location to improve
|
|
// performance while keeping track of which result the data is for.
|
|
type bulkFetchData struct {
|
|
*blockLocation
|
|
replyIndex int
|
|
}
|
|
|
|
// bulkFetchDataSorter implements sort.Interface to allow a slice of
|
|
// bulkFetchData to be sorted. In particular it sorts by file and then
|
|
// offset so that reads from files are grouped and linear.
|
|
type bulkFetchDataSorter []bulkFetchData
|
|
|
|
// Len returns the number of items in the slice. It is part of the
|
|
// sort.Interface implementation.
|
|
func (s bulkFetchDataSorter) Len() int {
|
|
return len(s)
|
|
}
|
|
|
|
// Swap swaps the items at the passed indices. It is part of the
|
|
// sort.Interface implementation.
|
|
func (s bulkFetchDataSorter) Swap(i, j int) {
|
|
s[i], s[j] = s[j], s[i]
|
|
}
|
|
|
|
// Less returns whether the item with index i should sort before the item with
|
|
// index j. It is part of the sort.Interface implementation.
|
|
func (s bulkFetchDataSorter) Less(i, j int) bool {
|
|
if s[i].blockFileNum < s[j].blockFileNum {
|
|
return true
|
|
}
|
|
if s[i].blockFileNum > s[j].blockFileNum {
|
|
return false
|
|
}
|
|
|
|
return s[i].fileOffset < s[j].fileOffset
|
|
}
|
|
|
|
// makeDbErr creates a database.Error given a set of arguments.
|
|
func makeDbErr(c database.ErrorCode, desc string, err error) database.Error {
|
|
return database.Error{ErrorCode: c, Description: desc, Err: err}
|
|
}
|
|
|
|
// convertErr converts the passed leveldb error into a database error with an
|
|
// equivalent error code and the passed description. It also sets the passed
|
|
// error as the underlying error.
|
|
func convertErr(desc string, ldbErr error) database.Error {
|
|
// Use the driver-specific error code by default. The code below will
|
|
// update this with the converted error if it's recognized.
|
|
var code = database.ErrDriverSpecific
|
|
|
|
switch {
|
|
// Database corruption errors.
|
|
case ldberrors.IsCorrupted(ldbErr):
|
|
code = database.ErrCorruption
|
|
|
|
// Database open/create errors.
|
|
case ldbErr == leveldb.ErrClosed:
|
|
code = database.ErrDbNotOpen
|
|
|
|
// Transaction errors.
|
|
case ldbErr == leveldb.ErrSnapshotReleased:
|
|
code = database.ErrTxClosed
|
|
case ldbErr == leveldb.ErrIterReleased:
|
|
code = database.ErrTxClosed
|
|
}
|
|
|
|
return database.Error{ErrorCode: code, Description: desc, Err: ldbErr}
|
|
}
|
|
|
|
// copySlice returns a freshly allocated duplicate of the passed slice. Its
// main use is detaching leveldb iterator keys and values, which stay valid
// only until the iterator moves rather than for the whole transaction. The
// result is always non-nil, even for a nil or empty input.
func copySlice(slice []byte) []byte {
	dup := make([]byte, 0, len(slice))
	return append(dup, slice...)
}
|
|
|
|
// cursor is an internal type used to represent a cursor over key/value pairs
|
|
// and nested buckets of a bucket and implements the database.Cursor interface.
|
|
type cursor struct {
|
|
bucket *bucket
|
|
dbIter iterator.Iterator
|
|
pendingIter iterator.Iterator
|
|
currentIter iterator.Iterator
|
|
}
|
|
|
|
// Enforce cursor implements the database.Cursor interface.
|
|
var _ database.Cursor = (*cursor)(nil)
|
|
|
|
// Bucket returns the bucket the cursor was created for.
|
|
//
|
|
// This function is part of the database.Cursor interface implementation.
|
|
func (c *cursor) Bucket() database.Bucket {
|
|
// Ensure transaction state is valid.
|
|
if err := c.bucket.tx.checkClosed(); err != nil {
|
|
return nil
|
|
}
|
|
|
|
return c.bucket
|
|
}
|
|
|
|
// Delete removes the current key/value pair the cursor is at without
|
|
// invalidating the cursor.
|
|
//
|
|
// Returns the following errors as required by the interface contract:
|
|
// - ErrIncompatibleValue if attempted when the cursor points to a nested
|
|
// bucket
|
|
// - ErrTxNotWritable if attempted against a read-only transaction
|
|
// - ErrTxClosed if the transaction has already been closed
|
|
//
|
|
// This function is part of the database.Cursor interface implementation.
|
|
func (c *cursor) Delete() error {
|
|
// Ensure transaction state is valid.
|
|
if err := c.bucket.tx.checkClosed(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Error if the cursor is exhausted.
|
|
if c.currentIter == nil {
|
|
str := "cursor is exhausted"
|
|
return makeDbErr(database.ErrIncompatibleValue, str, nil)
|
|
}
|
|
|
|
// Do not allow buckets to be deleted via the cursor.
|
|
key := c.currentIter.Key()
|
|
if bytes.HasPrefix(key, bucketIndexPrefix) {
|
|
str := "buckets may not be deleted from a cursor"
|
|
return makeDbErr(database.ErrIncompatibleValue, str, nil)
|
|
}
|
|
|
|
c.bucket.tx.deleteKey(copySlice(key), true)
|
|
return nil
|
|
}
|
|
|
|
// skipPendingUpdates skips any keys at the current database iterator position
|
|
// that are being updated by the transaction. The forwards flag indicates the
|
|
// direction the cursor is moving.
|
|
func (c *cursor) skipPendingUpdates(forwards bool) {
|
|
for c.dbIter.Valid() {
|
|
var skip bool
|
|
key := c.dbIter.Key()
|
|
if c.bucket.tx.pendingRemove.Has(key) {
|
|
skip = true
|
|
} else if c.bucket.tx.pendingKeys.Has(key) {
|
|
skip = true
|
|
}
|
|
if !skip {
|
|
break
|
|
}
|
|
|
|
if forwards {
|
|
c.dbIter.Next()
|
|
} else {
|
|
c.dbIter.Prev()
|
|
}
|
|
}
|
|
}
|
|
|
|
// chooseIterator first skips any entries in the database iterator that are
|
|
// being updated by the transaction and sets the current iterator to the
|
|
// appropriate iterator depending on their validity and the order they compare
|
|
// in while taking into account the direction flag. When the cursor is being
|
|
// moved forwards and both iterators are valid, the iterator with the smaller
|
|
// key is chosen and vice versa when the cursor is being moved backwards.
|
|
func (c *cursor) chooseIterator(forwards bool) bool {
|
|
// Skip any keys at the current database iterator position that are
|
|
// being updated by the transaction.
|
|
c.skipPendingUpdates(forwards)
|
|
|
|
// When both iterators are exhausted, the cursor is exhausted too.
|
|
if !c.dbIter.Valid() && !c.pendingIter.Valid() {
|
|
c.currentIter = nil
|
|
return false
|
|
}
|
|
|
|
// Choose the database iterator when the pending keys iterator is
|
|
// exhausted.
|
|
if !c.pendingIter.Valid() {
|
|
c.currentIter = c.dbIter
|
|
return true
|
|
}
|
|
|
|
// Choose the pending keys iterator when the database iterator is
|
|
// exhausted.
|
|
if !c.dbIter.Valid() {
|
|
c.currentIter = c.pendingIter
|
|
return true
|
|
}
|
|
|
|
// Both iterators are valid, so choose the iterator with either the
|
|
// smaller or larger key depending on the forwards flag.
|
|
compare := bytes.Compare(c.dbIter.Key(), c.pendingIter.Key())
|
|
if (forwards && compare > 0) || (!forwards && compare < 0) {
|
|
c.currentIter = c.pendingIter
|
|
} else {
|
|
c.currentIter = c.dbIter
|
|
}
|
|
return true
|
|
}
|
|
|
|
// First positions the cursor at the first key/value pair and returns whether or
|
|
// not the pair exists.
|
|
//
|
|
// This function is part of the database.Cursor interface implementation.
|
|
func (c *cursor) First() bool {
|
|
// Ensure transaction state is valid.
|
|
if err := c.bucket.tx.checkClosed(); err != nil {
|
|
return false
|
|
}
|
|
|
|
// Seek to the first key in both the database and pending iterators and
|
|
// choose the iterator that is both valid and has the smaller key.
|
|
c.dbIter.First()
|
|
c.pendingIter.First()
|
|
return c.chooseIterator(true)
|
|
}
|
|
|
|
// Last positions the cursor at the last key/value pair and returns whether or
|
|
// not the pair exists.
|
|
//
|
|
// This function is part of the database.Cursor interface implementation.
|
|
func (c *cursor) Last() bool {
|
|
// Ensure transaction state is valid.
|
|
if err := c.bucket.tx.checkClosed(); err != nil {
|
|
return false
|
|
}
|
|
|
|
// Seek to the last key in both the database and pending iterators and
|
|
// choose the iterator that is both valid and has the larger key.
|
|
c.dbIter.Last()
|
|
c.pendingIter.Last()
|
|
return c.chooseIterator(false)
|
|
}
|
|
|
|
// Next moves the cursor one key/value pair forward and returns whether or not
|
|
// the pair exists.
|
|
//
|
|
// This function is part of the database.Cursor interface implementation.
|
|
func (c *cursor) Next() bool {
|
|
// Ensure transaction state is valid.
|
|
if err := c.bucket.tx.checkClosed(); err != nil {
|
|
return false
|
|
}
|
|
|
|
// Nothing to return if cursor is exhausted.
|
|
if c.currentIter == nil {
|
|
return false
|
|
}
|
|
|
|
// Move the current iterator to the next entry and choose the iterator
|
|
// that is both valid and has the smaller key.
|
|
c.currentIter.Next()
|
|
return c.chooseIterator(true)
|
|
}
|
|
|
|
// Prev moves the cursor one key/value pair backward and returns whether or not
|
|
// the pair exists.
|
|
//
|
|
// This function is part of the database.Cursor interface implementation.
|
|
func (c *cursor) Prev() bool {
|
|
// Ensure transaction state is valid.
|
|
if err := c.bucket.tx.checkClosed(); err != nil {
|
|
return false
|
|
}
|
|
|
|
// Nothing to return if cursor is exhausted.
|
|
if c.currentIter == nil {
|
|
return false
|
|
}
|
|
|
|
// Move the current iterator to the previous entry and choose the
|
|
// iterator that is both valid and has the larger key.
|
|
c.currentIter.Prev()
|
|
return c.chooseIterator(false)
|
|
}
|
|
|
|
// Seek positions the cursor at the first key/value pair that is greater than or
|
|
// equal to the passed seek key. Returns false if no suitable key was found.
|
|
//
|
|
// This function is part of the database.Cursor interface implementation.
|
|
func (c *cursor) Seek(seek []byte) bool {
|
|
// Ensure transaction state is valid.
|
|
if err := c.bucket.tx.checkClosed(); err != nil {
|
|
return false
|
|
}
|
|
|
|
// Seek to the provided key in both the database and pending iterators
|
|
// then choose the iterator that is both valid and has the larger key.
|
|
seekKey := bucketizedKey(c.bucket.id, seek)
|
|
c.dbIter.Seek(seekKey)
|
|
c.pendingIter.Seek(seekKey)
|
|
return c.chooseIterator(true)
|
|
}
|
|
|
|
// rawKey returns the current key the cursor is pointing to without stripping
|
|
// the current bucket prefix or bucket index prefix.
|
|
func (c *cursor) rawKey() []byte {
|
|
// Nothing to return if cursor is exhausted.
|
|
if c.currentIter == nil {
|
|
return nil
|
|
}
|
|
|
|
return copySlice(c.currentIter.Key())
|
|
}
|
|
|
|
// Key returns the current key the cursor is pointing to.
|
|
//
|
|
// This function is part of the database.Cursor interface implementation.
|
|
func (c *cursor) Key() []byte {
|
|
// Ensure transaction state is valid.
|
|
if err := c.bucket.tx.checkClosed(); err != nil {
|
|
return nil
|
|
}
|
|
|
|
// Nothing to return if cursor is exhausted.
|
|
if c.currentIter == nil {
|
|
return nil
|
|
}
|
|
|
|
// Slice out the actual key name and make a copy since it is no longer
|
|
// valid after iterating to the next item.
|
|
//
|
|
// The key is after the bucket index prefix and parent ID when the
|
|
// cursor is pointing to a nested bucket.
|
|
key := c.currentIter.Key()
|
|
if bytes.HasPrefix(key, bucketIndexPrefix) {
|
|
key = key[len(bucketIndexPrefix)+4:]
|
|
return copySlice(key)
|
|
}
|
|
|
|
// The key is after the bucket ID when the cursor is pointing to a
|
|
// normal entry.
|
|
key = key[len(c.bucket.id):]
|
|
return copySlice(key)
|
|
}
|
|
|
|
// rawValue returns the current value the cursor is pointing to without
|
|
// stripping without filtering bucket index values.
|
|
func (c *cursor) rawValue() []byte {
|
|
// Nothing to return if cursor is exhausted.
|
|
if c.currentIter == nil {
|
|
return nil
|
|
}
|
|
|
|
return copySlice(c.currentIter.Value())
|
|
}
|
|
|
|
// Value returns the current value the cursor is pointing to. This will be nil
|
|
// for nested buckets.
|
|
//
|
|
// This function is part of the database.Cursor interface implementation.
|
|
func (c *cursor) Value() []byte {
|
|
// Ensure transaction state is valid.
|
|
if err := c.bucket.tx.checkClosed(); err != nil {
|
|
return nil
|
|
}
|
|
|
|
// Nothing to return if cursor is exhausted.
|
|
if c.currentIter == nil {
|
|
return nil
|
|
}
|
|
|
|
// Return nil for the value when the cursor is pointing to a nested
|
|
// bucket.
|
|
if bytes.HasPrefix(c.currentIter.Key(), bucketIndexPrefix) {
|
|
return nil
|
|
}
|
|
|
|
return copySlice(c.currentIter.Value())
|
|
}
|
|
|
|
// cursorType selects what kind of entries a cursor visits.
type cursorType int

// The cursor types that may be requested.
const (
	// ctKeys visits every key in the bucket.
	ctKeys cursorType = iota

	// ctBuckets visits every bucket nested directly in the bucket.
	ctBuckets

	// ctFull visits both the keys and the directly nested buckets of the
	// bucket.
	ctFull
)
|
|
|
|
// cursorFinalizer is either invoked when a cursor is being garbage collected or
|
|
// called manually to ensure the underlying cursor iterators are released.
|
|
func cursorFinalizer(c *cursor) {
|
|
c.dbIter.Release()
|
|
c.pendingIter.Release()
|
|
}
|
|
|
|
// newCursor returns a new cursor for the given bucket, bucket ID, and cursor
|
|
// type.
|
|
//
|
|
// NOTE: The caller is responsible for calling the cursorFinalizer function on
|
|
// the returned cursor.
|
|
func newCursor(b *bucket, bucketID []byte, cursorTyp cursorType) *cursor {
|
|
var dbIter, pendingIter iterator.Iterator
|
|
switch cursorTyp {
|
|
case ctKeys:
|
|
keyRange := util.BytesPrefix(bucketID)
|
|
dbIter = b.tx.snapshot.NewIterator(keyRange)
|
|
pendingKeyIter := newLdbTreapIter(b.tx, keyRange)
|
|
pendingIter = pendingKeyIter
|
|
|
|
case ctBuckets:
|
|
// The serialized bucket index key format is:
|
|
// <bucketindexprefix><parentbucketid><bucketname>
|
|
|
|
// Create an iterator for the both the database and the pending
|
|
// keys which are prefixed by the bucket index identifier and
|
|
// the provided bucket ID.
|
|
prefix := make([]byte, len(bucketIndexPrefix)+4)
|
|
copy(prefix, bucketIndexPrefix)
|
|
copy(prefix[len(bucketIndexPrefix):], bucketID)
|
|
bucketRange := util.BytesPrefix(prefix)
|
|
|
|
dbIter = b.tx.snapshot.NewIterator(bucketRange)
|
|
pendingBucketIter := newLdbTreapIter(b.tx, bucketRange)
|
|
pendingIter = pendingBucketIter
|
|
|
|
case ctFull:
|
|
fallthrough
|
|
default:
|
|
// The serialized bucket index key format is:
|
|
// <bucketindexprefix><parentbucketid><bucketname>
|
|
prefix := make([]byte, len(bucketIndexPrefix)+4)
|
|
copy(prefix, bucketIndexPrefix)
|
|
copy(prefix[len(bucketIndexPrefix):], bucketID)
|
|
bucketRange := util.BytesPrefix(prefix)
|
|
keyRange := util.BytesPrefix(bucketID)
|
|
|
|
// Since both keys and buckets are needed from the database,
|
|
// create an individual iterator for each prefix and then create
|
|
// a merged iterator from them.
|
|
dbKeyIter := b.tx.snapshot.NewIterator(keyRange)
|
|
dbBucketIter := b.tx.snapshot.NewIterator(bucketRange)
|
|
iters := []iterator.Iterator{dbKeyIter, dbBucketIter}
|
|
dbIter = iterator.NewMergedIterator(iters,
|
|
comparer.DefaultComparer, true)
|
|
|
|
// Since both keys and buckets are needed from the pending keys,
|
|
// create an individual iterator for each prefix and then create
|
|
// a merged iterator from them.
|
|
pendingKeyIter := newLdbTreapIter(b.tx, keyRange)
|
|
pendingBucketIter := newLdbTreapIter(b.tx, bucketRange)
|
|
iters = []iterator.Iterator{pendingKeyIter, pendingBucketIter}
|
|
pendingIter = iterator.NewMergedIterator(iters,
|
|
comparer.DefaultComparer, true)
|
|
}
|
|
|
|
// Create the cursor using the iterators.
|
|
return &cursor{bucket: b, dbIter: dbIter, pendingIter: pendingIter}
|
|
}
|
|
|
|
// bucket is an internal type used to represent a collection of key/value pairs
|
|
// and implements the database.Bucket interface.
|
|
type bucket struct {
|
|
tx *transaction
|
|
id [4]byte
|
|
}
|
|
|
|
// Enforce bucket implements the database.Bucket interface.
|
|
var _ database.Bucket = (*bucket)(nil)
|
|
|
|
// bucketIndexKey returns the actual key to use for storing and retrieving a
|
|
// child bucket in the bucket index. This is required because additional
|
|
// information is needed to distinguish nested buckets with the same name.
|
|
func bucketIndexKey(parentID [4]byte, key []byte) []byte {
|
|
// The serialized bucket index key format is:
|
|
// <bucketindexprefix><parentbucketid><bucketname>
|
|
indexKey := make([]byte, len(bucketIndexPrefix)+4+len(key))
|
|
copy(indexKey, bucketIndexPrefix)
|
|
copy(indexKey[len(bucketIndexPrefix):], parentID[:])
|
|
copy(indexKey[len(bucketIndexPrefix)+4:], key)
|
|
return indexKey
|
|
}
|
|
|
|
// bucketizedKey builds the key under which an entry of the given bucket is
// stored and looked up. Buckets are realized by prefixing every key with the
// unique ID of its bucket.
func bucketizedKey(bucketID [4]byte, key []byte) []byte {
	// The serialized key format is:
	//   <bucketid><key>
	bKey := make([]byte, 0, len(bucketID)+len(key))
	bKey = append(bKey, bucketID[:]...)
	return append(bKey, key...)
}
|
|
|
|
// Bucket retrieves a nested bucket with the given key. Returns nil if
|
|
// the bucket does not exist.
|
|
//
|
|
// This function is part of the database.Bucket interface implementation.
|
|
func (b *bucket) Bucket(key []byte) database.Bucket {
|
|
// Ensure transaction state is valid.
|
|
if err := b.tx.checkClosed(); err != nil {
|
|
return nil
|
|
}
|
|
|
|
// Attempt to fetch the ID for the child bucket. The bucket does not
|
|
// exist if the bucket index entry does not exist.
|
|
childID := b.tx.fetchKey(bucketIndexKey(b.id, key))
|
|
if childID == nil {
|
|
return nil
|
|
}
|
|
|
|
childBucket := &bucket{tx: b.tx}
|
|
copy(childBucket.id[:], childID)
|
|
return childBucket
|
|
}
|
|
|
|
// CreateBucket creates and returns a new nested bucket with the given key.
|
|
//
|
|
// Returns the following errors as required by the interface contract:
|
|
// - ErrBucketExists if the bucket already exists
|
|
// - ErrBucketNameRequired if the key is empty
|
|
// - ErrIncompatibleValue if the key is otherwise invalid for the particular
|
|
// implementation
|
|
// - ErrTxNotWritable if attempted against a read-only transaction
|
|
// - ErrTxClosed if the transaction has already been closed
|
|
//
|
|
// This function is part of the database.Bucket interface implementation.
|
|
func (b *bucket) CreateBucket(key []byte) (database.Bucket, error) {
|
|
// Ensure transaction state is valid.
|
|
if err := b.tx.checkClosed(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Ensure the transaction is writable.
|
|
if !b.tx.writable {
|
|
str := "create bucket requires a writable database transaction"
|
|
return nil, makeDbErr(database.ErrTxNotWritable, str, nil)
|
|
}
|
|
|
|
// Ensure a key was provided.
|
|
if len(key) == 0 {
|
|
str := "create bucket requires a key"
|
|
return nil, makeDbErr(database.ErrBucketNameRequired, str, nil)
|
|
}
|
|
|
|
// Ensure bucket does not already exist.
|
|
bidxKey := bucketIndexKey(b.id, key)
|
|
if b.tx.hasKey(bidxKey) {
|
|
str := "bucket already exists"
|
|
return nil, makeDbErr(database.ErrBucketExists, str, nil)
|
|
}
|
|
|
|
// Find the appropriate next bucket ID to use for the new bucket. In
|
|
// the case of the special internal block index, keep the fixed ID.
|
|
var childID [4]byte
|
|
if b.id == metadataBucketID && bytes.Equal(key, blockIdxBucketName) {
|
|
childID = blockIdxBucketID
|
|
} else {
|
|
var err error
|
|
childID, err = b.tx.nextBucketID()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Add the new bucket to the bucket index.
|
|
if err := b.tx.putKey(bidxKey, childID[:]); err != nil {
|
|
str := fmt.Sprintf("failed to create bucket with key %q", key)
|
|
return nil, convertErr(str, err)
|
|
}
|
|
return &bucket{tx: b.tx, id: childID}, nil
|
|
}
|
|
|
|
// CreateBucketIfNotExists creates and returns a new nested bucket with the
|
|
// given key if it does not already exist.
|
|
//
|
|
// Returns the following errors as required by the interface contract:
|
|
// - ErrBucketNameRequired if the key is empty
|
|
// - ErrIncompatibleValue if the key is otherwise invalid for the particular
|
|
// implementation
|
|
// - ErrTxNotWritable if attempted against a read-only transaction
|
|
// - ErrTxClosed if the transaction has already been closed
|
|
//
|
|
// This function is part of the database.Bucket interface implementation.
|
|
func (b *bucket) CreateBucketIfNotExists(key []byte) (database.Bucket, error) {
|
|
// Ensure transaction state is valid.
|
|
if err := b.tx.checkClosed(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Ensure the transaction is writable.
|
|
if !b.tx.writable {
|
|
str := "create bucket requires a writable database transaction"
|
|
return nil, makeDbErr(database.ErrTxNotWritable, str, nil)
|
|
}
|
|
|
|
// Return existing bucket if it already exists, otherwise create it.
|
|
if bucket := b.Bucket(key); bucket != nil {
|
|
return bucket, nil
|
|
}
|
|
return b.CreateBucket(key)
|
|
}
|
|
|
|
// DeleteBucket removes a nested bucket with the given key.
|
|
//
|
|
// Returns the following errors as required by the interface contract:
|
|
// - ErrBucketNotFound if the specified bucket does not exist
|
|
// - ErrTxNotWritable if attempted against a read-only transaction
|
|
// - ErrTxClosed if the transaction has already been closed
|
|
//
|
|
// This function is part of the database.Bucket interface implementation.
|
|
func (b *bucket) DeleteBucket(key []byte) error {
|
|
// Ensure transaction state is valid.
|
|
if err := b.tx.checkClosed(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Ensure the transaction is writable.
|
|
if !b.tx.writable {
|
|
str := "delete bucket requires a writable database transaction"
|
|
return makeDbErr(database.ErrTxNotWritable, str, nil)
|
|
}
|
|
|
|
// Attempt to fetch the ID for the child bucket. The bucket does not
|
|
// exist if the bucket index entry does not exist. In the case of the
|
|
// special internal block index, keep the fixed ID.
|
|
bidxKey := bucketIndexKey(b.id, key)
|
|
childID := b.tx.fetchKey(bidxKey)
|
|
if childID == nil {
|
|
str := fmt.Sprintf("bucket %q does not exist", key)
|
|
return makeDbErr(database.ErrBucketNotFound, str, nil)
|
|
}
|
|
|
|
// Remove all nested buckets and their keys.
|
|
childIDs := [][]byte{childID}
|
|
for len(childIDs) > 0 {
|
|
childID = childIDs[len(childIDs)-1]
|
|
childIDs = childIDs[:len(childIDs)-1]
|
|
|
|
// Delete all keys in the nested bucket.
|
|
keyCursor := newCursor(b, childID, ctKeys)
|
|
for ok := keyCursor.First(); ok; ok = keyCursor.Next() {
|
|
b.tx.deleteKey(keyCursor.rawKey(), false)
|
|
}
|
|
cursorFinalizer(keyCursor)
|
|
|
|
// Iterate through all nested buckets.
|
|
bucketCursor := newCursor(b, childID, ctBuckets)
|
|
for ok := bucketCursor.First(); ok; ok = bucketCursor.Next() {
|
|
// Push the id of the nested bucket onto the stack for
|
|
// the next iteration.
|
|
childID := bucketCursor.rawValue()
|
|
childIDs = append(childIDs, childID)
|
|
|
|
// Remove the nested bucket from the bucket index.
|
|
b.tx.deleteKey(bucketCursor.rawKey(), false)
|
|
}
|
|
cursorFinalizer(bucketCursor)
|
|
}
|
|
|
|
// Remove the nested bucket from the bucket index. Any buckets nested
|
|
// under it were already removed above.
|
|
b.tx.deleteKey(bidxKey, true)
|
|
return nil
|
|
}
|
|
|
|
// Cursor returns a new cursor, allowing for iteration over the bucket's
|
|
// key/value pairs and nested buckets in forward or backward order.
|
|
//
|
|
// You must seek to a position using the First, Last, or Seek functions before
|
|
// calling the Next, Prev, Key, or Value functions. Failure to do so will
|
|
// result in the same return values as an exhausted cursor, which is false for
|
|
// the Prev and Next functions and nil for Key and Value functions.
|
|
//
|
|
// This function is part of the database.Bucket interface implementation.
|
|
func (b *bucket) Cursor() database.Cursor {
|
|
// Ensure transaction state is valid.
|
|
if err := b.tx.checkClosed(); err != nil {
|
|
return &cursor{bucket: b}
|
|
}
|
|
|
|
// Create the cursor and setup a runtime finalizer to ensure the
|
|
// iterators are released when the cursor is garbage collected.
|
|
c := newCursor(b, b.id[:], ctFull)
|
|
runtime.SetFinalizer(c, cursorFinalizer)
|
|
return c
|
|
}
|
|
|
|
// ForEach invokes the passed function with every key/value pair in the bucket.
|
|
// This does not include nested buckets or the key/value pairs within those
|
|
// nested buckets.
|
|
//
|
|
// WARNING: It is not safe to mutate data while iterating with this method.
|
|
// Doing so may cause the underlying cursor to be invalidated and return
|
|
// unexpected keys and/or values.
|
|
//
|
|
// Returns the following errors as required by the interface contract:
|
|
// - ErrTxClosed if the transaction has already been closed
|
|
//
|
|
// NOTE: The values returned by this function are only valid during a
|
|
// transaction. Attempting to access them after a transaction has ended will
|
|
// likely result in an access violation.
|
|
//
|
|
// This function is part of the database.Bucket interface implementation.
|
|
func (b *bucket) ForEach(fn func(k, v []byte) error) error {
|
|
// Ensure transaction state is valid.
|
|
if err := b.tx.checkClosed(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Invoke the callback for each cursor item. Return the error returned
|
|
// from the callback when it is non-nil.
|
|
c := newCursor(b, b.id[:], ctKeys)
|
|
defer cursorFinalizer(c)
|
|
for ok := c.First(); ok; ok = c.Next() {
|
|
err := fn(c.Key(), c.Value())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ForEachBucket invokes the passed function with the key of every nested bucket
|
|
// in the current bucket. This does not include any nested buckets within those
|
|
// nested buckets.
|
|
//
|
|
// WARNING: It is not safe to mutate data while iterating with this method.
|
|
// Doing so may cause the underlying cursor to be invalidated and return
|
|
// unexpected keys.
|
|
//
|
|
// Returns the following errors as required by the interface contract:
|
|
// - ErrTxClosed if the transaction has already been closed
|
|
//
|
|
// NOTE: The values returned by this function are only valid during a
|
|
// transaction. Attempting to access them after a transaction has ended will
|
|
// likely result in an access violation.
|
|
//
|
|
// This function is part of the database.Bucket interface implementation.
|
|
func (b *bucket) ForEachBucket(fn func(k []byte) error) error {
|
|
// Ensure transaction state is valid.
|
|
if err := b.tx.checkClosed(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Invoke the callback for each cursor item. Return the error returned
|
|
// from the callback when it is non-nil.
|
|
c := newCursor(b, b.id[:], ctBuckets)
|
|
defer cursorFinalizer(c)
|
|
for ok := c.First(); ok; ok = c.Next() {
|
|
err := fn(c.Key())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Writable returns whether or not the bucket is writable.
|
|
//
|
|
// This function is part of the database.Bucket interface implementation.
|
|
func (b *bucket) Writable() bool {
|
|
return b.tx.writable
|
|
}
|
|
|
|
// Put saves the specified key/value pair to the bucket. Keys that do not
|
|
// already exist are added and keys that already exist are overwritten.
|
|
//
|
|
// Returns the following errors as required by the interface contract:
|
|
// - ErrKeyRequired if the key is empty
|
|
// - ErrIncompatibleValue if the key is the same as an existing bucket
|
|
// - ErrTxNotWritable if attempted against a read-only transaction
|
|
// - ErrTxClosed if the transaction has already been closed
|
|
//
|
|
// This function is part of the database.Bucket interface implementation.
|
|
func (b *bucket) Put(key, value []byte) error {
|
|
// Ensure transaction state is valid.
|
|
if err := b.tx.checkClosed(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Ensure the transaction is writable.
|
|
if !b.tx.writable {
|
|
str := "setting a key requires a writable database transaction"
|
|
return makeDbErr(database.ErrTxNotWritable, str, nil)
|
|
}
|
|
|
|
// Ensure a key was provided.
|
|
if len(key) == 0 {
|
|
str := "put requires a key"
|
|
return makeDbErr(database.ErrKeyRequired, str, nil)
|
|
}
|
|
|
|
return b.tx.putKey(bucketizedKey(b.id, key), value)
|
|
}
|
|
|
|
// Get returns the value for the given key. Returns nil if the key does not
|
|
// exist in this bucket. An empty slice is returned for keys that exist but
|
|
// have no value assigned.
|
|
//
|
|
// NOTE: The value returned by this function is only valid during a transaction.
|
|
// Attempting to access it after a transaction has ended results in undefined
|
|
// behavior. Additionally, the value must NOT be modified by the caller.
|
|
//
|
|
// This function is part of the database.Bucket interface implementation.
|
|
func (b *bucket) Get(key []byte) []byte {
|
|
// Ensure transaction state is valid.
|
|
if err := b.tx.checkClosed(); err != nil {
|
|
return nil
|
|
}
|
|
|
|
// Nothing to return if there is no key.
|
|
if len(key) == 0 {
|
|
return nil
|
|
}
|
|
|
|
return b.tx.fetchKey(bucketizedKey(b.id, key))
|
|
}
|
|
|
|
// Delete removes the specified key from the bucket. Deleting a key that does
|
|
// not exist does not return an error.
|
|
//
|
|
// Returns the following errors as required by the interface contract:
|
|
// - ErrKeyRequired if the key is empty
|
|
// - ErrIncompatibleValue if the key is the same as an existing bucket
|
|
// - ErrTxNotWritable if attempted against a read-only transaction
|
|
// - ErrTxClosed if the transaction has already been closed
|
|
//
|
|
// This function is part of the database.Bucket interface implementation.
|
|
func (b *bucket) Delete(key []byte) error {
|
|
// Ensure transaction state is valid.
|
|
if err := b.tx.checkClosed(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Ensure the transaction is writable.
|
|
if !b.tx.writable {
|
|
str := "deleting a value requires a writable database transaction"
|
|
return makeDbErr(database.ErrTxNotWritable, str, nil)
|
|
}
|
|
|
|
// Nothing to do if there is no key.
|
|
if len(key) == 0 {
|
|
return nil
|
|
}
|
|
|
|
b.tx.deleteKey(bucketizedKey(b.id, key), true)
|
|
return nil
|
|
}
|
|
|
|
// pendingBlock houses a block that will be written to disk when the database
|
|
// transaction is committed.
|
|
type pendingBlock struct {
|
|
hash *chainhash.Hash
|
|
bytes []byte
|
|
}
|
|
|
|
// transaction represents a database transaction. It can either be read-only or
|
|
// read-write and implements the database.Tx interface. The transaction
|
|
// provides a root bucket against which all read and writes occur.
|
|
type transaction struct {
|
|
managed bool // Is the transaction managed?
|
|
closed bool // Is the transaction closed?
|
|
writable bool // Is the transaction writable?
|
|
db *db // DB instance the tx was created from.
|
|
snapshot *dbCacheSnapshot // Underlying snapshot for txns.
|
|
metaBucket *bucket // The root metadata bucket.
|
|
blockIdxBucket *bucket // The block index bucket.
|
|
|
|
// Blocks that need to be stored on commit. The pendingBlocks map is
|
|
// kept to allow quick lookups of pending data by block hash.
|
|
pendingBlocks map[chainhash.Hash]int
|
|
pendingBlockData []pendingBlock
|
|
|
|
// Keys that need to be stored or deleted on commit.
|
|
pendingKeys *treap.Mutable
|
|
pendingRemove *treap.Mutable
|
|
|
|
// Active iterators that need to be notified when the pending keys have
|
|
// been updated so the cursors can properly handle updates to the
|
|
// transaction state.
|
|
activeIterLock sync.RWMutex
|
|
activeIters []*treap.Iterator
|
|
}
|
|
|
|
// Enforce transaction implements the database.Tx interface.
|
|
var _ database.Tx = (*transaction)(nil)
|
|
|
|
// removeActiveIter removes the passed iterator from the list of active
|
|
// iterators against the pending keys treap.
|
|
func (tx *transaction) removeActiveIter(iter *treap.Iterator) {
|
|
// An indexing for loop is intentionally used over a range here as range
|
|
// does not reevaluate the slice on each iteration nor does it adjust
|
|
// the index for the modified slice.
|
|
tx.activeIterLock.Lock()
|
|
for i := 0; i < len(tx.activeIters); i++ {
|
|
if tx.activeIters[i] == iter {
|
|
copy(tx.activeIters[i:], tx.activeIters[i+1:])
|
|
tx.activeIters[len(tx.activeIters)-1] = nil
|
|
tx.activeIters = tx.activeIters[:len(tx.activeIters)-1]
|
|
}
|
|
}
|
|
tx.activeIterLock.Unlock()
|
|
}
|
|
|
|
// addActiveIter adds the passed iterator to the list of active iterators for
|
|
// the pending keys treap.
|
|
func (tx *transaction) addActiveIter(iter *treap.Iterator) {
|
|
tx.activeIterLock.Lock()
|
|
tx.activeIters = append(tx.activeIters, iter)
|
|
tx.activeIterLock.Unlock()
|
|
}
|
|
|
|
// notifyActiveIters notifies all of the active iterators for the pending keys
|
|
// treap that it has been updated.
|
|
func (tx *transaction) notifyActiveIters() {
|
|
tx.activeIterLock.RLock()
|
|
for _, iter := range tx.activeIters {
|
|
iter.ForceReseek()
|
|
}
|
|
tx.activeIterLock.RUnlock()
|
|
}
|
|
|
|
// checkClosed returns an error if the the database or transaction is closed.
|
|
func (tx *transaction) checkClosed() error {
|
|
// The transaction is no longer valid if it has been closed.
|
|
if tx.closed {
|
|
return makeDbErr(database.ErrTxClosed, errTxClosedStr, nil)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// hasKey returns whether or not the provided key exists in the database while
|
|
// taking into account the current transaction state.
|
|
func (tx *transaction) hasKey(key []byte) bool {
|
|
// When the transaction is writable, check the pending transaction
|
|
// state first.
|
|
if tx.writable {
|
|
if tx.pendingRemove.Has(key) {
|
|
return false
|
|
}
|
|
if tx.pendingKeys.Has(key) {
|
|
return true
|
|
}
|
|
}
|
|
|
|
// Consult the database cache and underlying database.
|
|
return tx.snapshot.Has(key)
|
|
}
|
|
|
|
// putKey adds the provided key to the list of keys to be updated in the
|
|
// database when the transaction is committed.
|
|
//
|
|
// NOTE: This function must only be called on a writable transaction. Since it
|
|
// is an internal helper function, it does not check.
|
|
func (tx *transaction) putKey(key, value []byte) error {
|
|
// Prevent the key from being deleted if it was previously scheduled
|
|
// to be deleted on transaction commit.
|
|
tx.pendingRemove.Delete(key)
|
|
|
|
// Add the key/value pair to the list to be written on transaction
|
|
// commit.
|
|
tx.pendingKeys.Put(key, value)
|
|
tx.notifyActiveIters()
|
|
return nil
|
|
}
|
|
|
|
// fetchKey attempts to fetch the provided key from the database cache (and
|
|
// hence underlying database) while taking into account the current transaction
|
|
// state. Returns nil if the key does not exist.
|
|
func (tx *transaction) fetchKey(key []byte) []byte {
|
|
// When the transaction is writable, check the pending transaction
|
|
// state first.
|
|
if tx.writable {
|
|
if tx.pendingRemove.Has(key) {
|
|
return nil
|
|
}
|
|
if value := tx.pendingKeys.Get(key); value != nil {
|
|
return value
|
|
}
|
|
}
|
|
|
|
// Consult the database cache and underlying database.
|
|
return tx.snapshot.Get(key)
|
|
}
|
|
|
|
// deleteKey adds the provided key to the list of keys to be deleted from the
|
|
// database when the transaction is committed. The notify iterators flag is
|
|
// useful to delay notifying iterators about the changes during bulk deletes.
|
|
//
|
|
// NOTE: This function must only be called on a writable transaction. Since it
|
|
// is an internal helper function, it does not check.
|
|
func (tx *transaction) deleteKey(key []byte, notifyIterators bool) {
|
|
// Remove the key from the list of pendings keys to be written on
|
|
// transaction commit if needed.
|
|
tx.pendingKeys.Delete(key)
|
|
|
|
// Add the key to the list to be deleted on transaction commit.
|
|
tx.pendingRemove.Put(key, nil)
|
|
|
|
// Notify the active iterators about the change if the flag is set.
|
|
if notifyIterators {
|
|
tx.notifyActiveIters()
|
|
}
|
|
}
|
|
|
|
// nextBucketID returns the next bucket ID to use for creating a new bucket.
|
|
//
|
|
// NOTE: This function must only be called on a writable transaction. Since it
|
|
// is an internal helper function, it does not check.
|
|
func (tx *transaction) nextBucketID() ([4]byte, error) {
|
|
// Load the currently highest used bucket ID.
|
|
curIDBytes := tx.fetchKey(curBucketIDKeyName)
|
|
curBucketNum := binary.BigEndian.Uint32(curIDBytes)
|
|
|
|
// Increment and update the current bucket ID and return it.
|
|
var nextBucketID [4]byte
|
|
binary.BigEndian.PutUint32(nextBucketID[:], curBucketNum+1)
|
|
if err := tx.putKey(curBucketIDKeyName, nextBucketID[:]); err != nil {
|
|
return [4]byte{}, err
|
|
}
|
|
return nextBucketID, nil
|
|
}
|
|
|
|
// Metadata returns the top-most bucket for all metadata storage.
|
|
//
|
|
// This function is part of the database.Tx interface implementation.
|
|
func (tx *transaction) Metadata() database.Bucket {
|
|
return tx.metaBucket
|
|
}
|
|
|
|
// hasBlock returns whether or not a block with the given hash exists.
|
|
func (tx *transaction) hasBlock(hash *chainhash.Hash) bool {
|
|
// Return true if the block is pending to be written on commit since
|
|
// it exists from the viewpoint of this transaction.
|
|
if _, exists := tx.pendingBlocks[*hash]; exists {
|
|
return true
|
|
}
|
|
|
|
return tx.hasKey(bucketizedKey(blockIdxBucketID, hash[:]))
|
|
}
|
|
|
|
// StoreBlock stores the provided block into the database. There are no checks
|
|
// to ensure the block connects to a previous block, contains double spends, or
|
|
// any additional functionality such as transaction indexing. It simply stores
|
|
// the block in the database.
|
|
//
|
|
// Returns the following errors as required by the interface contract:
|
|
// - ErrBlockExists when the block hash already exists
|
|
// - ErrTxNotWritable if attempted against a read-only transaction
|
|
// - ErrTxClosed if the transaction has already been closed
|
|
//
|
|
// This function is part of the database.Tx interface implementation.
|
|
func (tx *transaction) StoreBlock(block *btcutil.Block) error {
|
|
// Ensure transaction state is valid.
|
|
if err := tx.checkClosed(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Ensure the transaction is writable.
|
|
if !tx.writable {
|
|
str := "store block requires a writable database transaction"
|
|
return makeDbErr(database.ErrTxNotWritable, str, nil)
|
|
}
|
|
|
|
// Reject the block if it already exists.
|
|
blockHash := block.Hash()
|
|
if tx.hasBlock(blockHash) {
|
|
str := fmt.Sprintf("block %s already exists", blockHash)
|
|
return makeDbErr(database.ErrBlockExists, str, nil)
|
|
}
|
|
|
|
blockBytes, err := block.Bytes()
|
|
if err != nil {
|
|
str := fmt.Sprintf("failed to get serialized bytes for block %s",
|
|
blockHash)
|
|
return makeDbErr(database.ErrDriverSpecific, str, err)
|
|
}
|
|
|
|
// Add the block to be stored to the list of pending blocks to store
|
|
// when the transaction is committed. Also, add it to pending blocks
|
|
// map so it is easy to determine the block is pending based on the
|
|
// block hash.
|
|
if tx.pendingBlocks == nil {
|
|
tx.pendingBlocks = make(map[chainhash.Hash]int)
|
|
}
|
|
tx.pendingBlocks[*blockHash] = len(tx.pendingBlockData)
|
|
tx.pendingBlockData = append(tx.pendingBlockData, pendingBlock{
|
|
hash: blockHash,
|
|
bytes: blockBytes,
|
|
})
|
|
log.Tracef("Added block %s to pending blocks", blockHash)
|
|
|
|
return nil
|
|
}
|
|
|
|
// HasBlock returns whether or not a block with the given hash exists in the
|
|
// database.
|
|
//
|
|
// Returns the following errors as required by the interface contract:
|
|
// - ErrTxClosed if the transaction has already been closed
|
|
//
|
|
// This function is part of the database.Tx interface implementation.
|
|
func (tx *transaction) HasBlock(hash *chainhash.Hash) (bool, error) {
|
|
// Ensure transaction state is valid.
|
|
if err := tx.checkClosed(); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return tx.hasBlock(hash), nil
|
|
}
|
|
|
|
// HasBlocks returns whether or not the blocks with the provided hashes
|
|
// exist in the database.
|
|
//
|
|
// Returns the following errors as required by the interface contract:
|
|
// - ErrTxClosed if the transaction has already been closed
|
|
//
|
|
// This function is part of the database.Tx interface implementation.
|
|
func (tx *transaction) HasBlocks(hashes []chainhash.Hash) ([]bool, error) {
|
|
// Ensure transaction state is valid.
|
|
if err := tx.checkClosed(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
results := make([]bool, len(hashes))
|
|
for i := range hashes {
|
|
results[i] = tx.hasBlock(&hashes[i])
|
|
}
|
|
|
|
return results, nil
|
|
}
|
|
|
|
// fetchBlockRow fetches the metadata stored in the block index for the provided
|
|
// hash. It will return ErrBlockNotFound if there is no entry.
|
|
func (tx *transaction) fetchBlockRow(hash *chainhash.Hash) ([]byte, error) {
|
|
blockRow := tx.blockIdxBucket.Get(hash[:])
|
|
if blockRow == nil {
|
|
str := fmt.Sprintf("block %s does not exist", hash)
|
|
return nil, makeDbErr(database.ErrBlockNotFound, str, nil)
|
|
}
|
|
|
|
return blockRow, nil
|
|
}
|
|
|
|
// FetchBlockHeader returns the raw serialized bytes for the block header
|
|
// identified by the given hash. The raw bytes are in the format returned by
|
|
// Serialize on a wire.BlockHeader.
|
|
//
|
|
// Returns the following errors as required by the interface contract:
|
|
// - ErrBlockNotFound if the requested block hash does not exist
|
|
// - ErrTxClosed if the transaction has already been closed
|
|
// - ErrCorruption if the database has somehow become corrupted
|
|
//
|
|
// NOTE: The data returned by this function is only valid during a
|
|
// database transaction. Attempting to access it after a transaction
|
|
// has ended results in undefined behavior. This constraint prevents
|
|
// additional data copies and allows support for memory-mapped database
|
|
// implementations.
|
|
//
|
|
// This function is part of the database.Tx interface implementation.
|
|
func (tx *transaction) FetchBlockHeader(hash *chainhash.Hash) ([]byte, error) {
|
|
return tx.FetchBlockRegion(&database.BlockRegion{
|
|
Hash: hash,
|
|
Offset: 0,
|
|
Len: blockHdrSize,
|
|
})
|
|
}
|
|
|
|
// FetchBlockHeaders returns the raw serialized bytes for the block headers
|
|
// identified by the given hashes. The raw bytes are in the format returned by
|
|
// Serialize on a wire.BlockHeader.
|
|
//
|
|
// Returns the following errors as required by the interface contract:
|
|
// - ErrBlockNotFound if the any of the requested block hashes do not exist
|
|
// - ErrTxClosed if the transaction has already been closed
|
|
// - ErrCorruption if the database has somehow become corrupted
|
|
//
|
|
// NOTE: The data returned by this function is only valid during a database
|
|
// transaction. Attempting to access it after a transaction has ended results
|
|
// in undefined behavior. This constraint prevents additional data copies and
|
|
// allows support for memory-mapped database implementations.
|
|
//
|
|
// This function is part of the database.Tx interface implementation.
|
|
func (tx *transaction) FetchBlockHeaders(hashes []chainhash.Hash) ([][]byte, error) {
|
|
regions := make([]database.BlockRegion, len(hashes))
|
|
for i := range hashes {
|
|
regions[i].Hash = &hashes[i]
|
|
regions[i].Offset = 0
|
|
regions[i].Len = blockHdrSize
|
|
}
|
|
return tx.FetchBlockRegions(regions)
|
|
}
|
|
|
|
// FetchBlock returns the raw serialized bytes for the block identified by the
|
|
// given hash. The raw bytes are in the format returned by Serialize on a
|
|
// wire.MsgBlock.
|
|
//
|
|
// Returns the following errors as required by the interface contract:
|
|
// - ErrBlockNotFound if the requested block hash does not exist
|
|
// - ErrTxClosed if the transaction has already been closed
|
|
// - ErrCorruption if the database has somehow become corrupted
|
|
//
|
|
// In addition, returns ErrDriverSpecific if any failures occur when reading the
|
|
// block files.
|
|
//
|
|
// NOTE: The data returned by this function is only valid during a database
|
|
// transaction. Attempting to access it after a transaction has ended results
|
|
// in undefined behavior. This constraint prevents additional data copies and
|
|
// allows support for memory-mapped database implementations.
|
|
//
|
|
// This function is part of the database.Tx interface implementation.
|
|
func (tx *transaction) FetchBlock(hash *chainhash.Hash) ([]byte, error) {
|
|
// Ensure transaction state is valid.
|
|
if err := tx.checkClosed(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// When the block is pending to be written on commit return the bytes
|
|
// from there.
|
|
if idx, exists := tx.pendingBlocks[*hash]; exists {
|
|
return tx.pendingBlockData[idx].bytes, nil
|
|
}
|
|
|
|
// Lookup the location of the block in the files from the block index.
|
|
blockRow, err := tx.fetchBlockRow(hash)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
location := deserializeBlockLoc(blockRow)
|
|
|
|
// Read the block from the appropriate location. The function also
|
|
// performs a checksum over the data to detect data corruption.
|
|
blockBytes, err := tx.db.store.readBlock(hash, location)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return blockBytes, nil
|
|
}
|
|
|
|
// FetchBlocks returns the raw serialized bytes for the blocks identified by the
|
|
// given hashes. The raw bytes are in the format returned by Serialize on a
|
|
// wire.MsgBlock.
|
|
//
|
|
// Returns the following errors as required by the interface contract:
|
|
// - ErrBlockNotFound if any of the requested block hashed do not exist
|
|
// - ErrTxClosed if the transaction has already been closed
|
|
// - ErrCorruption if the database has somehow become corrupted
|
|
//
|
|
// In addition, returns ErrDriverSpecific if any failures occur when reading the
|
|
// block files.
|
|
//
|
|
// NOTE: The data returned by this function is only valid during a database
|
|
// transaction. Attempting to access it after a transaction has ended results
|
|
// in undefined behavior. This constraint prevents additional data copies and
|
|
// allows support for memory-mapped database implementations.
|
|
//
|
|
// This function is part of the database.Tx interface implementation.
|
|
func (tx *transaction) FetchBlocks(hashes []chainhash.Hash) ([][]byte, error) {
|
|
// Ensure transaction state is valid.
|
|
if err := tx.checkClosed(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// NOTE: This could check for the existence of all blocks before loading
|
|
// any of them which would be faster in the failure case, however
|
|
// callers will not typically be calling this function with invalid
|
|
// values, so optimize for the common case.
|
|
|
|
// Load the blocks.
|
|
blocks := make([][]byte, len(hashes))
|
|
for i := range hashes {
|
|
var err error
|
|
blocks[i], err = tx.FetchBlock(&hashes[i])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return blocks, nil
|
|
}
|
|
|
|
// fetchPendingRegion attempts to fetch the provided region from any block which
|
|
// are pending to be written on commit. It will return nil for the byte slice
|
|
// when the region references a block which is not pending. When the region
|
|
// does reference a pending block, it is bounds checked and returns
|
|
// ErrBlockRegionInvalid if invalid.
|
|
func (tx *transaction) fetchPendingRegion(region *database.BlockRegion) ([]byte, error) {
|
|
// Nothing to do if the block is not pending to be written on commit.
|
|
idx, exists := tx.pendingBlocks[*region.Hash]
|
|
if !exists {
|
|
return nil, nil
|
|
}
|
|
|
|
// Ensure the region is within the bounds of the block.
|
|
blockBytes := tx.pendingBlockData[idx].bytes
|
|
blockLen := uint32(len(blockBytes))
|
|
endOffset := region.Offset + region.Len
|
|
if endOffset < region.Offset || endOffset > blockLen {
|
|
str := fmt.Sprintf("block %s region offset %d, length %d "+
|
|
"exceeds block length of %d", region.Hash,
|
|
region.Offset, region.Len, blockLen)
|
|
return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil)
|
|
}
|
|
|
|
// Return the bytes from the pending block.
|
|
return blockBytes[region.Offset:endOffset:endOffset], nil
|
|
}
|
|
|
|
// FetchBlockRegion returns the raw serialized bytes for the given block region.
|
|
//
|
|
// For example, it is possible to directly extract Bitcoin transactions and/or
|
|
// scripts from a block with this function. Depending on the backend
|
|
// implementation, this can provide significant savings by avoiding the need to
|
|
// load entire blocks.
|
|
//
|
|
// The raw bytes are in the format returned by Serialize on a wire.MsgBlock and
|
|
// the Offset field in the provided BlockRegion is zero-based and relative to
|
|
// the start of the block (byte 0).
|
|
//
|
|
// Returns the following errors as required by the interface contract:
|
|
// - ErrBlockNotFound if the requested block hash does not exist
|
|
// - ErrBlockRegionInvalid if the region exceeds the bounds of the associated
|
|
// block
|
|
// - ErrTxClosed if the transaction has already been closed
|
|
// - ErrCorruption if the database has somehow become corrupted
|
|
//
|
|
// In addition, returns ErrDriverSpecific if any failures occur when reading the
|
|
// block files.
|
|
//
|
|
// NOTE: The data returned by this function is only valid during a database
|
|
// transaction. Attempting to access it after a transaction has ended results
|
|
// in undefined behavior. This constraint prevents additional data copies and
|
|
// allows support for memory-mapped database implementations.
|
|
//
|
|
// This function is part of the database.Tx interface implementation.
|
|
func (tx *transaction) FetchBlockRegion(region *database.BlockRegion) ([]byte, error) {
|
|
// Ensure transaction state is valid.
|
|
if err := tx.checkClosed(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// When the block is pending to be written on commit return the bytes
|
|
// from there.
|
|
if tx.pendingBlocks != nil {
|
|
regionBytes, err := tx.fetchPendingRegion(region)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if regionBytes != nil {
|
|
return regionBytes, nil
|
|
}
|
|
}
|
|
|
|
// Lookup the location of the block in the files from the block index.
|
|
blockRow, err := tx.fetchBlockRow(region.Hash)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
location := deserializeBlockLoc(blockRow)
|
|
|
|
// Ensure the region is within the bounds of the block.
|
|
endOffset := region.Offset + region.Len
|
|
if endOffset < region.Offset || endOffset > location.blockLen {
|
|
str := fmt.Sprintf("block %s region offset %d, length %d "+
|
|
"exceeds block length of %d", region.Hash,
|
|
region.Offset, region.Len, location.blockLen)
|
|
return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil)
|
|
|
|
}
|
|
|
|
// Read the region from the appropriate disk block file.
|
|
regionBytes, err := tx.db.store.readBlockRegion(location, region.Offset,
|
|
region.Len)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return regionBytes, nil
|
|
}
|
|
|
|
// FetchBlockRegions returns the raw serialized bytes for the given block
|
|
// regions.
|
|
//
|
|
// For example, it is possible to directly extract Bitcoin transactions and/or
|
|
// scripts from various blocks with this function. Depending on the backend
|
|
// implementation, this can provide significant savings by avoiding the need to
|
|
// load entire blocks.
|
|
//
|
|
// The raw bytes are in the format returned by Serialize on a wire.MsgBlock and
|
|
// the Offset fields in the provided BlockRegions are zero-based and relative to
|
|
// the start of the block (byte 0).
|
|
//
|
|
// Returns the following errors as required by the interface contract:
|
|
// - ErrBlockNotFound if any of the request block hashes do not exist
|
|
// - ErrBlockRegionInvalid if one or more region exceed the bounds of the
|
|
// associated block
|
|
// - ErrTxClosed if the transaction has already been closed
|
|
// - ErrCorruption if the database has somehow become corrupted
|
|
//
|
|
// In addition, returns ErrDriverSpecific if any failures occur when reading the
|
|
// block files.
|
|
//
|
|
// NOTE: The data returned by this function is only valid during a database
|
|
// transaction. Attempting to access it after a transaction has ended results
|
|
// in undefined behavior. This constraint prevents additional data copies and
|
|
// allows support for memory-mapped database implementations.
|
|
//
|
|
// This function is part of the database.Tx interface implementation.
|
|
func (tx *transaction) FetchBlockRegions(regions []database.BlockRegion) ([][]byte, error) {
|
|
// Ensure transaction state is valid.
|
|
if err := tx.checkClosed(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// NOTE: This could check for the existence of all blocks before
|
|
// deserializing the locations and building up the fetch list which
|
|
// would be faster in the failure case, however callers will not
|
|
// typically be calling this function with invalid values, so optimize
|
|
// for the common case.
|
|
|
|
// NOTE: A potential optimization here would be to combine adjacent
|
|
// regions to reduce the number of reads.
|
|
|
|
// In order to improve efficiency of loading the bulk data, first grab
|
|
// the block location for all of the requested block hashes and sort
|
|
// the reads by filenum:offset so that all reads are grouped by file
|
|
// and linear within each file. This can result in quite a significant
|
|
// performance increase depending on how spread out the requested hashes
|
|
// are by reducing the number of file open/closes and random accesses
|
|
// needed. The fetchList is intentionally allocated with a cap because
|
|
// some of the regions might be fetched from the pending blocks and
|
|
// hence there is no need to fetch those from disk.
|
|
blockRegions := make([][]byte, len(regions))
|
|
fetchList := make([]bulkFetchData, 0, len(regions))
|
|
for i := range regions {
|
|
region := ®ions[i]
|
|
|
|
// When the block is pending to be written on commit grab the
|
|
// bytes from there.
|
|
if tx.pendingBlocks != nil {
|
|
regionBytes, err := tx.fetchPendingRegion(region)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if regionBytes != nil {
|
|
blockRegions[i] = regionBytes
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Lookup the location of the block in the files from the block
|
|
// index.
|
|
blockRow, err := tx.fetchBlockRow(region.Hash)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
location := deserializeBlockLoc(blockRow)
|
|
|
|
// Ensure the region is within the bounds of the block.
|
|
endOffset := region.Offset + region.Len
|
|
if endOffset < region.Offset || endOffset > location.blockLen {
|
|
str := fmt.Sprintf("block %s region offset %d, length "+
|
|
"%d exceeds block length of %d", region.Hash,
|
|
region.Offset, region.Len, location.blockLen)
|
|
return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil)
|
|
}
|
|
|
|
fetchList = append(fetchList, bulkFetchData{&location, i})
|
|
}
|
|
sort.Sort(bulkFetchDataSorter(fetchList))
|
|
|
|
// Read all of the regions in the fetch list and set the results.
|
|
for i := range fetchList {
|
|
fetchData := &fetchList[i]
|
|
ri := fetchData.replyIndex
|
|
region := ®ions[ri]
|
|
location := fetchData.blockLocation
|
|
regionBytes, err := tx.db.store.readBlockRegion(*location,
|
|
region.Offset, region.Len)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
blockRegions[ri] = regionBytes
|
|
}
|
|
|
|
return blockRegions, nil
|
|
}
|
|
|
|
// close marks the transaction closed then releases any pending data, the
|
|
// underlying snapshot, the transaction read lock, and the write lock when the
|
|
// transaction is writable.
|
|
func (tx *transaction) close() {
|
|
tx.closed = true
|
|
|
|
// Clear pending blocks that would have been written on commit.
|
|
tx.pendingBlocks = nil
|
|
tx.pendingBlockData = nil
|
|
|
|
// Clear pending keys that would have been written or deleted on commit.
|
|
tx.pendingKeys = nil
|
|
tx.pendingRemove = nil
|
|
|
|
// Release the snapshot.
|
|
if tx.snapshot != nil {
|
|
tx.snapshot.Release()
|
|
tx.snapshot = nil
|
|
}
|
|
|
|
tx.db.closeLock.RUnlock()
|
|
|
|
// Release the writer lock for writable transactions to unblock any
|
|
// other write transaction which are possibly waiting.
|
|
if tx.writable {
|
|
tx.db.writeLock.Unlock()
|
|
}
|
|
}
|
|
|
|
// writePendingAndCommit writes pending block data to the flat block files,
|
|
// updates the metadata with their locations as well as the new current write
|
|
// location, and commits the metadata to the memory database cache. It also
|
|
// properly handles rollback in the case of failures.
|
|
//
|
|
// This function MUST only be called when there is pending data to be written.
|
|
func (tx *transaction) writePendingAndCommit() error {
|
|
// Save the current block store write position for potential rollback.
|
|
// These variables are only updated here in this function and there can
|
|
// only be one write transaction active at a time, so it's safe to store
|
|
// them for potential rollback.
|
|
wc := tx.db.store.writeCursor
|
|
wc.RLock()
|
|
oldBlkFileNum := wc.curFileNum
|
|
oldBlkOffset := wc.curOffset
|
|
wc.RUnlock()
|
|
|
|
// rollback is a closure that is used to rollback all writes to the
|
|
// block files.
|
|
rollback := func() {
|
|
// Rollback any modifications made to the block files if needed.
|
|
tx.db.store.handleRollback(oldBlkFileNum, oldBlkOffset)
|
|
}
|
|
|
|
// Loop through all of the pending blocks to store and write them.
|
|
for _, blockData := range tx.pendingBlockData {
|
|
log.Tracef("Storing block %s", blockData.hash)
|
|
location, err := tx.db.store.writeBlock(blockData.bytes)
|
|
if err != nil {
|
|
rollback()
|
|
return err
|
|
}
|
|
|
|
// Add a record in the block index for the block. The record
|
|
// includes the location information needed to locate the block
|
|
// on the filesystem as well as the block header since they are
|
|
// so commonly needed.
|
|
blockRow := serializeBlockLoc(location)
|
|
err = tx.blockIdxBucket.Put(blockData.hash[:], blockRow)
|
|
if err != nil {
|
|
rollback()
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Update the metadata for the current write file and offset.
|
|
writeRow := serializeWriteRow(wc.curFileNum, wc.curOffset)
|
|
if err := tx.metaBucket.Put(writeLocKeyName, writeRow); err != nil {
|
|
rollback()
|
|
return convertErr("failed to store write cursor", err)
|
|
}
|
|
|
|
// Atomically update the database cache. The cache automatically
|
|
// handles flushing to the underlying persistent storage database.
|
|
return tx.db.cache.commitTx(tx)
|
|
}
|
|
|
|
// Commit commits all changes that have been made to the root metadata bucket
|
|
// and all of its sub-buckets to the database cache which is periodically synced
|
|
// to persistent storage. In addition, it commits all new blocks directly to
|
|
// persistent storage bypassing the db cache. Blocks can be rather large, so
|
|
// this help increase the amount of cache available for the metadata updates and
|
|
// is safe since blocks are immutable.
|
|
//
|
|
// This function is part of the database.Tx interface implementation.
|
|
func (tx *transaction) Commit() error {
|
|
// Prevent commits on managed transactions.
|
|
if tx.managed {
|
|
tx.close()
|
|
panic("managed transaction commit not allowed")
|
|
}
|
|
|
|
// Ensure transaction state is valid.
|
|
if err := tx.checkClosed(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Regardless of whether the commit succeeds, the transaction is closed
|
|
// on return.
|
|
defer tx.close()
|
|
|
|
// Ensure the transaction is writable.
|
|
if !tx.writable {
|
|
str := "Commit requires a writable database transaction"
|
|
return makeDbErr(database.ErrTxNotWritable, str, nil)
|
|
}
|
|
|
|
// Write pending data. The function will rollback if any errors occur.
|
|
return tx.writePendingAndCommit()
|
|
}
|
|
|
|
// Rollback undoes all changes that have been made to the root bucket and all of
|
|
// its sub-buckets.
|
|
//
|
|
// This function is part of the database.Tx interface implementation.
|
|
func (tx *transaction) Rollback() error {
|
|
// Prevent rollbacks on managed transactions.
|
|
if tx.managed {
|
|
tx.close()
|
|
panic("managed transaction rollback not allowed")
|
|
}
|
|
|
|
// Ensure transaction state is valid.
|
|
if err := tx.checkClosed(); err != nil {
|
|
return err
|
|
}
|
|
|
|
tx.close()
|
|
return nil
|
|
}
|
|
|
|
// db represents a collection of namespaces which are persisted and implements
|
|
// the database.DB interface. All database access is performed through
|
|
// transactions which are obtained through the specific Namespace.
|
|
type db struct {
|
|
writeLock sync.Mutex // Limit to one write transaction at a time.
|
|
closeLock sync.RWMutex // Make database close block while txns active.
|
|
closed bool // Is the database closed?
|
|
store *blockStore // Handles read/writing blocks to flat files.
|
|
cache *dbCache // Cache layer which wraps underlying leveldb DB.
|
|
}
|
|
|
|
// Enforce db implements the database.DB interface.
|
|
var _ database.DB = (*db)(nil)
|
|
|
|
// Type returns the database driver type the current database instance was
|
|
// created with.
|
|
//
|
|
// This function is part of the database.DB interface implementation.
|
|
func (db *db) Type() string {
|
|
return dbType
|
|
}
|
|
|
|
// begin is the implementation function for the Begin database method. See its
|
|
// documentation for more details.
|
|
//
|
|
// This function is only separate because it returns the internal transaction
|
|
// which is used by the managed transaction code while the database method
|
|
// returns the interface.
|
|
func (db *db) begin(writable bool) (*transaction, error) {
|
|
// Whenever a new writable transaction is started, grab the write lock
|
|
// to ensure only a single write transaction can be active at the same
|
|
// time. This lock will not be released until the transaction is
|
|
// closed (via Rollback or Commit).
|
|
if writable {
|
|
db.writeLock.Lock()
|
|
}
|
|
|
|
// Whenever a new transaction is started, grab a read lock against the
|
|
// database to ensure Close will wait for the transaction to finish.
|
|
// This lock will not be released until the transaction is closed (via
|
|
// Rollback or Commit).
|
|
db.closeLock.RLock()
|
|
if db.closed {
|
|
db.closeLock.RUnlock()
|
|
if writable {
|
|
db.writeLock.Unlock()
|
|
}
|
|
return nil, makeDbErr(database.ErrDbNotOpen, errDbNotOpenStr,
|
|
nil)
|
|
}
|
|
|
|
// Grab a snapshot of the database cache (which in turn also handles the
|
|
// underlying database).
|
|
snapshot, err := db.cache.Snapshot()
|
|
if err != nil {
|
|
db.closeLock.RUnlock()
|
|
if writable {
|
|
db.writeLock.Unlock()
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
|
|
// The metadata and block index buckets are internal-only buckets, so
|
|
// they have defined IDs.
|
|
tx := &transaction{
|
|
writable: writable,
|
|
db: db,
|
|
snapshot: snapshot,
|
|
pendingKeys: treap.NewMutable(),
|
|
pendingRemove: treap.NewMutable(),
|
|
}
|
|
tx.metaBucket = &bucket{tx: tx, id: metadataBucketID}
|
|
tx.blockIdxBucket = &bucket{tx: tx, id: blockIdxBucketID}
|
|
return tx, nil
|
|
}
|
|
|
|
// Begin starts a transaction which is either read-only or read-write depending
|
|
// on the specified flag. Multiple read-only transactions can be started
|
|
// simultaneously while only a single read-write transaction can be started at a
|
|
// time. The call will block when starting a read-write transaction when one is
|
|
// already open.
|
|
//
|
|
// NOTE: The transaction must be closed by calling Rollback or Commit on it when
|
|
// it is no longer needed. Failure to do so will result in unclaimed memory.
|
|
//
|
|
// This function is part of the database.DB interface implementation.
|
|
func (db *db) Begin(writable bool) (database.Tx, error) {
|
|
return db.begin(writable)
|
|
}
|
|
|
|
// rollbackOnPanic rolls the passed transaction back if the code in the calling
|
|
// function panics. This is needed since the mutex on a transaction must be
|
|
// released and a panic in called code would prevent that from happening.
|
|
//
|
|
// NOTE: This can only be handled manually for managed transactions since they
|
|
// control the life-cycle of the transaction. As the documentation on Begin
|
|
// calls out, callers opting to use manual transactions will have to ensure the
|
|
// transaction is rolled back on panic if it desires that functionality as well
|
|
// or the database will fail to close since the read-lock will never be
|
|
// released.
|
|
func rollbackOnPanic(tx *transaction) {
|
|
if err := recover(); err != nil {
|
|
tx.managed = false
|
|
_ = tx.Rollback()
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
// View invokes the passed function in the context of a managed read-only
|
|
// transaction with the root bucket for the namespace. Any errors returned from
|
|
// the user-supplied function are returned from this function.
|
|
//
|
|
// This function is part of the database.DB interface implementation.
|
|
func (db *db) View(fn func(database.Tx) error) error {
|
|
// Start a read-only transaction.
|
|
tx, err := db.begin(false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Since the user-provided function might panic, ensure the transaction
|
|
// releases all mutexes and resources. There is no guarantee the caller
|
|
// won't use recover and keep going. Thus, the database must still be
|
|
// in a usable state on panics due to caller issues.
|
|
defer rollbackOnPanic(tx)
|
|
|
|
tx.managed = true
|
|
err = fn(tx)
|
|
tx.managed = false
|
|
if err != nil {
|
|
// The error is ignored here because nothing was written yet
|
|
// and regardless of a rollback failure, the tx is closed now
|
|
// anyways.
|
|
_ = tx.Rollback()
|
|
return err
|
|
}
|
|
|
|
return tx.Rollback()
|
|
}
|
|
|
|
// Update invokes the passed function in the context of a managed read-write
|
|
// transaction with the root bucket for the namespace. Any errors returned from
|
|
// the user-supplied function will cause the transaction to be rolled back and
|
|
// are returned from this function. Otherwise, the transaction is committed
|
|
// when the user-supplied function returns a nil error.
|
|
//
|
|
// This function is part of the database.DB interface implementation.
|
|
func (db *db) Update(fn func(database.Tx) error) error {
|
|
// Start a read-write transaction.
|
|
tx, err := db.begin(true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Since the user-provided function might panic, ensure the transaction
|
|
// releases all mutexes and resources. There is no guarantee the caller
|
|
// won't use recover and keep going. Thus, the database must still be
|
|
// in a usable state on panics due to caller issues.
|
|
defer rollbackOnPanic(tx)
|
|
|
|
tx.managed = true
|
|
err = fn(tx)
|
|
tx.managed = false
|
|
if err != nil {
|
|
// The error is ignored here because nothing was written yet
|
|
// and regardless of a rollback failure, the tx is closed now
|
|
// anyways.
|
|
_ = tx.Rollback()
|
|
return err
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
// Close cleanly shuts down the database and syncs all data. It will block
|
|
// until all database transactions have been finalized (rolled back or
|
|
// committed).
|
|
//
|
|
// This function is part of the database.DB interface implementation.
|
|
func (db *db) Close() error {
|
|
// Since all transactions have a read lock on this mutex, this will
|
|
// cause Close to wait for all readers to complete.
|
|
db.closeLock.Lock()
|
|
defer db.closeLock.Unlock()
|
|
|
|
if db.closed {
|
|
return makeDbErr(database.ErrDbNotOpen, errDbNotOpenStr, nil)
|
|
}
|
|
db.closed = true
|
|
|
|
// NOTE: Since the above lock waits for all transactions to finish and
|
|
// prevents any new ones from being started, it is safe to flush the
|
|
// cache and clear all state without the individual locks.
|
|
|
|
// Close the database cache which will flush any existing entries to
|
|
// disk and close the underlying leveldb database. Any error is saved
|
|
// and returned at the end after the remaining cleanup since the
|
|
// database will be marked closed even if this fails given there is no
|
|
// good way for the caller to recover from a failure here anyways.
|
|
closeErr := db.cache.Close()
|
|
|
|
// Close any open flat files that house the blocks.
|
|
wc := db.store.writeCursor
|
|
if wc.curFile.file != nil {
|
|
_ = wc.curFile.file.Close()
|
|
wc.curFile.file = nil
|
|
}
|
|
for _, blockFile := range db.store.openBlockFiles {
|
|
_ = blockFile.file.Close()
|
|
}
|
|
db.store.openBlockFiles = nil
|
|
db.store.openBlocksLRU.Init()
|
|
db.store.fileNumToLRUElem = nil
|
|
|
|
return closeErr
|
|
}
|
|
|
|
// fileExists reports whether the named file or directory exists. Only a
// "does not exist" result from os.Stat counts as missing; any other Stat
// failure is treated the same as success and reports true.
func fileExists(name string) bool {
	_, statErr := os.Stat(name)
	return !os.IsNotExist(statErr)
}
|
|
|
|
// initDB creates the initial buckets and values used by the package. This is
|
|
// mainly in a separate function for testing purposes.
|
|
func initDB(ldb *leveldb.DB) error {
|
|
// The starting block file write cursor location is file num 0, offset
|
|
// 0.
|
|
batch := new(leveldb.Batch)
|
|
batch.Put(bucketizedKey(metadataBucketID, writeLocKeyName),
|
|
serializeWriteRow(0, 0))
|
|
|
|
// Create block index bucket and set the current bucket id.
|
|
//
|
|
// NOTE: Since buckets are virtualized through the use of prefixes,
|
|
// there is no need to store the bucket index data for the metadata
|
|
// bucket in the database. However, the first bucket ID to use does
|
|
// need to account for it to ensure there are no key collisions.
|
|
batch.Put(bucketIndexKey(metadataBucketID, blockIdxBucketName),
|
|
blockIdxBucketID[:])
|
|
batch.Put(curBucketIDKeyName, blockIdxBucketID[:])
|
|
|
|
// Write everything as a single batch.
|
|
if err := ldb.Write(batch, nil); err != nil {
|
|
str := fmt.Sprintf("failed to initialize metadata database: %v",
|
|
err)
|
|
return convertErr(str, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// openDB opens the database at the provided path. database.ErrDbDoesNotExist
|
|
// is returned if the database doesn't exist and the create flag is not set.
|
|
func openDB(dbPath string, network wire.BitcoinNet, create bool) (database.DB, error) {
|
|
// Error if the database doesn't exist and the create flag is not set.
|
|
metadataDbPath := filepath.Join(dbPath, metadataDbName)
|
|
dbExists := fileExists(metadataDbPath)
|
|
if !create && !dbExists {
|
|
str := fmt.Sprintf("database %q does not exist", metadataDbPath)
|
|
return nil, makeDbErr(database.ErrDbDoesNotExist, str, nil)
|
|
}
|
|
|
|
// Ensure the full path to the database exists.
|
|
if !dbExists {
|
|
// The error can be ignored here since the call to
|
|
// leveldb.OpenFile will fail if the directory couldn't be
|
|
// created.
|
|
_ = os.MkdirAll(dbPath, 0700)
|
|
}
|
|
|
|
// Open the metadata database (will create it if needed).
|
|
opts := opt.Options{
|
|
ErrorIfExist: create,
|
|
Strict: opt.DefaultStrict,
|
|
Compression: opt.NoCompression,
|
|
Filter: filter.NewBloomFilter(10),
|
|
OpenFilesCacheCapacity: 2000,
|
|
}
|
|
ldb, err := leveldb.OpenFile(metadataDbPath, &opts)
|
|
if err != nil {
|
|
return nil, convertErr(err.Error(), err)
|
|
}
|
|
|
|
// Create the block store which includes scanning the existing flat
|
|
// block files to find what the current write cursor position is
|
|
// according to the data that is actually on disk. Also create the
|
|
// database cache which wraps the underlying leveldb database to provide
|
|
// write caching.
|
|
store := newBlockStore(dbPath, network)
|
|
cache := newDbCache(ldb, store, defaultCacheSize, defaultFlushSecs)
|
|
pdb := &db{store: store, cache: cache}
|
|
|
|
// Perform any reconciliation needed between the block and metadata as
|
|
// well as database initialization, if needed.
|
|
return reconcileDB(pdb, create)
|
|
}
|