diff --git a/lbry/wallet/server/chain_reader.py b/lbry/wallet/server/chain_reader.py index d3e05c277..61900a7ee 100644 --- a/lbry/wallet/server/chain_reader.py +++ b/lbry/wallet/server/chain_reader.py @@ -108,7 +108,7 @@ class BlockchainReader: class BlockchainReaderServer(BlockchainReader): def __init__(self, env): - super().__init__(env, 'lbry-reader', thread_workers=1, thread_prefix='hub-worker') + super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker') self.history_cache = {} self.resolve_outputs_cache = {} self.resolve_cache = {} @@ -208,6 +208,8 @@ class BlockchainReaderServer(BlockchainReader): await self.start_prometheus() if self.env.udp_port and int(self.env.udp_port): + self.log.warning("country=%s interface=%s:%s allow_lan=%s", self.env.country, + self.env.host, self.env.udp_port, self.env.allow_lan_udp) await self.status_server.start( 0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country, self.env.host, self.env.udp_port, self.env.allow_lan_udp @@ -218,7 +220,7 @@ class BlockchainReaderServer(BlockchainReader): await _start_cancellable(self.session_manager.serve, self.mempool) async def stop(self): - self.status_server.stop() + await self.status_server.stop() async with self._lock: while self.cancellable_tasks: t = self.cancellable_tasks.pop() diff --git a/lbry/wallet/server/udp.py b/lbry/wallet/server/udp.py index c5520ac6b..06e737521 100644 --- a/lbry/wallet/server/udp.py +++ b/lbry/wallet/server/udp.py @@ -110,6 +110,7 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol): self._min_delay = 1 / throttle_reqs_per_sec self._allow_localhost = allow_localhost self._allow_lan = allow_lan + self.closed = asyncio.Event() def update_cached_response(self): self._left_cache, self._right_cache = SPVPong.make_sans_source_address( @@ -160,13 +161,16 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol): def connection_made(self, transport) -> None: self.transport = transport + self.closed.clear() def connection_lost(self, exc: Optional[Exception]) -> None: self.transport = None + self.closed.set() - def close(self): + async def close(self): if self.transport: self.transport.close() + await self.closed.wait() class StatusServer: @@ -184,9 +188,9 @@ class StatusServer: await loop.create_datagram_endpoint(lambda: self._protocol, (interface, port)) log.info("started udp status server on %s:%i", interface, port) - def stop(self): + async def stop(self): if self.is_running: - self._protocol.close() + await self._protocol.close() self._protocol = None @property