diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index d990c96cc..7fd76e64b 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -2,7 +2,7 @@ import argparse import asyncio import logging from elasticsearch import AsyncElasticsearch -from elasticsearch.helpers import async_bulk +from elasticsearch.helpers import async_streaming_bulk from lbry.wallet.server.env import Env from lbry.wallet.server.coin import LBC from lbry.wallet.server.leveldb import LevelDB @@ -10,6 +10,50 @@ from lbry.wallet.server.db.elasticsearch.search import SearchIndex, IndexVersion from lbry.wallet.server.db.elasticsearch.constants import ALL_FIELDS +async def get_recent_claims(blocks: int, index_name='claims', db=None): + env = Env(LBC) + need_open = db is None + db = db or LevelDB(env) + if need_open: + await db.open_dbs() + try: + cnt = 0 + state = db.prefix_db.db_state.get() + touched_claims = set() + deleted_claims = set() + for height in range(state.height - blocks + 1, state.height + 1): + touched_or_deleted = db.prefix_db.touched_or_deleted.get(height) + touched_claims.update(touched_or_deleted.touched_claims) + deleted_claims.update(touched_or_deleted.deleted_claims) + touched_claims.difference_update(deleted_claims) + + for deleted in deleted_claims: + yield { + '_index': index_name, + '_op_type': 'delete', + '_id': deleted.hex() + } + for touched in touched_claims: + claim = db.claim_producer(touched) + if claim: + yield { + 'doc': {key: value for key, value in claim.items() if key in ALL_FIELDS}, + '_id': claim['claim_id'], + '_index': index_name, + '_op_type': 'update', + 'doc_as_upsert': True + } + cnt += 1 + else: + logging.warning("could not sync claim %s", touched.hex()) + if cnt % 10000 == 0: + print(f"{cnt} claims sent") + print("sent %i claims, deleted %i" % (len(touched_claims), len(deleted_claims))) + finally: + if need_open: + db.close() + + async def get_all_claims(index_name='claims', db=None): env = Env(LBC) need_open = db is None @@ -52,14 +96,20 @@ async def make_es_index(index=None): index.stop() -async def run_sync(index_name='claims', db=None, clients=32): +async def run_sync(index_name='claims', db=None, clients=32, blocks=0): env = Env(LBC) logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port) es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}]) - claim_generator = get_all_claims(index_name=index_name, db=db) - + if blocks > 0: + blocks = min(blocks, 200) + logging.info("Resyncing last %i blocks", blocks) + claim_generator = get_recent_claims(blocks, index_name=index_name, db=db) + else: + claim_generator = get_all_claims(index_name=index_name, db=db) try: - await async_bulk(es, claim_generator, request_timeout=600) + async for ok, item in async_streaming_bulk(es, claim_generator, request_timeout=600, raise_on_error=False): + if not ok: + logging.warning("indexing failed for an item: %s", item) await es.indices.refresh(index=index_name) finally: await es.close() @@ -85,4 +135,4 @@ def run_elastic_sync(): if not args.force and not asyncio.run(make_es_index()): logging.info("ES is already initialized") return - asyncio.run(run_sync(clients=args.clients)) + asyncio.run(run_sync(clients=args.clients, blocks=args.blocks)) diff --git a/lbry/wallet/server/leveldb.py b/lbry/wallet/server/leveldb.py index 3839c1ee3..d33b1016b 100644 --- a/lbry/wallet/server/leveldb.py +++ b/lbry/wallet/server/leveldb.py @@ -713,6 +713,23 @@ class LevelDB: yield meta batch.clear() + def claim_producer(self, claim_hash: bytes) -> Optional[Dict]: + claim_txo = self.get_cached_claim_txo(claim_hash) + if not claim_txo: + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) + return + if not self.prefix_db.claim_takeover.get(claim_txo.normalized_name): + self.logger.warning("can't sync non existent claim to ES: %s", claim_hash.hex()) + return + activation = self.get_activation(claim_txo.tx_num, claim_txo.position) + claim = self._prepare_resolve_result( + claim_txo.tx_num, claim_txo.position, claim_hash, claim_txo.name, claim_txo.root_tx_num, + claim_txo.root_position, activation, claim_txo.channel_signature_is_valid + ) + if not claim: + return + return self._prepare_claim_metadata(claim.claim_hash, claim) + def claims_producer(self, claim_hashes: Set[bytes]): batch = [] results = []