From ead39153aff20d191e4d28c39b86e82cce0924d6 Mon Sep 17 00:00:00 2001 From: Alex Akselrod Date: Thu, 19 Mar 2015 09:56:06 -0400 Subject: [PATCH] Fix #303 and #346 and change addrindex sort order Fix #303 by changing the addrindex key prefix to 3 characters so that it's easy to check length when dropping the index. To drop the old index, check to make sure we aren't dropping any entries that end in "sx" or "tx" as those aren't part of the addrindex. Update test to deal with the new prefix length. Fix #346 by changing the pointers in the mempool's addrindex map to wire.ShaHash 32-byte values. This lets them be deleted even if the transaction data changes places in memory upon expanding the maps. Change the way addrindex uint32s are stored to big-endian in order to sort the transactions on disk in chronological/dependency order. Change the "searchrawtransactions" RPC call to return transactions from the database before the memory pool so that they're returned in order. This commit DOES NOT do topological sorting of the memory pool transactions to ensure they're returned in dependency order. This may be a good idea for a future enhancement. Add addrindex versioning to automatically drop the old/incompatible version of the index and rebuild with the new sort method and key prefix. --- database/db.go | 2 +- database/ldb/block.go | 22 +++++++ database/ldb/internal_test.go | 2 +- database/ldb/leveldb.go | 29 ++++++--- database/ldb/tx.go | 113 ++++++++++++++++++++++++++++------ mempool.go | 16 ++--- rpcserver.go | 43 +++++++------ 7 files changed, 174 insertions(+), 53 deletions(-) diff --git a/database/db.go b/database/db.go index 2b8a3fae..2486a3fc 100644 --- a/database/db.go +++ b/database/db.go @@ -14,7 +14,7 @@ import ( // Errors that the various database functions may return. var ( - ErrAddrIndexDoesNotExist = errors.New("address index hasn't been built up yet") + ErrAddrIndexDoesNotExist = errors.New("address index hasn't been built or is an older version") ErrUnsupportedAddressType = errors.New("address type is not supported " + "by the address-index") ErrPrevShaMissing = errors.New("previous sha missing from database") diff --git a/database/ldb/block.go b/database/ldb/block.go index ac755ca7..7e2f8ff2 100644 --- a/database/ldb/block.go +++ b/database/ldb/block.go @@ -286,6 +286,28 @@ func (db *LevelDb) NewestSha() (rsha *wire.ShaHash, rblkid int64, err error) { return &sha, db.lastBlkIdx, nil } +// checkAddrIndexVersion returns an error if the address index version stored +// in the database is less than the current version, or if it doesn't exist. +// This function is used on startup to signal OpenDB to drop the address index +// if it's in an old, incompatible format. +func (db *LevelDb) checkAddrIndexVersion() error { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + data, err := db.lDb.Get(addrIndexVersionKey, db.ro) + if err != nil { + return database.ErrAddrIndexDoesNotExist + } + + indexVersion := binary.LittleEndian.Uint16(data) + + if indexVersion != uint16(addrIndexCurrentVersion) { + return database.ErrAddrIndexDoesNotExist + } + + return nil +} + // fetchAddrIndexTip returns the last block height and block sha to be indexed. // Meta-data about the address tip is currently cached in memory, and will be // updated accordingly by functions that modify the state. This function is diff --git a/database/ldb/internal_test.go b/database/ldb/internal_test.go index 7601abc6..c0a954bd 100644 --- a/database/ldb/internal_test.go +++ b/database/ldb/internal_test.go @@ -28,7 +28,7 @@ func TestAddrIndexKeySerialization(t *testing.T) { } serializedKey := addrIndexToKey(&fakeIndex) - copy(packedIndex[:], serializedKey[22:34]) + copy(packedIndex[:], serializedKey[23:35]) unpackedIndex := unpackTxIndex(packedIndex) if unpackedIndex.blkHeight != fakeIndex.blkHeight { diff --git a/database/ldb/leveldb.go b/database/ldb/leveldb.go index 11632bbf..6440b5b6 100644 --- a/database/ldb/leveldb.go +++ b/database/ldb/leveldb.go @@ -80,6 +80,9 @@ func parseArgs(funcName string, args ...interface{}) (string, error) { return dbPath, nil } +// CurrentDBVersion is the database version. +var CurrentDBVersion int32 = 1 + // OpenDB opens an existing database for use. func OpenDB(args ...interface{}) (database.Db, error) { dbpath, err := parseArgs("OpenDB", args...) @@ -141,10 +144,20 @@ blocknarrow: } } + log.Infof("Checking address index") + // Load the last block whose transactions have been indexed by address. if sha, idx, err := ldb.fetchAddrIndexTip(); err == nil { - ldb.lastAddrIndexBlkSha = *sha - ldb.lastAddrIndexBlkIdx = idx + if err = ldb.checkAddrIndexVersion(); err == nil { + ldb.lastAddrIndexBlkSha = *sha + ldb.lastAddrIndexBlkIdx = idx + log.Infof("Address index good, continuing") + } else { + log.Infof("Address index in old, incompatible format, dropping...") + ldb.deleteOldAddrIndex() + ldb.DeleteAddrIndex() + log.Infof("Old, incompatible address index dropped and can now be rebuilt") + } } else { ldb.lastAddrIndexBlkIdx = -1 } @@ -156,9 +169,6 @@ blocknarrow: return db, nil } -// CurrentDBVersion is the database version. -var CurrentDBVersion int32 = 1 - func openDB(dbpath string, create bool) (pbdb database.Db, err error) { var db LevelDb var tlDb *leveldb.DB @@ -630,15 +640,20 @@ func shaBlkToKey(sha *wire.ShaHash) []byte { return shaB } +// These are used here and in tx.go's deleteOldAddrIndex() to prevent deletion +// of indexes other than the addrindex now. +var recordSuffixTx = []byte{'t', 'x'} +var recordSuffixSpentTx = []byte{'s', 'x'} + func shaTxToKey(sha *wire.ShaHash) []byte { shaB := sha.Bytes() - shaB = append(shaB, "tx"...) + shaB = append(shaB, recordSuffixTx...) return shaB } func shaSpentTxToKey(sha *wire.ShaHash) []byte { shaB := sha.Bytes() - shaB = append(shaB, "sx"...) + shaB = append(shaB, recordSuffixSpentTx...) return shaB } diff --git a/database/ldb/tx.go b/database/ldb/tx.go index eedce301..4f834e76 100644 --- a/database/ldb/tx.go +++ b/database/ldb/tx.go @@ -22,18 +22,27 @@ const ( // -------------------------------------------------------- // | Prefix | Hash160 | BlkHeight | Tx Offset | Tx Size | // -------------------------------------------------------- - // | 2 bytes | 20 bytes | 4 bytes | 4 bytes | 4 bytes | + // | 3 bytes | 20 bytes | 4 bytes | 4 bytes | 4 bytes | // -------------------------------------------------------- - addrIndexKeyLength = 2 + ripemd160.Size + 4 + 4 + 4 + addrIndexKeyLength = 3 + ripemd160.Size + 4 + 4 + 4 batchDeleteThreshold = 10000 + + addrIndexCurrentVersion = 1 ) var addrIndexMetaDataKey = []byte("addrindex") // All address index entries share this prefix to facilitate the use of // iterators. -var addrIndexKeyPrefix = []byte("a-") +var addrIndexKeyPrefix = []byte("a+-") + +// Address index version is required to drop/rebuild address index if version +// is older than current as the format of the index may have changed. This is +// true when going from no version to version 1 as the address index is stored +// as big endian in version 1 and little endian in the original code. Version +// is stored as two bytes, little endian (to match all the code but the index). +var addrIndexVersionKey = []byte("addrindexversion") type txUpdateObj struct { txSha *wire.ShaHash @@ -372,15 +381,19 @@ func (db *LevelDb) FetchTxBySha(txsha *wire.ShaHash) ([]*database.TxListReply, e } // addrIndexToKey serializes the passed txAddrIndex for storage within the DB. +// We want to use BigEndian to store at least block height and TX offset +// in order to ensure that the transactions are sorted in the index. +// This gives us the ability to use the index in more client-side +// applications that are order-dependent (specifically by dependency). func addrIndexToKey(index *txAddrIndex) []byte { record := make([]byte, addrIndexKeyLength, addrIndexKeyLength) - copy(record[0:2], addrIndexKeyPrefix) - copy(record[2:22], index.hash160[:]) + copy(record[0:3], addrIndexKeyPrefix) + copy(record[3:23], index.hash160[:]) // The index itself. - binary.LittleEndian.PutUint32(record[22:26], uint32(index.blkHeight)) - binary.LittleEndian.PutUint32(record[26:30], uint32(index.txoffset)) - binary.LittleEndian.PutUint32(record[30:34], uint32(index.txlen)) + binary.BigEndian.PutUint32(record[23:27], uint32(index.blkHeight)) + binary.BigEndian.PutUint32(record[27:31], uint32(index.txoffset)) + binary.BigEndian.PutUint32(record[31:35], uint32(index.txlen)) return record } @@ -388,9 +401,9 @@ func addrIndexToKey(index *txAddrIndex) []byte { // unpackTxIndex deserializes the raw bytes of a address tx index. func unpackTxIndex(rawIndex [12]byte) *txAddrIndex { return &txAddrIndex{ - blkHeight: int64(binary.LittleEndian.Uint32(rawIndex[0:4])), - txoffset: int(binary.LittleEndian.Uint32(rawIndex[4:8])), - txlen: int(binary.LittleEndian.Uint32(rawIndex[8:12])), + blkHeight: int64(binary.BigEndian.Uint32(rawIndex[0:4])), + txoffset: int(binary.BigEndian.Uint32(rawIndex[4:8])), + txlen: int(binary.BigEndian.Uint32(rawIndex[8:12])), } } @@ -446,9 +459,9 @@ func (db *LevelDb) FetchTxsForAddr(addr btcutil.Address, skip int, } // Create the prefix for our search. - addrPrefix := make([]byte, 22, 22) - copy(addrPrefix[0:2], addrIndexKeyPrefix) - copy(addrPrefix[2:22], addrKey) + addrPrefix := make([]byte, 23, 23) + copy(addrPrefix[0:3], addrIndexKeyPrefix) + copy(addrPrefix[3:23], addrKey) iter := db.lDb.NewIterator(bytesPrefix(addrPrefix), nil) for skip != 0 && iter.Next() { @@ -459,7 +472,7 @@ func (db *LevelDb) FetchTxsForAddr(addr btcutil.Address, skip int, var replies []*database.TxListReply var rawIndex [12]byte for iter.Next() && limit != 0 { - copy(rawIndex[:], iter.Key()[22:34]) + copy(rawIndex[:], iter.Key()[23:35]) addrIndex := unpackTxIndex(rawIndex) tx, blkSha, blkHeight, _, err := db.fetchTxDataByLoc(addrIndex.blkHeight, @@ -528,6 +541,12 @@ func (db *LevelDb) UpdateAddrIndexForBlock(blkSha *wire.ShaHash, blkHeight int64 binary.LittleEndian.PutUint64(newIndexTip[32:40], uint64(blkHeight)) batch.Put(addrIndexMetaDataKey, newIndexTip) + // Ensure we're writing an address index version + newIndexVersion := make([]byte, 2, 2) + binary.LittleEndian.PutUint16(newIndexVersion[0:2], + uint16(addrIndexCurrentVersion)) + batch.Put(addrIndexVersionKey, newIndexVersion) + if err := db.lDb.Write(batch, db.wo); err != nil { return err } @@ -552,9 +571,12 @@ func (db *LevelDb) DeleteAddrIndex() error { numInBatch := 0 for iter.Next() { key := iter.Key() - batch.Delete(key) - - numInBatch++ + // With a 24-bit index key prefix, 1 in every 2^24 keys is a collision. + // We check the length to make sure we only delete address index keys. + if len(key) == addrIndexKeyLength { + batch.Delete(key) + numInBatch++ + } // Delete in chunks to potentially avoid very large batches. if numInBatch >= batchDeleteThreshold { @@ -572,6 +594,61 @@ func (db *LevelDb) DeleteAddrIndex() error { } batch.Delete(addrIndexMetaDataKey) + batch.Delete(addrIndexVersionKey) + + if err := db.lDb.Write(batch, db.wo); err != nil { + return err + } + + db.lastAddrIndexBlkIdx = -1 + db.lastAddrIndexBlkSha = wire.ShaHash{} + + return nil +} + +// deleteOldAddrIndex deletes the entire addrindex stored within the DB for a +// 2-byte addrIndexKeyPrefix. It also resets the cached in-memory metadata about +// the addr index. +func (db *LevelDb) deleteOldAddrIndex() error { + db.dbLock.Lock() + defer db.dbLock.Unlock() + + batch := db.lBatch() + defer batch.Reset() + + // Delete the entire index along with any metadata about it. + iter := db.lDb.NewIterator(bytesPrefix([]byte("a-")), db.ro) + numInBatch := 0 + for iter.Next() { + key := iter.Key() + // With a 24-bit index key prefix, 1 in every 2^24 keys is a collision. + // We check the length to make sure we only delete address index keys. + // We also check the last two bytes to make sure the suffix doesn't + // match other types of index that are 34 bytes long. + if len(key) == 34 && !bytes.HasSuffix(key, recordSuffixTx) && + !bytes.HasSuffix(key, recordSuffixSpentTx) { + batch.Delete(key) + numInBatch++ + } + + // Delete in chunks to potentially avoid very large batches. + if numInBatch >= batchDeleteThreshold { + if err := db.lDb.Write(batch, db.wo); err != nil { + iter.Release() + return err + } + batch.Reset() + numInBatch = 0 + } + } + iter.Release() + if err := iter.Error(); err != nil { + return err + } + + batch.Delete(addrIndexMetaDataKey) + batch.Delete(addrIndexVersionKey) + if err := db.lDb.Write(batch, db.wo); err != nil { return err } diff --git a/mempool.go b/mempool.go index 5c1004fd..59667375 100644 --- a/mempool.go +++ b/mempool.go @@ -97,7 +97,7 @@ type txMemPool struct { pool map[wire.ShaHash]*TxDesc orphans map[wire.ShaHash]*btcutil.Tx orphansByPrev map[wire.ShaHash]*list.List - addrindex map[string]map[*btcutil.Tx]struct{} // maps address to txs + addrindex map[string]map[wire.ShaHash]struct{} // maps address to txs outpoints map[wire.OutPoint]*btcutil.Tx lastUpdated time.Time // last time pool was updated pennyTotal float64 // exponentially decaying total for penny spends. @@ -653,7 +653,7 @@ func (mp *txMemPool) removeScriptFromAddrIndex(pkScript []byte, tx *btcutil.Tx) return err } for _, addr := range addresses { - delete(mp.addrindex[addr.EncodeAddress()], tx) + delete(mp.addrindex[addr.EncodeAddress()], *tx.Sha()) } return nil @@ -777,9 +777,9 @@ func (mp *txMemPool) indexScriptAddressToTx(pkScript []byte, tx *btcutil.Tx) err for _, addr := range addresses { if mp.addrindex[addr.EncodeAddress()] == nil { - mp.addrindex[addr.EncodeAddress()] = make(map[*btcutil.Tx]struct{}) + mp.addrindex[addr.EncodeAddress()] = make(map[wire.ShaHash]struct{}) } - mp.addrindex[addr.EncodeAddress()][tx] = struct{}{} + mp.addrindex[addr.EncodeAddress()][*tx.Sha()] = struct{}{} } return nil @@ -965,8 +965,10 @@ func (mp *txMemPool) FilterTransactionsByAddress(addr btcutil.Address) ([]*btcut if txs, exists := mp.addrindex[addr.EncodeAddress()]; exists { addressTxs := make([]*btcutil.Tx, 0, len(txs)) - for tx := range txs { - addressTxs = append(addressTxs, tx) + for txHash := range txs { + if tx, exists := mp.pool[txHash]; exists { + addressTxs = append(addressTxs, tx.Tx) + } } return addressTxs, nil } @@ -1494,7 +1496,7 @@ func newTxMemPool(server *server) *txMemPool { outpoints: make(map[wire.OutPoint]*btcutil.Tx), } if cfg.AddrIndex { - memPool.addrindex = make(map[string]map[*btcutil.Tx]struct{}) + memPool.addrindex = make(map[string]map[wire.ShaHash]struct{}) } return memPool } diff --git a/rpcserver.go b/rpcserver.go index 87791f31..7d4b017e 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -2598,15 +2598,6 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan var addressTxs []*database.TxListReply - // First check the mempool for relevent transactions. - memPoolTxs, err := s.server.txMemPool.FilterTransactionsByAddress(addr) - if err == nil && len(memPoolTxs) != 0 { - for _, tx := range memPoolTxs { - txReply := &database.TxListReply{Tx: tx.MsgTx(), Sha: tx.Sha()} - addressTxs = append(addressTxs, txReply) - } - } - var numRequested, numToSkip int if c.Count != nil { numRequested = *c.Count @@ -2620,17 +2611,31 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan numToSkip = 0 } } - if len(addressTxs) >= numRequested { - // Tx's in the mempool exceed the requested number of tx's. - // Slice off any possible overflow. - addressTxs = addressTxs[:numRequested] - } else { - // Otherwise, we'll also take a look into the database. - dbTxs, err := s.server.db.FetchTxsForAddr(addr, numToSkip, - numRequested-len(addressTxs)) - if err == nil && len(dbTxs) != 0 { - for _, txReply := range dbTxs { + + // While it's more efficient to check the mempool for relevant transactions + // first, we want to return results in order of occurrence/dependency so + // we'll check the mempool only if there aren't enough results returned + // by the database. + dbTxs, err := s.server.db.FetchTxsForAddr(addr, numToSkip, + numRequested-len(addressTxs)) + if err == nil { + for _, txReply := range dbTxs { + addressTxs = append(addressTxs, txReply) + } + } + + // This code (and txMemPool.FilterTransactionsByAddress()) doesn't sort by + // dependency. This might be something we want to do in the future when we + // return results for the client's convenience, or leave it to the client. + if len(addressTxs) < numRequested { + memPoolTxs, err := s.server.txMemPool.FilterTransactionsByAddress(addr) + if err == nil { + for _, tx := range memPoolTxs { + txReply := &database.TxListReply{Tx: tx.MsgTx(), Sha: tx.Sha()} addressTxs = append(addressTxs, txReply) + if len(addressTxs) == numRequested { + break + } } } }