refactored wallet server db reader to support thread pool and process pool

This commit is contained in:
Lex Berezhny 2019-07-11 13:29:26 -04:00
parent a73c5aec27
commit cc098f3090
12 changed files with 438 additions and 990 deletions

View file

@ -3,7 +3,7 @@ import time
from torba.server.block_processor import BlockProcessor from torba.server.block_processor import BlockProcessor
from lbry.schema.claim import Claim from lbry.schema.claim import Claim
from lbry.wallet.server.db import SQLDB from lbry.wallet.server.db.writer import SQLDB
class Timer: class Timer:

View file

@ -12,7 +12,7 @@ class LBC(Coin):
from .session import LBRYElectrumX, LBRYSessionManager from .session import LBRYElectrumX, LBRYSessionManager
from .block_processor import LBRYBlockProcessor from .block_processor import LBRYBlockProcessor
from .daemon import LBCDaemon from .daemon import LBCDaemon
from .db import LBRYDB from .db.writer import LBRYDB
DAEMON = LBCDaemon DAEMON = LBCDaemon
SESSIONCLS = LBRYElectrumX SESSIONCLS = LBRYElectrumX
BLOCK_PROCESSOR = LBRYBlockProcessor BLOCK_PROCESSOR = LBRYBlockProcessor

View file

View file

@ -0,0 +1,13 @@
CLAIM_TYPES = {
'stream': 1,
'channel': 2,
}
STREAM_TYPES = {
'video': 1,
'audio': 2,
'image': 3,
'document': 4,
'binary': 5,
'model': 6
}

View file

