diff --git a/CHANGELOG.md b/CHANGELOG.md index 776939809..3362a43d8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,7 +27,7 @@ at anytime. * Check claim schema in `publish` before trying to make the claim, return better error messages * Renamed `channel_list_mine` to `channel_list` * Changed `channel_list` to include channels where the certificate info has been imported but the claim is not in the wallet - * Changed `file_list`, `file_delete`, `file_set_status`, and `file_reflect` to no longer return claim related information. + * Changed file objects returned by `file_list` and `get` to no longer contain `name`, `claim_id`, or `metadata` * Increased assumption for time it takes to announce single hash from 1 second to 5 seconds * Don't set HTTP error codes for failed api requests, conform to http://www.jsonrpc.org/specification#error_object * Return less verbose tracebacks for api requests resulting in errors @@ -39,6 +39,7 @@ at anytime. * Added `claim_renew` command * Added user configurable `auto_renew_claim_height_delta` setting, defaults to 0 (off) * Added `lbrynet-console`, a tool to run or connect to lbrynet-daemon and launch an interactive python console with the api functions built in. + * Added a table to the lbry file database to store the outpoint of the claim downloaded from ### Removed * Removed claim related filter arguments `name`, `claim_id`, and `outpoint` from `file_list`, `file_delete`, `file_set_status`, and `file_reflect` diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index e98d8d382..946d8a392 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -756,8 +756,6 @@ class Wallet(object): defer.returnValue(claim) - - @defer.inlineCallbacks def _handle_claim_result(self, results, update_caches=True): if not results: diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 060fe9777..0f95c6015 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -195,7 +195,7 @@ class Daemon(AuthJSONRPCServer): self.connected_to_internet = True self.connection_status_code = None self.platform = None - self.current_db_revision = 4 + self.current_db_revision = 5 self.db_revision_file = conf.settings.get_db_revision_filename() self.session = None self.uploaded_temp_files = [] @@ -539,7 +539,6 @@ class Daemon(AuthJSONRPCServer): log.info('Starting to setup up file manager') self.startup_status = STARTUP_STAGES[3] self.stream_info_manager = DBEncryptedFileMetadataManager(self.db_dir) - yield self.stream_info_manager.setup() self.lbry_file_manager = EncryptedFileManager( self.session, self.stream_info_manager, @@ -663,7 +662,7 @@ class Daemon(AuthJSONRPCServer): defer.returnValue(report) @defer.inlineCallbacks - def _download_name(self, name, claim_dict, sd_hash, timeout=None, file_name=None): + def _download_name(self, name, claim_dict, sd_hash, txid, nout, timeout=None, file_name=None): """ Add a lbry file to the file manager, start the download, and return the new lbry file. If it already exists in the file manager, return the existing lbry file @@ -673,6 +672,7 @@ class Daemon(AuthJSONRPCServer): def _download_finished(download_id, name, claim_dict): report = yield self._get_stream_analytics_report(claim_dict) self.analytics_manager.send_download_finished(download_id, name, report, claim_dict) + @defer.inlineCallbacks def _download_failed(error, download_id, name, claim_dict): report = yield self._get_stream_analytics_report(claim_dict) @@ -694,11 +694,11 @@ class Daemon(AuthJSONRPCServer): file_name) try: lbry_file, finished_deferred = yield self.streams[sd_hash].start(claim_dict, name) + yield self.stream_info_manager.save_outpoint_to_file(lbry_file.rowid, txid, nout) finished_deferred.addCallbacks(lambda _: _download_finished(download_id, name, claim_dict), lambda e: _download_failed(e, download_id, name, claim_dict)) - result = yield self._get_lbry_file_dict(lbry_file, full_status=True) except Exception as err: yield _download_failed(err, download_id, name, claim_dict) @@ -732,6 +732,9 @@ class Daemon(AuthJSONRPCServer): d = reupload.reflect_stream(publisher.lbry_file) d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name), log.exception) + yield self.stream_info_manager.save_outpoint_to_file(publisher.lbry_file.rowid, + claim_out['txid'], + int(claim_out['nout'])) self.analytics_manager.send_claim_action('publish') log.info("Success! Published to lbry://%s txid: %s nout: %d", name, claim_out['txid'], claim_out['nout']) @@ -765,7 +768,7 @@ class Daemon(AuthJSONRPCServer): downloader.cancel() d = defer.succeed(None) - reactor.callLater(self.search_timeout, _check_est, d) + reactor.callLater(conf.settings['search_timeout'], _check_est, d) d.addCallback( lambda _: download_sd_blob( self.session, sd_hash, self.session.payment_rate_manager)) @@ -891,15 +894,14 @@ class Daemon(AuthJSONRPCServer): else: written_bytes = 0 + size = message = outpoint = None + if full_status: size = yield lbry_file.get_total_bytes() file_status = yield lbry_file.status() message = STREAM_STAGES[2][1] % (file_status.name, file_status.num_completed, file_status.num_known, file_status.running_status) - else: - size = None - message = None - + outpoint = yield self.stream_info_manager.get_file_outpoint(lbry_file.rowid) result = { 'completed': lbry_file.completed, @@ -917,6 +919,7 @@ class Daemon(AuthJSONRPCServer): 'total_bytes': size, 'written_bytes': written_bytes, 'message': message, + 'outpoint': outpoint } defer.returnValue(result) @@ -1312,9 +1315,10 @@ class Daemon(AuthJSONRPCServer): 'download_path': (str) download path of file, 'mime_type': (str) mime type of file, 'key': (str) key attached to file, - 'total_bytes': (int) file size in bytes, None if full_status is false - 'written_bytes': (int) written size in bytes - 'message': (str), None if full_status is false + 'total_bytes': (int) file size in bytes, None if full_status is false, + 'written_bytes': (int) written size in bytes, + 'message': (str), status message, None if full_status is false + 'outpoint': (str), None if full_status is false or if claim is not found }, ] """ @@ -1500,25 +1504,22 @@ class Daemon(AuthJSONRPCServer): Returns: (dict) Dictionary containing information about the stream { - 'completed': (bool) true if download is completed, - 'file_name': (str) name of file, - 'download_directory': (str) download directory, - 'points_paid': (float) credit paid to download file, - 'stopped': (bool) true if download is stopped, - 'stream_hash': (str) stream hash of file, - 'stream_name': (str) stream name, - 'suggested_file_name': (str) suggested file name, - 'sd_hash': (str) sd hash of file, - 'name': (str) name claim attached to file - 'outpoint': (str) claim outpoint attached to file - 'claim_id': (str) claim ID attached to file, - 'download_path': (str) download path of file, - 'mime_type': (str) mime type of file, - 'key': (str) key attached to file, - 'total_bytes': (int) file size in bytes, None if full_status is false - 'written_bytes': (int) written size in bytes - 'message': (str), None if full_status is false - 'metadata': (dict) Metadata dictionary + 'completed': (bool) true if download is completed, + 'file_name': (str) name of file, + 'download_directory': (str) download directory, + 'points_paid': (float) credit paid to download file, + 'stopped': (bool) true if download is stopped, + 'stream_hash': (str) stream hash of file, + 'stream_name': (str) stream name , + 'suggested_file_name': (str) suggested file name, + 'sd_hash': (str) sd hash of file, + 'download_path': (str) download path of file, + 'mime_type': (str) mime type of file, + 'key': (str) key attached to file, + 'total_bytes': (int) file size in bytes, None if full_status is false, + 'written_bytes': (int) written size in bytes, + 'message': (str) status message, + 'outpoint': (str) claim outpoint } """ @@ -1536,8 +1537,7 @@ class Daemon(AuthJSONRPCServer): "Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", ""))) else: resolved = resolved['claim'] - - name = resolved['name'] + txid, nout, name = resolved['txid'], resolved['nout'], resolved['name'] claim_dict = ClaimDict.load_dict(resolved['value']) sd_hash = claim_dict.source_hash @@ -1556,8 +1556,8 @@ class Daemon(AuthJSONRPCServer): log.info('Already have a file for %s', name) result = yield self._get_lbry_file_dict(lbry_file, full_status=True) else: - result = yield self._download_name(name, claim_dict, sd_hash, timeout=timeout, - file_name=file_name) + result = yield self._download_name(name, claim_dict, sd_hash, txid, nout, + timeout=timeout, file_name=file_name) response = yield self._render_response(result) defer.returnValue(response) @@ -1669,7 +1669,7 @@ class Daemon(AuthJSONRPCServer): Returns: (float) Estimated cost in lbry credits, returns None if uri is not - resolveable + resolvable """ cost = yield self.get_est_cost(uri, size) defer.returnValue(cost) diff --git a/lbrynet/daemon/auth/server.py b/lbrynet/daemon/auth/server.py index bd3cdd618..7a259de02 100644 --- a/lbrynet/daemon/auth/server.py +++ b/lbrynet/daemon/auth/server.py @@ -224,7 +224,8 @@ class AuthJSONRPCServer(AuthorizedBase): else: # last resort, just cast it as a string error = JSONRPCError(str(failure)) - log.warning("error processing api request: %s", error.message) + log.warning("error processing api request: %s\ntraceback: %s", error.message, + "\n".join(error.traceback)) response_content = jsonrpc_dumps_pretty(error, id=id_) self._set_headers(request, response_content) request.setResponseCode(200) diff --git a/lbrynet/db_migrator/dbmigrator.py b/lbrynet/db_migrator/dbmigrator.py index 41610234b..2e8677345 100644 --- a/lbrynet/db_migrator/dbmigrator.py +++ b/lbrynet/db_migrator/dbmigrator.py @@ -13,6 +13,9 @@ def migrate_db(db_dir, start, end): elif current == 3: from lbrynet.db_migrator.migrate3to4 import do_migration do_migration(db_dir) + elif current == 4: + from lbrynet.db_migrator.migrate4to5 import do_migration + do_migration(db_dir) else: raise Exception( "DB migration of version {} to {} is not available".format(current, current+1)) diff --git a/lbrynet/db_migrator/migrate3to4.py b/lbrynet/db_migrator/migrate3to4.py index 6816c6c86..3d45162b7 100644 --- a/lbrynet/db_migrator/migrate3to4.py +++ b/lbrynet/db_migrator/migrate3to4.py @@ -81,5 +81,6 @@ def migrate_blobs_db(db_dir): log.error("Some how not all blobs were marked as announceable") blobs_db_file.commit() - + blobs_db_file.close() + lbryfile_info_file.close() diff --git a/lbrynet/db_migrator/migrate4to5.py b/lbrynet/db_migrator/migrate4to5.py new file mode 100644 index 000000000..38a8e46dd --- /dev/null +++ b/lbrynet/db_migrator/migrate4to5.py @@ -0,0 +1,63 @@ +import sqlite3 +import os +import logging + +log = logging.getLogger(__name__) + + +def do_migration(db_dir): + log.info("Doing the migration") + add_lbry_file_metadata(db_dir) + log.info("Migration succeeded") + + +def add_lbry_file_metadata(db_dir): + """ + We migrate the blobs.db used in BlobManager to have a "should_announce" column, + and set this to True for blobs that are sd_hash's or head blobs (first blob in stream) + """ + + name_metadata = os.path.join(db_dir, "blockchainname.db") + lbryfile_info_db = os.path.join(db_dir, 'lbryfile_info.db') + + if not os.path.isfile(name_metadata) and not os.path.isfile(lbryfile_info_db): + return + + if not os.path.isfile(lbryfile_info_db): + log.error( + "blockchainname.db was not found but lbryfile_info.db was found, skipping migration") + return + + name_metadata_db = sqlite3.connect(name_metadata) + lbryfile_db = sqlite3.connect(lbryfile_info_db) + name_metadata_cursor = name_metadata_db.cursor() + lbryfile_cursor = lbryfile_db.cursor() + + lbryfile_db.executescript( + "create table if not exists lbry_file_metadata (" + + " lbry_file integer primary key, " + + " txid text, " + + " n integer, " + + " foreign key(lbry_file) references lbry_files(rowid)" + ")") + + _files = lbryfile_cursor.execute("select rowid, stream_hash from lbry_files").fetchall() + + lbry_files = {x[1]: x[0] for x in _files} + for (sd_hash, stream_hash) in lbryfile_cursor.execute("select * " + "from lbry_file_descriptors").fetchall(): + lbry_file_id = lbry_files[stream_hash] + outpoint = name_metadata_cursor.execute("select txid, n from name_metadata " + "where sd_hash=?", + (sd_hash,)).fetchall() + if outpoint: + txid, nout = outpoint[0] + lbryfile_cursor.execute("insert into lbry_file_metadata values (?, ?, ?)", + (lbry_file_id, txid, nout)) + else: + lbryfile_cursor.execute("insert into lbry_file_metadata values (?, ?, ?)", + (lbry_file_id, None, None)) + lbryfile_db.commit() + + lbryfile_db.close() + name_metadata_db.close() diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index fd96aa8f0..430cbe12d 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -5,7 +5,6 @@ Keep track of which LBRY Files are downloading and store their LBRY File specifi import logging import os -from twisted.enterprise import adbapi from twisted.internet import defer, task, reactor from twisted.python.failure import Failure @@ -16,7 +15,6 @@ from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDow from lbrynet.lbry_file.StreamDescriptor import EncryptedFileStreamType from lbrynet.cryptstream.client.CryptStreamDownloader import AlreadyStoppedError from lbrynet.cryptstream.client.CryptStreamDownloader import CurrentlyStoppingError -from lbrynet.core.sqlite_helpers import rerun_if_locked from lbrynet.core.utils import safe_start_looping_call, safe_stop_looping_call from lbrynet import conf @@ -41,7 +39,6 @@ class EncryptedFileManager(object): # TODO: why is sd_identifier part of the file manager? self.sd_identifier = sd_identifier self.lbry_files = [] - self.sql_db = None if download_directory: self.download_directory = download_directory else: @@ -51,7 +48,7 @@ class EncryptedFileManager(object): @defer.inlineCallbacks def setup(self): - yield self._open_db() + yield self.stream_info_manager.setup() yield self._add_to_sd_identifier() # don't block on starting the lbry files self._start_lbry_files() @@ -252,84 +249,32 @@ class EncryptedFileManager(object): def stop(self): safe_stop_looping_call(self.lbry_file_reflector) yield defer.DeferredList(list(self._stop_lbry_files())) - if self.sql_db: - yield self.sql_db.close() - self.sql_db = None log.info("Stopped encrypted file manager") defer.returnValue(True) def get_count_for_stream_hash(self, stream_hash): return self._get_count_for_stream_hash(stream_hash) - ######### database calls ######### - - def _open_db(self): - # check_same_thread=False is solely to quiet a spurious error that appears to be due - # to a bug in twisted, where the connection is closed by a different thread than the - # one that opened it. The individual connections in the pool are not used in multiple - # threads. - self.sql_db = adbapi.ConnectionPool( - "sqlite3", - os.path.join(self.session.db_dir, "lbryfile_info.db"), - check_same_thread=False - ) - return self.sql_db.runQuery( - "create table if not exists lbry_file_options (" + - " blob_data_rate real, " + - " status text," + - " stream_hash text," - " foreign key(stream_hash) references lbry_files(stream_hash)" + - ")" - ) - - @rerun_if_locked - def _save_lbry_file(self, stream_hash, data_payment_rate): - def do_save(db_transaction): - row = (data_payment_rate, ManagedEncryptedFileDownloader.STATUS_STOPPED, stream_hash) - db_transaction.execute("insert into lbry_file_options values (?, ?, ?)", row) - return db_transaction.lastrowid - return self.sql_db.runInteraction(do_save) - - @rerun_if_locked - def _delete_lbry_file_options(self, rowid): - return self.sql_db.runQuery("delete from lbry_file_options where rowid = ?", - (rowid,)) - - @rerun_if_locked - def _set_lbry_file_payment_rate(self, rowid, new_rate): - return self.sql_db.runQuery( - "update lbry_file_options set blob_data_rate = ? where rowid = ?", - (new_rate, rowid)) - - @rerun_if_locked - def _get_all_lbry_files(self): - d = self.sql_db.runQuery("select rowid, stream_hash, blob_data_rate from lbry_file_options") - return d - - @rerun_if_locked - def _change_file_status(self, rowid, new_status): - d = self.sql_db.runQuery("update lbry_file_options set status = ? where rowid = ?", - (new_status, rowid)) - d.addCallback(lambda _: new_status) - return d - - @rerun_if_locked - def _get_lbry_file_status(self, rowid): - d = self.sql_db.runQuery("select status from lbry_file_options where rowid = ?", - (rowid,)) - d.addCallback(lambda r: (r[0][0] if len(r) else None)) - return d - - @rerun_if_locked def _get_count_for_stream_hash(self, stream_hash): - d = self.sql_db.runQuery("select count(*) from lbry_file_options where stream_hash = ?", - (stream_hash,)) - d.addCallback(lambda r: (r[0][0] if r else 0)) - return d + return self.stream_info_manager._get_count_for_stream_hash(stream_hash) + + def _delete_lbry_file_options(self, rowid): + return self.stream_info_manager._delete_lbry_file_options(rowid) + + def _save_lbry_file(self, stream_hash, data_payment_rate): + return self.stream_info_manager._save_lbry_file(stream_hash, data_payment_rate) + + def _get_all_lbry_files(self): + return self.stream_info_manager._get_all_lbry_files() - @rerun_if_locked def _get_rowid_for_stream_hash(self, stream_hash): - d = self.sql_db.runQuery("select rowid from lbry_file_options where stream_hash = ?", - (stream_hash,)) - d.addCallback(lambda r: (r[0][0] if len(r) else None)) - return d + return self.stream_info_manager._get_rowid_for_stream_hash(stream_hash) + + def _change_file_status(self, rowid, status): + return self.stream_info_manager._change_file_status(rowid, status) + + def _set_lbry_file_payment_rate(self, rowid, new_rate): + return self.stream_info_manager._set_lbry_file_payment_rate(rowid, new_rate) + + def _get_lbry_file_status(self, rowid): + return self.stream_info_manager._get_lbry_file_status(rowid) diff --git a/lbrynet/lbry_file/EncryptedFileMetadataManager.py b/lbrynet/lbry_file/EncryptedFileMetadataManager.py index 16f01cd09..ce1340c82 100644 --- a/lbrynet/lbry_file/EncryptedFileMetadataManager.py +++ b/lbrynet/lbry_file/EncryptedFileMetadataManager.py @@ -6,7 +6,7 @@ from twisted.python.failure import Failure from twisted.enterprise import adbapi from lbrynet.core.Error import DuplicateStreamHashError, NoSuchStreamHash, NoSuchSDHash from lbrynet.core.sqlite_helpers import rerun_if_locked - +from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader log = logging.getLogger(__name__) @@ -14,11 +14,12 @@ log = logging.getLogger(__name__) class DBEncryptedFileMetadataManager(object): """Store and provide access to LBRY file metadata using sqlite""" - def __init__(self, db_dir): + def __init__(self, db_dir, file_name=None): self.db_dir = db_dir - self.stream_info_db = None - self.stream_blob_db = None - self.stream_desc_db = None + self._db_file_name = file_name or "lbryfile_info.db" + self.db_conn = adbapi.ConnectionPool("sqlite3", os.path.join(self.db_dir, + self._db_file_name), + check_same_thread=False) def setup(self): return self._open_db() @@ -96,52 +97,81 @@ class DBEncryptedFileMetadataManager(object): def get_stream_hash_for_sd_hash(self, sd_hash): return self._get_stream_hash_for_sd_blob_hash(sd_hash) + @staticmethod + def _create_tables(transaction): + transaction.execute("create table if not exists lbry_files (" + + " stream_hash text primary key, " + + " key text, " + + " stream_name text, " + + " suggested_file_name text" + + ")") + transaction.execute("create table if not exists lbry_file_blobs (" + + " blob_hash text, " + + " stream_hash text, " + + " position integer, " + + " iv text, " + + " length integer, " + + " foreign key(stream_hash) references lbry_files(stream_hash)" + + ")") + transaction.execute("create table if not exists lbry_file_descriptors (" + + " sd_blob_hash TEXT PRIMARY KEY, " + + " stream_hash TEXT, " + + " foreign key(stream_hash) references lbry_files(stream_hash)" + + ")") + transaction.execute("create table if not exists lbry_file_options (" + + " blob_data_rate real, " + + " status text," + + " stream_hash text," + " foreign key(stream_hash) references lbry_files(stream_hash)" + + ")") + transaction.execute("create table if not exists lbry_file_metadata (" + + " lbry_file integer primary key, " + + " txid text, " + + " n integer, " + + " foreign key(lbry_file) references lbry_files(rowid)" + ")") + def _open_db(self): # check_same_thread=False is solely to quiet a spurious error that appears to be due # to a bug in twisted, where the connection is closed by a different thread than the # one that opened it. The individual connections in the pool are not used in multiple # threads. - self.db_conn = adbapi.ConnectionPool( - "sqlite3", - (os.path.join(self.db_dir, "lbryfile_info.db")), - check_same_thread=False) + return self.db_conn.runInteraction(self._create_tables) - def create_tables(transaction): - transaction.execute("create table if not exists lbry_files (" + - " stream_hash text primary key, " + - " key text, " + - " stream_name text, " + - " suggested_file_name text" + - ")") - transaction.execute("create table if not exists lbry_file_blobs (" + - " blob_hash text, " + - " stream_hash text, " + - " position integer, " + - " iv text, " + - " length integer, " + - " foreign key(stream_hash) references lbry_files(stream_hash)" + - ")") - transaction.execute("create table if not exists lbry_file_descriptors (" + - " sd_blob_hash TEXT PRIMARY KEY, " + - " stream_hash TEXT, " + - " foreign key(stream_hash) references lbry_files(stream_hash)" + - ")") + @rerun_if_locked + @defer.inlineCallbacks + def get_file_outpoint(self, rowid): + result = yield self.db_conn.runQuery("select txid, n from lbry_file_metadata " + "where lbry_file=?", (rowid, )) + response = None + if result: + txid, nout = result[0] + if txid is not None and nout is not None: + response = "%s:%i" % (txid, nout) + defer.returnValue(response) - return self.db_conn.runInteraction(create_tables) + @rerun_if_locked + @defer.inlineCallbacks + def save_outpoint_to_file(self, rowid, txid, nout): + existing_outpoint = yield self.get_file_outpoint(rowid) + if not existing_outpoint: + yield self.db_conn.runOperation("insert into lbry_file_metadata values " + "(?, ?, ?)", (rowid, txid, nout)) @rerun_if_locked def _delete_stream(self, stream_hash): d = self.db_conn.runQuery( - "select stream_hash from lbry_files where stream_hash = ?", (stream_hash,)) + "select rowid, stream_hash from lbry_files where stream_hash = ?", (stream_hash,)) d.addCallback( - lambda result: result[0][0] if result else Failure(NoSuchStreamHash(stream_hash))) + lambda result: result[0] if result else Failure(NoSuchStreamHash(stream_hash))) - def do_delete(transaction, s_h): + def do_delete(transaction, row_id, s_h): transaction.execute("delete from lbry_files where stream_hash = ?", (s_h,)) transaction.execute("delete from lbry_file_blobs where stream_hash = ?", (s_h,)) transaction.execute("delete from lbry_file_descriptors where stream_hash = ?", (s_h,)) + transaction.execute("delete from lbry_file_metadata where lbry_file = ?", (row_id,)) - d.addCallback(lambda s_h: self.db_conn.runInteraction(do_delete, s_h)) + d.addCallback(lambda (row_id, s_h): self.db_conn.runInteraction(do_delete, row_id, s_h)) return d @rerun_if_locked @@ -269,97 +299,56 @@ class DBEncryptedFileMetadataManager(object): d.addCallback(_handle_result) return d + # used by lbry file manager + @rerun_if_locked + def _save_lbry_file(self, stream_hash, data_payment_rate): + def do_save(db_transaction): + row = (data_payment_rate, ManagedEncryptedFileDownloader.STATUS_STOPPED, stream_hash) + db_transaction.execute("insert into lbry_file_options values (?, ?, ?)", row) + return db_transaction.lastrowid + return self.db_conn.runInteraction(do_save) -class TempEncryptedFileMetadataManager(object): - def __init__(self): - self.streams = {} - self.stream_blobs = {} - self.sd_files = {} + @rerun_if_locked + def _delete_lbry_file_options(self, rowid): + return self.db_conn.runQuery("delete from lbry_file_options where rowid = ?", + (rowid,)) - def setup(self): - return defer.succeed(True) + @rerun_if_locked + def _set_lbry_file_payment_rate(self, rowid, new_rate): + return self.db_conn.runQuery( + "update lbry_file_options set blob_data_rate = ? where rowid = ?", + (new_rate, rowid)) - def stop(self): - return defer.succeed(True) - - def get_all_streams(self): - return defer.succeed(self.streams.keys()) - - def save_stream(self, stream_hash, file_name, key, suggested_file_name, blobs): - self.streams[stream_hash] = {'suggested_file_name': suggested_file_name, - 'stream_name': file_name, - 'key': key} - d = self.add_blobs_to_stream(stream_hash, blobs) - d.addCallback(lambda _: stream_hash) + @rerun_if_locked + def _get_all_lbry_files(self): + d = self.db_conn.runQuery("select rowid, stream_hash, " + "blob_data_rate from lbry_file_options") return d - def get_stream_info(self, stream_hash): - if stream_hash in self.streams: - stream_info = self.streams[stream_hash] - return defer.succeed([stream_info['key'], stream_info['stream_name'], - stream_info['suggested_file_name']]) - return defer.succeed(None) + @rerun_if_locked + def _change_file_status(self, rowid, new_status): + d = self.db_conn.runQuery("update lbry_file_options set status = ? where rowid = ?", + (new_status, rowid)) + d.addCallback(lambda _: new_status) + return d - def delete_stream(self, stream_hash): - if stream_hash in self.streams: - del self.streams[stream_hash] - for (s_h, b_h) in self.stream_blobs.keys(): - if s_h == stream_hash: - del self.stream_blobs[(s_h, b_h)] - return defer.succeed(True) + @rerun_if_locked + def _get_lbry_file_status(self, rowid): + d = self.db_conn.runQuery("select status from lbry_file_options where rowid = ?", + (rowid,)) + d.addCallback(lambda r: (r[0][0] if len(r) else None)) + return d - def add_blobs_to_stream(self, stream_hash, blobs): - assert stream_hash in self.streams, "Can't add blobs to a stream that isn't known" - for blob in blobs: - info = {} - info['blob_num'] = blob.blob_num - info['length'] = blob.length - info['iv'] = blob.iv - self.stream_blobs[(stream_hash, blob.blob_hash)] = info - return defer.succeed(True) + @rerun_if_locked + def _get_count_for_stream_hash(self, stream_hash): + d = self.db_conn.runQuery("select count(*) from lbry_file_options where stream_hash = ?", + (stream_hash,)) + d.addCallback(lambda r: (r[0][0] if r else 0)) + return d - def get_blobs_for_stream(self, stream_hash, start_blob=None, - end_blob=None, count=None, reverse=False): - - if start_blob is not None: - start_num = self._get_blob_num_by_hash(stream_hash, start_blob) - else: - start_num = None - if end_blob is not None: - end_num = self._get_blob_num_by_hash(stream_hash, end_blob) - else: - end_num = None - return self._get_further_blob_infos(stream_hash, start_num, end_num, count, reverse) - - def get_stream_of_blob(self, blob_hash): - for (s_h, b_h) in self.stream_blobs.iterkeys(): - if b_h == blob_hash: - return defer.succeed(s_h) - return defer.succeed(None) - - def _get_further_blob_infos(self, stream_hash, start_num, end_num, count=None, reverse=False): - blob_infos = [] - for (s_h, b_h), info in self.stream_blobs.iteritems(): - if stream_hash == s_h: - position = info['blob_num'] - length = info['length'] - iv = info['iv'] - if (start_num is None) or (position > start_num): - if (end_num is None) or (position < end_num): - blob_infos.append((b_h, position, iv, length)) - blob_infos.sort(key=lambda i: i[1], reverse=reverse) - if count is not None: - blob_infos = blob_infos[:count] - return defer.succeed(blob_infos) - - def _get_blob_num_by_hash(self, stream_hash, blob_hash): - if (stream_hash, blob_hash) in self.stream_blobs: - return defer.succeed(self.stream_blobs[(stream_hash, blob_hash)]['blob_num']) - - def save_sd_blob_hash_to_stream(self, stream_hash, sd_blob_hash): - self.sd_files[sd_blob_hash] = stream_hash - return defer.succeed(True) - - def get_sd_blob_hashes_for_stream(self, stream_hash): - return defer.succeed( - [sd_hash for sd_hash, s_h in self.sd_files.iteritems() if stream_hash == s_h]) + @rerun_if_locked + def _get_rowid_for_stream_hash(self, stream_hash): + d = self.db_conn.runQuery("select rowid from lbry_file_options where stream_hash = ?", + (stream_hash,)) + d.addCallback(lambda r: (r[0][0] if len(r) else None)) + return d diff --git a/lbrynet/tests/functional/test_misc.py b/lbrynet/tests/functional/test_misc.py index 52e53be74..8f638bd7d 100644 --- a/lbrynet/tests/functional/test_misc.py +++ b/lbrynet/tests/functional/test_misc.py @@ -10,7 +10,6 @@ import unittest from Crypto import Random from Crypto.Hash import MD5 from lbrynet import conf -from lbrynet.lbry_file.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager from lbrynet.core.Session import Session @@ -120,7 +119,7 @@ class LbryUploader(object): peer_port=5553, use_upnp=False, rate_limiter=rate_limiter, wallet=wallet, blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1") - stream_info_manager = TempEncryptedFileMetadataManager() + stream_info_manager = DBEncryptedFileMetadataManager(self.db_dir) self.lbry_file_manager = EncryptedFileManager( self.session, stream_info_manager, self.sd_identifier) if self.ul_rate_limit is not None: @@ -227,7 +226,7 @@ def start_lbry_reuploader(sd_hash, kill_event, dead_event, is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], external_ip="127.0.0.1") - stream_info_manager = TempEncryptedFileMetadataManager() + stream_info_manager = DBEncryptedFileMetadataManager(db_dir) lbry_file_manager = EncryptedFileManager(session, stream_info_manager, sd_identifier) @@ -520,7 +519,7 @@ class TestTransfer(TestCase): blob_tracker_class=DummyBlobAvailabilityTracker, dht_node_class=Node, is_generous=self.is_generous, external_ip="127.0.0.1") - self.stream_info_manager = TempEncryptedFileMetadataManager() + self.stream_info_manager = DBEncryptedFileMetadataManager(db_dir) self.lbry_file_manager = EncryptedFileManager( self.session, self.stream_info_manager, sd_identifier) @@ -820,7 +819,7 @@ class TestTransfer(TestCase): is_generous=conf.ADJUSTABLE_SETTINGS['is_generous_host'][1], external_ip="127.0.0.1") - self.stream_info_manager = TempEncryptedFileMetadataManager() + self.stream_info_manager = DBEncryptedFileMetadataManager(db_dir) self.lbry_file_manager = EncryptedFileManager( self.session, self.stream_info_manager, sd_identifier) diff --git a/lbrynet/tests/functional/test_streamify.py b/lbrynet/tests/functional/test_streamify.py index afd3b029c..9fe4a29c1 100644 --- a/lbrynet/tests/functional/test_streamify.py +++ b/lbrynet/tests/functional/test_streamify.py @@ -7,7 +7,6 @@ from twisted.trial.unittest import TestCase from twisted.internet import defer, threads from lbrynet import conf -from lbrynet.lbry_file.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager from lbrynet.core.Session import Session @@ -80,7 +79,7 @@ class TestStreamify(TestCase): is_generous=self.is_generous, external_ip="127.0.0.1" ) - self.stream_info_manager = TempEncryptedFileMetadataManager() + self.stream_info_manager = DBEncryptedFileMetadataManager(db_dir) self.lbry_file_manager = EncryptedFileManager( self.session, self.stream_info_manager, sd_identifier) diff --git a/lbrynet/tests/unit/lbryfile/test_EncryptedFileMetadataManager.py b/lbrynet/tests/unit/lbryfile/test_EncryptedFileMetadataManager.py index 659e3c09a..e83363d6e 100644 --- a/lbrynet/tests/unit/lbryfile/test_EncryptedFileMetadataManager.py +++ b/lbrynet/tests/unit/lbryfile/test_EncryptedFileMetadataManager.py @@ -7,6 +7,7 @@ from lbrynet.cryptstream.CryptBlob import CryptBlobInfo from lbrynet.core.Error import NoSuchStreamHash from lbrynet.tests.util import random_lbry_hash + class DBEncryptedFileMetadataManagerTest(unittest.TestCase): def setUp(self): self.db_dir = tempfile.mkdtemp() diff --git a/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileManager.py b/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileManager.py index 87cf676cb..ebdcf731c 100644 --- a/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileManager.py +++ b/lbrynet/tests/unit/lbryfilemanager/test_EncryptedFileManager.py @@ -3,8 +3,10 @@ from twisted.trial import unittest from lbrynet import conf from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.file_manager.EncryptedFileManager import EncryptedFileManager +from lbrynet.lbry_file.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager from lbrynet.tests.util import random_lbry_hash + class TestEncryptedFileManager(unittest.TestCase): def setUp(self): @@ -19,12 +21,12 @@ class TestEncryptedFileManager(unittest.TestCase): session = MocSession() session.db_dir = '.' - stream_info_manager = None + stream_info_manager = DBEncryptedFileMetadataManager('.') sd_identifier = None download_directory = '.' manager = EncryptedFileManager( session, stream_info_manager, sd_identifier, download_directory) - yield manager._open_db() + yield manager.stream_info_manager.setup() out = yield manager._get_all_lbry_files() self.assertEqual(len(out), 0)