From 46da2584ca4553083457dd96174c1914ca446c09 Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Sun, 21 Jun 2020 19:51:09 -0400 Subject: [PATCH] db --- lbry/db/constants.py | 2 +- lbry/db/database.py | 12 ++++-- lbry/db/queries.py | 83 +++++++++++++++++++++++++++------------- lbry/db/query_context.py | 58 ++++++++++++++-------------- lbry/db/sync.py | 7 +++- lbry/db/tables.py | 33 +++++++++------- 6 files changed, 118 insertions(+), 77 deletions(-) diff --git a/lbry/db/constants.py b/lbry/db/constants.py index 704d47028..217edcedd 100644 --- a/lbry/db/constants.py +++ b/lbry/db/constants.py @@ -60,5 +60,5 @@ SEARCH_PARAMS = { } | SEARCH_INTEGER_PARAMS SEARCH_ORDER_FIELDS = { - 'name', 'claim_hash' + 'name', 'claim_hash', 'claim_id' } | SEARCH_INTEGER_PARAMS diff --git a/lbry/db/database.py b/lbry/db/database.py index f1e1567d1..70beed89d 100644 --- a/lbry/db/database.py +++ b/lbry/db/database.py @@ -2,7 +2,7 @@ import os import asyncio import tempfile import multiprocessing as mp -from typing import List, Optional, Iterable, Iterator, TypeVar, Generic, TYPE_CHECKING +from typing import List, Optional, Iterable, Iterator, TypeVar, Generic, TYPE_CHECKING, Dict from concurrent.futures import Executor, ThreadPoolExecutor, ProcessPoolExecutor from functools import partial @@ -11,7 +11,7 @@ from sqlalchemy import create_engine, text from lbry.event import EventController from lbry.crypto.bip32 import PubKey from lbry.blockchain.transaction import Transaction, Output -from .constants import TXO_TYPES +from .constants import TXO_TYPES, CLAIM_TYPE_CODES from .query_context import initialize, ProgressPublisher from . import queries as q from . import sync @@ -277,6 +277,12 @@ class Database: claims, total, censor = await self.run_in_executor(q.search_claims, **constraints) return Result(claims, total, censor) + async def search_supports(self, **constraints) -> Result[Output]: + return await self.fetch_result(q.search_supports, **constraints) + + async def resolve(self, *urls) -> Dict[str, Output]: + return await self.run_in_executor(q.resolve, *urls) + async def get_txo_sum(self, **constraints) -> int: return await self.run_in_executor(q.get_txo_sum, **constraints) @@ -297,7 +303,7 @@ class Database: async def get_claims(self, **constraints) -> Result[Output]: if 'txo_type' not in constraints: - constraints['txo_type'] = CLAIM_TYPES + constraints['txo_type__in'] = CLAIM_TYPE_CODES txos = await self.fetch_result(q.get_txos, **constraints) if 'wallet' in constraints: await add_channel_keys_to_txo_results(constraints['wallet'].accounts, txos) diff --git a/lbry/db/queries.py b/lbry/db/queries.py index c2251939c..e8a29b42e 100644 --- a/lbry/db/queries.py +++ b/lbry/db/queries.py @@ -357,8 +357,9 @@ def select_txos( META_ATTRS = ( - 'activation_height', 'takeover_height', 'support_amount', 'creation_height', - 'short_url', 'canonical_url', 'claims_in_channel_count', 'supports_in_claim_count', + 'activation_height', 'takeover_height', 'creation_height', 'staked_amount', + 'short_url', 'canonical_url', 'staked_support_amount', 'staked_support_count', + 'signed_claim_count', 'signed_support_count', 'is_signature_valid', ) @@ -430,10 +431,6 @@ def get_txos(no_tx=False, include_total=False, **constraints) -> Tuple[List[Outp (TXI.c.address.in_(my_accounts)) ).label('is_my_input')) - spent = TXI.alias('spent') - if include_is_spent: - select_columns.append((spent.c.txo_hash != None).label('is_spent')) - if include_received_tips: support = TXO.alias('support') select_columns.append( @@ -453,7 +450,7 @@ def get_txos(no_tx=False, include_total=False, **constraints) -> Tuple[List[Outp elif constraints.get('order_by', None) == 'none': del constraints['order_by'] - rows = context().fetchall(select_txos(select_columns, spent=spent, **constraints)) + rows = context().fetchall(select_txos(select_columns, **constraints)) txos = rows_to_txos(rows, not no_tx) channel_hashes = set() @@ -529,6 +526,36 @@ def get_txo_plot(start_day=None, days_back=0, end_day=None, days_after=None, **c return plot +BASE_SELECT_SUPPORT_COLUMNS = BASE_SELECT_TXO_COLUMNS + [ + Support.c.channel_hash, + Support.c.is_signature_valid, +] + + +def select_supports(cols: List = None, **constraints) -> Select: + if cols is None: + cols = BASE_SELECT_SUPPORT_COLUMNS + joins = Support.join(TXO, ).join(TX) + return query([Support], select(*cols).select_from(joins), **constraints) + + +def search_supports(**constraints) -> Tuple[List[Output], Optional[int]]: + total = None + if not constraints.pop('no_totals', False): + total = search_support_count(**constraints) + rows = context().fetchall(select_supports(**constraints)) + txos = rows_to_txos(rows, include_tx=False) + return txos, total + + +def search_support_count(**constraints) -> int: + constraints.pop('offset', None) + constraints.pop('limit', None) + constraints.pop('order_by', None) + count = context().fetchall(select_supports([func.count().label('total')], **constraints)) + return count[0]['total'] or 0 + + BASE_SELECT_CLAIM_COLUMNS = BASE_SELECT_TXO_COLUMNS + [ Claim.c.activation_height, Claim.c.takeover_height, @@ -538,9 +565,12 @@ BASE_SELECT_CLAIM_COLUMNS = BASE_SELECT_TXO_COLUMNS + [ Claim.c.reposted_claim_hash, Claim.c.short_url, Claim.c.canonical_url, - Claim.c.claims_in_channel_count, - Claim.c.support_amount, - Claim.c.supports_in_claim_count, + Claim.c.signed_claim_count, + Claim.c.signed_support_count, + (Claim.c.amount + Claim.c.staked_support_amount).label('staked_amount'), + Claim.c.staked_support_amount, + Claim.c.staked_support_count, + Claim.c.is_signature_valid, ] @@ -650,7 +680,7 @@ def select_claims(cols: List = None, for_count=False, **constraints) -> Select: claim_types = [claim_types] if claim_types: constraints['claim_type__in'] = { - CLAIM_TYPE_CODES[claim_type] for claim_type in claim_types + TXO_TYPES[claim_type] for claim_type in claim_types } if 'stream_types' in constraints: stream_types = constraints.pop('stream_types') @@ -797,13 +827,14 @@ def get_supports_summary(self, **constraints): ) -def resolve(urls) -> Tuple[List, List]: - txo_rows = [resolve_url(raw_url) for raw_url in urls] - extra_txo_rows = _get_referenced_rows( - [txo for txo in txo_rows if isinstance(txo, dict)], - [txo.censor_hash for txo in txo_rows if isinstance(txo, ResolveCensoredError)] - ) - return txo_rows, extra_txo_rows +def resolve(*urls) -> Dict[str, Output]: + return {url: resolve_url(url) for url in urls} + #txo_rows = [resolve_url(raw_url) for raw_url in urls] + #extra_txo_rows = _get_referenced_rows( + # [txo for txo in txo_rows if isinstance(txo, dict)], + # [txo.censor_hash for txo in txo_rows if isinstance(txo, ResolveCensoredError)] + #) + #return txo_rows, extra_txo_rows def resolve_url(raw_url): @@ -822,7 +853,8 @@ def resolve_url(raw_url): q['is_controlling'] = True else: q['order_by'] = ['^creation_height'] - matches = search_claims(censor, **q, limit=1) + #matches = search_claims(censor, **q, limit=1) + matches = search_claims(**q, limit=1)[0] if matches: channel = matches[0] elif censor.censored: @@ -833,16 +865,13 @@ def resolve_url(raw_url): if url.has_stream: q = url.stream.to_dict() if channel is not None: - if set(q) == {'name'}: - # temporarily emulate is_controlling for claims in channel - q['order_by'] = ['effective_amount', '^height'] - else: - q['order_by'] = ['^channel_join'] - q['channel_hash'] = channel['claim_hash'] - q['signature_valid'] = 1 + q['order_by'] = ['^creation_height'] + q['channel_hash'] = channel.claim_hash + q['is_signature_valid'] = 1 elif set(q) == {'name'}: q['is_controlling'] = 1 - matches = search_claims(censor, **q, limit=1) + # matches = search_claims(censor, **q, limit=1) + matches = search_claims(**q, limit=1)[0] if matches: return matches[0] elif censor.censored: diff --git a/lbry/db/query_context.py b/lbry/db/query_context.py index 1df4d79d4..0f5aaa2d5 100644 --- a/lbry/db/query_context.py +++ b/lbry/db/query_context.py @@ -182,13 +182,12 @@ class Event(Enum): BLOCK_READ = "blockchain.sync.block.read", ProgressUnit.BLOCKS BLOCK_SAVE = "blockchain.sync.block.save", ProgressUnit.TXS BLOCK_DONE = "blockchain.sync.block.done", ProgressUnit.TASKS + CLAIM_TRIE = "blockchain.sync.claim.trie", ProgressUnit.TAKEOVERS CLAIM_META = "blockchain.sync.claim.update", ProgressUnit.CLAIMS CLAIM_CALC = "blockchain.sync.claim.totals", ProgressUnit.CLAIMS - CLAIM_TRIE = "blockchain.sync.claim.trie", ProgressUnit.TAKEOVERS CLAIM_SIGN = "blockchain.sync.claim.signatures", ProgressUnit.CLAIMS SUPPORT_META = "blockchain.sync.support.update", ProgressUnit.SUPPORTS SUPPORT_SIGN = "blockchain.sync.support.signatures", ProgressUnit.SUPPORTS - CHANNEL_SIGN = "blockchain.sync.channel.signatures", ProgressUnit.CLAIMS TRENDING_CALC = "blockchain.sync.trending", ProgressUnit.BLOCKS TAKEOVER_INSERT = "blockchain.sync.takeover.insert", ProgressUnit.TAKEOVERS @@ -261,8 +260,6 @@ class ProgressContext: def step(self, done): send_condition = ( - # no-op - done != 0 and self.total != 0 and # enforce step rate (self.step_size == 1 or done % self.step_size == 0) and # deduplicate finish event by not sending a step where done == total @@ -345,27 +342,36 @@ class BulkLoader: 'address': txo.get_address(self.ledger) if txo.has_address else None, 'position': txo.position, 'amount': txo.amount, + 'height': tx.height, 'script_offset': txo.script.offset, 'script_length': txo.script.length, 'txo_type': 0, 'claim_id': None, 'claim_hash': None, 'claim_name': None, - 'reposted_claim_hash': None, 'channel_hash': None, + 'public_key': None, + 'public_key_hash': None } if txo.is_claim: if txo.can_decode_claim: claim = txo.claim row['txo_type'] = TXO_TYPES.get(claim.claim_type, TXO_TYPES['stream']) - if claim.is_repost: - row['reposted_claim_hash'] = claim.repost.reference.claim_hash if claim.is_signed: row['channel_hash'] = claim.signing_channel_hash + if claim.is_channel: + row['public_key'] = claim.channel.public_key_bytes + row['public_key_hash'] = self.ledger.address_to_hash160( + self.ledger.public_key_to_address(claim.channel.public_key_bytes) + ) else: row['txo_type'] = TXO_TYPES['stream'] elif txo.is_support: row['txo_type'] = TXO_TYPES['support'] + if txo.can_decode_support: + claim = txo.support + if claim.is_signed: + row['channel_hash'] = claim.signing_channel_hash elif txo.purchase is not None: row['txo_type'] = TXO_TYPES['purchase'] row['claim_id'] = txo.purchased_claim_id @@ -401,11 +407,10 @@ class BulkLoader: 'normalized': txo.normalized_name, 'address': txo.get_address(self.ledger), 'txo_hash': txo.ref.hash, - 'tx_position': tx.position, 'amount': txo.amount, - 'height': tx.height, 'timestamp': tx.timestamp, 'release_time': None, + 'height': tx.height, 'title': None, 'author': None, 'description': None, @@ -413,18 +418,16 @@ class BulkLoader: # streams 'stream_type': None, 'media_type': None, - 'fee_currency': None, 'fee_amount': 0, + 'fee_currency': None, 'duration': None, # reposts 'reposted_claim_hash': None, - # claims which are channels - 'public_key': None, - 'public_key_hash': None, # signed claims 'channel_hash': None, 'signature': None, 'signature_digest': None, + 'is_signature_valid': None, } try: @@ -435,8 +438,8 @@ class BulkLoader: if claim.is_stream: claim_record['claim_type'] = TXO_TYPES['stream'] - claim_record['media_type'] = claim.stream.source.media_type claim_record['stream_type'] = STREAM_TYPES[guess_stream_type(claim_record['media_type'])] + claim_record['media_type'] = claim.stream.source.media_type claim_record['title'] = claim.stream.title claim_record['description'] = claim.stream.description claim_record['author'] = claim.stream.author @@ -457,10 +460,6 @@ class BulkLoader: claim_record['reposted_claim_hash'] = claim.repost.reference.claim_hash elif claim.is_channel: claim_record['claim_type'] = TXO_TYPES['channel'] - claim_record['public_key'] = claim.channel.public_key_bytes - claim_record['public_key_hash'] = self.ledger.address_to_hash160( - self.ledger.public_key_to_address(claim.channel.public_key_bytes) - ) if claim.is_signed: claim_record['channel_hash'] = claim.signing_channel_hash claim_record['signature'] = txo.get_encoded_signature() @@ -496,12 +495,15 @@ class BulkLoader: tx = txo.tx_ref.tx claim_hash = txo.claim_hash support_record = { + 'txo_hash': txo.ref.hash, 'claim_hash': claim_hash, 'address': txo.get_address(self.ledger), - 'txo_hash': txo.ref.hash, - 'tx_position': tx.position, 'amount': txo.amount, 'height': tx.height, + 'emoji': None, + 'channel_hash': None, + 'signature': None, + 'signature_digest': None, } self.supports.append(support_record) support = txo.can_decode_support @@ -509,12 +511,13 @@ class BulkLoader: support_record['emoji'] = support.emoji if support.is_signed: support_record['channel_hash'] = support.signing_channel_hash + support_record['signature'] = txo.get_encoded_signature() + support_record['signature_digest'] = txo.get_signature_digest(None) def add_claim(self, txo: Output): claim, tags = self.claim_to_rows(txo) if claim: tx = txo.tx_ref.tx - claim['public_key_height'] = tx.height if txo.script.is_claim_name: claim['creation_height'] = tx.height claim['creation_timestamp'] = tx.timestamp @@ -525,9 +528,9 @@ class BulkLoader: def update_claim(self, txo: Output): claim, tags = self.claim_to_rows(txo) if claim: - claim['claim_hash_pk'] = claim.pop('claim_hash') + claim['claim_hash_'] = claim.pop('claim_hash') self.update_claims.append(claim) - self.delete_tags.append({'claim_hash_pk': claim['claim_hash_pk']}) + self.delete_tags.append({'claim_hash_': claim['claim_hash_']}) self.tags.extend(tags) return self @@ -538,13 +541,8 @@ class BulkLoader: (TXO.insert(), self.txos), (TXI.insert(), self.txis), (Claim.insert(), self.claims), - (Tag.delete().where(Tag.c.claim_hash == bindparam('claim_hash_pk')), self.delete_tags), - (Claim.update() - .where(Claim.c.claim_hash == bindparam('claim_hash_pk')) - .values(public_key_height=case( - [(Claim.c.public_key_hash != bindparam('public_key_hash'), bindparam('height'))], - else_=Claim.c.public_key_height - )), self.update_claims), + (Tag.delete().where(Tag.c.claim_hash == bindparam('claim_hash_')), self.delete_tags), + (Claim.update().where(Claim.c.claim_hash == bindparam('claim_hash_')), self.update_claims), (Tag.insert(), self.tags), (Support.insert(), self.supports), ) diff --git a/lbry/db/sync.py b/lbry/db/sync.py index 74aa35e22..548df13d6 100644 --- a/lbry/db/sync.py +++ b/lbry/db/sync.py @@ -12,7 +12,8 @@ from lbry.db.tables import ( def process_all_things_after_sync(): process_inputs_outputs() process_supports() - process_claims() + process_claim_deletes() + process_claim_changes() def process_inputs_outputs(): @@ -109,12 +110,14 @@ def process_supports(): loader.save() -def process_claims(): +def process_claim_deletes(): with progress(Event.CLAIM_DELETE) as p: p.start(1) sql = Claim.delete().where(condition_spent_claims()) p.ctx.execute(sql) + +def process_claim_changes(): with progress(Event.CLAIM_INSERT) as p: loader = p.ctx.get_bulk_loader() for claim in rows_to_txos(p.ctx.fetchall(select_missing_claims)): diff --git a/lbry/db/tables.py b/lbry/db/tables.py index 4b16cb015..10a310ba3 100644 --- a/lbry/db/tables.py +++ b/lbry/db/tables.py @@ -70,14 +70,22 @@ TXO = Table( Column('address', Text), Column('position', SmallInteger), Column('amount', BigInteger), + Column('height', Integer), Column('script_offset', Integer), Column('script_length', Integer), Column('is_spent', Boolean, server_default='0'), Column('is_reserved', Boolean, server_default='0'), + + # claims Column('txo_type', SmallInteger, server_default='0'), Column('claim_id', Text, nullable=True), Column('claim_hash', LargeBinary, nullable=True), Column('claim_name', Text, nullable=True), + Column('channel_hash', LargeBinary, nullable=True), # claims in channel + + # channels + Column('public_key', LargeBinary, nullable=True), + Column('public_key_hash', LargeBinary, nullable=True), ) txo_join_account = TXO.join(AccountAddress, TXO.columns.address == AccountAddress.columns.address) @@ -102,17 +110,17 @@ Claim = Table( Column('normalized', Text), Column('address', Text), Column('txo_hash', LargeBinary, ForeignKey(TXO.columns.txo_hash)), - Column('tx_position', SmallInteger), Column('amount', BigInteger), + Column('staked_amount', BigInteger, server_default='0'), Column('timestamp', Integer), # last updated timestamp Column('creation_timestamp', Integer), + Column('release_time', Integer, nullable=True), Column('height', Integer), # last updated height Column('creation_height', Integer), Column('activation_height', Integer, nullable=True), Column('expiration_height', Integer, nullable=True), Column('takeover_height', Integer, nullable=True), Column('is_controlling', Boolean, server_default='0'), - Column('release_time', Integer, nullable=True), # normalized#shortest-unique-claim_id Column('short_url', Text, nullable=True), @@ -125,7 +133,8 @@ Claim = Table( Column('claim_type', SmallInteger), Column('claim_reposted_count', Integer, server_default='0'), - Column('supports_in_claim_count', Integer, server_default='0'), + Column('staked_support_count', Integer, server_default='0'), + Column('staked_support_amount', BigInteger, server_default='0'), # streams Column('stream_type', Text, nullable=True), @@ -138,18 +147,15 @@ Claim = Table( Column('reposted_claim_hash', LargeBinary, nullable=True), # claims which are channels - Column('public_key', LargeBinary, nullable=True), - Column('public_key_hash', LargeBinary, nullable=True), - Column('public_key_height', Integer, server_default='0'), # height at which public key was last changed - Column('claims_in_channel_count', Integer, server_default='0'), + Column('signed_claim_count', Integer, server_default='0'), + Column('signed_support_count', Integer, server_default='0'), # claims which are inside channels Column('channel_hash', LargeBinary, nullable=True), Column('signature', LargeBinary, nullable=True), Column('signature_digest', LargeBinary, nullable=True), - Column('is_signature_valid', Boolean, server_default='0'), + Column('is_signature_valid', Boolean, nullable=True), - Column('support_amount', BigInteger, server_default='0'), Column('trending_group', BigInteger, server_default='0'), Column('trending_mixed', BigInteger, server_default='0'), Column('trending_local', BigInteger, server_default='0'), @@ -168,13 +174,12 @@ Support = Table( 'support', metadata, Column('txo_hash', LargeBinary, ForeignKey(TXO.columns.txo_hash), primary_key=True), - Column('claim_hash', LargeBinary, ForeignKey(TXO.columns.claim_hash)), + Column('claim_hash', LargeBinary), Column('address', Text), - Column('tx_position', SmallInteger), - Column('activation_height', Integer, nullable=True), - Column('expiration_height', Integer, nullable=True), Column('amount', BigInteger), Column('height', Integer), + Column('activation_height', Integer, nullable=True), + Column('expiration_height', Integer, nullable=True), # support metadata Column('emoji', Text), @@ -183,7 +188,7 @@ Support = Table( Column('channel_hash', LargeBinary, nullable=True), Column('signature', LargeBinary, nullable=True), Column('signature_digest', LargeBinary, nullable=True), - Column('is_signature_valid', Boolean, server_default='0'), + Column('is_signature_valid', Boolean, nullable=True), )