forked from LBRYCommunity/lbry-sdk
es sync notifier
This commit is contained in:
parent
03f888f787
commit
e3a4dab6cb
4 changed files with 60 additions and 6 deletions
|
@ -123,6 +123,7 @@ class BlockchainReaderServer(BlockchainReader):
|
||||||
self.es_notification_client = ElasticNotifierClientProtocol(self.es_notifications)
|
self.es_notification_client = ElasticNotifierClientProtocol(self.es_notifications)
|
||||||
self.synchronized = asyncio.Event()
|
self.synchronized = asyncio.Event()
|
||||||
self._es_height = None
|
self._es_height = None
|
||||||
|
self._es_block_hash = None
|
||||||
|
|
||||||
def clear_caches(self):
|
def clear_caches(self):
|
||||||
self.history_cache.clear()
|
self.history_cache.clear()
|
||||||
|
@ -166,9 +167,9 @@ class BlockchainReaderServer(BlockchainReader):
|
||||||
synchronized.set()
|
synchronized.set()
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
self._es_height = await self.es_notifications.get()
|
self._es_height, self._es_block_hash = await self.es_notifications.get()
|
||||||
self.clear_search_cache()
|
self.clear_search_cache()
|
||||||
if self._es_height == self.db.db_height:
|
if self.last_state and self._es_block_hash == self.last_state.tip:
|
||||||
self.synchronized.set()
|
self.synchronized.set()
|
||||||
self.log.warning("es and reader are in sync")
|
self.log.warning("es and reader are in sync")
|
||||||
else:
|
else:
|
||||||
|
|
51
lbry/wallet/server/db/elasticsearch/notifier.py
Normal file
51
lbry/wallet/server/db/elasticsearch/notifier.py
Normal file
|
@ -0,0 +1,51 @@
|
||||||
|
import struct
|
||||||
|
import typing
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class ElasticNotifierProtocol(asyncio.Protocol):
|
||||||
|
"""notifies the reader when ES has written updates"""
|
||||||
|
|
||||||
|
def __init__(self, listeners):
|
||||||
|
self._listeners = listeners
|
||||||
|
self.transport: typing.Optional[asyncio.Transport] = None
|
||||||
|
|
||||||
|
def connection_made(self, transport):
|
||||||
|
self.transport = transport
|
||||||
|
self._listeners.append(self)
|
||||||
|
log.warning("got es notifier connection")
|
||||||
|
|
||||||
|
def connection_lost(self, exc) -> None:
|
||||||
|
self._listeners.remove(self)
|
||||||
|
self.transport = None
|
||||||
|
|
||||||
|
def send_height(self, height: int, block_hash: bytes):
|
||||||
|
log.warning("notify es update '%s'", height)
|
||||||
|
self.transport.write(struct.pack(b'>Q32s', height, block_hash) + b'\n')
|
||||||
|
|
||||||
|
|
||||||
|
class ElasticNotifierClientProtocol(asyncio.Protocol):
|
||||||
|
"""notifies the reader when ES has written updates"""
|
||||||
|
|
||||||
|
def __init__(self, notifications: asyncio.Queue):
|
||||||
|
self.notifications = notifications
|
||||||
|
self.transport: typing.Optional[asyncio.Transport] = None
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
if self.transport and not self.transport.is_closing():
|
||||||
|
self.transport.close()
|
||||||
|
|
||||||
|
def connection_made(self, transport):
|
||||||
|
self.transport = transport
|
||||||
|
log.warning("connected to es notifier")
|
||||||
|
|
||||||
|
def connection_lost(self, exc) -> None:
|
||||||
|
self.transport = None
|
||||||
|
|
||||||
|
def data_received(self, data: bytes) -> None:
|
||||||
|
height, block_hash = struct.unpack(b'>Q32s', data.rstrip(b'\n'))
|
||||||
|
self.notifications.put_nowait((height, block_hash))
|
|
@ -60,9 +60,9 @@ class ElasticWriter(BlockchainReader):
|
||||||
async with server:
|
async with server:
|
||||||
await server.serve_forever()
|
await server.serve_forever()
|
||||||
|
|
||||||
def notify_es_notification_listeners(self, height: int):
|
def notify_es_notification_listeners(self, height: int, block_hash: bytes):
|
||||||
for p in self._listeners:
|
for p in self._listeners:
|
||||||
p.send_height(height)
|
p.send_height(height, block_hash)
|
||||||
self.log.warning("notify listener %i", height)
|
self.log.warning("notify listener %i", height)
|
||||||
|
|
||||||
def _read_es_height(self):
|
def _read_es_height(self):
|
||||||
|
@ -270,7 +270,7 @@ class ElasticWriter(BlockchainReader):
|
||||||
self._trending.clear()
|
self._trending.clear()
|
||||||
self._advanced = False
|
self._advanced = False
|
||||||
self.synchronized.set()
|
self.synchronized.set()
|
||||||
self.notify_es_notification_listeners(self._last_wrote_height)
|
self.notify_es_notification_listeners(self._last_wrote_height, self.db.db_tip)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def last_synced_height(self) -> int:
|
def last_synced_height(self) -> int:
|
||||||
|
|
|
@ -36,7 +36,7 @@ class Env:
|
||||||
allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None,
|
allow_lan_udp=None, cache_all_tx_hashes=None, cache_all_claim_txos=None, country=None,
|
||||||
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
|
payment_address=None, donation_address=None, max_send=None, max_receive=None, max_sessions=None,
|
||||||
session_timeout=None, drop_client=None, description=None, daily_fee=None,
|
session_timeout=None, drop_client=None, description=None, daily_fee=None,
|
||||||
database_query_timeout=None, db_max_open_files=512):
|
database_query_timeout=None, db_max_open_files=512, elastic_notifier_port=None):
|
||||||
self.logger = class_logger(__name__, self.__class__.__name__)
|
self.logger = class_logger(__name__, self.__class__.__name__)
|
||||||
|
|
||||||
self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
|
self.db_dir = db_dir if db_dir is not None else self.required('DB_DIRECTORY')
|
||||||
|
@ -47,6 +47,8 @@ class Env:
|
||||||
self.rpc_host = rpc_host if rpc_host is not None else self.default('RPC_HOST', 'localhost')
|
self.rpc_host = rpc_host if rpc_host is not None else self.default('RPC_HOST', 'localhost')
|
||||||
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
|
self.elastic_host = elastic_host if elastic_host is not None else self.default('ELASTIC_HOST', 'localhost')
|
||||||
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
|
self.elastic_port = elastic_port if elastic_port is not None else self.integer('ELASTIC_PORT', 9200)
|
||||||
|
self.elastic_notifier_port = elastic_notifier_port if elastic_notifier_port is not None else self.integer('ELASTIC_NOTIFIER_PORT', 19080)
|
||||||
|
|
||||||
self.loop_policy = self.set_event_loop_policy(
|
self.loop_policy = self.set_event_loop_policy(
|
||||||
loop_policy if loop_policy is not None else self.default('EVENT_LOOP_POLICY', None)
|
loop_policy if loop_policy is not None else self.default('EVENT_LOOP_POLICY', None)
|
||||||
)
|
)
|
||||||
|
|
Loading…
Add table
Reference in a new issue