From b284bf0f909aaea8fa8f6790200a09a756ee066c Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 24 Dec 2014 17:55:14 -0600 Subject: [PATCH] Implement ldb database functionality for optional addrindex. --- db.go | 41 +++++ db_test.go | 2 +- ldb/block.go | 38 +++++ ldb/boundary_test.go | 3 +- ldb/dup_test.go | 2 +- ldb/insertremove_test.go | 2 +- ldb/internal_test.go | 65 +++++-- ldb/leveldb.go | 13 +- ldb/operational_test.go | 360 ++++++++++++++++++++++++++++++++------- ldb/tx.go | 237 +++++++++++++++++++++++++- memdb/memdb.go | 25 +++ 11 files changed, 707 insertions(+), 81 deletions(-) diff --git a/db.go b/db.go index 9753efbb..93e5291f 100644 --- a/db.go +++ b/db.go @@ -9,16 +9,21 @@ import ( "github.com/btcsuite/btcutil" "github.com/btcsuite/btcwire" + "golang.org/x/crypto/ripemd160" ) // Errors that the various database functions may return. var ( + ErrAddrIndexDoesNotExist = errors.New("address index hasn't been built up yet") + ErrUnsupportedAddressType = errors.New("address type is not supported " + + "by the address-index") ErrPrevShaMissing = errors.New("previous sha missing from database") ErrTxShaMissing = errors.New("requested transaction does not exist") ErrBlockShaMissing = errors.New("requested block does not exist") ErrDuplicateSha = errors.New("duplicate insert attempted") ErrDbDoesNotExist = errors.New("non-existent database") ErrDbUnknownType = errors.New("non-existent database type") + ErrNotImplemented = errors.New("method has not yet been implemented") ) // AllShas is a special value that can be used as the final sha when requesting @@ -106,6 +111,34 @@ type Db interface { // the database yet. NewestSha() (sha *btcwire.ShaHash, height int64, err error) + // FetchAddrIndexTip returns the hash and block height of the most recent + // block which has had its address index populated. It will return + // ErrAddrIndexDoesNotExist along with a zero hash, and -1 if the + // addrindex hasn't yet been built up. + FetchAddrIndexTip() (sha *btcwire.ShaHash, height int64, err error) + + // UpdateAddrIndexForBlock updates the stored addrindex with passed + // index information for a particular block height. Additionally, it + // will update the stored meta-data related to the curent tip of the + // addr index. These two operations are performed in an atomic + // transaction which is commited before the function returns. + // Addresses are indexed by the raw bytes of their base58 decoded + // hash160. + UpdateAddrIndexForBlock(blkSha *btcwire.ShaHash, height int64, + addrIndex BlockAddrIndex) error + + // FetchTxsForAddr looks up and returns all transactions which either + // spend a previously created output of the passed address, or create + // a new output locked to the passed address. The, `limit` parameter + // should be the max number of transactions to be returned. + // Additionally, if the caller wishes to skip forward in the results + // some amount, the 'seek' represents how many results to skip. + // NOTE: Values for both `seek` and `limit` MUST be positive. + FetchTxsForAddr(addr btcutil.Address, skip int, limit int) ([]*TxListReply, error) + + // DeleteAddrIndex deletes the entire addrindex stored within the DB. + DeleteAddrIndex() error + // RollbackClose discards the recent database changes to the previously // saved data at last Sync and closes the database. RollbackClose() (err error) @@ -134,6 +167,14 @@ type TxListReply struct { Err error } +// AddrIndexKeySize is the number of bytes used by keys into the BlockAddrIndex. +const AddrIndexKeySize = ripemd160.Size + +// BlockAddrIndex represents the indexing structure for addresses. +// It maps a hash160 to a list of transaction locations within a block that +// either pays to or spends from the passed UTXO for the hash160. +type BlockAddrIndex map[[AddrIndexKeySize]byte][]*btcwire.TxLoc + // driverList holds all of the registered database backends. var driverList []DriverDB diff --git a/db_test.go b/db_test.go index d96a7e8a..b14804cd 100644 --- a/db_test.go +++ b/db_test.go @@ -19,7 +19,7 @@ var ( ignoreDbTypes = map[string]bool{"createopenfail": true} ) -// testNewestShaEmpty ensures the NewestSha returns the values expected by +// testNewestShaEmpty ensures that NewestSha returns the values expected by // the interface contract. func testNewestShaEmpty(t *testing.T, db btcdb.Db) { sha, height, err := db.NewestSha() diff --git a/ldb/block.go b/ldb/block.go index c5b4aa23..3e6c84f2 100644 --- a/ldb/block.go +++ b/ldb/block.go @@ -290,3 +290,41 @@ func (db *LevelDb) NewestSha() (rsha *btcwire.ShaHash, rblkid int64, err error) return &sha, db.lastBlkIdx, nil } + +// fetchAddrIndexTip returns the last block height and block sha to be indexed. +// Meta-data about the address tip is currently cached in memory, and will be +// updated accordingly by functions that modify the state. This function is +// used on start up to load the info into memory. Callers will use the public +// version of this function below, which returns our cached copy. +func (db *LevelDb) fetchAddrIndexTip() (*btcwire.ShaHash, int64, error) { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + data, err := db.lDb.Get(addrIndexMetaDataKey, db.ro) + if err != nil { + return &btcwire.ShaHash{}, -1, btcdb.ErrAddrIndexDoesNotExist + } + + var blkSha btcwire.ShaHash + blkSha.SetBytes(data[0:32]) + + blkHeight := binary.LittleEndian.Uint64(data[32:]) + + return &blkSha, int64(blkHeight), nil +} + +// FetchAddrIndexTip returns the hash and block height of the most recent +// block whose transactions have been indexed by address. It will return +// ErrAddrIndexDoesNotExist along with a zero hash, and -1 if the +// addrindex hasn't yet been built up. +func (db *LevelDb) FetchAddrIndexTip() (*btcwire.ShaHash, int64, error) { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + if db.lastAddrIndexBlkIdx == -1 { + return &btcwire.ShaHash{}, -1, btcdb.ErrAddrIndexDoesNotExist + } + sha := db.lastAddrIndexBlkSha + + return &sha, db.lastAddrIndexBlkIdx, nil +} diff --git a/ldb/boundary_test.go b/ldb/boundary_test.go index d3f10893..6d963d96 100644 --- a/ldb/boundary_test.go +++ b/ldb/boundary_test.go @@ -12,7 +12,8 @@ import ( "github.com/btcsuite/btcwire" ) -// we need to test for empty databas and make certain it returns proper value +// we need to test for an empty database and make certain it returns the proper +// values func TestEmptyDB(t *testing.T) { diff --git a/ldb/dup_test.go b/ldb/dup_test.go index 10557203..302aa05a 100644 --- a/ldb/dup_test.go +++ b/ldb/dup_test.go @@ -110,7 +110,7 @@ out: lastSha = blkSha } - // genrate a new block based on the last sha + // generate a new block based on the last sha // these block are not verified, so there are a bunch of garbage fields // in the 'generated' block. diff --git a/ldb/insertremove_test.go b/ldb/insertremove_test.go index dfa68b92..dd8ab524 100644 --- a/ldb/insertremove_test.go +++ b/ldb/insertremove_test.go @@ -66,7 +66,7 @@ endtest: for height := int64(0); height < int64(len(blocks)); height++ { block := blocks[height] - // look up inputs to this x + // look up inputs to this tx mblock := block.MsgBlock() var txneededList []*btcwire.ShaHash var txlookupList []*btcwire.ShaHash diff --git a/ldb/internal_test.go b/ldb/internal_test.go index f0cf74bc..d0fc00bd 100644 --- a/ldb/internal_test.go +++ b/ldb/internal_test.go @@ -5,21 +5,60 @@ package ldb import ( - "fmt" + "bytes" - "github.com/btcsuite/btcdb" - "github.com/btcsuite/btcwire" + "testing" + + "github.com/btcsuite/btcutil" + "golang.org/x/crypto/ripemd160" ) -// FetchSha returns the datablock and pver for the given ShaHash. -// This is a testing only interface. -func FetchSha(db btcdb.Db, sha *btcwire.ShaHash) (buf []byte, pver uint32, - blkid int64, err error) { - sqldb, ok := db.(*LevelDb) - if !ok { - err = fmt.Errorf("invalid data type") - return +func TestAddrIndexKeySerialization(t *testing.T) { + var hash160Bytes [ripemd160.Size]byte + + fakeHash160 := btcutil.Hash160([]byte("testing")) + copy(fakeHash160, hash160Bytes[:]) + + fakeIndex := txAddrIndex{ + hash160: hash160Bytes, + blkHeight: 1, + txoffset: 5, + txlen: 360, + } + + serializedKey := addrIndexToKey(&fakeIndex) + unpackedIndex := unpackTxIndex(serializedKey[22:]) + + if unpackedIndex.blkHeight != fakeIndex.blkHeight { + t.Errorf("Incorrect block height. Unpack addr index key"+ + "serialization failed. Expected %d, received %d", + 1, unpackedIndex.blkHeight) + } + + if unpackedIndex.txoffset != fakeIndex.txoffset { + t.Errorf("Incorrect tx offset. Unpack addr index key"+ + "serialization failed. Expected %d, received %d", + 5, unpackedIndex.txoffset) + } + + if unpackedIndex.txlen != fakeIndex.txlen { + t.Errorf("Incorrect tx len. Unpack addr index key"+ + "serialization failed. Expected %d, received %d", + 360, unpackedIndex.txlen) + } +} + +func TestBytesPrefix(t *testing.T) { + testKey := []byte("a") + + prefixRange := bytesPrefix(testKey) + if !bytes.Equal(prefixRange.Start, []byte("a")) { + t.Errorf("Wrong prefix start, got %d, expected %d", prefixRange.Start, + []byte("a")) + } + + if !bytes.Equal(prefixRange.Limit, []byte("b")) { + t.Errorf("Wrong prefix end, got %d, expected %d", prefixRange.Limit, + []byte("b")) } - buf, blkid, err = sqldb.fetchSha(sha) - return } diff --git a/ldb/leveldb.go b/ldb/leveldb.go index 64b72e57..cc946fbb 100644 --- a/ldb/leveldb.go +++ b/ldb/leveldb.go @@ -53,6 +53,9 @@ type LevelDb struct { lastBlkSha btcwire.ShaHash lastBlkIdx int64 + lastAddrIndexBlkSha btcwire.ShaHash + lastAddrIndexBlkIdx int64 + txUpdateMap map[btcwire.ShaHash]*txUpdateObj txSpentUpdateMap map[btcwire.ShaHash]*spentTxUpdate } @@ -92,7 +95,6 @@ func OpenDB(args ...interface{}) (btcdb.Db, error) { } // Need to find last block and tx - var lastknownblock, nextunknownblock, testblock int64 increment := int64(100000) @@ -139,6 +141,14 @@ blocknarrow: } } + // Load the last block whose transactions have been indexed by address. + if sha, idx, err := ldb.fetchAddrIndexTip(); err == nil { + ldb.lastAddrIndexBlkSha = *sha + ldb.lastAddrIndexBlkIdx = idx + } else { + ldb.lastAddrIndexBlkIdx = -1 + } + ldb.lastBlkSha = *lastSha ldb.lastBlkIdx = lastknownblock ldb.nextBlock = lastknownblock + 1 @@ -250,6 +260,7 @@ func CreateDB(args ...interface{}) (btcdb.Db, error) { if err == nil { ldb := db.(*LevelDb) ldb.lastBlkIdx = -1 + ldb.lastAddrIndexBlkIdx = -1 ldb.nextBlock = 0 } return db, err diff --git a/ldb/operational_test.go b/ldb/operational_test.go index bee33101..6cb16422 100644 --- a/ldb/operational_test.go +++ b/ldb/operational_test.go @@ -5,6 +5,7 @@ package ldb_test import ( + "bytes" "compress/bzip2" "encoding/binary" "fmt" @@ -16,22 +17,26 @@ import ( "github.com/btcsuite/btcdb" "github.com/btcsuite/btcnet" + "github.com/btcsuite/btcscript" "github.com/btcsuite/btcutil" "github.com/btcsuite/btcwire" + "golang.org/x/crypto/ripemd160" ) var network = btcwire.MainNet -func TestOperational(t *testing.T) { - testOperationalMode(t) +// testDb is used to store db related context for a running test. +// the `cleanUpFunc` *must* be called after each test to maintain db +// consistency across tests. +type testDb struct { + db btcdb.Db + blocks []*btcutil.Block + dbName string + dbNameVer string + cleanUpFunc func() } -func testOperationalMode(t *testing.T) { - // simplified basic operation is: - // 1) fetch block from remote server - // 2) look up all txin (except coinbase in db) - // 3) insert block - +func setUpTestDb(t *testing.T) (*testDb, error) { // Ignore db remove errors since it means we didn't have an old one. dbname := fmt.Sprintf("tstdbop1") dbnamever := dbname + ".ver" @@ -39,28 +44,181 @@ func testOperationalMode(t *testing.T) { _ = os.RemoveAll(dbnamever) db, err := btcdb.CreateDB("leveldb", dbname) if err != nil { - t.Errorf("Failed to open test database %v", err) - return + return nil, err } - defer os.RemoveAll(dbname) - defer os.RemoveAll(dbnamever) - defer func() { - if err := db.Close(); err != nil { - t.Errorf("Close: unexpected error: %v", err) - } - }() testdatafile := filepath.Join("..", "testdata", "blocks1-256.bz2") blocks, err := loadBlocks(t, testdatafile) + if err != nil { + return nil, err + } + + cleanUp := func() { + db.Close() + os.RemoveAll(dbname) + os.RemoveAll(dbnamever) + } + + return &testDb{ + db: db, + blocks: blocks, + dbName: dbname, + dbNameVer: dbnamever, + cleanUpFunc: cleanUp, + }, nil +} + +func TestOperational(t *testing.T) { + testOperationalMode(t) +} + +// testAddrIndexOperations ensures that all normal operations concerning +// the optional address index function correctly. +func testAddrIndexOperations(t *testing.T, db btcdb.Db, newestBlock *btcutil.Block, newestSha *btcwire.ShaHash, newestBlockIdx int64) { + // Metadata about the current addr index state should be unset. + sha, height, err := db.FetchAddrIndexTip() + if err != btcdb.ErrAddrIndexDoesNotExist { + t.Fatalf("Address index metadata shouldn't be in db, hasn't been built up yet.") + } + + var zeroHash btcwire.ShaHash + if !sha.IsEqual(&zeroHash) { + t.Fatalf("AddrIndexTip wrong hash got: %s, want %s", sha, &zeroHash) + + } + + if height != -1 { + t.Fatalf("Addrindex not built up, yet a block index tip has been set to: %d.", height) + } + + // Test enforcement of constraints for "limit" and "skip" + var fakeAddr btcutil.Address + _, err = db.FetchTxsForAddr(fakeAddr, -1, 0) + if err == nil { + t.Fatalf("Negative value for skip passed, should return an error") + } + + _, err = db.FetchTxsForAddr(fakeAddr, 0, -1) + if err == nil { + t.Fatalf("Negative value for limit passed, should return an error") + } + + // Simple test to index outputs(s) of the first tx. + testIndex := make(btcdb.BlockAddrIndex) + testTx, err := newestBlock.Tx(0) + if err != nil { + t.Fatalf("Block has no transactions, unable to test addr "+ + "indexing, err %v", err) + } + + // Extract the dest addr from the tx. + _, testAddrs, _, err := btcscript.ExtractPkScriptAddrs(testTx.MsgTx().TxOut[0].PkScript, &btcnet.MainNetParams) + if err != nil { + t.Fatalf("Unable to decode tx output, err %v", err) + } + + // Extract the hash160 from the output script. + var hash160Bytes [ripemd160.Size]byte + testHash160 := testAddrs[0].(*btcutil.AddressPubKey).AddressPubKeyHash().ScriptAddress() + copy(hash160Bytes[:], testHash160[:]) + + // Create a fake index. + blktxLoc, _ := newestBlock.TxLoc() + testIndex[hash160Bytes] = []*btcwire.TxLoc{&blktxLoc[0]} + + // Insert our test addr index into the DB. + err = db.UpdateAddrIndexForBlock(newestSha, newestBlockIdx, testIndex) + if err != nil { + t.Fatalf("UpdateAddrIndexForBlock: failed to index"+ + " addrs for block #%d (%s) "+ + "err %v", newestBlockIdx, newestSha, err) + } + + // Chain Tip of address should've been updated. + assertAddrIndexTipIsUpdated(db, t, newestSha, newestBlockIdx) + + // Check index retrieval. + txReplies, err := db.FetchTxsForAddr(testAddrs[0], 0, 1000) + if err != nil { + t.Fatalf("FetchTxsForAddr failed to correctly fetch txs for an "+ + "address, err %v", err) + } + // Should have one reply. + if len(txReplies) != 1 { + t.Fatalf("Failed to properly index tx by address.") + } + + // Our test tx and indexed tx should have the same sha. + indexedTx := txReplies[0] + if !bytes.Equal(indexedTx.Sha.Bytes(), testTx.Sha().Bytes()) { + t.Fatalf("Failed to fetch proper indexed tx. Expected sha %v, "+ + "fetched %v", testTx.Sha(), indexedTx.Sha) + } + + // Shut down DB. + db.Sync() + db.Close() + + // Re-Open, tip still should be updated to current height and sha. + db, err = btcdb.OpenDB("leveldb", "tstdbop1") + if err != nil { + t.Fatalf("Unable to re-open created db, err %v", err) + } + assertAddrIndexTipIsUpdated(db, t, newestSha, newestBlockIdx) + + // Delete the entire index. + err = db.DeleteAddrIndex() + if err != nil { + t.Fatalf("Couldn't delete address index, err %v", err) + } + + // Former index should no longer exist. + txReplies, err = db.FetchTxsForAddr(testAddrs[0], 0, 1000) + if err != nil { + t.Fatalf("Unable to fetch transactions for address: %v", err) + } + if len(txReplies) != 0 { + t.Fatalf("Address index was not successfully deleted. "+ + "Should have 0 tx's indexed, %v were returned.", + len(txReplies)) + } + + // Tip should be blanked out. + if _, _, err := db.FetchAddrIndexTip(); err != btcdb.ErrAddrIndexDoesNotExist { + t.Fatalf("Address index was not fully deleted.") + } + +} + +func assertAddrIndexTipIsUpdated(db btcdb.Db, t *testing.T, newestSha *btcwire.ShaHash, newestBlockIdx int64) { + // Safe to ignore error, since height will be < 0 in "error" case. + sha, height, _ := db.FetchAddrIndexTip() + if newestBlockIdx != height { + t.Fatalf("Height of address index tip failed to update, "+ + "expected %v, got %v", newestBlockIdx, height) + } + if !bytes.Equal(newestSha.Bytes(), sha.Bytes()) { + t.Fatalf("Sha of address index tip failed to update, "+ + "expected %v, got %v", newestSha, sha) + } +} + +func testOperationalMode(t *testing.T) { + // simplified basic operation is: + // 1) fetch block from remote server + // 2) look up all txin (except coinbase in db) + // 3) insert block + // 4) exercise the optional addridex + testDb, err := setUpTestDb(t) + defer testDb.cleanUpFunc() if err != nil { t.Errorf("Unable to load blocks from test data: %v", err) return } - err = nil out: - for height := int64(0); height < int64(len(blocks)); height++ { - block := blocks[height] + for height := int64(0); height < int64(len(testDb.blocks)); height++ { + block := testDb.blocks[height] mblock := block.MsgBlock() var txneededList []*btcwire.ShaHash for _, tx := range mblock.Transactions { @@ -71,7 +229,7 @@ out: origintxsha := &txin.PreviousOutPoint.Hash txneededList = append(txneededList, origintxsha) - exists, err := db.ExistsTxSha(origintxsha) + exists, err := testDb.db.ExistsTxSha(origintxsha) if err != nil { t.Errorf("ExistsTxSha: unexpected error %v ", err) } @@ -79,13 +237,13 @@ out: t.Errorf("referenced tx not found %v ", origintxsha) } - _, err = db.FetchTxBySha(origintxsha) + _, err = testDb.db.FetchTxBySha(origintxsha) if err != nil { t.Errorf("referenced tx not found %v err %v ", origintxsha, err) } } } - txlist := db.FetchUnSpentTxByShaList(txneededList) + txlist := testDb.db.FetchUnSpentTxByShaList(txneededList) for _, txe := range txlist { if txe.Err != nil { t.Errorf("tx list fetch failed %v err %v ", txe.Sha, txe.Err) @@ -93,7 +251,7 @@ out: } } - newheight, err := db.InsertBlock(block) + newheight, err := testDb.db.InsertBlock(block) if err != nil { t.Errorf("failed to insert block %v err %v", height, err) break out @@ -103,13 +261,13 @@ out: break out } - newSha, blkid, err := db.NewestSha() + newSha, blkid, err := testDb.db.NewestSha() if err != nil { t.Errorf("failed to obtain latest sha %v %v", height, err) } if blkid != height { - t.Errorf("height doe not match latest block height %v %v %v", blkid, height, err) + t.Errorf("height does not match latest block height %v %v %v", blkid, height, err) } blkSha, _ := block.Sha() @@ -118,8 +276,13 @@ out: } } - // now that db is populated, do some additional test - testFetchRangeHeight(t, db, blocks) + // now that the db is populated, do some additional tests + testFetchHeightRange(t, testDb.db, testDb.blocks) + + // Ensure all operations dealing with the optional address index behave + // correctly. + newSha, blkid, err := testDb.db.NewestSha() + testAddrIndexOperations(t, testDb.db, testDb.blocks[len(testDb.blocks)-1], newSha, blkid) } func TestBackout(t *testing.T) { @@ -132,43 +295,35 @@ func testBackout(t *testing.T) { // 2) look up all txin (except coinbase in db) // 3) insert block - // Ignore db remove errors since it means we didn't have an old one. - dbname := fmt.Sprintf("tstdbop2") - dbnamever := dbname + ".ver" - _ = os.RemoveAll(dbname) - _ = os.RemoveAll(dbnamever) - db, err := btcdb.CreateDB("leveldb", dbname) + testDb, err := setUpTestDb(t) + defer testDb.cleanUpFunc() + if err != nil { t.Errorf("Failed to open test database %v", err) return } - defer os.RemoveAll(dbname) - defer os.RemoveAll(dbnamever) - defer db.Close() - testdatafile := filepath.Join("..", "testdata", "blocks1-256.bz2") - blocks, err := loadBlocks(t, testdatafile) - if len(blocks) < 120 { + if len(testDb.blocks) < 120 { t.Errorf("test data too small") return } err = nil - for height := int64(0); height < int64(len(blocks)); height++ { + for height := int64(0); height < int64(len(testDb.blocks)); height++ { if height == 100 { t.Logf("Syncing at block height 100") - db.Sync() + testDb.db.Sync() } if height == 120 { t.Logf("Simulating unexpected application quit") // Simulate unexpected application quit - db.RollbackClose() + testDb.db.RollbackClose() break } - block := blocks[height] + block := testDb.blocks[height] - newheight, err := db.InsertBlock(block) + newheight, err := testDb.db.InsertBlock(block) if err != nil { t.Errorf("failed to insert block %v err %v", height, err) return @@ -182,49 +337,49 @@ func testBackout(t *testing.T) { // db was closed at height 120, so no cleanup is possible. // reopen db - db, err = btcdb.OpenDB("leveldb", dbname) + testDb.db, err = btcdb.OpenDB("leveldb", testDb.dbName) if err != nil { t.Errorf("Failed to open test database %v", err) return } defer func() { - if err := db.Close(); err != nil { + if err := testDb.db.Close(); err != nil { t.Errorf("Close: unexpected error: %v", err) } }() - sha, err := blocks[99].Sha() + sha, err := testDb.blocks[99].Sha() if err != nil { t.Errorf("failed to get block 99 sha err %v", err) return } - if _, err := db.ExistsSha(sha); err != nil { + if _, err := testDb.db.ExistsSha(sha); err != nil { t.Errorf("ExistsSha: unexpected error: %v", err) } - _, err = db.FetchBlockBySha(sha) + _, err = testDb.db.FetchBlockBySha(sha) if err != nil { t.Errorf("failed to load block 99 from db %v", err) return } - sha, err = blocks[119].Sha() + sha, err = testDb.blocks[119].Sha() if err != nil { t.Errorf("failed to get block 110 sha err %v", err) return } - if _, err := db.ExistsSha(sha); err != nil { + if _, err := testDb.db.ExistsSha(sha); err != nil { t.Errorf("ExistsSha: unexpected error: %v", err) } - _, err = db.FetchBlockBySha(sha) + _, err = testDb.db.FetchBlockBySha(sha) if err != nil { t.Errorf("loaded block 119 from db") return } - block := blocks[119] + block := testDb.blocks[119] mblock := block.MsgBlock() txsha, err := mblock.Transactions[0].TxSha() - exists, err := db.ExistsTxSha(&txsha) + exists, err := testDb.db.ExistsTxSha(&txsha) if err != nil { t.Errorf("ExistsTxSha: unexpected error %v ", err) } @@ -232,7 +387,7 @@ func testBackout(t *testing.T) { t.Errorf("tx %v not located db\n", txsha) } - _, err = db.FetchTxBySha(&txsha) + _, err = testDb.db.FetchTxBySha(&txsha) if err != nil { t.Errorf("tx %v not located db\n", txsha) return @@ -310,7 +465,7 @@ func loadBlocks(t *testing.T, file string) (blocks []*btcutil.Block, err error) return } -func testFetchRangeHeight(t *testing.T, db btcdb.Db, blocks []*btcutil.Block) { +func testFetchHeightRange(t *testing.T, db btcdb.Db, blocks []*btcutil.Block) { var testincrement int64 = 50 var testcnt int64 = 100 @@ -322,7 +477,7 @@ func testFetchRangeHeight(t *testing.T, db btcdb.Db, blocks []*btcutil.Block) { for i := range blocks { blockSha, err := blocks[i].Sha() if err != nil { - t.Errorf("FetchRangeHeight: unexpected failure computing block sah %v", err) + t.Errorf("FetchHeightRange: unexpected failure computing block sah %v", err) } shanames[i] = blockSha } @@ -336,16 +491,16 @@ func testFetchRangeHeight(t *testing.T, db btcdb.Db, blocks []*btcutil.Block) { shalist, err := db.FetchHeightRange(startheight, endheight) if err != nil { - t.Errorf("FetchRangeHeight: unexpected failure looking up shas %v", err) + t.Errorf("FetchHeightRange: unexpected failure looking up shas %v", err) } if endheight == btcdb.AllShas { if int64(len(shalist)) != nBlocks-startheight { - t.Errorf("FetchRangeHeight: expected A %v shas, got %v", nBlocks-startheight, len(shalist)) + t.Errorf("FetchHeightRange: expected A %v shas, got %v", nBlocks-startheight, len(shalist)) } } else { if int64(len(shalist)) != testcnt { - t.Errorf("FetchRangeHeight: expected %v shas, got %v", testcnt, len(shalist)) + t.Errorf("FetchHeightRange: expected %v shas, got %v", testcnt, len(shalist)) } } @@ -353,9 +508,90 @@ func testFetchRangeHeight(t *testing.T, db btcdb.Db, blocks []*btcutil.Block) { sha0 := *shanames[int64(i)+startheight] sha1 := shalist[i] if sha0 != sha1 { - t.Errorf("FetchRangeHeight: mismatch sha at %v requested range %v %v: %v %v ", int64(i)+startheight, startheight, endheight, sha0, sha1) + t.Errorf("FetchHeightRange: mismatch sha at %v requested range %v %v: %v %v ", int64(i)+startheight, startheight, endheight, sha0, sha1) } } } } + +func TestLimitAndSkipFetchTxsForAddr(t *testing.T) { + testDb, err := setUpTestDb(t) + defer testDb.cleanUpFunc() + + // Insert a block with some fake test transactions. The block will have + // 10 copies of a fake transaction involving same address. + addrString := "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa" + targetAddr, err := btcutil.DecodeAddress(addrString, &btcnet.MainNetParams) + if err != nil { + t.Fatalf("Unable to decode test address: %v", err) + } + outputScript, err := btcscript.PayToAddrScript(targetAddr) + if err != nil { + t.Fatalf("Unable make test pkScript %v", err) + } + fakeTxOut := btcwire.NewTxOut(10, outputScript) + var emptyHash btcwire.ShaHash + fakeHeader := btcwire.NewBlockHeader(&emptyHash, &emptyHash, 1, 1) + msgBlock := btcwire.NewMsgBlock(fakeHeader) + for i := 0; i < 10; i++ { + mtx := btcwire.NewMsgTx() + mtx.AddTxOut(fakeTxOut) + msgBlock.AddTransaction(mtx) + } + + // Insert the test block into the DB. + testBlock := btcutil.NewBlock(msgBlock) + newheight, err := testDb.db.InsertBlock(testBlock) + if err != nil { + t.Fatalf("Unable to insert block into db: %v", err) + } + + // Create and insert an address index for out test addr. + txLoc, _ := testBlock.TxLoc() + index := make(btcdb.BlockAddrIndex) + for i := range testBlock.Transactions() { + var hash160 [ripemd160.Size]byte + scriptAddr := targetAddr.ScriptAddress() + copy(hash160[:], scriptAddr[:]) + index[hash160] = append(index[hash160], &txLoc[i]) + } + blkSha, _ := testBlock.Sha() + err = testDb.db.UpdateAddrIndexForBlock(blkSha, newheight, index) + if err != nil { + t.Fatalf("UpdateAddrIndexForBlock: failed to index"+ + " addrs for block #%d (%s) "+ + "err %v", newheight, blkSha, err) + return + } + + // Try skipping the first 4 results, should get 6 in return. + txReply, err := testDb.db.FetchTxsForAddr(targetAddr, 4, 100000) + if err != nil { + t.Fatalf("Unable to fetch transactions for address: %v", err) + } + if len(txReply) != 6 { + t.Fatalf("Did not correctly skip forward in txs for address reply"+ + " got %v txs, expected %v", len(txReply), 6) + } + + // Limit the number of results to 3. + txReply, err = testDb.db.FetchTxsForAddr(targetAddr, 0, 3) + if err != nil { + t.Fatalf("Unable to fetch transactions for address: %v", err) + } + if len(txReply) != 3 { + t.Fatalf("Did not correctly limit in txs for address reply"+ + " got %v txs, expected %v", len(txReply), 3) + } + + // Skip 1, limit 5. + txReply, err = testDb.db.FetchTxsForAddr(targetAddr, 1, 5) + if err != nil { + t.Fatalf("Unable to fetch transactions for address: %v", err) + } + if len(txReply) != 5 { + t.Fatalf("Did not correctly limit in txs for address reply"+ + " got %v txs, expected %v", len(txReply), 5) + } +} diff --git a/ldb/tx.go b/ldb/tx.go index 8d28fed6..aff215a2 100644 --- a/ldb/tx.go +++ b/ldb/tx.go @@ -7,12 +7,35 @@ package ldb import ( "bytes" "encoding/binary" + "errors" "github.com/btcsuite/btcdb" + "github.com/btcsuite/btcutil" "github.com/btcsuite/btcwire" "github.com/btcsuite/goleveldb/leveldb" + "github.com/btcsuite/goleveldb/leveldb/util" + + "golang.org/x/crypto/ripemd160" ) +const ( + // Each address index is 34 bytes: + // -------------------------------------------------------- + // | Prefix | Hash160 | BlkHeight | Tx Offset | Tx Size | + // -------------------------------------------------------- + // | 2 bytes | 20 bytes | 4 bytes | 4 bytes | 4 bytes | + // -------------------------------------------------------- + addrIndexKeyLength = 2 + ripemd160.Size + 4 + 4 + 4 + + batchDeleteThreshold = 10000 +) + +var addrIndexMetaDataKey = []byte("addrindex") + +// All address index entries share this prefix to facilitate the use of +// iterators. +var addrIndexKeyPrefix = []byte("a-") + type txUpdateObj struct { txSha *btcwire.ShaHash blkHeight int64 @@ -35,6 +58,13 @@ type spentTxUpdate struct { delete bool } +type txAddrIndex struct { + hash160 [ripemd160.Size]byte + blkHeight int64 + txoffset int + txlen int +} + // InsertTx inserts a tx hash and its associated data into the database. func (db *LevelDb) InsertTx(txsha *btcwire.ShaHash, height int64, txoff int, txlen int, spentbuf []byte) (err error) { db.dbLock.Lock() @@ -188,7 +218,7 @@ func (db *LevelDb) FetchTxByShaList(txShaList []*btcwire.ShaHash) []*btcdb.TxLis } if err == btcdb.ErrTxShaMissing { // if the unspent pool did not have the tx, - // look in the fully spent pool (only last instance + // look in the fully spent pool (only last instance) sTxList, fSerr := db.getTxFullySpent(txsha) if fSerr == nil && len(sTxList) != 0 { @@ -346,3 +376,208 @@ func (db *LevelDb) FetchTxBySha(txsha *btcwire.ShaHash) ([]*btcdb.TxListReply, e } return replies, nil } + +// addrIndexToKey serializes the passed txAddrIndex for storage within the DB. +func addrIndexToKey(index *txAddrIndex) []byte { + record := make([]byte, addrIndexKeyLength, addrIndexKeyLength) + copy(record[:2], addrIndexKeyPrefix) + copy(record[2:22], index.hash160[:]) + + // The index itself. + binary.LittleEndian.PutUint32(record[22:26], uint32(index.blkHeight)) + binary.LittleEndian.PutUint32(record[26:30], uint32(index.txoffset)) + binary.LittleEndian.PutUint32(record[30:34], uint32(index.txlen)) + + return record +} + +// unpackTxIndex deserializes the raw bytes of a address tx index. +func unpackTxIndex(rawIndex []byte) *txAddrIndex { + return &txAddrIndex{ + blkHeight: int64(binary.LittleEndian.Uint32(rawIndex[0:4])), + txoffset: int(binary.LittleEndian.Uint32(rawIndex[4:8])), + txlen: int(binary.LittleEndian.Uint32(rawIndex[8:12])), + } +} + +// bytesPrefix returns key range that satisfy the given prefix. +// This only applicable for the standard 'bytes comparer'. +func bytesPrefix(prefix []byte) *util.Range { + var limit []byte + for i := len(prefix) - 1; i >= 0; i-- { + c := prefix[i] + if c < 0xff { + limit = make([]byte, i+1) + copy(limit, prefix) + limit[i] = c + 1 + break + } + } + return &util.Range{Start: prefix, Limit: limit} +} + +// FetchTxsForAddr looks up and returns all transactions which either +// spend from a previously created output of the passed address, or +// create a new output locked to the passed address. The, `limit` parameter +// should be the max number of transactions to be returned. Additionally, if the +// caller wishes to seek forward in the results some amount, the 'seek' +// represents how many results to skip. +func (db *LevelDb) FetchTxsForAddr(addr btcutil.Address, skip int, + limit int) ([]*btcdb.TxListReply, error) { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + // Enforce constraints for skip and limit. + if skip < 0 { + return nil, errors.New("offset for skip must be positive") + } + if limit < 0 { + return nil, errors.New("value for limit must be positive") + } + + // Parse address type, bailing on an unknown type. + var addrKey []byte + switch addr := addr.(type) { + case *btcutil.AddressPubKeyHash: + hash160 := addr.Hash160() + addrKey = hash160[:] + case *btcutil.AddressScriptHash: + hash160 := addr.Hash160() + addrKey = hash160[:] + case *btcutil.AddressPubKey: + hash160 := addr.AddressPubKeyHash().Hash160() + addrKey = hash160[:] + default: + return nil, btcdb.ErrUnsupportedAddressType + } + + // Create the prefix for our search. + addrPrefix := make([]byte, 22, 22) + copy(addrPrefix[:2], addrIndexKeyPrefix) + copy(addrPrefix[2:], addrKey) + + var replies []*btcdb.TxListReply + iter := db.lDb.NewIterator(bytesPrefix(addrPrefix), nil) + + for skip != 0 && iter.Next() { + skip-- + } + + // Iterate through all address indexes that match the targeted prefix. + for iter.Next() && limit != 0 { + rawIndex := make([]byte, 22, 22) + copy(rawIndex, iter.Key()[22:]) + addrIndex := unpackTxIndex(rawIndex) + + tx, blkSha, blkHeight, _, err := db.fetchTxDataByLoc(addrIndex.blkHeight, + addrIndex.txoffset, addrIndex.txlen, []byte{}) + if err != nil { + // Eat a possible error due to a potential re-org. + continue + } + + txSha, _ := tx.TxSha() + txReply := &btcdb.TxListReply{Sha: &txSha, Tx: tx, + BlkSha: blkSha, Height: blkHeight, TxSpent: []bool{}, Err: err} + + replies = append(replies, txReply) + limit-- + } + iter.Release() + + return replies, nil +} + +// UpdateAddrIndexForBlock updates the stored addrindex with passed +// index information for a particular block height. Additionally, it +// will update the stored meta-data related to the curent tip of the +// addr index. These two operations are performed in an atomic +// transaction which is commited before the function returns. +// Transactions indexed by address are stored with the following format: +// * prefix || hash160 || blockHeight || txoffset || txlen +// Indexes are stored purely in the key, with blank data for the actual value +// in order to facilitate ease of iteration by their shared prefix and +// also to allow limiting the number of returned transactions (RPC). +// Alternatively, indexes for each address could be stored as an +// append-only list for the stored value. However, this add unnecessary +// overhead when storing and retrieving since the entire list must +// be fetched each time. +func (db *LevelDb) UpdateAddrIndexForBlock(blkSha *btcwire.ShaHash, blkHeight int64, addrIndex btcdb.BlockAddrIndex) error { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + var blankData []byte + batch := db.lBatch() + defer db.lbatch.Reset() + + // Write all data for the new address indexes in a single batch + // transaction. + for addrKey, indexes := range addrIndex { + for _, txLoc := range indexes { + index := &txAddrIndex{ + hash160: addrKey, + blkHeight: blkHeight, + txoffset: txLoc.TxStart, + txlen: txLoc.TxLen, + } + // The index is stored purely in the key. + packedIndex := addrIndexToKey(index) + batch.Put(packedIndex, blankData) + } + } + + // Update tip of addrindex. + newIndexTip := make([]byte, 40, 40) + copy(newIndexTip[:32], blkSha.Bytes()) + binary.LittleEndian.PutUint64(newIndexTip[32:], uint64(blkHeight)) + batch.Put(addrIndexMetaDataKey, newIndexTip) + + if err := db.lDb.Write(batch, db.wo); err != nil { + return err + } + + db.lastAddrIndexBlkIdx = blkHeight + db.lastAddrIndexBlkSha = *blkSha + + return nil +} + +// DeleteAddrIndex deletes the entire addrindex stored within the DB. +// It also resets the cached in-memory metadata about the addr index. +func (db *LevelDb) DeleteAddrIndex() error { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + batch := db.lBatch() + defer batch.Reset() + + // Delete the entire index along with any metadata about it. + iter := db.lDb.NewIterator(bytesPrefix(addrIndexKeyPrefix), db.ro) + numInBatch := 0 + for iter.Next() { + key := iter.Key() + batch.Delete(key) + + numInBatch++ + + // Delete in chunks to potentially avoid very large batches. + if numInBatch >= batchDeleteThreshold { + if err := db.lDb.Write(batch, db.wo); err != nil { + return err + } + batch.Reset() + numInBatch = 0 + } + } + iter.Release() + + batch.Delete(addrIndexMetaDataKey) + if err := db.lDb.Write(batch, db.wo); err != nil { + return err + } + + db.lastAddrIndexBlkIdx = -1 + db.lastAddrIndexBlkSha = btcwire.ShaHash{} + + return nil +} diff --git a/memdb/memdb.go b/memdb/memdb.go index 9294309d..bacb9be6 100644 --- a/memdb/memdb.go +++ b/memdb/memdb.go @@ -698,6 +698,31 @@ func (db *MemDb) NewestSha() (*btcwire.ShaHash, int64, error) { return &blockSha, int64(numBlocks - 1), nil } +// FetchAddrIndexTip isn't currently implemented. This is a part of the +// btcdb.Db interface implementation. +func (db *MemDb) FetchAddrIndexTip() (*btcwire.ShaHash, int64, error) { + return nil, 0, btcdb.ErrNotImplemented +} + +// UpdateAddrIndexForBlock isn't currently implemented. This is a part of the +// btcdb.Db interface implementation. +func (db *MemDb) UpdateAddrIndexForBlock(*btcwire.ShaHash, int64, + btcdb.BlockAddrIndex) error { + return btcdb.ErrNotImplemented +} + +// FetchTxsForAddr isn't currently implemented. This is a part of the btcdb.Db +// interface implementation. +func (db *MemDb) FetchTxsForAddr(btcutil.Address, int, int) ([]*btcdb.TxListReply, error) { + return nil, btcdb.ErrNotImplemented +} + +// DeleteAddrIndex isn't currently implemented. This is a part of the btcdb.Db +// interface implementation. +func (db *MemDb) DeleteAddrIndex() error { + return btcdb.ErrNotImplemented +} + // RollbackClose discards the recent database changes to the previously saved // data at last Sync and closes the database. This is part of the btcdb.Db // interface implementation.