From 41a6e288aa19bed13966afa259470ec0b90965d1 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 17 May 2019 18:56:28 -0400 Subject: [PATCH 1/2] add test_resolve_lru_cache_doesnt_persist_errors --- tests/integration/test_resolve_command.py | 54 +++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/tests/integration/test_resolve_command.py b/tests/integration/test_resolve_command.py index 947e89509..43faf0f9b 100644 --- a/tests/integration/test_resolve_command.py +++ b/tests/integration/test_resolve_command.py @@ -1,3 +1,4 @@ +import asyncio import json from binascii import hexlify from lbrynet.testcase import CommandTestCase @@ -88,3 +89,56 @@ class ResolveCommand(CommandTestCase): self.assertFalse(claim['decoded_claim']) self.assertEqual(claim['txid'], txid) self.assertEqual(claim['effective_amount'], "0.1") + + async def _test_resolve_abc_foo(self): + response = await self.resolve('lbry://@abc/foo') + claim = response['lbry://@abc/foo'] + self.assertIn('certificate', claim) + self.assertIn('claim', claim) + self.assertEqual(claim['claim']['name'], 'foo') + self.assertEqual(claim['claim']['channel_name'], '@abc') + self.assertEqual(claim['certificate']['name'], '@abc') + self.assertEqual(claim['claims_in_channel'], 0) + self.assertEqual( + claim['claim']['timestamp'], + self.ledger.headers[claim['claim']['height']]['timestamp'] + ) + self.assertEqual( + claim['certificate']['timestamp'], + self.ledger.headers[claim['certificate']['height']]['timestamp'] + ) + + async def test_resolve_lru_cache_doesnt_persist_errors(self): + original_get_transaction = self.daemon.wallet_manager.ledger.network.get_transaction + + async def timeout_get_transaction(txid): + fut = self.loop.create_future() + + def delayed_raise_cancelled_error(): + fut.set_exception(asyncio.CancelledError()) + + self.loop.call_soon(delayed_raise_cancelled_error) + return await fut + + tx = await self.channel_create('@abc', '0.01') + channel_id = tx['outputs'][0]['claim_id'] + await self.stream_create('foo', '0.01', channel_id=channel_id) + + # raise a cancelled error from get_transaction + self.daemon.wallet_manager.ledger.network.get_transaction = timeout_get_transaction + with self.assertRaises(KeyError): + await self._test_resolve_abc_foo() + + # restore the real get_transaction that doesn't cancel, it should be called and the result cached + self.daemon.wallet_manager.ledger.network.get_transaction = original_get_transaction + await self._test_resolve_abc_foo() + called_again = asyncio.Event(loop=self.loop) + + def check_result_cached(txid): + called_again.set() + return original_get_transaction(txid) + + # check that the result was cached + self.daemon.wallet_manager.ledger.network.get_transaction = check_result_cached + await self._test_resolve_abc_foo() + self.assertFalse(called_again.is_set()) From 51f301785f2ddd924d45cce40ec701e7c041b6ec Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 17 May 2019 18:57:07 -0400 Subject: [PATCH 2/2] fix test, add lru_cache_concurrent decorator --- lbrynet/utils.py | 54 +++++++++++++++++++++++++++++++++++++++ lbrynet/wallet/resolve.py | 11 +++----- 2 files changed, 58 insertions(+), 7 deletions(-) diff --git a/lbrynet/utils.py b/lbrynet/utils.py index 19498444e..0bde133e2 100644 --- a/lbrynet/utils.py +++ b/lbrynet/utils.py @@ -15,6 +15,7 @@ import contextlib import certifi import aiohttp import functools +import collections from lbrynet.schema.claim import Claim from lbrynet.cryptoutils import get_lbry_hash_obj @@ -201,6 +202,59 @@ async def resolve_host(url: str, port: int, proto: str) -> str: ))[0][4][0] +class LRUCache: + __slots__ = [ + 'capacity', + 'cache' + ] + + def __init__(self, capacity): + self.capacity = capacity + self.cache = collections.OrderedDict() + + def get(self, key): + value = self.cache.pop(key) + self.cache[key] = value + return value + + def set(self, key, value): + try: + self.cache.pop(key) + except KeyError: + if len(self.cache) >= self.capacity: + self.cache.popitem(last=False) + self.cache[key] = value + + def __contains__(self, item) -> bool: + return item in self.cache + + +def lru_cache_concurrent(cache_size: int): + if not cache_size > 0: + raise ValueError("invalid cache size") + concurrent_cache = {} + lru_cache = LRUCache(cache_size) + + def wrapper(async_fn): + + @functools.wraps(async_fn) + async def _inner(*args, **kwargs): + key = tuple([args, tuple([tuple([k, kwargs[k]]) for k in kwargs])]) + if key in lru_cache: + return lru_cache.get(key) + + concurrent_cache[key] = concurrent_cache.get(key) or asyncio.create_task(async_fn(*args, **kwargs)) + + try: + result = await concurrent_cache[key] + lru_cache.set(key, result) + return result + finally: + concurrent_cache.pop(key, None) + return _inner + return wrapper + + def get_ssl_context() -> ssl.SSLContext: return ssl.create_default_context( purpose=ssl.Purpose.CLIENT_AUTH, capath=certifi.where() diff --git a/lbrynet/wallet/resolve.py b/lbrynet/wallet/resolve.py index a17fbfb59..9e9068bbf 100644 --- a/lbrynet/wallet/resolve.py +++ b/lbrynet/wallet/resolve.py @@ -1,11 +1,10 @@ import logging import asyncio -from functools import lru_cache from cryptography.exceptions import InvalidSignature from binascii import unhexlify, hexlify - +from lbrynet.utils import lru_cache_concurrent from lbrynet.wallet.account import validate_claim_id from lbrynet.wallet.dewies import dewies_to_lbc from lbrynet.error import UnknownNameError, UnknownClaimID, UnknownURI, UnknownOutpoint @@ -51,11 +50,9 @@ class Resolver: results = await asyncio.gather(*futs) return dict(list(map(lambda result: list(result.items())[0], results))) - @lru_cache(256) - def _fetch_tx(self, txid): - async def __fetch_parse(txid): - return self.transaction_class(unhexlify(await self.network.get_transaction(txid))) - return asyncio.ensure_future(__fetch_parse(txid)) + @lru_cache_concurrent(256) + async def _fetch_tx(self, txid): + return self.transaction_class(unhexlify(await self.network.get_transaction(txid))) async def _handle_resolutions(self, resolutions, requested_uris, page, page_size, claim_trie_root): results = {}