diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index f2b8deba9..542408d87 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -19,7 +19,6 @@ class ClientSession(BaseClientSession): self._on_disconnect_controller = StreamController() self.on_disconnected = self._on_disconnect_controller.stream self.framer.max_size = self.max_errors = 1 << 32 - self.bw_limit = -1 self.timeout = timeout self.max_seconds_idle = timeout * 2 self.response_time: Optional[float] = None diff --git a/lbry/wallet/rpc/session.py b/lbry/wallet/rpc/session.py index 678c265cd..619bff791 100644 --- a/lbry/wallet/rpc/session.py +++ b/lbry/wallet/rpc/session.py @@ -62,7 +62,6 @@ class Connector: async def __aenter__(self): transport, self.protocol = await self.create_connection() # By default, do not limit outgoing connections - self.protocol.bw_limit = 0 return self.protocol async def __aexit__(self, exc_type, exc_value, traceback): @@ -112,35 +111,6 @@ class SessionBase(asyncio.Protocol): self.recv_size = 0 self.last_recv = self.start_time self.last_packet_received = self.start_time - # Bandwidth usage per hour before throttling starts - self.bw_limit = 2000000 - self.bw_time = self.start_time - self.bw_charge = 0 - # Concurrency control - self.max_concurrent = 6 - self._concurrency = Concurrency(self.max_concurrent) - - async def _update_concurrency(self): - # A non-positive value means not to limit concurrency - if self.bw_limit <= 0: - return - now = time.perf_counter() - # Reduce the recorded usage in proportion to the elapsed time - refund = (now - self.bw_time) * (self.bw_limit / 3600) - self.bw_charge = max(0, self.bw_charge - int(refund)) - self.bw_time = now - # Reduce concurrency allocation by 1 for each whole bw_limit used - throttle = int(self.bw_charge / self.bw_limit) - target = max(1, self.max_concurrent - throttle) - current = self._concurrency.max_concurrent - if target != current: - self.logger.info(f'changing task concurrency from {current} ' - f'to {target}') - await self._concurrency.set_max_concurrent(target) - - def _using_bandwidth(self, size): - """Called when sending or receiving size bytes.""" - self.bw_charge += size async def _limited_wait(self, secs): try: @@ -155,7 +125,6 @@ class SessionBase(asyncio.Protocol): if not self.is_closing(): framed_message = self.framer.frame(message) self.send_size += len(framed_message) - self._using_bandwidth(len(framed_message)) self.send_count += 1 self.last_send = time.perf_counter() if self.verbosity >= 4: @@ -179,7 +148,6 @@ class SessionBase(asyncio.Protocol): if self.verbosity >= 4: self.logger.debug(f'Received framed message {framed_message}') self.recv_size += len(framed_message) - self._using_bandwidth(len(framed_message)) self.framer.received_bytes(framed_message) def pause_writing(self): @@ -307,23 +275,19 @@ class MessageSession(SessionBase): else: self.last_recv = time.perf_counter() self.recv_count += 1 - if self.recv_count % 10 == 0: - await self._update_concurrency() - await self._task_group.add(self._throttled_message(message)) + await self._task_group.add(self._handle_message(message)) - async def _throttled_message(self, message): - """Process a single request, respecting the concurrency limit.""" - async with self._concurrency.semaphore: - try: - await self.handle_message(message) - except ProtocolError as e: - self.logger.error(f'{e}') - self._bump_errors() - except CancelledError: - raise - except Exception: - self.logger.exception(f'exception handling {message}') - self._bump_errors() + async def _handle_message(self, message): + try: + await self.handle_message(message) + except ProtocolError as e: + self.logger.error(f'{e}') + self._bump_errors() + except CancelledError: + raise + except Exception: + self.logger.exception(f'exception handling {message}') + self._bump_errors() # External API def default_framer(self): @@ -427,8 +391,6 @@ class RPCSession(SessionBase): self.last_recv = time.perf_counter() self.recv_count += 1 - if self.recv_count % 10 == 0: - await self._update_concurrency() try: requests = self.connection.receive_message(message) @@ -441,27 +403,25 @@ class RPCSession(SessionBase): self._bump_errors() else: for request in requests: - await self._task_group.add(self._throttled_request(request)) + await self._task_group.add(self._handle_request(request)) - async def _throttled_request(self, request): - """Process a single request, respecting the concurrency limit.""" - async with self._concurrency.semaphore: - try: - result = await self.handle_request(request) - except (ProtocolError, RPCError) as e: - result = e - except CancelledError: - raise - except Exception: - self.logger.exception(f'exception handling {request}') - result = RPCError(JSONRPC.INTERNAL_ERROR, - 'internal server error') - if isinstance(request, Request): - message = request.send_result(result) - if message: - await self._send_message(message) - if isinstance(result, Exception): - self._bump_errors() + async def _handle_request(self, request): + try: + result = await self.handle_request(request) + except (ProtocolError, RPCError) as e: + result = e + except CancelledError: + raise + except Exception: + self.logger.exception(f'exception handling {request}') + result = RPCError(JSONRPC.INTERNAL_ERROR, + 'internal server error') + if isinstance(request, Request): + message = request.send_result(result) + if message: + await self._send_message(message) + if isinstance(result, Exception): + self._bump_errors() def connection_lost(self, exc): # Cancel pending requests and message processing diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 3e94a37a7..9cca31d2b 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -689,8 +689,6 @@ class SessionBase(RPCSession): msg = '' if not self._can_send.is_set(): msg += ' whilst paused' - if self._concurrency.max_concurrent != self.max_concurrent: - msg += ' whilst throttled' if self.send_size >= 1024*1024: msg += ('. Sent {:,d} bytes in {:,d} messages' .format(self.send_size, self.send_count)) @@ -702,7 +700,7 @@ class SessionBase(RPCSession): return len(self.connection.pending_requests()) def semaphore(self): - return Semaphores([self._concurrency.semaphore, self.group.semaphore]) + return Semaphores([self.group.semaphore]) def sub_count(self): return 0