This commit is contained in:
Lex Berezhny 2020-06-21 19:51:09 -04:00
parent 53b7d0a58b
commit 46da2584ca
6 changed files with 118 additions and 77 deletions

View file

@ -60,5 +60,5 @@ SEARCH_PARAMS = {
} | SEARCH_INTEGER_PARAMS
SEARCH_ORDER_FIELDS = {
'name', 'claim_hash'
'name', 'claim_hash', 'claim_id'
} | SEARCH_INTEGER_PARAMS

View file

@ -2,7 +2,7 @@ import os
import asyncio
import tempfile
import multiprocessing as mp
from typing import List, Optional, Iterable, Iterator, TypeVar, Generic, TYPE_CHECKING
from typing import List, Optional, Iterable, Iterator, TypeVar, Generic, TYPE_CHECKING, Dict
from concurrent.futures import Executor, ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
@ -11,7 +11,7 @@ from sqlalchemy import create_engine, text
from lbry.event import EventController
from lbry.crypto.bip32 import PubKey
from lbry.blockchain.transaction import Transaction, Output
from .constants import TXO_TYPES
from .constants import TXO_TYPES, CLAIM_TYPE_CODES
from .query_context import initialize, ProgressPublisher
from . import queries as q
from . import sync
@ -277,6 +277,12 @@ class Database:
claims, total, censor = await self.run_in_executor(q.search_claims, **constraints)
return Result(claims, total, censor)
async def search_supports(self, **constraints) -> Result[Output]:
return await self.fetch_result(q.search_supports, **constraints)
async def resolve(self, *urls) -> Dict[str, Output]:
return await self.run_in_executor(q.resolve, *urls)
async def get_txo_sum(self, **constraints) -> int:
return await self.run_in_executor(q.get_txo_sum, **constraints)
@ -297,7 +303,7 @@ class Database:
async def get_claims(self, **constraints) -> Result[Output]:
if 'txo_type' not in constraints:
constraints['txo_type'] = CLAIM_TYPES
constraints['txo_type__in'] = CLAIM_TYPE_CODES
txos = await self.fetch_result(q.get_txos, **constraints)
if 'wallet' in constraints:
await add_channel_keys_to_txo_results(constraints['wallet'].accounts, txos)

View file

