forked from LBRYCommunity/lbry-sdk
request scheduler
This commit is contained in:
parent
b2de89ca29
commit
28e4b3eefd
2 changed files with 24 additions and 3 deletions
|
@ -424,8 +424,7 @@ class RPCSession(SessionBase):
|
|||
self.max_errors = 0
|
||||
self._bump_errors()
|
||||
else:
|
||||
for request in requests:
|
||||
await self._task_group.add(self._handle_request(request))
|
||||
await self.schedule_requests(requests)
|
||||
|
||||
async def _handle_request(self, request):
|
||||
start = time.perf_counter()
|
||||
|
@ -472,6 +471,10 @@ class RPCSession(SessionBase):
|
|||
async def handle_request(self, request):
|
||||
pass
|
||||
|
||||
async def schedule_requests(self, requests):
|
||||
for request in requests:
|
||||
self._task_group.add(self._handle_request(request))
|
||||
|
||||
async def send_request(self, method, args=()):
|
||||
"""Send an RPC request over the network."""
|
||||
if self.is_closing():
|
||||
|
|
|
@ -197,6 +197,8 @@ class SessionManager:
|
|||
self.notified_height: typing.Optional[int] = None
|
||||
# Cache some idea of room to avoid recounting on each subscription
|
||||
self.subs_room = 0
|
||||
self.consumers = 32
|
||||
self.priority_queue = asyncio.PriorityQueue()
|
||||
|
||||
self.session_event = Event()
|
||||
|
||||
|
@ -546,7 +548,8 @@ class SessionManager:
|
|||
# because we connect to ourself
|
||||
await asyncio.wait([
|
||||
self._clear_stale_sessions(),
|
||||
self._manage_servers()
|
||||
self._manage_servers(),
|
||||
self.serve_requests()
|
||||
])
|
||||
except Exception as err:
|
||||
if not isinstance(err, asyncio.CancelledError):
|
||||
|
@ -561,6 +564,13 @@ class SessionManager:
|
|||
])
|
||||
await self.stop_other()
|
||||
|
||||
async def serve_requests(self):
|
||||
async def consumer():
|
||||
while True:
|
||||
_, fut = await self.priority_queue.get()
|
||||
await fut
|
||||
await asyncio.gather(*(consumer() for _ in range(self.consumers)))
|
||||
|
||||
async def start_other(self):
|
||||
pass
|
||||
|
||||
|
@ -876,6 +886,14 @@ class LBRYElectrumX(SessionBase):
|
|||
self.daemon = self.session_mgr.daemon
|
||||
self.bp: BlockProcessor = self.session_mgr.bp
|
||||
self.db: LevelDB = self.bp.db
|
||||
self.time_since_last_request = time.perf_counter()
|
||||
|
||||
async def schedule_requests(self, requests):
|
||||
for request in requests:
|
||||
current = time.perf_counter()
|
||||
elapsed = current - self.time_since_last_request
|
||||
self.time_since_last_request = current
|
||||
self.session_mgr.priority_queue.put_nowait((elapsed, self._handle_request(request)))
|
||||
|
||||
@classmethod
|
||||
def protocol_min_max_strings(cls):
|
||||
|
|
Loading…
Add table
Reference in a new issue