move request semaphore to session and apply to all requests

This commit is contained in:
Victor Shyba 2021-03-13 02:39:40 -03:00
parent 91323a21cf
commit d7007e402e

View file

@ -30,7 +30,7 @@ class ClientSession(BaseClientSession):
self.response_time: Optional[float] = None self.response_time: Optional[float] = None
self.connection_latency: Optional[float] = None self.connection_latency: Optional[float] = None
self._response_samples = 0 self._response_samples = 0
self.pending_amount = 0 self._concurrency = asyncio.Semaphore(16)
@property @property
def available(self): def available(self):
@ -56,9 +56,9 @@ class ClientSession(BaseClientSession):
return result return result
async def send_request(self, method, args=()): async def send_request(self, method, args=()):
self.pending_amount += 1
log.debug("send %s%s to %s:%i (%i timeout)", method, tuple(args), self.server[0], self.server[1], self.timeout) log.debug("send %s%s to %s:%i (%i timeout)", method, tuple(args), self.server[0], self.server[1], self.timeout)
try: try:
await self._concurrency.acquire()
if method == 'server.version': if method == 'server.version':
return await self.send_timed_server_version_request(args, self.timeout) return await self.send_timed_server_version_request(args, self.timeout)
request = asyncio.ensure_future(super().send_request(method, args)) request = asyncio.ensure_future(super().send_request(method, args))
@ -92,7 +92,7 @@ class ClientSession(BaseClientSession):
# self.synchronous_close() # self.synchronous_close()
raise raise
finally: finally:
self.pending_amount -= 1 self._concurrency.release()
async def ensure_server_version(self, required=None, timeout=3): async def ensure_server_version(self, required=None, timeout=3):
required = required or self.network.PROTOCOL_VERSION required = required or self.network.PROTOCOL_VERSION
@ -155,7 +155,6 @@ class Network:
# self._switch_task: Optional[asyncio.Task] = None # self._switch_task: Optional[asyncio.Task] = None
self.running = False self.running = False
self.remote_height: int = 0 self.remote_height: int = 0
self._concurrency = asyncio.Semaphore(16)
self._on_connected_controller = StreamController() self._on_connected_controller = StreamController()
self.on_connected = self._on_connected_controller.stream self.on_connected = self._on_connected_controller.stream
@ -377,18 +376,17 @@ class Network:
raise ConnectionError("Attempting to send rpc request when connection is not available.") raise ConnectionError("Attempting to send rpc request when connection is not available.")
async def retriable_call(self, function, *args, **kwargs): async def retriable_call(self, function, *args, **kwargs):
async with self._concurrency: while self.running:
while self.running: if not self.is_connected:
if not self.is_connected: log.warning("Wallet server unavailable, waiting for it to come back and retry.")
log.warning("Wallet server unavailable, waiting for it to come back and retry.") self._urgent_need_reconnect.set()
self._urgent_need_reconnect.set() await self.on_connected.first
await self.on_connected.first try:
try: return await function(*args, **kwargs)
return await function(*args, **kwargs) except asyncio.TimeoutError:
except asyncio.TimeoutError: log.warning("Wallet server call timed out, retrying.")
log.warning("Wallet server call timed out, retrying.") except ConnectionError:
except ConnectionError: log.warning("connection error")
log.warning("connection error")
raise asyncio.CancelledError() # if we got here, we are shutting down raise asyncio.CancelledError() # if we got here, we are shutting down