From ebec12522b29c53fc4c9d1702b1fe6e2f1bd935c Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 6 Jan 2022 12:41:41 -0500 Subject: [PATCH] es writer --- lbry/wallet/server/block_processor.py | 2 +- lbry/wallet/server/db/elasticsearch/common.py | 150 +++++++ .../db/elasticsearch/fast_ar_trending.py | 117 +++++ lbry/wallet/server/db/elasticsearch/sync.py | 398 +++++++++++++----- 4 files changed, 557 insertions(+), 110 deletions(-) create mode 100644 lbry/wallet/server/db/elasticsearch/common.py create mode 100644 lbry/wallet/server/db/elasticsearch/fast_ar_trending.py diff --git a/lbry/wallet/server/block_processor.py b/lbry/wallet/server/block_processor.py index f988b15aa..45bdb0ceb 100644 --- a/lbry/wallet/server/block_processor.py +++ b/lbry/wallet/server/block_processor.py @@ -1582,7 +1582,7 @@ class BlockProcessor: await self._first_caught_up() self._caught_up_event.set() try: - await asyncio.wait_for(self.blocks_event.wait(), 0.25) + await asyncio.wait_for(self.blocks_event.wait(), 0.1) except asyncio.TimeoutError: pass self.blocks_event.clear() diff --git a/lbry/wallet/server/db/elasticsearch/common.py b/lbry/wallet/server/db/elasticsearch/common.py new file mode 100644 index 000000000..4441aad34 --- /dev/null +++ b/lbry/wallet/server/db/elasticsearch/common.py @@ -0,0 +1,150 @@ +from decimal import Decimal +from typing import Iterable + +from lbry.error import TooManyClaimSearchParametersError +from lbry.schema.tags import clean_tags +from lbry.schema.url import normalize_name +from lbry.wallet.server.db.common import CLAIM_TYPES, STREAM_TYPES +from lbry.wallet.server.db.elasticsearch.constants import REPLACEMENTS, FIELDS, TEXT_FIELDS, RANGE_FIELDS + + +def expand_query(**kwargs): + if "amount_order" in kwargs: + kwargs["limit"] = 1 + kwargs["order_by"] = "effective_amount" + kwargs["offset"] = int(kwargs["amount_order"]) - 1 + if 'name' in kwargs: + kwargs['name'] = normalize_name(kwargs.pop('name')) + if kwargs.get('is_controlling') is False: + kwargs.pop('is_controlling') + query = {'must': [], 'must_not': []} + collapse = None + if 'fee_currency' in kwargs and kwargs['fee_currency'] is not None: + kwargs['fee_currency'] = kwargs['fee_currency'].upper() + for key, value in kwargs.items(): + key = key.replace('claim.', '') + many = key.endswith('__in') or isinstance(value, list) + if many and len(value) > 2048: + raise TooManyClaimSearchParametersError(key, 2048) + if many: + key = key.replace('__in', '') + value = list(filter(None, value)) + if value is None or isinstance(value, list) and len(value) == 0: + continue + key = REPLACEMENTS.get(key, key) + if key in FIELDS: + partial_id = False + if key == 'claim_type': + if isinstance(value, str): + value = CLAIM_TYPES[value] + else: + value = [CLAIM_TYPES[claim_type] for claim_type in value] + elif key == 'stream_type': + value = [STREAM_TYPES[value]] if isinstance(value, str) else list(map(STREAM_TYPES.get, value)) + if key == '_id': + if isinstance(value, Iterable): + value = [item[::-1].hex() for item in value] + else: + value = value[::-1].hex() + if not many and key in ('_id', 'claim_id') and len(value) < 20: + partial_id = True + if key in ('signature_valid', 'has_source'): + continue # handled later + if key in TEXT_FIELDS: + key += '.keyword' + ops = {'<=': 'lte', '>=': 'gte', '<': 'lt', '>': 'gt'} + if partial_id: + query['must'].append({"prefix": {"claim_id": value}}) + elif key in RANGE_FIELDS and isinstance(value, str) and value[0] in ops: + operator_length = 2 if value[:2] in ops else 1 + operator, value = value[:operator_length], value[operator_length:] + if key == 'fee_amount': + value = str(Decimal(value)*1000) + query['must'].append({"range": {key: {ops[operator]: value}}}) + elif many: + query['must'].append({"terms": {key: value}}) + else: + if key == 'fee_amount': + value = str(Decimal(value)*1000) + query['must'].append({"term": {key: {"value": value}}}) + elif key == 'not_channel_ids': + for channel_id in value: + query['must_not'].append({"term": {'channel_id.keyword': channel_id}}) + query['must_not'].append({"term": {'_id': channel_id}}) + elif key == 'channel_ids': + query['must'].append({"terms": {'channel_id.keyword': value}}) + elif key == 'claim_ids': + query['must'].append({"terms": {'claim_id.keyword': value}}) + elif key == 'media_types': + query['must'].append({"terms": {'media_type.keyword': value}}) + elif key == 'any_languages': + query['must'].append({"terms": {'languages': clean_tags(value)}}) + elif key == 'any_languages': + query['must'].append({"terms": {'languages': value}}) + elif key == 'all_languages': + query['must'].extend([{"term": {'languages': tag}} for tag in value]) + elif key == 'any_tags': + query['must'].append({"terms": {'tags.keyword': clean_tags(value)}}) + elif key == 'all_tags': + query['must'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)]) + elif key == 'not_tags': + query['must_not'].extend([{"term": {'tags.keyword': tag}} for tag in clean_tags(value)]) + elif key == 'not_claim_id': + query['must_not'].extend([{"term": {'claim_id.keyword': cid}} for cid in value]) + elif key == 'limit_claims_per_channel': + collapse = ('channel_id.keyword', value) + if kwargs.get('has_channel_signature'): + query['must'].append({"exists": {"field": "signature"}}) + if 'signature_valid' in kwargs: + query['must'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}}) + elif 'signature_valid' in kwargs: + query.setdefault('should', []) + query["minimum_should_match"] = 1 + query['should'].append({"bool": {"must_not": {"exists": {"field": "signature"}}}}) + query['should'].append({"term": {"is_signature_valid": bool(kwargs["signature_valid"])}}) + if 'has_source' in kwargs: + query.setdefault('should', []) + query["minimum_should_match"] = 1 + is_stream_or_repost = {"terms": {"claim_type": [CLAIM_TYPES['stream'], CLAIM_TYPES['repost']]}} + query['should'].append( + {"bool": {"must": [{"match": {"has_source": kwargs['has_source']}}, is_stream_or_repost]}}) + query['should'].append({"bool": {"must_not": [is_stream_or_repost]}}) + query['should'].append({"bool": {"must": [{"term": {"reposted_claim_type": CLAIM_TYPES['channel']}}]}}) + if kwargs.get('text'): + query['must'].append( + {"simple_query_string": + {"query": kwargs["text"], "fields": [ + "claim_name^4", "channel_name^8", "title^1", "description^.5", "author^1", "tags^.5" + ]}}) + query = { + "_source": {"excludes": ["description", "title"]}, + 'query': {'bool': query}, + "sort": [], + } + if "limit" in kwargs: + query["size"] = kwargs["limit"] + if 'offset' in kwargs: + query["from"] = kwargs["offset"] + if 'order_by' in kwargs: + if isinstance(kwargs["order_by"], str): + kwargs["order_by"] = [kwargs["order_by"]] + for value in kwargs['order_by']: + if 'trending_group' in value: + # fixme: trending_mixed is 0 for all records on variable decay, making sort slow. + continue + is_asc = value.startswith('^') + value = value[1:] if is_asc else value + value = REPLACEMENTS.get(value, value) + if value in TEXT_FIELDS: + value += '.keyword' + query['sort'].append({value: "asc" if is_asc else "desc"}) + if collapse: + query["collapse"] = { + "field": collapse[0], + "inner_hits": { + "name": collapse[0], + "size": collapse[1], + "sort": query["sort"] + } + } + return query diff --git a/lbry/wallet/server/db/elasticsearch/fast_ar_trending.py b/lbry/wallet/server/db/elasticsearch/fast_ar_trending.py new file mode 100644 index 000000000..2e5836b2c --- /dev/null +++ b/lbry/wallet/server/db/elasticsearch/fast_ar_trending.py @@ -0,0 +1,117 @@ +FAST_AR_TRENDING_SCRIPT = """ +double softenLBC(double lbc) { return (Math.pow(lbc, 1.0 / 3.0)); } + +double logsumexp(double x, double y) +{ + double top; + if(x > y) + top = x; + else + top = y; + double result = top + Math.log(Math.exp(x-top) + Math.exp(y-top)); + return(result); +} + +double logdiffexp(double big, double small) +{ + return big + Math.log(1.0 - Math.exp(small - big)); +} + +double squash(double x) +{ + if(x < 0.0) +return -Math.log(1.0 - x); + else +return Math.log(x + 1.0); +} + +double unsquash(double x) +{ + if(x < 0.0) + return 1.0 - Math.exp(-x); + else + return Math.exp(x) - 1.0; +} + +double log_to_squash(double x) +{ + return logsumexp(x, 0.0); +} + +double squash_to_log(double x) +{ + //assert x > 0.0; + return logdiffexp(x, 0.0); +} + +double squashed_add(double x, double y) +{ + // squash(unsquash(x) + unsquash(y)) but avoiding overflow. + // Cases where the signs are the same + if (x < 0.0 && y < 0.0) + return -logsumexp(-x, logdiffexp(-y, 0.0)); + if (x >= 0.0 && y >= 0.0) + return logsumexp(x, logdiffexp(y, 0.0)); + // Where the signs differ + if (x >= 0.0 && y < 0.0) + if (Math.abs(x) >= Math.abs(y)) + return logsumexp(0.0, logdiffexp(x, -y)); + else + return -logsumexp(0.0, logdiffexp(-y, x)); + if (x < 0.0 && y >= 0.0) + { + // Addition is commutative, hooray for new math + return squashed_add(y, x); + } + return 0.0; +} + +double squashed_multiply(double x, double y) +{ + // squash(unsquash(x)*unsquash(y)) but avoiding overflow. + int sign; + if(x*y >= 0.0) +sign = 1; + else +sign = -1; + return sign*logsumexp(squash_to_log(Math.abs(x)) + + squash_to_log(Math.abs(y)), 0.0); +} + +// Squashed inflated units +double inflateUnits(int height) { + double timescale = 576.0; // Half life of 400 = e-folding time of a day + // by coincidence, so may as well go with it + return log_to_squash(height / timescale); +} + +double spikePower(double newAmount) { + if (newAmount < 50.0) { + return(0.5); + } else if (newAmount < 85.0) { + return(newAmount / 100.0); + } else { + return(0.85); + } +} + +double spikeMass(double oldAmount, double newAmount) { + double softenedChange = softenLBC(Math.abs(newAmount - oldAmount)); + double changeInSoftened = Math.abs(softenLBC(newAmount) - softenLBC(oldAmount)); + double power = spikePower(newAmount); + if (oldAmount > newAmount) { + -1.0 * Math.pow(changeInSoftened, power) * Math.pow(softenedChange, 1.0 - power) + } else { + Math.pow(changeInSoftened, power) * Math.pow(softenedChange, 1.0 - power) + } +} + +for (i in params.src.changes) { + double units = inflateUnits(i.height); + if (ctx._source.trending_score == null) { + ctx._source.trending_score = 0.0; + } + double bigSpike = squashed_multiply(units, squash(spikeMass(i.prev_amount, i.new_amount))); + ctx._source.trending_score = squashed_add(ctx._source.trending_score, bigSpike); +} +""" diff --git a/lbry/wallet/server/db/elasticsearch/sync.py b/lbry/wallet/server/db/elasticsearch/sync.py index d34c88d80..70d1923e5 100644 --- a/lbry/wallet/server/db/elasticsearch/sync.py +++ b/lbry/wallet/server/db/elasticsearch/sync.py @@ -1,138 +1,318 @@ import os -import argparse +import time +import signal +import json +import typing +from collections import defaultdict import asyncio import logging -from elasticsearch import AsyncElasticsearch +from elasticsearch import AsyncElasticsearch, NotFoundError from elasticsearch.helpers import async_streaming_bulk -from lbry.wallet.server.env import Env -from lbry.wallet.server.leveldb import LevelDB -from lbry.wallet.server.db.elasticsearch.search import SearchIndex, IndexVersionMismatch -from lbry.wallet.server.db.elasticsearch.constants import ALL_FIELDS + +from lbry.schema.result import Censor +from lbry.wallet.server.db.elasticsearch.search import IndexVersionMismatch +from lbry.wallet.server.db.elasticsearch.constants import ALL_FIELDS, INDEX_DEFAULT_SETTINGS +from lbry.wallet.server.db.elasticsearch.common import expand_query +from lbry.wallet.server.db.elasticsearch.notifier import ElasticNotifierProtocol +from lbry.wallet.server.db.elasticsearch.fast_ar_trending import FAST_AR_TRENDING_SCRIPT +from lbry.wallet.server.chain_reader import BlockchainReader +from lbry.wallet.server.db.revertable import RevertableOp +from lbry.wallet.server.db import DB_PREFIXES -async def get_recent_claims(env, index_name='claims', db=None): - log = logging.getLogger() - need_open = db is None - db = db or LevelDB(env) - try: - if need_open: - db.open_db() - if db.es_sync_height == db.db_height or db.db_height <= 0: - return - if need_open: - await db.initialize_caches() - log.info(f"catching up ES ({db.es_sync_height}) to leveldb height: {db.db_height}") - cnt = 0 - touched_claims = set() - deleted_claims = set() - for height in range(db.es_sync_height, db.db_height + 1): - touched_or_deleted = db.prefix_db.touched_or_deleted.get(height) - touched_claims.update(touched_or_deleted.touched_claims) - deleted_claims.update(touched_or_deleted.deleted_claims) - touched_claims.difference_update(deleted_claims) +log = logging.getLogger() - for deleted in deleted_claims: + +class ElasticWriter(BlockchainReader): + VERSION = 1 + + def __init__(self, env): + super().__init__(env, 'lbry-elastic-writer') + # self._refresh_interval = 0.1 + self._task = None + self.index = self.env.es_index_prefix + 'claims' + self._elastic_host = env.elastic_host + self._elastic_port = env.elastic_port + self.sync_timeout = 1800 + self.sync_client = AsyncElasticsearch( + [{'host': self._elastic_host, 'port': self._elastic_port}], timeout=self.sync_timeout + ) + self._es_info_path = os.path.join(env.db_dir, 'es_info') + self._last_wrote_height = 0 + self._last_wrote_block_hash = None + + self._touched_claims = set() + self._deleted_claims = set() + + self._removed_during_undo = set() + + self._trending = defaultdict(list) + self._advanced = True + self.synchronized = asyncio.Event() + self._listeners: typing.List[ElasticNotifierProtocol] = [] + + async def run_es_notifier(self, synchronized: asyncio.Event): + server = await asyncio.get_event_loop().create_server( + lambda: ElasticNotifierProtocol(self._listeners), '127.0.0.1', self.env.elastic_notifier_port + ) + self.log.warning("ES notifier server listening on TCP localhost:%i", self.env.elastic_notifier_port) + synchronized.set() + async with server: + await server.serve_forever() + + def notify_es_notification_listeners(self, height: int): + for p in self._listeners: + p.send_height(height) + self.log.warning("notify listener %i", height) + + def _read_es_height(self): + with open(self._es_info_path, 'r') as f: + info = json.loads(f.read()) + self._last_wrote_height = int(info.get('height', 0)) + self._last_wrote_block_hash = info.get('block_hash', None) + + async def read_es_height(self): + await asyncio.get_event_loop().run_in_executor(None, self._read_es_height) + + def write_es_height(self, height: int, block_hash: str): + with open(self._es_info_path, 'w') as f: + f.write(json.dumps({'height': height, 'block_hash': block_hash}, indent=2)) + self._last_wrote_height = height + self._last_wrote_block_hash = block_hash + + async def get_index_version(self) -> int: + try: + template = await self.sync_client.indices.get_template(self.index) + return template[self.index]['version'] + except NotFoundError: + return 0 + + async def set_index_version(self, version): + await self.sync_client.indices.put_template( + self.index, body={'version': version, 'index_patterns': ['ignored']}, ignore=400 + ) + + async def start_index(self) -> bool: + if self.sync_client: + return False + hosts = [{'host': self._elastic_host, 'port': self._elastic_port}] + self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout) + while True: + try: + await self.sync_client.cluster.health(wait_for_status='yellow') + break + except ConnectionError: + self.log.warning("Failed to connect to Elasticsearch. Waiting for it!") + await asyncio.sleep(1) + + res = await self.sync_client.indices.create(self.index, INDEX_DEFAULT_SETTINGS, ignore=400) + acked = res.get('acknowledged', False) + if acked: + await self.set_index_version(self.VERSION) + return acked + index_version = await self.get_index_version() + if index_version != self.VERSION: + self.log.error("es search index has an incompatible version: %s vs %s", index_version, self.VERSION) + raise IndexVersionMismatch(index_version, self.VERSION) + await self.sync_client.indices.refresh(self.index) + return acked + + async def stop_index(self): + if self.sync_client: + await self.sync_client.close() + self.sync_client = None + + def delete_index(self): + return self.sync_client.indices.delete(self.index, ignore_unavailable=True) + + def update_filter_query(self, censor_type, blockdict, channels=False): + blockdict = {blocked.hex(): blocker.hex() for blocked, blocker in blockdict.items()} + if channels: + update = expand_query(channel_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}") + else: + update = expand_query(claim_id__in=list(blockdict.keys()), censor_type=f"<{censor_type}") + key = 'channel_id' if channels else 'claim_id' + update['script'] = { + "source": f"ctx._source.censor_type={censor_type}; " + f"ctx._source.censoring_channel_id=params[ctx._source.{key}];", + "lang": "painless", + "params": blockdict + } + return update + + async def apply_filters(self, blocked_streams, blocked_channels, filtered_streams, filtered_channels): + if filtered_streams: + await self.sync_client.update_by_query( + self.index, body=self.update_filter_query(Censor.SEARCH, filtered_streams), slices=4) + await self.sync_client.indices.refresh(self.index) + if filtered_channels: + await self.sync_client.update_by_query( + self.index, body=self.update_filter_query(Censor.SEARCH, filtered_channels), slices=4) + await self.sync_client.indices.refresh(self.index) + await self.sync_client.update_by_query( + self.index, body=self.update_filter_query(Censor.SEARCH, filtered_channels, True), slices=4) + await self.sync_client.indices.refresh(self.index) + if blocked_streams: + await self.sync_client.update_by_query( + self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_streams), slices=4) + await self.sync_client.indices.refresh(self.index) + if blocked_channels: + await self.sync_client.update_by_query( + self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_channels), slices=4) + await self.sync_client.indices.refresh(self.index) + await self.sync_client.update_by_query( + self.index, body=self.update_filter_query(Censor.RESOLVE, blocked_channels, True), slices=4) + await self.sync_client.indices.refresh(self.index) + + async def _claim_producer(self): + for deleted in self._deleted_claims: yield { - '_index': index_name, + '_index': self.index, '_op_type': 'delete', '_id': deleted.hex() } - for touched in touched_claims: - claim = db.claim_producer(touched) + for touched in self._touched_claims: + claim = self.db.claim_producer(touched) if claim: + self.log.warning("send es %s %i", claim['claim_id'], claim['activation_height']) yield { 'doc': {key: value for key, value in claim.items() if key in ALL_FIELDS}, '_id': claim['claim_id'], - '_index': index_name, + '_index': self.index, '_op_type': 'update', 'doc_as_upsert': True } - cnt += 1 - else: - logging.warning("could not sync claim %s", touched.hex()) - if cnt % 10000 == 0: - logging.info("%i claims sent to ES", cnt) - - db.es_sync_height = db.db_height - db.write_db_state() - db.prefix_db.unsafe_commit() - db.assert_db_state() - - logging.info("finished sending %i claims to ES, deleted %i", cnt, len(deleted_claims)) - finally: - if need_open: - db.close() - - -async def get_all_claims(env, index_name='claims', db=None): - need_open = db is None - db = db or LevelDB(env) - if need_open: - db.open_db() - await db.initialize_caches() - logging.info("Fetching claims to send ES from leveldb") - try: - cnt = 0 - async for claim in db.all_claims_producer(): + for claim_hash, notifications in self._trending.items(): + self.log.warning("send es trending for %s", claim_hash.hex()) yield { - 'doc': {key: value for key, value in claim.items() if key in ALL_FIELDS}, - '_id': claim['claim_id'], - '_index': index_name, + '_id': claim_hash.hex(), + '_index': self.index, '_op_type': 'update', - 'doc_as_upsert': True + 'script': { + 'lang': 'painless', + 'source': FAST_AR_TRENDING_SCRIPT, + 'params': {'src': { + 'changes': [ + { + 'height': notify_height, + 'prev_amount': trending_v.previous_amount / 1E8, + 'new_amount': trending_v.new_amount / 1E8, + } for (notify_height, trending_v) in notifications + ] + }} + }, } - cnt += 1 - if cnt % 10000 == 0: - logging.info("sent %i claims to ES", cnt) - finally: - if need_open: - db.close() + def advance(self, height: int): + super().advance(height) -async def make_es_index_and_run_sync(env: Env, clients=32, force=False, db=None, index_name='claims'): - index = SearchIndex(env.es_index_prefix, elastic_host=env.elastic_host, elastic_port=env.elastic_port) - logging.info("ES sync host: %s:%i", env.elastic_host, env.elastic_port) - try: - created = await index.start() - except IndexVersionMismatch as err: - logging.info( - "dropping ES search index (version %s) for upgrade to version %s", err.got_version, err.expected_version - ) - await index.delete_index() - await index.stop() - created = await index.start() - finally: - index.stop() + touched_or_deleted = self.db.prefix_db.touched_or_deleted.get(height) + for k, v in self.db.prefix_db.trending_notification.iterate((height,)): + self._trending[k.claim_hash].append((k.height, v)) + if touched_or_deleted: + readded_after_reorg = self._removed_during_undo.intersection(touched_or_deleted.touched_claims) + self._deleted_claims.difference_update(readded_after_reorg) + self._touched_claims.update(touched_or_deleted.touched_claims) + self._deleted_claims.update(touched_or_deleted.deleted_claims) + self._touched_claims.difference_update(self._deleted_claims) + for to_del in touched_or_deleted.deleted_claims: + if to_del in self._trending: + self._trending.pop(to_del) + self.log.warning("advanced to %i, %i touched %i to delete (%i %i)", height, len(touched_or_deleted.touched_claims), len(touched_or_deleted.deleted_claims), + len(self._touched_claims), len(self._deleted_claims)) + self._advanced = True - es = AsyncElasticsearch([{'host': env.elastic_host, 'port': env.elastic_port}]) - if force or created: - claim_generator = get_all_claims(env, index_name=index_name, db=db) - else: - claim_generator = get_recent_claims(env, index_name=index_name, db=db) - try: - async for ok, item in async_streaming_bulk(es, claim_generator, request_timeout=600, raise_on_error=False): - if not ok: - logging.warning("indexing failed for an item: %s", item) - await es.indices.refresh(index=index_name) - finally: - await es.close() + def unwind(self): + self.db.tx_counts.pop() + reverted_block_hash = self.db.coin.header_hash(self.db.headers.pop()) + packed = self.db.prefix_db.undo.get(len(self.db.tx_counts), reverted_block_hash) + touched_or_deleted = None + claims_to_delete = [] + # find and apply the touched_or_deleted items in the undos for the reverted blocks + assert packed, f'missing undo information for block {len(self.db.tx_counts)}' + while packed: + op, packed = RevertableOp.unpack(packed) + if op.is_delete and op.key.startswith(DB_PREFIXES.touched_or_deleted.value): + assert touched_or_deleted is None, 'only should have one match' + touched_or_deleted = self.db.prefix_db.touched_or_deleted.unpack_value(op.value) + elif op.is_delete and op.key.startswith(DB_PREFIXES.claim_to_txo.value): + v = self.db.prefix_db.claim_to_txo.unpack_value(op.value) + if v.root_tx_num == v.tx_num and v.root_tx_num > self.db.tx_counts[-1]: + claims_to_delete.append(self.db.prefix_db.claim_to_txo.unpack_key(op.key).claim_hash) + if touched_or_deleted: + self._touched_claims.update(set(touched_or_deleted.deleted_claims).union( + touched_or_deleted.touched_claims.difference(set(claims_to_delete)))) + self._deleted_claims.update(claims_to_delete) + self._removed_during_undo.update(claims_to_delete) + self._advanced = True + self.log.warning("delete %i claim and upsert %i from reorg", len(self._deleted_claims), len(self._touched_claims)) + async def poll_for_changes(self): + await super().poll_for_changes() + cnt = 0 + success = 0 + if self._advanced: + if self._touched_claims or self._deleted_claims or self._trending: + async for ok, item in async_streaming_bulk( + self.sync_client, self._claim_producer(), + raise_on_error=False): + cnt += 1 + if not ok: + self.log.warning("indexing failed for an item: %s", item) + else: + success += 1 + await self.sync_client.indices.refresh(self.index) + self.write_es_height(self.db.db_height, self.db.db_tip[::-1].hex()) + self.log.warning("Indexing block %i done. %i/%i successful", self._last_wrote_height, success, cnt) + self._touched_claims.clear() + self._deleted_claims.clear() + self._removed_during_undo.clear() + self._trending.clear() + self._advanced = False + self.synchronized.set() + self.notify_es_notification_listeners(self._last_wrote_height) -def run_elastic_sync(): - logging.basicConfig(level=logging.INFO) - logging.getLogger('aiohttp').setLevel(logging.WARNING) - logging.getLogger('elasticsearch').setLevel(logging.WARNING) + @property + def last_synced_height(self) -> int: + return self._last_wrote_height - logging.info('lbry.server starting') - parser = argparse.ArgumentParser(prog="lbry-hub-elastic-sync") - parser.add_argument("-c", "--clients", type=int, default=32) - parser.add_argument("-f", "--force", default=False, action='store_true') - Env.contribute_to_arg_parser(parser) - args = parser.parse_args() - env = Env.from_arg_parser(args) + async def start(self): + env = self.env + min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() - if not os.path.exists(os.path.join(args.db_dir, 'lbry-leveldb')): - logging.info("DB path doesnt exist, nothing to sync to ES") - return + def _start_cancellable(run, *args): + _flag = asyncio.Event() + self.cancellable_tasks.append(asyncio.ensure_future(run(*args, _flag))) + return _flag.wait() - asyncio.run(make_es_index_and_run_sync(env, clients=args.clients, force=args.force)) + self.db.open_db() + await self.db.initialize_caches() + await self.start_index() + self.last_state = self.db.read_db_state() + + await _start_cancellable(self.run_es_notifier) + await _start_cancellable(self.refresh_blocks_forever) + + async def stop(self, delete_index=False): + while self.cancellable_tasks: + t = self.cancellable_tasks.pop() + if not t.done(): + t.cancel() + if delete_index: + await self.delete_index() + await self.stop_index() + + def run(self): + loop = asyncio.get_event_loop() + + def __exit(): + raise SystemExit() + try: + loop.add_signal_handler(signal.SIGINT, __exit) + loop.add_signal_handler(signal.SIGTERM, __exit) + loop.run_until_complete(self.start()) + loop.run_until_complete(self.shutdown_event.wait()) + except (SystemExit, KeyboardInterrupt): + pass + finally: + loop.run_until_complete(self.stop())