forked from LBRYCommunity/lbry-sdk
move request semaphore to session and apply to all requests
This commit is contained in:
parent
e7fdf2aeb3
commit
ac88ada50e
1 changed files with 14 additions and 16 deletions
|
@ -29,7 +29,7 @@ class ClientSession(BaseClientSession):
|
||||||
self.response_time: Optional[float] = None
|
self.response_time: Optional[float] = None
|
||||||
self.connection_latency: Optional[float] = None
|
self.connection_latency: Optional[float] = None
|
||||||
self._response_samples = 0
|
self._response_samples = 0
|
||||||
self.pending_amount = 0
|
self._concurrency = asyncio.Semaphore(16)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def available(self):
|
def available(self):
|
||||||
|
@ -55,9 +55,9 @@ class ClientSession(BaseClientSession):
|
||||||
return result
|
return result
|
||||||
|
|
||||||
async def send_request(self, method, args=()):
|
async def send_request(self, method, args=()):
|
||||||
self.pending_amount += 1
|
|
||||||
log.debug("send %s%s to %s:%i (%i timeout)", method, tuple(args), self.server[0], self.server[1], self.timeout)
|
log.debug("send %s%s to %s:%i (%i timeout)", method, tuple(args), self.server[0], self.server[1], self.timeout)
|
||||||
try:
|
try:
|
||||||
|
await self._concurrency.acquire()
|
||||||
if method == 'server.version':
|
if method == 'server.version':
|
||||||
return await self.send_timed_server_version_request(args, self.timeout)
|
return await self.send_timed_server_version_request(args, self.timeout)
|
||||||
request = asyncio.ensure_future(super().send_request(method, args))
|
request = asyncio.ensure_future(super().send_request(method, args))
|
||||||
|
@ -91,7 +91,7 @@ class ClientSession(BaseClientSession):
|
||||||
# self.synchronous_close()
|
# self.synchronous_close()
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
self.pending_amount -= 1
|
self._concurrency.release()
|
||||||
|
|
||||||
async def ensure_server_version(self, required=None, timeout=3):
|
async def ensure_server_version(self, required=None, timeout=3):
|
||||||
required = required or self.network.PROTOCOL_VERSION
|
required = required or self.network.PROTOCOL_VERSION
|
||||||
|
@ -154,7 +154,6 @@ class Network:
|
||||||
# self._switch_task: Optional[asyncio.Task] = None
|
# self._switch_task: Optional[asyncio.Task] = None
|
||||||
self.running = False
|
self.running = False
|
||||||
self.remote_height: int = 0
|
self.remote_height: int = 0
|
||||||
self._concurrency = asyncio.Semaphore(16)
|
|
||||||
|
|
||||||
self._on_connected_controller = StreamController()
|
self._on_connected_controller = StreamController()
|
||||||
self.on_connected = self._on_connected_controller.stream
|
self.on_connected = self._on_connected_controller.stream
|
||||||
|
@ -344,7 +343,6 @@ class Network:
|
||||||
raise ConnectionError("Attempting to send rpc request when connection is not available.")
|
raise ConnectionError("Attempting to send rpc request when connection is not available.")
|
||||||
|
|
||||||
async def retriable_call(self, function, *args, **kwargs):
|
async def retriable_call(self, function, *args, **kwargs):
|
||||||
async with self._concurrency:
|
|
||||||
while self.running:
|
while self.running:
|
||||||
if not self.is_connected:
|
if not self.is_connected:
|
||||||
log.warning("Wallet server unavailable, waiting for it to come back and retry.")
|
log.warning("Wallet server unavailable, waiting for it to come back and retry.")
|
||||||
|
|
Loading…
Add table
Reference in a new issue