@ -0,0 +1,375 @@
import sqlite3
import struct
from typing import Tuple, List
from binascii import unhexlify
from decimal import Decimal
from contextvars import ContextVar
from torba.client.basedatabase import query
from lbry.schema.url import URL, normalize_name
from lbry.schema.tags import clean_tags
from lbry.schema.result import Outputs
from .common import CLAIM_TYPES, STREAM_TYPES
ATTRIBUTE_ARRAY_MAX_LENGTH = 100
INTEGER_PARAMS = {
'height', 'creation_height', 'activation_height', 'expiration_height',
'timestamp', 'creation_timestamp', 'release_time', 'fee_amount',
'tx_position', 'channel_join',
'amount', 'effective_amount', 'support_amount',
'trending_group', 'trending_mixed',
'trending_local', 'trending_global',
}
SEARCH_PARAMS = {
'name', 'claim_id', 'txid', 'nout', 'channel', 'channel_ids', 'not_channel_ids',
'public_key_id', 'claim_type', 'stream_types', 'media_types', 'fee_currency',
'has_channel_signature', 'signature_valid',
'any_tags', 'all_tags', 'not_tags',
'any_locations', 'all_locations', 'not_locations',
'any_languages', 'all_languages', 'not_languages',
'is_controlling', 'limit', 'offset', 'order_by',
'no_totals',
} | INTEGER_PARAMS
ORDER_FIELDS = {
'name',
} | INTEGER_PARAMS
PRAGMAS = """
pragma journal_mode=WAL;
"""
db = ContextVar('db')
ledger = ContextVar('ledger')
def initializer(_path, _ledger_name):
_db = sqlite3.connect(_path, isolation_level=None, uri=True)
_db.row_factory = sqlite3.Row
db.set(_db)
from lbry.wallet.ledger import MainNetLedger, RegTestLedger
ledger.set(MainNetLedger if _ledger_name == 'mainnet' else RegTestLedger)
def cleanup():
db.get().close()
db.set(None)
ledger.set(None)
def get_claims(cols, for_count=False, **constraints):
if 'order_by' in constraints:
sql_order_by = []
for order_by in constraints['order_by']:
is_asc = order_by.startswith('^')
column = order_by[1:] if is_asc else order_by
if column not in ORDER_FIELDS:
raise NameError(f'{column} is not a valid order_by field')
if column == 'name':
column = 'normalized'
sql_order_by.append(
f"claim.{column} ASC" if is_asc else f"claim.{column} DESC"
)
constraints['order_by'] = sql_order_by
ops = {'<=': '__lte', '>=': '__gte', '<': '__lt', '>': '__gt'}
for constraint in INTEGER_PARAMS:
if constraint in constraints:
value = constraints.pop(constraint)
postfix = ''
if isinstance(value, str):
if len(value) >= 2 and value[:2] in ops:
postfix, value = ops[value[:2]], value[2:]
elif len(value) >= 1 and value[0] in ops:
postfix, value = ops[value[0]], value[1:]
if constraint == 'fee_amount':
value = Decimal(value)*1000
constraints[f'claim.{constraint}{postfix}'] = int(value)
if constraints.pop('is_controlling', False):
if {'sequence', 'amount_order'}.isdisjoint(constraints):
for_count = False
constraints['claimtrie.claim_hash__is_not_null'] = ''
if 'sequence' in constraints:
constraints['order_by'] = 'claim.activation_height ASC'
constraints['offset'] = int(constraints.pop('sequence')) - 1
constraints['limit'] = 1
if 'amount_order' in constraints:
constraints['order_by'] = 'claim.effective_amount DESC'
constraints['offset'] = int(constraints.pop('amount_order')) - 1
constraints['limit'] = 1
if 'claim_id' in constraints:
claim_id = constraints.pop('claim_id')
if len(claim_id) == 40:
constraints['claim.claim_id'] = claim_id
else:
constraints['claim.claim_id__like'] = f'{claim_id[:40]}%'
if 'name' in constraints:
constraints['claim.normalized'] = normalize_name(constraints.pop('name'))
if 'public_key_id' in constraints:
constraints['claim.public_key_hash'] = sqlite3.Binary(
ledger.get().address_to_hash160(constraints.pop('public_key_id')))
if 'channel' in constraints:
channel_url = constraints.pop('channel')
match = _resolve_one(channel_url)
if isinstance(match, sqlite3.Row):
constraints['channel_hash'] = match['claim_hash']
else:
return [[0]] if cols == 'count(*)' else []
if 'channel_hash' in constraints:
constraints['claim.channel_hash'] = sqlite3.Binary(constraints.pop('channel_hash'))
if 'channel_ids' in constraints:
channel_ids = constraints.pop('channel_ids')
if channel_ids:
constraints['claim.channel_hash__in'] = [
sqlite3.Binary(unhexlify(cid)[::-1]) for cid in channel_ids
]
if 'not_channel_ids' in constraints:
not_channel_ids = constraints.pop('not_channel_ids')
if not_channel_ids:
not_channel_ids_binary = [
sqlite3.Binary(unhexlify(ncid)[::-1]) for ncid in not_channel_ids
]
if constraints.get('has_channel_signature', False):
constraints['claim.channel_hash__not_in'] = not_channel_ids_binary
else:
constraints['null_or_not_channel__or'] = {
'claim.signature_valid__is_null': True,
'claim.channel_hash__not_in': not_channel_ids_binary
}
if 'signature_valid' in constraints:
has_channel_signature = constraints.pop('has_channel_signature', False)
if has_channel_signature:
constraints['claim.signature_valid'] = constraints.pop('signature_valid')
else:
constraints['null_or_signature__or'] = {
'claim.signature_valid__is_null': True,
'claim.signature_valid': constraints.pop('signature_valid')
}
elif constraints.pop('has_channel_signature', False):
constraints['claim.signature_valid__is_not_null'] = True
if 'txid' in constraints:
tx_hash = unhexlify(constraints.pop('txid'))[::-1]
nout = constraints.pop('nout', 0)
constraints['claim.txo_hash'] = sqlite3.Binary(
tx_hash + struct.pack('<I', nout)
)
if 'claim_type' in constraints:
constraints['claim.claim_type'] = CLAIM_TYPES[constraints.pop('claim_type')]
if 'stream_types' in constraints:
stream_types = constraints.pop('stream_types')
if stream_types:
constraints['claim.stream_type__in'] = [
STREAM_TYPES[stream_type] for stream_type in stream_types
]
if 'media_types' in constraints:
media_types = constraints.pop('media_types')
if media_types:
constraints['claim.media_type__in'] = media_types
if 'fee_currency' in constraints:
constraints['claim.fee_currency'] = constraints.pop('fee_currency').lower()
_apply_constraints_for_array_attributes(constraints, 'tag', clean_tags, for_count)
_apply_constraints_for_array_attributes(constraints, 'language', lambda _: _, for_count)
_apply_constraints_for_array_attributes(constraints, 'location', lambda _: _, for_count)
select = f"SELECT {cols} FROM claim"
sql, values = query(
select if for_count else select+"""
LEFT JOIN claimtrie USING (claim_hash)
LEFT JOIN claim as channel ON (claim.channel_hash=channel.claim_hash)
""", **constraints
)
return db.get().execute(sql, values).fetchall()
def get_claims_count(**constraints):
constraints.pop('offset', None)
constraints.pop('limit', None)
constraints.pop('order_by', None)
count = get_claims('count(*)', for_count=True, **constraints)
return count[0][0]
def search(constraints) -> Tuple[List, List, int, int]:
assert set(constraints).issubset(SEARCH_PARAMS), \
f"Search query contains invalid arguments: {set(constraints).difference(SEARCH_PARAMS)}"
total = None
if not constraints.pop('no_totals', False):
total = get_claims_count(**constraints)
constraints['offset'] = abs(constraints.get('offset', 0))
constraints['limit'] = min(abs(constraints.get('limit', 10)), 50)
if 'order_by' not in constraints:
constraints['order_by'] = ["height", "^name"]
txo_rows = _search(**constraints)
channel_hashes = set(txo['channel_hash'] for txo in txo_rows if txo['channel_hash'])
extra_txo_rows = []
if channel_hashes:
extra_txo_rows = _search(**{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]})
return txo_rows, extra_txo_rows, constraints['offset'], total
def search_to_bytes(constraints) -> bytes:
return Outputs.to_bytes(*search(constraints))
def _search(**constraints):
return get_claims(
"""
claimtrie.claim_hash as is_controlling,
claimtrie.last_take_over_height,
claim.claim_hash, claim.txo_hash,
claim.claims_in_channel,
claim.height, claim.creation_height,
claim.activation_height, claim.expiration_height,
claim.effective_amount, claim.support_amount,
claim.trending_group, claim.trending_mixed,
claim.trending_local, claim.trending_global,
claim.short_url, claim.canonical_url,
claim.channel_hash, channel.txo_hash AS channel_txo_hash,
channel.height AS channel_height, claim.signature_valid
""", **constraints
)
def resolve(urls) -> Tuple[List, List]:
result = []
channel_hashes = set()
for raw_url in urls:
match = _resolve_one(raw_url)
result.append(match)
if isinstance(match, sqlite3.Row) and match['channel_hash']:
channel_hashes.add(match['channel_hash'])
extra_txo_rows = []
if channel_hashes:
extra_txo_rows = _search(**{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]})
return result, extra_txo_rows
def resolve_to_bytes(urls) -> bytes:
return Outputs.to_bytes(*resolve(urls))
def _resolve_one(raw_url):
try:
url = URL.parse(raw_url)
except ValueError as e:
return e
channel = None
if url.has_channel:
query = url.channel.to_dict()
if set(query) == {'name'}:
query['is_controlling'] = True
else:
query['order_by'] = ['^height']
matches = _search(**query, limit=1)
if matches:
channel = matches[0]
else:
return LookupError(f'Could not find channel in "{raw_url}".')
if url.has_stream:
query = url.stream.to_dict()
if channel is not None:
if set(query) == {'name'}:
# temporarily emulate is_controlling for claims in channel
query['order_by'] = ['effective_amount']
else:
query['order_by'] = ['^channel_join']
query['channel_hash'] = channel['claim_hash']
query['signature_valid'] = 1
elif set(query) == {'name'}:
query['is_controlling'] = 1
matches = _search(**query, limit=1)
if matches:
return matches[0]
else:
return LookupError(f'Could not find stream in "{raw_url}".')
return channel
def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False):
any_items = cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]
if any_items:
constraints.update({
f'$any_{attr}{i}': item for i, item in enumerate(any_items)
})
values = ', '.join(
f':$any_{attr}{i}' for i in range(len(any_items))
)
if for_count:
constraints[f'claim.claim_hash__in#_any_{attr}'] = f"""
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
"""
else:
constraints[f'#_any_{attr}'] = f"""
EXISTS(
SELECT 1 FROM {attr} WHERE
claim.claim_hash={attr}.claim_hash
AND {attr} IN ({values})
)
"""
all_items = cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]
if all_items:
constraints[f'$all_{attr}_count'] = len(all_items)
constraints.update({
f'$all_{attr}{i}': item for i, item in enumerate(all_items)
})
values = ', '.join(
f':$all_{attr}{i}' for i in range(len(all_items))
)
if for_count:
constraints[f'claim.claim_hash__in#_all_{attr}'] = f"""
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
GROUP BY claim_hash HAVING COUNT({attr}) = :$all_{attr}_count
"""
else:
constraints[f'#_all_{attr}'] = f"""
{len(all_items)}=(
SELECT count(*) FROM {attr} WHERE
claim.claim_hash={attr}.claim_hash
AND {attr} IN ({values})
)
"""
not_items = cleaner(constraints.pop(f'not_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]
if not_items:
constraints.update({
f'$not_{attr}{i}': item for i, item in enumerate(not_items)
})
values = ', '.join(
f':$not_{attr}{i}' for i in range(len(not_items))
)
if for_count:
constraints[f'claim.claim_hash__not_in#_not_{attr}'] = f"""
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
"""
else:
constraints[f'#_not_{attr}'] = f"""
NOT EXISTS(
SELECT 1 FROM {attr} WHERE
claim.claim_hash={attr}.claim_hash
AND {attr} IN ({values})
)
"""

View file

@ -1,7 +1,5 @@
import sqlite3 import sqlite3
import struct
from typing import Union, Tuple, Set, List from typing import Union, Tuple, Set, List
from binascii import unhexlify
from itertools import chain from itertools import chain
from decimal import Decimal from decimal import Decimal
@ -9,97 +7,19 @@ from torba.server.db import DB
from torba.server.util import class_logger from torba.server.util import class_logger
from torba.client.basedatabase import query, constraints_to_sql from torba.client.basedatabase import query, constraints_to_sql
from lbry.schema.url import URL, normalize_name
from lbry.schema.tags import clean_tags from lbry.schema.tags import clean_tags
from lbry.schema.mime_types import guess_stream_type from lbry.schema.mime_types import guess_stream_type
from lbry.wallet.ledger import MainNetLedger, RegTestLedger from lbry.wallet.ledger import MainNetLedger, RegTestLedger
from lbry.wallet.transaction import Transaction, Output from lbry.wallet.transaction import Transaction, Output
from lbry.wallet.server.canonical import register_canonical_functions from lbry.wallet.server.db.canonical import register_canonical_functions
from lbry.wallet.server.trending import ( from lbry.wallet.server.db.trending import (
CREATE_TREND_TABLE, calculate_trending, register_trending_functions CREATE_TREND_TABLE, calculate_trending, register_trending_functions
) )
from .common import CLAIM_TYPES, STREAM_TYPES
ATTRIBUTE_ARRAY_MAX_LENGTH = 100 ATTRIBUTE_ARRAY_MAX_LENGTH = 100
CLAIM_TYPES = {
'stream': 1,
'channel': 2,
}
STREAM_TYPES = {
'video': 1,
'audio': 2,
'image': 3,
'document': 4,
'binary': 5,
'model': 6
}
def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False):
any_items = cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]
if any_items:
constraints.update({
f'$any_{attr}{i}': item for i, item in enumerate(any_items)
})
values = ', '.join(
f':$any_{attr}{i}' for i in range(len(any_items))
)
if for_count:
constraints[f'claim.claim_hash__in#_any_{attr}'] = f"""
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
"""
else:
constraints[f'#_any_{attr}'] = f"""
EXISTS(
SELECT 1 FROM {attr} WHERE
claim.claim_hash={attr}.claim_hash
AND {attr} IN ({values})
)
"""
all_items = cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]
if all_items:
constraints[f'$all_{attr}_count'] = len(all_items)
constraints.update({
f'$all_{attr}{i}': item for i, item in enumerate(all_items)
})
values = ', '.join(
f':$all_{attr}{i}' for i in range(len(all_items))
)
if for_count:
constraints[f'claim.claim_hash__in#_all_{attr}'] = f"""
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
GROUP BY claim_hash HAVING COUNT({attr}) = :$all_{attr}_count
"""
else:
constraints[f'#_all_{attr}'] = f"""
{len(all_items)}=(
SELECT count(*) FROM {attr} WHERE
claim.claim_hash={attr}.claim_hash
AND {attr} IN ({values})
)
"""
not_items = cleaner(constraints.pop(f'not_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]
if not_items:
constraints.update({
f'$not_{attr}{i}': item for i, item in enumerate(not_items)
})
values = ', '.join(
f':$not_{attr}{i}' for i in range(len(not_items))
)
if for_count:
constraints[f'claim.claim_hash__not_in#_not_{attr}'] = f"""
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
"""
else:
constraints[f'#_not_{attr}'] = f"""
NOT EXISTS(
SELECT 1 FROM {attr} WHERE
claim.claim_hash={attr}.claim_hash
AND {attr} IN ({values})
)
"""
class SQLDB: class SQLDB:
@ -234,7 +154,7 @@ class SQLDB:
self.ledger = MainNetLedger if self.main.coin.NET == 'mainnet' else RegTestLedger self.ledger = MainNetLedger if self.main.coin.NET == 'mainnet' else RegTestLedger
def open(self): def open(self):
self.db = sqlite3.connect(self._db_path, isolation_level=None, check_same_thread=False) self.db = sqlite3.connect(self._db_path, isolation_level=None, check_same_thread=False, uri=True)
self.db.row_factory = sqlite3.Row self.db.row_factory = sqlite3.Row
self.db.executescript(self.CREATE_TABLES_QUERY) self.db.executescript(self.CREATE_TABLES_QUERY)
register_canonical_functions(self.db) register_canonical_functions(self.db)
@ -772,262 +692,6 @@ class SQLDB:
r(self.update_claimtrie, height, recalculate_claim_hashes, deleted_claim_names, forward_timer=True) r(self.update_claimtrie, height, recalculate_claim_hashes, deleted_claim_names, forward_timer=True)
r(calculate_trending, self.db, height, self.main.first_sync, daemon_height) r(calculate_trending, self.db, height, self.main.first_sync, daemon_height)
def get_claims(self, cols, for_count=False, **constraints):
if 'order_by' in constraints:
sql_order_by = []
for order_by in constraints['order_by']:
is_asc = order_by.startswith('^')
column = order_by[1:] if is_asc else order_by
if column not in self.ORDER_FIELDS:
raise NameError(f'{column} is not a valid order_by field')
if column == 'name':
column = 'normalized'
sql_order_by.append(
f"claim.{column} ASC" if is_asc else f"claim.{column} DESC"
)
constraints['order_by'] = sql_order_by
ops = {'<=': '__lte', '>=': '__gte', '<': '__lt', '>': '__gt'}
for constraint in self.INTEGER_PARAMS:
if constraint in constraints:
value = constraints.pop(constraint)
postfix = ''
if isinstance(value, str):
if len(value) >= 2 and value[:2] in ops:
postfix, value = ops[value[:2]], value[2:]
elif len(value) >= 1 and value[0] in ops:
postfix, value = ops[value[0]], value[1:]
if constraint == 'fee_amount':
value = Decimal(value)*1000
constraints[f'claim.{constraint}{postfix}'] = int(value)
if constraints.pop('is_controlling', False):
if {'sequence', 'amount_order'}.isdisjoint(constraints):
for_count = False
constraints['claimtrie.claim_hash__is_not_null'] = ''
if 'sequence' in constraints:
constraints['order_by'] = 'claim.activation_height ASC'
constraints['offset'] = int(constraints.pop('sequence')) - 1
constraints['limit'] = 1
if 'amount_order' in constraints:
constraints['order_by'] = 'claim.effective_amount DESC'
constraints['offset'] = int(constraints.pop('amount_order')) - 1
constraints['limit'] = 1
if 'claim_id' in constraints:
claim_id = constraints.pop('claim_id')
if len(claim_id) == 40:
constraints['claim.claim_id'] = claim_id
else:
constraints['claim.claim_id__like'] = f'{claim_id[:40]}%'
if 'name' in constraints:
constraints['claim.normalized'] = normalize_name(constraints.pop('name'))
if 'public_key_id' in constraints:
constraints['claim.public_key_hash'] = sqlite3.Binary(
self.ledger.address_to_hash160(constraints.pop('public_key_id')))
if 'channel' in constraints:
channel_url = constraints.pop('channel')
match = self._resolve_one(channel_url)
if isinstance(match, sqlite3.Row):
constraints['channel_hash'] = match['claim_hash']
else:
return [[0]] if cols == 'count(*)' else []
if 'channel_hash' in constraints:
constraints['claim.channel_hash'] = sqlite3.Binary(constraints.pop('channel_hash'))
if 'channel_ids' in constraints:
channel_ids = constraints.pop('channel_ids')
if channel_ids:
constraints['claim.channel_hash__in'] = [
sqlite3.Binary(unhexlify(cid)[::-1]) for cid in channel_ids
]
if 'not_channel_ids' in constraints:
not_channel_ids = constraints.pop('not_channel_ids')
if not_channel_ids:
not_channel_ids_binary = [
sqlite3.Binary(unhexlify(ncid)[::-1]) for ncid in not_channel_ids
]
if constraints.get('has_channel_signature', False):
constraints['claim.channel_hash__not_in'] = not_channel_ids_binary
else:
constraints['null_or_not_channel__or'] = {
'claim.signature_valid__is_null': True,
'claim.channel_hash__not_in': not_channel_ids_binary
}
if 'signature_valid' in constraints:
has_channel_signature = constraints.pop('has_channel_signature', False)
if has_channel_signature:
constraints['claim.signature_valid'] = constraints.pop('signature_valid')
else:
constraints['null_or_signature__or'] = {
'claim.signature_valid__is_null': True,
'claim.signature_valid': constraints.pop('signature_valid')
}
elif constraints.pop('has_channel_signature', False):
constraints['claim.signature_valid__is_not_null'] = True
if 'txid' in constraints:
tx_hash = unhexlify(constraints.pop('txid'))[::-1]
nout = constraints.pop('nout', 0)
constraints['claim.txo_hash'] = sqlite3.Binary(
tx_hash + struct.pack('<I', nout)
)
if 'claim_type' in constraints:
constraints['claim.claim_type'] = CLAIM_TYPES[constraints.pop('claim_type')]
if 'stream_types' in constraints:
stream_types = constraints.pop('stream_types')
if stream_types:
constraints['claim.stream_type__in'] = [
STREAM_TYPES[stream_type] for stream_type in stream_types
]
if 'media_types' in constraints:
media_types = constraints.pop('media_types')
if media_types:
constraints['claim.media_type__in'] = media_types
if 'fee_currency' in constraints:
constraints['claim.fee_currency'] = constraints.pop('fee_currency').lower()
_apply_constraints_for_array_attributes(constraints, 'tag', clean_tags, for_count)
_apply_constraints_for_array_attributes(constraints, 'language', lambda _: _, for_count)
_apply_constraints_for_array_attributes(constraints, 'location', lambda _: _, for_count)
select = f"SELECT {cols} FROM claim"
sql, values = query(
select if for_count else select+"""
LEFT JOIN claimtrie USING (claim_hash)
LEFT JOIN claim as channel ON (claim.channel_hash=channel.claim_hash)
""", **constraints
)
try:
return self.db.execute(sql, values).fetchall()
except:
self.logger.exception(f'Failed to execute claim search query: {sql}')
raise
def get_claims_count(self, **constraints):
constraints.pop('offset', None)
constraints.pop('limit', None)
constraints.pop('order_by', None)
count = self.get_claims('count(*)', for_count=True, **constraints)
return count[0][0]
def _search(self, **constraints):
return self.get_claims(
"""
claimtrie.claim_hash as is_controlling,
claimtrie.last_take_over_height,
claim.claim_hash, claim.txo_hash,
claim.claims_in_channel,
claim.height, claim.creation_height,
claim.activation_height, claim.expiration_height,
claim.effective_amount, claim.support_amount,
claim.trending_group, claim.trending_mixed,
claim.trending_local, claim.trending_global,
claim.short_url, claim.canonical_url,
claim.channel_hash, channel.txo_hash AS channel_txo_hash,
channel.height AS channel_height, claim.signature_valid
""", **constraints
)
INTEGER_PARAMS = {
'height', 'creation_height', 'activation_height', 'expiration_height',
'timestamp', 'creation_timestamp', 'release_time', 'fee_amount',
'tx_position', 'channel_join',
'amount', 'effective_amount', 'support_amount',
'trending_group', 'trending_mixed',
'trending_local', 'trending_global',
}
SEARCH_PARAMS = {
'name', 'claim_id', 'txid', 'nout', 'channel', 'channel_ids', 'not_channel_ids',
'public_key_id', 'claim_type', 'stream_types', 'media_types', 'fee_currency',
'has_channel_signature', 'signature_valid',
'any_tags', 'all_tags', 'not_tags',
'any_locations', 'all_locations', 'not_locations',
'any_languages', 'all_languages', 'not_languages',
'is_controlling', 'limit', 'offset', 'order_by',
'no_totals',
} | INTEGER_PARAMS
ORDER_FIELDS = {
'name',
} | INTEGER_PARAMS
def search(self, constraints) -> Tuple[List, List, int, int]:
assert set(constraints).issubset(self.SEARCH_PARAMS), \
f"Search query contains invalid arguments: {set(constraints).difference(self.SEARCH_PARAMS)}"
total = None
if not constraints.pop('no_totals', False):
total = self.get_claims_count(**constraints)
constraints['offset'] = abs(constraints.get('offset', 0))
constraints['limit'] = min(abs(constraints.get('limit', 10)), 50)
if 'order_by' not in constraints:
constraints['order_by'] = ["height", "^name"]
txo_rows = self._search(**constraints)
channel_hashes = set(txo['channel_hash'] for txo in txo_rows if txo['channel_hash'])
extra_txo_rows = []
if channel_hashes:
extra_txo_rows = self._search(**{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]})
return txo_rows, extra_txo_rows, constraints['offset'], total
def _resolve_one(self, raw_url):
try:
url = URL.parse(raw_url)
except ValueError as e:
return e
channel = None
if url.has_channel:
query = url.channel.to_dict()
if set(query) == {'name'}:
query['is_controlling'] = True
else:
query['order_by'] = ['^height']
matches = self._search(**query, limit=1)
if matches:
channel = matches[0]
else:
return LookupError(f'Could not find channel in "{raw_url}".')
if url.has_stream:
query = url.stream.to_dict()
if channel is not None:
if set(query) == {'name'}:
# temporarily emulate is_controlling for claims in channel
query['order_by'] = ['effective_amount']
else:
query['order_by'] = ['^channel_join']
query['channel_hash'] = channel['claim_hash']
query['signature_valid'] = 1
elif set(query) == {'name'}:
query['is_controlling'] = 1
matches = self._search(**query, limit=1)
if matches:
return matches[0]
else:
return LookupError(f'Could not find stream in "{raw_url}".')
return channel
def resolve(self, urls) -> Tuple[List, List]:
result = []
channel_hashes = set()
for raw_url in urls:
match = self._resolve_one(raw_url)
result.append(match)
if isinstance(match, sqlite3.Row) and match['channel_hash']:
channel_hashes.add(match['channel_hash'])
extra_txo_rows = []
if channel_hashes:
extra_txo_rows = self._search(**{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]})
return result, extra_txo_rows
class LBRYDB(DB): class LBRYDB(DB):

