From cc098f3090a09b8ce6fca4a54304f2135954ccc0 Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Thu, 11 Jul 2019 13:29:26 -0400 Subject: [PATCH] refactored wallet server db reader to support thread pool and process pool --- lbry/lbry/wallet/server/block_processor.py | 2 +- lbry/lbry/wallet/server/coin.py | 2 +- lbry/lbry/wallet/server/db/__init__.py | 0 lbry/lbry/wallet/server/{ => db}/canonical.py | 0 lbry/lbry/wallet/server/db/common.py | 13 + lbry/lbry/wallet/server/db/reader.py | 375 ++++++++++++++++ lbry/lbry/wallet/server/{ => db}/trending.py | 0 .../wallet/server/{db.py => db/writer.py} | 346 +-------------- lbry/lbry/wallet/server/query.py | 409 ------------------ lbry/lbry/wallet/server/session.py | 245 +---------- lbry/tests/unit/wallet/server/test_sqldb.py | 34 +- torba/torba/orchstr8/node.py | 2 +- 12 files changed, 438 insertions(+), 990 deletions(-) create mode 100644 lbry/lbry/wallet/server/db/__init__.py rename lbry/lbry/wallet/server/{ => db}/canonical.py (100%) create mode 100644 lbry/lbry/wallet/server/db/common.py create mode 100644 lbry/lbry/wallet/server/db/reader.py rename lbry/lbry/wallet/server/{ => db}/trending.py (100%) rename lbry/lbry/wallet/server/{db.py => db/writer.py} (68%) delete mode 100644 lbry/lbry/wallet/server/query.py diff --git a/lbry/lbry/wallet/server/block_processor.py b/lbry/lbry/wallet/server/block_processor.py index f6bd6ae54..2b5678788 100644 --- a/lbry/lbry/wallet/server/block_processor.py +++ b/lbry/lbry/wallet/server/block_processor.py @@ -3,7 +3,7 @@ import time from torba.server.block_processor import BlockProcessor from lbry.schema.claim import Claim -from lbry.wallet.server.db import SQLDB +from lbry.wallet.server.db.writer import SQLDB class Timer: diff --git a/lbry/lbry/wallet/server/coin.py b/lbry/lbry/wallet/server/coin.py index 56f791894..29bd4e4d6 100644 --- a/lbry/lbry/wallet/server/coin.py +++ b/lbry/lbry/wallet/server/coin.py @@ -12,7 +12,7 @@ class LBC(Coin): from .session import LBRYElectrumX, LBRYSessionManager from .block_processor import LBRYBlockProcessor from .daemon import LBCDaemon - from .db import LBRYDB + from .db.writer import LBRYDB DAEMON = LBCDaemon SESSIONCLS = LBRYElectrumX BLOCK_PROCESSOR = LBRYBlockProcessor diff --git a/lbry/lbry/wallet/server/db/__init__.py b/lbry/lbry/wallet/server/db/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lbry/lbry/wallet/server/canonical.py b/lbry/lbry/wallet/server/db/canonical.py similarity index 100% rename from lbry/lbry/wallet/server/canonical.py rename to lbry/lbry/wallet/server/db/canonical.py diff --git a/lbry/lbry/wallet/server/db/common.py b/lbry/lbry/wallet/server/db/common.py new file mode 100644 index 000000000..dffe752a8 --- /dev/null +++ b/lbry/lbry/wallet/server/db/common.py @@ -0,0 +1,13 @@ +CLAIM_TYPES = { + 'stream': 1, + 'channel': 2, +} + +STREAM_TYPES = { + 'video': 1, + 'audio': 2, + 'image': 3, + 'document': 4, + 'binary': 5, + 'model': 6 +} diff --git a/lbry/lbry/wallet/server/db/reader.py b/lbry/lbry/wallet/server/db/reader.py new file mode 100644 index 000000000..03b9b210b --- /dev/null +++ b/lbry/lbry/wallet/server/db/reader.py @@ -0,0 +1,375 @@ +import sqlite3 +import struct +from typing import Tuple, List +from binascii import unhexlify +from decimal import Decimal +from contextvars import ContextVar + +from torba.client.basedatabase import query + +from lbry.schema.url import URL, normalize_name +from lbry.schema.tags import clean_tags +from lbry.schema.result import Outputs + +from .common import CLAIM_TYPES, STREAM_TYPES + + +ATTRIBUTE_ARRAY_MAX_LENGTH = 100 + + +INTEGER_PARAMS = { + 'height', 'creation_height', 'activation_height', 'expiration_height', + 'timestamp', 'creation_timestamp', 'release_time', 'fee_amount', + 'tx_position', 'channel_join', + 'amount', 'effective_amount', 'support_amount', + 'trending_group', 'trending_mixed', + 'trending_local', 'trending_global', +} + +SEARCH_PARAMS = { + 'name', 'claim_id', 'txid', 'nout', 'channel', 'channel_ids', 'not_channel_ids', + 'public_key_id', 'claim_type', 'stream_types', 'media_types', 'fee_currency', + 'has_channel_signature', 'signature_valid', + 'any_tags', 'all_tags', 'not_tags', + 'any_locations', 'all_locations', 'not_locations', + 'any_languages', 'all_languages', 'not_languages', + 'is_controlling', 'limit', 'offset', 'order_by', + 'no_totals', +} | INTEGER_PARAMS + + +ORDER_FIELDS = { + 'name', +} | INTEGER_PARAMS + + +PRAGMAS = """ + pragma journal_mode=WAL; +""" + + +db = ContextVar('db') +ledger = ContextVar('ledger') + + +def initializer(_path, _ledger_name): + _db = sqlite3.connect(_path, isolation_level=None, uri=True) + _db.row_factory = sqlite3.Row + db.set(_db) + from lbry.wallet.ledger import MainNetLedger, RegTestLedger + ledger.set(MainNetLedger if _ledger_name == 'mainnet' else RegTestLedger) + + +def cleanup(): + db.get().close() + db.set(None) + ledger.set(None) + + +def get_claims(cols, for_count=False, **constraints): + if 'order_by' in constraints: + sql_order_by = [] + for order_by in constraints['order_by']: + is_asc = order_by.startswith('^') + column = order_by[1:] if is_asc else order_by + if column not in ORDER_FIELDS: + raise NameError(f'{column} is not a valid order_by field') + if column == 'name': + column = 'normalized' + sql_order_by.append( + f"claim.{column} ASC" if is_asc else f"claim.{column} DESC" + ) + constraints['order_by'] = sql_order_by + + ops = {'<=': '__lte', '>=': '__gte', '<': '__lt', '>': '__gt'} + for constraint in INTEGER_PARAMS: + if constraint in constraints: + value = constraints.pop(constraint) + postfix = '' + if isinstance(value, str): + if len(value) >= 2 and value[:2] in ops: + postfix, value = ops[value[:2]], value[2:] + elif len(value) >= 1 and value[0] in ops: + postfix, value = ops[value[0]], value[1:] + if constraint == 'fee_amount': + value = Decimal(value)*1000 + constraints[f'claim.{constraint}{postfix}'] = int(value) + + if constraints.pop('is_controlling', False): + if {'sequence', 'amount_order'}.isdisjoint(constraints): + for_count = False + constraints['claimtrie.claim_hash__is_not_null'] = '' + if 'sequence' in constraints: + constraints['order_by'] = 'claim.activation_height ASC' + constraints['offset'] = int(constraints.pop('sequence')) - 1 + constraints['limit'] = 1 + if 'amount_order' in constraints: + constraints['order_by'] = 'claim.effective_amount DESC' + constraints['offset'] = int(constraints.pop('amount_order')) - 1 + constraints['limit'] = 1 + + if 'claim_id' in constraints: + claim_id = constraints.pop('claim_id') + if len(claim_id) == 40: + constraints['claim.claim_id'] = claim_id + else: + constraints['claim.claim_id__like'] = f'{claim_id[:40]}%' + + if 'name' in constraints: + constraints['claim.normalized'] = normalize_name(constraints.pop('name')) + + if 'public_key_id' in constraints: + constraints['claim.public_key_hash'] = sqlite3.Binary( + ledger.get().address_to_hash160(constraints.pop('public_key_id'))) + + if 'channel' in constraints: + channel_url = constraints.pop('channel') + match = _resolve_one(channel_url) + if isinstance(match, sqlite3.Row): + constraints['channel_hash'] = match['claim_hash'] + else: + return [[0]] if cols == 'count(*)' else [] + if 'channel_hash' in constraints: + constraints['claim.channel_hash'] = sqlite3.Binary(constraints.pop('channel_hash')) + if 'channel_ids' in constraints: + channel_ids = constraints.pop('channel_ids') + if channel_ids: + constraints['claim.channel_hash__in'] = [ + sqlite3.Binary(unhexlify(cid)[::-1]) for cid in channel_ids + ] + if 'not_channel_ids' in constraints: + not_channel_ids = constraints.pop('not_channel_ids') + if not_channel_ids: + not_channel_ids_binary = [ + sqlite3.Binary(unhexlify(ncid)[::-1]) for ncid in not_channel_ids + ] + if constraints.get('has_channel_signature', False): + constraints['claim.channel_hash__not_in'] = not_channel_ids_binary + else: + constraints['null_or_not_channel__or'] = { + 'claim.signature_valid__is_null': True, + 'claim.channel_hash__not_in': not_channel_ids_binary + } + if 'signature_valid' in constraints: + has_channel_signature = constraints.pop('has_channel_signature', False) + if has_channel_signature: + constraints['claim.signature_valid'] = constraints.pop('signature_valid') + else: + constraints['null_or_signature__or'] = { + 'claim.signature_valid__is_null': True, + 'claim.signature_valid': constraints.pop('signature_valid') + } + elif constraints.pop('has_channel_signature', False): + constraints['claim.signature_valid__is_not_null'] = True + + if 'txid' in constraints: + tx_hash = unhexlify(constraints.pop('txid'))[::-1] + nout = constraints.pop('nout', 0) + constraints['claim.txo_hash'] = sqlite3.Binary( + tx_hash + struct.pack(' Tuple[List, List, int, int]: + assert set(constraints).issubset(SEARCH_PARAMS), \ + f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}" + total = None + if not constraints.pop('no_totals', False): + total = get_claims_count(**constraints) + constraints['offset'] = abs(constraints.get('offset', 0)) + constraints['limit'] = min(abs(constraints.get('limit', 10)), 50) + if 'order_by' not in constraints: + constraints['order_by'] = ["height", "^name"] + txo_rows = _search(**constraints) + channel_hashes = set(txo['channel_hash'] for txo in txo_rows if txo['channel_hash']) + extra_txo_rows = [] + if channel_hashes: + extra_txo_rows = _search(**{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]}) + return txo_rows, extra_txo_rows, constraints['offset'], total + + +def search_to_bytes(constraints) -> bytes: + return Outputs.to_bytes(*search(constraints)) + + +def _search(**constraints): + return get_claims( + """ + claimtrie.claim_hash as is_controlling, + claimtrie.last_take_over_height, + claim.claim_hash, claim.txo_hash, + claim.claims_in_channel, + claim.height, claim.creation_height, + claim.activation_height, claim.expiration_height, + claim.effective_amount, claim.support_amount, + claim.trending_group, claim.trending_mixed, + claim.trending_local, claim.trending_global, + claim.short_url, claim.canonical_url, + claim.channel_hash, channel.txo_hash AS channel_txo_hash, + channel.height AS channel_height, claim.signature_valid + """, **constraints + ) + + +def resolve(urls) -> Tuple[List, List]: + result = [] + channel_hashes = set() + for raw_url in urls: + match = _resolve_one(raw_url) + result.append(match) + if isinstance(match, sqlite3.Row) and match['channel_hash']: + channel_hashes.add(match['channel_hash']) + extra_txo_rows = [] + if channel_hashes: + extra_txo_rows = _search(**{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]}) + return result, extra_txo_rows + + +def resolve_to_bytes(urls) -> bytes: + return Outputs.to_bytes(*resolve(urls)) + + +def _resolve_one(raw_url): + try: + url = URL.parse(raw_url) + except ValueError as e: + return e + + channel = None + + if url.has_channel: + query = url.channel.to_dict() + if set(query) == {'name'}: + query['is_controlling'] = True + else: + query['order_by'] = ['^height'] + matches = _search(**query, limit=1) + if matches: + channel = matches[0] + else: + return LookupError(f'Could not find channel in "{raw_url}".') + + if url.has_stream: + query = url.stream.to_dict() + if channel is not None: + if set(query) == {'name'}: + # temporarily emulate is_controlling for claims in channel + query['order_by'] = ['effective_amount'] + else: + query['order_by'] = ['^channel_join'] + query['channel_hash'] = channel['claim_hash'] + query['signature_valid'] = 1 + elif set(query) == {'name'}: + query['is_controlling'] = 1 + matches = _search(**query, limit=1) + if matches: + return matches[0] + else: + return LookupError(f'Could not find stream in "{raw_url}".') + + return channel + + +def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False): + any_items = cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH] + if any_items: + constraints.update({ + f'$any_{attr}{i}': item for i, item in enumerate(any_items) + }) + values = ', '.join( + f':$any_{attr}{i}' for i in range(len(any_items)) + ) + if for_count: + constraints[f'claim.claim_hash__in#_any_{attr}'] = f""" + SELECT claim_hash FROM {attr} WHERE {attr} IN ({values}) + """ + else: + constraints[f'#_any_{attr}'] = f""" + EXISTS( + SELECT 1 FROM {attr} WHERE + claim.claim_hash={attr}.claim_hash + AND {attr} IN ({values}) + ) + """ + + all_items = cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH] + if all_items: + constraints[f'$all_{attr}_count'] = len(all_items) + constraints.update({ + f'$all_{attr}{i}': item for i, item in enumerate(all_items) + }) + values = ', '.join( + f':$all_{attr}{i}' for i in range(len(all_items)) + ) + if for_count: + constraints[f'claim.claim_hash__in#_all_{attr}'] = f""" + SELECT claim_hash FROM {attr} WHERE {attr} IN ({values}) + GROUP BY claim_hash HAVING COUNT({attr}) = :$all_{attr}_count + """ + else: + constraints[f'#_all_{attr}'] = f""" + {len(all_items)}=( + SELECT count(*) FROM {attr} WHERE + claim.claim_hash={attr}.claim_hash + AND {attr} IN ({values}) + ) + """ + + not_items = cleaner(constraints.pop(f'not_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH] + if not_items: + constraints.update({ + f'$not_{attr}{i}': item for i, item in enumerate(not_items) + }) + values = ', '.join( + f':$not_{attr}{i}' for i in range(len(not_items)) + ) + if for_count: + constraints[f'claim.claim_hash__not_in#_not_{attr}'] = f""" + SELECT claim_hash FROM {attr} WHERE {attr} IN ({values}) + """ + else: + constraints[f'#_not_{attr}'] = f""" + NOT EXISTS( + SELECT 1 FROM {attr} WHERE + claim.claim_hash={attr}.claim_hash + AND {attr} IN ({values}) + ) + """ diff --git a/lbry/lbry/wallet/server/trending.py b/lbry/lbry/wallet/server/db/trending.py similarity index 100% rename from lbry/lbry/wallet/server/trending.py rename to lbry/lbry/wallet/server/db/trending.py diff --git a/lbry/lbry/wallet/server/db.py b/lbry/lbry/wallet/server/db/writer.py similarity index 68% rename from lbry/lbry/wallet/server/db.py rename to lbry/lbry/wallet/server/db/writer.py index 9327ce870..3fdf3998c 100644 --- a/lbry/lbry/wallet/server/db.py +++ b/lbry/lbry/wallet/server/db/writer.py @@ -1,7 +1,5 @@ import sqlite3 -import struct from typing import Union, Tuple, Set, List -from binascii import unhexlify from itertools import chain from decimal import Decimal @@ -9,97 +7,19 @@ from torba.server.db import DB from torba.server.util import class_logger from torba.client.basedatabase import query, constraints_to_sql -from lbry.schema.url import URL, normalize_name from lbry.schema.tags import clean_tags from lbry.schema.mime_types import guess_stream_type from lbry.wallet.ledger import MainNetLedger, RegTestLedger from lbry.wallet.transaction import Transaction, Output -from lbry.wallet.server.canonical import register_canonical_functions -from lbry.wallet.server.trending import ( +from lbry.wallet.server.db.canonical import register_canonical_functions +from lbry.wallet.server.db.trending import ( CREATE_TREND_TABLE, calculate_trending, register_trending_functions ) +from .common import CLAIM_TYPES, STREAM_TYPES + ATTRIBUTE_ARRAY_MAX_LENGTH = 100 -CLAIM_TYPES = { - 'stream': 1, - 'channel': 2, -} -STREAM_TYPES = { - 'video': 1, - 'audio': 2, - 'image': 3, - 'document': 4, - 'binary': 5, - 'model': 6 -} - - -def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False): - any_items = cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH] - if any_items: - constraints.update({ - f'$any_{attr}{i}': item for i, item in enumerate(any_items) - }) - values = ', '.join( - f':$any_{attr}{i}' for i in range(len(any_items)) - ) - if for_count: - constraints[f'claim.claim_hash__in#_any_{attr}'] = f""" - SELECT claim_hash FROM {attr} WHERE {attr} IN ({values}) - """ - else: - constraints[f'#_any_{attr}'] = f""" - EXISTS( - SELECT 1 FROM {attr} WHERE - claim.claim_hash={attr}.claim_hash - AND {attr} IN ({values}) - ) - """ - - all_items = cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH] - if all_items: - constraints[f'$all_{attr}_count'] = len(all_items) - constraints.update({ - f'$all_{attr}{i}': item for i, item in enumerate(all_items) - }) - values = ', '.join( - f':$all_{attr}{i}' for i in range(len(all_items)) - ) - if for_count: - constraints[f'claim.claim_hash__in#_all_{attr}'] = f""" - SELECT claim_hash FROM {attr} WHERE {attr} IN ({values}) - GROUP BY claim_hash HAVING COUNT({attr}) = :$all_{attr}_count - """ - else: - constraints[f'#_all_{attr}'] = f""" - {len(all_items)}=( - SELECT count(*) FROM {attr} WHERE - claim.claim_hash={attr}.claim_hash - AND {attr} IN ({values}) - ) - """ - - not_items = cleaner(constraints.pop(f'not_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH] - if not_items: - constraints.update({ - f'$not_{attr}{i}': item for i, item in enumerate(not_items) - }) - values = ', '.join( - f':$not_{attr}{i}' for i in range(len(not_items)) - ) - if for_count: - constraints[f'claim.claim_hash__not_in#_not_{attr}'] = f""" - SELECT claim_hash FROM {attr} WHERE {attr} IN ({values}) - """ - else: - constraints[f'#_not_{attr}'] = f""" - NOT EXISTS( - SELECT 1 FROM {attr} WHERE - claim.claim_hash={attr}.claim_hash - AND {attr} IN ({values}) - ) - """ class SQLDB: @@ -234,7 +154,7 @@ class SQLDB: self.ledger = MainNetLedger if self.main.coin.NET == 'mainnet' else RegTestLedger def open(self): - self.db = sqlite3.connect(self._db_path, isolation_level=None, check_same_thread=False) + self.db = sqlite3.connect(self._db_path, isolation_level=None, check_same_thread=False, uri=True) self.db.row_factory = sqlite3.Row self.db.executescript(self.CREATE_TABLES_QUERY) register_canonical_functions(self.db) @@ -772,262 +692,6 @@ class SQLDB: r(self.update_claimtrie, height, recalculate_claim_hashes, deleted_claim_names, forward_timer=True) r(calculate_trending, self.db, height, self.main.first_sync, daemon_height) - def get_claims(self, cols, for_count=False, **constraints): - if 'order_by' in constraints: - sql_order_by = [] - for order_by in constraints['order_by']: - is_asc = order_by.startswith('^') - column = order_by[1:] if is_asc else order_by - if column not in self.ORDER_FIELDS: - raise NameError(f'{column} is not a valid order_by field') - if column == 'name': - column = 'normalized' - sql_order_by.append( - f"claim.{column} ASC" if is_asc else f"claim.{column} DESC" - ) - constraints['order_by'] = sql_order_by - - ops = {'<=': '__lte', '>=': '__gte', '<': '__lt', '>': '__gt'} - for constraint in self.INTEGER_PARAMS: - if constraint in constraints: - value = constraints.pop(constraint) - postfix = '' - if isinstance(value, str): - if len(value) >= 2 and value[:2] in ops: - postfix, value = ops[value[:2]], value[2:] - elif len(value) >= 1 and value[0] in ops: - postfix, value = ops[value[0]], value[1:] - if constraint == 'fee_amount': - value = Decimal(value)*1000 - constraints[f'claim.{constraint}{postfix}'] = int(value) - - if constraints.pop('is_controlling', False): - if {'sequence', 'amount_order'}.isdisjoint(constraints): - for_count = False - constraints['claimtrie.claim_hash__is_not_null'] = '' - if 'sequence' in constraints: - constraints['order_by'] = 'claim.activation_height ASC' - constraints['offset'] = int(constraints.pop('sequence')) - 1 - constraints['limit'] = 1 - if 'amount_order' in constraints: - constraints['order_by'] = 'claim.effective_amount DESC' - constraints['offset'] = int(constraints.pop('amount_order')) - 1 - constraints['limit'] = 1 - - if 'claim_id' in constraints: - claim_id = constraints.pop('claim_id') - if len(claim_id) == 40: - constraints['claim.claim_id'] = claim_id - else: - constraints['claim.claim_id__like'] = f'{claim_id[:40]}%' - - if 'name' in constraints: - constraints['claim.normalized'] = normalize_name(constraints.pop('name')) - - if 'public_key_id' in constraints: - constraints['claim.public_key_hash'] = sqlite3.Binary( - self.ledger.address_to_hash160(constraints.pop('public_key_id'))) - - if 'channel' in constraints: - channel_url = constraints.pop('channel') - match = self._resolve_one(channel_url) - if isinstance(match, sqlite3.Row): - constraints['channel_hash'] = match['claim_hash'] - else: - return [[0]] if cols == 'count(*)' else [] - if 'channel_hash' in constraints: - constraints['claim.channel_hash'] = sqlite3.Binary(constraints.pop('channel_hash')) - if 'channel_ids' in constraints: - channel_ids = constraints.pop('channel_ids') - if channel_ids: - constraints['claim.channel_hash__in'] = [ - sqlite3.Binary(unhexlify(cid)[::-1]) for cid in channel_ids - ] - if 'not_channel_ids' in constraints: - not_channel_ids = constraints.pop('not_channel_ids') - if not_channel_ids: - not_channel_ids_binary = [ - sqlite3.Binary(unhexlify(ncid)[::-1]) for ncid in not_channel_ids - ] - if constraints.get('has_channel_signature', False): - constraints['claim.channel_hash__not_in'] = not_channel_ids_binary - else: - constraints['null_or_not_channel__or'] = { - 'claim.signature_valid__is_null': True, - 'claim.channel_hash__not_in': not_channel_ids_binary - } - if 'signature_valid' in constraints: - has_channel_signature = constraints.pop('has_channel_signature', False) - if has_channel_signature: - constraints['claim.signature_valid'] = constraints.pop('signature_valid') - else: - constraints['null_or_signature__or'] = { - 'claim.signature_valid__is_null': True, - 'claim.signature_valid': constraints.pop('signature_valid') - } - elif constraints.pop('has_channel_signature', False): - constraints['claim.signature_valid__is_not_null'] = True - - if 'txid' in constraints: - tx_hash = unhexlify(constraints.pop('txid'))[::-1] - nout = constraints.pop('nout', 0) - constraints['claim.txo_hash'] = sqlite3.Binary( - tx_hash + struct.pack(' Tuple[List, List, int, int]: - assert set(constraints).issubset(self.SEARCH_PARAMS), \ - f"Search query contains invalid arguments: {set(constraints).difference(self.SEARCH_PARAMS)}" - total = None - if not constraints.pop('no_totals', False): - total = self.get_claims_count(**constraints) - constraints['offset'] = abs(constraints.get('offset', 0)) - constraints['limit'] = min(abs(constraints.get('limit', 10)), 50) - if 'order_by' not in constraints: - constraints['order_by'] = ["height", "^name"] - txo_rows = self._search(**constraints) - channel_hashes = set(txo['channel_hash'] for txo in txo_rows if txo['channel_hash']) - extra_txo_rows = [] - if channel_hashes: - extra_txo_rows = self._search(**{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]}) - return txo_rows, extra_txo_rows, constraints['offset'], total - - def _resolve_one(self, raw_url): - try: - url = URL.parse(raw_url) - except ValueError as e: - return e - - channel = None - - if url.has_channel: - query = url.channel.to_dict() - if set(query) == {'name'}: - query['is_controlling'] = True - else: - query['order_by'] = ['^height'] - matches = self._search(**query, limit=1) - if matches: - channel = matches[0] - else: - return LookupError(f'Could not find channel in "{raw_url}".') - - if url.has_stream: - query = url.stream.to_dict() - if channel is not None: - if set(query) == {'name'}: - # temporarily emulate is_controlling for claims in channel - query['order_by'] = ['effective_amount'] - else: - query['order_by'] = ['^channel_join'] - query['channel_hash'] = channel['claim_hash'] - query['signature_valid'] = 1 - elif set(query) == {'name'}: - query['is_controlling'] = 1 - matches = self._search(**query, limit=1) - if matches: - return matches[0] - else: - return LookupError(f'Could not find stream in "{raw_url}".') - - return channel - - def resolve(self, urls) -> Tuple[List, List]: - result = [] - channel_hashes = set() - for raw_url in urls: - match = self._resolve_one(raw_url) - result.append(match) - if isinstance(match, sqlite3.Row) and match['channel_hash']: - channel_hashes.add(match['channel_hash']) - extra_txo_rows = [] - if channel_hashes: - extra_txo_rows = self._search(**{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]}) - return result, extra_txo_rows - class LBRYDB(DB): diff --git a/lbry/lbry/wallet/server/query.py b/lbry/lbry/wallet/server/query.py deleted file mode 100644 index 65e6268a7..000000000 --- a/lbry/lbry/wallet/server/query.py +++ /dev/null @@ -1,409 +0,0 @@ -import os -import sqlite3 -import struct -import asyncio -from typing import Tuple, List -from binascii import unhexlify -from decimal import Decimal - -from torba.client.basedatabase import query - -from lbry.schema.url import URL, normalize_name -from lbry.schema.tags import clean_tags -from lbry.schema.result import Outputs -from lbry.wallet.ledger import MainNetLedger, RegTestLedger - -from multiprocessing import Process, get_start_method -from multiprocessing.context import BaseContext -from concurrent.futures import ProcessPoolExecutor -from concurrent.futures.process import _ExceptionWithTraceback, _sendback_result - - -ATTRIBUTE_ARRAY_MAX_LENGTH = 100 -CLAIM_TYPES = { - 'stream': 1, - 'channel': 2, -} -STREAM_TYPES = { - 'video': 1, - 'audio': 2, - 'image': 3, - 'document': 4, - 'binary': 5, - 'model': 6 -} - - -def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False): - any_items = cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH] - if any_items: - constraints.update({ - f'$any_{attr}{i}': item for i, item in enumerate(any_items) - }) - values = ', '.join( - f':$any_{attr}{i}' for i in range(len(any_items)) - ) - if for_count: - constraints[f'claim.claim_hash__in#_any_{attr}'] = f""" - SELECT claim_hash FROM {attr} WHERE {attr} IN ({values}) - """ - else: - constraints[f'#_any_{attr}'] = f""" - EXISTS( - SELECT 1 FROM {attr} WHERE - claim.claim_hash={attr}.claim_hash - AND {attr} IN ({values}) - ) - """ - - all_items = cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH] - if all_items: - constraints[f'$all_{attr}_count'] = len(all_items) - constraints.update({ - f'$all_{attr}{i}': item for i, item in enumerate(all_items) - }) - values = ', '.join( - f':$all_{attr}{i}' for i in range(len(all_items)) - ) - if for_count: - constraints[f'claim.claim_hash__in#_all_{attr}'] = f""" - SELECT claim_hash FROM {attr} WHERE {attr} IN ({values}) - GROUP BY claim_hash HAVING COUNT({attr}) = :$all_{attr}_count - """ - else: - constraints[f'#_all_{attr}'] = f""" - {len(all_items)}=( - SELECT count(*) FROM {attr} WHERE - claim.claim_hash={attr}.claim_hash - AND {attr} IN ({values}) - ) - """ - - not_items = cleaner(constraints.pop(f'not_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH] - if not_items: - constraints.update({ - f'$not_{attr}{i}': item for i, item in enumerate(not_items) - }) - values = ', '.join( - f':$not_{attr}{i}' for i in range(len(not_items)) - ) - if for_count: - constraints[f'claim.claim_hash__not_in#_not_{attr}'] = f""" - SELECT claim_hash FROM {attr} WHERE {attr} IN ({values}) - """ - else: - constraints[f'#_not_{attr}'] = f""" - NOT EXISTS( - SELECT 1 FROM {attr} WHERE - claim.claim_hash={attr}.claim_hash - AND {attr} IN ({values}) - ) - """ - - -class QueryProcessor(Process): - - PRAGMAS = """ - pragma journal_mode=WAL; - """ - - def get_claims(self, cols, for_count=False, **constraints): - if 'order_by' in constraints: - sql_order_by = [] - for order_by in constraints['order_by']: - is_asc = order_by.startswith('^') - column = order_by[1:] if is_asc else order_by - if column not in self.ORDER_FIELDS: - raise NameError(f'{column} is not a valid order_by field') - if column == 'name': - column = 'normalized' - sql_order_by.append( - f"claim.{column} ASC" if is_asc else f"claim.{column} DESC" - ) - constraints['order_by'] = sql_order_by - - ops = {'<=': '__lte', '>=': '__gte', '<': '__lt', '>': '__gt'} - for constraint in self.INTEGER_PARAMS: - if constraint in constraints: - value = constraints.pop(constraint) - postfix = '' - if isinstance(value, str): - if len(value) >= 2 and value[:2] in ops: - postfix, value = ops[value[:2]], value[2:] - elif len(value) >= 1 and value[0] in ops: - postfix, value = ops[value[0]], value[1:] - if constraint == 'fee_amount': - value = Decimal(value)*1000 - constraints[f'claim.{constraint}{postfix}'] = int(value) - - if constraints.pop('is_controlling', False): - if {'sequence', 'amount_order'}.isdisjoint(constraints): - for_count = False - constraints['claimtrie.claim_hash__is_not_null'] = '' - if 'sequence' in constraints: - constraints['order_by'] = 'claim.activation_height ASC' - constraints['offset'] = int(constraints.pop('sequence')) - 1 - constraints['limit'] = 1 - if 'amount_order' in constraints: - constraints['order_by'] = 'claim.effective_amount DESC' - constraints['offset'] = int(constraints.pop('amount_order')) - 1 - constraints['limit'] = 1 - - if 'claim_id' in constraints: - claim_id = constraints.pop('claim_id') - if len(claim_id) == 40: - constraints['claim.claim_id'] = claim_id - else: - constraints['claim.claim_id__like'] = f'{claim_id[:40]}%' - - if 'name' in constraints: - constraints['claim.normalized'] = normalize_name(constraints.pop('name')) - - if 'public_key_id' in constraints: - constraints['claim.public_key_hash'] = sqlite3.Binary( - self.ledger.address_to_hash160(constraints.pop('public_key_id'))) - - if 'channel' in constraints: - channel_url = constraints.pop('channel') - match = self._resolve_one(channel_url) - if isinstance(match, sqlite3.Row): - constraints['channel_hash'] = match['claim_hash'] - else: - return [[0]] if cols == 'count(*)' else [] - if 'channel_hash' in constraints: - constraints['claim.channel_hash'] = sqlite3.Binary(constraints.pop('channel_hash')) - if 'channel_ids' in constraints: - channel_ids = constraints.pop('channel_ids') - if channel_ids: - constraints['claim.channel_hash__in'] = [ - sqlite3.Binary(unhexlify(cid)[::-1]) for cid in channel_ids - ] - if 'not_channel_ids' in constraints: - not_channel_ids = constraints.pop('not_channel_ids') - if not_channel_ids: - not_channel_ids_binary = [ - sqlite3.Binary(unhexlify(ncid)[::-1]) for ncid in not_channel_ids - ] - if constraints.get('has_channel_signature', False): - constraints['claim.channel_hash__not_in'] = not_channel_ids_binary - else: - constraints['null_or_not_channel__or'] = { - 'claim.signature_valid__is_null': True, - 'claim.channel_hash__not_in': not_channel_ids_binary - } - if 'signature_valid' in constraints: - has_channel_signature = constraints.pop('has_channel_signature', False) - if has_channel_signature: - constraints['claim.signature_valid'] = constraints.pop('signature_valid') - else: - constraints['null_or_signature__or'] = { - 'claim.signature_valid__is_null': True, - 'claim.signature_valid': constraints.pop('signature_valid') - } - elif constraints.pop('has_channel_signature', False): - constraints['claim.signature_valid__is_not_null'] = True - - if 'txid' in constraints: - tx_hash = unhexlify(constraints.pop('txid'))[::-1] - nout = constraints.pop('nout', 0) - constraints['claim.txo_hash'] = sqlite3.Binary( - tx_hash + struct.pack(' Tuple[List, List, int, int]: - assert set(constraints).issubset(self.SEARCH_PARAMS), \ - f"Search query contains invalid arguments: {set(constraints).difference(self.SEARCH_PARAMS)}" - total = None - if not constraints.pop('no_totals', False): - total = self.get_claims_count(**constraints) - constraints['offset'] = abs(constraints.get('offset', 0)) - constraints['limit'] = min(abs(constraints.get('limit', 10)), 50) - if 'order_by' not in constraints: - constraints['order_by'] = ["height", "^name"] - txo_rows = self._search(**constraints) - channel_hashes = set(txo['channel_hash'] for txo in txo_rows if txo['channel_hash']) - extra_txo_rows = [] - if channel_hashes: - extra_txo_rows = self._search(**{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]}) - return txo_rows, extra_txo_rows, constraints['offset'], total - - def _resolve_one(self, raw_url): - try: - url = URL.parse(raw_url) - except ValueError as e: - return e - - channel = None - - if url.has_channel: - query = url.channel.to_dict() - if set(query) == {'name'}: - query['is_controlling'] = True - else: - query['order_by'] = ['^height'] - matches = self._search(**query, limit=1) - if matches: - channel = matches[0] - else: - return LookupError(f'Could not find channel in "{raw_url}".') - - if url.has_stream: - query = url.stream.to_dict() - if channel is not None: - if set(query) == {'name'}: - # temporarily emulate is_controlling for claims in channel - query['order_by'] = ['effective_amount'] - else: - query['order_by'] = ['^channel_join'] - query['channel_hash'] = channel['claim_hash'] - query['signature_valid'] = 1 - elif set(query) == {'name'}: - query['is_controlling'] = 1 - matches = self._search(**query, limit=1) - if matches: - return matches[0] - else: - return LookupError(f'Could not find stream in "{raw_url}".') - - return channel - - def resolve(self, urls) -> Tuple[List, List]: - result = [] - channel_hashes = set() - for raw_url in urls: - match = self._resolve_one(raw_url) - result.append(match) - if isinstance(match, sqlite3.Row) and match['channel_hash']: - channel_hashes.add(match['channel_hash']) - extra_txo_rows = [] - if channel_hashes: - extra_txo_rows = self._search(**{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]}) - return result, extra_txo_rows - - def run(self): - (call_queue, result_queue, _, initargs) = self._args # pylint: disable=W0632 - db_path, ledger_name = initargs - self.ledger = MainNetLedger if ledger_name == 'mainnet' else RegTestLedger - self.db = sqlite3.connect(db_path, isolation_level=None) - self.db.row_factory = sqlite3.Row - while True: - call_item = call_queue.get(block=True) - if call_item is None: - # Wake up queue management thread - result_queue.put(os.getpid()) - return - try: - fn = getattr(self, call_item.args[0]) - r = Outputs.to_base64(*fn(*call_item.args[1:])) - except BaseException as e: - exc = _ExceptionWithTraceback(e, e.__traceback__) - _sendback_result(result_queue, call_item.work_id, exception=exc) - else: - _sendback_result(result_queue, call_item.work_id, result=r) - - # Liberate the resource as soon as possible, to avoid holding onto - # open files or shared memory that is not needed anymore - del call_item - - -class QueryContext(BaseContext): - _name = get_start_method(False) - Process = QueryProcessor - - -class QueryExecutor(ProcessPoolExecutor): - def __init__(self, db_path, ledger_name, max_workers=None): - super().__init__( - max_workers=max_workers or max(os.cpu_count(), 4), - mp_context=QueryContext(), - initargs=(db_path, ledger_name) - ) - - async def resolve(self, urls): - return await asyncio.wrap_future( - self.submit(None, 'resolve', urls) - ) - - async def search(self, kwargs): - return await asyncio.wrap_future( - self.submit(None, 'search', kwargs) - ) diff --git a/lbry/lbry/wallet/server/session.py b/lbry/lbry/wallet/server/session.py index 6a2628652..de04d3389 100644 --- a/lbry/lbry/wallet/server/session.py +++ b/lbry/lbry/wallet/server/session.py @@ -1,16 +1,17 @@ +import os import math -import unicodedata as uda -from binascii import unhexlify, hexlify +import base64 +import asyncio +from binascii import hexlify from torba.rpc.jsonrpc import RPCError -from torba.server.hash import hash_to_hex_str from torba.server.session import ElectrumX, SessionManager from torba.server import util -from lbry.schema.url import URL from lbry.wallet.server.block_processor import LBRYBlockProcessor -from lbry.wallet.server.db import LBRYDB -from lbry.wallet.server.query import QueryExecutor +from lbry.wallet.server.db.writer import LBRYDB +from lbry.wallet.server.db import reader +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor class LBRYSessionManager(SessionManager): @@ -20,7 +21,13 @@ class LBRYSessionManager(SessionManager): self.query_executor = None async def start_other(self): - self.query_executor = QueryExecutor('claims.db', self.env.coin.NET, self.env.max_query_workers) + args = dict(initializer=reader.initializer, initargs=('claims.db', self.env.coin.NET)) + if self.env.max_query_workers is not None and self.env.max_query_workers == 0: + self.query_executor = ThreadPoolExecutor(max_workers=1, **args) + else: + self.query_executor = ProcessPoolExecutor( + max_workers=self.env.max_query_workers or max(os.cpu_count(), 4), **args + ) async def stop_other(self): self.query_executor.shutdown() @@ -43,17 +50,7 @@ class LBRYElectrumX(ElectrumX): 'blockchain.transaction.get_height': self.transaction_get_height, 'blockchain.claimtrie.search': self.claimtrie_search, 'blockchain.claimtrie.resolve': self.claimtrie_resolve, - 'blockchain.claimtrie.getclaimbyid': self.claimtrie_getclaimbyid, - 'blockchain.claimtrie.getclaimsforname': self.claimtrie_getclaimsforname, 'blockchain.claimtrie.getclaimsbyids': self.claimtrie_getclaimsbyids, - 'blockchain.claimtrie.getvalue': self.claimtrie_getvalue, - 'blockchain.claimtrie.getnthclaimforname': self.claimtrie_getnthclaimforname, - 'blockchain.claimtrie.getclaimsintx': self.claimtrie_getclaimsintx, - 'blockchain.claimtrie.getclaimssignedby': self.claimtrie_getclaimssignedby, - 'blockchain.claimtrie.getclaimssignedbynthtoname': self.claimtrie_getclaimssignedbynthtoname, - 'blockchain.claimtrie.getvalueforuri': self.claimtrie_getvalueforuri, - 'blockchain.claimtrie.getvaluesforuris': self.claimtrie_getvalueforuris, - 'blockchain.claimtrie.getclaimssignedbyid': self.claimtrie_getclaimssignedbyid, 'blockchain.block.get_server_height': self.get_server_height, } self.request_handlers.update(handlers) @@ -61,10 +58,14 @@ class LBRYElectrumX(ElectrumX): async def claimtrie_search(self, **kwargs): if 'claim_id' in kwargs: self.assert_claim_id(kwargs['claim_id']) - return await self.session_mgr.query_executor.search(kwargs) + return base64.b64encode(await asyncio.get_running_loop().run_in_executor( + self.session_mgr.query_executor, reader.search_to_bytes, kwargs + )).decode() async def claimtrie_resolve(self, *urls): - return await self.session_mgr.query_executor.resolve(urls) + return base64.b64encode(await asyncio.get_running_loop().run_in_executor( + self.session_mgr.query_executor, reader.resolve_to_bytes, urls + )).decode() async def get_server_height(self): return self.bp.height @@ -79,84 +80,9 @@ class LBRYElectrumX(ElectrumX): return -1 return None - async def claimtrie_getclaimssignedby(self, name): - winning_claim = await self.daemon.getvalueforname(name) - if winning_claim: - return await self.claimtrie_getclaimssignedbyid(winning_claim['claimId']) - - async def claimtrie_getclaimssignedbyid(self, certificate_id): - claim_ids = self.get_claim_ids_signed_by(certificate_id) - return await self.batched_formatted_claims_from_daemon(claim_ids) - - def claimtrie_getclaimssignedbyidminimal(self, certificate_id): - claim_ids = self.get_claim_ids_signed_by(certificate_id) - ret = [] - for claim_id in claim_ids: - raw_claim_id = unhexlify(claim_id)[::-1] - info = self.db.get_claim_info(raw_claim_id) - if info: - ret.append({ - 'claim_id': claim_id, - 'height': info.height, - 'name': info.name.decode() - }) - return ret - - def get_claim_ids_signed_by(self, certificate_id): - raw_certificate_id = unhexlify(certificate_id)[::-1] - raw_claim_ids = self.db.get_signed_claim_ids_by_cert_id(raw_certificate_id) - return list(map(hash_to_hex_str, raw_claim_ids)) - - async def claimtrie_getclaimssignedbynthtoname(self, name, n): - claim = self.claimtrie_getnthclaimforname(name, n) - if claim and 'claim_id' in claim: - return await self.claimtrie_getclaimssignedbyid(hash_to_hex_str(claim['claim_id'])) - - async def claimtrie_getclaimsintx(self, txid): - # TODO: this needs further discussion. - # Code on lbryum-server is wrong and we need to gather what we clearly expect from this command - claim_ids = [claim['claimId'] for claim in (await self.daemon.getclaimsfortx(txid)) if 'claimId' in claim] - return await self.batched_formatted_claims_from_daemon(claim_ids) - - async def claimtrie_getvalue(self, name, block_hash=None): - proof = await self.daemon.getnameproof(name, block_hash) - result = {'proof': proof, 'supports': []} - - if proof_has_winning_claim(proof): - tx_hash, nout = proof['txhash'], int(proof['nOut']) - transaction_info = await self.daemon.getrawtransaction(tx_hash, True) - result['transaction'] = transaction_info['hex'] # should have never included this (or the call to get it) - raw_claim_id = self.db.get_claim_id_from_outpoint(unhexlify(tx_hash)[::-1], nout) - claim_id = hexlify(raw_claim_id[::-1]).decode() - claim = await self.claimtrie_getclaimbyid(claim_id) - result.update(claim) - - return result - - async def claimtrie_getnthclaimforname(self, name, n): - n = int(n) - result = await self.claimtrie_getclaimsforname(name) - if 'claims' in result and len(result['claims']) > n >= 0: - # TODO: revist this after lbrycrd_#209 to see if we can sort by claim_sequence at this point - result['claims'].sort(key=lambda c: (int(c['height']), int(c['nout']))) - result['claims'][n]['claim_sequence'] = n - return result['claims'][n] - - async def claimtrie_getpartialmatch(self, name, part): - result = await self.claimtrie_getclaimsforname(name) - if 'claims' in result: - return next(filter(lambda x: x['claim_id'].starts_with(part), result['claims']), None) - - async def claimtrie_getclaimsforname(self, name): - claims = await self.daemon.getclaimsforname(name) - if claims: - claims['claims'] = [self.format_claim_from_daemon(claim, name) for claim in claims['claims']] - claims['supports_without_claims'] = [] # fixme temporary - del claims['supports without claims'] - claims['last_takeover_height'] = claims['nLastTakeoverHeight'] - del claims['nLastTakeoverHeight'] - return claims - return {} + async def claimtrie_getclaimsbyids(self, *claim_ids): + claims = await self.batched_formatted_claims_from_daemon(claim_ids) + return dict(zip(claim_ids, claims)) async def batched_formatted_claims_from_daemon(self, claim_ids): claims = await self.daemon.getclaimsbyids(claim_ids) @@ -215,19 +141,6 @@ class LBRYElectrumX(ElectrumX): result['normalized_name'] = claim['normalized_name'].encode('ISO-8859-1').decode() return result - def format_supports_from_daemon(self, supports): - return [[support['txid'], support['n'], get_from_possible_keys(support, 'amount', 'nAmount')] for - support in supports] - - async def claimtrie_getclaimbyid(self, claim_id): - self.assert_claim_id(claim_id) - claim = await self.daemon.getclaimbyid(claim_id) - return self.format_claim_from_daemon(claim) - - async def claimtrie_getclaimsbyids(self, *claim_ids): - claims = await self.batched_formatted_claims_from_daemon(claim_ids) - return dict(zip(claim_ids, claims)) - def assert_tx_hash(self, value): '''Raise an RPCError if the value is not a valid transaction hash.''' @@ -248,118 +161,6 @@ class LBRYElectrumX(ElectrumX): pass raise RPCError(1, f'{value} should be a claim id hash') - def normalize_name(self, name): - # this is designed to match lbrycrd; change it here if it changes there - return uda.normalize('NFD', name).casefold() - - def claim_matches_name(self, claim, name): - if not name: - return False - if 'normalized_name' in claim: - return self.normalize_name(name) == claim['normalized_name'] - return name == claim['name'] - - async def claimtrie_getvalueforuri(self, block_hash, uri, known_certificates=None): - # TODO: this thing is huge, refactor - CLAIM_ID = "claim_id" - WINNING = "winning" - SEQUENCE = "sequence" - uri = uri - block_hash = block_hash - try: - parsed_uri = URL.parse(uri) - except ValueError as err: - return {'error': err.args[0]} - result = {} - - if parsed_uri.has_channel: - certificate = None - - # TODO: this is also done on the else, refactor - if parsed_uri.channel.claim_id: - if len(parsed_uri.channel.claim_id) < 40: - certificate_info = self.claimtrie_getpartialmatch( - parsed_uri.channel.name, parsed_uri.channel.claim_id) - else: - certificate_info = await self.claimtrie_getclaimbyid(parsed_uri.channel.claim_id) - if certificate_info and self.claim_matches_name(certificate_info, parsed_uri.channel.name): - certificate = {'resolution_type': CLAIM_ID, 'result': certificate_info} - elif parsed_uri.claim_sequence: - certificate_info = await self.claimtrie_getnthclaimforname(parsed_uri.name, parsed_uri.claim_sequence) - if certificate_info: - certificate = {'resolution_type': SEQUENCE, 'result': certificate_info} - else: - certificate_info = await self.claimtrie_getvalue(parsed_uri.name, block_hash) - if certificate_info: - certificate = {'resolution_type': WINNING, 'result': certificate_info} - - if certificate and 'claim_id' not in certificate['result']: - return result - - if certificate: - result['certificate'] = certificate - channel_id = certificate['result']['claim_id'] - claims_in_channel = self.claimtrie_getclaimssignedbyidminimal(channel_id) - if not parsed_uri.path: - result['unverified_claims_in_channel'] = {claim['claim_id']: (claim['name'], claim['height']) - for claim in claims_in_channel} - else: - # making an assumption that there aren't case conflicts on an existing channel - norm_path = self.normalize_name(parsed_uri.path) - result['unverified_claims_for_name'] = {claim['claim_id']: (claim['name'], claim['height']) - for claim in claims_in_channel - if self.normalize_name(claim['name']) == norm_path} - - else: - claim = None - if parsed_uri.claim_id: - if len(parsed_uri.claim_id) < 40: - claim_info = self.claimtrie_getpartialmatch(parsed_uri.name, parsed_uri.claim_id) - else: - claim_info = await self.claimtrie_getclaimbyid(parsed_uri.claim_id) - if claim_info and self.claim_matches_name(claim_info, parsed_uri.name): - claim = {'resolution_type': CLAIM_ID, 'result': claim_info} - elif parsed_uri.claim_sequence: - claim_info = await self.claimtrie_getnthclaimforname(parsed_uri.name, parsed_uri.claim_sequence) - if claim_info: - claim = {'resolution_type': SEQUENCE, 'result': claim_info} - else: - claim_info = await self.claimtrie_getvalue(parsed_uri.name, block_hash) - if claim_info: - claim = {'resolution_type': WINNING, 'result': claim_info} - if (claim and - # is not an unclaimed winning name - (claim['resolution_type'] != WINNING or proof_has_winning_claim(claim['result']['proof']))): - raw_claim_id = unhexlify(claim['result']['claim_id'])[::-1] - raw_certificate_id = self.db.get_claim_info(raw_claim_id).cert_id - if raw_certificate_id: - certificate_id = hash_to_hex_str(raw_certificate_id) - certificate = await self.claimtrie_getclaimbyid(certificate_id) - if certificate: - certificate = {'resolution_type': CLAIM_ID, - 'result': certificate} - result['certificate'] = certificate - result['claim'] = claim - - return result - - async def claimtrie_getvalueforuris(self, block_hash, *uris): - MAX_BATCH_URIS = 500 - if len(uris) > MAX_BATCH_URIS: - raise Exception("Exceeds max batch uris of {}".format(MAX_BATCH_URIS)) - - return {uri: await self.claimtrie_getvalueforuri(block_hash, uri) for uri in uris} - - # TODO: get it all concurrently when lbrycrd pending changes goes into a stable release - #async def getvalue(uri): - # value = await self.claimtrie_getvalueforuri(block_hash, uri) - # return uri, value, - #return dict([await asyncio.gather(*tuple(getvalue(uri) for uri in uris))][0]) - - -def proof_has_winning_claim(proof): - return {'txhash', 'nOut'}.issubset(proof.keys()) - def get_from_possible_keys(dictionary, *keys): for key in keys: diff --git a/lbry/tests/unit/wallet/server/test_sqldb.py b/lbry/tests/unit/wallet/server/test_sqldb.py index eea2119bc..87c102c43 100644 --- a/lbry/tests/unit/wallet/server/test_sqldb.py +++ b/lbry/tests/unit/wallet/server/test_sqldb.py @@ -5,10 +5,10 @@ from binascii import hexlify from torba.client.constants import COIN, NULL_HASH32 from lbry.schema.claim import Claim -from lbry.wallet.server.db import SQLDB +from lbry.wallet.server.db import reader, writer from lbry.wallet.server.coin import LBCRegTest -from lbry.wallet.server.trending import TRENDING_WINDOW -from lbry.wallet.server.canonical import FindShortestID +from lbry.wallet.server.db.trending import TRENDING_WINDOW +from lbry.wallet.server.db.canonical import FindShortestID from lbry.wallet.server.block_processor import Timer from lbry.wallet.transaction import Transaction, Input, Output @@ -41,7 +41,11 @@ class TestSQLDB(unittest.TestCase): self.first_sync = False self.daemon_height = 1 self.coin = LBCRegTest() - self.sql = SQLDB(self, ':memory:') + db_url = 'file:test_sqldb?mode=memory&cache=shared' + self.sql = writer.SQLDB(self, db_url) + self.addCleanup(self.sql.close) + reader.initializer(db_url, 'regtest') + self.addCleanup(reader.cleanup) self.timer = Timer('BlockProcessor') self.sql.open() self._current_height = 0 @@ -339,7 +343,7 @@ class TestClaimtrie(TestSQLDB): txo_chan_a = tx_chan_a[0].tx.outputs[0] advance(1, [tx_chan_a]) advance(2, [tx_chan_ab]) - r_ab, r_a = self.sql._search(order_by=['creation_height'], limit=2) + r_ab, r_a = reader._search(order_by=['creation_height'], limit=2) self.assertEqual("@foo#a", r_a['short_url']) self.assertEqual("@foo#ab", r_ab['short_url']) self.assertIsNone(r_a['canonical_url']) @@ -352,7 +356,7 @@ class TestClaimtrie(TestSQLDB): tx_abc = self.get_stream_with_claim_id_prefix('abc', 65) advance(3, [tx_a]) advance(4, [tx_ab, tx_abc]) - r_abc, r_ab, r_a = self.sql._search(order_by=['creation_height', 'tx_position'], limit=3) + r_abc, r_ab, r_a = reader._search(order_by=['creation_height', 'tx_position'], limit=3) self.assertEqual("foo#a", r_a['short_url']) self.assertEqual("foo#ab", r_ab['short_url']) self.assertEqual("foo#abc", r_abc['short_url']) @@ -366,39 +370,39 @@ class TestClaimtrie(TestSQLDB): ab2_claim_id = tx_ab2[0].tx.outputs[0].claim_id advance(6, [tx_a2]) advance(7, [tx_ab2]) - r_ab2, r_a2 = self.sql._search(order_by=['creation_height'], limit=2) + r_ab2, r_a2 = reader._search(order_by=['creation_height'], limit=2) self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url']) self.assertEqual(f"foo#{ab2_claim_id[:4]}", r_ab2['short_url']) self.assertEqual("@foo#a/foo#a", r_a2['canonical_url']) self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url']) - self.assertEqual(2, self.sql._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) + self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) # change channel public key, invaliding stream claim signatures advance(8, [self.get_channel_update(txo_chan_a, COIN, key=b'a')]) - r_ab2, r_a2 = self.sql._search(order_by=['creation_height'], limit=2) + r_ab2, r_a2 = reader._search(order_by=['creation_height'], limit=2) self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url']) self.assertEqual(f"foo#{ab2_claim_id[:4]}", r_ab2['short_url']) self.assertIsNone(r_a2['canonical_url']) self.assertIsNone(r_ab2['canonical_url']) - self.assertEqual(0, self.sql._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) + self.assertEqual(0, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) # reinstate previous channel public key (previous stream claim signatures become valid again) channel_update = self.get_channel_update(txo_chan_a, COIN, key=b'c') advance(9, [channel_update]) - r_ab2, r_a2 = self.sql._search(order_by=['creation_height'], limit=2) + r_ab2, r_a2 = reader._search(order_by=['creation_height'], limit=2) self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url']) self.assertEqual(f"foo#{ab2_claim_id[:4]}", r_ab2['short_url']) self.assertEqual("@foo#a/foo#a", r_a2['canonical_url']) self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url']) - self.assertEqual(2, self.sql._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) + self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) # claim abandon updates claims_in_channel advance(10, [self.get_abandon(tx_ab2)]) - self.assertEqual(1, self.sql._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) + self.assertEqual(1, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) # delete channel, invaliding stream claim signatures advance(11, [self.get_abandon(channel_update)]) - r_a2, = self.sql._search(order_by=['creation_height'], limit=1) + r_a2, = reader._search(order_by=['creation_height'], limit=1) self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url']) self.assertIsNone(r_a2['canonical_url']) @@ -436,7 +440,7 @@ class TestTrending(TestSQLDB): self.get_support(up_medium, (20+(window*(2 if window == 7 else 1)))*COIN), self.get_support(up_biggly, (20+(window*(3 if window == 7 else 1)))*COIN), ]) - results = self.sql._search(order_by=['trending_local']) + results = reader._search(order_by=['trending_local']) self.assertEqual([c.claim_id for c in claims], [hexlify(c['claim_hash'][::-1]).decode() for c in results]) self.assertEqual([10, 6, 2, 0, -2], [int(c['trending_local']) for c in results]) self.assertEqual([53, 38, -32, 0, -6], [int(c['trending_global']) for c in results]) diff --git a/torba/torba/orchstr8/node.py b/torba/torba/orchstr8/node.py index ebba5dc07..9e5ecf225 100644 --- a/torba/torba/orchstr8/node.py +++ b/torba/torba/orchstr8/node.py @@ -204,7 +204,7 @@ class SPVNode: 'REORG_LIMIT': '100', 'HOST': self.hostname, 'TCP_PORT': str(self.port), - 'MAX_QUERY_WORKERS': '1' + 'MAX_QUERY_WORKERS': '0' } # TODO: don't use os.environ os.environ.update(conf)