diff --git a/lbry/wallet/orchstr8/node.py b/lbry/wallet/orchstr8/node.py index afc85f859..1db31e7c7 100644 --- a/lbry/wallet/orchstr8/node.py +++ b/lbry/wallet/orchstr8/node.py @@ -263,7 +263,8 @@ class SPVNode: self.es_writer = ElasticWriter(env) await self.writer.open() await self.writer.start() - await asyncio.wait([self.server.start(), self.es_writer.start()]) + await self.es_writer.start() + await self.server.start() async def stop(self, cleanup=True): if self.stopped: diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index a7ac6a91a..233b1c3ce 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -1420,7 +1420,6 @@ class BlockProcessor: # flush the changes save_undo = (self.daemon.cached_height() - self.height) <= self.env.reorg_limit - self.db.write_db_state() if save_undo: self.db.prefix_db.commit(self.height, self.tip) else: diff --git a/lbry/wallet/server/chain_reader.py b/lbry/wallet/server/chain_reader.py index 61900a7ee..4a7e39f66 100644 --- a/lbry/wallet/server/chain_reader.py +++ b/lbry/wallet/server/chain_reader.py @@ -50,7 +50,7 @@ class BlockchainReader: if self.last_state: while True: if self.db.headers[-1] == self.db.prefix_db.header.get(last_height, deserialize_value=False): - self.log.info("connects to block %i", last_height) + self.log.debug("connects to block %i", last_height) break else: self.log.warning("disconnect block %i", last_height) diff --git a/lbry/wallet/server/cli.py b/lbry/wallet/server/cli.py index 59f313ab4..f415cb3b4 100644 --- a/lbry/wallet/server/cli.py +++ b/lbry/wallet/server/cli.py @@ -7,20 +7,23 @@ from lbry.wallet.server.chain_reader import BlockchainReaderServer from lbry.wallet.server.db.elasticsearch.sync import ElasticWriter -def get_args_and_setup_logging(name): +def get_arg_parser(name): parser = argparse.ArgumentParser( prog=name ) Env.contribute_to_arg_parser(parser) - args = parser.parse_args() + return parser + + +def setup_logging(): logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s") logging.getLogger('aiohttp').setLevel(logging.WARNING) logging.getLogger('elasticsearch').setLevel(logging.WARNING) - return args def run_writer_forever(): - args = get_args_and_setup_logging('lbry-hub-writer') + setup_logging() + args = get_arg_parser('lbry-hub-writer').parse_args() try: block_processor = BlockProcessor(Env.from_arg_parser(args)) block_processor.run() @@ -32,7 +35,8 @@ def run_writer_forever(): def run_server_forever(): - args = get_args_and_setup_logging('lbry-hub-server') + setup_logging() + args = get_arg_parser('lbry-hub-server').parse_args() try: server = BlockchainReaderServer(Env.from_arg_parser(args)) @@ -45,10 +49,14 @@ def run_server_forever(): def run_es_sync_forever(): - args = get_args_and_setup_logging('lbry-hub-elastic-sync') + setup_logging() + parser = get_arg_parser('lbry-hub-elastic-sync') + parser.add_argument('--reindex', type=bool, default=False) + args = parser.parse_args() + try: server = ElasticWriter(Env.from_arg_parser(args)) - server.run() + server.run(args.reindex) except Exception: traceback.print_exc() logging.critical('es writer terminated abnormally') diff --git a/lbry/wallet/server/db/db.py b/lbry/wallet/server/db/db.py index ed43e7126..2463a28b8 100644 --- a/lbry/wallet/server/db/db.py +++ b/lbry/wallet/server/db/db.py @@ -1151,10 +1151,3 @@ class HubDB: utxo_append((hashX, utxo_value.amount)) return utxos return await asyncio.get_event_loop().run_in_executor(self._executor, lookup_utxos) - - async def get_trending_notifications(self, height: int): - def read_trending(): - return { - k.claim_hash: v for k, v in self.prefix_db.trending_notification.iterate((height,)) - } - return await asyncio.get_event_loop().run_in_executor(self._executor, read_trending) diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index ebb0efc4b..6ff10480d 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -17,6 +17,7 @@ from lbry.wallet.server.db.elasticsearch.notifier import ElasticNotifierProtocol from lbry.wallet.server.db.elasticsearch.fast_ar_trending import FAST_AR_TRENDING_SCRIPT from lbry.wallet.server.chain_reader import BlockchainReader from lbry.wallet.server.db.revertable import RevertableOp +from lbry.wallet.server.db.common import TrendingNotification from lbry.wallet.server.db import DB_PREFIXES @@ -34,9 +35,7 @@ class ElasticWriter(BlockchainReader): self._elastic_host = env.elastic_host self._elastic_port = env.elastic_port self.sync_timeout = 1800 - self.sync_client = AsyncElasticsearch( - [{'host': self._elastic_host, 'port': self._elastic_port}], timeout=self.sync_timeout - ) + self.sync_client = None self._es_info_path = os.path.join(env.db_dir, 'es_info') self._last_wrote_height = 0 self._last_wrote_block_hash = None @@ -66,8 +65,10 @@ class ElasticWriter(BlockchainReader): self.log.info("notify listener %i", height) def _read_es_height(self): - with open(self._es_info_path, 'r') as f: - info = json.loads(f.read()) + info = {} + if os.path.exists(self._es_info_path): + with open(self._es_info_path, 'r') as f: + info.update(json.loads(f.read())) self._last_wrote_height = int(info.get('height', 0)) self._last_wrote_block_hash = info.get('block_hash', None) @@ -100,22 +101,26 @@ class ElasticWriter(BlockchainReader): while True: try: await self.sync_client.cluster.health(wait_for_status='yellow') + self.log.info("ES is ready to connect to") break except ConnectionError: self.log.warning("Failed to connect to Elasticsearch. Waiting for it!") await asyncio.sleep(1) + index_version = await self.get_index_version() + res = await self.sync_client.indices.create(self.index, INDEX_DEFAULT_SETTINGS, ignore=400) acked = res.get('acknowledged', False) + if acked: await self.set_index_version(self.VERSION) - return acked - index_version = await self.get_index_version() - if index_version != self.VERSION: + return True + elif index_version != self.VERSION: self.log.error("es search index has an incompatible version: %s vs %s", index_version, self.VERSION) raise IndexVersionMismatch(index_version, self.VERSION) - await self.sync_client.indices.refresh(self.index) - return acked + else: + await self.sync_client.indices.refresh(self.index) + return False async def stop_index(self): if self.sync_client: @@ -195,10 +200,10 @@ class ElasticWriter(BlockchainReader): 'params': {'src': { 'changes': [ { - 'height': notify_height, - 'prev_amount': trending_v.previous_amount / 1E8, - 'new_amount': trending_v.new_amount / 1E8, - } for (notify_height, trending_v) in notifications + 'height': notification.height, + 'prev_amount': notification.prev_amount / 1E8, + 'new_amount': notification.new_amount / 1E8, + } for notification in notifications ] }} }, @@ -219,7 +224,7 @@ class ElasticWriter(BlockchainReader): touched_or_deleted = self.db.prefix_db.touched_or_deleted.get(height) for k, v in self.db.prefix_db.trending_notification.iterate((height,)): - self._trending[k.claim_hash].append((k.height, v)) + self._trending[k.claim_hash].append(TrendingNotification(k.height, v.previous_amount, v.new_amount)) if touched_or_deleted: readded_after_reorg = self._removed_during_undo.intersection(touched_or_deleted.touched_claims) self._deleted_claims.difference_update(readded_after_reorg) @@ -292,7 +297,7 @@ class ElasticWriter(BlockchainReader): def last_synced_height(self) -> int: return self._last_wrote_height - async def start(self): + async def start(self, reindex=False): await super().start() def _start_cancellable(run, *args): @@ -302,10 +307,15 @@ class ElasticWriter(BlockchainReader): self.db.open_db() await self.db.initialize_caches() + await self.read_es_height() await self.start_index() self.last_state = self.db.read_db_state() await _start_cancellable(self.run_es_notifier) + + if reindex or self._last_wrote_height == 0 and self.db.db_height > 0: + self.log.warning("reindex (last wrote: %i, db height: %i)", self._last_wrote_height, self.db.db_height) + await self.reindex() await _start_cancellable(self.refresh_blocks_forever) async def stop(self, delete_index=False): @@ -319,8 +329,9 @@ class ElasticWriter(BlockchainReader): await self.stop_index() self._executor.shutdown(wait=True) self._executor = None + self.shutdown_event.set() - def run(self): + def run(self, reindex=False): loop = asyncio.get_event_loop() def __exit(): @@ -328,9 +339,65 @@ class ElasticWriter(BlockchainReader): try: loop.add_signal_handler(signal.SIGINT, __exit) loop.add_signal_handler(signal.SIGTERM, __exit) - loop.run_until_complete(self.start()) + loop.run_until_complete(self.start(reindex=reindex)) loop.run_until_complete(self.shutdown_event.wait()) except (SystemExit, KeyboardInterrupt): pass finally: loop.run_until_complete(self.stop()) + + async def reindex(self): + async with self._lock: + self.log.info("reindexing %i claims (estimate)", self.db.prefix_db.claim_to_txo.estimate_num_keys()) + await self.delete_index() + res = await self.sync_client.indices.create(self.index, INDEX_DEFAULT_SETTINGS, ignore=400) + acked = res.get('acknowledged', False) + if acked: + await self.set_index_version(self.VERSION) + await self.sync_client.indices.refresh(self.index) + self.write_es_height(0, self.env.coin.GENESIS_HASH) + await self._sync_all_claims() + await self.sync_client.indices.refresh(self.index) + self.write_es_height(self.db.db_height, self.db.db_tip[::-1].hex()) + self.notify_es_notification_listeners(self.db.db_height, self.db.db_tip) + self.log.warning("finished reindexing") + + async def _sync_all_claims(self, batch_size=100000): + def load_historic_trending(): + notifications = self._trending + for k, v in self.db.prefix_db.trending_notification.iterate(): + notifications[k.claim_hash].append(TrendingNotification(k.height, v.previous_amount, v.new_amount)) + + async def all_claims_producer(): + async for claim in self.db.all_claims_producer(batch_size=batch_size): + yield self._upsert_claim_query(self.index, claim) + claim_hash = bytes.fromhex(claim['claim_id']) + if claim_hash in self._trending: + yield self._update_trending_query(self.index, claim_hash, self._trending.pop(claim_hash)) + self._trending.clear() + + self.log.info("loading about %i historic trending updates", self.db.prefix_db.trending_notification.estimate_num_keys()) + await asyncio.get_event_loop().run_in_executor(self._executor, load_historic_trending) + self.log.info("loaded historic trending updates for %i claims", len(self._trending)) + + cnt = 0 + success = 0 + producer = all_claims_producer() + + finished = False + try: + async for ok, item in async_streaming_bulk(self.sync_client, producer, raise_on_error=False): + cnt += 1 + if not ok: + self.log.warning("indexing failed for an item: %s", item) + else: + success += 1 + if cnt % batch_size == 0: + self.log.info(f"indexed {success} claims") + finished = True + await self.sync_client.indices.refresh(self.index) + self.log.info("indexed %i/%i claims", success, cnt) + finally: + if not finished: + await producer.aclose() + self.shutdown_event.set() diff --git a/tests/integration/blockchain/test_wallet_server_sessions.py b/tests/integration/blockchain/test_wallet_server_sessions.py index 64b9a43c3..50ed9e7df 100644 --- a/tests/integration/blockchain/test_wallet_server_sessions.py +++ b/tests/integration/blockchain/test_wallet_server_sessions.py @@ -89,37 +89,58 @@ class TestUsagePayment(CommandTestCase): class TestESSync(CommandTestCase): async def test_es_sync_utility(self): + es_writer = self.conductor.spv_node.es_writer + server_search_client = self.conductor.spv_node.server.session_manager.search_index + for i in range(10): await self.stream_create(f"stream{i}", bid='0.001') await self.generate(1) self.assertEqual(10, len(await self.claim_search(order_by=['height']))) - db = self.conductor.spv_node.server.db - env = self.conductor.spv_node.server.env - - await db.search_index.delete_index() - db.search_index.clear_caches() - self.assertEqual(0, len(await self.claim_search(order_by=['height']))) - await db.search_index.stop() - - async def resync(): - await db.search_index.start() - db.search_index.clear_caches() - await make_es_index_and_run_sync(env, db=db, index_name=db.search_index.index, force=True) - self.assertEqual(10, len(await self.claim_search(order_by=['height']))) + # delete the index and verify nothing is returned by claim search + await es_writer.delete_index() + server_search_client.clear_caches() self.assertEqual(0, len(await self.claim_search(order_by=['height']))) - await resync() - - # this time we will test a migration from unversioned to v1 - await db.search_index.sync_client.indices.delete_template(db.search_index.index) - await db.search_index.stop() - - await make_es_index_and_run_sync(env, db=db, index_name=db.search_index.index, force=True) - await db.search_index.start() - - await resync() + # reindex, 10 claims should be returned + await es_writer.reindex() self.assertEqual(10, len(await self.claim_search(order_by=['height']))) + server_search_client.clear_caches() + self.assertEqual(10, len(await self.claim_search(order_by=['height']))) + + # reindex again, this should not appear to do anything but will delete and reinsert the same 10 claims + await es_writer.reindex() + self.assertEqual(10, len(await self.claim_search(order_by=['height']))) + server_search_client.clear_caches() + self.assertEqual(10, len(await self.claim_search(order_by=['height']))) + + # delete the index again and stop the writer, upon starting it the writer should reindex automatically + await es_writer.stop(delete_index=True) + server_search_client.clear_caches() + self.assertEqual(0, len(await self.claim_search(order_by=['height']))) + + await es_writer.start(reindex=True) + self.assertEqual(10, len(await self.claim_search(order_by=['height']))) + + # stop the es writer and advance the chain by 1, adding a new claim. upon resuming the es writer, it should + # add the new claim + await es_writer.stop() + await self.stream_create(f"stream11", bid='0.001', confirm=False) + generate_block_task = asyncio.create_task(self.generate(1)) + await es_writer.start() + await generate_block_task + self.assertEqual(11, len(await self.claim_search(order_by=['height']))) + + + # # this time we will test a migration from unversioned to v1 + # await db.search_index.sync_client.indices.delete_template(db.search_index.index) + # await db.search_index.stop() + # + # await make_es_index_and_run_sync(env, db=db, index_name=db.search_index.index, force=True) + # await db.search_index.start() + # + # await es_writer.reindex() + # self.assertEqual(10, len(await self.claim_search(order_by=['height']))) class TestHubDiscovery(CommandTestCase):