From ac7ffdbf3ac7aae0295d42ef26d2d38a16c92259 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 6 Dec 2017 18:27:42 -0500 Subject: [PATCH] add lbry_file_metadata table to save outpoint data of downloads --- lbrynet/daemon/Daemon.py | 22 +++---- lbrynet/db_migrator/dbmigrator.py | 3 + lbrynet/db_migrator/migrate3to4.py | 3 +- lbrynet/db_migrator/migrate4to5.py | 63 +++++++++++++++++++ .../lbry_file/EncryptedFileMetadataManager.py | 38 +++++++++-- 5 files changed, 112 insertions(+), 17 deletions(-) create mode 100644 lbrynet/db_migrator/migrate4to5.py diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index a80745559..b3f33c12b 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -195,7 +195,7 @@ class Daemon(AuthJSONRPCServer): self.connected_to_internet = True self.connection_status_code = None self.platform = None - self.current_db_revision = 4 + self.current_db_revision = 5 self.db_revision_file = conf.settings.get_db_revision_filename() self.session = None self.uploaded_temp_files = [] @@ -662,7 +662,7 @@ class Daemon(AuthJSONRPCServer): defer.returnValue(report) @defer.inlineCallbacks - def _download_name(self, name, claim_dict, sd_hash, timeout=None, file_name=None): + def _download_name(self, name, claim_dict, sd_hash, txid, nout, timeout=None, file_name=None): """ Add a lbry file to the file manager, start the download, and return the new lbry file. If it already exists in the file manager, return the existing lbry file @@ -672,6 +672,7 @@ class Daemon(AuthJSONRPCServer): def _download_finished(download_id, name, claim_dict): report = yield self._get_stream_analytics_report(claim_dict) self.analytics_manager.send_download_finished(download_id, name, report, claim_dict) + @defer.inlineCallbacks def _download_failed(error, download_id, name, claim_dict): report = yield self._get_stream_analytics_report(claim_dict) @@ -693,11 +694,11 @@ class Daemon(AuthJSONRPCServer): file_name) try: lbry_file, finished_deferred = yield self.streams[sd_hash].start(claim_dict, name) + yield self.stream_info_manager.save_outpoint_to_file(lbry_file.rowid, txid, nout) finished_deferred.addCallbacks(lambda _: _download_finished(download_id, name, claim_dict), lambda e: _download_failed(e, download_id, name, claim_dict)) - result = yield self._get_lbry_file_dict(lbry_file, full_status=True) except Exception as err: yield _download_failed(err, download_id, name, claim_dict) @@ -731,6 +732,9 @@ class Daemon(AuthJSONRPCServer): d = reupload.reflect_stream(publisher.lbry_file) d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name), log.exception) + yield self.stream_info_manager.save_outpoint_to_file(publisher.lbry_file.rowid, + claim_out['txid'], + int(claim_out['nout'])) self.analytics_manager.send_claim_action('publish') log.info("Success! Published to lbry://%s txid: %s nout: %d", name, claim_out['txid'], claim_out['nout']) @@ -897,10 +901,7 @@ class Daemon(AuthJSONRPCServer): file_status = yield lbry_file.status() message = STREAM_STAGES[2][1] % (file_status.name, file_status.num_completed, file_status.num_known, file_status.running_status) - info = yield self.session.wallet.get_claim_metadata_for_sd_hash(lbry_file.sd_hash) - if info: - name, txid, nout = info - outpoint = "%s:%i" % (txid, nout) + outpoint = yield self.stream_info_manager.get_file_outpoint(lbry_file.rowid) result = { 'completed': lbry_file.completed, @@ -1536,8 +1537,7 @@ class Daemon(AuthJSONRPCServer): "Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", ""))) else: resolved = resolved['claim'] - - name = resolved['name'] + txid, nout, name = resolved['txid'], resolved['nout'], resolved['name'] claim_dict = ClaimDict.load_dict(resolved['value']) sd_hash = claim_dict.source_hash @@ -1556,8 +1556,8 @@ class Daemon(AuthJSONRPCServer): log.info('Already have a file for %s', name) result = yield self._get_lbry_file_dict(lbry_file, full_status=True) else: - result = yield self._download_name(name, claim_dict, sd_hash, timeout=timeout, - file_name=file_name) + result = yield self._download_name(name, claim_dict, sd_hash, txid, nout, + timeout=timeout, file_name=file_name) response = yield self._render_response(result) defer.returnValue(response) diff --git a/lbrynet/db_migrator/dbmigrator.py b/lbrynet/db_migrator/dbmigrator.py index 41610234b..2e8677345 100644 --- a/lbrynet/db_migrator/dbmigrator.py +++ b/lbrynet/db_migrator/dbmigrator.py @@ -13,6 +13,9 @@ def migrate_db(db_dir, start, end): elif current == 3: from lbrynet.db_migrator.migrate3to4 import do_migration do_migration(db_dir) + elif current == 4: + from lbrynet.db_migrator.migrate4to5 import do_migration + do_migration(db_dir) else: raise Exception( "DB migration of version {} to {} is not available".format(current, current+1)) diff --git a/lbrynet/db_migrator/migrate3to4.py b/lbrynet/db_migrator/migrate3to4.py index 6816c6c86..3d45162b7 100644 --- a/lbrynet/db_migrator/migrate3to4.py +++ b/lbrynet/db_migrator/migrate3to4.py @@ -81,5 +81,6 @@ def migrate_blobs_db(db_dir): log.error("Some how not all blobs were marked as announceable") blobs_db_file.commit() - + blobs_db_file.close() + lbryfile_info_file.close() diff --git a/lbrynet/db_migrator/migrate4to5.py b/lbrynet/db_migrator/migrate4to5.py new file mode 100644 index 000000000..38a8e46dd --- /dev/null +++ b/lbrynet/db_migrator/migrate4to5.py @@ -0,0 +1,63 @@ +import sqlite3 +import os +import logging + +log = logging.getLogger(__name__) + + +def do_migration(db_dir): + log.info("Doing the migration") + add_lbry_file_metadata(db_dir) + log.info("Migration succeeded") + + +def add_lbry_file_metadata(db_dir): + """ + We migrate the blobs.db used in BlobManager to have a "should_announce" column, + and set this to True for blobs that are sd_hash's or head blobs (first blob in stream) + """ + + name_metadata = os.path.join(db_dir, "blockchainname.db") + lbryfile_info_db = os.path.join(db_dir, 'lbryfile_info.db') + + if not os.path.isfile(name_metadata) and not os.path.isfile(lbryfile_info_db): + return + + if not os.path.isfile(lbryfile_info_db): + log.error( + "blockchainname.db was not found but lbryfile_info.db was found, skipping migration") + return + + name_metadata_db = sqlite3.connect(name_metadata) + lbryfile_db = sqlite3.connect(lbryfile_info_db) + name_metadata_cursor = name_metadata_db.cursor() + lbryfile_cursor = lbryfile_db.cursor() + + lbryfile_db.executescript( + "create table if not exists lbry_file_metadata (" + + " lbry_file integer primary key, " + + " txid text, " + + " n integer, " + + " foreign key(lbry_file) references lbry_files(rowid)" + ")") + + _files = lbryfile_cursor.execute("select rowid, stream_hash from lbry_files").fetchall() + + lbry_files = {x[1]: x[0] for x in _files} + for (sd_hash, stream_hash) in lbryfile_cursor.execute("select * " + "from lbry_file_descriptors").fetchall(): + lbry_file_id = lbry_files[stream_hash] + outpoint = name_metadata_cursor.execute("select txid, n from name_metadata " + "where sd_hash=?", + (sd_hash,)).fetchall() + if outpoint: + txid, nout = outpoint[0] + lbryfile_cursor.execute("insert into lbry_file_metadata values (?, ?, ?)", + (lbry_file_id, txid, nout)) + else: + lbryfile_cursor.execute("insert into lbry_file_metadata values (?, ?, ?)", + (lbry_file_id, None, None)) + lbryfile_db.commit() + + lbryfile_db.close() + name_metadata_db.close() diff --git a/lbrynet/lbry_file/EncryptedFileMetadataManager.py b/lbrynet/lbry_file/EncryptedFileMetadataManager.py index c4abbd9a0..ce1340c82 100644 --- a/lbrynet/lbry_file/EncryptedFileMetadataManager.py +++ b/lbrynet/lbry_file/EncryptedFileMetadataManager.py @@ -124,6 +124,12 @@ class DBEncryptedFileMetadataManager(object): " stream_hash text," " foreign key(stream_hash) references lbry_files(stream_hash)" + ")") + transaction.execute("create table if not exists lbry_file_metadata (" + + " lbry_file integer primary key, " + + " txid text, " + + " n integer, " + + " foreign key(lbry_file) references lbry_files(rowid)" + ")") def _open_db(self): # check_same_thread=False is solely to quiet a spurious error that appears to be due @@ -132,19 +138,40 @@ class DBEncryptedFileMetadataManager(object): # threads. return self.db_conn.runInteraction(self._create_tables) + @rerun_if_locked + @defer.inlineCallbacks + def get_file_outpoint(self, rowid): + result = yield self.db_conn.runQuery("select txid, n from lbry_file_metadata " + "where lbry_file=?", (rowid, )) + response = None + if result: + txid, nout = result[0] + if txid is not None and nout is not None: + response = "%s:%i" % (txid, nout) + defer.returnValue(response) + + @rerun_if_locked + @defer.inlineCallbacks + def save_outpoint_to_file(self, rowid, txid, nout): + existing_outpoint = yield self.get_file_outpoint(rowid) + if not existing_outpoint: + yield self.db_conn.runOperation("insert into lbry_file_metadata values " + "(?, ?, ?)", (rowid, txid, nout)) + @rerun_if_locked def _delete_stream(self, stream_hash): d = self.db_conn.runQuery( - "select stream_hash from lbry_files where stream_hash = ?", (stream_hash,)) + "select rowid, stream_hash from lbry_files where stream_hash = ?", (stream_hash,)) d.addCallback( - lambda result: result[0][0] if result else Failure(NoSuchStreamHash(stream_hash))) + lambda result: result[0] if result else Failure(NoSuchStreamHash(stream_hash))) - def do_delete(transaction, s_h): + def do_delete(transaction, row_id, s_h): transaction.execute("delete from lbry_files where stream_hash = ?", (s_h,)) transaction.execute("delete from lbry_file_blobs where stream_hash = ?", (s_h,)) transaction.execute("delete from lbry_file_descriptors where stream_hash = ?", (s_h,)) + transaction.execute("delete from lbry_file_metadata where lbry_file = ?", (row_id,)) - d.addCallback(lambda s_h: self.db_conn.runInteraction(do_delete, s_h)) + d.addCallback(lambda (row_id, s_h): self.db_conn.runInteraction(do_delete, row_id, s_h)) return d @rerun_if_locked @@ -294,7 +321,8 @@ class DBEncryptedFileMetadataManager(object): @rerun_if_locked def _get_all_lbry_files(self): - d = self.db_conn.runQuery("select rowid, stream_hash, blob_data_rate from lbry_file_options") + d = self.db_conn.runQuery("select rowid, stream_hash, " + "blob_data_rate from lbry_file_options") return d @rerun_if_locked