Fix #303 and #346 and change addrindex sort order

Fix #303 by changing the addrindex key prefix to 3 characters so that
it's easy to check length when dropping the index. To drop the old
index, check to make sure we aren't dropping any entries that end in
"sx" or "tx" as those aren't part of the addrindex. Update test to
deal with the new prefix length.

Fix #346 by changing the pointers in the mempool's addrindex map to
wire.ShaHash 32-byte values. This lets them be deleted even if the
transaction data changes places in memory upon expanding the maps.

Change the way addrindex uint32s are stored to big-endian in order to
sort the transactions on disk in chronological/dependency order.

Change the "searchrawtransactions" RPC call to return transactions
from the database before the memory pool so that they're returned in
order. This commit DOES NOT do topological sorting of the memory pool
transactions to ensure they're returned in dependency order. This may
be a good idea for a future enhancement.

Add addrindex versioning to automatically drop the old/incompatible
version of the index and rebuild with the new sort method and key
prefix.
This commit is contained in:
Alex Akselrod 2015-03-19 09:56:06 -04:00
parent ccc3a9b979
commit ead39153af
7 changed files with 174 additions and 53 deletions

View file

@ -14,7 +14,7 @@ import (
// Errors that the various database functions may return.
var (
ErrAddrIndexDoesNotExist = errors.New("address index hasn't been built up yet")
ErrAddrIndexDoesNotExist = errors.New("address index hasn't been built or is an older version")
ErrUnsupportedAddressType = errors.New("address type is not supported " +
"by the address-index")
ErrPrevShaMissing = errors.New("previous sha missing from database")

View file

@ -286,6 +286,28 @@ func (db *LevelDb) NewestSha() (rsha *wire.ShaHash, rblkid int64, err error) {
return &sha, db.lastBlkIdx, nil
}
// checkAddrIndexVersion returns database.ErrAddrIndexDoesNotExist if the
// address index version stored in the database cannot be read, is too short
// to decode, or is not equal to addrIndexCurrentVersion. It returns nil only
// when the stored version matches the current version exactly.
// This function is used on startup to signal OpenDB to drop the address index
// if it's in an old, incompatible format.
func (db *LevelDb) checkAddrIndexVersion() error {
	db.dbLock.Lock()
	defer db.dbLock.Unlock()

	data, err := db.lDb.Get(addrIndexVersionKey, db.ro)
	if err != nil {
		return database.ErrAddrIndexDoesNotExist
	}

	// The version is decoded as a little-endian uint16, which needs at least
	// two bytes. Treat a shorter value as an unusable index rather than
	// letting Uint16 panic on it.
	if len(data) < 2 {
		return database.ErrAddrIndexDoesNotExist
	}

	indexVersion := binary.LittleEndian.Uint16(data)
	if indexVersion != uint16(addrIndexCurrentVersion) {
		return database.ErrAddrIndexDoesNotExist
	}
	return nil
}
// fetchAddrIndexTip returns the last block height and block sha to be indexed.
// Meta-data about the address tip is currently cached in memory, and will be
// updated accordingly by functions that modify the state. This function is

View file

@ -28,7 +28,7 @@ func TestAddrIndexKeySerialization(t *testing.T) {
}
serializedKey := addrIndexToKey(&fakeIndex)
copy(packedIndex[:], serializedKey[22:34])
copy(packedIndex[:], serializedKey[23:35])
unpackedIndex := unpackTxIndex(packedIndex)
if unpackedIndex.blkHeight != fakeIndex.blkHeight {

View file

@ -80,6 +80,9 @@ func parseArgs(funcName string, args ...interface{}) (string, error) {
return dbPath, nil
}
// CurrentDBVersion is the database version.
var CurrentDBVersion int32 = 1
// OpenDB opens an existing database for use.
func OpenDB(args ...interface{}) (database.Db, error) {
dbpath, err := parseArgs("OpenDB", args...)
@ -141,10 +144,20 @@ blocknarrow:
}
}
log.Infof("Checking address index")
// Load the last block whose transactions have been indexed by address.
if sha, idx, err := ldb.fetchAddrIndexTip(); err == nil {
ldb.lastAddrIndexBlkSha = *sha
ldb.lastAddrIndexBlkIdx = idx
if err = ldb.checkAddrIndexVersion(); err == nil {
ldb.lastAddrIndexBlkSha = *sha
ldb.lastAddrIndexBlkIdx = idx
log.Infof("Address index good, continuing")
} else {
log.Infof("Address index in old, incompatible format, dropping...")
ldb.deleteOldAddrIndex()
ldb.DeleteAddrIndex()
log.Infof("Old, incompatible address index dropped and can now be rebuilt")
}
} else {
ldb.lastAddrIndexBlkIdx = -1
}
@ -156,9 +169,6 @@ blocknarrow:
return db, nil
}
// CurrentDBVersion is the database version.
var CurrentDBVersion int32 = 1
func openDB(dbpath string, create bool) (pbdb database.Db, err error) {
var db LevelDb
var tlDb *leveldb.DB
@ -630,15 +640,20 @@ func shaBlkToKey(sha *wire.ShaHash) []byte {
return shaB
}
// These suffixes are used here and in tx.go's deleteOldAddrIndex() to avoid
// deleting records that belong to indexes other than the addrindex.
var recordSuffixTx = []byte{'t', 'x'}
var recordSuffixSpentTx = []byte{'s', 'x'}
func shaTxToKey(sha *wire.ShaHash) []byte {
shaB := sha.Bytes()
shaB = append(shaB, "tx"...)
shaB = append(shaB, recordSuffixTx...)
return shaB
}
func shaSpentTxToKey(sha *wire.ShaHash) []byte {
shaB := sha.Bytes()
shaB = append(shaB, "sx"...)
shaB = append(shaB, recordSuffixSpentTx...)
return shaB
}

View file

@ -22,18 +22,27 @@ const (
// --------------------------------------------------------
// | Prefix | Hash160 | BlkHeight | Tx Offset | Tx Size |
// --------------------------------------------------------
// | 2 bytes | 20 bytes | 4 bytes | 4 bytes | 4 bytes |
// | 3 bytes | 20 bytes | 4 bytes | 4 bytes | 4 bytes |
// --------------------------------------------------------
addrIndexKeyLength = 2 + ripemd160.Size + 4 + 4 + 4
addrIndexKeyLength = 3 + ripemd160.Size + 4 + 4 + 4
batchDeleteThreshold = 10000
addrIndexCurrentVersion = 1
)
var addrIndexMetaDataKey = []byte("addrindex")
// All address index entries share this prefix to facilitate the use of
// iterators.
var addrIndexKeyPrefix = []byte("a-")
var addrIndexKeyPrefix = []byte("a+-")
// Address index version is required to drop/rebuild address index if version
// is older than current as the format of the index may have changed. This is
// true when going from no version to version 1 as the address index is stored
// as big endian in version 1 and little endian in the original code. Version
// is stored as two bytes, little endian (to match all the code but the index).
var addrIndexVersionKey = []byte("addrindexversion")
type txUpdateObj struct {
txSha *wire.ShaHash
@ -372,15 +381,19 @@ func (db *LevelDb) FetchTxBySha(txsha *wire.ShaHash) ([]*database.TxListReply, e
}
// addrIndexToKey serializes the passed txAddrIndex for storage within the DB.
// We want to use BigEndian to store at least block height and TX offset
// in order to ensure that the transactions are sorted in the index.
// This gives us the ability to use the index in more client-side
// applications that are order-dependent (specifically by dependency).
func addrIndexToKey(index *txAddrIndex) []byte {
record := make([]byte, addrIndexKeyLength, addrIndexKeyLength)
copy(record[0:2], addrIndexKeyPrefix)
copy(record[2:22], index.hash160[:])
copy(record[0:3], addrIndexKeyPrefix)
copy(record[3:23], index.hash160[:])
// The index itself.
binary.LittleEndian.PutUint32(record[22:26], uint32(index.blkHeight))
binary.LittleEndian.PutUint32(record[26:30], uint32(index.txoffset))
binary.LittleEndian.PutUint32(record[30:34], uint32(index.txlen))
binary.BigEndian.PutUint32(record[23:27], uint32(index.blkHeight))
binary.BigEndian.PutUint32(record[27:31], uint32(index.txoffset))
binary.BigEndian.PutUint32(record[31:35], uint32(index.txlen))
return record
}
@ -388,9 +401,9 @@ func addrIndexToKey(index *txAddrIndex) []byte {
// unpackTxIndex deserializes the raw bytes of a address tx index.
func unpackTxIndex(rawIndex [12]byte) *txAddrIndex {
return &txAddrIndex{
blkHeight: int64(binary.LittleEndian.Uint32(rawIndex[0:4])),
txoffset: int(binary.LittleEndian.Uint32(rawIndex[4:8])),
txlen: int(binary.LittleEndian.Uint32(rawIndex[8:12])),
blkHeight: int64(binary.BigEndian.Uint32(rawIndex[0:4])),
txoffset: int(binary.BigEndian.Uint32(rawIndex[4:8])),
txlen: int(binary.BigEndian.Uint32(rawIndex[8:12])),
}
}
@ -446,9 +459,9 @@ func (db *LevelDb) FetchTxsForAddr(addr btcutil.Address, skip int,
}
// Create the prefix for our search.
addrPrefix := make([]byte, 22, 22)
copy(addrPrefix[0:2], addrIndexKeyPrefix)
copy(addrPrefix[2:22], addrKey)
addrPrefix := make([]byte, 23, 23)
copy(addrPrefix[0:3], addrIndexKeyPrefix)
copy(addrPrefix[3:23], addrKey)
iter := db.lDb.NewIterator(bytesPrefix(addrPrefix), nil)
for skip != 0 && iter.Next() {
@ -459,7 +472,7 @@ func (db *LevelDb) FetchTxsForAddr(addr btcutil.Address, skip int,
var replies []*database.TxListReply
var rawIndex [12]byte
for iter.Next() && limit != 0 {
copy(rawIndex[:], iter.Key()[22:34])
copy(rawIndex[:], iter.Key()[23:35])
addrIndex := unpackTxIndex(rawIndex)
tx, blkSha, blkHeight, _, err := db.fetchTxDataByLoc(addrIndex.blkHeight,
@ -528,6 +541,12 @@ func (db *LevelDb) UpdateAddrIndexForBlock(blkSha *wire.ShaHash, blkHeight int64
binary.LittleEndian.PutUint64(newIndexTip[32:40], uint64(blkHeight))
batch.Put(addrIndexMetaDataKey, newIndexTip)
// Ensure we're writing an address index version
newIndexVersion := make([]byte, 2, 2)
binary.LittleEndian.PutUint16(newIndexVersion[0:2],
uint16(addrIndexCurrentVersion))
batch.Put(addrIndexVersionKey, newIndexVersion)
if err := db.lDb.Write(batch, db.wo); err != nil {
return err
}
@ -552,9 +571,12 @@ func (db *LevelDb) DeleteAddrIndex() error {
numInBatch := 0
for iter.Next() {
key := iter.Key()
batch.Delete(key)
numInBatch++
// With a 24-bit index key prefix, 1 in every 2^24 keys is a collision.
// We check the length to make sure we only delete address index keys.
if len(key) == addrIndexKeyLength {
batch.Delete(key)
numInBatch++
}
// Delete in chunks to potentially avoid very large batches.
if numInBatch >= batchDeleteThreshold {
@ -572,6 +594,61 @@ func (db *LevelDb) DeleteAddrIndex() error {
}
batch.Delete(addrIndexMetaDataKey)
batch.Delete(addrIndexVersionKey)
if err := db.lDb.Write(batch, db.wo); err != nil {
return err
}
db.lastAddrIndexBlkIdx = -1
db.lastAddrIndexBlkSha = wire.ShaHash{}
return nil
}
// deleteOldAddrIndex deletes the entire addrindex stored within the DB for a
// 2-byte addrIndexKeyPrefix. It also resets the cached in-memory metadata about
// the addr index.
func (db *LevelDb) deleteOldAddrIndex() error {
db.dbLock.Lock()
defer db.dbLock.Unlock()
batch := db.lBatch()
defer batch.Reset()
// Delete the entire index along with any metadata about it.
iter := db.lDb.NewIterator(bytesPrefix([]byte("a-")), db.ro)
numInBatch := 0
for iter.Next() {
key := iter.Key()
// With a 16-bit index key prefix, 1 in every 2^16 keys is a collision.
// We check the length to make sure we only delete address index keys.
// We also check the last two bytes to make sure the suffix doesn't
// match other types of index that are 34 bytes long.
if len(key) == 34 && !bytes.HasSuffix(key, recordSuffixTx) &&
!bytes.HasSuffix(key, recordSuffixSpentTx) {
batch.Delete(key)
numInBatch++
}
// Delete in chunks to potentially avoid very large batches.
if numInBatch >= batchDeleteThreshold {
if err := db.lDb.Write(batch, db.wo); err != nil {
iter.Release()
return err
}
batch.Reset()
numInBatch = 0
}
}
iter.Release()
if err := iter.Error(); err != nil {
return err
}
batch.Delete(addrIndexMetaDataKey)
batch.Delete(addrIndexVersionKey)
if err := db.lDb.Write(batch, db.wo); err != nil {
return err
}

View file

@ -97,7 +97,7 @@ type txMemPool struct {
pool map[wire.ShaHash]*TxDesc
orphans map[wire.ShaHash]*btcutil.Tx
orphansByPrev map[wire.ShaHash]*list.List
addrindex map[string]map[*btcutil.Tx]struct{} // maps address to txs
addrindex map[string]map[wire.ShaHash]struct{} // maps address to txs
outpoints map[wire.OutPoint]*btcutil.Tx
lastUpdated time.Time // last time pool was updated
pennyTotal float64 // exponentially decaying total for penny spends.
@ -653,7 +653,7 @@ func (mp *txMemPool) removeScriptFromAddrIndex(pkScript []byte, tx *btcutil.Tx)
return err
}
for _, addr := range addresses {
delete(mp.addrindex[addr.EncodeAddress()], tx)
delete(mp.addrindex[addr.EncodeAddress()], *tx.Sha())
}
return nil
@ -777,9 +777,9 @@ func (mp *txMemPool) indexScriptAddressToTx(pkScript []byte, tx *btcutil.Tx) err
for _, addr := range addresses {
if mp.addrindex[addr.EncodeAddress()] == nil {
mp.addrindex[addr.EncodeAddress()] = make(map[*btcutil.Tx]struct{})
mp.addrindex[addr.EncodeAddress()] = make(map[wire.ShaHash]struct{})
}
mp.addrindex[addr.EncodeAddress()][tx] = struct{}{}
mp.addrindex[addr.EncodeAddress()][*tx.Sha()] = struct{}{}
}
return nil
@ -965,8 +965,10 @@ func (mp *txMemPool) FilterTransactionsByAddress(addr btcutil.Address) ([]*btcut
if txs, exists := mp.addrindex[addr.EncodeAddress()]; exists {
addressTxs := make([]*btcutil.Tx, 0, len(txs))
for tx := range txs {
addressTxs = append(addressTxs, tx)
for txHash := range txs {
if tx, exists := mp.pool[txHash]; exists {
addressTxs = append(addressTxs, tx.Tx)
}
}
return addressTxs, nil
}
@ -1494,7 +1496,7 @@ func newTxMemPool(server *server) *txMemPool {
outpoints: make(map[wire.OutPoint]*btcutil.Tx),
}
if cfg.AddrIndex {
memPool.addrindex = make(map[string]map[*btcutil.Tx]struct{})
memPool.addrindex = make(map[string]map[wire.ShaHash]struct{})
}
return memPool
}

View file

@ -2598,15 +2598,6 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan
var addressTxs []*database.TxListReply
// First check the mempool for relevent transactions.
memPoolTxs, err := s.server.txMemPool.FilterTransactionsByAddress(addr)
if err == nil && len(memPoolTxs) != 0 {
for _, tx := range memPoolTxs {
txReply := &database.TxListReply{Tx: tx.MsgTx(), Sha: tx.Sha()}
addressTxs = append(addressTxs, txReply)
}
}
var numRequested, numToSkip int
if c.Count != nil {
numRequested = *c.Count
@ -2620,17 +2611,31 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan
numToSkip = 0
}
}
if len(addressTxs) >= numRequested {
// Tx's in the mempool exceed the requested number of tx's.
// Slice off any possible overflow.
addressTxs = addressTxs[:numRequested]
} else {
// Otherwise, we'll also take a look into the database.
dbTxs, err := s.server.db.FetchTxsForAddr(addr, numToSkip,
numRequested-len(addressTxs))
if err == nil && len(dbTxs) != 0 {
for _, txReply := range dbTxs {
// While it's more efficient to check the mempool for relevant transactions
// first, we want to return results in order of occurrence/dependency so
// we'll check the mempool only if there aren't enough results returned
// by the database.
dbTxs, err := s.server.db.FetchTxsForAddr(addr, numToSkip,
numRequested-len(addressTxs))
if err == nil {
for _, txReply := range dbTxs {
addressTxs = append(addressTxs, txReply)
}
}
// This code (and txMemPool.FilterTransactionsByAddress()) doesn't sort by
// dependency. This might be something we want to do in the future when we
// return results for the client's convenience, or leave it to the client.
if len(addressTxs) < numRequested {
memPoolTxs, err := s.server.txMemPool.FilterTransactionsByAddress(addr)
if err == nil {
for _, tx := range memPoolTxs {
txReply := &database.TxListReply{Tx: tx.MsgTx(), Sha: tx.Sha()}
addressTxs = append(addressTxs, txReply)
if len(addressTxs) == numRequested {
break
}
}
}
}