Compare commits
4 commits
master
...
crazy_idea
Author | SHA1 | Date | |
---|---|---|---|
|
08a1ad65ab | ||
|
1d80ce2419 | ||
|
ea5a0f0090 | ||
|
8a2fd29645 |
2 changed files with 31 additions and 3 deletions
|
@ -424,10 +424,11 @@ class RPCSession(SessionBase):
|
||||||
self.max_errors = 0
|
self.max_errors = 0
|
||||||
self._bump_errors()
|
self._bump_errors()
|
||||||
else:
|
else:
|
||||||
for request in requests:
|
self.schedule_requests(requests)
|
||||||
await self._task_group.add(self._handle_request(request))
|
|
||||||
|
|
||||||
async def _handle_request(self, request):
|
async def _handle_request(self, request):
|
||||||
|
if self.is_closing():
|
||||||
|
return
|
||||||
start = time.perf_counter()
|
start = time.perf_counter()
|
||||||
try:
|
try:
|
||||||
result = await self.handle_request(request)
|
result = await self.handle_request(request)
|
||||||
|
@ -472,6 +473,10 @@ class RPCSession(SessionBase):
|
||||||
async def handle_request(self, request):
|
async def handle_request(self, request):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def schedule_requests(self, requests):
|
||||||
|
for request in requests:
|
||||||
|
self._task_group.add(self._handle_request(request))
|
||||||
|
|
||||||
async def send_request(self, method, args=()):
|
async def send_request(self, method, args=()):
|
||||||
"""Send an RPC request over the network."""
|
"""Send an RPC request over the network."""
|
||||||
if self.is_closing():
|
if self.is_closing():
|
||||||
|
|
|
@ -197,6 +197,8 @@ class SessionManager:
|
||||||
self.notified_height: typing.Optional[int] = None
|
self.notified_height: typing.Optional[int] = None
|
||||||
# Cache some idea of room to avoid recounting on each subscription
|
# Cache some idea of room to avoid recounting on each subscription
|
||||||
self.subs_room = 0
|
self.subs_room = 0
|
||||||
|
self.consumers = 32
|
||||||
|
self.priority_queue = asyncio.PriorityQueue()
|
||||||
|
|
||||||
self.session_event = Event()
|
self.session_event = Event()
|
||||||
|
|
||||||
|
@ -546,7 +548,8 @@ class SessionManager:
|
||||||
# because we connect to ourself
|
# because we connect to ourself
|
||||||
await asyncio.wait([
|
await asyncio.wait([
|
||||||
self._clear_stale_sessions(),
|
self._clear_stale_sessions(),
|
||||||
self._manage_servers()
|
self._manage_servers(),
|
||||||
|
self.serve_requests()
|
||||||
])
|
])
|
||||||
finally:
|
finally:
|
||||||
await self._close_servers(list(self.servers.keys()))
|
await self._close_servers(list(self.servers.keys()))
|
||||||
|
@ -557,6 +560,18 @@ class SessionManager:
|
||||||
])
|
])
|
||||||
await self.stop_other()
|
await self.stop_other()
|
||||||
|
|
||||||
|
async def serve_requests(self):
|
||||||
|
async def consumer():
|
||||||
|
while True:
|
||||||
|
_, fut = await self.priority_queue.get()
|
||||||
|
try:
|
||||||
|
await fut
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
except Exception as e:
|
||||||
|
log.exception("raised while serving a request. This should never happen.")
|
||||||
|
await asyncio.gather(*(consumer() for _ in range(self.consumers)))
|
||||||
|
|
||||||
async def start_other(self):
|
async def start_other(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -880,6 +895,14 @@ class LBRYElectrumX(SessionBase):
|
||||||
self.daemon = self.session_mgr.daemon
|
self.daemon = self.session_mgr.daemon
|
||||||
self.bp: BlockProcessor = self.session_mgr.bp
|
self.bp: BlockProcessor = self.session_mgr.bp
|
||||||
self.db: LevelDB = self.bp.db
|
self.db: LevelDB = self.bp.db
|
||||||
|
self.last_request_received_at = 0
|
||||||
|
|
||||||
|
def schedule_requests(self, requests):
|
||||||
|
for request in requests:
|
||||||
|
current = time.perf_counter()
|
||||||
|
elapsed = current - self.last_request_received_at
|
||||||
|
self.last_request_received_at = current
|
||||||
|
self.session_mgr.priority_queue.put_nowait((elapsed, self._handle_request(request)))
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def protocol_min_max_strings(cls):
|
def protocol_min_max_strings(cls):
|
||||||
|
|
Loading…
Add table
Reference in a new issue