Merge pull request #2620 from lbryio/torrent-db-support

Add torrent support to SQLiteStorage
This commit is contained in:
Jack Robison 2019-11-15 18:34:00 -05:00 committed by GitHub
commit 719d18c670
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 214 additions and 51 deletions

View file

@ -49,7 +49,7 @@ class DatabaseComponent(Component):
@staticmethod
def get_current_db_revision():
return 12
return 13
@property
def revision_filename(self):

View file

@ -31,6 +31,8 @@ def migrate_db(conf, start, end):
from .migrate10to11 import do_migration
elif current == 11:
from .migrate11to12 import do_migration
elif current == 12:
from .migrate12to13 import do_migration
else:
raise Exception(f"DB migration of version {current} to {current+1} is not available")
try:

View file

@ -0,0 +1,80 @@
import os
import sqlite3
def do_migration(conf):
db_path = os.path.join(conf.data_dir, "lbrynet.sqlite")
connection = sqlite3.connect(db_path)
cursor = connection.cursor()
current_columns = []
for col_info in cursor.execute("pragma table_info('file');").fetchall():
current_columns.append(col_info[1])
if 'bt_infohash' in current_columns:
connection.close()
print("already migrated")
return
cursor.executescript("""
pragma foreign_keys=off;
create table if not exists torrent (
bt_infohash char(20) not null primary key,
tracker text,
length integer not null,
name text not null
);
create table if not exists torrent_node ( -- BEP-0005
bt_infohash char(20) not null references torrent,
host text not null,
port integer not null
);
create table if not exists torrent_tracker ( -- BEP-0012
bt_infohash char(20) not null references torrent,
tracker text not null
);
create table if not exists torrent_http_seed ( -- BEP-0017
bt_infohash char(20) not null references torrent,
http_seed text not null
);
create table if not exists new_file (
stream_hash char(96) references stream,
bt_infohash char(20) references torrent,
file_name text,
download_directory text,
blob_data_rate real not null,
status text not null,
saved_file integer not null,
content_fee text,
added_on integer not null
);
create table if not exists new_content_claim (
stream_hash char(96) references stream,
bt_infohash char(20) references torrent,
claim_outpoint text unique not null references claim
);
insert into new_file (stream_hash, bt_infohash, file_name, download_directory, blob_data_rate, status,
saved_file, content_fee, added_on) select
stream_hash, NULL, file_name, download_directory, blob_data_rate, status, saved_file, content_fee,
added_on
from file;
insert into new_content_claim (stream_hash, bt_infohash, claim_outpoint)
select stream_hash, NULL, claim_outpoint from content_claim;
drop table file;
drop table content_claim;
alter table new_file rename to file;
alter table new_content_claim rename to content_claim;
pragma foreign_keys=on;
""")
connection.commit()
connection.close()

View file

