From ac88ada50e2f811379c660170806a4eb0cb5eaf5 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 13 Mar 2021 02:39:40 -0300 Subject: [PATCH] move request semaphore to session and apply to all requests --- lbry/wallet/network.py | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index 7dc3319d2..4fbb970c7 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -29,7 +29,7 @@ class ClientSession(BaseClientSession): self.response_time: Optional[float] = None self.connection_latency: Optional[float] = None self._response_samples = 0 - self.pending_amount = 0 + self._concurrency = asyncio.Semaphore(16) @property def available(self): @@ -55,9 +55,9 @@ class ClientSession(BaseClientSession): return result async def send_request(self, method, args=()): - self.pending_amount += 1 log.debug("send %s%s to %s:%i (%i timeout)", method, tuple(args), self.server[0], self.server[1], self.timeout) try: + await self._concurrency.acquire() if method == 'server.version': return await self.send_timed_server_version_request(args, self.timeout) request = asyncio.ensure_future(super().send_request(method, args)) @@ -91,7 +91,7 @@ class ClientSession(BaseClientSession): # self.synchronous_close() raise finally: - self.pending_amount -= 1 + self._concurrency.release() async def ensure_server_version(self, required=None, timeout=3): required = required or self.network.PROTOCOL_VERSION @@ -154,7 +154,6 @@ class Network: # self._switch_task: Optional[asyncio.Task] = None self.running = False self.remote_height: int = 0 - self._concurrency = asyncio.Semaphore(16) self._on_connected_controller = StreamController() self.on_connected = self._on_connected_controller.stream @@ -344,18 +343,17 @@ class Network: raise ConnectionError("Attempting to send rpc request when connection is not available.") async def retriable_call(self, function, *args, **kwargs): - async with self._concurrency: - while self.running: - if not self.is_connected: - log.warning("Wallet server unavailable, waiting for it to come back and retry.") - self._urgent_need_reconnect.set() - await self.on_connected.first - try: - return await function(*args, **kwargs) - except asyncio.TimeoutError: - log.warning("Wallet server call timed out, retrying.") - except ConnectionError: - log.warning("connection error") + while self.running: + if not self.is_connected: + log.warning("Wallet server unavailable, waiting for it to come back and retry.") + self._urgent_need_reconnect.set() + await self.on_connected.first + try: + return await function(*args, **kwargs) + except asyncio.TimeoutError: + log.warning("Wallet server call timed out, retrying.") + except ConnectionError: + log.warning("connection error") raise asyncio.CancelledError() # if we got here, we are shutting down