es sync notifier

This commit is contained in:
Jack Robison 2022-01-12 11:58:50 -05:00
parent cfae30a364
commit c0ce27ccf3
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 60 additions and 6 deletions

View file

@ -123,6 +123,7 @@ class BlockchainReaderServer(BlockchainReader):
self.es_notification_client = ElasticNotifierClientProtocol(self.es_notifications) self.es_notification_client = ElasticNotifierClientProtocol(self.es_notifications)
self.synchronized = asyncio.Event() self.synchronized = asyncio.Event()
self._es_height = None self._es_height = None
self._es_block_hash = None
def clear_caches(self): def clear_caches(self):
self.history_cache.clear() self.history_cache.clear()
@ -166,9 +167,9 @@ class BlockchainReaderServer(BlockchainReader):
synchronized.set() synchronized.set()
try: try:
while True: while True:
self._es_height = await self.es_notifications.get() self._es_height, self._es_block_hash = await self.es_notifications.get()
self.clear_search_cache() self.clear_search_cache()
if self._es_height == self.db.db_height: if self.last_state and self._es_block_hash == self.last_state.tip:
self.synchronized.set() self.synchronized.set()
self.log.warning("es and reader are in sync") self.log.warning("es and reader are in sync")
else: else:

View file

@ -0,0 +1,51 @@
import struct
import typing
import asyncio
import logging
log = logging.getLogger(__name__)
class ElasticNotifierProtocol(asyncio.Protocol):
"""notifies the reader when ES has written updates"""
def __init__(self, listeners):
self._listeners = listeners
self.transport: typing.Optional[asyncio.Transport] = None
def connection_made(self, transport):
self.transport = transport
self._listeners.append(self)
log.warning("got es notifier connection")
def connection_lost(self, exc) -> None:
self._listeners.remove(self)
self.transport = None
def send_height(self, height: int, block_hash: bytes):
log.warning("notify es update '%s'", height)
self.transport.write(struct.pack(b'>Q32s', height, block_hash) + b'\n')
class ElasticNotifierClientProtocol(asyncio.Protocol):
"""notifies the reader when ES has written updates"""
def __init__(self, notifications: asyncio.Queue):
self.notifications = notifications
self.transport: typing.Optional[asyncio.Transport] = None
def close(self):
if self.transport and not self.transport.is_closing():
self.transport.close()
def connection_made(self, transport):
self.transport = transport
log.warning("connected to es notifier")
def connection_lost(self, exc) -> None:
self.transport = None
def data_received(self, data: bytes) -> None:
height, block_hash = struct.unpack(b'>Q32s', data.rstrip(b'\n'))
self.notifications.put_nowait((height, block_hash))

View file

@ -60,9 +60,9 @@ class ElasticWriter(BlockchainReader):
async with server: async with server:
await server.serve_forever() await server.serve_forever()
def notify_es_notification_listeners(self, height: int): def notify_es_notification_listeners(self, height: int, block_hash: bytes):
for p in self._listeners: for p in self._listeners:
p.send_height(height) p.send_height(height, block_hash)
self.log.warning("notify listener %i", height) self.log.warning("notify listener %i", height)
def _read_es_height(self): def _read_es_height(self):
@ -270,7 +270,7 @@ class ElasticWriter(BlockchainReader):
self._trending.clear() self._trending.clear()
self._advanced = False self._advanced = False
self.synchronized.set() self.synchronized.set()
self.notify_es_notification_listeners(self._last_wrote_height) self.notify_es_notification_listeners(self._last_wrote_height, self.db.db_tip)
@property @property
def last_synced_height(self) -> int: def last_synced_height(self) -> int:

View file

@ -36,7 +36,7 @@ class Env:
allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None, allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None,
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None, payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
session_timeout=None, drop_client=None, description=None, daily_fee=None, session_timeout=None, drop_client=None, description=None, daily_fee=None,
database_query_timeout=None, db_max_open_files=512): database_query_timeout=None, db_max_open_files=512, elastic_notifier_port=None):
self.logger = class_logger(__name__, self.__class__.__name__) self.logger = class_logger(__name__, self.__class__.__name__)
self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY') self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
@ -47,6 +47,8 @@ class Env:
self.rpc_host = rpc_host if rpc_host is not None else self.default('RPC_HOST', 'localhost') self.rpc_host = rpc_host if rpc_host is not None else self.default('RPC_HOST', 'localhost')
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost') self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200) self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer('ELASTIC_NOTIFIER_PORT', 19080)
self.loop_policy = self.set_event_loop_policy( self.loop_policy = self.set_event_loop_policy(
loop_policy if loop_policy is not None else self.default('EVENT_LOOP_POLICY', None) loop_policy if loop_policy is not None else self.default('EVENT_LOOP_POLICY', None)
) )