fix cache_concurrent decorator
This commit is contained in:
parent
861c25716e
commit
432fe444f8
1 changed files with 4 additions and 30 deletions
|
@ -168,44 +168,18 @@ def async_timed_cache(duration: int):
|
|||
|
||||
def cache_concurrent(async_fn):
|
||||
"""
|
||||
When the decorated function has concurrent calls made to it with the same arguments, only run it the once
|
||||
When the decorated function has concurrent calls made to it with the same arguments, only run it once
|
||||
"""
|
||||
running: typing.Optional[asyncio.Event] = None
|
||||
cache: typing.Dict = {}
|
||||
|
||||
def initialize_running_event_and_cache():
|
||||
# this is to avoid automatically setting up an event loop by using the decorator
|
||||
nonlocal running
|
||||
if running is not None:
|
||||
return
|
||||
loop = asyncio.get_running_loop()
|
||||
running = asyncio.Event(loop=loop)
|
||||
|
||||
@functools.wraps(async_fn)
|
||||
async def wrapper(*args, **kwargs):
|
||||
if running is None:
|
||||
initialize_running_event_and_cache()
|
||||
loop = asyncio.get_running_loop()
|
||||
key = tuple([args, tuple([tuple([k, kwargs[k]]) for k in kwargs])])
|
||||
if running.is_set() and key in cache:
|
||||
return await cache[key]
|
||||
running.set()
|
||||
if key not in cache:
|
||||
cache[key] = loop.create_future()
|
||||
error = False
|
||||
cache[key] = cache.get(key) or asyncio.create_task(async_fn(*args, **kwargs))
|
||||
try:
|
||||
result = await async_fn(*args, **kwargs)
|
||||
cache[key].set_result(result)
|
||||
except Exception as err:
|
||||
cache[key].set_exception(err)
|
||||
error = True
|
||||
return await cache[key]
|
||||
finally:
|
||||
fut = cache.pop(key)
|
||||
if not cache:
|
||||
running.clear()
|
||||
if error:
|
||||
raise fut.exception()
|
||||
return fut.result()
|
||||
cache.pop(key, None)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
|
Loading…
Reference in a new issue