This commit is contained in:
Jack Robison 2022-02-01 14:55:28 -05:00
parent 888d47f88b
commit 6a5ff0636c
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 11 additions and 5 deletions

View file

@ -108,7 +108,7 @@ class BlockchainReader:
class BlockchainReaderServer(BlockchainReader): class BlockchainReaderServer(BlockchainReader):
def __init__(self, env): def __init__(self, env):
super().__init__(env, 'lbry-reader', thread_workers=1, thread_prefix='hub-worker') super().__init__(env, 'lbry-reader', thread_workers=max(1, env.max_query_workers), thread_prefix='hub-worker')
self.history_cache = {} self.history_cache = {}
self.resolve_outputs_cache = {} self.resolve_outputs_cache = {}
self.resolve_cache = {} self.resolve_cache = {}
@ -208,6 +208,8 @@ class BlockchainReaderServer(BlockchainReader):
await self.start_prometheus() await self.start_prometheus()
if self.env.udp_port and int(self.env.udp_port): if self.env.udp_port and int(self.env.udp_port):
self.log.warning("country=%s interface=%s:%s allow_lan=%s", self.env.country,
self.env.host, self.env.udp_port, self.env.allow_lan_udp)
await self.status_server.start( await self.status_server.start(
0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country, 0, bytes.fromhex(self.env.coin.GENESIS_HASH)[::-1], self.env.country,
self.env.host, self.env.udp_port, self.env.allow_lan_udp self.env.host, self.env.udp_port, self.env.allow_lan_udp
@ -218,7 +220,7 @@ class BlockchainReaderServer(BlockchainReader):
await _start_cancellable(self.session_manager.serve, self.mempool) await _start_cancellable(self.session_manager.serve, self.mempool)
async def stop(self): async def stop(self):
self.status_server.stop() await self.status_server.stop()
async with self._lock: async with self._lock:
while self.cancellable_tasks: while self.cancellable_tasks:
t = self.cancellable_tasks.pop() t = self.cancellable_tasks.pop()

View file

@ -110,6 +110,7 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol):
self._min_delay = 1 / throttle_reqs_per_sec self._min_delay = 1 / throttle_reqs_per_sec
self._allow_localhost = allow_localhost self._allow_localhost = allow_localhost
self._allow_lan = allow_lan self._allow_lan = allow_lan
self.closed = asyncio.Event()
def update_cached_response(self): def update_cached_response(self):
self._left_cache, self._right_cache = SPVPong.make_sans_source_address( self._left_cache, self._right_cache = SPVPong.make_sans_source_address(
@ -160,13 +161,16 @@ class SPVServerStatusProtocol(asyncio.DatagramProtocol):
def connection_made(self, transport) -> None: def connection_made(self, transport) -> None:
self.transport = transport self.transport = transport
self.closed.clear()
def connection_lost(self, exc: Optional[Exception]) -> None: def connection_lost(self, exc: Optional[Exception]) -> None:
self.transport = None self.transport = None
self.closed.set()
def close(self): async def close(self):
if self.transport: if self.transport:
self.transport.close() self.transport.close()
await self.closed.wait()
class StatusServer: class StatusServer:
@ -184,9 +188,9 @@ class StatusServer:
await loop.create_datagram_endpoint(lambda: self._protocol, (interface, port)) await loop.create_datagram_endpoint(lambda: self._protocol, (interface, port))
log.info("started udp status server on %s:%i", interface, port) log.info("started udp status server on %s:%i", interface, port)
def stop(self): async def stop(self):
if self.is_running: if self.is_running:
self._protocol.close() await self._protocol.close()
self._protocol = None self._protocol = None
@property @property