diff --git a/lbry/wallet/rpc/session.py b/lbry/wallet/rpc/session.py index 3602bcad0..8b04c1ac2 100644 --- a/lbry/wallet/rpc/session.py +++ b/lbry/wallet/rpc/session.py @@ -424,7 +424,7 @@ class RPCSession(SessionBase): self.max_errors = 0 self._bump_errors() else: - await self.schedule_requests(requests) + self.schedule_requests(requests) async def _handle_request(self, request): start = time.perf_counter() @@ -471,7 +471,7 @@ class RPCSession(SessionBase): async def handle_request(self, request): pass - async def schedule_requests(self, requests): + def schedule_requests(self, requests): for request in requests: self._task_group.add(self._handle_request(request)) diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 9373a7288..b7919b53e 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -564,7 +564,12 @@ class SessionManager: async def consumer(): while True: _, fut = await self.priority_queue.get() - await fut + try: + await fut + except asyncio.CancelledError: + raise + except Exception as e: + log.exception("raised while serving a request. This should never happen.") await asyncio.gather(*(consumer() for _ in range(self.consumers))) async def start_other(self): @@ -892,7 +897,7 @@ class LBRYElectrumX(SessionBase): self.db: LevelDB = self.bp.db self.time_since_last_request = time.perf_counter() - async def schedule_requests(self, requests): + def schedule_requests(self, requests): for request in requests: current = time.perf_counter() elapsed = current - self.time_since_last_request