From 2318e6d8e92110c608a3bbf497da8b29776d5a09 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 10 Dec 2020 19:26:34 -0500 Subject: [PATCH] faster fs_transactions --- lbry/wallet/server/leveldb.py | 123 +++++++++++----------------------- 1 file changed, 40 insertions(+), 83 deletions(-) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index f06517edb..50ac57a96 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -94,8 +94,7 @@ class LevelDB: self.headers_db = None self.tx_db = None - self._block_txs_cache = pylru.lrucache(50000) - self._merkle_tx_cache = pylru.lrucache(100000) + self._tx_and_merkle_cache = pylru.lrucache(100000) self.total_transactions = None async def _read_tx_counts(self): @@ -147,7 +146,7 @@ class LevelDB: async def _open_dbs(self, for_sync, compacting): if self.executor is None: - self.executor = ThreadPoolExecutor(max(1, os.cpu_count() - 1)) + self.executor = ThreadPoolExecutor(1) coin_path = os.path.join(self.env.db_dir, 'COIN') if not os.path.isfile(coin_path): with util.open_file(coin_path, create=True) as f: @@ -470,76 +469,52 @@ class LevelDB: return None, tx_height return self.total_transactions[tx_num], tx_height - async def tx_merkle(self, tx_num, tx_height): - if tx_height == -1: - return { - 'block_height': -1 - } - tx_counts = self.tx_counts - tx_pos = tx_num - tx_counts[tx_height - 1] - - def _update_block_txs_cache(): - block_txs = list(self.tx_db.iterator( - start=TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height - 1]), - stop=None if tx_height + 1 == len(tx_counts) else - TX_HASH_PREFIX + util.pack_be_uint64(tx_counts[tx_height]), include_key=False - )) - if tx_height + 100 > self.db_height: - return block_txs - self._block_txs_cache[tx_height] = block_txs - - uncached = None - if (tx_num, tx_height) in self._merkle_tx_cache: - return self._merkle_tx_cache[(tx_num, tx_height)] - if tx_height not in self._block_txs_cache: - uncached = await asyncio.get_event_loop().run_in_executor(self.executor, _update_block_txs_cache) - block_txs = self._block_txs_cache.get(tx_height, uncached) - branch, root = self.merkle.branch_and_root(block_txs, tx_pos) - merkle = { - 'block_height': tx_height, - 'merkle': [ - hash_to_hex_str(hash) - for hash in branch - ], - 'pos': tx_pos - } - if tx_height + 100 < self.db_height: - self._merkle_tx_cache[(tx_num, tx_height)] = merkle - return merkle - - def _fs_transactions(self, txids: Iterable[str]) -> List[Tuple[str, Optional[str], int, int]]: + def _fs_transactions(self, txids: Iterable[str]): unpack_be_uint64 = util.unpack_be_uint64 tx_counts = self.tx_counts tx_db_get = self.tx_db.get - tx_infos = [] + tx_cache = self._tx_and_merkle_cache + + tx_infos = {} for tx_hash in txids: - tx_hash_bytes = bytes.fromhex(tx_hash)[::-1] - tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes) - tx = None - tx_height = -1 - if tx_num is not None: - tx_num = unpack_be_uint64(tx_num) - tx_height = bisect_right(tx_counts, tx_num) - if tx_height < self.db_height: - tx = tx_db_get(TX_PREFIX + tx_hash_bytes) - tx_infos.append((tx_hash, None if not tx else tx.hex(), tx_num, tx_height)) - + cached_tx = tx_cache.get(tx_hash) + if cached_tx: + tx, merkle = cached_tx + else: + tx_hash_bytes = bytes.fromhex(tx_hash)[::-1] + tx_num = tx_db_get(TX_NUM_PREFIX + tx_hash_bytes) + tx = None + tx_height = -1 + if tx_num is not None: + tx_num = unpack_be_uint64(tx_num) + tx_height = bisect_right(tx_counts, tx_num) + if tx_height < self.db_height: + tx = tx_db_get(TX_PREFIX + tx_hash_bytes) + if tx_height == -1: + merkle = { + 'block_height': -1 + } + else: + tx_pos = tx_num - tx_counts[tx_height - 1] + branch, root = self.merkle.branch_and_root( + self.total_transactions[tx_counts[tx_height - 1]:tx_counts[tx_height]], tx_pos + ) + merkle = { + 'block_height': tx_height, + 'merkle': [ + hash_to_hex_str(hash) + for hash in branch + ], + 'pos': tx_pos + } + if tx_height + 10 < self.db_height: + tx_cache[tx_hash] = tx, merkle + tx_infos[tx_hash] = (None if not tx else tx.hex(), merkle) return tx_infos async def fs_transactions(self, txids): - txs = await asyncio.get_event_loop().run_in_executor( - self.executor, self._fs_transactions, txids - ) - unsorted_result = {} - - async def add_result(item): - _txid, _tx, _tx_num, _tx_height = item - unsorted_result[_txid] = (_tx, await self.tx_merkle(_tx_num, _tx_height)) - - if txs: - await asyncio.gather(*map(add_result, txs)) - return {txid: unsorted_result[txid] for txid, _, _, _ in txs} + return await asyncio.get_event_loop().run_in_executor(self.executor, self._fs_transactions, txids) async def fs_block_hashes(self, height, count): if height + count > len(self.headers): @@ -553,28 +528,10 @@ class LevelDB: transactions. By default returns at most 1000 entries. Set limit to None to get them all. """ - # def read_history(): - # hashx_history = [] - # for key, hist in self.history.db.iterator(prefix=hashX): - # a = array.array('I') - # a.frombytes(hist) - # for tx_num in a: - # tx_height = bisect_right(self.tx_counts, tx_num) - # if tx_height > self.db_height: - # tx_hash = None - # else: - # tx_hash = self.tx_db.get(TX_HASH_PREFIX + util.pack_be_uint64(tx_num)) - # - # hashx_history.append((tx_hash, tx_height)) - # if limit and len(hashx_history) >= limit: - # return hashx_history - # return hashx_history def read_history(): db_height = self.db_height tx_counts = self.tx_counts - tx_db_get = self.tx_db.get - pack_be_uint64 = util.pack_be_uint64 cnt = 0 txs = []