forked from LBRYCommunity/lbry-sdk
add torrent db tables
-decouple file table from stream table
This commit is contained in:
parent e79f8d58ca
commit 63fb39016b
1 changed file with 67 additions and 32 deletions
@@ -97,27 +97,10 @@ def _batched_select(transaction, query, parameters, batch_size=900):
         yield from transaction.execute(query.format(bind), current_batch)
 
 
-def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
-    files = []
-    signed_claims = {}
-    stream_hashes = tuple(
-        stream_hash for (stream_hash,) in transaction.execute("select stream_hash from file").fetchall()
-    )
-    for (rowid, stream_hash, file_name, download_dir, data_rate, status, saved_file, raw_content_fee, added_on,
-         _, sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in _batched_select(
-            transaction, "select file.rowid, file.*, stream.*, c.* "
-                         "from file inner join stream on file.stream_hash=stream.stream_hash "
-                         "inner join content_claim cc on file.stream_hash=cc.stream_hash "
-                         "inner join claim c on cc.claim_outpoint=c.claim_outpoint "
-                         "where file.stream_hash in {} "
-                         "order by c.rowid desc", stream_hashes):
-        claim = StoredStreamClaim(stream_hash, *claim_args)
-        if claim.channel_claim_id:
-            if claim.channel_claim_id not in signed_claims:
-                signed_claims[claim.channel_claim_id] = []
-            signed_claims[claim.channel_claim_id].append(claim)
-        files.append(
-            {
+def _get_lbry_file_stream_dict(rowid, added_on, stream_hash, file_name, download_dir, data_rate, status,
+                               sd_hash, stream_key, stream_name, suggested_file_name, claim, saved_file,
+                               raw_content_fee):
+    return {
         "rowid": rowid,
         "added_on": added_on,
         "stream_hash": stream_hash,
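The dict construction moves out of get_all_lbry_files into a standalone helper here, presumably so the per-file dict can be built the same way once the file table also carries torrent-backed rows.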
@@ -135,6 +118,34 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
             binascii.unhexlify(raw_content_fee)
         )
     }
+
+
+def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
+    files = []
+    signed_claims = {}
+    stream_hashes_and_bt_infohashes = transaction.execute("select stream_hash, bt_infohash from file").fetchall()
+    stream_hashes = tuple(
+        stream_hash for stream_hash, _ in stream_hashes_and_bt_infohashes if stream_hash is not None
+    )
+    for (rowid, stream_hash, bt_infohash, file_name, download_dir, data_rate, status, saved_file, raw_content_fee,
+         added_on, _, sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in _batched_select(
+            transaction, "select file.rowid, file.*, stream.*, c.* "
+                         "from file inner join stream on file.stream_hash=stream.stream_hash "
+                         "inner join content_claim cc on file.stream_hash=cc.stream_hash "
+                         "inner join claim c on cc.claim_outpoint=c.claim_outpoint "
+                         "where file.stream_hash in {} "
+                         "order by c.rowid desc", stream_hashes):
+        claim = StoredStreamClaim(stream_hash, *claim_args)
+        if claim.channel_claim_id:
+            if claim.channel_claim_id not in signed_claims:
+                signed_claims[claim.channel_claim_id] = []
+            signed_claims[claim.channel_claim_id].append(claim)
+        files.append(
+            _get_lbry_file_stream_dict(
+                rowid, added_on, stream_hash, file_name, download_dir, data_rate, status,
+                sd_hash, stream_key, stream_name, suggested_file_name, claim, saved_file,
+                raw_content_fee
+            )
         )
     for claim_name, claim_id in _batched_select(
         transaction, "select c.claim_name, c.claim_id from claim c where c.claim_id in {}",
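With the file table now allowed to hold torrent-only rows whose stream_hash is NULL, the rewritten get_all_lbry_files selects both columns and keeps only the stream-backed hashes before handing them to _batched_select. A minimal sketch of that filtering step, assuming a bare sqlite3 connection and a file table trimmed to the two relevant columns (the data values are illustrative):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        create table file (stream_hash char(96), bt_infohash char(20));
        insert into file values ('abc123', NULL);   -- stream-backed row
        insert into file values (NULL, 'deadbeef'); -- torrent-backed row
    """)

    rows = conn.execute("select stream_hash, bt_infohash from file").fetchall()
    # torrent rows carry no stream_hash, so they are skipped here
    stream_hashes = tuple(h for h, _ in rows if h is not None)
    assert stream_hashes == ('abc123',)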
@@ -189,12 +200,10 @@ def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typ
     encoded_download_dir = binascii.hexlify(download_directory.encode()).decode()
     time_added = added_on or int(time.time())
     transaction.execute(
-        "insert or replace into file values (?, ?, ?, ?, ?, ?, ?, ?)",
+        "insert or replace into file values (?, NULL, ?, ?, ?, ?, ?, ?, ?)",
         (stream_hash, encoded_file_name, encoded_download_dir, data_payment_rate, status,
          1 if (file_name and download_directory and os.path.isfile(os.path.join(download_directory, file_name))) else 0,
-         None if not content_fee else binascii.hexlify(content_fee.raw).decode(),
-         time_added
-        )
+         None if not content_fee else binascii.hexlify(content_fee.raw).decode(), time_added)
     ).fetchall()
 
     return transaction.execute("select rowid from file where stream_hash=?", (stream_hash, )).fetchone()[0]
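Note the NULL hard-coded into the second slot: bt_infohash is now the second column of file, and store_file only ever writes stream-based entries, so the remaining eight placeholders still line up with (stream_hash, file_name, download_directory, data_payment_rate, status, saved_file, content_fee, added_on).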
@@ -243,8 +252,32 @@ class SQLiteStorage(SQLiteMixin):
                 claim_sequence integer not null
             );
 
+            create table if not exists torrent (
+                bt_infohash char(20) not null primary key,
+                tracker text,
+                length integer not null,
+                name text not null
+            );
+
+            create table if not exists torrent_node ( -- BEP-0005
+                bt_infohash char(20) not null references torrent,
+                host text not null,
+                port integer not null
+            );
+
+            create table if not exists torrent_tracker ( -- BEP-0012
+                bt_infohash char(20) not null references torrent,
+                tracker text not null
+            );
+
+            create table if not exists torrent_http_seed ( -- BEP-0017
+                bt_infohash char(20) not null references torrent,
+                http_seed text not null
+            );
+
             create table if not exists file (
-                stream_hash text primary key not null references stream,
+                stream_hash char(96) references stream,
+                bt_infohash char(20) references torrent,
                 file_name text,
                 download_directory text,
                 blob_data_rate real not null,
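The four new tables keep the torrent metadata normalized: one torrent row per infohash, with its DHT nodes, extra trackers, and HTTP seeds hanging off it via the references clauses. A minimal sketch of how the first two fit together, assuming a bare sqlite3 connection; the infohash and endpoint values are purely illustrative:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        create table if not exists torrent (
            bt_infohash char(20) not null primary key,
            tracker text,
            length integer not null,
            name text not null
        );
        create table if not exists torrent_node ( -- BEP-0005
            bt_infohash char(20) not null references torrent,
            host text not null,
            port integer not null
        );
    """)

    infohash = "aa" * 20  # illustrative placeholder
    conn.execute("insert into torrent values (?, ?, ?, ?)",
                 (infohash, "udp://tracker.example:6969/announce", 1048576, "example.bin"))
    conn.execute("insert into torrent_node values (?, ?, ?)",
                 (infohash, "router.example", 6881))
    conn.commit()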
@@ -255,9 +288,9 @@ class SQLiteStorage(SQLiteMixin):
             );
 
             create table if not exists content_claim (
-                stream_hash text unique not null references file,
-                claim_outpoint text not null references claim,
-                primary key (stream_hash, claim_outpoint)
+                stream_hash char(96) references stream,
+                bt_infohash char(20) references torrent,
+                claim_outpoint text unique not null references claim
             );
 
             create table if not exists support (
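content_claim is re-keyed in the process: the old table held one row per stream_hash pointing at a claim, while the new one holds one row per claim_outpoint that points at either a stream or a torrent through whichever of the two reference columns is non-NULL.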
@@ -475,7 +508,8 @@ class SQLiteStorage(SQLiteMixin):
         def update_manually_removed_files(transaction: sqlite3.Connection):
             removed = []
             for (stream_hash, download_directory, file_name) in transaction.execute(
-                    "select stream_hash, download_directory, file_name from file where saved_file=1"
+                    "select stream_hash, download_directory, file_name from file where saved_file=1 "
+                    "and stream_hash is not null"
             ).fetchall():
                 if download_directory and file_name and os.path.isfile(
                         os.path.join(binascii.unhexlify(download_directory).decode(),
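The added filter keeps the manual-removal scan to stream-backed files; torrent-backed rows would otherwise surface here with a NULL stream_hash that the surrounding bookkeeping does not expect.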
@@ -536,7 +570,7 @@ class SQLiteStorage(SQLiteMixin):
             store_file(transaction, descriptor.stream_hash, os.path.basename(descriptor.suggested_file_name),
                        download_directory, 0.0, 'stopped', content_fee=content_fee)
             if content_claim:
-                transaction.execute("insert or ignore into content_claim values (?, ?)", content_claim)
+                transaction.execute("insert or ignore into content_claim values (?, ?, ?)", content_claim)
             transaction.executemany(
                 "update file set status='stopped' where stream_hash=?",
                 ((stream_hash, ) for stream_hash in stream_hashes)
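Since content_claim is three columns wide now, the content_claim tuple supplied by the caller presumably has to grow to match, carrying (stream_hash, bt_infohash, claim_outpoint) in table-column order.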
@@ -703,8 +737,9 @@ class SQLiteStorage(SQLiteMixin):
             )
 
             # update the claim associated to the file
+            transaction.execute("delete from content_claim where stream_hash=?", (stream_hash, )).fetchall()
             transaction.execute(
-                "insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint)
+                "insert into content_claim values (?, NULL, ?)", (stream_hash, claim_outpoint)
             ).fetchall()
 
         async def save_content_claim(self, stream_hash, claim_outpoint):
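The switch away from "insert or replace" follows from the schema change above: uniqueness now lives on claim_outpoint rather than stream_hash, so replacing on conflict would no longer displace a stream's previous claim row, and the old association has to be deleted explicitly first. A small sketch of the failure mode the delete guards against, using the new table shape with illustrative values:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("create table content_claim (stream_hash char(96), bt_infohash char(20), "
                 "claim_outpoint text unique not null)")
    conn.execute("insert into content_claim values ('deadbeef', NULL, 'txid0:0')")
    # a new outpoint for the same stream does not collide on claim_outpoint,
    # so "insert or replace" would leave the stale row behind:
    conn.execute("insert or replace into content_claim values ('deadbeef', NULL, 'txid1:0')")
    print(conn.execute("select count(*) from content_claim").fetchone()[0])  # -> 2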