diff --git a/lbry/lbry/extras/daemon/Components.py b/lbry/lbry/extras/daemon/Components.py index a1b3bd5a6..5ddbaa4b4 100644 --- a/lbry/lbry/extras/daemon/Components.py +++ b/lbry/lbry/extras/daemon/Components.py @@ -49,7 +49,7 @@ class DatabaseComponent(Component): @staticmethod def get_current_db_revision(): - return 12 + return 13 @property def revision_filename(self): diff --git a/lbry/lbry/extras/daemon/migrator/dbmigrator.py b/lbry/lbry/extras/daemon/migrator/dbmigrator.py index 94df761f0..99a1bb2b4 100644 --- a/lbry/lbry/extras/daemon/migrator/dbmigrator.py +++ b/lbry/lbry/extras/daemon/migrator/dbmigrator.py @@ -31,6 +31,8 @@ def migrate_db(conf, start, end): from .migrate10to11 import do_migration elif current == 11: from .migrate11to12 import do_migration + elif current == 12: + from .migrate12to13 import do_migration else: raise Exception(f"DB migration of version {current} to {current+1} is not available") try: diff --git a/lbry/lbry/extras/daemon/migrator/migrate12to13.py b/lbry/lbry/extras/daemon/migrator/migrate12to13.py new file mode 100644 index 000000000..05bf7aee7 --- /dev/null +++ b/lbry/lbry/extras/daemon/migrator/migrate12to13.py @@ -0,0 +1,80 @@ +import os +import sqlite3 + + +def do_migration(conf): + db_path = os.path.join(conf.data_dir, "lbrynet.sqlite") + connection = sqlite3.connect(db_path) + cursor = connection.cursor() + + current_columns = [] + for col_info in cursor.execute("pragma table_info('file');").fetchall(): + current_columns.append(col_info[1]) + if 'bt_infohash' in current_columns: + connection.close() + print("already migrated") + return + + cursor.executescript(""" + pragma foreign_keys=off; + + create table if not exists torrent ( + bt_infohash char(20) not null primary key, + tracker text, + length integer not null, + name text not null + ); + + create table if not exists torrent_node ( -- BEP-0005 + bt_infohash char(20) not null references torrent, + host text not null, + port integer not null + ); + + create table if not exists torrent_tracker ( -- BEP-0012 + bt_infohash char(20) not null references torrent, + tracker text not null + ); + + create table if not exists torrent_http_seed ( -- BEP-0017 + bt_infohash char(20) not null references torrent, + http_seed text not null + ); + + create table if not exists new_file ( + stream_hash char(96) references stream, + bt_infohash char(20) references torrent, + file_name text, + download_directory text, + blob_data_rate real not null, + status text not null, + saved_file integer not null, + content_fee text, + added_on integer not null + ); + + create table if not exists new_content_claim ( + stream_hash char(96) references stream, + bt_infohash char(20) references torrent, + claim_outpoint text unique not null references claim + ); + + insert into new_file (stream_hash, bt_infohash, file_name, download_directory, blob_data_rate, status, + saved_file, content_fee, added_on) select + stream_hash, NULL, file_name, download_directory, blob_data_rate, status, saved_file, content_fee, + added_on + from file; + + insert into new_content_claim (stream_hash, bt_infohash, claim_outpoint) + select stream_hash, NULL, claim_outpoint from content_claim; + + drop table file; + drop table content_claim; + alter table new_file rename to file; + alter table new_content_claim rename to content_claim; + + pragma foreign_keys=on; + """) + + connection.commit() + connection.close() diff --git a/lbry/lbry/extras/daemon/storage.py b/lbry/lbry/extras/daemon/storage.py index d0c516a32..b5859d6c1 100644 --- a/lbry/lbry/extras/daemon/storage.py +++ b/lbry/lbry/extras/daemon/storage.py @@ -28,12 +28,11 @@ def calculate_effective_amount(amount: str, supports: typing.Optional[typing.Lis ) -class StoredStreamClaim: - def __init__(self, stream_hash: str, outpoint: opt_str = None, claim_id: opt_str = None, name: opt_str = None, +class StoredContentClaim: + def __init__(self, outpoint: opt_str = None, claim_id: opt_str = None, name: opt_str = None, amount: opt_int = None, height: opt_int = None, serialized: opt_str = None, channel_claim_id: opt_str = None, address: opt_str = None, claim_sequence: opt_int = None, channel_name: opt_str = None): - self.stream_hash = stream_hash self.claim_id = claim_id self.outpoint = outpoint self.claim_name = name @@ -71,8 +70,16 @@ class StoredStreamClaim: } +def _get_content_claims(transaction: sqlite3.Connection, query: str, + source_hashes: typing.List[str]) -> typing.Dict[str, StoredContentClaim]: + claims = {} + for claim_info in _batched_select(transaction, query, source_hashes): + claims[claim_info[0]] = StoredContentClaim(*claim_info[1:]) + return claims + + def get_claims_from_stream_hashes(transaction: sqlite3.Connection, - stream_hashes: typing.List[str]) -> typing.Dict[str, StoredStreamClaim]: + stream_hashes: typing.List[str]) -> typing.Dict[str, StoredContentClaim]: query = ( "select content_claim.stream_hash, c.*, case when c.channel_claim_id is not null then " " (select claim_name from claim where claim_id==c.channel_claim_id) " @@ -81,13 +88,20 @@ def get_claims_from_stream_hashes(transaction: sqlite3.Connection, " inner join claim c on c.claim_outpoint=content_claim.claim_outpoint and content_claim.stream_hash in {}" " order by c.rowid desc" ) - return { - claim_info.stream_hash: claim_info - for claim_info in [ - None if not claim_info else StoredStreamClaim(*claim_info) - for claim_info in _batched_select(transaction, query, stream_hashes) - ] - } + return _get_content_claims(transaction, query, stream_hashes) + + +def get_claims_from_torrent_info_hashes(transaction: sqlite3.Connection, + info_hashes: typing.List[str]) -> typing.Dict[str, StoredContentClaim]: + query = ( + "select content_claim.bt_infohash, c.*, case when c.channel_claim_id is not null then " + " (select claim_name from claim where claim_id==c.channel_claim_id) " + " else null end as channel_name " + " from content_claim " + " inner join claim c on c.claim_outpoint=content_claim.claim_outpoint and content_claim.bt_infohash in {}" + " order by c.rowid desc" + ) + return _get_content_claims(transaction, query, info_hashes) def _batched_select(transaction, query, parameters, batch_size=900): @@ -97,44 +111,55 @@ def _batched_select(transaction, query, parameters, batch_size=900): yield from transaction.execute(query.format(bind), current_batch) +def _get_lbry_file_stream_dict(rowid, added_on, stream_hash, file_name, download_dir, data_rate, status, + sd_hash, stream_key, stream_name, suggested_file_name, claim, saved_file, + raw_content_fee): + return { + "rowid": rowid, + "added_on": added_on, + "stream_hash": stream_hash, + "file_name": file_name, # hex + "download_directory": download_dir, # hex + "blob_data_rate": data_rate, + "status": status, + "sd_hash": sd_hash, + "key": stream_key, + "stream_name": stream_name, # hex + "suggested_file_name": suggested_file_name, # hex + "claim": claim, + "saved_file": bool(saved_file), + "content_fee": None if not raw_content_fee else Transaction( + binascii.unhexlify(raw_content_fee) + ) + } + + def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]: files = [] signed_claims = {} + stream_hashes_and_bt_infohashes = transaction.execute("select stream_hash, bt_infohash from file").fetchall() stream_hashes = tuple( - stream_hash for (stream_hash,) in transaction.execute("select stream_hash from file").fetchall() + stream_hash for stream_hash, _ in stream_hashes_and_bt_infohashes if stream_hash is not None ) - for (rowid, stream_hash, file_name, download_dir, data_rate, status, saved_file, raw_content_fee, added_on, - _, sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in _batched_select( + for (rowid, stream_hash, bt_infohash, file_name, download_dir, data_rate, status, saved_file, raw_content_fee, + added_on, _, sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in _batched_select( transaction, "select file.rowid, file.*, stream.*, c.* " "from file inner join stream on file.stream_hash=stream.stream_hash " "inner join content_claim cc on file.stream_hash=cc.stream_hash " "inner join claim c on cc.claim_outpoint=c.claim_outpoint " "where file.stream_hash in {} " "order by c.rowid desc", stream_hashes): - claim = StoredStreamClaim(stream_hash, *claim_args) + claim = StoredContentClaim(*claim_args) if claim.channel_claim_id: if claim.channel_claim_id not in signed_claims: signed_claims[claim.channel_claim_id] = [] signed_claims[claim.channel_claim_id].append(claim) files.append( - { - "rowid": rowid, - "added_on": added_on, - "stream_hash": stream_hash, - "file_name": file_name, # hex - "download_directory": download_dir, # hex - "blob_data_rate": data_rate, - "status": status, - "sd_hash": sd_hash, - "key": stream_key, - "stream_name": stream_name, # hex - "suggested_file_name": suggested_file_name, # hex - "claim": claim, - "saved_file": bool(saved_file), - "content_fee": None if not raw_content_fee else Transaction( - binascii.unhexlify(raw_content_fee) - ) - } + _get_lbry_file_stream_dict( + rowid, added_on, stream_hash, file_name, download_dir, data_rate, status, + sd_hash, stream_key, stream_name, suggested_file_name, claim, saved_file, + raw_content_fee + ) ) for claim_name, claim_id in _batched_select( transaction, "select c.claim_name, c.claim_id from claim c where c.claim_id in {}", @@ -179,6 +204,15 @@ def delete_stream(transaction: sqlite3.Connection, descriptor: 'StreamDescriptor transaction.executemany("delete from blob where blob_hash=?", blob_hashes).fetchall() +def delete_torrent(transaction: sqlite3.Connection, bt_infohash: str): + transaction.execute("delete from content_claim where bt_infohash=?", (bt_infohash, )).fetchall() + transaction.execute("delete from torrent_tracker where bt_infohash=?", (bt_infohash,)).fetchall() + transaction.execute("delete from torrent_node where bt_infohash=?", (bt_infohash,)).fetchall() + transaction.execute("delete from torrent_http_seed where bt_infohash=?", (bt_infohash,)).fetchall() + transaction.execute("delete from file where bt_infohash=?", (bt_infohash,)).fetchall() + transaction.execute("delete from torrent where bt_infohash=?", (bt_infohash,)).fetchall() + + def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typing.Optional[str], download_directory: typing.Optional[str], data_payment_rate: float, status: str, content_fee: typing.Optional[Transaction], added_on: typing.Optional[int] = None) -> int: @@ -189,12 +223,10 @@ def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typ encoded_download_dir = binascii.hexlify(download_directory.encode()).decode() time_added = added_on or int(time.time()) transaction.execute( - "insert or replace into file values (?, ?, ?, ?, ?, ?, ?, ?)", + "insert or replace into file values (?, NULL, ?, ?, ?, ?, ?, ?, ?)", (stream_hash, encoded_file_name, encoded_download_dir, data_payment_rate, status, 1 if (file_name and download_directory and os.path.isfile(os.path.join(download_directory, file_name))) else 0, - None if not content_fee else binascii.hexlify(content_fee.raw).decode(), - time_added - ) + None if not content_fee else binascii.hexlify(content_fee.raw).decode(), time_added) ).fetchall() return transaction.execute("select rowid from file where stream_hash=?", (stream_hash, )).fetchone()[0] @@ -243,8 +275,32 @@ class SQLiteStorage(SQLiteMixin): claim_sequence integer not null ); + create table if not exists torrent ( + bt_infohash char(20) not null primary key, + tracker text, + length integer not null, + name text not null + ); + + create table if not exists torrent_node ( -- BEP-0005 + bt_infohash char(20) not null references torrent, + host text not null, + port integer not null + ); + + create table if not exists torrent_tracker ( -- BEP-0012 + bt_infohash char(20) not null references torrent, + tracker text not null + ); + + create table if not exists torrent_http_seed ( -- BEP-0017 + bt_infohash char(20) not null references torrent, + http_seed text not null + ); + create table if not exists file ( - stream_hash text primary key not null references stream, + stream_hash char(96) references stream, + bt_infohash char(20) references torrent, file_name text, download_directory text, blob_data_rate real not null, @@ -255,9 +311,9 @@ class SQLiteStorage(SQLiteMixin): ); create table if not exists content_claim ( - stream_hash text unique not null references file, - claim_outpoint text not null references claim, - primary key (stream_hash, claim_outpoint) + stream_hash char(96) references stream, + bt_infohash char(20) references torrent, + claim_outpoint text unique not null references claim ); create table if not exists support ( @@ -449,6 +505,9 @@ class SQLiteStorage(SQLiteMixin): def delete_stream(self, descriptor: 'StreamDescriptor'): return self.db.run_with_foreign_keys_disabled(delete_stream, descriptor) + async def delete_torrent(self, bt_infohash: str): + return await self.db.run(delete_torrent, bt_infohash) + # # # # # # # # # file stuff # # # # # # # # # def save_downloaded_file(self, stream_hash: str, file_name: typing.Optional[str], @@ -475,7 +534,8 @@ class SQLiteStorage(SQLiteMixin): def update_manually_removed_files(transaction: sqlite3.Connection): removed = [] for (stream_hash, download_directory, file_name) in transaction.execute( - "select stream_hash, download_directory, file_name from file where saved_file=1" + "select stream_hash, download_directory, file_name from file where saved_file=1 " + "and stream_hash is not null" ).fetchall(): if download_directory and file_name and os.path.isfile( os.path.join(binascii.unhexlify(download_directory).decode(), @@ -536,7 +596,7 @@ class SQLiteStorage(SQLiteMixin): store_file(transaction, descriptor.stream_hash, os.path.basename(descriptor.suggested_file_name), download_directory, 0.0, 'stopped', content_fee=content_fee) if content_claim: - transaction.execute("insert or ignore into content_claim values (?, ?)", content_claim) + transaction.execute("insert or ignore into content_claim values (?, ?, ?)", content_claim) transaction.executemany( "update file set status='stopped' where stream_hash=?", ((stream_hash, ) for stream_hash in stream_hashes) @@ -703,8 +763,9 @@ class SQLiteStorage(SQLiteMixin): ) # update the claim associated to the file + transaction.execute("delete from content_claim where stream_hash=?", (stream_hash, )).fetchall() transaction.execute( - "insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint) + "insert into content_claim values (?, NULL, ?)", (stream_hash, claim_outpoint) ).fetchall() async def save_content_claim(self, stream_hash, claim_outpoint): diff --git a/lbry/lbry/schema/attrs.py b/lbry/lbry/schema/attrs.py index 7586a7039..eed77505f 100644 --- a/lbry/lbry/schema/attrs.py +++ b/lbry/lbry/schema/attrs.py @@ -191,6 +191,22 @@ class Source(Metadata): def sd_hash_bytes(self, sd_hash: bytes): self.message.sd_hash = sd_hash + @property + def bt_infohash(self) -> str: + return hexlify(self.message.bt_infohash).decode() + + @bt_infohash.setter + def bt_infohash(self, bt_infohash: str): + self.message.bt_infohash = unhexlify(bt_infohash.encode()) + + @property + def bt_infohash_bytes(self) -> bytes: + return self.message.bt_infohash.decode() + + @bt_infohash_bytes.setter + def bt_infohash_bytes(self, bt_infohash: bytes): + self.message.bt_infohash = bt_infohash + @property def url(self) -> str: return self.message.url diff --git a/lbry/lbry/schema/claim.py b/lbry/lbry/schema/claim.py index 7150a547a..d845b4658 100644 --- a/lbry/lbry/schema/claim.py +++ b/lbry/lbry/schema/claim.py @@ -195,6 +195,8 @@ class Stream(BaseClaim): claim['source']['hash'] = self.source.file_hash if 'sd_hash' in claim['source']: claim['source']['sd_hash'] = self.source.sd_hash + elif 'bt_infohash' in claim['source']: + claim['source']['bt_infohash'] = self.source.bt_infohash if 'media_type' in claim['source']: claim['stream_type'] = guess_stream_type(claim['source']['media_type']) fee = claim.get('fee', {}) @@ -216,6 +218,8 @@ class Stream(BaseClaim): if 'sd_hash' in kwargs: self.source.sd_hash = kwargs.pop('sd_hash') + elif 'bt_infohash' in kwargs: + self.source.bt_infohash = kwargs.pop('bt_infohash') if 'file_name' in kwargs: self.source.name = kwargs.pop('file_name') if 'file_hash' in kwargs: diff --git a/lbry/lbry/stream/managed_stream.py b/lbry/lbry/stream/managed_stream.py index ef03e1342..3a5de6f5d 100644 --- a/lbry/lbry/stream/managed_stream.py +++ b/lbry/lbry/stream/managed_stream.py @@ -11,7 +11,7 @@ from lbry.schema.mime_types import guess_media_type from lbry.stream.downloader import StreamDownloader from lbry.stream.descriptor import StreamDescriptor, sanitize_file_name from lbry.stream.reflector.client import StreamReflectorClient -from lbry.extras.daemon.storage import StoredStreamClaim +from lbry.extras.daemon.storage import StoredContentClaim from lbry.blob import MAX_BLOB_SIZE if typing.TYPE_CHECKING: @@ -78,7 +78,7 @@ class ManagedStream: def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager', sd_hash: str, download_directory: typing.Optional[str] = None, file_name: typing.Optional[str] = None, - status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None, + status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredContentClaim] = None, download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None, descriptor: typing.Optional[StreamDescriptor] = None, content_fee: typing.Optional['Transaction'] = None, @@ -452,8 +452,8 @@ class ManagedStream: return sent def set_claim(self, claim_info: typing.Dict, claim: 'Claim'): - self.stream_claim_info = StoredStreamClaim( - self.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'], + self.stream_claim_info = StoredContentClaim( + f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'], claim_info['name'], claim_info['amount'], claim_info['height'], binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'], claim_info['claim_sequence'], claim_info.get('channel_name') diff --git a/lbry/lbry/stream/stream_manager.py b/lbry/lbry/stream/stream_manager.py index e81246b92..4ab4fb4d6 100644 --- a/lbry/lbry/stream/stream_manager.py +++ b/lbry/lbry/stream/stream_manager.py @@ -20,7 +20,7 @@ if typing.TYPE_CHECKING: from lbry.blob.blob_manager import BlobManager from lbry.dht.node import Node from lbry.extras.daemon.analytics import AnalyticsManager - from lbry.extras.daemon.storage import SQLiteStorage, StoredStreamClaim + from lbry.extras.daemon.storage import SQLiteStorage, StoredContentClaim from lbry.wallet import LbryWalletManager from lbry.wallet.transaction import Transaction from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager @@ -117,7 +117,7 @@ class StreamManager: async def add_stream(self, rowid: int, sd_hash: str, file_name: Optional[str], download_directory: Optional[str], status: str, - claim: Optional['StoredStreamClaim'], content_fee: Optional['Transaction'], + claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'], added_on: Optional[int]): try: descriptor = await self.blob_manager.get_stream_descriptor(sd_hash)