From 9b4afe98167952958e1a2d223cce0eb6bda7ca3b Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 25 Nov 2020 16:08:04 -0500 Subject: [PATCH] store txids in memory, faster address subscription/history --- lbry/wallet/server/block_processor.py | 2 ++ lbry/wallet/server/history.py | 28 ++++++++--------- lbry/wallet/server/leveldb.py | 43 ++++++++++++++++----------- 3 files changed, 42 insertions(+), 31 deletions(-) diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index bfd70ec50..26eaad00e 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -444,6 +444,7 @@ class BlockProcessor: append_hashX_by_tx(hashXs) update_touched(hashXs) + self.db.total_transactions.append(tx_hash) tx_num += 1 self.db.history.add_unflushed(hashXs_by_tx, self.tx_count) @@ -491,6 +492,7 @@ class BlockProcessor: undo_entry_len = 12 + HASHX_LEN for tx, tx_hash in reversed(txs): + self.db.total_transactions.pop() for idx, txout in enumerate(tx.outputs): # Spend the TX outputs. Be careful with unspendable # outputs - we didn't save those in the first place. diff --git a/lbry/wallet/server/history.py b/lbry/wallet/server/history.py index f810a1045..72952dc53 100644 --- a/lbry/wallet/server/history.py +++ b/lbry/wallet/server/history.py @@ -173,20 +173,20 @@ class History: self.logger.info(f'backing up removed {nremoves:,d} history entries') - def get_txnums(self, hashX, limit=1000): - """Generator that returns an unpruned, sorted list of tx_nums in the - history of a hashX. Includes both spending and receiving - transactions. By default yields at most 1000 entries. Set - limit to None to get them all. """ - limit = util.resolve_limit(limit) - for key, hist in self.db.iterator(prefix=hashX): - a = array.array('I') - a.frombytes(hist) - for tx_num in a: - if limit == 0: - return - yield tx_num - limit -= 1 + # def get_txnums(self, hashX, limit=1000): + # """Generator that returns an unpruned, sorted list of tx_nums in the + # history of a hashX. Includes both spending and receiving + # transactions. By default yields at most 1000 entries. Set + # limit to None to get them all. """ + # limit = util.resolve_limit(limit) + # for key, hist in self.db.iterator(prefix=hashX): + # a = array.array('I') + # a.frombytes(hist) + # for tx_num in a: + # if limit == 0: + # return + # yield tx_num + # limit -= 1 # # History compaction diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index b87011478..f06517edb 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -96,6 +96,7 @@ class LevelDB: self._block_txs_cache = pylru.lrucache(50000) self._merkle_tx_cache = pylru.lrucache(100000) + self.total_transactions = None async def _read_tx_counts(self): if self.tx_counts is not None: @@ -119,6 +120,18 @@ class LevelDB: else: assert self.db_tx_count == 0 + async def _read_txids(self): + def get_txids(): + return list(self.tx_db.iterator(prefix=TX_HASH_PREFIX, include_key=False)) + + start = time.perf_counter() + self.logger.info("loading txids") + txids = await asyncio.get_event_loop().run_in_executor(self.executor, get_txids) + assert len(txids) == len(self.tx_counts) == 0 or len(txids) == self.tx_counts[-1] + self.total_transactions = txids + ts = time.perf_counter() - start + self.logger.info("loaded %i txids in %ss", len(self.total_transactions), round(ts, 4)) + async def _read_headers(self): if self.headers is not None: return @@ -169,6 +182,8 @@ class LevelDB: # Read TX counts (requires meta directory) await self._read_tx_counts() + if self.total_transactions is None: + await self._read_txids() await self._read_headers() def close(self): @@ -452,10 +467,8 @@ class LevelDB: If the tx_height is not on disk, returns (None, tx_height).""" tx_height = bisect_right(self.tx_counts, tx_num) if tx_height > self.db_height: - tx_hash = None - else: - tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) - return tx_hash, tx_height + return None, tx_height + return self.total_transactions[tx_num], tx_height async def tx_merkle(self, tx_num, tx_height): if tx_height == -1: @@ -557,38 +570,34 @@ class LevelDB: # return hashx_history # return hashx_history - def iter_tx_heights(): + def read_history(): db_height = self.db_height tx_counts = self.tx_counts tx_db_get = self.tx_db.get pack_be_uint64 = util.pack_be_uint64 cnt = 0 + txs = [] - for key, hist in self.history.db.iterator(prefix=hashX): + for hist in self.history.db.iterator(prefix=hashX, include_key=False): a = array.array('I') a.frombytes(hist) for tx_num in a: tx_height = bisect_right(tx_counts, tx_num) if tx_height > db_height: - yield None, tx_height return - yield tx_db_get(TX_HASH_PREFIX + pack_be_uint64(tx_num)), tx_height + txs.append((tx_num, tx_height)) cnt += 1 if limit and cnt >= limit: - return + break if limit and cnt >= limit: - return - - def read_history(): - return [ - (tx_num, tx_height) for (tx_num, tx_height) in iter_tx_heights() - ] + break + return txs while True: history = await asyncio.get_event_loop().run_in_executor(self.executor, read_history) - if not history or history[-1][0] is not None: - return history + if history is not None: + return [(self.total_transactions[tx_num], tx_height) for (tx_num, tx_height) in history] self.logger.warning(f'limited_history: tx hash ' f'not found (reorg?), retrying...') await sleep(0.25)