From 911ca8c37e6dfe413490e2016d4486f0a561d35f Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 19 Apr 2019 17:05:18 -0300 Subject: [PATCH] cache and share txs fetches while validating concurrently on batches --- lbrynet/wallet/resolve.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/lbrynet/wallet/resolve.py b/lbrynet/wallet/resolve.py index d3f8ebb4b..b0b3108ce 100644 --- a/lbrynet/wallet/resolve.py +++ b/lbrynet/wallet/resolve.py @@ -1,5 +1,6 @@ import logging +import asyncio from cryptography.exceptions import InvalidSignature from binascii import unhexlify, hexlify @@ -24,14 +25,18 @@ class Resolver: self.transaction_class = ledger.transaction_class self.network = ledger.network self.ledger = ledger + self._tx_cache = {} async def resolve(self, page, page_size, *uris): + uris = set(uris) try: for uri in uris: parsed_uri = parse_lbry_uri(uri) if parsed_uri.claim_id: validate_claim_id(parsed_uri.claim_id) resolutions = await self.network.get_values_for_uris(self.header_hash, *uris) + if len(uris) > 1: + return await self._batch_handle(resolutions, uris, page, page_size) return await self._handle_resolutions(resolutions, uris, page, page_size) except URIParseError as err: return {'error': err.args[0]} @@ -39,6 +44,17 @@ class Resolver: log.exception(e) return {'error': str(e)} + async def _batch_handle(self, resolutions, uris, page, page_size): + futs = [] + for uri in uris: + futs.append(asyncio.ensure_future(self._handle_resolutions(resolutions, [uri], page, page_size))) + results = await asyncio.gather(*futs) + return {uri: results[uri] for uri in results if uri in results} + + def _fetch_tx(self, txid): + self._tx_cache[txid] = self._tx_cache.get(txid) or asyncio.ensure_future(self.network.get_transaction(txid)) + return self._tx_cache[txid] + async def _handle_resolutions(self, resolutions, requested_uris, page, page_size): results = {} for uri in requested_uris: @@ -135,13 +151,12 @@ class Resolver: if not claim_result or 'value' not in claim_result: return claim_result claim_result = _decode_claim_result(claim_result) - channel_id = None if claim_result['value']: claim_result['has_signature'] = False if claim_result['value'].is_signed: claim_result['has_signature'] = True - claim_tx = await self.network.get_transaction(claim_result['txid']) + claim_tx = await self._fetch_tx(claim_result['txid']) if certificate is None: log.info("fetching certificate to check claim signature") channel_id = claim_result['value'].signing_channel_id @@ -149,7 +164,7 @@ class Resolver: if not certificate: log.warning('Certificate %s not found', channel_id) claim_result['channel_name'] = certificate['name'] if certificate else None - cert_tx = await self.network.get_transaction(certificate['txid']) if certificate else None + cert_tx = await self._fetch_tx(certificate['txid']) if certificate else None claim_result['signature_is_valid'] = validate_claim_signature_and_get_channel_name( claim_result, certificate, self.ledger, claim_tx=claim_tx, cert_tx=cert_tx )