@ -357,8 +357,9 @@ def select_txos(
META_ATTRS = (
'activation_height', 'takeover_height', 'support_amount', 'creation_height',
'short_url', 'canonical_url', 'claims_in_channel_count', 'supports_in_claim_count',
'activation_height', 'takeover_height', 'creation_height', 'staked_amount',
'short_url', 'canonical_url', 'staked_support_amount', 'staked_support_count',
'signed_claim_count', 'signed_support_count', 'is_signature_valid',
)
@ -430,10 +431,6 @@ def get_txos(no_tx=False, include_total=False, **constraints) -> Tuple[List[Outp
(TXI.c.address.in_(my_accounts))
).label('is_my_input'))
spent = TXI.alias('spent')
if include_is_spent:
select_columns.append((spent.c.txo_hash != None).label('is_spent'))
if include_received_tips:
support = TXO.alias('support')
select_columns.append(
@ -453,7 +450,7 @@ def get_txos(no_tx=False, include_total=False, **constraints) -> Tuple[List[Outp
elif constraints.get('order_by', None) == 'none':
del constraints['order_by']
rows = context().fetchall(select_txos(select_columns, spent=spent, **constraints))
rows = context().fetchall(select_txos(select_columns, **constraints))
txos = rows_to_txos(rows, not no_tx)
channel_hashes = set()
@ -529,6 +526,36 @@ def get_txo_plot(start_day=None, days_back=0, end_day=None, days_after=None, **c
return plot
BASE_SELECT_SUPPORT_COLUMNS = BASE_SELECT_TXO_COLUMNS + [
Support.c.channel_hash,
Support.c.is_signature_valid,
]
def select_supports(cols: List = None, **constraints) -> Select:
    """Build a SELECT over the support table joined to its TXO and TX rows.

    Defaults to ``BASE_SELECT_SUPPORT_COLUMNS`` when no column list is given;
    all remaining keyword constraints are forwarded to ``query()``.
    """
    if cols is None:
        cols = BASE_SELECT_SUPPORT_COLUMNS
    # NOTE(review): the join condition is empty (`Support.join(TXO, )`) —
    # presumably this relies on a declared foreign-key relationship between
    # the tables; confirm against the table definitions before relying on it.
    joins = Support.join(TXO, ).join(TX)
    return query([Support], select(*cols).select_from(joins), **constraints)
def search_supports(**constraints) -> Tuple[List[Output], Optional[int]]:
    """Fetch support outputs matching *constraints*.

    Returns ``(txos, total)``; ``total`` is ``None`` when the caller passes a
    truthy ``no_totals`` constraint (the count query is skipped entirely).
    """
    include_total = not constraints.pop('no_totals', False)
    total = search_support_count(**constraints) if include_total else None
    fetched = context().fetchall(select_supports(**constraints))
    return rows_to_txos(fetched, include_tx=False), total
def search_support_count(**constraints) -> int:
    """Count supports matching *constraints*, ignoring paging/ordering keys."""
    # Paging and ordering are irrelevant to an aggregate count.
    for paging_key in ('offset', 'limit', 'order_by'):
        constraints.pop(paging_key, None)
    result = context().fetchall(
        select_supports([func.count().label('total')], **constraints)
    )
    return result[0]['total'] or 0
BASE_SELECT_CLAIM_COLUMNS = BASE_SELECT_TXO_COLUMNS + [
Claim.c.activation_height,
Claim.c.takeover_height,
@ -538,9 +565,12 @@ BASE_SELECT_CLAIM_COLUMNS = BASE_SELECT_TXO_COLUMNS + [
Claim.c.reposted_claim_hash,
Claim.c.short_url,
Claim.c.canonical_url,
Claim.c.claims_in_channel_count,
Claim.c.support_amount,
Claim.c.supports_in_claim_count,
Claim.c.signed_claim_count,
Claim.c.signed_support_count,
(Claim.c.amount + Claim.c.staked_support_amount).label('staked_amount'),
Claim.c.staked_support_amount,
Claim.c.staked_support_count,
Claim.c.is_signature_valid,
]
@ -650,7 +680,7 @@ def select_claims(cols: List = None, for_count=False, **constraints) -> Select:
claim_types = [claim_types]
if claim_types:
constraints['claim_type__in'] = {
CLAIM_TYPE_CODES[claim_type] for claim_type in claim_types
TXO_TYPES[claim_type] for claim_type in claim_types
}
if 'stream_types' in constraints:
stream_types = constraints.pop('stream_types')
@ -797,13 +827,14 @@ def get_supports_summary(self, **constraints):
)
def resolve(urls) -> Tuple[List, List]:
txo_rows = [resolve_url(raw_url) for raw_url in urls]
extra_txo_rows = _get_referenced_rows(
[txo for txo in txo_rows if isinstance(txo, dict)],
[txo.censor_hash for txo in txo_rows if isinstance(txo, ResolveCensoredError)]
)
return txo_rows, extra_txo_rows
def resolve(*urls) -> Dict[str, Output]:
    """Resolve each claim URL to its winning Output, keyed by the input URL.

    Each URL is resolved independently via ``resolve_url``; the result dict
    preserves one entry per distinct input URL.
    """
    # The previous list-based implementation (txo_rows / _get_referenced_rows)
    # was left here as commented-out code; it has been removed — recover it
    # from version control if the censored/referenced-rows path is needed again.
    return {url: resolve_url(url) for url in urls}
def resolve_url(raw_url):
@ -822,7 +853,8 @@ def resolve_url(raw_url):
q['is_controlling'] = True
else:
q['order_by'] = ['^creation_height']
matches = search_claims(censor, **q, limit=1)
#matches = search_claims(censor, **q, limit=1)
matches = search_claims(**q, limit=1)[0]
if matches:
channel = matches[0]
elif censor.censored:
@ -833,16 +865,13 @@ def resolve_url(raw_url):
if url.has_stream:
q = url.stream.to_dict()
if channel is not None:
if set(q) == {'name'}:
# temporarily emulate is_controlling for claims in channel
q['order_by'] = ['effective_amount', '^height']
else:
q['order_by'] = ['^channel_join']
q['channel_hash'] = channel['claim_hash']
q['signature_valid'] = 1
q['order_by'] = ['^creation_height']
q['channel_hash'] = channel.claim_hash
q['is_signature_valid'] = 1
elif set(q) == {'name'}:
q['is_controlling'] = 1
matches = search_claims(censor, **q, limit=1)
# matches = search_claims(censor, **q, limit=1)
matches = search_claims(**q, limit=1)[0]
if matches:
return matches[0]
elif censor.censored:

View file

@ -182,13 +182,12 @@ class Event(Enum):
BLOCK_READ = "blockchain.sync.block.read", ProgressUnit.BLOCKS
BLOCK_SAVE = "blockchain.sync.block.save", ProgressUnit.TXS
BLOCK_DONE = "blockchain.sync.block.done", ProgressUnit.TASKS
CLAIM_TRIE = "blockchain.sync.claim.trie", ProgressUnit.TAKEOVERS
CLAIM_META = "blockchain.sync.claim.update", ProgressUnit.CLAIMS
CLAIM_CALC = "blockchain.sync.claim.totals", ProgressUnit.CLAIMS
CLAIM_TRIE = "blockchain.sync.claim.trie", ProgressUnit.TAKEOVERS
CLAIM_SIGN = "blockchain.sync.claim.signatures", ProgressUnit.CLAIMS
SUPPORT_META = "blockchain.sync.support.update", ProgressUnit.SUPPORTS
SUPPORT_SIGN = "blockchain.sync.support.signatures", ProgressUnit.SUPPORTS
CHANNEL_SIGN = "blockchain.sync.channel.signatures", ProgressUnit.CLAIMS
TRENDING_CALC = "blockchain.sync.trending", ProgressUnit.BLOCKS
TAKEOVER_INSERT = "blockchain.sync.takeover.insert", ProgressUnit.TAKEOVERS
@ -261,8 +260,6 @@ class ProgressContext:
def step(self, done):
send_condition = (
# no-op
done != 0 and self.total != 0 and
# enforce step rate
(self.step_size == 1 or done % self.step_size == 0) and
# deduplicate finish event by not sending a step where done == total
@ -345,27 +342,36 @@ class BulkLoader:
'address': txo.get_address(self.ledger) if txo.has_address else None,
'position': txo.position,
'amount': txo.amount,
'height': tx.height,
'script_offset': txo.script.offset,
'script_length': txo.script.length,
'txo_type': 0,
'claim_id': None,
'claim_hash': None,
'claim_name': None,
'reposted_claim_hash': None,
'channel_hash': None,
'public_key': None,
'public_key_hash': None
}
if txo.is_claim:
if txo.can_decode_claim:
claim = txo.claim
row['txo_type'] = TXO_TYPES.get(claim.claim_type, TXO_TYPES['stream'])
if claim.is_repost:
row['reposted_claim_hash'] = claim.repost.reference.claim_hash
if claim.is_signed:
row['channel_hash'] = claim.signing_channel_hash
if claim.is_channel:
row['public_key'] = claim.channel.public_key_bytes
row['public_key_hash'] = self.ledger.address_to_hash160(
self.ledger.public_key_to_address(claim.channel.public_key_bytes)
)
else:
row['txo_type'] = TXO_TYPES['stream']
elif txo.is_support:
row['txo_type'] = TXO_TYPES['support']
if txo.can_decode_support:
claim = txo.support
if claim.is_signed:
row['channel_hash'] = claim.signing_channel_hash
elif txo.purchase is not None:
row['txo_type'] = TXO_TYPES['purchase']
row['claim_id'] = txo.purchased_claim_id
@ -401,11 +407,10 @@ class BulkLoader:
'normalized': txo.normalized_name,
'address': txo.get_address(self.ledger),
'txo_hash': txo.ref.hash,
'tx_position': tx.position,
'amount': txo.amount,
'height': tx.height,
'timestamp': tx.timestamp,
'release_time': None,
'height': tx.height,
'title': None,
'author': None,
'description': None,
@ -413,18 +418,16 @@ class BulkLoader:
# streams
'stream_type': None,
'media_type': None,
'fee_currency': None,
'fee_amount': 0,
'fee_currency': None,
'duration': None,
# reposts
'reposted_claim_hash': None,
# claims which are channels
'public_key': None,
'public_key_hash': None,
# signed claims
'channel_hash': None,
'signature': None,
'signature_digest': None,
'is_signature_valid': None,
}
try:
@ -435,8 +438,8 @@ class BulkLoader:
if claim.is_stream:
claim_record['claim_type'] = TXO_TYPES['stream']
claim_record['media_type'] = claim.stream.source.media_type
claim_record['stream_type'] = STREAM_TYPES[guess_stream_type(claim_record['media_type'])]
claim_record['media_type'] = claim.stream.source.media_type
claim_record['title'] = claim.stream.title
claim_record['description'] = claim.stream.description
claim_record['author'] = claim.stream.author
@ -457,10 +460,6 @@ class BulkLoader:
claim_record['reposted_claim_hash'] = claim.repost.reference.claim_hash
elif claim.is_channel:
claim_record['claim_type'] = TXO_TYPES['channel']
claim_record['public_key'] = claim.channel.public_key_bytes
claim_record['public_key_hash'] = self.ledger.address_to_hash160(
self.ledger.public_key_to_address(claim.channel.public_key_bytes)
)
if claim.is_signed:
claim_record['channel_hash'] = claim.signing_channel_hash
claim_record['signature'] = txo.get_encoded_signature()
@ -496,12 +495,15 @@ class BulkLoader:
tx = txo.tx_ref.tx
claim_hash = txo.claim_hash
support_record = {
'txo_hash': txo.ref.hash,
'claim_hash': claim_hash,
'address': txo.get_address(self.ledger),
'txo_hash': txo.ref.hash,
'tx_position': tx.position,
'amount': txo.amount,
'height': tx.height,
'emoji': None,
'channel_hash': None,
'signature': None,
'signature_digest': None,
}
self.supports.append(support_record)
support = txo.can_decode_support
@ -509,12 +511,13 @@ class BulkLoader:
support_record['emoji'] = support.emoji
if support.is_signed:
support_record['channel_hash'] = support.signing_channel_hash
support_record['signature'] = txo.get_encoded_signature()
support_record['signature_digest'] = txo.get_signature_digest(None)
def add_claim(self, txo: Output):
claim, tags = self.claim_to_rows(txo)
if claim:
tx = txo.tx_ref.tx
claim['public_key_height'] = tx.height
if txo.script.is_claim_name:
claim['creation_height'] = tx.height
claim['creation_timestamp'] = tx.timestamp
@ -525,9 +528,9 @@ class BulkLoader:
def update_claim(self, txo: Output):
claim, tags = self.claim_to_rows(txo)
if claim:
claim['claim_hash_pk'] = claim.pop('claim_hash')
claim['claim_hash_'] = claim.pop('claim_hash')
self.update_claims.append(claim)
self.delete_tags.append({'claim_hash_pk': claim['claim_hash_pk']})
self.delete_tags.append({'claim_hash_': claim['claim_hash_']})
self.tags.extend(tags)
return self
@ -538,13 +541,8 @@ class BulkLoader:
(TXO.insert(), self.txos),
(TXI.insert(), self.txis),
(Claim.insert(), self.claims),
(Tag.delete().where(Tag.c.claim_hash == bindparam('claim_hash_pk')), self.delete_tags),
(Claim.update()
.where(Claim.c.claim_hash == bindparam('claim_hash_pk'))
.values(public_key_height=case(
[(Claim.c.public_key_hash != bindparam('public_key_hash'), bindparam('height'))],
else_=Claim.c.public_key_height
)), self.update_claims),
(Tag.delete().where(Tag.c.claim_hash == bindparam('claim_hash_')), self.delete_tags),
(Claim.update().where(Claim.c.claim_hash == bindparam('claim_hash_')), self.update_claims),
(Tag.insert(), self.tags),
(Support.insert(), self.supports),
)

View file

@ -12,7 +12,8 @@ from lbry.db.tables import (
def process_all_things_after_sync():
    """Run the post-sync processing pipeline in dependency order."""
    process_inputs_outputs()
    process_supports()
    # NOTE(review): diff-rendering residue — the removed call below appears
    # interleaved with its two replacements; in the new code only
    # process_claim_deletes() and process_claim_changes() should remain.
    process_claims()
    process_claim_deletes()
    process_claim_changes()
def process_inputs_outputs():
@ -109,12 +110,14 @@ def process_supports():
loader.save()
def process_claims():
def process_claim_deletes():
    """Delete claim rows whose underlying TXOs have been spent."""
    with progress(Event.CLAIM_DELETE) as p:
        # Single-step progress: the whole delete runs as one statement.
        p.start(1)
        p.ctx.execute(
            Claim.delete().where(condition_spent_claims())
        )
def process_claim_changes():
with progress(Event.CLAIM_INSERT) as p:
loader = p.ctx.get_bulk_loader()
for claim in rows_to_txos(p.ctx.fetchall(select_missing_claims)):

View file

@ -70,14 +70,22 @@ TXO = Table(
Column('address', Text),
Column('position', SmallInteger),
Column('amount', BigInteger),
Column('height', Integer),
Column('script_offset', Integer),
Column('script_length', Integer),
Column('is_spent', Boolean, server_default='0'),
Column('is_reserved', Boolean, server_default='0'),
# claims
Column('txo_type', SmallInteger, server_default='0'),
Column('claim_id', Text, nullable=True),
Column('claim_hash', LargeBinary, nullable=True),
Column('claim_name', Text, nullable=True),
Column('channel_hash', LargeBinary, nullable=True), # claims in channel
# channels
Column('public_key', LargeBinary, nullable=True),
Column('public_key_hash', LargeBinary, nullable=True),
)
txo_join_account = TXO.join(AccountAddress, TXO.columns.address == AccountAddress.columns.address)
@ -102,17 +110,17 @@ Claim = Table(
Column('normalized', Text),
Column('address', Text),
Column('txo_hash', LargeBinary, ForeignKey(TXO.columns.txo_hash)),
Column('tx_position', SmallInteger),
Column('amount', BigInteger),
Column('staked_amount', BigInteger, server_default='0'),
Column('timestamp', Integer), # last updated timestamp
Column('creation_timestamp', Integer),
Column('release_time', Integer, nullable=True),
Column('height', Integer), # last updated height
Column('creation_height', Integer),
Column('activation_height', Integer, nullable=True),
Column('expiration_height', Integer, nullable=True),
Column('takeover_height', Integer, nullable=True),
Column('is_controlling', Boolean, server_default='0'),
Column('release_time', Integer, nullable=True),
# normalized#shortest-unique-claim_id
Column('short_url', Text, nullable=True),
@ -125,7 +133,8 @@ Claim = Table(
Column('claim_type', SmallInteger),
Column('claim_reposted_count', Integer, server_default='0'),
Column('supports_in_claim_count', Integer, server_default='0'),
Column('staked_support_count', Integer, server_default='0'),
Column('staked_support_amount', BigInteger, server_default='0'),
# streams
Column('stream_type', Text, nullable=True),
@ -138,18 +147,15 @@ Claim = Table(
Column('reposted_claim_hash', LargeBinary, nullable=True),
# claims which are channels
Column('public_key', LargeBinary, nullable=True),
Column('public_key_hash', LargeBinary, nullable=True),
Column('public_key_height', Integer, server_default='0'), # height at which public key was last changed
Column('claims_in_channel_count', Integer, server_default='0'),
Column('signed_claim_count', Integer, server_default='0'),
Column('signed_support_count', Integer, server_default='0'),
# claims which are inside channels
Column('channel_hash', LargeBinary, nullable=True),
Column('signature', LargeBinary, nullable=True),
Column('signature_digest', LargeBinary, nullable=True),
Column('is_signature_valid', Boolean, server_default='0'),
Column('is_signature_valid', Boolean, nullable=True),
Column('support_amount', BigInteger, server_default='0'),
Column('trending_group', BigInteger, server_default='0'),
Column('trending_mixed', BigInteger, server_default='0'),
Column('trending_local', BigInteger, server_default='0'),
@ -168,13 +174,12 @@ Support = Table(
'support', metadata,
Column('txo_hash', LargeBinary, ForeignKey(TXO.columns.txo_hash), primary_key=True),
Column('claim_hash', LargeBinary, ForeignKey(TXO.columns.claim_hash)),
Column('claim_hash', LargeBinary),
Column('address', Text),
Column('tx_position', SmallInteger),
Column('activation_height', Integer, nullable=True),
Column('expiration_height', Integer, nullable=True),
Column('amount', BigInteger),
Column('height', Integer),
Column('activation_height', Integer, nullable=True),
Column('expiration_height', Integer, nullable=True),
# support metadata
Column('emoji', Text),
@ -183,7 +188,7 @@ Support = Table(
Column('channel_hash', LargeBinary, nullable=True),
Column('signature', LargeBinary, nullable=True),
Column('signature_digest', LargeBinary, nullable=True),
Column('is_signature_valid', Boolean, server_default='0'),
Column('is_signature_valid', Boolean, nullable=True),
)