diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 0d02f9cbb..b5fbdc313 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -911,50 +911,49 @@ class LevelDB: def get_block_txs(self, height: int) -> List[bytes]: return self.prefix_db.block_txs.get(height).tx_hashes - def _fs_transactions(self, txids: Iterable[str]): - tx_counts = self.tx_counts - tx_db_get = self.prefix_db.tx.get - tx_cache = self._tx_and_merkle_cache + async def get_transactions_and_merkles(self, tx_hashes: Iterable[str]): tx_infos = {} - - for tx_hash in txids: - cached_tx = tx_cache.get(tx_hash) - if cached_tx: - tx, merkle = cached_tx - else: - tx_hash_bytes = bytes.fromhex(tx_hash)[::-1] - tx_num = self.prefix_db.tx_num.get(tx_hash_bytes) - tx = None - tx_height = -1 - tx_num = None if not tx_num else tx_num.tx_num - if tx_num is not None: - fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0 - tx_height = bisect_right(tx_counts, tx_num) - tx = tx_db_get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False) - if tx_height == -1: - merkle = { - 'block_height': -1 - } - else: - tx_pos = tx_num - tx_counts[tx_height - 1] - branch, root = self.merkle.branch_and_root( - self.get_block_txs(tx_height), tx_pos - ) - merkle = { - 'block_height': tx_height, - 'merkle': [ - hash_to_hex_str(hash) - for hash in branch - ], - 'pos': tx_pos - } - if tx_height + 10 < self.db_height: - tx_cache[tx_hash] = tx, merkle - tx_infos[tx_hash] = (None if not tx else tx.hex(), merkle) + for tx_hash in tx_hashes: + tx_infos[tx_hash] = await asyncio.get_event_loop().run_in_executor( + None, self._get_transaction_and_merkle, tx_hash + ) + await asyncio.sleep(0) return tx_infos - async def fs_transactions(self, txids): - return await asyncio.get_event_loop().run_in_executor(None, self._fs_transactions, txids) + def _get_transaction_and_merkle(self, tx_hash): + cached_tx = self._tx_and_merkle_cache.get(tx_hash) + if cached_tx: + tx, merkle = cached_tx + else: + tx_hash_bytes = bytes.fromhex(tx_hash)[::-1] + tx_num = self.prefix_db.tx_num.get(tx_hash_bytes) + tx = None + tx_height = -1 + tx_num = None if not tx_num else tx_num.tx_num + if tx_num is not None: + fill_cache = tx_num in self.txo_to_claim and len(self.txo_to_claim[tx_num]) > 0 + tx_height = bisect_right(self.tx_counts, tx_num) + tx = self.prefix_db.tx.get(tx_hash_bytes, fill_cache=fill_cache, deserialize_value=False) + if tx_height == -1: + merkle = { + 'block_height': -1 + } + else: + tx_pos = tx_num - self.tx_counts[tx_height - 1] + branch, root = self.merkle.branch_and_root( + self.get_block_txs(tx_height), tx_pos + ) + merkle = { + 'block_height': tx_height, + 'merkle': [ + hash_to_hex_str(hash) + for hash in branch + ], + 'pos': tx_pos + } + if tx_height + 10 < self.db_height: + self._tx_and_merkle_cache[tx_hash] = tx, merkle + return (None if not tx else tx.hex(), merkle) async def fs_block_hashes(self, height, count): if height + count > len(self.headers): diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 9a9346880..ca4a0142f 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -548,6 +548,10 @@ class SessionManager: self._clear_stale_sessions(), self._manage_servers() ]) + except Exception as err: + if not isinstance(err, asyncio.CancelledError): + log.exception("hub server died") + raise err finally: await self._close_servers(list(self.servers.keys())) log.warning("disconnect %i sessions", len(self.sessions)) @@ -1015,16 +1019,16 @@ class LBRYElectrumX(SessionBase): self.bp.resolve_cache[url] = await self.loop.run_in_executor(None, self.db._resolve, url) return self.bp.resolve_cache[url] - async def claimtrie_resolve(self, *urls): + async def claimtrie_resolve(self, *urls) -> str: sorted_urls = tuple(sorted(urls)) self.session_mgr.urls_to_resolve_count_metric.inc(len(sorted_urls)) - - def _cached_resolve(): + try: + if sorted_urls in self.bp.resolve_outputs_cache: + return self.bp.resolve_outputs_cache[sorted_urls] rows, extra = [], [] - for url in urls: if url not in self.bp.resolve_cache: - self.bp.resolve_cache[url] = self.db._resolve(url) + self.bp.resolve_cache[url] = await self._cached_resolve_url(url) stream, channel, repost, reposted_channel = self.bp.resolve_cache[url] if isinstance(channel, ResolveCensoredError): rows.append(channel) @@ -1049,18 +1053,11 @@ class LBRYElectrumX(SessionBase): extra.append(repost) if reposted_channel: extra.append(reposted_channel) - # print("claimtrie resolve %i rows %i extrat" % (len(rows), len(extra))) - self.bp.resolve_outputs_cache[sorted_urls] = serialized_outputs = Outputs.to_base64( - rows, extra, 0, None, None + await asyncio.sleep(0) + self.bp.resolve_outputs_cache[sorted_urls] = result = await self.loop.run_in_executor( + None, Outputs.to_base64, rows, extra, 0, None, None ) - return serialized_outputs - - try: - if sorted_urls in self.bp.resolve_outputs_cache: - return self.bp.resolve_outputs_cache[sorted_urls] - else: - - return await self.loop.run_in_executor(None, _cached_resolve) + return result finally: self.session_mgr.resolved_url_count_metric.inc(len(sorted_urls)) @@ -1213,9 +1210,11 @@ class LBRYElectrumX(SessionBase): address: the address to subscribe to""" if len(addresses) > 1000: raise RPCError(BAD_REQUEST, f'too many addresses in subscription request: {len(addresses)}') - return [ - await self.hashX_subscribe(self.address_to_hashX(address), address) for address in addresses - ] + results = [] + for address in addresses: + results.append(await self.hashX_subscribe(self.address_to_hashX(address), address)) + await asyncio.sleep(0) + return results async def address_unsubscribe(self, address): """Unsubscribe an address. @@ -1464,7 +1463,7 @@ class LBRYElectrumX(SessionBase): raise RPCError(BAD_REQUEST, f'too many tx hashes in request: {len(tx_hashes)}') for tx_hash in tx_hashes: assert_tx_hash(tx_hash) - batch_result = await self.db.fs_transactions(tx_hashes) + batch_result = await self.db.get_transactions_and_merkles(tx_hashes) needed_merkles = {} for tx_hash in tx_hashes: diff --git a/tests/integration/blockchain/test_blockchain_reorganization.py b/tests/integration/blockchain/test_blockchain_reorganization.py index 72724a68e..621655add 100644 --- a/tests/integration/blockchain/test_blockchain_reorganization.py +++ b/tests/integration/blockchain/test_blockchain_reorganization.py @@ -23,7 +23,7 @@ class BlockchainReorganizationTests(CommandTestCase): self.assertEqual(block_hash, (await bp.db.fs_block_hashes(height, 1))[0][::-1].hex()) txids = await asyncio.get_event_loop().run_in_executor(None, get_txids) - txs = await bp.db.fs_transactions(txids) + txs = await bp.db.get_transactions_and_merkles(txids) block_txs = (await bp.daemon.deserialised_block(block_hash))['tx'] self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions') self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order') diff --git a/tests/integration/takeovers/test_resolve_command.py b/tests/integration/takeovers/test_resolve_command.py index e1e1d18f7..b5ec87fc8 100644 --- a/tests/integration/takeovers/test_resolve_command.py +++ b/tests/integration/takeovers/test_resolve_command.py @@ -1458,7 +1458,7 @@ class ResolveAfterReorg(BaseResolveTestCase): txids = [ tx_hash[::-1].hex() for tx_hash in bp.db.get_block_txs(height) ] - txs = await bp.db.fs_transactions(txids) + txs = await bp.db.get_transactions_and_merkles(txids) block_txs = (await bp.daemon.deserialised_block(block_hash))['tx'] self.assertSetEqual(set(block_txs), set(txs.keys()), msg='leveldb/lbrycrd is missing transactions') self.assertListEqual(block_txs, list(txs.keys()), msg='leveldb/lbrycrd transactions are of order')