From 28e4b3eefd485001f825c1f722d02a3c07d28979 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 13 Oct 2021 15:47:32 -0300 Subject: [PATCH] request scheduler --- lbry/wallet/rpc/session.py | 7 +++++-- lbry/wallet/server/session.py | 20 +++++++++++++++++++- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/lbry/wallet/rpc/session.py b/lbry/wallet/rpc/session.py index 762bb21cd..3602bcad0 100644 --- a/lbry/wallet/rpc/session.py +++ b/lbry/wallet/rpc/session.py @@ -424,8 +424,7 @@ class RPCSession(SessionBase): self.max_errors = 0 self._bump_errors() else: - for request in requests: - await self._task_group.add(self._handle_request(request)) + await self.schedule_requests(requests) async def _handle_request(self, request): start = time.perf_counter() @@ -472,6 +471,10 @@ class RPCSession(SessionBase): async def handle_request(self, request): pass + async def schedule_requests(self, requests): + for request in requests: + self._task_group.add(self._handle_request(request)) + async def send_request(self, method, args=()): """Send an RPC request over the network.""" if self.is_closing(): diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index ca4a0142f..dbcb6bfd4 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -197,6 +197,8 @@ class SessionManager: self.notified_height: typing.Optional[int] = None # Cache some idea of room to avoid recounting on each subscription self.subs_room = 0 + self.consumers = 32 + self.priority_queue = asyncio.PriorityQueue() self.session_event = Event() @@ -546,7 +548,8 @@ class SessionManager: # because we connect to ourself await asyncio.wait([ self._clear_stale_sessions(), - self._manage_servers() + self._manage_servers(), + self.serve_requests() ]) except Exception as err: if not isinstance(err, asyncio.CancelledError): @@ -561,6 +564,13 @@ class SessionManager: ]) await self.stop_other() + async def serve_requests(self): + async def consumer(): + while True: + _, fut = await self.priority_queue.get() + await fut + await asyncio.gather(*(consumer() for _ in range(self.consumers))) + async def start_other(self): pass @@ -876,6 +886,14 @@ class LBRYElectrumX(SessionBase): self.daemon = self.session_mgr.daemon self.bp: BlockProcessor = self.session_mgr.bp self.db: LevelDB = self.bp.db + self.time_since_last_request = time.perf_counter() + + async def schedule_requests(self, requests): + for request in requests: + current = time.perf_counter() + elapsed = current - self.time_since_last_request + self.time_since_last_request = current + self.session_mgr.priority_queue.put_nowait((elapsed, self._handle_request(request))) @classmethod def protocol_min_max_strings(cls):