View file

@ -1,409 +0,0 @@
import os
import sqlite3
import struct
import asyncio
from typing import Tuple, List
from binascii import unhexlify
from decimal import Decimal
from torba.client.basedatabase import query
from lbry.schema.url import URL, normalize_name
from lbry.schema.tags import clean_tags
from lbry.schema.result import Outputs
from lbry.wallet.ledger import MainNetLedger, RegTestLedger
from multiprocessing import Process, get_start_method
from multiprocessing.context import BaseContext
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import _ExceptionWithTraceback, _sendback_result
ATTRIBUTE_ARRAY_MAX_LENGTH = 100
CLAIM_TYPES = {
'stream': 1,
'channel': 2,
}
STREAM_TYPES = {
'video': 1,
'audio': 2,
'image': 3,
'document': 4,
'binary': 5,
'model': 6
}
def _apply_constraints_for_array_attributes(constraints, attr, cleaner, for_count=False):
any_items = cleaner(constraints.pop(f'any_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]
if any_items:
constraints.update({
f'$any_{attr}{i}': item for i, item in enumerate(any_items)
})
values = ', '.join(
f':$any_{attr}{i}' for i in range(len(any_items))
)
if for_count:
constraints[f'claim.claim_hash__in#_any_{attr}'] = f"""
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
"""
else:
constraints[f'#_any_{attr}'] = f"""
EXISTS(
SELECT 1 FROM {attr} WHERE
claim.claim_hash={attr}.claim_hash
AND {attr} IN ({values})
)
"""
all_items = cleaner(constraints.pop(f'all_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]
if all_items:
constraints[f'$all_{attr}_count'] = len(all_items)
constraints.update({
f'$all_{attr}{i}': item for i, item in enumerate(all_items)
})
values = ', '.join(
f':$all_{attr}{i}' for i in range(len(all_items))
)
if for_count:
constraints[f'claim.claim_hash__in#_all_{attr}'] = f"""
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
GROUP BY claim_hash HAVING COUNT({attr}) = :$all_{attr}_count
"""
else:
constraints[f'#_all_{attr}'] = f"""
{len(all_items)}=(
SELECT count(*) FROM {attr} WHERE
claim.claim_hash={attr}.claim_hash
AND {attr} IN ({values})
)
"""
not_items = cleaner(constraints.pop(f'not_{attr}s', []))[:ATTRIBUTE_ARRAY_MAX_LENGTH]
if not_items:
constraints.update({
f'$not_{attr}{i}': item for i, item in enumerate(not_items)
})
values = ', '.join(
f':$not_{attr}{i}' for i in range(len(not_items))
)
if for_count:
constraints[f'claim.claim_hash__not_in#_not_{attr}'] = f"""
SELECT claim_hash FROM {attr} WHERE {attr} IN ({values})
"""
else:
constraints[f'#_not_{attr}'] = f"""
NOT EXISTS(
SELECT 1 FROM {attr} WHERE
claim.claim_hash={attr}.claim_hash
AND {attr} IN ({values})
)
"""
class QueryProcessor(Process):
PRAGMAS = """
pragma journal_mode=WAL;
"""
def get_claims(self, cols, for_count=False, **constraints):
if 'order_by' in constraints:
sql_order_by = []
for order_by in constraints['order_by']:
is_asc = order_by.startswith('^')
column = order_by[1:] if is_asc else order_by
if column not in self.ORDER_FIELDS:
raise NameError(f'{column} is not a valid order_by field')
if column == 'name':
column = 'normalized'
sql_order_by.append(
f"claim.{column} ASC" if is_asc else f"claim.{column} DESC"
)
constraints['order_by'] = sql_order_by
ops = {'<=': '__lte', '>=': '__gte', '<': '__lt', '>': '__gt'}
for constraint in self.INTEGER_PARAMS:
if constraint in constraints:
value = constraints.pop(constraint)
postfix = ''
if isinstance(value, str):
if len(value) >= 2 and value[:2] in ops:
postfix, value = ops[value[:2]], value[2:]
elif len(value) >= 1 and value[0] in ops:
postfix, value = ops[value[0]], value[1:]
if constraint == 'fee_amount':
value = Decimal(value)*1000
constraints[f'claim.{constraint}{postfix}'] = int(value)
if constraints.pop('is_controlling', False):
if {'sequence', 'amount_order'}.isdisjoint(constraints):
for_count = False
constraints['claimtrie.claim_hash__is_not_null'] = ''
if 'sequence' in constraints:
constraints['order_by'] = 'claim.activation_height ASC'
constraints['offset'] = int(constraints.pop('sequence')) - 1
constraints['limit'] = 1
if 'amount_order' in constraints:
constraints['order_by'] = 'claim.effective_amount DESC'
constraints['offset'] = int(constraints.pop('amount_order')) - 1
constraints['limit'] = 1
if 'claim_id' in constraints:
claim_id = constraints.pop('claim_id')
if len(claim_id) == 40:
constraints['claim.claim_id'] = claim_id
else:
constraints['claim.claim_id__like'] = f'{claim_id[:40]}%'
if 'name' in constraints:
constraints['claim.normalized'] = normalize_name(constraints.pop('name'))
if 'public_key_id' in constraints:
constraints['claim.public_key_hash'] = sqlite3.Binary(
self.ledger.address_to_hash160(constraints.pop('public_key_id')))
if 'channel' in constraints:
channel_url = constraints.pop('channel')
match = self._resolve_one(channel_url)
if isinstance(match, sqlite3.Row):
constraints['channel_hash'] = match['claim_hash']
else:
return [[0]] if cols == 'count(*)' else []
if 'channel_hash' in constraints:
constraints['claim.channel_hash'] = sqlite3.Binary(constraints.pop('channel_hash'))
if 'channel_ids' in constraints:
channel_ids = constraints.pop('channel_ids')
if channel_ids:
constraints['claim.channel_hash__in'] = [
sqlite3.Binary(unhexlify(cid)[::-1]) for cid in channel_ids
]
if 'not_channel_ids' in constraints:
not_channel_ids = constraints.pop('not_channel_ids')
if not_channel_ids:
not_channel_ids_binary = [
sqlite3.Binary(unhexlify(ncid)[::-1]) for ncid in not_channel_ids
]
if constraints.get('has_channel_signature', False):
constraints['claim.channel_hash__not_in'] = not_channel_ids_binary
else:
constraints['null_or_not_channel__or'] = {
'claim.signature_valid__is_null': True,
'claim.channel_hash__not_in': not_channel_ids_binary
}
if 'signature_valid' in constraints:
has_channel_signature = constraints.pop('has_channel_signature', False)
if has_channel_signature:
constraints['claim.signature_valid'] = constraints.pop('signature_valid')
else:
constraints['null_or_signature__or'] = {
'claim.signature_valid__is_null': True,
'claim.signature_valid': constraints.pop('signature_valid')
}
elif constraints.pop('has_channel_signature', False):
constraints['claim.signature_valid__is_not_null'] = True
if 'txid' in constraints:
tx_hash = unhexlify(constraints.pop('txid'))[::-1]
nout = constraints.pop('nout', 0)
constraints['claim.txo_hash'] = sqlite3.Binary(
tx_hash + struct.pack('<I', nout)
)
if 'claim_type' in constraints:
constraints['claim.claim_type'] = CLAIM_TYPES[constraints.pop('claim_type')]
if 'stream_types' in constraints:
stream_types = constraints.pop('stream_types')
if stream_types:
constraints['claim.stream_type__in'] = [
STREAM_TYPES[stream_type] for stream_type in stream_types
]
if 'media_types' in constraints:
media_types = constraints.pop('media_types')
if media_types:
constraints['claim.media_type__in'] = media_types
if 'fee_currency' in constraints:
constraints['claim.fee_currency'] = constraints.pop('fee_currency').lower()
_apply_constraints_for_array_attributes(constraints, 'tag', clean_tags, for_count)
_apply_constraints_for_array_attributes(constraints, 'language', lambda _: _, for_count)
_apply_constraints_for_array_attributes(constraints, 'location', lambda _: _, for_count)
select = f"SELECT {cols} FROM claim"
sql, values = query(
select if for_count else select+"""
LEFT JOIN claimtrie USING (claim_hash)
LEFT JOIN claim as channel ON (claim.channel_hash=channel.claim_hash)
""", **constraints
)
return self.db.execute(sql, values).fetchall()
def get_claims_count(self, **constraints):
constraints.pop('offset', None)
constraints.pop('limit', None)
constraints.pop('order_by', None)
count = self.get_claims('count(*)', for_count=True, **constraints)
return count[0][0]
def _search(self, **constraints):
return self.get_claims(
"""
claimtrie.claim_hash as is_controlling,
claimtrie.last_take_over_height,
claim.claim_hash, claim.txo_hash,
claim.claims_in_channel,
claim.height, claim.creation_height,
claim.activation_height, claim.expiration_height,
claim.effective_amount, claim.support_amount,
claim.trending_group, claim.trending_mixed,
claim.trending_local, claim.trending_global,
claim.short_url, claim.canonical_url,
claim.channel_hash, channel.txo_hash AS channel_txo_hash,
channel.height AS channel_height, claim.signature_valid
""", **constraints
)
INTEGER_PARAMS = {
'height', 'creation_height', 'activation_height', 'expiration_height',
'timestamp', 'creation_timestamp', 'release_time', 'fee_amount',
'tx_position', 'channel_join',
'amount', 'effective_amount', 'support_amount',
'trending_group', 'trending_mixed',
'trending_local', 'trending_global',
}
SEARCH_PARAMS = {
'name', 'claim_id', 'txid', 'nout', 'channel', 'channel_ids', 'not_channel_ids',
'public_key_id', 'claim_type', 'stream_types', 'media_types', 'fee_currency',
'has_channel_signature', 'signature_valid',
'any_tags', 'all_tags', 'not_tags',
'any_locations', 'all_locations', 'not_locations',
'any_languages', 'all_languages', 'not_languages',
'is_controlling', 'limit', 'offset', 'order_by',
'no_totals',
} | INTEGER_PARAMS
ORDER_FIELDS = {
'name',
} | INTEGER_PARAMS
def search(self, constraints) -> Tuple[List, List, int, int]:
assert set(constraints).issubset(self.SEARCH_PARAMS), \
f"Search query contains invalid arguments: {set(constraints).difference(self.SEARCH_PARAMS)}"
total = None
if not constraints.pop('no_totals', False):
total = self.get_claims_count(**constraints)
constraints['offset'] = abs(constraints.get('offset', 0))
constraints['limit'] = min(abs(constraints.get('limit', 10)), 50)
if 'order_by' not in constraints:
constraints['order_by'] = ["height", "^name"]
txo_rows = self._search(**constraints)
channel_hashes = set(txo['channel_hash'] for txo in txo_rows if txo['channel_hash'])
extra_txo_rows = []
if channel_hashes:
extra_txo_rows = self._search(**{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]})
return txo_rows, extra_txo_rows, constraints['offset'], total
def _resolve_one(self, raw_url):
try:
url = URL.parse(raw_url)
except ValueError as e:
return e
channel = None
if url.has_channel:
query = url.channel.to_dict()
if set(query) == {'name'}:
query['is_controlling'] = True
else:
query['order_by'] = ['^height']
matches = self._search(**query, limit=1)
if matches:
channel = matches[0]
else:
return LookupError(f'Could not find channel in "{raw_url}".')
if url.has_stream:
query = url.stream.to_dict()
if channel is not None:
if set(query) == {'name'}:
# temporarily emulate is_controlling for claims in channel
query['order_by'] = ['effective_amount']
else:
query['order_by'] = ['^channel_join']
query['channel_hash'] = channel['claim_hash']
query['signature_valid'] = 1
elif set(query) == {'name'}:
query['is_controlling'] = 1
matches = self._search(**query, limit=1)
if matches:
return matches[0]
else:
return LookupError(f'Could not find stream in "{raw_url}".')
return channel
def resolve(self, urls) -> Tuple[List, List]:
result = []
channel_hashes = set()
for raw_url in urls:
match = self._resolve_one(raw_url)
result.append(match)
if isinstance(match, sqlite3.Row) and match['channel_hash']:
channel_hashes.add(match['channel_hash'])
extra_txo_rows = []
if channel_hashes:
extra_txo_rows = self._search(**{'claim.claim_hash__in': [sqlite3.Binary(h) for h in channel_hashes]})
return result, extra_txo_rows
def run(self):
(call_queue, result_queue, _, initargs) = self._args # pylint: disable=W0632
db_path, ledger_name = initargs
self.ledger = MainNetLedger if ledger_name == 'mainnet' else RegTestLedger
self.db = sqlite3.connect(db_path, isolation_level=None)
self.db.row_factory = sqlite3.Row
while True:
call_item = call_queue.get(block=True)
if call_item is None:
# Wake up queue management thread
result_queue.put(os.getpid())
return
try:
fn = getattr(self, call_item.args[0])
r = Outputs.to_base64(*fn(*call_item.args[1:]))
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
_sendback_result(result_queue, call_item.work_id, exception=exc)
else:
_sendback_result(result_queue, call_item.work_id, result=r)
# Liberate the resource as soon as possible, to avoid holding onto
# open files or shared memory that is not needed anymore
del call_item
class QueryContext(BaseContext):
_name = get_start_method(False)
Process = QueryProcessor
class QueryExecutor(ProcessPoolExecutor):
def __init__(self, db_path, ledger_name, max_workers=None):
super().__init__(
max_workers=max_workers or max(os.cpu_count(), 4),
mp_context=QueryContext(),
initargs=(db_path, ledger_name)
)
async def resolve(self, urls):
return await asyncio.wrap_future(
self.submit(None, 'resolve', urls)
)
async def search(self, kwargs):
return await asyncio.wrap_future(
self.submit(None, 'search', kwargs)
)

View file

@ -1,16 +1,17 @@
import os
import math import math
import unicodedata as uda import base64
from binascii import unhexlify, hexlify import asyncio
from binascii import hexlify
from torba.rpc.jsonrpc import RPCError from torba.rpc.jsonrpc import RPCError
from torba.server.hash import hash_to_hex_str
from torba.server.session import ElectrumX, SessionManager from torba.server.session import ElectrumX, SessionManager
from torba.server import util from torba.server import util
from lbry.schema.url import URL
from lbry.wallet.server.block_processor import LBRYBlockProcessor from lbry.wallet.server.block_processor import LBRYBlockProcessor
from lbry.wallet.server.db import LBRYDB from lbry.wallet.server.db.writer import LBRYDB
from lbry.wallet.server.query import QueryExecutor from lbry.wallet.server.db import reader
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
class LBRYSessionManager(SessionManager): class LBRYSessionManager(SessionManager):
@ -20,7 +21,13 @@ class LBRYSessionManager(SessionManager):
self.query_executor = None self.query_executor = None
async def start_other(self): async def start_other(self):
self.query_executor = QueryExecutor('claims.db', self.env.coin.NET, self.env.max_query_workers) args = dict(initializer=reader.initializer, initargs=('claims.db', self.env.coin.NET))
if self.env.max_query_workers is not None and self.env.max_query_workers == 0:
self.query_executor = ThreadPoolExecutor(max_workers=1, **args)
else:
self.query_executor = ProcessPoolExecutor(
max_workers=self.env.max_query_workers or max(os.cpu_count(), 4), **args
)
async def stop_other(self): async def stop_other(self):
self.query_executor.shutdown() self.query_executor.shutdown()
@ -43,17 +50,7 @@ class LBRYElectrumX(ElectrumX):
'blockchain.transaction.get_height': self.transaction_get_height, 'blockchain.transaction.get_height': self.transaction_get_height,
'blockchain.claimtrie.search': self.claimtrie_search, 'blockchain.claimtrie.search': self.claimtrie_search,
'blockchain.claimtrie.resolve': self.claimtrie_resolve, 'blockchain.claimtrie.resolve': self.claimtrie_resolve,
'blockchain.claimtrie.getclaimbyid': self.claimtrie_getclaimbyid,
'blockchain.claimtrie.getclaimsforname': self.claimtrie_getclaimsforname,
'blockchain.claimtrie.getclaimsbyids': self.claimtrie_getclaimsbyids, 'blockchain.claimtrie.getclaimsbyids': self.claimtrie_getclaimsbyids,
'blockchain.claimtrie.getvalue': self.claimtrie_getvalue,
'blockchain.claimtrie.getnthclaimforname': self.claimtrie_getnthclaimforname,
'blockchain.claimtrie.getclaimsintx': self.claimtrie_getclaimsintx,
'blockchain.claimtrie.getclaimssignedby': self.claimtrie_getclaimssignedby,
'blockchain.claimtrie.getclaimssignedbynthtoname': self.claimtrie_getclaimssignedbynthtoname,
'blockchain.claimtrie.getvalueforuri': self.claimtrie_getvalueforuri,
'blockchain.claimtrie.getvaluesforuris': self.claimtrie_getvalueforuris,
'blockchain.claimtrie.getclaimssignedbyid': self.claimtrie_getclaimssignedbyid,
'blockchain.block.get_server_height': self.get_server_height, 'blockchain.block.get_server_height': self.get_server_height,
} }
self.request_handlers.update(handlers) self.request_handlers.update(handlers)
@ -61,10 +58,14 @@ class LBRYElectrumX(ElectrumX):
async def claimtrie_search(self, **kwargs): async def claimtrie_search(self, **kwargs):
if 'claim_id' in kwargs: if 'claim_id' in kwargs:
self.assert_claim_id(kwargs['claim_id']) self.assert_claim_id(kwargs['claim_id'])
return await self.session_mgr.query_executor.search(kwargs) return base64.b64encode(await asyncio.get_running_loop().run_in_executor(
self.session_mgr.query_executor, reader.search_to_bytes, kwargs
)).decode()
async def claimtrie_resolve(self, *urls): async def claimtrie_resolve(self, *urls):
return await self.session_mgr.query_executor.resolve(urls) return base64.b64encode(await asyncio.get_running_loop().run_in_executor(
self.session_mgr.query_executor, reader.resolve_to_bytes, urls
)).decode()
async def get_server_height(self): async def get_server_height(self):
return self.bp.height return self.bp.height
@ -79,84 +80,9 @@ class LBRYElectrumX(ElectrumX):
return -1 return -1
return None return None
async def claimtrie_getclaimssignedby(self, name): async def claimtrie_getclaimsbyids(self, *claim_ids):
winning_claim = await self.daemon.getvalueforname(name) claims = await self.batched_formatted_claims_from_daemon(claim_ids)
if winning_claim: return dict(zip(claim_ids, claims))
return await self.claimtrie_getclaimssignedbyid(winning_claim['claimId'])
async def claimtrie_getclaimssignedbyid(self, certificate_id):
claim_ids = self.get_claim_ids_signed_by(certificate_id)
return await self.batched_formatted_claims_from_daemon(claim_ids)
def claimtrie_getclaimssignedbyidminimal(self, certificate_id):
claim_ids = self.get_claim_ids_signed_by(certificate_id)
ret = []
for claim_id in claim_ids:
raw_claim_id = unhexlify(claim_id)[::-1]
info = self.db.get_claim_info(raw_claim_id)
if info:
ret.append({
'claim_id': claim_id,
'height': info.height,
'name': info.name.decode()
})
return ret
def get_claim_ids_signed_by(self, certificate_id):
raw_certificate_id = unhexlify(certificate_id)[::-1]
raw_claim_ids = self.db.get_signed_claim_ids_by_cert_id(raw_certificate_id)
return list(map(hash_to_hex_str, raw_claim_ids))
async def claimtrie_getclaimssignedbynthtoname(self, name, n):
claim = self.claimtrie_getnthclaimforname(name, n)
if claim and 'claim_id' in claim:
return await self.claimtrie_getclaimssignedbyid(hash_to_hex_str(claim['claim_id']))
async def claimtrie_getclaimsintx(self, txid):
# TODO: this needs further discussion.
# Code on lbryum-server is wrong and we need to gather what we clearly expect from this command
claim_ids = [claim['claimId'] for claim in (await self.daemon.getclaimsfortx(txid)) if 'claimId' in claim]
return await self.batched_formatted_claims_from_daemon(claim_ids)
async def claimtrie_getvalue(self, name, block_hash=None):
proof = await self.daemon.getnameproof(name, block_hash)
result = {'proof': proof, 'supports': []}
if proof_has_winning_claim(proof):
tx_hash, nout = proof['txhash'], int(proof['nOut'])
transaction_info = await self.daemon.getrawtransaction(tx_hash, True)
result['transaction'] = transaction_info['hex'] # should have never included this (or the call to get it)
raw_claim_id = self.db.get_claim_id_from_outpoint(unhexlify(tx_hash)[::-1], nout)
claim_id = hexlify(raw_claim_id[::-1]).decode()
claim = await self.claimtrie_getclaimbyid(claim_id)
result.update(claim)
return result
async def claimtrie_getnthclaimforname(self, name, n):
n = int(n)
result = await self.claimtrie_getclaimsforname(name)
if 'claims' in result and len(result['claims']) > n >= 0:
# TODO: revist this after lbrycrd_#209 to see if we can sort by claim_sequence at this point
result['claims'].sort(key=lambda c: (int(c['height']), int(c['nout'])))
result['claims'][n]['claim_sequence'] = n
return result['claims'][n]
async def claimtrie_getpartialmatch(self, name, part):
result = await self.claimtrie_getclaimsforname(name)
if 'claims' in result:
return next(filter(lambda x: x['claim_id'].starts_with(part), result['claims']), None)
async def claimtrie_getclaimsforname(self, name):
claims = await self.daemon.getclaimsforname(name)
if claims:
claims['claims'] = [self.format_claim_from_daemon(claim, name) for claim in claims['claims']]
claims['supports_without_claims'] = [] # fixme temporary
del claims['supports without claims']
claims['last_takeover_height'] = claims['nLastTakeoverHeight']
del claims['nLastTakeoverHeight']
return claims
return {}
async def batched_formatted_claims_from_daemon(self, claim_ids): async def batched_formatted_claims_from_daemon(self, claim_ids):
claims = await self.daemon.getclaimsbyids(claim_ids) claims = await self.daemon.getclaimsbyids(claim_ids)
@ -215,19 +141,6 @@ class LBRYElectrumX(ElectrumX):
result['normalized_name'] = claim['normalized_name'].encode('ISO-8859-1').decode() result['normalized_name'] = claim['normalized_name'].encode('ISO-8859-1').decode()
return result return result
def format_supports_from_daemon(self, supports):
return [[support['txid'], support['n'], get_from_possible_keys(support, 'amount', 'nAmount')] for
support in supports]
async def claimtrie_getclaimbyid(self, claim_id):
self.assert_claim_id(claim_id)
claim = await self.daemon.getclaimbyid(claim_id)
return self.format_claim_from_daemon(claim)
async def claimtrie_getclaimsbyids(self, *claim_ids):
claims = await self.batched_formatted_claims_from_daemon(claim_ids)
return dict(zip(claim_ids, claims))
def assert_tx_hash(self, value): def assert_tx_hash(self, value):
'''Raise an RPCError if the value is not a valid transaction '''Raise an RPCError if the value is not a valid transaction
hash.''' hash.'''
@ -248,118 +161,6 @@ class LBRYElectrumX(ElectrumX):
pass pass
raise RPCError(1, f'{value} should be a claim id hash') raise RPCError(1, f'{value} should be a claim id hash')
def normalize_name(self, name):
# this is designed to match lbrycrd; change it here if it changes there
return uda.normalize('NFD', name).casefold()
def claim_matches_name(self, claim, name):
if not name:
return False
if 'normalized_name' in claim:
return self.normalize_name(name) == claim['normalized_name']
return name == claim['name']
async def claimtrie_getvalueforuri(self, block_hash, uri, known_certificates=None):
# TODO: this thing is huge, refactor
CLAIM_ID = "claim_id"
WINNING = "winning"
SEQUENCE = "sequence"
uri = uri
block_hash = block_hash
try:
parsed_uri = URL.parse(uri)
except ValueError as err:
return {'error': err.args[0]}
result = {}
if parsed_uri.has_channel:
certificate = None
# TODO: this is also done on the else, refactor
if parsed_uri.channel.claim_id:
if len(parsed_uri.channel.claim_id) < 40:
certificate_info = self.claimtrie_getpartialmatch(
parsed_uri.channel.name, parsed_uri.channel.claim_id)
else:
certificate_info = await self.claimtrie_getclaimbyid(parsed_uri.channel.claim_id)
if certificate_info and self.claim_matches_name(certificate_info, parsed_uri.channel.name):
certificate = {'resolution_type': CLAIM_ID, 'result': certificate_info}
elif parsed_uri.claim_sequence:
certificate_info = await self.claimtrie_getnthclaimforname(parsed_uri.name, parsed_uri.claim_sequence)
if certificate_info:
certificate = {'resolution_type': SEQUENCE, 'result': certificate_info}
else:
certificate_info = await self.claimtrie_getvalue(parsed_uri.name, block_hash)
if certificate_info:
certificate = {'resolution_type': WINNING, 'result': certificate_info}
if certificate and 'claim_id' not in certificate['result']:
return result
if certificate:
result['certificate'] = certificate
channel_id = certificate['result']['claim_id']
claims_in_channel = self.claimtrie_getclaimssignedbyidminimal(channel_id)
if not parsed_uri.path:
result['unverified_claims_in_channel'] = {claim['claim_id']: (claim['name'], claim['height'])
for claim in claims_in_channel}
else:
# making an assumption that there aren't case conflicts on an existing channel
norm_path = self.normalize_name(parsed_uri.path)
result['unverified_claims_for_name'] = {claim['claim_id']: (claim['name'], claim['height'])
for claim in claims_in_channel
if self.normalize_name(claim['name']) == norm_path}
else:
claim = None
if parsed_uri.claim_id:
if len(parsed_uri.claim_id) < 40:
claim_info = self.claimtrie_getpartialmatch(parsed_uri.name, parsed_uri.claim_id)
else:
claim_info = await self.claimtrie_getclaimbyid(parsed_uri.claim_id)
if claim_info and self.claim_matches_name(claim_info, parsed_uri.name):
claim = {'resolution_type': CLAIM_ID, 'result': claim_info}
elif parsed_uri.claim_sequence:
claim_info = await self.claimtrie_getnthclaimforname(parsed_uri.name, parsed_uri.claim_sequence)
if claim_info:
claim = {'resolution_type': SEQUENCE, 'result': claim_info}
else:
claim_info = await self.claimtrie_getvalue(parsed_uri.name, block_hash)
if claim_info:
claim = {'resolution_type': WINNING, 'result': claim_info}
if (claim and
# is not an unclaimed winning name
(claim['resolution_type'] != WINNING or proof_has_winning_claim(claim['result']['proof']))):
raw_claim_id = unhexlify(claim['result']['claim_id'])[::-1]
raw_certificate_id = self.db.get_claim_info(raw_claim_id).cert_id
if raw_certificate_id:
certificate_id = hash_to_hex_str(raw_certificate_id)
certificate = await self.claimtrie_getclaimbyid(certificate_id)
if certificate:
certificate = {'resolution_type': CLAIM_ID,
'result': certificate}
result['certificate'] = certificate
result['claim'] = claim
return result
async def claimtrie_getvalueforuris(self, block_hash, *uris):
MAX_BATCH_URIS = 500
if len(uris) > MAX_BATCH_URIS:
raise Exception("Exceeds max batch uris of {}".format(MAX_BATCH_URIS))
return {uri: await self.claimtrie_getvalueforuri(block_hash, uri) for uri in uris}
# TODO: get it all concurrently when lbrycrd pending changes goes into a stable release
#async def getvalue(uri):
# value = await self.claimtrie_getvalueforuri(block_hash, uri)
# return uri, value,
#return dict([await asyncio.gather(*tuple(getvalue(uri) for uri in uris))][0])
def proof_has_winning_claim(proof):
return {'txhash', 'nOut'}.issubset(proof.keys())
def get_from_possible_keys(dictionary, *keys): def get_from_possible_keys(dictionary, *keys):
for key in keys: for key in keys:

View file

@ -5,10 +5,10 @@ from binascii import hexlify
from torba.client.constants import COIN, NULL_HASH32 from torba.client.constants import COIN, NULL_HASH32
from lbry.schema.claim import Claim from lbry.schema.claim import Claim
from lbry.wallet.server.db import SQLDB from lbry.wallet.server.db import reader, writer
from lbry.wallet.server.coin import LBCRegTest from lbry.wallet.server.coin import LBCRegTest
from lbry.wallet.server.trending import TRENDING_WINDOW from lbry.wallet.server.db.trending import TRENDING_WINDOW
from lbry.wallet.server.canonical import FindShortestID from lbry.wallet.server.db.canonical import FindShortestID
from lbry.wallet.server.block_processor import Timer from lbry.wallet.server.block_processor import Timer
from lbry.wallet.transaction import Transaction, Input, Output from lbry.wallet.transaction import Transaction, Input, Output
@ -41,7 +41,11 @@ class TestSQLDB(unittest.TestCase):
self.first_sync = False self.first_sync = False
self.daemon_height = 1 self.daemon_height = 1
self.coin = LBCRegTest() self.coin = LBCRegTest()
self.sql = SQLDB(self, ':memory:') db_url = 'file:test_sqldb?mode=memory&cache=shared'
self.sql = writer.SQLDB(self, db_url)
self.addCleanup(self.sql.close)
reader.initializer(db_url, 'regtest')
self.addCleanup(reader.cleanup)
self.timer = Timer('BlockProcessor') self.timer = Timer('BlockProcessor')
self.sql.open() self.sql.open()
self._current_height = 0 self._current_height = 0
@ -339,7 +343,7 @@ class TestClaimtrie(TestSQLDB):
txo_chan_a = tx_chan_a[0].tx.outputs[0] txo_chan_a = tx_chan_a[0].tx.outputs[0]
advance(1, [tx_chan_a]) advance(1, [tx_chan_a])
advance(2, [tx_chan_ab]) advance(2, [tx_chan_ab])
r_ab, r_a = self.sql._search(order_by=['creation_height'], limit=2) r_ab, r_a = reader._search(order_by=['creation_height'], limit=2)
self.assertEqual("@foo#a", r_a['short_url']) self.assertEqual("@foo#a", r_a['short_url'])
self.assertEqual("@foo#ab", r_ab['short_url']) self.assertEqual("@foo#ab", r_ab['short_url'])
self.assertIsNone(r_a['canonical_url']) self.assertIsNone(r_a['canonical_url'])
@ -352,7 +356,7 @@ class TestClaimtrie(TestSQLDB):
tx_abc = self.get_stream_with_claim_id_prefix('abc', 65) tx_abc = self.get_stream_with_claim_id_prefix('abc', 65)
advance(3, [tx_a]) advance(3, [tx_a])
advance(4, [tx_ab, tx_abc]) advance(4, [tx_ab, tx_abc])
r_abc, r_ab, r_a = self.sql._search(order_by=['creation_height', 'tx_position'], limit=3) r_abc, r_ab, r_a = reader._search(order_by=['creation_height', 'tx_position'], limit=3)
self.assertEqual("foo#a", r_a['short_url']) self.assertEqual("foo#a", r_a['short_url'])
self.assertEqual("foo#ab", r_ab['short_url']) self.assertEqual("foo#ab", r_ab['short_url'])
self.assertEqual("foo#abc", r_abc['short_url']) self.assertEqual("foo#abc", r_abc['short_url'])
@ -366,39 +370,39 @@ class TestClaimtrie(TestSQLDB):
ab2_claim_id = tx_ab2[0].tx.outputs[0].claim_id ab2_claim_id = tx_ab2[0].tx.outputs[0].claim_id
advance(6, [tx_a2]) advance(6, [tx_a2])
advance(7, [tx_ab2]) advance(7, [tx_ab2])
r_ab2, r_a2 = self.sql._search(order_by=['creation_height'], limit=2) r_ab2, r_a2 = reader._search(order_by=['creation_height'], limit=2)
self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url']) self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url'])
self.assertEqual(f"foo#{ab2_claim_id[:4]}", r_ab2['short_url']) self.assertEqual(f"foo#{ab2_claim_id[:4]}", r_ab2['short_url'])
self.assertEqual("@foo#a/foo#a", r_a2['canonical_url']) self.assertEqual("@foo#a/foo#a", r_a2['canonical_url'])
self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url']) self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url'])
self.assertEqual(2, self.sql._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
# change channel public key, invaliding stream claim signatures # change channel public key, invaliding stream claim signatures
advance(8, [self.get_channel_update(txo_chan_a, COIN, key=b'a')]) advance(8, [self.get_channel_update(txo_chan_a, COIN, key=b'a')])
r_ab2, r_a2 = self.sql._search(order_by=['creation_height'], limit=2) r_ab2, r_a2 = reader._search(order_by=['creation_height'], limit=2)
self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url']) self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url'])
self.assertEqual(f"foo#{ab2_claim_id[:4]}", r_ab2['short_url']) self.assertEqual(f"foo#{ab2_claim_id[:4]}", r_ab2['short_url'])
self.assertIsNone(r_a2['canonical_url']) self.assertIsNone(r_a2['canonical_url'])
self.assertIsNone(r_ab2['canonical_url']) self.assertIsNone(r_ab2['canonical_url'])
self.assertEqual(0, self.sql._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) self.assertEqual(0, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
# reinstate previous channel public key (previous stream claim signatures become valid again) # reinstate previous channel public key (previous stream claim signatures become valid again)
channel_update = self.get_channel_update(txo_chan_a, COIN, key=b'c') channel_update = self.get_channel_update(txo_chan_a, COIN, key=b'c')
advance(9, [channel_update]) advance(9, [channel_update])
r_ab2, r_a2 = self.sql._search(order_by=['creation_height'], limit=2) r_ab2, r_a2 = reader._search(order_by=['creation_height'], limit=2)
self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url']) self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url'])
self.assertEqual(f"foo#{ab2_claim_id[:4]}", r_ab2['short_url']) self.assertEqual(f"foo#{ab2_claim_id[:4]}", r_ab2['short_url'])
self.assertEqual("@foo#a/foo#a", r_a2['canonical_url']) self.assertEqual("@foo#a/foo#a", r_a2['canonical_url'])
self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url']) self.assertEqual("@foo#a/foo#ab", r_ab2['canonical_url'])
self.assertEqual(2, self.sql._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) self.assertEqual(2, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
# claim abandon updates claims_in_channel # claim abandon updates claims_in_channel
advance(10, [self.get_abandon(tx_ab2)]) advance(10, [self.get_abandon(tx_ab2)])
self.assertEqual(1, self.sql._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel']) self.assertEqual(1, reader._search(claim_id=txo_chan_a.claim_id, limit=1)[0]['claims_in_channel'])
# delete channel, invaliding stream claim signatures # delete channel, invaliding stream claim signatures
advance(11, [self.get_abandon(channel_update)]) advance(11, [self.get_abandon(channel_update)])
r_a2, = self.sql._search(order_by=['creation_height'], limit=1) r_a2, = reader._search(order_by=['creation_height'], limit=1)
self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url']) self.assertEqual(f"foo#{a2_claim_id[:2]}", r_a2['short_url'])
self.assertIsNone(r_a2['canonical_url']) self.assertIsNone(r_a2['canonical_url'])
@ -436,7 +440,7 @@ class TestTrending(TestSQLDB):
self.get_support(up_medium, (20+(window*(2 if window == 7 else 1)))*COIN), self.get_support(up_medium, (20+(window*(2 if window == 7 else 1)))*COIN),
self.get_support(up_biggly, (20+(window*(3 if window == 7 else 1)))*COIN), self.get_support(up_biggly, (20+(window*(3 if window == 7 else 1)))*COIN),
]) ])
results = self.sql._search(order_by=['trending_local']) results = reader._search(order_by=['trending_local'])
self.assertEqual([c.claim_id for c in claims], [hexlify(c['claim_hash'][::-1]).decode() for c in results]) self.assertEqual([c.claim_id for c in claims], [hexlify(c['claim_hash'][::-1]).decode() for c in results])
self.assertEqual([10, 6, 2, 0, -2], [int(c['trending_local']) for c in results]) self.assertEqual([10, 6, 2, 0, -2], [int(c['trending_local']) for c in results])
self.assertEqual([53, 38, -32, 0, -6], [int(c['trending_global']) for c in results]) self.assertEqual([53, 38, -32, 0, -6], [int(c['trending_global']) for c in results])

View file

@ -204,7 +204,7 @@ class SPVNode:
'REORG_LIMIT': '100', 'REORG_LIMIT': '100',
'HOST': self.hostname, 'HOST': self.hostname,
'TCP_PORT': str(self.port), 'TCP_PORT': str(self.port),
'MAX_QUERY_WORKERS': '1' 'MAX_QUERY_WORKERS': '0'
} }
# TODO: don't use os.environ # TODO: don't use os.environ
os.environ.update(conf) os.environ.update(conf)