From e2dd3dcf88666bef78ff34bb78d250e976fb63a1 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Fri, 15 Nov 2019 14:55:49 -0500 Subject: [PATCH] get_claims_from_torrent_info_hashes --- lbry/lbry/extras/daemon/storage.py | 38 ++++++++++++++++++++---------- lbry/lbry/stream/managed_stream.py | 8 +++---- lbry/lbry/stream/stream_manager.py | 4 ++-- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/lbry/lbry/extras/daemon/storage.py b/lbry/lbry/extras/daemon/storage.py index 8b7e91890..de1600aa8 100644 --- a/lbry/lbry/extras/daemon/storage.py +++ b/lbry/lbry/extras/daemon/storage.py @@ -28,12 +28,11 @@ def calculate_effective_amount(amount: str, supports: typing.Optional[typing.Lis ) -class StoredStreamClaim: - def __init__(self, stream_hash: str, outpoint: opt_str = None, claim_id: opt_str = None, name: opt_str = None, +class StoredContentClaim: + def __init__(self, outpoint: opt_str = None, claim_id: opt_str = None, name: opt_str = None, amount: opt_int = None, height: opt_int = None, serialized: opt_str = None, channel_claim_id: opt_str = None, address: opt_str = None, claim_sequence: opt_int = None, channel_name: opt_str = None): - self.stream_hash = stream_hash self.claim_id = claim_id self.outpoint = outpoint self.claim_name = name @@ -71,8 +70,16 @@ class StoredStreamClaim: } +def _get_content_claims(transaction: sqlite3.Connection, query: str, + source_hashes: typing.List[str]) -> typing.Dict[str, StoredContentClaim]: + claims = {} + for claim_info in _batched_select(transaction, query, source_hashes): + claims[claim_info[0]] = StoredContentClaim(*claim_info[1:]) + return claims + + def get_claims_from_stream_hashes(transaction: sqlite3.Connection, - stream_hashes: typing.List[str]) -> typing.Dict[str, StoredStreamClaim]: + stream_hashes: typing.List[str]) -> typing.Dict[str, StoredContentClaim]: query = ( "select content_claim.stream_hash, c.*, case when c.channel_claim_id is not null then " " (select claim_name from claim where claim_id==c.channel_claim_id) " @@ -81,13 +88,20 @@ def get_claims_from_stream_hashes(transaction: sqlite3.Connection, " inner join claim c on c.claim_outpoint=content_claim.claim_outpoint and content_claim.stream_hash in {}" " order by c.rowid desc" ) - return { - claim_info.stream_hash: claim_info - for claim_info in [ - None if not claim_info else StoredStreamClaim(*claim_info) - for claim_info in _batched_select(transaction, query, stream_hashes) - ] - } + return _get_content_claims(transaction, query, stream_hashes) + + +def get_claims_from_torrent_info_hashes(transaction: sqlite3.Connection, + info_hashes: typing.List[str]) -> typing.Dict[str, StoredContentClaim]: + query = ( + "select content_claim.bt_infohash, c.*, case when c.channel_claim_id is not null then " + " (select claim_name from claim where claim_id==c.channel_claim_id) " + " else null end as channel_name " + " from content_claim " + " inner join claim c on c.claim_outpoint=content_claim.claim_outpoint and content_claim.bt_infohash in {}" + " order by c.rowid desc" + ) + return _get_content_claims(transaction, query, info_hashes) def _batched_select(transaction, query, parameters, batch_size=900): @@ -135,7 +149,7 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di "inner join claim c on cc.claim_outpoint=c.claim_outpoint " "where file.stream_hash in {} " "order by c.rowid desc", stream_hashes): - claim = StoredStreamClaim(stream_hash, *claim_args) + claim = StoredContentClaim(*claim_args) if claim.channel_claim_id: if claim.channel_claim_id not in signed_claims: signed_claims[claim.channel_claim_id] = [] diff --git a/lbry/lbry/stream/managed_stream.py b/lbry/lbry/stream/managed_stream.py index ef03e1342..3a5de6f5d 100644 --- a/lbry/lbry/stream/managed_stream.py +++ b/lbry/lbry/stream/managed_stream.py @@ -11,7 +11,7 @@ from lbry.schema.mime_types import guess_media_type from lbry.stream.downloader import StreamDownloader from lbry.stream.descriptor import StreamDescriptor, sanitize_file_name from lbry.stream.reflector.client import StreamReflectorClient -from lbry.extras.daemon.storage import StoredStreamClaim +from lbry.extras.daemon.storage import StoredContentClaim from lbry.blob import MAX_BLOB_SIZE if typing.TYPE_CHECKING: @@ -78,7 +78,7 @@ class ManagedStream: def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager', sd_hash: str, download_directory: typing.Optional[str] = None, file_name: typing.Optional[str] = None, - status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None, + status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredContentClaim] = None, download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None, descriptor: typing.Optional[StreamDescriptor] = None, content_fee: typing.Optional['Transaction'] = None, @@ -452,8 +452,8 @@ class ManagedStream: return sent def set_claim(self, claim_info: typing.Dict, claim: 'Claim'): - self.stream_claim_info = StoredStreamClaim( - self.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'], + self.stream_claim_info = StoredContentClaim( + f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'], claim_info['name'], claim_info['amount'], claim_info['height'], binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'], claim_info['claim_sequence'], claim_info.get('channel_name') diff --git a/lbry/lbry/stream/stream_manager.py b/lbry/lbry/stream/stream_manager.py index e81246b92..4ab4fb4d6 100644 --- a/lbry/lbry/stream/stream_manager.py +++ b/lbry/lbry/stream/stream_manager.py @@ -20,7 +20,7 @@ if typing.TYPE_CHECKING: from lbry.blob.blob_manager import BlobManager from lbry.dht.node import Node from lbry.extras.daemon.analytics import AnalyticsManager - from lbry.extras.daemon.storage import SQLiteStorage, StoredStreamClaim + from lbry.extras.daemon.storage import SQLiteStorage, StoredContentClaim from lbry.wallet import LbryWalletManager from lbry.wallet.transaction import Transaction from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager @@ -117,7 +117,7 @@ class StreamManager: async def add_stream(self, rowid: int, sd_hash: str, file_name: Optional[str], download_directory: Optional[str], status: str, - claim: Optional['StoredStreamClaim'], content_fee: Optional['Transaction'], + claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'], added_on: Optional[int]): try: descriptor = await self.blob_manager.get_stream_descriptor(sd_hash)