lbry-sdk/lbrynet/lbry_file/EncryptedFileMetadataManager.py

355 lines
15 KiB
Python

import logging
import sqlite3
import os
from twisted.internet import defer
from twisted.python.failure import Failure
from twisted.enterprise import adbapi
from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHash, NoSuchSDHash
from lbrynet.core.sqlite_helpers import rerun_if_locked
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
log = logging.getLogger(__name__)
class DBEncryptedFileMetadataManager(object):
"""Store and provide access to LBRY file metadata using sqlite"""
def __init__(self, db_dir, file_name=None):
self.db_dir = db_dir
self._db_file_name = file_name or "lbryfile_info.db"
self.db_conn = adbapi.ConnectionPool("sqlite3", os.path.join(self.db_dir,
self._db_file_name),
check_same_thread=False)
def setup(self):
return self._open_db()
def stop(self):
self.db_conn.close()
return defer.succeed(True)
def get_all_streams(self):
return self._get_all_streams()
def save_stream(self, stream_hash, file_name, key, suggested_file_name, blobs):
d = self._store_stream(stream_hash, file_name, key, suggested_file_name)
d.addCallback(lambda _: self.add_blobs_to_stream(stream_hash, blobs))
return d
def get_stream_info(self, stream_hash):
return self._get_stream_info(stream_hash)
def check_if_stream_exists(self, stream_hash):
return self._check_if_stream_exists(stream_hash)
def delete_stream(self, stream_hash):
return self._delete_stream(stream_hash)
def add_blobs_to_stream(self, stream_hash, blobs):
return self._add_blobs_to_stream(stream_hash, blobs, ignore_duplicate_error=True)
def get_blobs_for_stream(self, stream_hash, start_blob=None,
end_blob=None, count=None, reverse=False):
log.debug("Getting blobs for stream %s. Count is %s", stream_hash, count)
def get_positions_of_start_and_end():
if start_blob is not None:
d1 = self._get_blob_num_by_hash(stream_hash, start_blob)
else:
d1 = defer.succeed(None)
if end_blob is not None:
d2 = self._get_blob_num_by_hash(stream_hash, end_blob)
else:
d2 = defer.succeed(None)
dl = defer.DeferredList([d1, d2])
def get_positions(results):
start_num = None
end_num = None
if results[0][0] is True:
start_num = results[0][1]
if results[1][0] is True:
end_num = results[1][1]
return start_num, end_num
dl.addCallback(get_positions)
return dl
def get_blob_infos(nums):
start_num, end_num = nums
return self._get_further_blob_infos(stream_hash, start_num, end_num,
count, reverse)
d = get_positions_of_start_and_end()
d.addCallback(get_blob_infos)
return d
def get_stream_of_blob(self, blob_hash):
return self._get_stream_of_blobhash(blob_hash)
def save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash):
return self._save_sd_blob_hash_to_stream(stream_hash, sd_blob_hash)
def get_sd_blob_hashes_for_stream(self, stream_hash):
return self._get_sd_blob_hashes_for_stream(stream_hash)
def get_stream_hash_for_sd_hash(self, sd_hash):
return self._get_stream_hash_for_sd_blob_hash(sd_hash)
@staticmethod
def _create_tables(transaction):
transaction.execute("create table if not exists lbry_files (" +
" stream_hash text primary key, " +
" key text, " +
" stream_name text, " +
" suggested_file_name text" +
")")
transaction.execute("create table if not exists lbry_file_blobs (" +
" blob_hash text, " +
" stream_hash text, " +
" position integer, " +
" iv text, " +
" length integer, " +
" foreign key(stream_hash) references lbry_files(stream_hash)" +
")")
transaction.execute("create table if not exists lbry_file_descriptors (" +
" sd_blob_hash TEXT PRIMARY KEY, " +
" stream_hash TEXT, " +
" foreign key(stream_hash) references lbry_files(stream_hash)" +
")")
transaction.execute("create table if not exists lbry_file_options (" +
" blob_data_rate real, " +
" status text," +
" stream_hash text,"
" foreign key(stream_hash) references lbry_files(stream_hash)" +
")")
transaction.execute("create table if not exists lbry_file_metadata (" +
" lbry_file integer primary key, " +
" txid text, " +
" n integer, " +
" foreign key(lbry_file) references lbry_files(rowid)"
")")
def _open_db(self):
# check_same_thread=False is solely to quiet a spurious error that appears to be due
# to a bug in twisted, where the connection is closed by a different thread than the
# one that opened it. The individual connections in the pool are not used in multiple
# threads.
return self.db_conn.runInteraction(self._create_tables)
@rerun_if_locked
@defer.inlineCallbacks
def get_file_outpoint(self, rowid):
result = yield self.db_conn.runQuery("select txid, n from lbry_file_metadata "
"where lbry_file=?", (rowid, ))
response = None
if result:
txid, nout = result[0]
if txid is not None and nout is not None:
response = "%s:%i" % (txid, nout)
defer.returnValue(response)
@rerun_if_locked
@defer.inlineCallbacks
def save_outpoint_to_file(self, rowid, txid, nout):
existing_outpoint = yield self.get_file_outpoint(rowid)
if not existing_outpoint:
yield self.db_conn.runOperation("insert into lbry_file_metadata values "
"(?, ?, ?)", (rowid, txid, nout))
@rerun_if_locked
def _delete_stream(self, stream_hash):
d = self.db_conn.runQuery(
"select rowid, stream_hash from lbry_files where stream_hash = ?", (stream_hash,))
d.addCallback(
lambda result: result[0] if result else Failure(NoSuchStreamHash(stream_hash)))
def do_delete(transaction, row_id, s_h):
transaction.execute("delete from lbry_files where stream_hash = ?", (s_h,))
transaction.execute("delete from lbry_file_blobs where stream_hash = ?", (s_h,))
transaction.execute("delete from lbry_file_descriptors where stream_hash = ?", (s_h,))
transaction.execute("delete from lbry_file_metadata where lbry_file = ?", (row_id,))
d.addCallback(lambda (row_id, s_h): self.db_conn.runInteraction(do_delete, row_id, s_h))
return d
@rerun_if_locked
def _store_stream(self, stream_hash, name, key, suggested_file_name):
d = self.db_conn.runQuery("insert into lbry_files values (?, ?, ?, ?)",
(stream_hash, key, name, suggested_file_name))
def check_duplicate(err):
if err.check(sqlite3.IntegrityError):
raise DuplicateStreamHashError(stream_hash)
return err
d.addErrback(check_duplicate)
return d
@rerun_if_locked
def _get_all_streams(self):
d = self.db_conn.runQuery("select stream_hash from lbry_files")
d.addCallback(lambda results: [r[0] for r in results])
return d
@rerun_if_locked
def _get_stream_info(self, stream_hash):
def get_result(res):
if res:
return res[0]
else:
raise NoSuchStreamHash(stream_hash)
d = self.db_conn.runQuery(
"select key, stream_name, suggested_file_name from lbry_files where stream_hash = ?",
(stream_hash,))
d.addCallback(get_result)
return d
@rerun_if_locked
def _check_if_stream_exists(self, stream_hash):
d = self.db_conn.runQuery(
"select stream_hash from lbry_files where stream_hash = ?", (stream_hash,))
d.addCallback(lambda r: True if len(r) else False)
return d
@rerun_if_locked
def _get_blob_num_by_hash(self, stream_hash, blob_hash):
d = self.db_conn.runQuery(
"select position from lbry_file_blobs where stream_hash = ? and blob_hash = ?",
(stream_hash, blob_hash))
d.addCallback(lambda r: r[0][0] if len(r) else None)
return d
@rerun_if_locked
def _get_further_blob_infos(self, stream_hash, start_num, end_num, count=None, reverse=False):
params = []
q_string = "select * from ("
q_string += " select blob_hash, position, iv, length from lbry_file_blobs "
q_string += " where stream_hash = ? "
params.append(stream_hash)
if start_num is not None:
q_string += " and position > ? "
params.append(start_num)
if end_num is not None:
q_string += " and position < ? "
params.append(end_num)
q_string += " order by position "
if reverse is True:
q_string += " DESC "
if count is not None:
q_string += " limit ? "
params.append(count)
q_string += ") order by position"
# Order by position is done twice so that it always returns them from lowest position to
# greatest, but the limit by clause can select the 'count' greatest or 'count' least
return self.db_conn.runQuery(q_string, tuple(params))
@rerun_if_locked
def _add_blobs_to_stream(self, stream_hash, blob_infos, ignore_duplicate_error=False):
def add_blobs(transaction):
for blob_info in blob_infos:
try:
transaction.execute("insert into lbry_file_blobs values (?, ?, ?, ?, ?)",
(blob_info.blob_hash, stream_hash, blob_info.blob_num,
blob_info.iv, blob_info.length))
except sqlite3.IntegrityError:
if ignore_duplicate_error is False:
raise
return self.db_conn.runInteraction(add_blobs)
@rerun_if_locked
def _get_stream_of_blobhash(self, blob_hash):
d = self.db_conn.runQuery("select stream_hash from lbry_file_blobs where blob_hash = ?",
(blob_hash,))
d.addCallback(lambda r: r[0][0] if len(r) else None)
return d
@rerun_if_locked
def _save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash):
d = self.db_conn.runOperation("insert or ignore into lbry_file_descriptors values (?, ?)",
(sd_blob_hash, stream_hash))
d.addCallback(lambda _: log.info("Saved sd blob hash %s to stream hash %s",
str(sd_blob_hash), str(stream_hash)))
return d
@rerun_if_locked
def _get_sd_blob_hashes_for_stream(self, stream_hash):
log.debug("Looking up sd blob hashes for stream hash %s", str(stream_hash))
d = self.db_conn.runQuery(
"select sd_blob_hash from lbry_file_descriptors where stream_hash = ?",
(stream_hash,))
d.addCallback(lambda results: [r[0] for r in results])
return d
@rerun_if_locked
def _get_stream_hash_for_sd_blob_hash(self, sd_blob_hash):
def _handle_result(result):
if not result:
raise NoSuchSDHash(sd_blob_hash)
return result[0][0]
log.debug("Looking up sd blob hashes for sd blob hash %s", str(sd_blob_hash))
d = self.db_conn.runQuery(
"select stream_hash from lbry_file_descriptors where sd_blob_hash = ?",
(sd_blob_hash,))
d.addCallback(_handle_result)
return d
# used by lbry file manager
@rerun_if_locked
def _save_lbry_file(self, stream_hash, data_payment_rate):
def do_save(db_transaction):
row = (data_payment_rate, ManagedEncryptedFileDownloader.STATUS_STOPPED, stream_hash)
db_transaction.execute("insert into lbry_file_options values (?, ?, ?)", row)
return db_transaction.lastrowid
return self.db_conn.runInteraction(do_save)
@rerun_if_locked
def _delete_lbry_file_options(self, rowid):
return self.db_conn.runQuery("delete from lbry_file_options where rowid = ?",
(rowid,))
@rerun_if_locked
def _set_lbry_file_payment_rate(self, rowid, new_rate):
return self.db_conn.runQuery(
"update lbry_file_options set blob_data_rate = ? where rowid = ?",
(new_rate, rowid))
@rerun_if_locked
def _get_all_lbry_files(self):
d = self.db_conn.runQuery("select rowid, stream_hash, "
"blob_data_rate from lbry_file_options")
return d
@rerun_if_locked
def _change_file_status(self, rowid, new_status):
d = self.db_conn.runQuery("update lbry_file_options set status = ? where rowid = ?",
(new_status, rowid))
d.addCallback(lambda _: new_status)
return d
@rerun_if_locked
def _get_lbry_file_status(self, rowid):
d = self.db_conn.runQuery("select status from lbry_file_options where rowid = ?",
(rowid,))
d.addCallback(lambda r: (r[0][0] if len(r) else None))
return d
@rerun_if_locked
def _get_count_for_stream_hash(self, stream_hash):
d = self.db_conn.runQuery("select count(*) from lbry_file_options where stream_hash = ?",
(stream_hash,))
d.addCallback(lambda r: (r[0][0] if r else 0))
return d
@rerun_if_locked
def _get_rowid_for_stream_hash(self, stream_hash):
d = self.db_conn.runQuery("select rowid from lbry_file_options where stream_hash = ?",
(stream_hash,))
d.addCallback(lambda r: (r[0][0] if len(r) else None))
return d