remove bw_limit on server
This commit is contained in:
parent
7d2eb5faf7
commit
60194cbafd
3 changed files with 31 additions and 74 deletions
|
@ -19,7 +19,6 @@ class ClientSession(BaseClientSession):
|
||||||
self._on_disconnect_controller = StreamController()
|
self._on_disconnect_controller = StreamController()
|
||||||
self.on_disconnected = self._on_disconnect_controller.stream
|
self.on_disconnected = self._on_disconnect_controller.stream
|
||||||
self.framer.max_size = self.max_errors = 1 << 32
|
self.framer.max_size = self.max_errors = 1 << 32
|
||||||
self.bw_limit = -1
|
|
||||||
self.timeout = timeout
|
self.timeout = timeout
|
||||||
self.max_seconds_idle = timeout * 2
|
self.max_seconds_idle = timeout * 2
|
||||||
self.response_time: Optional[float] = None
|
self.response_time: Optional[float] = None
|
||||||
|
|
|
@ -62,7 +62,6 @@ class Connector:
|
||||||
async def __aenter__(self):
|
async def __aenter__(self):
|
||||||
transport, self.protocol = await self.create_connection()
|
transport, self.protocol = await self.create_connection()
|
||||||
# By default, do not limit outgoing connections
|
# By default, do not limit outgoing connections
|
||||||
self.protocol.bw_limit = 0
|
|
||||||
return self.protocol
|
return self.protocol
|
||||||
|
|
||||||
async def __aexit__(self, exc_type, exc_value, traceback):
|
async def __aexit__(self, exc_type, exc_value, traceback):
|
||||||
|
@ -112,35 +111,6 @@ class SessionBase(asyncio.Protocol):
|
||||||
self.recv_size = 0
|
self.recv_size = 0
|
||||||
self.last_recv = self.start_time
|
self.last_recv = self.start_time
|
||||||
self.last_packet_received = self.start_time
|
self.last_packet_received = self.start_time
|
||||||
# Bandwidth usage per hour before throttling starts
|
|
||||||
self.bw_limit = 2000000
|
|
||||||
self.bw_time = self.start_time
|
|
||||||
self.bw_charge = 0
|
|
||||||
# Concurrency control
|
|
||||||
self.max_concurrent = 6
|
|
||||||
self._concurrency = Concurrency(self.max_concurrent)
|
|
||||||
|
|
||||||
async def _update_concurrency(self):
|
|
||||||
# A non-positive value means not to limit concurrency
|
|
||||||
if self.bw_limit <= 0:
|
|
||||||
return
|
|
||||||
now = time.perf_counter()
|
|
||||||
# Reduce the recorded usage in proportion to the elapsed time
|
|
||||||
refund = (now - self.bw_time) * (self.bw_limit / 3600)
|
|
||||||
self.bw_charge = max(0, self.bw_charge - int(refund))
|
|
||||||
self.bw_time = now
|
|
||||||
# Reduce concurrency allocation by 1 for each whole bw_limit used
|
|
||||||
throttle = int(self.bw_charge / self.bw_limit)
|
|
||||||
target = max(1, self.max_concurrent - throttle)
|
|
||||||
current = self._concurrency.max_concurrent
|
|
||||||
if target != current:
|
|
||||||
self.logger.info(f'changing task concurrency from {current} '
|
|
||||||
f'to {target}')
|
|
||||||
await self._concurrency.set_max_concurrent(target)
|
|
||||||
|
|
||||||
def _using_bandwidth(self, size):
|
|
||||||
"""Called when sending or receiving size bytes."""
|
|
||||||
self.bw_charge += size
|
|
||||||
|
|
||||||
async def _limited_wait(self, secs):
|
async def _limited_wait(self, secs):
|
||||||
try:
|
try:
|
||||||
|
@ -155,7 +125,6 @@ class SessionBase(asyncio.Protocol):
|
||||||
if not self.is_closing():
|
if not self.is_closing():
|
||||||
framed_message = self.framer.frame(message)
|
framed_message = self.framer.frame(message)
|
||||||
self.send_size += len(framed_message)
|
self.send_size += len(framed_message)
|
||||||
self._using_bandwidth(len(framed_message))
|
|
||||||
self.send_count += 1
|
self.send_count += 1
|
||||||
self.last_send = time.perf_counter()
|
self.last_send = time.perf_counter()
|
||||||
if self.verbosity >= 4:
|
if self.verbosity >= 4:
|
||||||
|
@ -179,7 +148,6 @@ class SessionBase(asyncio.Protocol):
|
||||||
if self.verbosity >= 4:
|
if self.verbosity >= 4:
|
||||||
self.logger.debug(f'Received framed message {framed_message}')
|
self.logger.debug(f'Received framed message {framed_message}')
|
||||||
self.recv_size += len(framed_message)
|
self.recv_size += len(framed_message)
|
||||||
self._using_bandwidth(len(framed_message))
|
|
||||||
self.framer.received_bytes(framed_message)
|
self.framer.received_bytes(framed_message)
|
||||||
|
|
||||||
def pause_writing(self):
|
def pause_writing(self):
|
||||||
|
@ -307,23 +275,19 @@ class MessageSession(SessionBase):
|
||||||
else:
|
else:
|
||||||
self.last_recv = time.perf_counter()
|
self.last_recv = time.perf_counter()
|
||||||
self.recv_count += 1
|
self.recv_count += 1
|
||||||
if self.recv_count % 10 == 0:
|
await self._task_group.add(self._handle_message(message))
|
||||||
await self._update_concurrency()
|
|
||||||
await self._task_group.add(self._throttled_message(message))
|
|
||||||
|
|
||||||
async def _throttled_message(self, message):
|
async def _handle_message(self, message):
|
||||||
"""Process a single request, respecting the concurrency limit."""
|
try:
|
||||||
async with self._concurrency.semaphore:
|
await self.handle_message(message)
|
||||||
try:
|
except ProtocolError as e:
|
||||||
await self.handle_message(message)
|
self.logger.error(f'{e}')
|
||||||
except ProtocolError as e:
|
self._bump_errors()
|
||||||
self.logger.error(f'{e}')
|
except CancelledError:
|
||||||
self._bump_errors()
|
raise
|
||||||
except CancelledError:
|
except Exception:
|
||||||
raise
|
self.logger.exception(f'exception handling {message}')
|
||||||
except Exception:
|
self._bump_errors()
|
||||||
self.logger.exception(f'exception handling {message}')
|
|
||||||
self._bump_errors()
|
|
||||||
|
|
||||||
# External API
|
# External API
|
||||||
def default_framer(self):
|
def default_framer(self):
|
||||||
|
@ -427,8 +391,6 @@ class RPCSession(SessionBase):
|
||||||
|
|
||||||
self.last_recv = time.perf_counter()
|
self.last_recv = time.perf_counter()
|
||||||
self.recv_count += 1
|
self.recv_count += 1
|
||||||
if self.recv_count % 10 == 0:
|
|
||||||
await self._update_concurrency()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
requests = self.connection.receive_message(message)
|
requests = self.connection.receive_message(message)
|
||||||
|
@ -441,27 +403,25 @@ class RPCSession(SessionBase):
|
||||||
self._bump_errors()
|
self._bump_errors()
|
||||||
else:
|
else:
|
||||||
for request in requests:
|
for request in requests:
|
||||||
await self._task_group.add(self._throttled_request(request))
|
await self._task_group.add(self._handle_request(request))
|
||||||
|
|
||||||
async def _throttled_request(self, request):
|
async def _handle_request(self, request):
|
||||||
"""Process a single request, respecting the concurrency limit."""
|
try:
|
||||||
async with self._concurrency.semaphore:
|
result = await self.handle_request(request)
|
||||||
try:
|
except (ProtocolError, RPCError) as e:
|
||||||
result = await self.handle_request(request)
|
result = e
|
||||||
except (ProtocolError, RPCError) as e:
|
except CancelledError:
|
||||||
result = e
|
raise
|
||||||
except CancelledError:
|
except Exception:
|
||||||
raise
|
self.logger.exception(f'exception handling {request}')
|
||||||
except Exception:
|
result = RPCError(JSONRPC.INTERNAL_ERROR,
|
||||||
self.logger.exception(f'exception handling {request}')
|
'internal server error')
|
||||||
result = RPCError(JSONRPC.INTERNAL_ERROR,
|
if isinstance(request, Request):
|
||||||
'internal server error')
|
message = request.send_result(result)
|
||||||
if isinstance(request, Request):
|
if message:
|
||||||
message = request.send_result(result)
|
await self._send_message(message)
|
||||||
if message:
|
if isinstance(result, Exception):
|
||||||
await self._send_message(message)
|
self._bump_errors()
|
||||||
if isinstance(result, Exception):
|
|
||||||
self._bump_errors()
|
|
||||||
|
|
||||||
def connection_lost(self, exc):
|
def connection_lost(self, exc):
|
||||||
# Cancel pending requests and message processing
|
# Cancel pending requests and message processing
|
||||||
|
|
|
@ -689,8 +689,6 @@ class SessionBase(RPCSession):
|
||||||
msg = ''
|
msg = ''
|
||||||
if not self._can_send.is_set():
|
if not self._can_send.is_set():
|
||||||
msg += ' whilst paused'
|
msg += ' whilst paused'
|
||||||
if self._concurrency.max_concurrent != self.max_concurrent:
|
|
||||||
msg += ' whilst throttled'
|
|
||||||
if self.send_size >= 1024*1024:
|
if self.send_size >= 1024*1024:
|
||||||
msg += ('. Sent {:,d} bytes in {:,d} messages'
|
msg += ('. Sent {:,d} bytes in {:,d} messages'
|
||||||
.format(self.send_size, self.send_count))
|
.format(self.send_size, self.send_count))
|
||||||
|
@ -702,7 +700,7 @@ class SessionBase(RPCSession):
|
||||||
return len(self.connection.pending_requests())
|
return len(self.connection.pending_requests())
|
||||||
|
|
||||||
def semaphore(self):
|
def semaphore(self):
|
||||||
return Semaphores([self._concurrency.semaphore, self.group.semaphore])
|
return Semaphores([self.group.semaphore])
|
||||||
|
|
||||||
def sub_count(self):
|
def sub_count(self):
|
||||||
return 0
|
return 0
|
||||||
|
|
Loading…
Reference in a new issue