diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 060fe9777..6b2dd1a1d 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -539,7 +539,6 @@ class Daemon(AuthJSONRPCServer): log.info('Starting to setup up file manager') self.startup_status = STARTUP_STAGES[3] self.stream_info_manager = DBEncryptedFileMetadataManager(self.db_dir) - yield self.stream_info_manager.setup() self.lbry_file_manager = EncryptedFileManager( self.session, self.stream_info_manager, diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index fd96aa8f0..430cbe12d 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -5,7 +5,6 @@ Keep track of which LBRY Files are downloading and store their LBRY File specifi import logging import os -from twisted.enterprise import adbapi from twisted.internet import defer, task, reactor from twisted.python.failure import Failure @@ -16,7 +15,6 @@ from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDow from lbrynet.lbry_file.StreamDescriptor import EncryptedFileStreamType from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError -from lbrynet.core.sqlite_helpers import rerun_if_locked from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call from lbrynet import conf @@ -41,7 +39,6 @@ class EncryptedFileManager(object): # TODO: why is sd_identifier part of the file manager? self.sd_identifier = sd_identifier self.lbry_files = [] - self.sql_db = None if download_directory: self.download_directory = download_directory else: @@ -51,7 +48,7 @@ class EncryptedFileManager(object): @defer.inlineCallbacks def setup(self): - yield self._open_db() + yield self.stream_info_manager.setup() yield self._add_to_sd_identifier() # don't block on starting the lbry files self._start_lbry_files() @@ -252,84 +249,32 @@ class EncryptedFileManager(object): def stop(self): safe_stop_looping_call(self.lbry_file_reflector) yield defer.DeferredList(list(self._stop_lbry_files())) - if self.sql_db: - yield self.sql_db.close() - self.sql_db = None log.info("Stopped encrypted file manager") defer.returnValue(True) def get_count_for_stream_hash(self, stream_hash): return self._get_count_for_stream_hash(stream_hash) - ######### database calls ######### - - def _open_db(self): - # check_same_thread=False is solely to quiet a spurious error that appears to be due - # to a bug in twisted, where the connection is closed by a different thread than the - # one that opened it. The individual connections in the pool are not used in multiple - # threads. - self.sql_db = adbapi.ConnectionPool( - "sqlite3", - os.path.join(self.session.db_dir, "lbryfile_info.db"), - check_same_thread=False - ) - return self.sql_db.runQuery( - "create table if not exists lbry_file_options (" + - " blob_data_rate real, " + - " status text," + - " stream_hash text," - " foreign key(stream_hash) references lbry_files(stream_hash)" + - ")" - ) - - @rerun_if_locked - def _save_lbry_file(self, stream_hash, data_payment_rate): - def do_save(db_transaction): - row = (data_payment_rate, ManagedEncryptedFileDownloader.STATUS_STOPPED, stream_hash) - db_transaction.execute("insert into lbry_file_options values (?, ?, ?)", row) - return db_transaction.lastrowid - return self.sql_db.runInteraction(do_save) - - @rerun_if_locked - def _delete_lbry_file_options(self, rowid): - return self.sql_db.runQuery("delete from lbry_file_options where rowid = ?", - (rowid,)) - - @rerun_if_locked - def _set_lbry_file_payment_rate(self, rowid, new_rate): - return self.sql_db.runQuery( - "update lbry_file_options set blob_data_rate = ? where rowid = ?", - (new_rate, rowid)) - - @rerun_if_locked - def _get_all_lbry_files(self): - d = self.sql_db.runQuery("select rowid, stream_hash, blob_data_rate from lbry_file_options") - return d - - @rerun_if_locked - def _change_file_status(self, rowid, new_status): - d = self.sql_db.runQuery("update lbry_file_options set status = ? where rowid = ?", - (new_status, rowid)) - d.addCallback(lambda _: new_status) - return d - - @rerun_if_locked - def _get_lbry_file_status(self, rowid): - d = self.sql_db.runQuery("select status from lbry_file_options where rowid = ?", - (rowid,)) - d.addCallback(lambda r: (r[0][0] if len(r) else None)) - return d - - @rerun_if_locked def _get_count_for_stream_hash(self, stream_hash): - d = self.sql_db.runQuery("select count(*) from lbry_file_options where stream_hash = ?", - (stream_hash,)) - d.addCallback(lambda r: (r[0][0] if r else 0)) - return d + return self.stream_info_manager._get_count_for_stream_hash(stream_hash) + + def _delete_lbry_file_options(self, rowid): + return self.stream_info_manager._delete_lbry_file_options(rowid) + + def _save_lbry_file(self, stream_hash, data_payment_rate): + return self.stream_info_manager._save_lbry_file(stream_hash, data_payment_rate) + + def _get_all_lbry_files(self): + return self.stream_info_manager._get_all_lbry_files() - @rerun_if_locked def _get_rowid_for_stream_hash(self, stream_hash): - d = self.sql_db.runQuery("select rowid from lbry_file_options where stream_hash = ?", - (stream_hash,)) - d.addCallback(lambda r: (r[0][0] if len(r) else None)) - return d + return self.stream_info_manager._get_rowid_for_stream_hash(stream_hash) + + def _change_file_status(self, rowid, status): + return self.stream_info_manager._change_file_status(rowid, status) + + def _set_lbry_file_payment_rate(self, rowid, new_rate): + return self.stream_info_manager._set_lbry_file_payment_rate(rowid, new_rate) + + def _get_lbry_file_status(self, rowid): + return self.stream_info_manager._get_lbry_file_status(rowid) diff --git a/lbrynet/lbry_file/EncryptedFileMetadataManager.py b/lbrynet/lbry_file/EncryptedFileMetadataManager.py index 16f01cd09..c4abbd9a0 100644 --- a/lbrynet/lbry_file/EncryptedFileMetadataManager.py +++ b/lbrynet/lbry_file/EncryptedFileMetadataManager.py @@ -6,7 +6,7 @@ from twisted.python.failure import Failure from twisted.enterprise import adbapi from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHash, NoSuchSDHash from lbrynet.core.sqlite_helpers import rerun_if_locked - +from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader log = logging.getLogger(__name__) @@ -14,11 +14,12 @@ log = logging.getLogger(__name__) class DBEncryptedFileMetadataManager(object): """Store and provide access to LBRY file metadata using sqlite""" - def __init__(self, db_dir): + def __init__(self, db_dir, file_name=None): self.db_dir = db_dir - self.stream_info_db = None - self.stream_blob_db = None - self.stream_desc_db = None + self._db_file_name = file_name or "lbryfile_info.db" + self.db_conn = adbapi.ConnectionPool("sqlite3", os.path.join(self.db_dir, + self._db_file_name), + check_same_thread=False) def setup(self): return self._open_db() @@ -96,38 +97,40 @@ class DBEncryptedFileMetadataManager(object): def get_stream_hash_for_sd_hash(self, sd_hash): return self._get_stream_hash_for_sd_blob_hash(sd_hash) + @staticmethod + def _create_tables(transaction): + transaction.execute("create table if not exists lbry_files (" + + " stream_hash text primary key, " + + " key text, " + + " stream_name text, " + + " suggested_file_name text" + + ")") + transaction.execute("create table if not exists lbry_file_blobs (" + + " blob_hash text, " + + " stream_hash text, " + + " position integer, " + + " iv text, " + + " length integer, " + + " foreign key(stream_hash) references lbry_files(stream_hash)" + + ")") + transaction.execute("create table if not exists lbry_file_descriptors (" + + " sd_blob_hash TEXT PRIMARY KEY, " + + " stream_hash TEXT, " + + " foreign key(stream_hash) references lbry_files(stream_hash)" + + ")") + transaction.execute("create table if not exists lbry_file_options (" + + " blob_data_rate real, " + + " status text," + + " stream_hash text," + " foreign key(stream_hash) references lbry_files(stream_hash)" + + ")") + def _open_db(self): # check_same_thread=False is solely to quiet a spurious error that appears to be due # to a bug in twisted, where the connection is closed by a different thread than the # one that opened it. The individual connections in the pool are not used in multiple # threads. - self.db_conn = adbapi.ConnectionPool( - "sqlite3", - (os.path.join(self.db_dir, "lbryfile_info.db")), - check_same_thread=False) - - def create_tables(transaction): - transaction.execute("create table if not exists lbry_files (" + - " stream_hash text primary key, " + - " key text, " + - " stream_name text, " + - " suggested_file_name text" + - ")") - transaction.execute("create table if not exists lbry_file_blobs (" + - " blob_hash text, " + - " stream_hash text, " + - " position integer, " + - " iv text, " + - " length integer, " + - " foreign key(stream_hash) references lbry_files(stream_hash)" + - ")") - transaction.execute("create table if not exists lbry_file_descriptors (" + - " sd_blob_hash TEXT PRIMARY KEY, " + - " stream_hash TEXT, " + - " foreign key(stream_hash) references lbry_files(stream_hash)" + - ")") - - return self.db_conn.runInteraction(create_tables) + return self.db_conn.runInteraction(self._create_tables) @rerun_if_locked def _delete_stream(self, stream_hash): @@ -269,97 +272,55 @@ class DBEncryptedFileMetadataManager(object): d.addCallback(_handle_result) return d + # used by lbry file manager + @rerun_if_locked + def _save_lbry_file(self, stream_hash, data_payment_rate): + def do_save(db_transaction): + row = (data_payment_rate, ManagedEncryptedFileDownloader.STATUS_STOPPED, stream_hash) + db_transaction.execute("insert into lbry_file_options values (?, ?, ?)", row) + return db_transaction.lastrowid + return self.db_conn.runInteraction(do_save) -class TempEncryptedFileMetadataManager(object): - def __init__(self): - self.streams = {} - self.stream_blobs = {} - self.sd_files = {} + @rerun_if_locked + def _delete_lbry_file_options(self, rowid): + return self.db_conn.runQuery("delete from lbry_file_options where rowid = ?", + (rowid,)) - def setup(self): - return defer.succeed(True) + @rerun_if_locked + def _set_lbry_file_payment_rate(self, rowid, new_rate): + return self.db_conn.runQuery( + "update lbry_file_options set blob_data_rate = ? where rowid = ?", + (new_rate, rowid)) - def stop(self): - return defer.succeed(True) - - def get_all_streams(self): - return defer.succeed(self.streams.keys()) - - def save_stream(self, stream_hash, file_name, key, suggested_file_name, blobs): - self.streams[stream_hash] = {'suggested_file_name': suggested_file_name, - 'stream_name': file_name, - 'key': key} - d = self.add_blobs_to_stream(stream_hash, blobs) - d.addCallback(lambda _: stream_hash) + @rerun_if_locked + def _get_all_lbry_files(self): + d = self.db_conn.runQuery("select rowid, stream_hash, blob_data_rate from lbry_file_options") return d - def get_stream_info(self, stream_hash): - if stream_hash in self.streams: - stream_info = self.streams[stream_hash] - return defer.succeed([stream_info['key'], stream_info['stream_name'], - stream_info['suggested_file_name']]) - return defer.succeed(None) + @rerun_if_locked + def _change_file_status(self, rowid, new_status): + d = self.db_conn.runQuery("update lbry_file_options set status = ? where rowid = ?", + (new_status, rowid)) + d.addCallback(lambda _: new_status) + return d - def delete_stream(self, stream_hash): - if stream_hash in self.streams: - del self.streams[stream_hash] - for (s_h, b_h) in self.stream_blobs.keys(): - if s_h == stream_hash: - del self.stream_blobs[(s_h, b_h)] - return defer.succeed(True) + @rerun_if_locked + def _get_lbry_file_status(self, rowid): + d = self.db_conn.runQuery("select status from lbry_file_options where rowid = ?", + (rowid,)) + d.addCallback(lambda r: (r[0][0] if len(r) else None)) + return d - def add_blobs_to_stream(self, stream_hash, blobs): - assert stream_hash in self.streams, "Can't add blobs to a stream that isn't known" - for blob in blobs: - info = {} - info['blob_num'] = blob.blob_num - info['length'] = blob.length - info['iv'] = blob.iv - self.stream_blobs[(stream_hash, blob.blob_hash)] = info - return defer.succeed(True) + @rerun_if_locked + def _get_count_for_stream_hash(self, stream_hash): + d = self.db_conn.runQuery("select count(*) from lbry_file_options where stream_hash = ?", + (stream_hash,)) + d.addCallback(lambda r: (r[0][0] if r else 0)) + return d - def get_blobs_for_stream(self, stream_hash, start_blob=None, - end_blob=None, count=None, reverse=False): - - if start_blob is not None: - start_num = self._get_blob_num_by_hash(stream_hash, start_blob) - else: - start_num = None - if end_blob is not None: - end_num = self._get_blob_num_by_hash(stream_hash, end_blob) - else: - end_num = None - return self._get_further_blob_infos(stream_hash, start_num, end_num, count, reverse) - - def get_stream_of_blob(self, blob_hash): - for (s_h, b_h) in self.stream_blobs.iterkeys(): - if b_h == blob_hash: - return defer.succeed(s_h) - return defer.succeed(None) - - def _get_further_blob_infos(self, stream_hash, start_num, end_num, count=None, reverse=False): - blob_infos = [] - for (s_h, b_h), info in self.stream_blobs.iteritems(): - if stream_hash == s_h: - position = info['blob_num'] - length = info['length'] - iv = info['iv'] - if (start_num is None) or (position > start_num): - if (end_num is None) or (position < end_num): - blob_infos.append((b_h, position, iv, length)) - blob_infos.sort(key=lambda i: i[1], reverse=reverse) - if count is not None: - blob_infos = blob_infos[:count] - return defer.succeed(blob_infos) - - def _get_blob_num_by_hash(self, stream_hash, blob_hash): - if (stream_hash, blob_hash) in self.stream_blobs: - return defer.succeed(self.stream_blobs[(stream_hash, blob_hash)]['blob_num']) - - def save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash): - self.sd_files[sd_blob_hash] = stream_hash - return defer.succeed(True) - - def get_sd_blob_hashes_for_stream(self, stream_hash): - return defer.succeed( - [sd_hash for sd_hash, s_h in self.sd_files.iteritems() if stream_hash == s_h]) + @rerun_if_locked + def _get_rowid_for_stream_hash(self, stream_hash): + d = self.db_conn.runQuery("select rowid from lbry_file_options where stream_hash = ?", + (stream_hash,)) + d.addCallback(lambda r: (r[0][0] if len(r) else None)) + return d diff --git a/lbrynet/tests/functional/test_misc.py b/lbrynet/tests/functional/test_misc.py index 52e53be74..8f638bd7d 100644 --- a/lbrynet/tests/functional/test_misc.py +++ b/lbrynet/tests/functional/test_misc.py @@ -10,7 +10,6 @@ import unittest from Crypto import Random from Crypto.Hash import MD5 from lbrynet import conf -from lbrynet.lbry_file.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager from lbrynet.core.Session import Session @@ -120,7 +119,7 @@ class LbryUploader(object): peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1") - stream_info_manager = TempEncryptedFileMetadataManager() + stream_info_manager = DBEncryptedFileMetadataManager(self.db_dir) self.lbry_file_manager = EncryptedFileManager( self.session, stream_info_manager, self.sd_identifier) if self.ul_rate_limit is not None: @@ -227,7 +226,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], external_ip="127.0.0.1") - stream_info_manager = TempEncryptedFileMetadataManager() + stream_info_manager = DBEncryptedFileMetadataManager(db_dir) lbry_file_manager = EncryptedFileManager(session, stream_info_manager, sd_identifier) @@ -520,7 +519,7 @@ class TestTransfer(TestCase): blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1") - self.stream_info_manager = TempEncryptedFileMetadataManager() + self.stream_info_manager = DBEncryptedFileMetadataManager(db_dir) self.lbry_file_manager = EncryptedFileManager( self.session, self.stream_info_manager, sd_identifier) @@ -820,7 +819,7 @@ class TestTransfer(TestCase): is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], external_ip="127.0.0.1") - self.stream_info_manager = TempEncryptedFileMetadataManager() + self.stream_info_manager = DBEncryptedFileMetadataManager(db_dir) self.lbry_file_manager = EncryptedFileManager( self.session, self.stream_info_manager, sd_identifier) diff --git a/lbrynet/tests/functional/test_streamify.py b/lbrynet/tests/functional/test_streamify.py index afd3b029c..9fe4a29c1 100644 --- a/lbrynet/tests/functional/test_streamify.py +++ b/lbrynet/tests/functional/test_streamify.py @@ -7,7 +7,6 @@ from twisted.trial.unittest import TestCase from twisted.internet import defer, threads from lbrynet import conf -from lbrynet.lbry_file.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager from lbrynet.core.Session import Session @@ -80,7 +79,7 @@ class TestStreamify(TestCase): is_generous=self.is_generous, external_ip="127.0.0.1" ) - self.stream_info_manager = TempEncryptedFileMetadataManager() + self.stream_info_manager = DBEncryptedFileMetadataManager(db_dir) self.lbry_file_manager = EncryptedFileManager( self.session, self.stream_info_manager, sd_identifier) diff --git a/lbrynet/tests/unit/lbryfile/test_EncryptedFileMetadataManager.py b/lbrynet/tests/unit/lbryfile/test_EncryptedFileMetadataManager.py index 659e3c09a..e83363d6e 100644 --- a/lbrynet/tests/unit/lbryfile/test_EncryptedFileMetadataManager.py +++ b/lbrynet/tests/unit/lbryfile/test_EncryptedFileMetadataManager.py @@ -7,6 +7,7 @@ from lbrynet.cryptstream.CryptBlob import CryptBlobInfo from lbrynet.core.Error import NoSuchStreamHash from lbrynet.tests.util import random_lbry_hash + class DBEncryptedFileMetadataManagerTest(unittest.TestCase): def setUp(self): self.db_dir = tempfile.mkdtemp() diff --git a/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileManager.py b/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileManager.py index 87cf676cb..ebdcf731c 100644 --- a/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileManager.py +++ b/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileManager.py @@ -3,8 +3,10 @@ from twisted.trial import unittest from lbrynet import conf from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager +from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager from lbrynet.tests.util import random_lbry_hash + class TestEncryptedFileManager(unittest.TestCase): def setUp(self): @@ -19,12 +21,12 @@ class TestEncryptedFileManager(unittest.TestCase): session = MocSession() session.db_dir = '.' - stream_info_manager = None + stream_info_manager = DBEncryptedFileMetadataManager('.') sd_identifier = None download_directory = '.' manager = EncryptedFileManager( session, stream_info_manager, sd_identifier, download_directory) - yield manager._open_db() + yield manager.stream_info_manager.setup() out = yield manager._get_all_lbry_files() self.assertEqual(len(out), 0)