From 87037c06c9270f90b4cc0dae136ad821b645fa0c Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 3 Feb 2021 13:29:41 -0300 Subject: [PATCH] remove reader code --- lbry/wallet/server/db/elastic_search.py | 2 +- lbry/wallet/server/db/reader.py | 634 -------------------- lbry/wallet/server/session.py | 34 +- scripts/claim_search_performance.py | 177 ------ scripts/sqlite_perf_test.py | 62 -- tests/unit/wallet/server/test_sqldb.py | 762 ------------------------ 6 files changed, 7 insertions(+), 1664 deletions(-) delete mode 100644 lbry/wallet/server/db/reader.py delete mode 100644 scripts/claim_search_performance.py delete mode 100644 scripts/sqlite_perf_test.py delete mode 100644 tests/unit/wallet/server/test_sqldb.py diff --git a/lbry/wallet/server/db/elastic_search.py b/lbry/wallet/server/db/elastic_search.py index 764096f11..a4df33cc8 100644 --- a/lbry/wallet/server/db/elastic_search.py +++ b/lbry/wallet/server/db/elastic_search.py @@ -150,7 +150,7 @@ class SearchIndex: await self.client.indices.refresh(self.index) await self.client.update_by_query(self.index, body=update) - async def session_query(self, query_name, function, kwargs): + async def session_query(self, query_name, kwargs): offset, total = kwargs.get('offset', 0) if isinstance(kwargs, dict) else 0, 0 total_referenced = [] if query_name == 'resolve': diff --git a/lbry/wallet/server/db/reader.py b/lbry/wallet/server/db/reader.py deleted file mode 100644 index 8132c693f..000000000 --- a/lbry/wallet/server/db/reader.py +++ /dev/null @@ -1,634 +0,0 @@ -import time -import struct -import apsw -import logging -from operator import itemgetter -from typing import Tuple, List, Dict, Union, Type, Optional -from binascii import unhexlify -from decimal import Decimal -from contextvars import ContextVar -from functools import wraps -from itertools import chain -from dataclasses import dataclass - -from lbry.wallet.database import query, interpolate -from lbry.error import ResolveCensoredError -from lbry.schema.url import URL, normalize_name -from lbry.schema.tags import clean_tags -from lbry.schema.result import Outputs, Censor -from lbry.wallet import Ledger, RegTestLedger - -from .common import CLAIM_TYPES, STREAM_TYPES, COMMON_TAGS, INDEXED_LANGUAGES - - -class SQLiteOperationalError(apsw.Error): - def __init__(self, metrics): - super().__init__('sqlite query errored') - self.metrics = metrics - - -class SQLiteInterruptedError(apsw.InterruptError): - def __init__(self, metrics): - super().__init__('sqlite query interrupted') - self.metrics = metrics - - -ATTRIBUTE_ARRAY_MAX_LENGTH = 100 - -INTEGER_PARAMS = { - 'height', 'creation_height', 'activation_height', 'expiration_height', - 'timestamp', 'creation_timestamp', 'duration', 'release_time', 'fee_amount', - 'tx_position', 'channel_join', 'reposted', 'limit_claims_per_channel', - 'amount', 'effective_amount', 'support_amount', - 'trending_group', 'trending_mixed', - 'trending_local', 'trending_global', -} - -SEARCH_PARAMS = { - 'name', 'text', 'claim_id', 'claim_ids', 'txid', 'nout', 'channel', 'channel_ids', 'not_channel_ids', - 'public_key_id', 'claim_type', 'stream_types', 'media_types', 'fee_currency', - 'has_channel_signature', 'signature_valid', - 'any_tags', 'all_tags', 'not_tags', 'reposted_claim_id', - 'any_locations', 'all_locations', 'not_locations', - 'any_languages', 'all_languages', 'not_languages', - 'is_controlling', 'limit', 'offset', 'order_by', - 'no_totals', 'has_source' -} | INTEGER_PARAMS - - -ORDER_FIELDS = { - 'name', 'claim_hash' -} | INTEGER_PARAMS - - -@dataclass -class ReaderState: - db: apsw.Connection - stack: List[List] - metrics: Dict - is_tracking_metrics: bool - ledger: Type[Ledger] - query_timeout: float - log: logging.Logger - blocked_streams: Dict - blocked_channels: Dict - filtered_streams: Dict - filtered_channels: Dict - - def close(self): - self.db.close() - - def reset_metrics(self): - self.stack = [] - self.metrics = {} - - def set_query_timeout(self): - stop_at = time.perf_counter() + self.query_timeout - - def interruptor(): - if time.perf_counter() >= stop_at: - self.db.interrupt() - return - - self.db.setprogresshandler(interruptor, 100) - - def get_resolve_censor(self) -> Censor: - return Censor(self.blocked_streams, self.blocked_channels) - - def get_search_censor(self, limit_claims_per_channel: int) -> Censor: - return Censor(self.filtered_streams, self.filtered_channels, limit_claims_per_channel) - - -ctx: ContextVar[Optional[ReaderState]] = ContextVar('ctx') - - -def row_factory(cursor, row): - return { - k[0]: (set(row[i].split(',')) if k[0] == 'tags' else row[i]) - for i, k in enumerate(cursor.getdescription()) - } - - -def initializer(log, _path, _ledger_name, query_timeout, _measure=False, block_and_filter=None): - db = apsw.Connection(_path, flags=apsw.SQLITE_OPEN_READONLY | apsw.SQLITE_OPEN_URI) - db.setrowtrace(row_factory) - if block_and_filter: - blocked_streams, blocked_channels, filtered_streams, filtered_channels = block_and_filter - else: - blocked_streams = blocked_channels = filtered_streams = filtered_channels = {} - ctx.set( - ReaderState( - db=db, stack=[], metrics={}, is_tracking_metrics=_measure, - ledger=Ledger if _ledger_name == 'mainnet' else RegTestLedger, - query_timeout=query_timeout, log=log, - blocked_streams=blocked_streams, blocked_channels=blocked_channels, - filtered_streams=filtered_streams, filtered_channels=filtered_channels, - ) - ) - - -def cleanup(): - ctx.get().close() - ctx.set(None) - - -def measure(func): - @wraps(func) - def wrapper(*args, **kwargs): - state = ctx.get() - if not state.is_tracking_metrics: - return func(*args, **kwargs) - metric = {} - state.metrics.setdefault(func.__name__, []).append(metric) - state.stack.append([]) - start = time.perf_counter() - try: - return func(*args, **kwargs) - finally: - elapsed = int((time.perf_counter()-start)*1000) - metric['total'] = elapsed - metric['isolated'] = (elapsed-sum(state.stack.pop())) - if state.stack: - state.stack[-1].append(elapsed) - return wrapper - - -def reports_metrics(func): - @wraps(func) - def wrapper(*args, **kwargs): - state = ctx.get() - if not state.is_tracking_metrics: - return func(*args, **kwargs) - state.reset_metrics() - r = func(*args, **kwargs) - return r, state.metrics - return wrapper - - -@reports_metrics -def search_to_bytes(constraints) -> Union[bytes, Tuple[bytes, Dict]]: - return encode_result(search(constraints)) - - -@reports_metrics -def resolve_to_bytes(urls) -> Union[bytes, Tuple[bytes, Dict]]: - return encode_result(resolve(urls)) - - -def encode_result(result): - return Outputs.to_bytes(*result) - - -@measure -def execute_query(sql, values, row_offset: int, row_limit: int, censor: Censor) -> List: - context = ctx.get() - context.set_query_timeout() - try: - c = context.db.cursor() - def row_filter(cursor, row): - nonlocal row_offset - row = row_factory(cursor, row) - if len(row) > 1 and censor.censor(row): - return - if row_offset: - row_offset -= 1 - return - return row - c.setrowtrace(row_filter) - i, rows = 0, [] - for row in c.execute(sql, values): - i += 1 - rows.append(row) - if i >= row_limit: - break - return rows - except apsw.Error as err: - plain_sql = interpolate(sql, values) - if context.is_tracking_metrics: - context.metrics['execute_query'][-1]['sql'] = plain_sql - if isinstance(err, apsw.InterruptError): - context.log.warning("interrupted slow sqlite query:\n%s", plain_sql) - raise SQLiteInterruptedError(context.metrics) - context.log.exception('failed running query', exc_info=err) - raise SQLiteOperationalError(context.metrics) - - -def claims_query(cols, for_count=False, **constraints) -> Tuple[str, Dict]: - if 'order_by' in constraints: - order_by_parts = constraints['order_by'] - if isinstance(order_by_parts, str): - order_by_parts = [order_by_parts] - sql_order_by = [] - for order_by in order_by_parts: - is_asc = order_by.startswith('^') - column = order_by[1:] if is_asc else order_by - if column not in ORDER_FIELDS: - raise NameError(f'{column} is not a valid order_by field') - if column == 'name': - column = 'normalized' - sql_order_by.append( - f"claim.{column} ASC" if is_asc else f"claim.{column} DESC" - ) - constraints['order_by'] = sql_order_by - - ops = {'<=': '__lte', '>=': '__gte', '<': '__lt', '>': '__gt'} - for constraint in INTEGER_PARAMS: - if constraint in constraints: - value = constraints.pop(constraint) - postfix = '' - if isinstance(value, str): - if len(value) >= 2 and value[:2] in ops: - postfix, value = ops[value[:2]], value[2:] - elif len(value) >= 1 and value[0] in ops: - postfix, value = ops[value[0]], value[1:] - if constraint == 'fee_amount': - value = Decimal(value)*1000 - constraints[f'claim.{constraint}{postfix}'] = int(value) - - if constraints.pop('is_controlling', False): - if {'sequence', 'amount_order'}.isdisjoint(constraints): - for_count = False - constraints['claimtrie.claim_hash__is_not_null'] = '' - if 'sequence' in constraints: - constraints['order_by'] = 'claim.activation_height ASC' - constraints['offset'] = int(constraints.pop('sequence')) - 1 - constraints['limit'] = 1 - if 'amount_order' in constraints: - constraints['order_by'] = 'claim.effective_amount DESC' - constraints['offset'] = int(constraints.pop('amount_order')) - 1 - constraints['limit'] = 1 - - if 'claim_id' in constraints: - claim_id = constraints.pop('claim_id') - if len(claim_id) == 40: - constraints['claim.claim_id'] = claim_id - else: - constraints['claim.claim_id__like'] = f'{claim_id[:40]}%' - elif 'claim_ids' in constraints: - constraints['claim.claim_id__in'] = set(constraints.pop('claim_ids')) - - if 'reposted_claim_id' in constraints: - constraints['claim.reposted_claim_hash'] = unhexlify(constraints.pop('reposted_claim_id'))[::-1] - - if 'name' in constraints: - constraints['claim.normalized'] = normalize_name(constraints.pop('name')) - - if 'public_key_id' in constraints: - constraints['claim.public_key_hash'] = ( - ctx.get().ledger.address_to_hash160(constraints.pop('public_key_id'))) - if 'channel_hash' in constraints: - constraints['claim.channel_hash'] = constraints.pop('channel_hash') - if 'channel_ids' in constraints: - channel_ids = constraints.pop('channel_ids') - if channel_ids: - constraints['claim.channel_hash__in'] = { - unhexlify(cid)[::-1] for cid in channel_ids if cid - } - if 'not_channel_ids' in constraints: - not_channel_ids = constraints.pop('not_channel_ids') - if not_channel_ids: - not_channel_ids_binary = { - unhexlify(ncid)[::-1] for ncid in not_channel_ids - } - constraints['claim.claim_hash__not_in#not_channel_ids'] = not_channel_ids_binary - if constraints.get('has_channel_signature', False): - constraints['claim.channel_hash__not_in'] = not_channel_ids_binary - else: - constraints['null_or_not_channel__or'] = { - 'claim.signature_valid__is_null': True, - 'claim.channel_hash__not_in': not_channel_ids_binary - } - if 'signature_valid' in constraints: - has_channel_signature = constraints.pop('has_channel_signature', False) - if has_channel_signature: - constraints['claim.signature_valid'] = constraints.pop('signature_valid') - else: - constraints['null_or_signature__or'] = { - 'claim.signature_valid__is_null': True, - 'claim.signature_valid': constraints.pop('signature_valid') - } - elif constraints.pop('has_channel_signature', False): - constraints['claim.signature_valid__is_not_null'] = True - - if 'txid' in constraints: - tx_hash = unhexlify(constraints.pop('txid'))[::-1] - nout = constraints.pop('nout', 0) - constraints['claim.txo_hash'] = tx_hash + struct.pack(' List: - if 'channel' in constraints: - channel_url = constraints.pop('channel') - match = resolve_url(channel_url) - if isinstance(match, dict): - constraints['channel_hash'] = match['claim_hash'] - else: - return [{'row_count': 0}] if cols == 'count(*) as row_count' else [] - row_offset = constraints.pop('offset', 0) - row_limit = constraints.pop('limit', 20) - sql, values = claims_query(cols, for_count, **constraints) - return execute_query(sql, values, row_offset, row_limit, censor) - - -@measure -def count_claims(**constraints) -> int: - constraints.pop('offset', None) - constraints.pop('limit', None) - constraints.pop('order_by', None) - count = select_claims(Censor(), 'count(*) as row_count', for_count=True, **constraints) - return count[0]['row_count'] - - -def search_claims(censor: Censor, **constraints) -> List: - return select_claims( - censor, - """ - claimtrie.claim_hash as is_controlling, - claimtrie.last_take_over_height, - claim.claim_hash, claim.txo_hash, - claim.claims_in_channel, claim.reposted, - claim.height, claim.creation_height, - claim.activation_height, claim.expiration_height, - claim.effective_amount, claim.support_amount, - claim.trending_group, claim.trending_mixed, - claim.trending_local, claim.trending_global, - claim.short_url, claim.canonical_url, - claim.channel_hash, claim.reposted_claim_hash, - claim.signature_valid - """, **constraints - ) - - -def _get_referenced_rows(txo_rows: List[dict], censor_channels: List[bytes]): - censor = ctx.get().get_resolve_censor() - repost_hashes = set(filter(None, map(itemgetter('reposted_claim_hash'), txo_rows))) - channel_hashes = set(chain( - filter(None, map(itemgetter('channel_hash'), txo_rows)), - censor_channels - )) - - reposted_txos = [] - if repost_hashes: - reposted_txos = search_claims(censor, **{'claim.claim_hash__in': repost_hashes}) - channel_hashes |= set(filter(None, map(itemgetter('channel_hash'), reposted_txos))) - - channel_txos = [] - if channel_hashes: - channel_txos = search_claims(censor, **{'claim.claim_hash__in': channel_hashes}) - - # channels must come first for client side inflation to work properly - return channel_txos + reposted_txos - -@measure -def search(constraints) -> Tuple[List, List, int, int, Censor]: - assert set(constraints).issubset(SEARCH_PARAMS), \ - f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}" - total = None - limit_claims_per_channel = constraints.pop('limit_claims_per_channel', None) - if not constraints.pop('no_totals', False): - total = count_claims(**constraints) - constraints['offset'] = abs(constraints.get('offset', 0)) - constraints['limit'] = min(abs(constraints.get('limit', 10)), 50) - context = ctx.get() - search_censor = context.get_search_censor(limit_claims_per_channel) - txo_rows = search_claims(search_censor, **constraints) - extra_txo_rows = _get_referenced_rows(txo_rows, search_censor.censored.keys()) - return txo_rows, extra_txo_rows, constraints['offset'], total, search_censor - - -@measure -def resolve(urls) -> Tuple[List, List]: - txo_rows = [resolve_url(raw_url) for raw_url in urls] - extra_txo_rows = _get_referenced_rows( - [txo for txo in txo_rows if isinstance(txo, dict)], - [txo.censor_hash for txo in txo_rows if isinstance(txo, ResolveCensoredError)] - ) - return txo_rows, extra_txo_rows - - -@measure -def resolve_url(raw_url): - censor = ctx.get().get_resolve_censor() - - try: - url = URL.parse(raw_url) - except ValueError as e: - return e - - channel = None - - if url.has_channel: - query = url.channel.to_dict() - if set(query) == {'name'}: - query['is_controlling'] = True - else: - query['order_by'] = ['^creation_height'] - matches = search_claims(censor, **query, limit=1) - if matches: - channel = matches[0] - elif censor.censored: - return ResolveCensoredError(raw_url, next(iter(censor.censored))) - else: - return LookupError(f'Could not find channel in "{raw_url}".') - - if url.has_stream: - query = url.stream.to_dict() - if channel is not None: - if set(query) == {'name'}: - # temporarily emulate is_controlling for claims in channel - query['order_by'] = ['effective_amount', '^height'] - else: - query['order_by'] = ['^channel_join'] - query['channel_hash'] = channel['claim_hash'] - query['signature_valid'] = 1 - elif set(query) == {'name'}: - query['is_controlling'] = 1 - matches = search_claims(censor, **query, limit=1) - if matches: - return matches[0] - elif censor.censored: - return ResolveCensoredError(raw_url, next(iter(censor.censored))) - else: - return LookupError(f'Could not find claim at "{raw_url}".') - - return channel - - -CLAIM_HASH_OR_REPOST_HASH_SQL = f""" -CASE WHEN claim.claim_type = {CLAIM_TYPES['repost']} - THEN claim.reposted_claim_hash - ELSE claim.claim_hash -END -""" - - -def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False): - any_items = set(cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]) - all_items = set(cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]) - not_items = set(cleaner(constraints.pop(f'not_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]) - - all_items = {item for item in all_items if item not in not_items} - any_items = {item for item in any_items if item not in not_items} - - any_queries = {} - - if attr == 'tag': - common_tags = any_items & COMMON_TAGS.keys() - if common_tags: - any_items -= common_tags - if len(common_tags) < 5: - for item in common_tags: - index_name = COMMON_TAGS[item] - any_queries[f'#_common_tag_{index_name}'] = f""" - EXISTS( - SELECT 1 FROM tag INDEXED BY tag_{index_name}_idx - WHERE {CLAIM_HASH_OR_REPOST_HASH_SQL}=tag.claim_hash - AND tag = '{item}' - ) - """ - elif len(common_tags) >= 5: - constraints.update({ - f'$any_common_tag{i}': item for i, item in enumerate(common_tags) - }) - values = ', '.join( - f':$any_common_tag{i}' for i in range(len(common_tags)) - ) - any_queries[f'#_any_common_tags'] = f""" - EXISTS( - SELECT 1 FROM tag WHERE {CLAIM_HASH_OR_REPOST_HASH_SQL}=tag.claim_hash - AND tag IN ({values}) - ) - """ - elif attr == 'language': - indexed_languages = any_items & set(INDEXED_LANGUAGES) - if indexed_languages: - any_items -= indexed_languages - for language in indexed_languages: - any_queries[f'#_any_common_languages_{language}'] = f""" - EXISTS( - SELECT 1 FROM language INDEXED BY language_{language}_idx - WHERE {CLAIM_HASH_OR_REPOST_HASH_SQL}=language.claim_hash - AND language = '{language}' - ) - """ - - if any_items: - - constraints.update({ - f'$any_{attr}{i}': item for i, item in enumerate(any_items) - }) - values = ', '.join( - f':$any_{attr}{i}' for i in range(len(any_items)) - ) - if for_count or attr == 'tag': - if attr == 'tag': - any_queries[f'#_any_{attr}'] = f""" - ((claim.claim_type != {CLAIM_TYPES['repost']} - AND claim.claim_hash IN (SELECT claim_hash FROM tag WHERE tag IN ({values}))) OR - (claim.claim_type == {CLAIM_TYPES['repost']} AND - claim.reposted_claim_hash IN (SELECT claim_hash FROM tag WHERE tag IN ({values})))) - """ - else: - any_queries[f'#_any_{attr}'] = f""" - {CLAIM_HASH_OR_REPOST_HASH_SQL} IN ( - SELECT claim_hash FROM {attr} WHERE {attr} IN ({values}) - ) - """ - else: - any_queries[f'#_any_{attr}'] = f""" - EXISTS( - SELECT 1 FROM {attr} WHERE - {CLAIM_HASH_OR_REPOST_HASH_SQL}={attr}.claim_hash - AND {attr} IN ({values}) - ) - """ - - if len(any_queries) == 1: - constraints.update(any_queries) - elif len(any_queries) > 1: - constraints[f'ORed_{attr}_queries__any'] = any_queries - - if all_items: - constraints[f'$all_{attr}_count'] = len(all_items) - constraints.update({ - f'$all_{attr}{i}': item for i, item in enumerate(all_items) - }) - values = ', '.join( - f':$all_{attr}{i}' for i in range(len(all_items)) - ) - if for_count: - constraints[f'#_all_{attr}'] = f""" - {CLAIM_HASH_OR_REPOST_HASH_SQL} IN ( - SELECT claim_hash FROM {attr} WHERE {attr} IN ({values}) - GROUP BY claim_hash HAVING COUNT({attr}) = :$all_{attr}_count - ) - """ - else: - constraints[f'#_all_{attr}'] = f""" - {len(all_items)}=( - SELECT count(*) FROM {attr} WHERE - {CLAIM_HASH_OR_REPOST_HASH_SQL}={attr}.claim_hash - AND {attr} IN ({values}) - ) - """ - - if not_items: - constraints.update({ - f'$not_{attr}{i}': item for i, item in enumerate(not_items) - }) - values = ', '.join( - f':$not_{attr}{i}' for i in range(len(not_items)) - ) - if for_count: - if attr == 'tag': - constraints[f'#_not_{attr}'] = f""" - ((claim.claim_type != {CLAIM_TYPES['repost']} - AND claim.claim_hash NOT IN (SELECT claim_hash FROM tag WHERE tag IN ({values}))) OR - (claim.claim_type == {CLAIM_TYPES['repost']} AND - claim.reposted_claim_hash NOT IN (SELECT claim_hash FROM tag WHERE tag IN ({values})))) - """ - else: - constraints[f'#_not_{attr}'] = f""" - {CLAIM_HASH_OR_REPOST_HASH_SQL} NOT IN ( - SELECT claim_hash FROM {attr} WHERE {attr} IN ({values}) - ) - """ - else: - constraints[f'#_not_{attr}'] = f""" - NOT EXISTS( - SELECT 1 FROM {attr} WHERE - {CLAIM_HASH_OR_REPOST_HASH_SQL}={attr}.claim_hash - AND {attr} IN ({values}) - ) - """ diff --git a/lbry/wallet/server/session.py b/lbry/wallet/server/session.py index 2df35690f..8193f6c11 100644 --- a/lbry/wallet/server/session.py +++ b/lbry/wallet/server/session.py @@ -25,7 +25,6 @@ from lbry.utils import LRUCacheWithMetrics from lbry.build_info import BUILD, COMMIT_HASH, DOCKER_TAG from lbry.wallet.server.block_processor import LBRYBlockProcessor from lbry.wallet.server.db.writer import LBRYLevelDB -from lbry.wallet.server.db import reader from lbry.wallet.server.websocket import AdminWebSocket from lbry.wallet.server.metrics import ServerLoadData, APICallMetrics from lbry.wallet.rpc.framing import NewlineFramer @@ -829,22 +828,11 @@ class LBRYSessionManager(SessionManager): async def start_other(self): self.running = True - path = os.path.join(self.env.db_dir, 'claims.db') - args = dict( - initializer=reader.initializer, - initargs=( - self.logger, path, self.env.coin.NET, self.env.database_query_timeout, - self.env.track_metrics, ( - self.db.sql.blocked_streams, self.db.sql.blocked_channels, - self.db.sql.filtered_streams, self.db.sql.filtered_channels - ) - ) - ) if self.env.max_query_workers is not None and self.env.max_query_workers == 0: - self.query_executor = ThreadPoolExecutor(max_workers=1, **args) + self.query_executor = ThreadPoolExecutor(max_workers=1) else: self.query_executor = ProcessPoolExecutor( - max_workers=self.env.max_query_workers or max(os.cpu_count(), 4), **args + max_workers=self.env.max_query_workers or max(os.cpu_count(), 4) ) if self.websocket is not None: await self.websocket.start() @@ -1002,16 +990,6 @@ class LBRYElectrumX(SessionBase): ) except asyncio.CancelledError: raise - except reader.SQLiteInterruptedError as error: - metrics = self.get_metrics_or_placeholder_for_api(query_name) - metrics.query_interrupt(start, error.metrics) - self.session_mgr.interrupt_count_metric.inc() - raise RPCError(JSONRPC.QUERY_TIMEOUT, 'sqlite query timed out') - except reader.SQLiteOperationalError as error: - metrics = self.get_metrics_or_placeholder_for_api(query_name) - metrics.query_error(start, error.metrics) - self.session_mgr.db_operational_error_metric.inc() - raise RPCError(JSONRPC.INTERNAL_ERROR, 'query failed to execute') except Exception: log.exception("dear devs, please handle this exception better") metrics = self.get_metrics_or_placeholder_for_api(query_name) @@ -1028,7 +1006,7 @@ class LBRYElectrumX(SessionBase): self.session_mgr.pending_query_metric.dec() self.session_mgr.executor_time_metric.observe(time.perf_counter() - start) - async def run_and_cache_query(self, query_name, function, kwargs): + async def run_and_cache_query(self, query_name, kwargs): if isinstance(kwargs, dict) and 'trending_mixed' in kwargs.get('order_by', {}): # fixme: trending_mixed is 0 for all records on variable decay, making sort slow. # also, release_time isnt releavant when sorting by trending but it makes cache bad @@ -1047,7 +1025,7 @@ class LBRYElectrumX(SessionBase): return cache_item.result async with cache_item.lock: if cache_item.result is None: - cache_item.result = await self.db.search_index.session_query(query_name, function, kwargs) + cache_item.result = await self.db.search_index.session_query(query_name, kwargs) else: metrics = self.get_metrics_or_placeholder_for_api(query_name) metrics.cache_response() @@ -1058,14 +1036,14 @@ class LBRYElectrumX(SessionBase): async def claimtrie_search(self, **kwargs): if kwargs: - return await self.run_and_cache_query('search', reader.search_to_bytes, kwargs) + return await self.run_and_cache_query('search', kwargs) async def claimtrie_resolve(self, *urls): if urls: count = len(urls) try: self.session_mgr.urls_to_resolve_count_metric.inc(count) - return await self.run_and_cache_query('resolve', reader.resolve_to_bytes, urls) + return await self.run_and_cache_query('resolve', urls) finally: self.session_mgr.resolved_url_count_metric.inc(count) diff --git a/scripts/claim_search_performance.py b/scripts/claim_search_performance.py deleted file mode 100644 index 21af64743..000000000 --- a/scripts/claim_search_performance.py +++ /dev/null @@ -1,177 +0,0 @@ -import os -import time -import textwrap -import argparse -import asyncio -import logging -from concurrent.futures.process import ProcessPoolExecutor -from lbry.wallet.server.db.reader import search_to_bytes, initializer, _get_claims, interpolate -from lbry.wallet.ledger import MainNetLedger - -log = logging.getLogger(__name__) -log.addHandler(logging.StreamHandler()) -log.setLevel(logging.CRITICAL) - -DEFAULT_ANY_TAGS = [ - 'blockchain', - 'news', - 'learning', - 'technology', - 'automotive', - 'economics', - 'food', - 'science', - 'art', - 'nature' -] - -COMMON_AND_RARE = [ - 'gaming', - 'ufos' -] - -COMMON_AND_RARE2 = [ - 'city fix', - 'gaming' -] - -RARE_ANY_TAGS = [ - 'city fix', - 'ufos', -] - -CITY_FIX = [ - 'city fix' -] - -MATURE_TAGS = [ - 'porn', - 'nsfw', - 'mature', - 'xxx' -] - -ORDER_BY = [ - [ - "trending_global", - "trending_mixed", - ], - [ - "release_time" - ], - [ - "effective_amount" - ] -] - - -def get_args(limit=20): - args = [] - any_tags_combinations = [DEFAULT_ANY_TAGS, COMMON_AND_RARE, RARE_ANY_TAGS, COMMON_AND_RARE2, CITY_FIX, []] - not_tags_combinations = [MATURE_TAGS, []] - for no_fee in [False, True]: - for claim_type in [None, 'stream', 'channel']: - for no_totals in [True]: - for offset in [0, 100]: - for any_tags in any_tags_combinations: - for not_tags in not_tags_combinations: - for order_by in ORDER_BY: - kw = { - 'order_by': order_by, - 'offset': offset, - 'limit': limit, - 'no_totals': no_totals - } - if not_tags: - kw['not_tags'] = not_tags - if any_tags: - kw['any_tags'] = any_tags - if claim_type: - kw['claim_type'] = claim_type - if no_fee: - kw['fee_amount'] = 0 - args.append(kw) - print(f"-- Trying {len(args)} argument combinations") - return args - - -def _search(kwargs): - start = time.perf_counter() - error = None - try: - search_to_bytes(kwargs) - except Exception as err: - error = str(err) - return time.perf_counter() - start, kwargs, error - - -async def search(executor, kwargs): - return await asyncio.get_running_loop().run_in_executor( - executor, _search, kwargs - ) - - -async def main(db_path, max_query_time): - args = dict(initializer=initializer, initargs=(log, db_path, MainNetLedger, 0.25)) - workers = max(os.cpu_count(), 4) - log.info(f"using {workers} reader processes") - query_executor = ProcessPoolExecutor(workers, **args) - tasks = [search(query_executor, constraints) for constraints in get_args()] - try: - results = await asyncio.gather(*tasks) - query_times = [ - { - 'sql': interpolate(*_get_claims(""" - claimtrie.claim_hash as is_controlling, - claimtrie.last_take_over_height, - claim.claim_hash, claim.txo_hash, - claim.claims_in_channel, - claim.height, claim.creation_height, - claim.activation_height, claim.expiration_height, - claim.effective_amount, claim.support_amount, - claim.trending_group, claim.trending_mixed, - claim.trending_local, claim.trending_global, - claim.short_url, claim.canonical_url, - claim.channel_hash, channel.txo_hash AS channel_txo_hash, - channel.height AS channel_height, claim.signature_valid - """, **constraints)), - 'duration': ts, - 'error': error - } - for ts, constraints, error in results - ] - errored = [query_info for query_info in query_times if query_info['error']] - errors = {str(query_info['error']): [] for query_info in errored} - for error in errored: - errors[str(error['error'])].append(error['sql']) - slow = [ - query_info for query_info in query_times - if not query_info['error'] and query_info['duration'] > (max_query_time / 2.0) - ] - fast = [ - query_info for query_info in query_times - if not query_info['error'] and query_info['duration'] <= (max_query_time / 2.0) - ] - print(f"-- {len(fast)} queries were fast") - slow.sort(key=lambda query_info: query_info['duration'], reverse=True) - print(f"-- Failing queries:") - for error in errors: - print(f"-- Failure: \"{error}\"") - for failing_query in errors[error]: - print(f"{textwrap.dedent(failing_query)};\n") - print() - print(f"-- Slow queries:") - for slow_query in slow: - print(f"-- Query took {slow_query['duration']}\n{textwrap.dedent(slow_query['sql'])};\n") - finally: - query_executor.shutdown() - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument('--db_path', dest='db_path', default=os.path.expanduser('~/claims.db'), type=str) - parser.add_argument('--max_time', dest='max_time', default=0.25, type=float) - args = parser.parse_args() - db_path = args.db_path - max_query_time = args.max_time - asyncio.run(main(db_path, max_query_time)) diff --git a/scripts/sqlite_perf_test.py b/scripts/sqlite_perf_test.py deleted file mode 100644 index 9da638ff4..000000000 --- a/scripts/sqlite_perf_test.py +++ /dev/null @@ -1,62 +0,0 @@ -import uvloop, asyncio, time, sys, logging -from concurrent.futures import ProcessPoolExecutor -from lbry.wallet.server.db import reader -from lbry.wallet.server.metrics import calculate_avg_percentiles - - -db_path = '../../../lbryconf/wallet-server/claims.db' -default_query_timout = 0.25 -log = logging.getLogger(__name__) -log.addHandler(logging.StreamHandler()) - - -async def run_times(executor, iterations, show=True): - start = time.perf_counter() - timings = await asyncio.gather(*(asyncio.get_running_loop().run_in_executor( - executor, reader.search_to_bytes, { - 'no_totals': True, - 'offset': 0, - 'limit': 20, - 'any_tags': [ - 'ufos', 'city fix' - ], - 'not_tags': [ - 'porn', 'mature', 'xxx', 'nsfw' - ], - 'order_by': [ - 'release_time' - ] - } - ) for _ in range(iterations))) - timings = [r[1]['execute_query'][0]['total'] for r in timings] - total = int((time.perf_counter() - start) * 100) - if show: - avg = sum(timings)/len(timings) - print(f"{iterations:4}: {total}ms total concurrent, {len(timings)*avg*1000:.3f}s total sequential (avg*runs)") - print(f" {total/len(timings):.1f}ms/query concurrent (total/runs)") - print(f" {avg:.1f}ms/query actual average (sum(queries)/runs)") - stats = calculate_avg_percentiles(timings) - print(f" min: {stats[1]}, 5%: {stats[2]}, 25%: {stats[3]}, 50%: {stats[4]}, 75%: {stats[5]}, 95%: {stats[6]}, max: {stats[7]}") - sys.stdout.write(' sample:') - for i, t in zip(range(10), timings[::-1]): - sys.stdout.write(f' {t}ms') - print(' ...\n' if len(timings) > 10 else '\n') - - -async def main(): - executor = ProcessPoolExecutor( - 4, initializer=reader.initializer, initargs=(log, db_path, 'mainnet', 1.0, True) - ) - #await run_times(executor, 4, show=False) - #await run_times(executor, 1) - await run_times(executor, 2**3) - await run_times(executor, 2**5) - await run_times(executor, 2**7) - #await run_times(executor, 2**9) - #await run_times(executor, 2**11) - #await run_times(executor, 2**13) - executor.shutdown(True) - -if __name__ == '__main__': - uvloop.install() - asyncio.run(main()) diff --git a/tests/unit/wallet/server/test_sqldb.py b/tests/unit/wallet/server/test_sqldb.py deleted file mode 100644 index 9d644314b..000000000 --- a/tests/unit/wallet/server/test_sqldb.py +++ /dev/null @@ -1,762 +0,0 @@ -import unittest -import ecdsa -import hashlib -import logging -from binascii import hexlify -from typing import List, Tuple - -from lbry.wallet.constants import COIN, NULL_HASH32 -from lbry.schema.claim import Claim -from lbry.schema.result import Censor -from lbry.wallet.server.db import reader, writer -from lbry.wallet.server.coin import LBCRegTest -from lbry.wallet.server.db.trending import zscore -from lbry.wallet.server.db.canonical import FindShortestID -from lbry.wallet.server.block_processor import Timer -from lbry.wallet.transaction import Transaction, Input, Output - - -def get_output(amount=COIN, pubkey_hash=NULL_HASH32): - return Transaction() \ - .add_outputs([Output.pay_pubkey_hash(amount, pubkey_hash)]) \ - .outputs[0] - - -def get_input(): - return Input.spend(get_output()) - - -def get_tx(): - return Transaction().add_inputs([get_input()]) - - -def search(**constraints) -> List: - return reader.search_claims(Censor(2), **constraints) - - -def censored_search(**constraints) -> Tuple[List, Censor]: - rows, _, _, _, censor = reader.search(constraints) - return rows, censor - - -class TestSQLDB(unittest.TestCase): - query_timeout = 0.25 - - def setUp(self): - self.first_sync = False - self.daemon_height = 1 - self.coin = LBCRegTest() - db_url = 'file:test_sqldb?mode=memory&cache=shared' - self.sql = writer.SQLDB(self, db_url, [], [], [zscore]) - self.addCleanup(self.sql.close) - self.sql.open() - reader.initializer( - logging.getLogger(__name__), db_url, 'regtest', - self.query_timeout, block_and_filter=( - self.sql.blocked_streams, self.sql.blocked_channels, - self.sql.filtered_streams, self.sql.filtered_channels - ) - ) - self.addCleanup(reader.cleanup) - self.timer = Timer('BlockProcessor') - self._current_height = 0 - self._txos = {} - - def _make_tx(self, output, txi=None): - tx = get_tx().add_outputs([output]) - if txi is not None: - tx.add_inputs([txi]) - self._txos[output.ref.hash] = output - return tx, tx.hash - - def _set_channel_key(self, channel, key): - private_key = ecdsa.SigningKey.from_string(key*32, curve=ecdsa.SECP256k1, hashfunc=hashlib.sha256) - channel.private_key = private_key - channel.claim.channel.public_key_bytes = private_key.get_verifying_key().to_der() - channel.script.generate() - - def get_channel(self, title, amount, name='@foo', key=b'a'): - claim = Claim() - claim.channel.title = title - channel = Output.pay_claim_name_pubkey_hash(amount, name, claim, b'abc') - self._set_channel_key(channel, key) - return self._make_tx(channel) - - def get_channel_update(self, channel, amount, key=b'a'): - self._set_channel_key(channel, key) - return self._make_tx( - Output.pay_update_claim_pubkey_hash( - amount, channel.claim_name, channel.claim_id, channel.claim, b'abc' - ), - Input.spend(channel) - ) - - def get_stream(self, title, amount, name='foo', channel=None, **kwargs): - claim = Claim() - claim.stream.update(title=title, **kwargs) - result = self._make_tx(Output.pay_claim_name_pubkey_hash(amount, name, claim, b'abc')) - if channel: - result[0].outputs[0].sign(channel) - result[0]._reset() - return result - - def get_stream_update(self, tx, amount, channel=None): - stream = Transaction(tx[0].raw).outputs[0] - result = self._make_tx( - Output.pay_update_claim_pubkey_hash( - amount, stream.claim_name, stream.claim_id, stream.claim, b'abc' - ), - Input.spend(stream) - ) - if channel: - result[0].outputs[0].sign(channel) - result[0]._reset() - return result - - def get_repost(self, claim_id, amount, channel): - claim = Claim() - claim.repost.reference.claim_id = claim_id - result = self._make_tx(Output.pay_claim_name_pubkey_hash(amount, 'repost', claim, b'abc')) - result[0].outputs[0].sign(channel) - result[0]._reset() - return result - - def get_abandon(self, tx): - claim = Transaction(tx[0].raw).outputs[0] - return self._make_tx( - Output.pay_pubkey_hash(claim.amount, b'abc'), - Input.spend(claim) - ) - - def get_support(self, tx, amount): - claim = Transaction(tx[0].raw).outputs[0] - return self._make_tx( - Output.pay_support_pubkey_hash( - amount, claim.claim_name, claim.claim_id, b'abc' - ) - ) - - def get_controlling(self): - for claim in self.sql.execute("select claim.* from claimtrie natural join claim"): - txo = self._txos[claim.txo_hash] - controlling = txo.claim.stream.title, claim.amount, claim.effective_amount, claim.activation_height - return controlling - - def get_active(self): - controlling = self.get_controlling() - active = [] - for claim in self.sql.execute( - f"select * from claim where activation_height <= {self._current_height}"): - txo = self._txos[claim.txo_hash] - if controlling and controlling[0] == txo.claim.stream.title: - continue - active.append((txo.claim.stream.title, claim.amount, claim.effective_amount, claim.activation_height)) - return active - - def get_accepted(self): - accepted = [] - for claim in self.sql.execute( - f"select * from claim where activation_height > {self._current_height}"): - txo = self._txos[claim.txo_hash] - accepted.append((txo.claim.stream.title, claim.amount, claim.effective_amount, claim.activation_height)) - return accepted - - def advance(self, height, txs): - self._current_height = height - self.sql.advance_txs(height, txs, {'timestamp': 1}, self.daemon_height, self.timer) - return [otx[0].outputs[0] for otx in txs] - - def state(self, controlling=None, active=None, accepted=None): - self.assertEqual(controlling, self.get_controlling()) - self.assertEqual(active or [], self.get_active()) - self.assertEqual(accepted or [], self.get_accepted()) - - -class TestClaimtrie(TestSQLDB): - - def test_example_from_spec(self): - # https://spec.lbry.com/#claim-activation-example - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - advance(13, [stream]) - state( - controlling=('Claim A', 10*COIN, 10*COIN, 13), - active=[], - accepted=[] - ) - advance(1001, [self.get_stream('Claim B', 20*COIN)]) - state( - controlling=('Claim A', 10*COIN, 10*COIN, 13), - active=[], - accepted=[('Claim B', 20*COIN, 0, 1031)] - ) - advance(1010, [self.get_support(stream, 14*COIN)]) - state( - controlling=('Claim A', 10*COIN, 24*COIN, 13), - active=[], - accepted=[('Claim B', 20*COIN, 0, 1031)] - ) - advance(1020, [self.get_stream('Claim C', 50*COIN)]) - state( - controlling=('Claim A', 10*COIN, 24*COIN, 13), - active=[], - accepted=[ - ('Claim B', 20*COIN, 0, 1031), - ('Claim C', 50*COIN, 0, 1051)] - ) - advance(1031, []) - state( - controlling=('Claim A', 10*COIN, 24*COIN, 13), - active=[('Claim B', 20*COIN, 20*COIN, 1031)], - accepted=[('Claim C', 50*COIN, 0, 1051)] - ) - advance(1040, [self.get_stream('Claim D', 300*COIN)]) - state( - controlling=('Claim A', 10*COIN, 24*COIN, 13), - active=[('Claim B', 20*COIN, 20*COIN, 1031)], - accepted=[ - ('Claim C', 50*COIN, 0, 1051), - ('Claim D', 300*COIN, 0, 1072)] - ) - advance(1051, []) - state( - controlling=('Claim D', 300*COIN, 300*COIN, 1051), - active=[ - ('Claim A', 10*COIN, 24*COIN, 13), - ('Claim B', 20*COIN, 20*COIN, 1031), - ('Claim C', 50*COIN, 50*COIN, 1051)], - accepted=[] - ) - # beyond example - advance(1052, [self.get_stream_update(stream, 290*COIN)]) - state( - controlling=('Claim A', 290*COIN, 304*COIN, 13), - active=[ - ('Claim B', 20*COIN, 20*COIN, 1031), - ('Claim C', 50*COIN, 50*COIN, 1051), - ('Claim D', 300*COIN, 300*COIN, 1051), - ], - accepted=[] - ) - - def test_competing_claims_subsequent_blocks_height_wins(self): - advance, state = self.advance, self.state - advance(13, [self.get_stream('Claim A', 10*COIN)]) - state( - controlling=('Claim A', 10*COIN, 10*COIN, 13), - active=[], - accepted=[] - ) - advance(14, [self.get_stream('Claim B', 10*COIN)]) - state( - controlling=('Claim A', 10*COIN, 10*COIN, 13), - active=[('Claim B', 10*COIN, 10*COIN, 14)], - accepted=[] - ) - advance(15, [self.get_stream('Claim C', 10*COIN)]) - state( - controlling=('Claim A', 10*COIN, 10*COIN, 13), - active=[ - ('Claim B', 10*COIN, 10*COIN, 14), - ('Claim C', 10*COIN, 10*COIN, 15)], - accepted=[] - ) - - def test_competing_claims_in_single_block_position_wins(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - stream2 = self.get_stream('Claim B', 10*COIN) - advance(13, [stream, stream2]) - state( - controlling=('Claim A', 10*COIN, 10*COIN, 13), - active=[('Claim B', 10*COIN, 10*COIN, 13)], - accepted=[] - ) - - def test_competing_claims_in_single_block_effective_amount_wins(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - stream2 = self.get_stream('Claim B', 11*COIN) - advance(13, [stream, stream2]) - state( - controlling=('Claim B', 11*COIN, 11*COIN, 13), - active=[('Claim A', 10*COIN, 10*COIN, 13)], - accepted=[] - ) - - def test_winning_claim_deleted(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - stream2 = self.get_stream('Claim B', 11*COIN) - advance(13, [stream, stream2]) - state( - controlling=('Claim B', 11*COIN, 11*COIN, 13), - active=[('Claim A', 10*COIN, 10*COIN, 13)], - accepted=[] - ) - advance(14, [self.get_abandon(stream2)]) - state( - controlling=('Claim A', 10*COIN, 10*COIN, 13), - active=[], - accepted=[] - ) - - def test_winning_claim_deleted_and_new_claim_becomes_winner(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - stream2 = self.get_stream('Claim B', 11*COIN) - advance(13, [stream, stream2]) - state( - controlling=('Claim B', 11*COIN, 11*COIN, 13), - active=[('Claim A', 10*COIN, 10*COIN, 13)], - accepted=[] - ) - advance(15, [self.get_abandon(stream2), self.get_stream('Claim C', 12*COIN)]) - state( - controlling=('Claim C', 12*COIN, 12*COIN, 15), - active=[('Claim A', 10*COIN, 10*COIN, 13)], - accepted=[] - ) - - def test_winning_claim_expires_and_another_takes_over(self): - advance, state = self.advance, self.state - advance(10, [self.get_stream('Claim A', 11*COIN)]) - advance(20, [self.get_stream('Claim B', 10*COIN)]) - state( - controlling=('Claim A', 11*COIN, 11*COIN, 10), - active=[('Claim B', 10*COIN, 10*COIN, 20)], - accepted=[] - ) - advance(262984, []) - state( - controlling=('Claim B', 10*COIN, 10*COIN, 20), - active=[], - accepted=[] - ) - advance(262994, []) - state( - controlling=None, - active=[], - accepted=[] - ) - - def test_create_and_update_in_same_block(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - advance(10, [stream, self.get_stream_update(stream, 11*COIN)]) - self.assertTrue(search()[0]) - - def test_double_updates_in_same_block(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - advance(10, [stream]) - update = self.get_stream_update(stream, 11*COIN) - advance(20, [update, self.get_stream_update(update, 9*COIN)]) - self.assertTrue(search()[0]) - - def test_create_and_abandon_in_same_block(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - advance(10, [stream, self.get_abandon(stream)]) - self.assertFalse(search()) - - def test_update_and_abandon_in_same_block(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - advance(10, [stream]) - update = self.get_stream_update(stream, 11*COIN) - advance(20, [update, self.get_abandon(update)]) - self.assertFalse(search()) - - def test_create_update_and_delete_in_same_block(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - update = self.get_stream_update(stream, 11*COIN) - advance(10, [stream, update, self.get_abandon(update)]) - self.assertFalse(search()) - - def test_support_added_and_removed_in_same_block(self): - advance, state = self.advance, self.state - stream = self.get_stream('Claim A', 10*COIN) - advance(10, [stream]) - support = self.get_support(stream, COIN) - advance(20, [support, self.get_abandon(support)]) - self.assertEqual(search()[0]['support_amount'], 0) - - @staticmethod - def _get_x_with_claim_id_prefix(getter, prefix, cached_iteration=None, **kwargs): - iterations = cached_iteration+1 if cached_iteration else 100 - for i in range(cached_iteration or 1, iterations): - stream = getter(f'claim #{i}', COIN, **kwargs) - if stream[0].outputs[0].claim_id.startswith(prefix): - cached_iteration is None and print(f'Found "{prefix}" in {i} iterations.') - return stream - if cached_iteration: - raise ValueError(f'Failed to find "{prefix}" at cached iteration, run with None to find iteration.') - raise ValueError(f'Failed to find "{prefix}" in {iterations} iterations, try different values.') - - def get_channel_with_claim_id_prefix(self, prefix, cached_iteration=None, **kwargs): - return self._get_x_with_claim_id_prefix(self.get_channel, prefix, cached_iteration, **kwargs) - - def get_stream_with_claim_id_prefix(self, prefix, cached_iteration=None, **kwargs): - return self._get_x_with_claim_id_prefix(self.get_stream, prefix, cached_iteration, **kwargs) - - def test_canonical_url_and_channel_validation(self): - advance = self.advance - - tx_chan_a = self.get_channel_with_claim_id_prefix('a', 1, key=b'c') - tx_chan_ab = self.get_channel_with_claim_id_prefix('ab', 72, key=b'c') - txo_chan_a = tx_chan_a[0].outputs[0] - txo_chan_ab = tx_chan_ab[0].outputs[0] - advance(1, [tx_chan_a]) - advance(2, [tx_chan_ab]) - (r_ab, r_a) = search(order_by=['creation_height'], limit=2) - self.assertEqual("@foo#a", r_a['short_url']) - self.assertEqual("@foo#ab", r_ab['short_url']) - self.assertIsNone(r_a['canonical_url']) - self.assertIsNone(r_ab['canonical_url']) - self.assertEqual(0, r_a['claims_in_channel']) - self.assertEqual(0, r_ab['claims_in_channel']) - - tx_a = self.get_stream_with_claim_id_prefix('a', 2) - tx_ab = self.get_stream_with_claim_id_prefix('ab', 42) - tx_abc = self.get_stream_with_claim_id_prefix('abc', 65) - advance(3, [tx_a]) - advance(4, [tx_ab, tx_abc]) - (r_abc, r_ab, r_a) = search(order_by=['creation_height', 'tx_position'], limit=3) - self.assertEqual("foo#a", r_a['short_url']) - self.assertEqual("foo#ab", r_ab['short_url']) - self.assertEqual("foo#abc", r_abc['short_url']) - self.assertIsNone(r_a['canonical_url']) - self.assertIsNone(r_ab['canonical_url']) - self.assertIsNone(r_abc['canonical_url']) - - tx_a2 = self.get_stream_with_claim_id_prefix('a', 7, channel=txo_chan_a) - tx_ab2 = self.get_stream_with_claim_id_prefix('ab', 23, channel=txo_chan_a) - a2_claim = tx_a2[0].outputs[0] - ab2_claim = tx_ab2[0].outputs[0] - advance(6, [tx_a2]) - advance(7, [tx_ab2]) - (r_ab2, r_a2) = search(order_by=['creation_height'], limit=2) - self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url']) - self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url']) - self.assertEqual("@foo#a/foo#a", r_a2['canonical_url']) - self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url']) - self.assertEqual(2, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) - - # change channel public key, invaliding stream claim signatures - advance(8, [self.get_channel_update(txo_chan_a, COIN, key=b'a')]) - (r_ab2, r_a2) = search(order_by=['creation_height'], limit=2) - self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url']) - self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url']) - self.assertIsNone(r_a2['canonical_url']) - self.assertIsNone(r_ab2['canonical_url']) - self.assertEqual(0, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) - - # reinstate previous channel public key (previous stream claim signatures become valid again) - channel_update = self.get_channel_update(txo_chan_a, COIN, key=b'c') - advance(9, [channel_update]) - (r_ab2, r_a2) = search(order_by=['creation_height'], limit=2) - self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url']) - self.assertEqual(f"foo#{ab2_claim.claim_id[:4]}", r_ab2['short_url']) - self.assertEqual("@foo#a/foo#a", r_a2['canonical_url']) - self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url']) - self.assertEqual(2, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) - self.assertEqual(0, search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel']) - - # change channel of stream - self.assertEqual("@foo#a/foo#ab", search(claim_id=ab2_claim.claim_id, limit=1)[0]['canonical_url']) - tx_ab2 = self.get_stream_update(tx_ab2, COIN, txo_chan_ab) - advance(10, [tx_ab2]) - self.assertEqual("@foo#ab/foo#a", search(claim_id=ab2_claim.claim_id, limit=1)[0]['canonical_url']) - # TODO: currently there is a bug where stream leaving a channel does not update that channels claims count - self.assertEqual(2, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) - # TODO: after bug is fixed remove test above and add test below - #self.assertEqual(1, search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) - self.assertEqual(1, search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel']) - - # claim abandon updates claims_in_channel - advance(11, [self.get_abandon(tx_ab2)]) - self.assertEqual(0, search(claim_id=txo_chan_ab.claim_id, limit=1)[0]['claims_in_channel']) - - # delete channel, invaliding stream claim signatures - advance(12, [self.get_abandon(channel_update)]) - (r_a2,) = search(order_by=['creation_height'], limit=1) - self.assertEqual(f"foo#{a2_claim.claim_id[:2]}", r_a2['short_url']) - self.assertIsNone(r_a2['canonical_url']) - - @unittest.skip("cant reproduce on ES") - def test_resolve_issue_2448(self): - advance = self.advance - - tx_chan_a = self.get_channel_with_claim_id_prefix('a', 1, key=b'c') - tx_chan_ab = self.get_channel_with_claim_id_prefix('ab', 72, key=b'c') - txo_chan_a = tx_chan_a[0].outputs[0] - txo_chan_ab = tx_chan_ab[0].outputs[0] - advance(1, [tx_chan_a]) - advance(2, [tx_chan_ab]) - - self.assertEqual(reader.resolve_url("@foo#a")['claim_hash'], txo_chan_a.claim_hash) - self.assertEqual(reader.resolve_url("@foo#ab")['claim_hash'], txo_chan_ab.claim_hash) - - # update increase last height change of channel - advance(9, [self.get_channel_update(txo_chan_a, COIN, key=b'c')]) - - # make sure that activation_height is used instead of height (issue #2448) - self.assertEqual(reader.resolve_url("@foo#a")['claim_hash'], txo_chan_a.claim_hash) - self.assertEqual(reader.resolve_url("@foo#ab")['claim_hash'], txo_chan_ab.claim_hash) - - def test_canonical_find_shortest_id(self): - new_hash = 'abcdef0123456789beef' - other0 = '1bcdef0123456789beef' - other1 = 'ab1def0123456789beef' - other2 = 'abc1ef0123456789beef' - other3 = 'abcdef0123456789bee1' - f = FindShortestID() - f.step(other0, new_hash) - self.assertEqual('#a', f.finalize()) - f.step(other1, new_hash) - self.assertEqual('#abc', f.finalize()) - f.step(other2, new_hash) - self.assertEqual('#abcd', f.finalize()) - f.step(other3, new_hash) - self.assertEqual('#abcdef0123456789beef', f.finalize()) - - -class TestTrending(TestSQLDB): - - def test_trending(self): - advance, state = self.advance, self.state - no_trend = self.get_stream('Claim A', COIN) - downwards = self.get_stream('Claim B', COIN) - up_small = self.get_stream('Claim C', COIN) - up_medium = self.get_stream('Claim D', COIN) - up_biggly = self.get_stream('Claim E', COIN) - claims = advance(1, [up_biggly, up_medium, up_small, no_trend, downwards]) - for window in range(1, 8): - advance(zscore.TRENDING_WINDOW * window, [ - self.get_support(downwards, (20-window)*COIN), - self.get_support(up_small, int(20+(window/10)*COIN)), - self.get_support(up_medium, (20+(window*(2 if window == 7 else 1)))*COIN), - self.get_support(up_biggly, (20+(window*(3 if window == 7 else 1)))*COIN), - ]) - results = search(order_by=['trending_local']) - self.assertEqual([c.claim_id for c in claims], [hexlify(c['claim_hash'][::-1]).decode() for c in results]) - self.assertEqual([10, 6, 2, 0, -2], [int(c['trending_local']) for c in results]) - self.assertEqual([53, 38, -32, 0, -6], [int(c['trending_global']) for c in results]) - self.assertEqual([4, 4, 2, 0, 1], [int(c['trending_group']) for c in results]) - self.assertEqual([53, 38, 2, 0, -6], [int(c['trending_mixed']) for c in results]) - - def test_edge(self): - problematic = self.get_stream('Problem', COIN) - self.advance(1, [problematic]) - self.advance(zscore.TRENDING_WINDOW, [self.get_support(problematic, 53000000000)]) - self.advance(zscore.TRENDING_WINDOW * 2, [self.get_support(problematic, 500000000)]) - - -@unittest.skip("happens on ES, need to backport") -class TestContentBlocking(TestSQLDB): - - def test_blocking_and_filtering(self): - # content claims and channels - tx0 = self.get_channel('A Channel', COIN, '@channel1') - regular_channel = tx0[0].outputs[0] - tx1 = self.get_stream('Claim One', COIN, 'claim1') - tx2 = self.get_stream('Claim Two', COIN, 'claim2', regular_channel) - tx3 = self.get_stream('Claim Three', COIN, 'claim3') - self.advance(1, [tx0, tx1, tx2, tx3]) - claim1, claim2, claim3 = tx1[0].outputs[0], tx2[0].outputs[0], tx3[0].outputs[0] - - # block and filter channels - tx0 = self.get_channel('Blocking Channel', COIN, '@block') - tx1 = self.get_channel('Filtering Channel', COIN, '@filter') - blocking_channel = tx0[0].outputs[0] - filtering_channel = tx1[0].outputs[0] - self.sql.blocking_channel_hashes.add(blocking_channel.claim_hash) - self.sql.filtering_channel_hashes.add(filtering_channel.claim_hash) - self.advance(2, [tx0, tx1]) - self.assertEqual({}, dict(self.sql.blocked_streams)) - self.assertEqual({}, dict(self.sql.blocked_channels)) - self.assertEqual({}, dict(self.sql.filtered_streams)) - self.assertEqual({}, dict(self.sql.filtered_channels)) - - # nothing blocked - results, _ = reader.resolve([ - claim1.claim_name, claim2.claim_name, - claim3.claim_name, regular_channel.claim_name - ]) - self.assertEqual(claim1.claim_hash, results[0]['claim_hash']) - self.assertEqual(claim2.claim_hash, results[1]['claim_hash']) - self.assertEqual(claim3.claim_hash, results[2]['claim_hash']) - self.assertEqual(regular_channel.claim_hash, results[3]['claim_hash']) - - # nothing filtered - results, censor = censored_search() - self.assertEqual(6, len(results)) - self.assertEqual(0, censor.total) - self.assertEqual({}, censor.censored) - - # block claim reposted to blocking channel, also gets filtered - repost_tx1 = self.get_repost(claim1.claim_id, COIN, blocking_channel) - repost1 = repost_tx1[0].outputs[0] - self.advance(3, [repost_tx1]) - self.assertEqual( - {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash}, - dict(self.sql.blocked_streams) - ) - self.assertEqual({}, dict(self.sql.blocked_channels)) - self.assertEqual( - {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash}, - dict(self.sql.filtered_streams) - ) - self.assertEqual({}, dict(self.sql.filtered_channels)) - - # claim is blocked from results by direct repost - results, censor = censored_search(text='Claim') - self.assertEqual(2, len(results)) - self.assertEqual(claim2.claim_hash, results[0]['claim_hash']) - self.assertEqual(claim3.claim_hash, results[1]['claim_hash']) - self.assertEqual(1, censor.total) - self.assertEqual({blocking_channel.claim_hash: 1}, censor.censored) - results, _ = reader.resolve([claim1.claim_name]) - self.assertEqual( - f"Resolve of 'claim1' was censored by channel with claim id '{blocking_channel.claim_id}'.", - results[0].args[0] - ) - results, _ = reader.resolve([ - claim2.claim_name, regular_channel.claim_name # claim2 and channel still resolved - ]) - self.assertEqual(claim2.claim_hash, results[0]['claim_hash']) - self.assertEqual(regular_channel.claim_hash, results[1]['claim_hash']) - - # block claim indirectly by blocking its parent channel - repost_tx2 = self.get_repost(regular_channel.claim_id, COIN, blocking_channel) - repost2 = repost_tx2[0].outputs[0] - self.advance(4, [repost_tx2]) - self.assertEqual( - {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash}, - dict(self.sql.blocked_streams) - ) - self.assertEqual( - {repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash}, - dict(self.sql.blocked_channels) - ) - self.assertEqual( - {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash}, - dict(self.sql.filtered_streams) - ) - self.assertEqual( - {repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash}, - dict(self.sql.filtered_channels) - ) - - # claim in blocked channel is filtered from search and can't resolve - results, censor = censored_search(text='Claim') - self.assertEqual(1, len(results)) - self.assertEqual(claim3.claim_hash, results[0]['claim_hash']) - self.assertEqual(2, censor.total) - self.assertEqual({blocking_channel.claim_hash: 2}, censor.censored) - results, _ = reader.resolve([ - claim2.claim_name, regular_channel.claim_name # claim2 and channel don't resolve - ]) - self.assertEqual( - f"Resolve of 'claim2' was censored by channel with claim id '{blocking_channel.claim_id}'.", - results[0].args[0] - ) - self.assertEqual( - f"Resolve of '@channel1' was censored by channel with claim id '{blocking_channel.claim_id}'.", - results[1].args[0] - ) - results, _ = reader.resolve([claim3.claim_name]) # claim3 still resolved - self.assertEqual(claim3.claim_hash, results[0]['claim_hash']) - - # filtered claim is only filtered and not blocked - repost_tx3 = self.get_repost(claim3.claim_id, COIN, filtering_channel) - repost3 = repost_tx3[0].outputs[0] - self.advance(5, [repost_tx3]) - self.assertEqual( - {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash}, - dict(self.sql.blocked_streams) - ) - self.assertEqual( - {repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash}, - dict(self.sql.blocked_channels) - ) - self.assertEqual( - {repost1.claim.repost.reference.claim_hash: blocking_channel.claim_hash, - repost3.claim.repost.reference.claim_hash: filtering_channel.claim_hash}, - dict(self.sql.filtered_streams) - ) - self.assertEqual( - {repost2.claim.repost.reference.claim_hash: blocking_channel.claim_hash}, - dict(self.sql.filtered_channels) - ) - - # filtered claim doesn't return in search but is resolveable - results, censor = censored_search(text='Claim') - self.assertEqual(0, len(results)) - self.assertEqual(3, censor.total) - self.assertEqual({blocking_channel.claim_hash: 2, filtering_channel.claim_hash: 1}, censor.censored) - results, _ = reader.resolve([claim3.claim_name]) # claim3 still resolved - self.assertEqual(claim3.claim_hash, results[0]['claim_hash']) - - # abandon unblocks content - self.advance(6, [ - self.get_abandon(repost_tx1), - self.get_abandon(repost_tx2), - self.get_abandon(repost_tx3) - ]) - self.assertEqual({}, dict(self.sql.blocked_streams)) - self.assertEqual({}, dict(self.sql.blocked_channels)) - self.assertEqual({}, dict(self.sql.filtered_streams)) - self.assertEqual({}, dict(self.sql.filtered_channels)) - results, censor = censored_search(text='Claim') - self.assertEqual(3, len(results)) - self.assertEqual(0, censor.total) - results, censor = censored_search() - self.assertEqual(6, len(results)) - self.assertEqual(0, censor.total) - results, _ = reader.resolve([ - claim1.claim_name, claim2.claim_name, - claim3.claim_name, regular_channel.claim_name - ]) - self.assertEqual(claim1.claim_hash, results[0]['claim_hash']) - self.assertEqual(claim2.claim_hash, results[1]['claim_hash']) - self.assertEqual(claim3.claim_hash, results[2]['claim_hash']) - self.assertEqual(regular_channel.claim_hash, results[3]['claim_hash']) - - def test_pagination(self): - one, two, three, four, five, six, seven, filter_channel = self.advance(1, [ - self.get_stream('One', COIN), - self.get_stream('Two', COIN), - self.get_stream('Three', COIN), - self.get_stream('Four', COIN), - self.get_stream('Five', COIN), - self.get_stream('Six', COIN), - self.get_stream('Seven', COIN), - self.get_channel('Filtering Channel', COIN, '@filter'), - ]) - self.sql.filtering_channel_hashes.add(filter_channel.claim_hash) - - # nothing filtered - results, censor = censored_search(order_by='^height', offset=1, limit=3) - self.assertEqual(3, len(results)) - self.assertEqual( - [two.claim_hash, three.claim_hash, four.claim_hash], - [r['claim_hash'] for r in results] - ) - self.assertEqual(0, censor.total) - - # content filtered - repost1, repost2 = self.advance(2, [ - self.get_repost(one.claim_id, COIN, filter_channel), - self.get_repost(two.claim_id, COIN, filter_channel), - ]) - results, censor = censored_search(order_by='^height', offset=1, limit=3) - self.assertEqual(3, len(results)) - self.assertEqual( - [four.claim_hash, five.claim_hash, six.claim_hash], - [r['claim_hash'] for r in results] - ) - self.assertEqual(2, censor.total) - self.assertEqual({filter_channel.claim_hash: 2}, censor.censored)