diff --git a/lbry/wallet/server/chain_reader.py b/lbry/wallet/server/chain_reader.py index 5f4a31aad..8e395824f 100644 --- a/lbry/wallet/server/chain_reader.py +++ b/lbry/wallet/server/chain_reader.py @@ -123,6 +123,7 @@ class BlockchainReaderServer(BlockchainReader): self.es_notification_client = ElasticNotifierClientProtocol(self.es_notifications) self.synchronized = asyncio.Event() self._es_height = None + self._es_block_hash = None def clear_caches(self): self.history_cache.clear() @@ -166,9 +167,9 @@ class BlockchainReaderServer(BlockchainReader): synchronized.set() try: while True: - self._es_height = await self.es_notifications.get() + self._es_height, self._es_block_hash = await self.es_notifications.get() self.clear_search_cache() - if self._es_height == self.db.db_height: + if self.last_state and self._es_block_hash == self.last_state.tip: self.synchronized.set() self.log.warning("es and reader are in sync") else: diff --git a/lbry/wallet/server/db/elasticsearch/notifier.py b/lbry/wallet/server/db/elasticsearch/notifier.py new file mode 100644 index 000000000..702bc23ae --- /dev/null +++ b/lbry/wallet/server/db/elasticsearch/notifier.py @@ -0,0 +1,51 @@ +import struct +import typing +import asyncio +import logging + + +log = logging.getLogger(__name__) + + +class ElasticNotifierProtocol(asyncio.Protocol): + """notifies the reader when ES has written updates""" + + def __init__(self, listeners): + self._listeners = listeners + self.transport: typing.Optional[asyncio.Transport] = None + + def connection_made(self, transport): + self.transport = transport + self._listeners.append(self) + log.warning("got es notifier connection") + + def connection_lost(self, exc) -> None: + self._listeners.remove(self) + self.transport = None + + def send_height(self, height: int, block_hash: bytes): + log.warning("notify es update '%s'", height) + self.transport.write(struct.pack(b'>Q32s', height, block_hash) + b'\n') + + +class ElasticNotifierClientProtocol(asyncio.Protocol): + """notifies the reader when ES has written updates""" + + def __init__(self, notifications: asyncio.Queue): + self.notifications = notifications + self.transport: typing.Optional[asyncio.Transport] = None + + def close(self): + if self.transport and not self.transport.is_closing(): + self.transport.close() + + def connection_made(self, transport): + self.transport = transport + log.warning("connected to es notifier") + + def connection_lost(self, exc) -> None: + self.transport = None + + def data_received(self, data: bytes) -> None: + height, block_hash = struct.unpack(b'>Q32s', data.rstrip(b'\n')) + self.notifications.put_nowait((height, block_hash)) diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index 70d1923e5..7024d4eb5 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -60,9 +60,9 @@ class ElasticWriter(BlockchainReader): async with server: await server.serve_forever() - def notify_es_notification_listeners(self, height: int): + def notify_es_notification_listeners(self, height: int, block_hash: bytes): for p in self._listeners: - p.send_height(height) + p.send_height(height, block_hash) self.log.warning("notify listener %i", height) def _read_es_height(self): @@ -270,7 +270,7 @@ class ElasticWriter(BlockchainReader): self._trending.clear() self._advanced = False self.synchronized.set() - self.notify_es_notification_listeners(self._last_wrote_height) + self.notify_es_notification_listeners(self._last_wrote_height, self.db.db_tip) @property def last_synced_height(self) -> int: diff --git a/lbry/wallet/server/env.py b/lbry/wallet/server/env.py index a109abf76..a2a0cd06b 100644 --- a/lbry/wallet/server/env.py +++ b/lbry/wallet/server/env.py @@ -36,7 +36,7 @@ class Env: allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None, payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, session_timeout=None, drop_client=None, description=None, daily_fee=None, - database_query_timeout=None, db_max_open_files=512): + database_query_timeout=None, db_max_open_files=512, elastic_notifier_port=None): self.logger = class_logger(__name__, self.__class__.__name__) self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY') @@ -47,6 +47,8 @@ class Env: self.rpc_host = rpc_host if rpc_host is not None else self.default('RPC_HOST', 'localhost') self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost') self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200) + self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer('ELASTIC_NOTIFIER_PORT', 19080) + self.loop_policy = self.set_event_loop_policy( loop_policy if loop_policy is not None else self.default('EVENT_LOOP_POLICY', None) )