Compare commits

...

4 commits

Author SHA1 Message Date
Victor Shyba
08a1ad65ab ignore request if protocol closed 2021-10-13 16:01:55 -03:00
Victor Shyba
1d80ce2419 initialize new sessions with high priority 2021-10-13 15:57:15 -03:00
Victor Shyba
ea5a0f0090 schedule_requests is not async 2021-10-13 15:55:02 -03:00
Victor Shyba
8a2fd29645 request scheduler 2021-10-13 15:49:02 -03:00
2 changed files with 31 additions and 3 deletions

View file

@ -424,10 +424,11 @@ class RPCSession(SessionBase):
self.max_errors = 0 self.max_errors = 0
self._bump_errors() self._bump_errors()
else: else:
for request in requests: self.schedule_requests(requests)
await self._task_group.add(self._handle_request(request))
async def _handle_request(self, request): async def _handle_request(self, request):
if self.is_closing():
return
start = time.perf_counter() start = time.perf_counter()
try: try:
result = await self.handle_request(request) result = await self.handle_request(request)
@ -472,6 +473,10 @@ class RPCSession(SessionBase):
async def handle_request(self, request): async def handle_request(self, request):
pass pass
def schedule_requests(self, requests):
for request in requests:
self._task_group.add(self._handle_request(request))
async def send_request(self, method, args=()): async def send_request(self, method, args=()):
"""Send an RPC request over the network.""" """Send an RPC request over the network."""
if self.is_closing(): if self.is_closing():

View file

@ -197,6 +197,8 @@ class SessionManager:
self.notified_height: typing.Optional[int] = None self.notified_height: typing.Optional[int] = None
# Cache some idea of room to avoid recounting on each subscription # Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0 self.subs_room = 0
self.consumers = 32
self.priority_queue = asyncio.PriorityQueue()
self.session_event = Event() self.session_event = Event()
@ -546,7 +548,8 @@ class SessionManager:
# because we connect to ourself # because we connect to ourself
await asyncio.wait([ await asyncio.wait([
self._clear_stale_sessions(), self._clear_stale_sessions(),
self._manage_servers() self._manage_servers(),
self.serve_requests()
]) ])
finally: finally:
await self._close_servers(list(self.servers.keys())) await self._close_servers(list(self.servers.keys()))
@ -557,6 +560,18 @@ class SessionManager:
]) ])
await self.stop_other() await self.stop_other()
async def serve_requests(self):
async def consumer():
while True:
_, fut = await self.priority_queue.get()
try:
await fut
except asyncio.CancelledError:
raise
except Exception as e:
log.exception("raised while serving a request. This should never happen.")
await asyncio.gather(*(consumer() for _ in range(self.consumers)))
async def start_other(self): async def start_other(self):
pass pass
@ -880,6 +895,14 @@ class LBRYElectrumX(SessionBase):
self.daemon = self.session_mgr.daemon self.daemon = self.session_mgr.daemon
self.bp: BlockProcessor = self.session_mgr.bp self.bp: BlockProcessor = self.session_mgr.bp
self.db: LevelDB = self.bp.db self.db: LevelDB = self.bp.db
self.last_request_received_at = 0
def schedule_requests(self, requests):
for request in requests:
current = time.perf_counter()
elapsed = current - self.last_request_received_at
self.last_request_received_at = current
self.session_mgr.priority_queue.put_nowait((elapsed, self._handle_request(request)))
@classmethod @classmethod
def protocol_min_max_strings(cls): def protocol_min_max_strings(cls):