Implement ldb database functionality for optional addrindex.

This commit is contained in:
Olaoluwa Osuntokun 2014-12-24 17:55:14 -06:00
parent c67a3059df
commit b284bf0f90
11 changed files with 707 additions and 81 deletions

41
db.go
View file

@ -9,16 +9,21 @@ import (
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwire" "github.com/btcsuite/btcwire"
"golang.org/x/crypto/ripemd160"
) )
// Errors that the various database functions may return. // Errors that the various database functions may return.
var ( var (
ErrAddrIndexDoesNotExist = errors.New("address index hasn't been built up yet")
ErrUnsupportedAddressType = errors.New("address type is not supported " +
"by the address-index")
ErrPrevShaMissing = errors.New("previous sha missing from database") ErrPrevShaMissing = errors.New("previous sha missing from database")
ErrTxShaMissing = errors.New("requested transaction does not exist") ErrTxShaMissing = errors.New("requested transaction does not exist")
ErrBlockShaMissing = errors.New("requested block does not exist") ErrBlockShaMissing = errors.New("requested block does not exist")
ErrDuplicateSha = errors.New("duplicate insert attempted") ErrDuplicateSha = errors.New("duplicate insert attempted")
ErrDbDoesNotExist = errors.New("non-existent database") ErrDbDoesNotExist = errors.New("non-existent database")
ErrDbUnknownType = errors.New("non-existent database type") ErrDbUnknownType = errors.New("non-existent database type")
ErrNotImplemented = errors.New("method has not yet been implemented")
) )
// AllShas is a special value that can be used as the final sha when requesting // AllShas is a special value that can be used as the final sha when requesting
@ -106,6 +111,34 @@ type Db interface {
// the database yet. // the database yet.
NewestSha() (sha *btcwire.ShaHash, height int64, err error) NewestSha() (sha *btcwire.ShaHash, height int64, err error)
// FetchAddrIndexTip returns the hash and block height of the most recent
// block which has had its address index populated. It will return
// ErrAddrIndexDoesNotExist along with a zero hash, and -1 if the
// addrindex hasn't yet been built up.
FetchAddrIndexTip() (sha *btcwire.ShaHash, height int64, err error)
// UpdateAddrIndexForBlock updates the stored addrindex with passed
// index information for a particular block height. Additionally, it
// will update the stored meta-data related to the curent tip of the
// addr index. These two operations are performed in an atomic
// transaction which is commited before the function returns.
// Addresses are indexed by the raw bytes of their base58 decoded
// hash160.
UpdateAddrIndexForBlock(blkSha *btcwire.ShaHash, height int64,
addrIndex BlockAddrIndex) error
// FetchTxsForAddr looks up and returns all transactions which either
// spend a previously created output of the passed address, or create
// a new output locked to the passed address. The, `limit` parameter
// should be the max number of transactions to be returned.
// Additionally, if the caller wishes to skip forward in the results
// some amount, the 'seek' represents how many results to skip.
// NOTE: Values for both `seek` and `limit` MUST be positive.
FetchTxsForAddr(addr btcutil.Address, skip int, limit int) ([]*TxListReply, error)
// DeleteAddrIndex deletes the entire addrindex stored within the DB.
DeleteAddrIndex() error
// RollbackClose discards the recent database changes to the previously // RollbackClose discards the recent database changes to the previously
// saved data at last Sync and closes the database. // saved data at last Sync and closes the database.
RollbackClose() (err error) RollbackClose() (err error)
@ -134,6 +167,14 @@ type TxListReply struct {
Err error Err error
} }
// AddrIndexKeySize is the number of bytes used by keys into the BlockAddrIndex.
const AddrIndexKeySize = ripemd160.Size
// BlockAddrIndex represents the indexing structure for addresses.
// It maps a hash160 to a list of transaction locations within a block that
// either pays to or spends from the passed UTXO for the hash160.
type BlockAddrIndex map[[AddrIndexKeySize]byte][]*btcwire.TxLoc
// driverList holds all of the registered database backends. // driverList holds all of the registered database backends.
var driverList []DriverDB var driverList []DriverDB

View file

@ -19,7 +19,7 @@ var (
ignoreDbTypes = map[string]bool{"createopenfail": true} ignoreDbTypes = map[string]bool{"createopenfail": true}
) )
// testNewestShaEmpty ensures the NewestSha returns the values expected by // testNewestShaEmpty ensures that NewestSha returns the values expected by
// the interface contract. // the interface contract.
func testNewestShaEmpty(t *testing.T, db btcdb.Db) { func testNewestShaEmpty(t *testing.T, db btcdb.Db) {
sha, height, err := db.NewestSha() sha, height, err := db.NewestSha()

View file

@ -290,3 +290,41 @@ func (db *LevelDb) NewestSha() (rsha *btcwire.ShaHash, rblkid int64, err error)
return &sha, db.lastBlkIdx, nil return &sha, db.lastBlkIdx, nil
} }
// fetchAddrIndexTip returns the last block height and block sha to be indexed.
// Meta-data about the address tip is currently cached in memory, and will be
// updated accordingly by functions that modify the state. This function is
// used on start up to load the info into memory. Callers will use the public
// version of this function below, which returns our cached copy.
func (db *LevelDb) fetchAddrIndexTip() (*btcwire.ShaHash, int64, error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
data, err := db.lDb.Get(addrIndexMetaDataKey, db.ro)
if err != nil {
return &btcwire.ShaHash{}, -1, btcdb.ErrAddrIndexDoesNotExist
}
var blkSha btcwire.ShaHash
blkSha.SetBytes(data[0:32])
blkHeight := binary.LittleEndian.Uint64(data[32:])
return &blkSha, int64(blkHeight), nil
}
// FetchAddrIndexTip returns the hash and block height of the most recent
// block whose transactions have been indexed by address. It will return
// ErrAddrIndexDoesNotExist along with a zero hash, and -1 if the
// addrindex hasn't yet been built up.
func (db *LevelDb) FetchAddrIndexTip() (*btcwire.ShaHash, int64, error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
if db.lastAddrIndexBlkIdx == -1 {
return &btcwire.ShaHash{}, -1, btcdb.ErrAddrIndexDoesNotExist
}
sha := db.lastAddrIndexBlkSha
return &sha, db.lastAddrIndexBlkIdx, nil
}

