diff --git a/lbry/lbry/extras/daemon/storage.py b/lbry/lbry/extras/daemon/storage.py index d0c516a32..8b7e91890 100644 --- a/lbry/lbry/extras/daemon/storage.py +++ b/lbry/lbry/extras/daemon/storage.py @@ -97,14 +97,38 @@ def _batched_select(transaction, query, parameters, batch_size=900): yield from transaction.execute(query.format(bind), current_batch) +def _get_lbry_file_stream_dict(rowid, added_on, stream_hash, file_name, download_dir, data_rate, status, + sd_hash, stream_key, stream_name, suggested_file_name, claim, saved_file, + raw_content_fee): + return { + "rowid": rowid, + "added_on": added_on, + "stream_hash": stream_hash, + "file_name": file_name, # hex + "download_directory": download_dir, # hex + "blob_data_rate": data_rate, + "status": status, + "sd_hash": sd_hash, + "key": stream_key, + "stream_name": stream_name, # hex + "suggested_file_name": suggested_file_name, # hex + "claim": claim, + "saved_file": bool(saved_file), + "content_fee": None if not raw_content_fee else Transaction( + binascii.unhexlify(raw_content_fee) + ) + } + + def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]: files = [] signed_claims = {} + stream_hashes_and_bt_infohashes = transaction.execute("select stream_hash, bt_infohash from file").fetchall() stream_hashes = tuple( - stream_hash for (stream_hash,) in transaction.execute("select stream_hash from file").fetchall() + stream_hash for stream_hash, _ in stream_hashes_and_bt_infohashes if stream_hash is not None ) - for (rowid, stream_hash, file_name, download_dir, data_rate, status, saved_file, raw_content_fee, added_on, - _, sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in _batched_select( + for (rowid, stream_hash, bt_infohash, file_name, download_dir, data_rate, status, saved_file, raw_content_fee, + added_on, _, sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in _batched_select( transaction, "select file.rowid, file.*, stream.*, c.* " "from file inner join stream on file.stream_hash=stream.stream_hash " "inner join content_claim cc on file.stream_hash=cc.stream_hash " @@ -117,24 +141,11 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di signed_claims[claim.channel_claim_id] = [] signed_claims[claim.channel_claim_id].append(claim) files.append( - { - "rowid": rowid, - "added_on": added_on, - "stream_hash": stream_hash, - "file_name": file_name, # hex - "download_directory": download_dir, # hex - "blob_data_rate": data_rate, - "status": status, - "sd_hash": sd_hash, - "key": stream_key, - "stream_name": stream_name, # hex - "suggested_file_name": suggested_file_name, # hex - "claim": claim, - "saved_file": bool(saved_file), - "content_fee": None if not raw_content_fee else Transaction( - binascii.unhexlify(raw_content_fee) - ) - } + _get_lbry_file_stream_dict( + rowid, added_on, stream_hash, file_name, download_dir, data_rate, status, + sd_hash, stream_key, stream_name, suggested_file_name, claim, saved_file, + raw_content_fee + ) ) for claim_name, claim_id in _batched_select( transaction, "select c.claim_name, c.claim_id from claim c where c.claim_id in {}", @@ -189,12 +200,10 @@ def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typ encoded_download_dir = binascii.hexlify(download_directory.encode()).decode() time_added = added_on or int(time.time()) transaction.execute( - "insert or replace into file values (?, ?, ?, ?, ?, ?, ?, ?)", + "insert or replace into file values (?, NULL, ?, ?, ?, ?, ?, ?, ?)", (stream_hash, encoded_file_name, encoded_download_dir, data_payment_rate, status, 1 if (file_name and download_directory and os.path.isfile(os.path.join(download_directory, file_name))) else 0, - None if not content_fee else binascii.hexlify(content_fee.raw).decode(), - time_added - ) + None if not content_fee else binascii.hexlify(content_fee.raw).decode(), time_added) ).fetchall() return transaction.execute("select rowid from file where stream_hash=?", (stream_hash, )).fetchone()[0] @@ -243,8 +252,32 @@ class SQLiteStorage(SQLiteMixin): claim_sequence integer not null ); + create table if not exists torrent ( + bt_infohash char(20) not null primary key, + tracker text, + length integer not null, + name text not null + ); + + create table if not exists torrent_node ( -- BEP-0005 + bt_infohash char(20) not null references torrent, + host text not null, + port integer not null + ); + + create table if not exists torrent_tracker ( -- BEP-0012 + bt_infohash char(20) not null references torrent, + tracker text not null + ); + + create table if not exists torrent_http_seed ( -- BEP-0017 + bt_infohash char(20) not null references torrent, + http_seed text not null + ); + create table if not exists file ( - stream_hash text primary key not null references stream, + stream_hash char(96) references stream, + bt_infohash char(20) references torrent, file_name text, download_directory text, blob_data_rate real not null, @@ -255,9 +288,9 @@ class SQLiteStorage(SQLiteMixin): ); create table if not exists content_claim ( - stream_hash text unique not null references file, - claim_outpoint text not null references claim, - primary key (stream_hash, claim_outpoint) + stream_hash char(96) references stream, + bt_infohash char(20) references torrent, + claim_outpoint text unique not null references claim ); create table if not exists support ( @@ -475,7 +508,8 @@ class SQLiteStorage(SQLiteMixin): def update_manually_removed_files(transaction: sqlite3.Connection): removed = [] for (stream_hash, download_directory, file_name) in transaction.execute( - "select stream_hash, download_directory, file_name from file where saved_file=1" + "select stream_hash, download_directory, file_name from file where saved_file=1 " + "and stream_hash is not null" ).fetchall(): if download_directory and file_name and os.path.isfile( os.path.join(binascii.unhexlify(download_directory).decode(), @@ -536,7 +570,7 @@ class SQLiteStorage(SQLiteMixin): store_file(transaction, descriptor.stream_hash, os.path.basename(descriptor.suggested_file_name), download_directory, 0.0, 'stopped', content_fee=content_fee) if content_claim: - transaction.execute("insert or ignore into content_claim values (?, ?)", content_claim) + transaction.execute("insert or ignore into content_claim values (?, ?, ?)", content_claim) transaction.executemany( "update file set status='stopped' where stream_hash=?", ((stream_hash, ) for stream_hash in stream_hashes) @@ -703,8 +737,9 @@ class SQLiteStorage(SQLiteMixin): ) # update the claim associated to the file + transaction.execute("delete from content_claim where stream_hash=?", (stream_hash, )).fetchall() transaction.execute( - "insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint) + "insert into content_claim values (?, NULL, ?)", (stream_hash, claim_outpoint) ).fetchall() async def save_content_claim(self, stream_hash, claim_outpoint):