// Copyright (c) 2015-2016 The btcsuite developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. package ffldb import ( "bytes" "encoding/binary" "fmt" "os" "path/filepath" "runtime" "sort" "sync" "github.com/btcsuite/goleveldb/leveldb" "github.com/btcsuite/goleveldb/leveldb/comparer" ldberrors "github.com/btcsuite/goleveldb/leveldb/errors" "github.com/btcsuite/goleveldb/leveldb/filter" "github.com/btcsuite/goleveldb/leveldb/iterator" "github.com/btcsuite/goleveldb/leveldb/opt" "github.com/btcsuite/goleveldb/leveldb/util" "github.com/lbryio/lbcd/chaincfg/chainhash" "github.com/lbryio/lbcd/database" "github.com/lbryio/lbcd/database/internal/treap" "github.com/lbryio/lbcd/wire" btcutil "github.com/lbryio/lbcutil" ) const ( // metadataDbName is the name used for the metadata database. metadataDbName = "metadata" // blockHdrSize is the size of a block header. This is simply the // constant from wire and is only provided here for convenience since // wire.MaxBlockHeaderPayload is quite long. blockHdrSize = wire.MaxBlockHeaderPayload // blockHdrOffset defines the offsets into a block index row for the // block header. // // The serialized block index row format is: // blockHdrOffset = blockLocSize ) var ( // byteOrder is the preferred byte order used through the database and // block files. Sometimes big endian will be used to allow ordered byte // sortable integer values. byteOrder = binary.LittleEndian // bucketIndexPrefix is the prefix used for all entries in the bucket // index. bucketIndexPrefix = []byte("bidx") // curBucketIDKeyName is the name of the key used to keep track of the // current bucket ID counter. curBucketIDKeyName = []byte("bidx-cbid") // metadataBucketID is the ID of the top-level metadata bucket. // It is the value 0 encoded as an unsigned big-endian uint32. metadataBucketID = [4]byte{} // blockIdxBucketID is the ID of the internal block metadata bucket. // It is the value 1 encoded as an unsigned big-endian uint32. blockIdxBucketID = [4]byte{0x00, 0x00, 0x00, 0x01} // blockIdxBucketName is the bucket used internally to track block // metadata. blockIdxBucketName = []byte("ffldb-blockidx") // writeLocKeyName is the key used to store the current write file // location. writeLocKeyName = []byte("ffldb-writeloc") ) // Common error strings. const ( // errDbNotOpenStr is the text to use for the database.ErrDbNotOpen // error code. errDbNotOpenStr = "database is not open" // errTxClosedStr is the text to use for the database.ErrTxClosed error // code. errTxClosedStr = "database tx is closed" ) // bulkFetchData is allows a block location to be specified along with the // index it was requested from. This in turn allows the bulk data loading // functions to sort the data accesses based on the location to improve // performance while keeping track of which result the data is for. type bulkFetchData struct { *blockLocation replyIndex int } // bulkFetchDataSorter implements sort.Interface to allow a slice of // bulkFetchData to be sorted. In particular it sorts by file and then // offset so that reads from files are grouped and linear. type bulkFetchDataSorter []bulkFetchData // Len returns the number of items in the slice. It is part of the // sort.Interface implementation. func (s bulkFetchDataSorter) Len() int { return len(s) } // Swap swaps the items at the passed indices. It is part of the // sort.Interface implementation. func (s bulkFetchDataSorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] } // Less returns whether the item with index i should sort before the item with // index j. It is part of the sort.Interface implementation. func (s bulkFetchDataSorter) Less(i, j int) bool { if s[i].blockFileNum < s[j].blockFileNum { return true } if s[i].blockFileNum > s[j].blockFileNum { return false } return s[i].fileOffset < s[j].fileOffset } // makeDbErr creates a database.Error given a set of arguments. func makeDbErr(c database.ErrorCode, desc string, err error) database.Error { return database.Error{ErrorCode: c, Description: desc, Err: err} } // convertErr converts the passed leveldb error into a database error with an // equivalent error code and the passed description. It also sets the passed // error as the underlying error. func convertErr(desc string, ldbErr error) database.Error { // Use the driver-specific error code by default. The code below will // update this with the converted error if it's recognized. var code = database.ErrDriverSpecific switch { // Database corruption errors. case ldberrors.IsCorrupted(ldbErr): code = database.ErrCorruption // Database open/create errors. case ldbErr == leveldb.ErrClosed: code = database.ErrDbNotOpen // Transaction errors. case ldbErr == leveldb.ErrSnapshotReleased: code = database.ErrTxClosed case ldbErr == leveldb.ErrIterReleased: code = database.ErrTxClosed } return database.Error{ErrorCode: code, Description: desc, Err: ldbErr} } // copySlice returns a copy of the passed slice. This is mostly used to copy // leveldb iterator keys and values since they are only valid until the iterator // is moved instead of during the entirety of the transaction. func copySlice(slice []byte) []byte { ret := make([]byte, len(slice)) copy(ret, slice) return ret } // cursor is an internal type used to represent a cursor over key/value pairs // and nested buckets of a bucket and implements the database.Cursor interface. type cursor struct { bucket *bucket dbIter iterator.Iterator pendingIter iterator.Iterator currentIter iterator.Iterator } // Enforce cursor implements the database.Cursor interface. var _ database.Cursor = (*cursor)(nil) // Bucket returns the bucket the cursor was created for. // // This function is part of the database.Cursor interface implementation. func (c *cursor) Bucket() database.Bucket { // Ensure transaction state is valid. if err := c.bucket.tx.checkClosed(); err != nil { return nil } return c.bucket } // Delete removes the current key/value pair the cursor is at without // invalidating the cursor. // // Returns the following errors as required by the interface contract: // - ErrIncompatibleValue if attempted when the cursor points to a nested // bucket // - ErrTxNotWritable if attempted against a read-only transaction // - ErrTxClosed if the transaction has already been closed // // This function is part of the database.Cursor interface implementation. func (c *cursor) Delete() error { // Ensure transaction state is valid. if err := c.bucket.tx.checkClosed(); err != nil { return err } // Error if the cursor is exhausted. if c.currentIter == nil { str := "cursor is exhausted" return makeDbErr(database.ErrIncompatibleValue, str, nil) } // Do not allow buckets to be deleted via the cursor. key := c.currentIter.Key() if bytes.HasPrefix(key, bucketIndexPrefix) { str := "buckets may not be deleted from a cursor" return makeDbErr(database.ErrIncompatibleValue, str, nil) } c.bucket.tx.deleteKey(copySlice(key), true) return nil } // skipPendingUpdates skips any keys at the current database iterator position // that are being updated by the transaction. The forwards flag indicates the // direction the cursor is moving. func (c *cursor) skipPendingUpdates(forwards bool) { for c.dbIter.Valid() { var skip bool key := c.dbIter.Key() if c.bucket.tx.pendingRemove.Has(key) { skip = true } else if c.bucket.tx.pendingKeys.Has(key) { skip = true } if !skip { break } if forwards { c.dbIter.Next() } else { c.dbIter.Prev() } } } // chooseIterator first skips any entries in the database iterator that are // being updated by the transaction and sets the current iterator to the // appropriate iterator depending on their validity and the order they compare // in while taking into account the direction flag. When the cursor is being // moved forwards and both iterators are valid, the iterator with the smaller // key is chosen and vice versa when the cursor is being moved backwards. func (c *cursor) chooseIterator(forwards bool) bool { // Skip any keys at the current database iterator position that are // being updated by the transaction. c.skipPendingUpdates(forwards) // When both iterators are exhausted, the cursor is exhausted too. if !c.dbIter.Valid() && !c.pendingIter.Valid() { c.currentIter = nil return false } // Choose the database iterator when the pending keys iterator is // exhausted. if !c.pendingIter.Valid() { c.currentIter = c.dbIter return true } // Choose the pending keys iterator when the database iterator is // exhausted. if !c.dbIter.Valid() { c.currentIter = c.pendingIter return true } // Both iterators are valid, so choose the iterator with either the // smaller or larger key depending on the forwards flag. compare := bytes.Compare(c.dbIter.Key(), c.pendingIter.Key()) if (forwards && compare > 0) || (!forwards && compare < 0) { c.currentIter = c.pendingIter } else { c.currentIter = c.dbIter } return true } // First positions the cursor at the first key/value pair and returns whether or // not the pair exists. // // This function is part of the database.Cursor interface implementation. func (c *cursor) First() bool { // Ensure transaction state is valid. if err := c.bucket.tx.checkClosed(); err != nil { return false } // Seek to the first key in both the database and pending iterators and // choose the iterator that is both valid and has the smaller key. c.dbIter.First() c.pendingIter.First() return c.chooseIterator(true) } // Last positions the cursor at the last key/value pair and returns whether or // not the pair exists. // // This function is part of the database.Cursor interface implementation. func (c *cursor) Last() bool { // Ensure transaction state is valid. if err := c.bucket.tx.checkClosed(); err != nil { return false } // Seek to the last key in both the database and pending iterators and // choose the iterator that is both valid and has the larger key. c.dbIter.Last() c.pendingIter.Last() return c.chooseIterator(false) } // Next moves the cursor one key/value pair forward and returns whether or not // the pair exists. // // This function is part of the database.Cursor interface implementation. func (c *cursor) Next() bool { // Ensure transaction state is valid. if err := c.bucket.tx.checkClosed(); err != nil { return false } // Nothing to return if cursor is exhausted. if c.currentIter == nil { return false } // Move the current iterator to the next entry and choose the iterator // that is both valid and has the smaller key. c.currentIter.Next() return c.chooseIterator(true) } // Prev moves the cursor one key/value pair backward and returns whether or not // the pair exists. // // This function is part of the database.Cursor interface implementation. func (c *cursor) Prev() bool { // Ensure transaction state is valid. if err := c.bucket.tx.checkClosed(); err != nil { return false } // Nothing to return if cursor is exhausted. if c.currentIter == nil { return false } // Move the current iterator to the previous entry and choose the // iterator that is both valid and has the larger key. c.currentIter.Prev() return c.chooseIterator(false) } // Seek positions the cursor at the first key/value pair that is greater than or // equal to the passed seek key. Returns false if no suitable key was found. // // This function is part of the database.Cursor interface implementation. func (c *cursor) Seek(seek []byte) bool { // Ensure transaction state is valid. if err := c.bucket.tx.checkClosed(); err != nil { return false } // Seek to the provided key in both the database and pending iterators // then choose the iterator that is both valid and has the larger key. seekKey := bucketizedKey(c.bucket.id, seek) c.dbIter.Seek(seekKey) c.pendingIter.Seek(seekKey) return c.chooseIterator(true) } // rawKey returns the current key the cursor is pointing to without stripping // the current bucket prefix or bucket index prefix. func (c *cursor) rawKey() []byte { // Nothing to return if cursor is exhausted. if c.currentIter == nil { return nil } return copySlice(c.currentIter.Key()) } // Key returns the current key the cursor is pointing to. // // This function is part of the database.Cursor interface implementation. func (c *cursor) Key() []byte { // Ensure transaction state is valid. if err := c.bucket.tx.checkClosed(); err != nil { return nil } // Nothing to return if cursor is exhausted. if c.currentIter == nil { return nil } // Slice out the actual key name and make a copy since it is no longer // valid after iterating to the next item. // // The key is after the bucket index prefix and parent ID when the // cursor is pointing to a nested bucket. key := c.currentIter.Key() if bytes.HasPrefix(key, bucketIndexPrefix) { key = key[len(bucketIndexPrefix)+4:] return copySlice(key) } // The key is after the bucket ID when the cursor is pointing to a // normal entry. key = key[len(c.bucket.id):] return copySlice(key) } // rawValue returns the current value the cursor is pointing to without // stripping without filtering bucket index values. func (c *cursor) rawValue() []byte { // Nothing to return if cursor is exhausted. if c.currentIter == nil { return nil } return copySlice(c.currentIter.Value()) } // Value returns the current value the cursor is pointing to. This will be nil // for nested buckets. // // This function is part of the database.Cursor interface implementation. func (c *cursor) Value() []byte { // Ensure transaction state is valid. if err := c.bucket.tx.checkClosed(); err != nil { return nil } // Nothing to return if cursor is exhausted. if c.currentIter == nil { return nil } // Return nil for the value when the cursor is pointing to a nested // bucket. if bytes.HasPrefix(c.currentIter.Key(), bucketIndexPrefix) { return nil } return copySlice(c.currentIter.Value()) } // cursorType defines the type of cursor to create. type cursorType int // The following constants define the allowed cursor types. const ( // ctKeys iterates through all of the keys in a given bucket. ctKeys cursorType = iota // ctBuckets iterates through all directly nested buckets in a given // bucket. ctBuckets // ctFull iterates through both the keys and the directly nested buckets // in a given bucket. ctFull ) // cursorFinalizer is either invoked when a cursor is being garbage collected or // called manually to ensure the underlying cursor iterators are released. func cursorFinalizer(c *cursor) { c.dbIter.Release() c.pendingIter.Release() } // newCursor returns a new cursor for the given bucket, bucket ID, and cursor // type. // // NOTE: The caller is responsible for calling the cursorFinalizer function on // the returned cursor. func newCursor(b *bucket, bucketID []byte, cursorTyp cursorType) *cursor { var dbIter, pendingIter iterator.Iterator switch cursorTyp { case ctKeys: keyRange := util.BytesPrefix(bucketID) dbIter = b.tx.snapshot.NewIterator(keyRange) pendingKeyIter := newLdbTreapIter(b.tx, keyRange) pendingIter = pendingKeyIter case ctBuckets: // The serialized bucket index key format is: // // Create an iterator for the both the database and the pending // keys which are prefixed by the bucket index identifier and // the provided bucket ID. prefix := make([]byte, len(bucketIndexPrefix)+4) copy(prefix, bucketIndexPrefix) copy(prefix[len(bucketIndexPrefix):], bucketID) bucketRange := util.BytesPrefix(prefix) dbIter = b.tx.snapshot.NewIterator(bucketRange) pendingBucketIter := newLdbTreapIter(b.tx, bucketRange) pendingIter = pendingBucketIter case ctFull: fallthrough default: // The serialized bucket index key format is: // prefix := make([]byte, len(bucketIndexPrefix)+4) copy(prefix, bucketIndexPrefix) copy(prefix[len(bucketIndexPrefix):], bucketID) bucketRange := util.BytesPrefix(prefix) keyRange := util.BytesPrefix(bucketID) // Since both keys and buckets are needed from the database, // create an individual iterator for each prefix and then create // a merged iterator from them. dbKeyIter := b.tx.snapshot.NewIterator(keyRange) dbBucketIter := b.tx.snapshot.NewIterator(bucketRange) iters := []iterator.Iterator{dbKeyIter, dbBucketIter} dbIter = iterator.NewMergedIterator(iters, comparer.DefaultComparer, true) // Since both keys and buckets are needed from the pending keys, // create an individual iterator for each prefix and then create // a merged iterator from them. pendingKeyIter := newLdbTreapIter(b.tx, keyRange) pendingBucketIter := newLdbTreapIter(b.tx, bucketRange) iters = []iterator.Iterator{pendingKeyIter, pendingBucketIter} pendingIter = iterator.NewMergedIterator(iters, comparer.DefaultComparer, true) } // Create the cursor using the iterators. return &cursor{bucket: b, dbIter: dbIter, pendingIter: pendingIter} } // bucket is an internal type used to represent a collection of key/value pairs // and implements the database.Bucket interface. type bucket struct { tx *transaction id [4]byte } // Enforce bucket implements the database.Bucket interface. var _ database.Bucket = (*bucket)(nil) // bucketIndexKey returns the actual key to use for storing and retrieving a // child bucket in the bucket index. This is required because additional // information is needed to distinguish nested buckets with the same name. func bucketIndexKey(parentID [4]byte, key []byte) []byte { // The serialized bucket index key format is: // indexKey := make([]byte, len(bucketIndexPrefix)+4+len(key)) copy(indexKey, bucketIndexPrefix) copy(indexKey[len(bucketIndexPrefix):], parentID[:]) copy(indexKey[len(bucketIndexPrefix)+4:], key) return indexKey } // bucketizedKey returns the actual key to use for storing and retrieving a key // for the provided bucket ID. This is required because bucketizing is handled // through the use of a unique prefix per bucket. func bucketizedKey(bucketID [4]byte, key []byte) []byte { // The serialized block index key format is: // bKey := make([]byte, 4+len(key)) copy(bKey, bucketID[:]) copy(bKey[4:], key) return bKey } // Bucket retrieves a nested bucket with the given key. Returns nil if // the bucket does not exist. // // This function is part of the database.Bucket interface implementation. func (b *bucket) Bucket(key []byte) database.Bucket { // Ensure transaction state is valid. if err := b.tx.checkClosed(); err != nil { return nil } // Attempt to fetch the ID for the child bucket. The bucket does not // exist if the bucket index entry does not exist. childID := b.tx.fetchKey(bucketIndexKey(b.id, key)) if childID == nil { return nil } childBucket := &bucket{tx: b.tx} copy(childBucket.id[:], childID) return childBucket } // CreateBucket creates and returns a new nested bucket with the given key. // // Returns the following errors as required by the interface contract: // - ErrBucketExists if the bucket already exists // - ErrBucketNameRequired if the key is empty // - ErrIncompatibleValue if the key is otherwise invalid for the particular // implementation // - ErrTxNotWritable if attempted against a read-only transaction // - ErrTxClosed if the transaction has already been closed // // This function is part of the database.Bucket interface implementation. func (b *bucket) CreateBucket(key []byte) (database.Bucket, error) { // Ensure transaction state is valid. if err := b.tx.checkClosed(); err != nil { return nil, err } // Ensure the transaction is writable. if !b.tx.writable { str := "create bucket requires a writable database transaction" return nil, makeDbErr(database.ErrTxNotWritable, str, nil) } // Ensure a key was provided. if len(key) == 0 { str := "create bucket requires a key" return nil, makeDbErr(database.ErrBucketNameRequired, str, nil) } // Ensure bucket does not already exist. bidxKey := bucketIndexKey(b.id, key) if b.tx.hasKey(bidxKey) { str := "bucket already exists" return nil, makeDbErr(database.ErrBucketExists, str, nil) } // Find the appropriate next bucket ID to use for the new bucket. In // the case of the special internal block index, keep the fixed ID. var childID [4]byte if b.id == metadataBucketID && bytes.Equal(key, blockIdxBucketName) { childID = blockIdxBucketID } else { var err error childID, err = b.tx.nextBucketID() if err != nil { return nil, err } } // Add the new bucket to the bucket index. if err := b.tx.putKey(bidxKey, childID[:]); err != nil { str := fmt.Sprintf("failed to create bucket with key %q", key) return nil, convertErr(str, err) } return &bucket{tx: b.tx, id: childID}, nil } // CreateBucketIfNotExists creates and returns a new nested bucket with the // given key if it does not already exist. // // Returns the following errors as required by the interface contract: // - ErrBucketNameRequired if the key is empty // - ErrIncompatibleValue if the key is otherwise invalid for the particular // implementation // - ErrTxNotWritable if attempted against a read-only transaction // - ErrTxClosed if the transaction has already been closed // // This function is part of the database.Bucket interface implementation. func (b *bucket) CreateBucketIfNotExists(key []byte) (database.Bucket, error) { // Ensure transaction state is valid. if err := b.tx.checkClosed(); err != nil { return nil, err } // Ensure the transaction is writable. if !b.tx.writable { str := "create bucket requires a writable database transaction" return nil, makeDbErr(database.ErrTxNotWritable, str, nil) } // Return existing bucket if it already exists, otherwise create it. if bucket := b.Bucket(key); bucket != nil { return bucket, nil } return b.CreateBucket(key) } // DeleteBucket removes a nested bucket with the given key. // // Returns the following errors as required by the interface contract: // - ErrBucketNotFound if the specified bucket does not exist // - ErrTxNotWritable if attempted against a read-only transaction // - ErrTxClosed if the transaction has already been closed // // This function is part of the database.Bucket interface implementation. func (b *bucket) DeleteBucket(key []byte) error { // Ensure transaction state is valid. if err := b.tx.checkClosed(); err != nil { return err } // Ensure the transaction is writable. if !b.tx.writable { str := "delete bucket requires a writable database transaction" return makeDbErr(database.ErrTxNotWritable, str, nil) } // Attempt to fetch the ID for the child bucket. The bucket does not // exist if the bucket index entry does not exist. In the case of the // special internal block index, keep the fixed ID. bidxKey := bucketIndexKey(b.id, key) childID := b.tx.fetchKey(bidxKey) if childID == nil { str := fmt.Sprintf("bucket %q does not exist", key) return makeDbErr(database.ErrBucketNotFound, str, nil) } // Remove all nested buckets and their keys. childIDs := [][]byte{childID} for len(childIDs) > 0 { childID = childIDs[len(childIDs)-1] childIDs = childIDs[:len(childIDs)-1] // Delete all keys in the nested bucket. keyCursor := newCursor(b, childID, ctKeys) for ok := keyCursor.First(); ok; ok = keyCursor.Next() { b.tx.deleteKey(keyCursor.rawKey(), false) } cursorFinalizer(keyCursor) // Iterate through all nested buckets. bucketCursor := newCursor(b, childID, ctBuckets) for ok := bucketCursor.First(); ok; ok = bucketCursor.Next() { // Push the id of the nested bucket onto the stack for // the next iteration. childID := bucketCursor.rawValue() childIDs = append(childIDs, childID) // Remove the nested bucket from the bucket index. b.tx.deleteKey(bucketCursor.rawKey(), false) } cursorFinalizer(bucketCursor) } // Remove the nested bucket from the bucket index. Any buckets nested // under it were already removed above. b.tx.deleteKey(bidxKey, true) return nil } // Cursor returns a new cursor, allowing for iteration over the bucket's // key/value pairs and nested buckets in forward or backward order. // // You must seek to a position using the First, Last, or Seek functions before // calling the Next, Prev, Key, or Value functions. Failure to do so will // result in the same return values as an exhausted cursor, which is false for // the Prev and Next functions and nil for Key and Value functions. // // This function is part of the database.Bucket interface implementation. func (b *bucket) Cursor() database.Cursor { // Ensure transaction state is valid. if err := b.tx.checkClosed(); err != nil { return &cursor{bucket: b} } // Create the cursor and setup a runtime finalizer to ensure the // iterators are released when the cursor is garbage collected. c := newCursor(b, b.id[:], ctFull) runtime.SetFinalizer(c, cursorFinalizer) return c } // ForEach invokes the passed function with every key/value pair in the bucket. // This does not include nested buckets or the key/value pairs within those // nested buckets. // // WARNING: It is not safe to mutate data while iterating with this method. // Doing so may cause the underlying cursor to be invalidated and return // unexpected keys and/or values. // // Returns the following errors as required by the interface contract: // - ErrTxClosed if the transaction has already been closed // // NOTE: The values returned by this function are only valid during a // transaction. Attempting to access them after a transaction has ended will // likely result in an access violation. // // This function is part of the database.Bucket interface implementation. func (b *bucket) ForEach(fn func(k, v []byte) error) error { // Ensure transaction state is valid. if err := b.tx.checkClosed(); err != nil { return err } // Invoke the callback for each cursor item. Return the error returned // from the callback when it is non-nil. c := newCursor(b, b.id[:], ctKeys) defer cursorFinalizer(c) for ok := c.First(); ok; ok = c.Next() { err := fn(c.Key(), c.Value()) if err != nil { return err } } return nil } // ForEachBucket invokes the passed function with the key of every nested bucket // in the current bucket. This does not include any nested buckets within those // nested buckets. // // WARNING: It is not safe to mutate data while iterating with this method. // Doing so may cause the underlying cursor to be invalidated and return // unexpected keys. // // Returns the following errors as required by the interface contract: // - ErrTxClosed if the transaction has already been closed // // NOTE: The values returned by this function are only valid during a // transaction. Attempting to access them after a transaction has ended will // likely result in an access violation. // // This function is part of the database.Bucket interface implementation. func (b *bucket) ForEachBucket(fn func(k []byte) error) error { // Ensure transaction state is valid. if err := b.tx.checkClosed(); err != nil { return err } // Invoke the callback for each cursor item. Return the error returned // from the callback when it is non-nil. c := newCursor(b, b.id[:], ctBuckets) defer cursorFinalizer(c) for ok := c.First(); ok; ok = c.Next() { err := fn(c.Key()) if err != nil { return err } } return nil } // Writable returns whether or not the bucket is writable. // // This function is part of the database.Bucket interface implementation. func (b *bucket) Writable() bool { return b.tx.writable } // Put saves the specified key/value pair to the bucket. Keys that do not // already exist are added and keys that already exist are overwritten. // // Returns the following errors as required by the interface contract: // - ErrKeyRequired if the key is empty // - ErrIncompatibleValue if the key is the same as an existing bucket // - ErrTxNotWritable if attempted against a read-only transaction // - ErrTxClosed if the transaction has already been closed // // This function is part of the database.Bucket interface implementation. func (b *bucket) Put(key, value []byte) error { // Ensure transaction state is valid. if err := b.tx.checkClosed(); err != nil { return err } // Ensure the transaction is writable. if !b.tx.writable { str := "setting a key requires a writable database transaction" return makeDbErr(database.ErrTxNotWritable, str, nil) } // Ensure a key was provided. if len(key) == 0 { str := "put requires a key" return makeDbErr(database.ErrKeyRequired, str, nil) } return b.tx.putKey(bucketizedKey(b.id, key), value) } // Get returns the value for the given key. Returns nil if the key does not // exist in this bucket. An empty slice is returned for keys that exist but // have no value assigned. // // NOTE: The value returned by this function is only valid during a transaction. // Attempting to access it after a transaction has ended results in undefined // behavior. Additionally, the value must NOT be modified by the caller. // // This function is part of the database.Bucket interface implementation. func (b *bucket) Get(key []byte) []byte { // Ensure transaction state is valid. if err := b.tx.checkClosed(); err != nil { return nil } // Nothing to return if there is no key. if len(key) == 0 { return nil } return b.tx.fetchKey(bucketizedKey(b.id, key)) } // Delete removes the specified key from the bucket. Deleting a key that does // not exist does not return an error. // // Returns the following errors as required by the interface contract: // - ErrKeyRequired if the key is empty // - ErrIncompatibleValue if the key is the same as an existing bucket // - ErrTxNotWritable if attempted against a read-only transaction // - ErrTxClosed if the transaction has already been closed // // This function is part of the database.Bucket interface implementation. func (b *bucket) Delete(key []byte) error { // Ensure transaction state is valid. if err := b.tx.checkClosed(); err != nil { return err } // Ensure the transaction is writable. if !b.tx.writable { str := "deleting a value requires a writable database transaction" return makeDbErr(database.ErrTxNotWritable, str, nil) } // Nothing to do if there is no key. if len(key) == 0 { return nil } b.tx.deleteKey(bucketizedKey(b.id, key), true) return nil } // pendingBlock houses a block that will be written to disk when the database // transaction is committed. type pendingBlock struct { hash *chainhash.Hash bytes []byte } // transaction represents a database transaction. It can either be read-only or // read-write and implements the database.Tx interface. The transaction // provides a root bucket against which all read and writes occur. type transaction struct { managed bool // Is the transaction managed? closed bool // Is the transaction closed? writable bool // Is the transaction writable? db *db // DB instance the tx was created from. snapshot *dbCacheSnapshot // Underlying snapshot for txns. metaBucket *bucket // The root metadata bucket. blockIdxBucket *bucket // The block index bucket. // Blocks that need to be stored on commit. The pendingBlocks map is // kept to allow quick lookups of pending data by block hash. pendingBlocks map[chainhash.Hash]int pendingBlockData []pendingBlock // Keys that need to be stored or deleted on commit. pendingKeys *treap.Mutable pendingRemove *treap.Mutable // Active iterators that need to be notified when the pending keys have // been updated so the cursors can properly handle updates to the // transaction state. activeIterLock sync.RWMutex activeIters []*treap.Iterator } // Enforce transaction implements the database.Tx interface. var _ database.Tx = (*transaction)(nil) // removeActiveIter removes the passed iterator from the list of active // iterators against the pending keys treap. func (tx *transaction) removeActiveIter(iter *treap.Iterator) { // An indexing for loop is intentionally used over a range here as range // does not reevaluate the slice on each iteration nor does it adjust // the index for the modified slice. tx.activeIterLock.Lock() for i := 0; i < len(tx.activeIters); i++ { if tx.activeIters[i] == iter { copy(tx.activeIters[i:], tx.activeIters[i+1:]) tx.activeIters[len(tx.activeIters)-1] = nil tx.activeIters = tx.activeIters[:len(tx.activeIters)-1] } } tx.activeIterLock.Unlock() } // addActiveIter adds the passed iterator to the list of active iterators for // the pending keys treap. func (tx *transaction) addActiveIter(iter *treap.Iterator) { tx.activeIterLock.Lock() tx.activeIters = append(tx.activeIters, iter) tx.activeIterLock.Unlock() } // notifyActiveIters notifies all of the active iterators for the pending keys // treap that it has been updated. func (tx *transaction) notifyActiveIters() { tx.activeIterLock.RLock() for _, iter := range tx.activeIters { iter.ForceReseek() } tx.activeIterLock.RUnlock() } // checkClosed returns an error if the the database or transaction is closed. func (tx *transaction) checkClosed() error { // The transaction is no longer valid if it has been closed. if tx.closed { return makeDbErr(database.ErrTxClosed, errTxClosedStr, nil) } return nil } // hasKey returns whether or not the provided key exists in the database while // taking into account the current transaction state. func (tx *transaction) hasKey(key []byte) bool { // When the transaction is writable, check the pending transaction // state first. if tx.writable { if tx.pendingRemove.Has(key) { return false } if tx.pendingKeys.Has(key) { return true } } // Consult the database cache and underlying database. return tx.snapshot.Has(key) } // putKey adds the provided key to the list of keys to be updated in the // database when the transaction is committed. // // NOTE: This function must only be called on a writable transaction. Since it // is an internal helper function, it does not check. func (tx *transaction) putKey(key, value []byte) error { // Prevent the key from being deleted if it was previously scheduled // to be deleted on transaction commit. tx.pendingRemove.Delete(key) // Add the key/value pair to the list to be written on transaction // commit. tx.pendingKeys.Put(key, value) tx.notifyActiveIters() return nil } // fetchKey attempts to fetch the provided key from the database cache (and // hence underlying database) while taking into account the current transaction // state. Returns nil if the key does not exist. func (tx *transaction) fetchKey(key []byte) []byte { // When the transaction is writable, check the pending transaction // state first. if tx.writable { if tx.pendingRemove.Has(key) { return nil } if value := tx.pendingKeys.Get(key); value != nil { return value } } // Consult the database cache and underlying database. return tx.snapshot.Get(key) } // deleteKey adds the provided key to the list of keys to be deleted from the // database when the transaction is committed. The notify iterators flag is // useful to delay notifying iterators about the changes during bulk deletes. // // NOTE: This function must only be called on a writable transaction. Since it // is an internal helper function, it does not check. func (tx *transaction) deleteKey(key []byte, notifyIterators bool) { // Remove the key from the list of pendings keys to be written on // transaction commit if needed. tx.pendingKeys.Delete(key) // Add the key to the list to be deleted on transaction commit. tx.pendingRemove.Put(key, nil) // Notify the active iterators about the change if the flag is set. if notifyIterators { tx.notifyActiveIters() } } // nextBucketID returns the next bucket ID to use for creating a new bucket. // // NOTE: This function must only be called on a writable transaction. Since it // is an internal helper function, it does not check. func (tx *transaction) nextBucketID() ([4]byte, error) { // Load the currently highest used bucket ID. curIDBytes := tx.fetchKey(curBucketIDKeyName) curBucketNum := binary.BigEndian.Uint32(curIDBytes) // Increment and update the current bucket ID and return it. var nextBucketID [4]byte binary.BigEndian.PutUint32(nextBucketID[:], curBucketNum+1) if err := tx.putKey(curBucketIDKeyName, nextBucketID[:]); err != nil { return [4]byte{}, err } return nextBucketID, nil } // Metadata returns the top-most bucket for all metadata storage. // // This function is part of the database.Tx interface implementation. func (tx *transaction) Metadata() database.Bucket { return tx.metaBucket } // hasBlock returns whether or not a block with the given hash exists. func (tx *transaction) hasBlock(hash *chainhash.Hash) bool { // Return true if the block is pending to be written on commit since // it exists from the viewpoint of this transaction. if _, exists := tx.pendingBlocks[*hash]; exists { return true } return tx.hasKey(bucketizedKey(blockIdxBucketID, hash[:])) } // StoreBlock stores the provided block into the database. There are no checks // to ensure the block connects to a previous block, contains double spends, or // any additional functionality such as transaction indexing. It simply stores // the block in the database. // // Returns the following errors as required by the interface contract: // - ErrBlockExists when the block hash already exists // - ErrTxNotWritable if attempted against a read-only transaction // - ErrTxClosed if the transaction has already been closed // // This function is part of the database.Tx interface implementation. func (tx *transaction) StoreBlock(block *btcutil.Block) error { // Ensure transaction state is valid. if err := tx.checkClosed(); err != nil { return err } // Ensure the transaction is writable. if !tx.writable { str := "store block requires a writable database transaction" return makeDbErr(database.ErrTxNotWritable, str, nil) } // Reject the block if it already exists. blockHash := block.Hash() if tx.hasBlock(blockHash) { str := fmt.Sprintf("block %s already exists", blockHash) return makeDbErr(database.ErrBlockExists, str, nil) } blockBytes, err := block.Bytes() if err != nil { str := fmt.Sprintf("failed to get serialized bytes for block %s", blockHash) return makeDbErr(database.ErrDriverSpecific, str, err) } // Add the block to be stored to the list of pending blocks to store // when the transaction is committed. Also, add it to pending blocks // map so it is easy to determine the block is pending based on the // block hash. if tx.pendingBlocks == nil { tx.pendingBlocks = make(map[chainhash.Hash]int) } tx.pendingBlocks[*blockHash] = len(tx.pendingBlockData) tx.pendingBlockData = append(tx.pendingBlockData, pendingBlock{ hash: blockHash, bytes: blockBytes, }) log.Tracef("Added block %s to pending blocks", blockHash) return nil } // HasBlock returns whether or not a block with the given hash exists in the // database. // // Returns the following errors as required by the interface contract: // - ErrTxClosed if the transaction has already been closed // // This function is part of the database.Tx interface implementation. func (tx *transaction) HasBlock(hash *chainhash.Hash) (bool, error) { // Ensure transaction state is valid. if err := tx.checkClosed(); err != nil { return false, err } return tx.hasBlock(hash), nil } // HasBlocks returns whether or not the blocks with the provided hashes // exist in the database. // // Returns the following errors as required by the interface contract: // - ErrTxClosed if the transaction has already been closed // // This function is part of the database.Tx interface implementation. func (tx *transaction) HasBlocks(hashes []chainhash.Hash) ([]bool, error) { // Ensure transaction state is valid. if err := tx.checkClosed(); err != nil { return nil, err } results := make([]bool, len(hashes)) for i := range hashes { results[i] = tx.hasBlock(&hashes[i]) } return results, nil } // fetchBlockRow fetches the metadata stored in the block index for the provided // hash. It will return ErrBlockNotFound if there is no entry. func (tx *transaction) fetchBlockRow(hash *chainhash.Hash) ([]byte, error) { blockRow := tx.blockIdxBucket.Get(hash[:]) if blockRow == nil { str := fmt.Sprintf("block %s does not exist", hash) return nil, makeDbErr(database.ErrBlockNotFound, str, nil) } return blockRow, nil } // FetchBlockHeader returns the raw serialized bytes for the block header // identified by the given hash. The raw bytes are in the format returned by // Serialize on a wire.BlockHeader. // // Returns the following errors as required by the interface contract: // - ErrBlockNotFound if the requested block hash does not exist // - ErrTxClosed if the transaction has already been closed // - ErrCorruption if the database has somehow become corrupted // // NOTE: The data returned by this function is only valid during a // database transaction. Attempting to access it after a transaction // has ended results in undefined behavior. This constraint prevents // additional data copies and allows support for memory-mapped database // implementations. // // This function is part of the database.Tx interface implementation. func (tx *transaction) FetchBlockHeader(hash *chainhash.Hash) ([]byte, error) { return tx.FetchBlockRegion(&database.BlockRegion{ Hash: hash, Offset: 0, Len: blockHdrSize, }) } // FetchBlockHeaders returns the raw serialized bytes for the block headers // identified by the given hashes. The raw bytes are in the format returned by // Serialize on a wire.BlockHeader. // // Returns the following errors as required by the interface contract: // - ErrBlockNotFound if the any of the requested block hashes do not exist // - ErrTxClosed if the transaction has already been closed // - ErrCorruption if the database has somehow become corrupted // // NOTE: The data returned by this function is only valid during a database // transaction. Attempting to access it after a transaction has ended results // in undefined behavior. This constraint prevents additional data copies and // allows support for memory-mapped database implementations. // // This function is part of the database.Tx interface implementation. func (tx *transaction) FetchBlockHeaders(hashes []chainhash.Hash) ([][]byte, error) { regions := make([]database.BlockRegion, len(hashes)) for i := range hashes { regions[i].Hash = &hashes[i] regions[i].Offset = 0 regions[i].Len = blockHdrSize } return tx.FetchBlockRegions(regions) } // FetchBlock returns the raw serialized bytes for the block identified by the // given hash. The raw bytes are in the format returned by Serialize on a // wire.MsgBlock. // // Returns the following errors as required by the interface contract: // - ErrBlockNotFound if the requested block hash does not exist // - ErrTxClosed if the transaction has already been closed // - ErrCorruption if the database has somehow become corrupted // // In addition, returns ErrDriverSpecific if any failures occur when reading the // block files. // // NOTE: The data returned by this function is only valid during a database // transaction. Attempting to access it after a transaction has ended results // in undefined behavior. This constraint prevents additional data copies and // allows support for memory-mapped database implementations. // // This function is part of the database.Tx interface implementation. func (tx *transaction) FetchBlock(hash *chainhash.Hash) ([]byte, error) { // Ensure transaction state is valid. if err := tx.checkClosed(); err != nil { return nil, err } // When the block is pending to be written on commit return the bytes // from there. if idx, exists := tx.pendingBlocks[*hash]; exists { return tx.pendingBlockData[idx].bytes, nil } // Lookup the location of the block in the files from the block index. blockRow, err := tx.fetchBlockRow(hash) if err != nil { return nil, err } location := deserializeBlockLoc(blockRow) // Read the block from the appropriate location. The function also // performs a checksum over the data to detect data corruption. blockBytes, err := tx.db.store.readBlock(hash, location) if err != nil { return nil, err } return blockBytes, nil } // FetchBlocks returns the raw serialized bytes for the blocks identified by the // given hashes. The raw bytes are in the format returned by Serialize on a // wire.MsgBlock. // // Returns the following errors as required by the interface contract: // - ErrBlockNotFound if any of the requested block hashed do not exist // - ErrTxClosed if the transaction has already been closed // - ErrCorruption if the database has somehow become corrupted // // In addition, returns ErrDriverSpecific if any failures occur when reading the // block files. // // NOTE: The data returned by this function is only valid during a database // transaction. Attempting to access it after a transaction has ended results // in undefined behavior. This constraint prevents additional data copies and // allows support for memory-mapped database implementations. // // This function is part of the database.Tx interface implementation. func (tx *transaction) FetchBlocks(hashes []chainhash.Hash) ([][]byte, error) { // Ensure transaction state is valid. if err := tx.checkClosed(); err != nil { return nil, err } // NOTE: This could check for the existence of all blocks before loading // any of them which would be faster in the failure case, however // callers will not typically be calling this function with invalid // values, so optimize for the common case. // Load the blocks. blocks := make([][]byte, len(hashes)) for i := range hashes { var err error blocks[i], err = tx.FetchBlock(&hashes[i]) if err != nil { return nil, err } } return blocks, nil } // fetchPendingRegion attempts to fetch the provided region from any block which // are pending to be written on commit. It will return nil for the byte slice // when the region references a block which is not pending. When the region // does reference a pending block, it is bounds checked and returns // ErrBlockRegionInvalid if invalid. func (tx *transaction) fetchPendingRegion(region *database.BlockRegion) ([]byte, error) { // Nothing to do if the block is not pending to be written on commit. idx, exists := tx.pendingBlocks[*region.Hash] if !exists { return nil, nil } // Ensure the region is within the bounds of the block. blockBytes := tx.pendingBlockData[idx].bytes blockLen := uint32(len(blockBytes)) endOffset := region.Offset + region.Len if endOffset < region.Offset || endOffset > blockLen { str := fmt.Sprintf("block %s region offset %d, length %d "+ "exceeds block length of %d", region.Hash, region.Offset, region.Len, blockLen) return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil) } // Return the bytes from the pending block. return blockBytes[region.Offset:endOffset:endOffset], nil } // FetchBlockRegion returns the raw serialized bytes for the given block region. // // For example, it is possible to directly extract Bitcoin transactions and/or // scripts from a block with this function. Depending on the backend // implementation, this can provide significant savings by avoiding the need to // load entire blocks. // // The raw bytes are in the format returned by Serialize on a wire.MsgBlock and // the Offset field in the provided BlockRegion is zero-based and relative to // the start of the block (byte 0). // // Returns the following errors as required by the interface contract: // - ErrBlockNotFound if the requested block hash does not exist // - ErrBlockRegionInvalid if the region exceeds the bounds of the associated // block // - ErrTxClosed if the transaction has already been closed // - ErrCorruption if the database has somehow become corrupted // // In addition, returns ErrDriverSpecific if any failures occur when reading the // block files. // // NOTE: The data returned by this function is only valid during a database // transaction. Attempting to access it after a transaction has ended results // in undefined behavior. This constraint prevents additional data copies and // allows support for memory-mapped database implementations. // // This function is part of the database.Tx interface implementation. func (tx *transaction) FetchBlockRegion(region *database.BlockRegion) ([]byte, error) { // Ensure transaction state is valid. if err := tx.checkClosed(); err != nil { return nil, err } // When the block is pending to be written on commit return the bytes // from there. if tx.pendingBlocks != nil { regionBytes, err := tx.fetchPendingRegion(region) if err != nil { return nil, err } if regionBytes != nil { return regionBytes, nil } } // Lookup the location of the block in the files from the block index. blockRow, err := tx.fetchBlockRow(region.Hash) if err != nil { return nil, err } location := deserializeBlockLoc(blockRow) // Ensure the region is within the bounds of the block. endOffset := region.Offset + region.Len if endOffset < region.Offset || endOffset > location.blockLen { str := fmt.Sprintf("block %s region offset %d, length %d "+ "exceeds block length of %d", region.Hash, region.Offset, region.Len, location.blockLen) return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil) } // Read the region from the appropriate disk block file. regionBytes, err := tx.db.store.readBlockRegion(location, region.Offset, region.Len) if err != nil { return nil, err } return regionBytes, nil } // FetchBlockRegions returns the raw serialized bytes for the given block // regions. // // For example, it is possible to directly extract Bitcoin transactions and/or // scripts from various blocks with this function. Depending on the backend // implementation, this can provide significant savings by avoiding the need to // load entire blocks. // // The raw bytes are in the format returned by Serialize on a wire.MsgBlock and // the Offset fields in the provided BlockRegions are zero-based and relative to // the start of the block (byte 0). // // Returns the following errors as required by the interface contract: // - ErrBlockNotFound if any of the request block hashes do not exist // - ErrBlockRegionInvalid if one or more region exceed the bounds of the // associated block // - ErrTxClosed if the transaction has already been closed // - ErrCorruption if the database has somehow become corrupted // // In addition, returns ErrDriverSpecific if any failures occur when reading the // block files. // // NOTE: The data returned by this function is only valid during a database // transaction. Attempting to access it after a transaction has ended results // in undefined behavior. This constraint prevents additional data copies and // allows support for memory-mapped database implementations. // // This function is part of the database.Tx interface implementation. func (tx *transaction) FetchBlockRegions(regions []database.BlockRegion) ([][]byte, error) { // Ensure transaction state is valid. if err := tx.checkClosed(); err != nil { return nil, err } // NOTE: This could check for the existence of all blocks before // deserializing the locations and building up the fetch list which // would be faster in the failure case, however callers will not // typically be calling this function with invalid values, so optimize // for the common case. // NOTE: A potential optimization here would be to combine adjacent // regions to reduce the number of reads. // In order to improve efficiency of loading the bulk data, first grab // the block location for all of the requested block hashes and sort // the reads by filenum:offset so that all reads are grouped by file // and linear within each file. This can result in quite a significant // performance increase depending on how spread out the requested hashes // are by reducing the number of file open/closes and random accesses // needed. The fetchList is intentionally allocated with a cap because // some of the regions might be fetched from the pending blocks and // hence there is no need to fetch those from disk. blockRegions := make([][]byte, len(regions)) fetchList := make([]bulkFetchData, 0, len(regions)) for i := range regions { region := ®ions[i] // When the block is pending to be written on commit grab the // bytes from there. if tx.pendingBlocks != nil { regionBytes, err := tx.fetchPendingRegion(region) if err != nil { return nil, err } if regionBytes != nil { blockRegions[i] = regionBytes continue } } // Lookup the location of the block in the files from the block // index. blockRow, err := tx.fetchBlockRow(region.Hash) if err != nil { return nil, err } location := deserializeBlockLoc(blockRow) // Ensure the region is within the bounds of the block. endOffset := region.Offset + region.Len if endOffset < region.Offset || endOffset > location.blockLen { str := fmt.Sprintf("block %s region offset %d, length "+ "%d exceeds block length of %d", region.Hash, region.Offset, region.Len, location.blockLen) return nil, makeDbErr(database.ErrBlockRegionInvalid, str, nil) } fetchList = append(fetchList, bulkFetchData{&location, i}) } sort.Sort(bulkFetchDataSorter(fetchList)) // Read all of the regions in the fetch list and set the results. for i := range fetchList { fetchData := &fetchList[i] ri := fetchData.replyIndex region := ®ions[ri] location := fetchData.blockLocation regionBytes, err := tx.db.store.readBlockRegion(*location, region.Offset, region.Len) if err != nil { return nil, err } blockRegions[ri] = regionBytes } return blockRegions, nil } // close marks the transaction closed then releases any pending data, the // underlying snapshot, the transaction read lock, and the write lock when the // transaction is writable. func (tx *transaction) close() { tx.closed = true // Clear pending blocks that would have been written on commit. tx.pendingBlocks = nil tx.pendingBlockData = nil // Clear pending keys that would have been written or deleted on commit. tx.pendingKeys = nil tx.pendingRemove = nil // Release the snapshot. if tx.snapshot != nil { tx.snapshot.Release() tx.snapshot = nil } tx.db.closeLock.RUnlock() // Release the writer lock for writable transactions to unblock any // other write transaction which are possibly waiting. if tx.writable { tx.db.writeLock.Unlock() } } // writePendingAndCommit writes pending block data to the flat block files, // updates the metadata with their locations as well as the new current write // location, and commits the metadata to the memory database cache. It also // properly handles rollback in the case of failures. // // This function MUST only be called when there is pending data to be written. func (tx *transaction) writePendingAndCommit() error { // Save the current block store write position for potential rollback. // These variables are only updated here in this function and there can // only be one write transaction active at a time, so it's safe to store // them for potential rollback. wc := tx.db.store.writeCursor wc.RLock() oldBlkFileNum := wc.curFileNum oldBlkOffset := wc.curOffset wc.RUnlock() // rollback is a closure that is used to rollback all writes to the // block files. rollback := func() { // Rollback any modifications made to the block files if needed. tx.db.store.handleRollback(oldBlkFileNum, oldBlkOffset) } // Loop through all of the pending blocks to store and write them. for _, blockData := range tx.pendingBlockData { log.Tracef("Storing block %s", blockData.hash) location, err := tx.db.store.writeBlock(blockData.bytes) if err != nil { rollback() return err } // Add a record in the block index for the block. The record // includes the location information needed to locate the block // on the filesystem as well as the block header since they are // so commonly needed. blockRow := serializeBlockLoc(location) err = tx.blockIdxBucket.Put(blockData.hash[:], blockRow) if err != nil { rollback() return err } } // Update the metadata for the current write file and offset. writeRow := serializeWriteRow(wc.curFileNum, wc.curOffset) if err := tx.metaBucket.Put(writeLocKeyName, writeRow); err != nil { rollback() return convertErr("failed to store write cursor", err) } // Atomically update the database cache. The cache automatically // handles flushing to the underlying persistent storage database. return tx.db.cache.commitTx(tx) } // Commit commits all changes that have been made to the root metadata bucket // and all of its sub-buckets to the database cache which is periodically synced // to persistent storage. In addition, it commits all new blocks directly to // persistent storage bypassing the db cache. Blocks can be rather large, so // this help increase the amount of cache available for the metadata updates and // is safe since blocks are immutable. // // This function is part of the database.Tx interface implementation. func (tx *transaction) Commit() error { // Prevent commits on managed transactions. if tx.managed { tx.close() panic("managed transaction commit not allowed") } // Ensure transaction state is valid. if err := tx.checkClosed(); err != nil { return err } // Regardless of whether the commit succeeds, the transaction is closed // on return. defer tx.close() // Ensure the transaction is writable. if !tx.writable { str := "Commit requires a writable database transaction" return makeDbErr(database.ErrTxNotWritable, str, nil) } // Write pending data. The function will rollback if any errors occur. return tx.writePendingAndCommit() } // Rollback undoes all changes that have been made to the root bucket and all of // its sub-buckets. // // This function is part of the database.Tx interface implementation. func (tx *transaction) Rollback() error { // Prevent rollbacks on managed transactions. if tx.managed { tx.close() panic("managed transaction rollback not allowed") } // Ensure transaction state is valid. if err := tx.checkClosed(); err != nil { return err } tx.close() return nil } // db represents a collection of namespaces which are persisted and implements // the database.DB interface. All database access is performed through // transactions which are obtained through the specific Namespace. type db struct { writeLock sync.Mutex // Limit to one write transaction at a time. closeLock sync.RWMutex // Make database close block while txns active. closed bool // Is the database closed? store *blockStore // Handles read/writing blocks to flat files. cache *dbCache // Cache layer which wraps underlying leveldb DB. } // Enforce db implements the database.DB interface. var _ database.DB = (*db)(nil) // Type returns the database driver type the current database instance was // created with. // // This function is part of the database.DB interface implementation. func (db *db) Type() string { return dbType } // begin is the implementation function for the Begin database method. See its // documentation for more details. // // This function is only separate because it returns the internal transaction // which is used by the managed transaction code while the database method // returns the interface. func (db *db) begin(writable bool) (*transaction, error) { // Whenever a new writable transaction is started, grab the write lock // to ensure only a single write transaction can be active at the same // time. This lock will not be released until the transaction is // closed (via Rollback or Commit). if writable { db.writeLock.Lock() } // Whenever a new transaction is started, grab a read lock against the // database to ensure Close will wait for the transaction to finish. // This lock will not be released until the transaction is closed (via // Rollback or Commit). db.closeLock.RLock() if db.closed { db.closeLock.RUnlock() if writable { db.writeLock.Unlock() } return nil, makeDbErr(database.ErrDbNotOpen, errDbNotOpenStr, nil) } // Grab a snapshot of the database cache (which in turn also handles the // underlying database). snapshot, err := db.cache.Snapshot() if err != nil { db.closeLock.RUnlock() if writable { db.writeLock.Unlock() } return nil, err } // The metadata and block index buckets are internal-only buckets, so // they have defined IDs. tx := &transaction{ writable: writable, db: db, snapshot: snapshot, pendingKeys: treap.NewMutable(), pendingRemove: treap.NewMutable(), } tx.metaBucket = &bucket{tx: tx, id: metadataBucketID} tx.blockIdxBucket = &bucket{tx: tx, id: blockIdxBucketID} return tx, nil } // Begin starts a transaction which is either read-only or read-write depending // on the specified flag. Multiple read-only transactions can be started // simultaneously while only a single read-write transaction can be started at a // time. The call will block when starting a read-write transaction when one is // already open. // // NOTE: The transaction must be closed by calling Rollback or Commit on it when // it is no longer needed. Failure to do so will result in unclaimed memory. // // This function is part of the database.DB interface implementation. func (db *db) Begin(writable bool) (database.Tx, error) { return db.begin(writable) } // rollbackOnPanic rolls the passed transaction back if the code in the calling // function panics. This is needed since the mutex on a transaction must be // released and a panic in called code would prevent that from happening. // // NOTE: This can only be handled manually for managed transactions since they // control the life-cycle of the transaction. As the documentation on Begin // calls out, callers opting to use manual transactions will have to ensure the // transaction is rolled back on panic if it desires that functionality as well // or the database will fail to close since the read-lock will never be // released. func rollbackOnPanic(tx *transaction) { if err := recover(); err != nil { tx.managed = false _ = tx.Rollback() panic(err) } } // View invokes the passed function in the context of a managed read-only // transaction with the root bucket for the namespace. Any errors returned from // the user-supplied function are returned from this function. // // This function is part of the database.DB interface implementation. func (db *db) View(fn func(database.Tx) error) error { // Start a read-only transaction. tx, err := db.begin(false) if err != nil { return err } // Since the user-provided function might panic, ensure the transaction // releases all mutexes and resources. There is no guarantee the caller // won't use recover and keep going. Thus, the database must still be // in a usable state on panics due to caller issues. defer rollbackOnPanic(tx) tx.managed = true err = fn(tx) tx.managed = false if err != nil { // The error is ignored here because nothing was written yet // and regardless of a rollback failure, the tx is closed now // anyways. _ = tx.Rollback() return err } return tx.Rollback() } // Update invokes the passed function in the context of a managed read-write // transaction with the root bucket for the namespace. Any errors returned from // the user-supplied function will cause the transaction to be rolled back and // are returned from this function. Otherwise, the transaction is committed // when the user-supplied function returns a nil error. // // This function is part of the database.DB interface implementation. func (db *db) Update(fn func(database.Tx) error) error { // Start a read-write transaction. tx, err := db.begin(true) if err != nil { return err } // Since the user-provided function might panic, ensure the transaction // releases all mutexes and resources. There is no guarantee the caller // won't use recover and keep going. Thus, the database must still be // in a usable state on panics due to caller issues. defer rollbackOnPanic(tx) tx.managed = true err = fn(tx) tx.managed = false if err != nil { // The error is ignored here because nothing was written yet // and regardless of a rollback failure, the tx is closed now // anyways. _ = tx.Rollback() return err } return tx.Commit() } // Close cleanly shuts down the database and syncs all data. It will block // until all database transactions have been finalized (rolled back or // committed). // // This function is part of the database.DB interface implementation. func (db *db) Close() error { // Since all transactions have a read lock on this mutex, this will // cause Close to wait for all readers to complete. db.closeLock.Lock() defer db.closeLock.Unlock() if db.closed { return makeDbErr(database.ErrDbNotOpen, errDbNotOpenStr, nil) } db.closed = true // NOTE: Since the above lock waits for all transactions to finish and // prevents any new ones from being started, it is safe to flush the // cache and clear all state without the individual locks. // Close the database cache which will flush any existing entries to // disk and close the underlying leveldb database. Any error is saved // and returned at the end after the remaining cleanup since the // database will be marked closed even if this fails given there is no // good way for the caller to recover from a failure here anyways. closeErr := db.cache.Close() // Close any open flat files that house the blocks. wc := db.store.writeCursor if wc.curFile.file != nil { _ = wc.curFile.file.Close() wc.curFile.file = nil } for _, blockFile := range db.store.openBlockFiles { _ = blockFile.file.Close() } db.store.openBlockFiles = nil db.store.openBlocksLRU.Init() db.store.fileNumToLRUElem = nil return closeErr } // filesExists reports whether the named file or directory exists. func fileExists(name string) bool { if _, err := os.Stat(name); err != nil { if os.IsNotExist(err) { return false } } return true } // initDB creates the initial buckets and values used by the package. This is // mainly in a separate function for testing purposes. func initDB(ldb *leveldb.DB) error { // The starting block file write cursor location is file num 0, offset // 0. batch := new(leveldb.Batch) batch.Put(bucketizedKey(metadataBucketID, writeLocKeyName), serializeWriteRow(0, 0)) // Create block index bucket and set the current bucket id. // // NOTE: Since buckets are virtualized through the use of prefixes, // there is no need to store the bucket index data for the metadata // bucket in the database. However, the first bucket ID to use does // need to account for it to ensure there are no key collisions. batch.Put(bucketIndexKey(metadataBucketID, blockIdxBucketName), blockIdxBucketID[:]) batch.Put(curBucketIDKeyName, blockIdxBucketID[:]) // Write everything as a single batch. if err := ldb.Write(batch, nil); err != nil { str := fmt.Sprintf("failed to initialize metadata database: %v", err) return convertErr(str, err) } return nil } // openDB opens the database at the provided path. database.ErrDbDoesNotExist // is returned if the database doesn't exist and the create flag is not set. func openDB(dbPath string, network wire.BitcoinNet, create bool) (database.DB, error) { // Error if the database doesn't exist and the create flag is not set. metadataDbPath := filepath.Join(dbPath, metadataDbName) dbExists := fileExists(metadataDbPath) if !create && !dbExists { str := fmt.Sprintf("database %q does not exist", metadataDbPath) return nil, makeDbErr(database.ErrDbDoesNotExist, str, nil) } // Ensure the full path to the database exists. if !dbExists { // The error can be ignored here since the call to // leveldb.OpenFile will fail if the directory couldn't be // created. _ = os.MkdirAll(dbPath, 0700) } // Open the metadata database (will create it if needed). opts := opt.Options{ ErrorIfExist: create, Strict: opt.DefaultStrict, Compression: opt.NoCompression, Filter: filter.NewBloomFilter(10), } ldb, err := leveldb.OpenFile(metadataDbPath, &opts) if err != nil { return nil, convertErr(err.Error(), err) } // Create the block store which includes scanning the existing flat // block files to find what the current write cursor position is // according to the data that is actually on disk. Also create the // database cache which wraps the underlying leveldb database to provide // write caching. store := newBlockStore(dbPath, network) cache := newDbCache(ldb, store, defaultCacheSize, defaultFlushSecs) pdb := &db{store: store, cache: cache} // Perform any reconciliation needed between the block and metadata as // well as database initialization, if needed. return reconcileDB(pdb, create) }