View file

@ -12,7 +12,8 @@ import (
"github.com/btcsuite/btcwire" "github.com/btcsuite/btcwire"
) )
// we need to test for empty databas and make certain it returns proper value // we need to test for an empty database and make certain it returns the proper
// values
func TestEmptyDB(t *testing.T) { func TestEmptyDB(t *testing.T) {

View file

@ -110,7 +110,7 @@ out:
lastSha = blkSha lastSha = blkSha
} }
// genrate a new block based on the last sha // generate a new block based on the last sha
// these block are not verified, so there are a bunch of garbage fields // these block are not verified, so there are a bunch of garbage fields
// in the 'generated' block. // in the 'generated' block.

View file

@ -66,7 +66,7 @@ endtest:
for height := int64(0); height < int64(len(blocks)); height++ { for height := int64(0); height < int64(len(blocks)); height++ {
block := blocks[height] block := blocks[height]
// look up inputs to this x // look up inputs to this tx
mblock := block.MsgBlock() mblock := block.MsgBlock()
var txneededList []*btcwire.ShaHash var txneededList []*btcwire.ShaHash
var txlookupList []*btcwire.ShaHash var txlookupList []*btcwire.ShaHash

View file

@ -5,21 +5,60 @@
package ldb package ldb
import ( import (
"fmt" "bytes"
"github.com/btcsuite/btcdb" "testing"
"github.com/btcsuite/btcwire"
"github.com/btcsuite/btcutil"
"golang.org/x/crypto/ripemd160"
) )
// FetchSha returns the datablock and pver for the given ShaHash. func TestAddrIndexKeySerialization(t *testing.T) {
// This is a testing only interface. var hash160Bytes [ripemd160.Size]byte
func FetchSha(db btcdb.Db, sha *btcwire.ShaHash) (buf []byte, pver uint32,
blkid int64, err error) { fakeHash160 := btcutil.Hash160([]byte("testing"))
sqldb, ok := db.(*LevelDb) copy(fakeHash160, hash160Bytes[:])
if !ok {
err = fmt.Errorf("invalid data type") fakeIndex := txAddrIndex{
return hash160: hash160Bytes,
blkHeight: 1,
txoffset: 5,
txlen: 360,
}
serializedKey := addrIndexToKey(&fakeIndex)
unpackedIndex := unpackTxIndex(serializedKey[22:])
if unpackedIndex.blkHeight != fakeIndex.blkHeight {
t.Errorf("Incorrect block height. Unpack addr index key"+
"serialization failed. Expected %d, received %d",
1, unpackedIndex.blkHeight)
}
if unpackedIndex.txoffset != fakeIndex.txoffset {
t.Errorf("Incorrect tx offset. Unpack addr index key"+
"serialization failed. Expected %d, received %d",
5, unpackedIndex.txoffset)
}
if unpackedIndex.txlen != fakeIndex.txlen {
t.Errorf("Incorrect tx len. Unpack addr index key"+
"serialization failed. Expected %d, received %d",
360, unpackedIndex.txlen)
}
}
func TestBytesPrefix(t *testing.T) {
testKey := []byte("a")
prefixRange := bytesPrefix(testKey)
if !bytes.Equal(prefixRange.Start, []byte("a")) {
t.Errorf("Wrong prefix start, got %d, expected %d", prefixRange.Start,
[]byte("a"))
}
if !bytes.Equal(prefixRange.Limit, []byte("b")) {
t.Errorf("Wrong prefix end, got %d, expected %d", prefixRange.Limit,
[]byte("b"))
} }
buf, blkid, err = sqldb.fetchSha(sha)
return
} }

View file

@ -53,6 +53,9 @@ type LevelDb struct {
lastBlkSha btcwire.ShaHash lastBlkSha btcwire.ShaHash
lastBlkIdx int64 lastBlkIdx int64
lastAddrIndexBlkSha btcwire.ShaHash
lastAddrIndexBlkIdx int64
txUpdateMap map[btcwire.ShaHash]*txUpdateObj txUpdateMap map[btcwire.ShaHash]*txUpdateObj
txSpentUpdateMap map[btcwire.ShaHash]*spentTxUpdate txSpentUpdateMap map[btcwire.ShaHash]*spentTxUpdate
} }
@ -92,7 +95,6 @@ func OpenDB(args ...interface{}) (btcdb.Db, error) {
} }
// Need to find last block and tx // Need to find last block and tx
var lastknownblock, nextunknownblock, testblock int64 var lastknownblock, nextunknownblock, testblock int64
increment := int64(100000) increment := int64(100000)
@ -139,6 +141,14 @@ blocknarrow:
} }
} }
// Load the last block whose transactions have been indexed by address.
if sha, idx, err := ldb.fetchAddrIndexTip(); err == nil {
ldb.lastAddrIndexBlkSha = *sha
ldb.lastAddrIndexBlkIdx = idx
} else {
ldb.lastAddrIndexBlkIdx = -1
}
ldb.lastBlkSha = *lastSha ldb.lastBlkSha = *lastSha
ldb.lastBlkIdx = lastknownblock ldb.lastBlkIdx = lastknownblock
ldb.nextBlock = lastknownblock + 1 ldb.nextBlock = lastknownblock + 1
@ -250,6 +260,7 @@ func CreateDB(args ...interface{}) (btcdb.Db, error) {
if err == nil { if err == nil {
ldb := db.(*LevelDb) ldb := db.(*LevelDb)
ldb.lastBlkIdx = -1 ldb.lastBlkIdx = -1
ldb.lastAddrIndexBlkIdx = -1
ldb.nextBlock = 0 ldb.nextBlock = 0
} }
return db, err return db, err

View file

@ -5,6 +5,7 @@
package ldb_test package ldb_test
import ( import (
"bytes"
"compress/bzip2" "compress/bzip2"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
@ -16,22 +17,26 @@ import (
"github.com/btcsuite/btcdb" "github.com/btcsuite/btcdb"
"github.com/btcsuite/btcnet" "github.com/btcsuite/btcnet"
"github.com/btcsuite/btcscript"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwire" "github.com/btcsuite/btcwire"
"golang.org/x/crypto/ripemd160"
) )
var network = btcwire.MainNet var network = btcwire.MainNet
func TestOperational(t *testing.T) { // testDb is used to store db related context for a running test.
testOperationalMode(t) // the `cleanUpFunc` *must* be called after each test to maintain db
// consistency across tests.
type testDb struct {
db btcdb.Db
blocks []*btcutil.Block
dbName string
dbNameVer string
cleanUpFunc func()
} }
func testOperationalMode(t *testing.T) { func setUpTestDb(t *testing.T) (*testDb, error) {
// simplified basic operation is:
// 1) fetch block from remote server
// 2) look up all txin (except coinbase in db)
// 3) insert block
// Ignore db remove errors since it means we didn't have an old one. // Ignore db remove errors since it means we didn't have an old one.
dbname := fmt.Sprintf("tstdbop1") dbname := fmt.Sprintf("tstdbop1")
dbnamever := dbname + ".ver" dbnamever := dbname + ".ver"
@ -39,28 +44,181 @@ func testOperationalMode(t *testing.T) {
_ = os.RemoveAll(dbnamever) _ = os.RemoveAll(dbnamever)
db, err := btcdb.CreateDB("leveldb", dbname) db, err := btcdb.CreateDB("leveldb", dbname)
if err != nil { if err != nil {
t.Errorf("Failed to open test database %v", err) return nil, err
return
} }
defer os.RemoveAll(dbname)
defer os.RemoveAll(dbnamever)
defer func() {
if err := db.Close(); err != nil {
t.Errorf("Close: unexpected error: %v", err)
}
}()
testdatafile := filepath.Join("..", "testdata", "blocks1-256.bz2") testdatafile := filepath.Join("..", "testdata", "blocks1-256.bz2")
blocks, err := loadBlocks(t, testdatafile) blocks, err := loadBlocks(t, testdatafile)
if err != nil {
return nil, err
}
cleanUp := func() {
db.Close()
os.RemoveAll(dbname)
os.RemoveAll(dbnamever)
}
return &testDb{
db: db,
blocks: blocks,
dbName: dbname,
dbNameVer: dbnamever,
cleanUpFunc: cleanUp,
}, nil
}
func TestOperational(t *testing.T) {
testOperationalMode(t)
}
// testAddrIndexOperations ensures that all normal operations concerning
// the optional address index function correctly.
func testAddrIndexOperations(t *testing.T, db btcdb.Db, newestBlock *btcutil.Block, newestSha *btcwire.ShaHash, newestBlockIdx int64) {
// Metadata about the current addr index state should be unset.
sha, height, err := db.FetchAddrIndexTip()
if err != btcdb.ErrAddrIndexDoesNotExist {
t.Fatalf("Address index metadata shouldn't be in db, hasn't been built up yet.")
}
var zeroHash btcwire.ShaHash
if !sha.IsEqual(&zeroHash) {
t.Fatalf("AddrIndexTip wrong hash got: %s, want %s", sha, &zeroHash)
}
if height != -1 {
t.Fatalf("Addrindex not built up, yet a block index tip has been set to: %d.", height)
}
// Test enforcement of constraints for "limit" and "skip"
var fakeAddr btcutil.Address
_, err = db.FetchTxsForAddr(fakeAddr, -1, 0)
if err == nil {
t.Fatalf("Negative value for skip passed, should return an error")
}
_, err = db.FetchTxsForAddr(fakeAddr, 0, -1)
if err == nil {
t.Fatalf("Negative value for limit passed, should return an error")
}
// Simple test to index outputs(s) of the first tx.
testIndex := make(btcdb.BlockAddrIndex)
testTx, err := newestBlock.Tx(0)
if err != nil {
t.Fatalf("Block has no transactions, unable to test addr "+
"indexing, err %v", err)
}
// Extract the dest addr from the tx.
_, testAddrs, _, err := btcscript.ExtractPkScriptAddrs(testTx.MsgTx().TxOut[0].PkScript, &btcnet.MainNetParams)
if err != nil {
t.Fatalf("Unable to decode tx output, err %v", err)
}
// Extract the hash160 from the output script.
var hash160Bytes [ripemd160.Size]byte
testHash160 := testAddrs[0].(*btcutil.AddressPubKey).AddressPubKeyHash().ScriptAddress()
copy(hash160Bytes[:], testHash160[:])
// Create a fake index.
blktxLoc, _ := newestBlock.TxLoc()
testIndex[hash160Bytes] = []*btcwire.TxLoc{&blktxLoc[0]}
// Insert our test addr index into the DB.
err = db.UpdateAddrIndexForBlock(newestSha, newestBlockIdx, testIndex)
if err != nil {
t.Fatalf("UpdateAddrIndexForBlock: failed to index"+
" addrs for block #%d (%s) "+
"err %v", newestBlockIdx, newestSha, err)
}
// Chain Tip of address should've been updated.
assertAddrIndexTipIsUpdated(db, t, newestSha, newestBlockIdx)
// Check index retrieval.
txReplies, err := db.FetchTxsForAddr(testAddrs[0], 0, 1000)
if err != nil {
t.Fatalf("FetchTxsForAddr failed to correctly fetch txs for an "+
"address, err %v", err)
}
// Should have one reply.
if len(txReplies) != 1 {
t.Fatalf("Failed to properly index tx by address.")
}
// Our test tx and indexed tx should have the same sha.
indexedTx := txReplies[0]
if !bytes.Equal(indexedTx.Sha.Bytes(), testTx.Sha().Bytes()) {
t.Fatalf("Failed to fetch proper indexed tx. Expected sha %v, "+
"fetched %v", testTx.Sha(), indexedTx.Sha)
}
// Shut down DB.
db.Sync()
db.Close()
// Re-Open, tip still should be updated to current height and sha.
db, err = btcdb.OpenDB("leveldb", "tstdbop1")
if err != nil {
t.Fatalf("Unable to re-open created db, err %v", err)
}
assertAddrIndexTipIsUpdated(db, t, newestSha, newestBlockIdx)
// Delete the entire index.
err = db.DeleteAddrIndex()
if err != nil {
t.Fatalf("Couldn't delete address index, err %v", err)
}
// Former index should no longer exist.
txReplies, err = db.FetchTxsForAddr(testAddrs[0], 0, 1000)
if err != nil {
t.Fatalf("Unable to fetch transactions for address: %v", err)
}
if len(txReplies) != 0 {
t.Fatalf("Address index was not successfully deleted. "+
"Should have 0 tx's indexed, %v were returned.",
len(txReplies))
}
// Tip should be blanked out.
if _, _, err := db.FetchAddrIndexTip(); err != btcdb.ErrAddrIndexDoesNotExist {
t.Fatalf("Address index was not fully deleted.")
}
}
func assertAddrIndexTipIsUpdated(db btcdb.Db, t *testing.T, newestSha *btcwire.ShaHash, newestBlockIdx int64) {
// Safe to ignore error, since height will be < 0 in "error" case.
sha, height, _ := db.FetchAddrIndexTip()
if newestBlockIdx != height {
t.Fatalf("Height of address index tip failed to update, "+
"expected %v, got %v", newestBlockIdx, height)
}
if !bytes.Equal(newestSha.Bytes(), sha.Bytes()) {
t.Fatalf("Sha of address index tip failed to update, "+
"expected %v, got %v", newestSha, sha)
}
}
func testOperationalMode(t *testing.T) {
// simplified basic operation is:
// 1) fetch block from remote server
// 2) look up all txin (except coinbase in db)
// 3) insert block
// 4) exercise the optional addridex
testDb, err := setUpTestDb(t)
defer testDb.cleanUpFunc()
if err != nil { if err != nil {
t.Errorf("Unable to load blocks from test data: %v", err) t.Errorf("Unable to load blocks from test data: %v", err)
return return
} }
err = nil err = nil
out: out:
for height := int64(0); height < int64(len(blocks)); height++ { for height := int64(0); height < int64(len(testDb.blocks)); height++ {
block := blocks[height] block := testDb.blocks[height]
mblock := block.MsgBlock() mblock := block.MsgBlock()
var txneededList []*btcwire.ShaHash var txneededList []*btcwire.ShaHash
for _, tx := range mblock.Transactions { for _, tx := range mblock.Transactions {
@ -71,7 +229,7 @@ out:
origintxsha := &txin.PreviousOutPoint.Hash origintxsha := &txin.PreviousOutPoint.Hash
txneededList = append(txneededList, origintxsha) txneededList = append(txneededList, origintxsha)
exists, err := db.ExistsTxSha(origintxsha) exists, err := testDb.db.ExistsTxSha(origintxsha)
if err != nil { if err != nil {
t.Errorf("ExistsTxSha: unexpected error %v ", err) t.Errorf("ExistsTxSha: unexpected error %v ", err)
} }
@ -79,13 +237,13 @@ out:
t.Errorf("referenced tx not found %v ", origintxsha) t.Errorf("referenced tx not found %v ", origintxsha)
} }
_, err = db.FetchTxBySha(origintxsha) _, err = testDb.db.FetchTxBySha(origintxsha)
if err != nil { if err != nil {
t.Errorf("referenced tx not found %v err %v ", origintxsha, err) t.Errorf("referenced tx not found %v err %v ", origintxsha, err)
} }
} }
} }
txlist := db.FetchUnSpentTxByShaList(txneededList) txlist := testDb.db.FetchUnSpentTxByShaList(txneededList)
for _, txe := range txlist { for _, txe := range txlist {
if txe.Err != nil { if txe.Err != nil {
t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err) t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err)
@ -93,7 +251,7 @@ out:
} }
} }
newheight, err := db.InsertBlock(block) newheight, err := testDb.db.InsertBlock(block)
if err != nil { if err != nil {
t.Errorf("failed to insert block %v err %v", height, err) t.Errorf("failed to insert block %v err %v", height, err)
break out break out
@ -103,13 +261,13 @@ out:
break out break out
} }
newSha, blkid, err := db.NewestSha() newSha, blkid, err := testDb.db.NewestSha()
if err != nil { if err != nil {
t.Errorf("failed to obtain latest sha %v %v", height, err) t.Errorf("failed to obtain latest sha %v %v", height, err)
} }
if blkid != height { if blkid != height {
t.Errorf("height doe not match latest block height %v %v %v", blkid, height, err) t.Errorf("height does not match latest block height %v %v %v", blkid, height, err)
} }
blkSha, _ := block.Sha() blkSha, _ := block.Sha()
@ -118,8 +276,13 @@ out:
} }
} }
// now that db is populated, do some additional test // now that the db is populated, do some additional tests
testFetchRangeHeight(t, db, blocks) testFetchHeightRange(t, testDb.db, testDb.blocks)
// Ensure all operations dealing with the optional address index behave
// correctly.
newSha, blkid, err := testDb.db.NewestSha()
testAddrIndexOperations(t, testDb.db, testDb.blocks[len(testDb.blocks)-1], newSha, blkid)
} }
func TestBackout(t *testing.T) { func TestBackout(t *testing.T) {
@ -132,43 +295,35 @@ func testBackout(t *testing.T) {
// 2) look up all txin (except coinbase in db) // 2) look up all txin (except coinbase in db)
// 3) insert block // 3) insert block
// Ignore db remove errors since it means we didn't have an old one. testDb, err := setUpTestDb(t)
dbname := fmt.Sprintf("tstdbop2") defer testDb.cleanUpFunc()
dbnamever := dbname + ".ver"
_ = os.RemoveAll(dbname)
_ = os.RemoveAll(dbnamever)
db, err := btcdb.CreateDB("leveldb", dbname)
if err != nil { if err != nil {
t.Errorf("Failed to open test database %v", err) t.Errorf("Failed to open test database %v", err)
return return
} }
defer os.RemoveAll(dbname)
defer os.RemoveAll(dbnamever)
defer db.Close()
testdatafile := filepath.Join("..", "testdata", "blocks1-256.bz2") if len(testDb.blocks) < 120 {
blocks, err := loadBlocks(t, testdatafile)
if len(blocks) < 120 {
t.Errorf("test data too small") t.Errorf("test data too small")
return return
} }
err = nil err = nil
for height := int64(0); height < int64(len(blocks)); height++ { for height := int64(0); height < int64(len(testDb.blocks)); height++ {
if height == 100 { if height == 100 {
t.Logf("Syncing at block height 100") t.Logf("Syncing at block height 100")
db.Sync() testDb.db.Sync()
} }
if height == 120 { if height == 120 {
t.Logf("Simulating unexpected application quit") t.Logf("Simulating unexpected application quit")
// Simulate unexpected application quit // Simulate unexpected application quit
db.RollbackClose() testDb.db.RollbackClose()
break break
} }
block := blocks[height] block := testDb.blocks[height]
newheight, err := db.InsertBlock(block) newheight, err := testDb.db.InsertBlock(block)
if err != nil { if err != nil {
t.Errorf("failed to insert block %v err %v", height, err) t.Errorf("failed to insert block %v err %v", height, err)
return return
@ -182,49 +337,49 @@ func testBackout(t *testing.T) {
// db was closed at height 120, so no cleanup is possible. // db was closed at height 120, so no cleanup is possible.
// reopen db // reopen db
db, err = btcdb.OpenDB("leveldb", dbname) testDb.db, err = btcdb.OpenDB("leveldb", testDb.dbName)
if err != nil { if err != nil {
t.Errorf("Failed to open test database %v", err) t.Errorf("Failed to open test database %v", err)
return return
} }
defer func() { defer func() {
if err := db.Close(); err != nil { if err := testDb.db.Close(); err != nil {
t.Errorf("Close: unexpected error: %v", err) t.Errorf("Close: unexpected error: %v", err)
} }
}() }()
sha, err := blocks[99].Sha() sha, err := testDb.blocks[99].Sha()
if err != nil { if err != nil {
t.Errorf("failed to get block 99 sha err %v", err) t.Errorf("failed to get block 99 sha err %v", err)
return return
} }
if _, err := db.ExistsSha(sha); err != nil { if _, err := testDb.db.ExistsSha(sha); err != nil {
t.Errorf("ExistsSha: unexpected error: %v", err) t.Errorf("ExistsSha: unexpected error: %v", err)
} }
_, err = db.FetchBlockBySha(sha) _, err = testDb.db.FetchBlockBySha(sha)
if err != nil { if err != nil {
t.Errorf("failed to load block 99 from db %v", err) t.Errorf("failed to load block 99 from db %v", err)
return return
} }
sha, err = blocks[119].Sha() sha, err = testDb.blocks[119].Sha()
if err != nil { if err != nil {
t.Errorf("failed to get block 110 sha err %v", err) t.Errorf("failed to get block 110 sha err %v", err)
return return
} }
if _, err := db.ExistsSha(sha); err != nil { if _, err := testDb.db.ExistsSha(sha); err != nil {
t.Errorf("ExistsSha: unexpected error: %v", err) t.Errorf("ExistsSha: unexpected error: %v", err)
} }
_, err = db.FetchBlockBySha(sha) _, err = testDb.db.FetchBlockBySha(sha)
if err != nil { if err != nil {
t.Errorf("loaded block 119 from db") t.Errorf("loaded block 119 from db")
return return
} }
block := blocks[119] block := testDb.blocks[119]
mblock := block.MsgBlock() mblock := block.MsgBlock()
txsha, err := mblock.Transactions[0].TxSha() txsha, err := mblock.Transactions[0].TxSha()
exists, err := db.ExistsTxSha(&txsha) exists, err := testDb.db.ExistsTxSha(&txsha)
if err != nil { if err != nil {
t.Errorf("ExistsTxSha: unexpected error %v ", err) t.Errorf("ExistsTxSha: unexpected error %v ", err)
} }
@ -232,7 +387,7 @@ func testBackout(t *testing.T) {
t.Errorf("tx %v not located db\n", txsha) t.Errorf("tx %v not located db\n", txsha)
} }
_, err = db.FetchTxBySha(&txsha) _, err = testDb.db.FetchTxBySha(&txsha)
if err != nil { if err != nil {
t.Errorf("tx %v not located db\n", txsha) t.Errorf("tx %v not located db\n", txsha)
return return
@ -310,7 +465,7 @@ func loadBlocks(t *testing.T, file string) (blocks []*btcutil.Block, err error)
return return
} }
func testFetchRangeHeight(t *testing.T, db btcdb.Db, blocks []*btcutil.Block) { func testFetchHeightRange(t *testing.T, db btcdb.Db, blocks []*btcutil.Block) {
var testincrement int64 = 50 var testincrement int64 = 50
var testcnt int64 = 100 var testcnt int64 = 100
@ -322,7 +477,7 @@ func testFetchRangeHeight(t *testing.T, db btcdb.Db, blocks []*btcutil.Block) {
for i := range blocks { for i := range blocks {
blockSha, err := blocks[i].Sha() blockSha, err := blocks[i].Sha()
if err != nil { if err != nil {
t.Errorf("FetchRangeHeight: unexpected failure computing block sah %v", err) t.Errorf("FetchHeightRange: unexpected failure computing block sah %v", err)
} }
shanames[i] = blockSha shanames[i] = blockSha
} }
@ -336,16 +491,16 @@ func testFetchRangeHeight(t *testing.T, db btcdb.Db, blocks []*btcutil.Block) {
shalist, err := db.FetchHeightRange(startheight, endheight) shalist, err := db.FetchHeightRange(startheight, endheight)
if err != nil { if err != nil {
t.Errorf("FetchRangeHeight: unexpected failure looking up shas %v", err) t.Errorf("FetchHeightRange: unexpected failure looking up shas %v", err)
} }
if endheight == btcdb.AllShas { if endheight == btcdb.AllShas {
if int64(len(shalist)) != nBlocks-startheight { if int64(len(shalist)) != nBlocks-startheight {
t.Errorf("FetchRangeHeight: expected A %v shas, got %v", nBlocks-startheight, len(shalist)) t.Errorf("FetchHeightRange: expected A %v shas, got %v", nBlocks-startheight, len(shalist))
} }
} else { } else {
if int64(len(shalist)) != testcnt { if int64(len(shalist)) != testcnt {
t.Errorf("FetchRangeHeight: expected %v shas, got %v", testcnt, len(shalist)) t.Errorf("FetchHeightRange: expected %v shas, got %v", testcnt, len(shalist))
} }
} }
@ -353,9 +508,90 @@ func testFetchRangeHeight(t *testing.T, db btcdb.Db, blocks []*btcutil.Block) {
sha0 := *shanames[int64(i)+startheight] sha0 := *shanames[int64(i)+startheight]
sha1 := shalist[i] sha1 := shalist[i]
if sha0 != sha1 { if sha0 != sha1 {
t.Errorf("FetchRangeHeight: mismatch sha at %v requested range %v %v: %v %v ", int64(i)+startheight, startheight, endheight, sha0, sha1) t.Errorf("FetchHeightRange: mismatch sha at %v requested range %v %v: %v %v ", int64(i)+startheight, startheight, endheight, sha0, sha1)
} }
} }
} }
} }
func TestLimitAndSkipFetchTxsForAddr(t *testing.T) {
testDb, err := setUpTestDb(t)
defer testDb.cleanUpFunc()
// Insert a block with some fake test transactions. The block will have
// 10 copies of a fake transaction involving same address.
addrString := "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa"
targetAddr, err := btcutil.DecodeAddress(addrString, &btcnet.MainNetParams)
if err != nil {
t.Fatalf("Unable to decode test address: %v", err)
}
outputScript, err := btcscript.PayToAddrScript(targetAddr)
if err != nil {
t.Fatalf("Unable make test pkScript %v", err)
}
fakeTxOut := btcwire.NewTxOut(10, outputScript)
var emptyHash btcwire.ShaHash
fakeHeader := btcwire.NewBlockHeader(&emptyHash, &emptyHash, 1, 1)
msgBlock := btcwire.NewMsgBlock(fakeHeader)
for i := 0; i < 10; i++ {
mtx := btcwire.NewMsgTx()
mtx.AddTxOut(fakeTxOut)
msgBlock.AddTransaction(mtx)
}
// Insert the test block into the DB.
testBlock := btcutil.NewBlock(msgBlock)
newheight, err := testDb.db.InsertBlock(testBlock)
if err != nil {
t.Fatalf("Unable to insert block into db: %v", err)
}
// Create and insert an address index for out test addr.
txLoc, _ := testBlock.TxLoc()
index := make(btcdb.BlockAddrIndex)
for i := range testBlock.Transactions() {
var hash160 [ripemd160.Size]byte
scriptAddr := targetAddr.ScriptAddress()
copy(hash160[:], scriptAddr[:])
index[hash160] = append(index[hash160], &txLoc[i])
}
blkSha, _ := testBlock.Sha()
err = testDb.db.UpdateAddrIndexForBlock(blkSha, newheight, index)
if err != nil {
t.Fatalf("UpdateAddrIndexForBlock: failed to index"+
" addrs for block #%d (%s) "+
"err %v", newheight, blkSha, err)
return
}
// Try skipping the first 4 results, should get 6 in return.
txReply, err := testDb.db.FetchTxsForAddr(targetAddr, 4, 100000)
if err != nil {
t.Fatalf("Unable to fetch transactions for address: %v", err)
}
if len(txReply) != 6 {
t.Fatalf("Did not correctly skip forward in txs for address reply"+
" got %v txs, expected %v", len(txReply), 6)
}
// Limit the number of results to 3.
txReply, err = testDb.db.FetchTxsForAddr(targetAddr, 0, 3)
if err != nil {
t.Fatalf("Unable to fetch transactions for address: %v", err)
}
if len(txReply) != 3 {
t.Fatalf("Did not correctly limit in txs for address reply"+
" got %v txs, expected %v", len(txReply), 3)
}
// Skip 1, limit 5.
txReply, err = testDb.db.FetchTxsForAddr(targetAddr, 1, 5)
if err != nil {
t.Fatalf("Unable to fetch transactions for address: %v", err)
}
if len(txReply) != 5 {
t.Fatalf("Did not correctly limit in txs for address reply"+
" got %v txs, expected %v", len(txReply), 5)
}
}

237
ldb/tx.go
View file

@ -7,12 +7,35 @@ package ldb
import ( import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"errors"
"github.com/btcsuite/btcdb" "github.com/btcsuite/btcdb"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcwire" "github.com/btcsuite/btcwire"
"github.com/btcsuite/goleveldb/leveldb" "github.com/btcsuite/goleveldb/leveldb"
"github.com/btcsuite/goleveldb/leveldb/util"
"golang.org/x/crypto/ripemd160"
) )
const (
// Each address index is 34 bytes:
// --------------------------------------------------------
// | Prefix | Hash160 | BlkHeight | Tx Offset | Tx Size |
// --------------------------------------------------------
// | 2 bytes | 20 bytes | 4 bytes | 4 bytes | 4 bytes |
// --------------------------------------------------------
addrIndexKeyLength = 2 + ripemd160.Size + 4 + 4 + 4
batchDeleteThreshold = 10000
)
var addrIndexMetaDataKey = []byte("addrindex")
// All address index entries share this prefix to facilitate the use of
// iterators.
var addrIndexKeyPrefix = []byte("a-")
type txUpdateObj struct { type txUpdateObj struct {
txSha *btcwire.ShaHash txSha *btcwire.ShaHash
blkHeight int64 blkHeight int64
@ -35,6 +58,13 @@ type spentTxUpdate struct {
delete bool delete bool
} }
type txAddrIndex struct {
hash160 [ripemd160.Size]byte
blkHeight int64
txoffset int
txlen int
}
// InsertTx inserts a tx hash and its associated data into the database. // InsertTx inserts a tx hash and its associated data into the database.
func (db *LevelDb) InsertTx(txsha *btcwire.ShaHash, height int64, txoff int, txlen int, spentbuf []byte) (err error) { func (db *LevelDb) InsertTx(txsha *btcwire.ShaHash, height int64, txoff int, txlen int, spentbuf []byte) (err error) {
db.dbLock.Lock() db.dbLock.Lock()
@ -188,7 +218,7 @@ func (db *LevelDb) FetchTxByShaList(txShaList []*btcwire.ShaHash) []*btcdb.TxLis
} }
if err == btcdb.ErrTxShaMissing { if err == btcdb.ErrTxShaMissing {
// if the unspent pool did not have the tx, // if the unspent pool did not have the tx,
// look in the fully spent pool (only last instance // look in the fully spent pool (only last instance)
sTxList, fSerr := db.getTxFullySpent(txsha) sTxList, fSerr := db.getTxFullySpent(txsha)
if fSerr == nil && len(sTxList) != 0 { if fSerr == nil && len(sTxList) != 0 {
@ -346,3 +376,208 @@ func (db *LevelDb) FetchTxBySha(txsha *btcwire.ShaHash) ([]*btcdb.TxListReply, e
} }
return replies, nil return replies, nil
} }
// addrIndexToKey serializes the passed txAddrIndex for storage within the DB.
func addrIndexToKey(index *txAddrIndex) []byte {
record := make([]byte, addrIndexKeyLength, addrIndexKeyLength)
copy(record[:2], addrIndexKeyPrefix)
copy(record[2:22], index.hash160[:])
// The index itself.
binary.LittleEndian.PutUint32(record[22:26], uint32(index.blkHeight))
binary.LittleEndian.PutUint32(record[26:30], uint32(index.txoffset))
binary.LittleEndian.PutUint32(record[30:34], uint32(index.txlen))
return record
}
// unpackTxIndex deserializes the raw bytes of a address tx index.
func unpackTxIndex(rawIndex []byte) *txAddrIndex {
return &txAddrIndex{
blkHeight: int64(binary.LittleEndian.Uint32(rawIndex[0:4])),
txoffset: int(binary.LittleEndian.Uint32(rawIndex[4:8])),
txlen: int(binary.LittleEndian.Uint32(rawIndex[8:12])),
}
}
// bytesPrefix returns key range that satisfy the given prefix.
// This only applicable for the standard 'bytes comparer'.
func bytesPrefix(prefix []byte) *util.Range {
var limit []byte
for i := len(prefix) - 1; i >= 0; i-- {
c := prefix[i]
if c < 0xff {
limit = make([]byte, i+1)
copy(limit, prefix)
limit[i] = c + 1
break
}
}
return &util.Range{Start: prefix, Limit: limit}
}
// FetchTxsForAddr looks up and returns all transactions which either
// spend from a previously created output of the passed address, or
// create a new output locked to the passed address. The, `limit` parameter
// should be the max number of transactions to be returned. Additionally, if the
// caller wishes to seek forward in the results some amount, the 'seek'
// represents how many results to skip.
func (db *LevelDb) FetchTxsForAddr(addr btcutil.Address, skip int,
limit int) ([]*btcdb.TxListReply, error) {
db.dbLock.Lock()
defer db.dbLock.Unlock()
// Enforce constraints for skip and limit.
if skip < 0 {
return nil, errors.New("offset for skip must be positive")
}
if limit < 0 {
return nil, errors.New("value for limit must be positive")
}
// Parse address type, bailing on an unknown type.
var addrKey []byte
switch addr := addr.(type) {
case *btcutil.AddressPubKeyHash:
hash160 := addr.Hash160()
addrKey = hash160[:]
case *btcutil.AddressScriptHash:
hash160 := addr.Hash160()
addrKey = hash160[:]
case *btcutil.AddressPubKey:
hash160 := addr.AddressPubKeyHash().Hash160()
addrKey = hash160[:]
default:
return nil, btcdb.ErrUnsupportedAddressType
}
// Create the prefix for our search.
addrPrefix := make([]byte, 22, 22)
copy(addrPrefix[:2], addrIndexKeyPrefix)
copy(addrPrefix[2:], addrKey)
var replies []*btcdb.TxListReply
iter := db.lDb.NewIterator(bytesPrefix(addrPrefix), nil)
for skip != 0 && iter.Next() {
skip--
}
// Iterate through all address indexes that match the targeted prefix.
for iter.Next() && limit != 0 {
rawIndex := make([]byte, 22, 22)
copy(rawIndex, iter.Key()[22:])
addrIndex := unpackTxIndex(rawIndex)
tx, blkSha, blkHeight, _, err := db.fetchTxDataByLoc(addrIndex.blkHeight,
addrIndex.txoffset, addrIndex.txlen, []byte{})
if err != nil {
// Eat a possible error due to a potential re-org.
continue
}
txSha, _ := tx.TxSha()
txReply := &btcdb.TxListReply{Sha: &txSha, Tx: tx,
BlkSha: blkSha, Height: blkHeight, TxSpent: []bool{}, Err: err}
replies = append(replies, txReply)
limit--
}
iter.Release()
return replies, nil
}
// UpdateAddrIndexForBlock updates the stored addrindex with passed
// index information for a particular block height. Additionally, it
// will update the stored meta-data related to the curent tip of the
// addr index. These two operations are performed in an atomic
// transaction which is commited before the function returns.
// Transactions indexed by address are stored with the following format:
// * prefix || hash160 || blockHeight || txoffset || txlen
// Indexes are stored purely in the key, with blank data for the actual value
// in order to facilitate ease of iteration by their shared prefix and
// also to allow limiting the number of returned transactions (RPC).
// Alternatively, indexes for each address could be stored as an
// append-only list for the stored value. However, this add unnecessary
// overhead when storing and retrieving since the entire list must
// be fetched each time.
func (db *LevelDb) UpdateAddrIndexForBlock(blkSha *btcwire.ShaHash, blkHeight int64, addrIndex btcdb.BlockAddrIndex) error {
db.dbLock.Lock()
defer db.dbLock.Unlock()
var blankData []byte
batch := db.lBatch()
defer db.lbatch.Reset()
// Write all data for the new address indexes in a single batch
// transaction.
for addrKey, indexes := range addrIndex {
for _, txLoc := range indexes {
index := &txAddrIndex{
hash160: addrKey,
blkHeight: blkHeight,
txoffset: txLoc.TxStart,
txlen: txLoc.TxLen,
}
// The index is stored purely in the key.
packedIndex := addrIndexToKey(index)
batch.Put(packedIndex, blankData)
}
}
// Update tip of addrindex.
newIndexTip := make([]byte, 40, 40)
copy(newIndexTip[:32], blkSha.Bytes())
binary.LittleEndian.PutUint64(newIndexTip[32:], uint64(blkHeight))
batch.Put(addrIndexMetaDataKey, newIndexTip)
if err := db.lDb.Write(batch, db.wo); err != nil {
return err
}
db.lastAddrIndexBlkIdx = blkHeight
db.lastAddrIndexBlkSha = *blkSha
return nil
}
// DeleteAddrIndex deletes the entire addrindex stored within the DB.
// It also resets the cached in-memory metadata about the addr index.
func (db *LevelDb) DeleteAddrIndex() error {
db.dbLock.Lock()
defer db.dbLock.Unlock()
batch := db.lBatch()
defer batch.Reset()
// Delete the entire index along with any metadata about it.
iter := db.lDb.NewIterator(bytesPrefix(addrIndexKeyPrefix), db.ro)
numInBatch := 0
for iter.Next() {
key := iter.Key()
batch.Delete(key)
numInBatch++
// Delete in chunks to potentially avoid very large batches.
if numInBatch >= batchDeleteThreshold {
if err := db.lDb.Write(batch, db.wo); err != nil {
return err
}
batch.Reset()
numInBatch = 0
}
}
iter.Release()
batch.Delete(addrIndexMetaDataKey)
if err := db.lDb.Write(batch, db.wo); err != nil {
return err
}
db.lastAddrIndexBlkIdx = -1
db.lastAddrIndexBlkSha = btcwire.ShaHash{}
return nil
}

View file

@ -698,6 +698,31 @@ func (db *MemDb) NewestSha() (*btcwire.ShaHash, int64, error) {
return &blockSha, int64(numBlocks - 1), nil return &blockSha, int64(numBlocks - 1), nil
} }
// FetchAddrIndexTip isn't currently implemented. This is a part of the
// btcdb.Db interface implementation.
func (db *MemDb) FetchAddrIndexTip() (*btcwire.ShaHash, int64, error) {
return nil, 0, btcdb.ErrNotImplemented
}
// UpdateAddrIndexForBlock isn't currently implemented. This is a part of the
// btcdb.Db interface implementation.
func (db *MemDb) UpdateAddrIndexForBlock(*btcwire.ShaHash, int64,
btcdb.BlockAddrIndex) error {
return btcdb.ErrNotImplemented
}
// FetchTxsForAddr isn't currently implemented. This is a part of the btcdb.Db
// interface implementation.
func (db *MemDb) FetchTxsForAddr(btcutil.Address, int, int) ([]*btcdb.TxListReply, error) {
return nil, btcdb.ErrNotImplemented
}
// DeleteAddrIndex isn't currently implemented. This is a part of the btcdb.Db
// interface implementation.
func (db *MemDb) DeleteAddrIndex() error {
return btcdb.ErrNotImplemented
}
// RollbackClose discards the recent database changes to the previously saved // RollbackClose discards the recent database changes to the previously saved
// data at last Sync and closes the database. This is part of the btcdb.Db // data at last Sync and closes the database. This is part of the btcdb.Db
// interface implementation. // interface implementation.