@ -28,12 +28,11 @@ def calculate_effective_amount(amount: str, supports: typing.Optional[typing.Lis
)
class StoredStreamClaim:
def __init__(self, stream_hash: str, outpoint: opt_str = None, claim_id: opt_str = None, name: opt_str = None,
class StoredContentClaim:
def __init__(self, outpoint: opt_str = None, claim_id: opt_str = None, name: opt_str = None,
amount: opt_int = None, height: opt_int = None, serialized: opt_str = None,
channel_claim_id: opt_str = None, address: opt_str = None, claim_sequence: opt_int = None,
channel_name: opt_str = None):
self.stream_hash = stream_hash
self.claim_id = claim_id
self.outpoint = outpoint
self.claim_name = name
@ -71,8 +70,16 @@ class StoredStreamClaim:
}
def _get_content_claims(transaction: sqlite3.Connection, query: str,
source_hashes: typing.List[str]) -> typing.Dict[str, StoredContentClaim]:
claims = {}
for claim_info in _batched_select(transaction, query, source_hashes):
claims[claim_info[0]] = StoredContentClaim(*claim_info[1:])
return claims
def get_claims_from_stream_hashes(transaction: sqlite3.Connection,
stream_hashes: typing.List[str]) -> typing.Dict[str, StoredStreamClaim]:
stream_hashes: typing.List[str]) -> typing.Dict[str, StoredContentClaim]:
query = (
"select content_claim.stream_hash, c.*, case when c.channel_claim_id is not null then "
" (select claim_name from claim where claim_id==c.channel_claim_id) "
@ -81,13 +88,20 @@ def get_claims_from_stream_hashes(transaction: sqlite3.Connection,
" inner join claim c on c.claim_outpoint=content_claim.claim_outpoint and content_claim.stream_hash in {}"
" order by c.rowid desc"
)
return {
claim_info.stream_hash: claim_info
for claim_info in [
None if not claim_info else StoredStreamClaim(*claim_info)
for claim_info in _batched_select(transaction, query, stream_hashes)
]
}
return _get_content_claims(transaction, query, stream_hashes)
def get_claims_from_torrent_info_hashes(transaction: sqlite3.Connection,
info_hashes: typing.List[str]) -> typing.Dict[str, StoredContentClaim]:
query = (
"select content_claim.bt_infohash, c.*, case when c.channel_claim_id is not null then "
" (select claim_name from claim where claim_id==c.channel_claim_id) "
" else null end as channel_name "
" from content_claim "
" inner join claim c on c.claim_outpoint=content_claim.claim_outpoint and content_claim.bt_infohash in {}"
" order by c.rowid desc"
)
return _get_content_claims(transaction, query, info_hashes)
def _batched_select(transaction, query, parameters, batch_size=900):
@ -97,27 +111,10 @@ def _batched_select(transaction, query, parameters, batch_size=900):
yield from transaction.execute(query.format(bind), current_batch)
def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
files = []
signed_claims = {}
stream_hashes = tuple(
stream_hash for (stream_hash,) in transaction.execute("select stream_hash from file").fetchall()
)
for (rowid, stream_hash, file_name, download_dir, data_rate, status, saved_file, raw_content_fee, added_on,
_, sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in _batched_select(
transaction, "select file.rowid, file.*, stream.*, c.* "
"from file inner join stream on file.stream_hash=stream.stream_hash "
"inner join content_claim cc on file.stream_hash=cc.stream_hash "
"inner join claim c on cc.claim_outpoint=c.claim_outpoint "
"where file.stream_hash in {} "
"order by c.rowid desc", stream_hashes):
claim = StoredStreamClaim(stream_hash, *claim_args)
if claim.channel_claim_id:
if claim.channel_claim_id not in signed_claims:
signed_claims[claim.channel_claim_id] = []
signed_claims[claim.channel_claim_id].append(claim)
files.append(
{
def _get_lbry_file_stream_dict(rowid, added_on, stream_hash, file_name, download_dir, data_rate, status,
sd_hash, stream_key, stream_name, suggested_file_name, claim, saved_file,
raw_content_fee):
return {
"rowid": rowid,
"added_on": added_on,
"stream_hash": stream_hash,
@ -135,6 +132,34 @@ def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Di
binascii.unhexlify(raw_content_fee)
)
}
def get_all_lbry_files(transaction: sqlite3.Connection) -> typing.List[typing.Dict]:
files = []
signed_claims = {}
stream_hashes_and_bt_infohashes = transaction.execute("select stream_hash, bt_infohash from file").fetchall()
stream_hashes = tuple(
stream_hash for stream_hash, _ in stream_hashes_and_bt_infohashes if stream_hash is not None
)
for (rowid, stream_hash, bt_infohash, file_name, download_dir, data_rate, status, saved_file, raw_content_fee,
added_on, _, sd_hash, stream_key, stream_name, suggested_file_name, *claim_args) in _batched_select(
transaction, "select file.rowid, file.*, stream.*, c.* "
"from file inner join stream on file.stream_hash=stream.stream_hash "
"inner join content_claim cc on file.stream_hash=cc.stream_hash "
"inner join claim c on cc.claim_outpoint=c.claim_outpoint "
"where file.stream_hash in {} "
"order by c.rowid desc", stream_hashes):
claim = StoredContentClaim(*claim_args)
if claim.channel_claim_id:
if claim.channel_claim_id not in signed_claims:
signed_claims[claim.channel_claim_id] = []
signed_claims[claim.channel_claim_id].append(claim)
files.append(
_get_lbry_file_stream_dict(
rowid, added_on, stream_hash, file_name, download_dir, data_rate, status,
sd_hash, stream_key, stream_name, suggested_file_name, claim, saved_file,
raw_content_fee
)
)
for claim_name, claim_id in _batched_select(
transaction, "select c.claim_name, c.claim_id from claim c where c.claim_id in {}",
@ -179,6 +204,15 @@ def delete_stream(transaction: sqlite3.Connection, descriptor: 'StreamDescriptor
transaction.executemany("delete from blob where blob_hash=?", blob_hashes).fetchall()
def delete_torrent(transaction: sqlite3.Connection, bt_infohash: str):
transaction.execute("delete from content_claim where bt_infohash=?", (bt_infohash, )).fetchall()
transaction.execute("delete from torrent_tracker where bt_infohash=?", (bt_infohash,)).fetchall()
transaction.execute("delete from torrent_node where bt_infohash=?", (bt_infohash,)).fetchall()
transaction.execute("delete from torrent_http_seed where bt_infohash=?", (bt_infohash,)).fetchall()
transaction.execute("delete from file where bt_infohash=?", (bt_infohash,)).fetchall()
transaction.execute("delete from torrent where bt_infohash=?", (bt_infohash,)).fetchall()
def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typing.Optional[str],
download_directory: typing.Optional[str], data_payment_rate: float, status: str,
content_fee: typing.Optional[Transaction], added_on: typing.Optional[int] = None) -> int:
@ -189,12 +223,10 @@ def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typ
encoded_download_dir = binascii.hexlify(download_directory.encode()).decode()
time_added = added_on or int(time.time())
transaction.execute(
"insert or replace into file values (?, ?, ?, ?, ?, ?, ?, ?)",
"insert or replace into file values (?, NULL, ?, ?, ?, ?, ?, ?, ?)",
(stream_hash, encoded_file_name, encoded_download_dir, data_payment_rate, status,
1 if (file_name and download_directory and os.path.isfile(os.path.join(download_directory, file_name))) else 0,
None if not content_fee else binascii.hexlify(content_fee.raw).decode(),
time_added
)
None if not content_fee else binascii.hexlify(content_fee.raw).decode(), time_added)
).fetchall()
return transaction.execute("select rowid from file where stream_hash=?", (stream_hash, )).fetchone()[0]
@ -243,8 +275,32 @@ class SQLiteStorage(SQLiteMixin):
claim_sequence integer not null
);
create table if not exists torrent (
bt_infohash char(20) not null primary key,
tracker text,
length integer not null,
name text not null
);
create table if not exists torrent_node ( -- BEP-0005
bt_infohash char(20) not null references torrent,
host text not null,
port integer not null
);
create table if not exists torrent_tracker ( -- BEP-0012
bt_infohash char(20) not null references torrent,
tracker text not null
);
create table if not exists torrent_http_seed ( -- BEP-0017
bt_infohash char(20) not null references torrent,
http_seed text not null
);
create table if not exists file (
stream_hash text primary key not null references stream,
stream_hash char(96) references stream,
bt_infohash char(20) references torrent,
file_name text,
download_directory text,
blob_data_rate real not null,
@ -255,9 +311,9 @@ class SQLiteStorage(SQLiteMixin):
);
create table if not exists content_claim (
stream_hash text unique not null references file,
claim_outpoint text not null references claim,
primary key (stream_hash, claim_outpoint)
stream_hash char(96) references stream,
bt_infohash char(20) references torrent,
claim_outpoint text unique not null references claim
);
create table if not exists support (
@ -449,6 +505,9 @@ class SQLiteStorage(SQLiteMixin):
def delete_stream(self, descriptor: 'StreamDescriptor'):
return self.db.run_with_foreign_keys_disabled(delete_stream, descriptor)
async def delete_torrent(self, bt_infohash: str):
return await self.db.run(delete_torrent, bt_infohash)
# # # # # # # # # file stuff # # # # # # # # #
def save_downloaded_file(self, stream_hash: str, file_name: typing.Optional[str],
@ -476,6 +535,7 @@ class SQLiteStorage(SQLiteMixin):
removed = []
for (stream_hash, download_directory, file_name) in transaction.execute(
"select stream_hash, download_directory, file_name from file where saved_file=1 "
"and stream_hash is not null"
).fetchall():
if download_directory and file_name and os.path.isfile(
os.path.join(binascii.unhexlify(download_directory).decode(),
@ -536,7 +596,7 @@ class SQLiteStorage(SQLiteMixin):
store_file(transaction, descriptor.stream_hash, os.path.basename(descriptor.suggested_file_name),
download_directory, 0.0, 'stopped', content_fee=content_fee)
if content_claim:
transaction.execute("insert or ignore into content_claim values (?, ?)", content_claim)
transaction.execute("insert or ignore into content_claim values (?, ?, ?)", content_claim)
transaction.executemany(
"update file set status='stopped' where stream_hash=?",
((stream_hash, ) for stream_hash in stream_hashes)
@ -703,8 +763,9 @@ class SQLiteStorage(SQLiteMixin):
)
# update the claim associated to the file
transaction.execute("delete from content_claim where stream_hash=?", (stream_hash, )).fetchall()
transaction.execute(
"insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint)
"insert into content_claim values (?, NULL, ?)", (stream_hash, claim_outpoint)
).fetchall()
async def save_content_claim(self, stream_hash, claim_outpoint):

View file

@ -191,6 +191,22 @@ class Source(Metadata):
def sd_hash_bytes(self, sd_hash: bytes):
self.message.sd_hash = sd_hash
@property
def bt_infohash(self) -> str:
return hexlify(self.message.bt_infohash).decode()
@bt_infohash.setter
def bt_infohash(self, bt_infohash: str):
self.message.bt_infohash = unhexlify(bt_infohash.encode())
@property
def bt_infohash_bytes(self) -> bytes:
return self.message.bt_infohash.decode()
@bt_infohash_bytes.setter
def bt_infohash_bytes(self, bt_infohash: bytes):
self.message.bt_infohash = bt_infohash
@property
def url(self) -> str:
return self.message.url

View file

@ -195,6 +195,8 @@ class Stream(BaseClaim):
claim['source']['hash'] = self.source.file_hash
if 'sd_hash' in claim['source']:
claim['source']['sd_hash'] = self.source.sd_hash
elif 'bt_infohash' in claim['source']:
claim['source']['bt_infohash'] = self.source.bt_infohash
if 'media_type' in claim['source']:
claim['stream_type'] = guess_stream_type(claim['source']['media_type'])
fee = claim.get('fee', {})
@ -216,6 +218,8 @@ class Stream(BaseClaim):
if 'sd_hash' in kwargs:
self.source.sd_hash = kwargs.pop('sd_hash')
elif 'bt_infohash' in kwargs:
self.source.bt_infohash = kwargs.pop('bt_infohash')
if 'file_name' in kwargs:
self.source.name = kwargs.pop('file_name')
if 'file_hash' in kwargs:

View file

@ -11,7 +11,7 @@ from lbry.schema.mime_types import guess_media_type
from lbry.stream.downloader import StreamDownloader
from lbry.stream.descriptor import StreamDescriptor, sanitize_file_name
from lbry.stream.reflector.client import StreamReflectorClient
from lbry.extras.daemon.storage import StoredStreamClaim
from lbry.extras.daemon.storage import StoredContentClaim
from lbry.blob import MAX_BLOB_SIZE
if typing.TYPE_CHECKING:
@ -78,7 +78,7 @@ class ManagedStream:
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
sd_hash: str, download_directory: typing.Optional[str] = None, file_name: typing.Optional[str] = None,
status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None,
status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredContentClaim] = None,
download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None,
descriptor: typing.Optional[StreamDescriptor] = None,
content_fee: typing.Optional['Transaction'] = None,
@ -452,8 +452,8 @@ class ManagedStream:
return sent
def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):
self.stream_claim_info = StoredStreamClaim(
self.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
self.stream_claim_info = StoredContentClaim(
f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
claim_info['name'], claim_info['amount'], claim_info['height'],
binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'],
claim_info['claim_sequence'], claim_info.get('channel_name')

View file

@ -20,7 +20,7 @@ if typing.TYPE_CHECKING:
from lbry.blob.blob_manager import BlobManager
from lbry.dht.node import Node
from lbry.extras.daemon.analytics import AnalyticsManager
from lbry.extras.daemon.storage import SQLiteStorage, StoredStreamClaim
from lbry.extras.daemon.storage import SQLiteStorage, StoredContentClaim
from lbry.wallet import LbryWalletManager
from lbry.wallet.transaction import Transaction
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
@ -117,7 +117,7 @@ class StreamManager:
async def add_stream(self, rowid: int, sd_hash: str, file_name: Optional[str],
download_directory: Optional[str], status: str,
claim: Optional['StoredStreamClaim'], content_fee: Optional['Transaction'],
claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'],
added_on: Optional[int]):
try:
descriptor = await self.blob_manager.get_stream_descriptor(sd_hash)