diff --git a/lbrynet/utils.py b/lbrynet/utils.py index 057dc11fa..19498444e 100644 --- a/lbrynet/utils.py +++ b/lbrynet/utils.py @@ -168,44 +168,18 @@ def async_timed_cache(duration: int): def cache_concurrent(async_fn): """ - When the decorated function has concurrent calls made to it with the same arguments, only run it the once + When the decorated function has concurrent calls made to it with the same arguments, only run it once """ - running: typing.Optional[asyncio.Event] = None cache: typing.Dict = {} - def initialize_running_event_and_cache(): - # this is to avoid automatically setting up an event loop by using the decorator - nonlocal running - if running is not None: - return - loop = asyncio.get_running_loop() - running = asyncio.Event(loop=loop) - @functools.wraps(async_fn) async def wrapper(*args, **kwargs): - if running is None: - initialize_running_event_and_cache() - loop = asyncio.get_running_loop() key = tuple([args, tuple([tuple([k, kwargs[k]]) for k in kwargs])]) - if running.is_set() and key in cache: - return await cache[key] - running.set() - if key not in cache: - cache[key] = loop.create_future() - error = False + cache[key] = cache.get(key) or asyncio.create_task(async_fn(*args, **kwargs)) try: - result = await async_fn(*args, **kwargs) - cache[key].set_result(result) - except Exception as err: - cache[key].set_exception(err) - error = True + return await cache[key] finally: - fut = cache.pop(key) - if not cache: - running.clear() - if error: - raise fut.exception() - return fut.result() + cache.pop(key, None) return wrapper