diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 0ac782298..b461802f9 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -722,7 +722,6 @@ class Daemon(AuthJSONRPCServer): d = reupload.reflect_stream(publisher.lbry_file) d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name), log.exception) - yield publisher.lbry_file.get_claim_info() self.analytics_manager.send_claim_action('publish') log.info("Success! Published to lbry://%s txid: %s nout: %d", name, claim_out['txid'], claim_out['nout']) diff --git a/lbrynet/daemon/Downloader.py b/lbrynet/daemon/Downloader.py index 60fce734b..67873218a 100644 --- a/lbrynet/daemon/Downloader.py +++ b/lbrynet/daemon/Downloader.py @@ -183,7 +183,6 @@ class GetStream(object): self.downloader = yield self._create_downloader(sd_blob, file_name=file_name) yield self.pay_key_fee(key_fee, name) yield self.session.storage.save_content_claim(self.downloader.stream_hash, "%s:%i" % (txid, nout)) - yield self.downloader.get_claim_info() log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) self.finished_deferred = self.downloader.start() self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail) diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py index 86b44ee63..2715aa9cf 100644 --- a/lbrynet/database/storage.py +++ b/lbrynet/database/storage.py @@ -186,6 +186,11 @@ class SQLiteStorage(object): self.db = SqliteConnection(self._db_path) self.db.set_reactor(reactor) + # used to refresh the claim attributes on a ManagedEncryptedFileDownloader when a + # change to the associated content claim occurs. 
these are added by the file manager + when it loads each file + self.content_claim_callbacks = {} # {<stream_hash>: <claim_info_callback>} + def setup(self): def _create_tables(transaction): transaction.executescript(self.CREATE_TABLES_QUERY) @@ -537,12 +542,37 @@ class SQLiteStorage(object): "insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?)", (outpoint, claim_id, name, amount, height, serialized, claim_dict.certificate_id, address, sequence) ) + yield self.db.runInteraction(_save_claim) if 'supports' in claim_info: # if this response doesn't have support info don't overwrite the existing # support info yield self.save_supports(claim_id, claim_info['supports']) + # check for content claim updates + if claim_dict.source_hash: + existing_file_stream_hash = yield self.run_and_return_one_or_none( + "select file.stream_hash from stream " + "inner join file on file.stream_hash=stream.stream_hash " + "where sd_hash=?", claim_dict.source_hash + ) + if existing_file_stream_hash: + known_outpoint = yield self.run_and_return_one_or_none( + "select claim_outpoint from content_claim where stream_hash=?", existing_file_stream_hash + ) + known_claim_id = yield self.run_and_return_one_or_none( + "select claim_id from claim " + "inner join content_claim c3 ON claim.claim_outpoint=c3.claim_outpoint " + "where c3.stream_hash=?", existing_file_stream_hash + ) + if not known_claim_id: + log.info("discovered content claim %s for stream %s", claim_id, existing_file_stream_hash) + yield self.save_content_claim(existing_file_stream_hash, outpoint) + elif known_claim_id and known_claim_id == claim_id: + if known_outpoint != outpoint: + log.info("updating content claim %s for stream %s", claim_id, existing_file_stream_hash) + yield self.save_content_claim(existing_file_stream_hash, outpoint) + def get_stream_hashes_for_claim_id(self, claim_id): return self.run_and_return_list( "select f.stream_hash from file f " @@ -551,6 +581,7 @@ class SQLiteStorage(object): claim_id ) + @defer.inlineCallbacks def 
save_content_claim(self, stream_hash, claim_outpoint): def _save_content_claim(transaction): # get the claim id and serialized metadata @@ -588,7 +619,12 @@ class SQLiteStorage(object): # update the claim associated to the file transaction.execute("insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint)) - return self.db.runInteraction(_save_content_claim) + yield self.db.runInteraction(_save_content_claim) + + # update corresponding ManagedEncryptedFileDownloader object + if stream_hash in self.content_claim_callbacks: + file_callback = self.content_claim_callbacks[stream_hash] + yield file_callback() @defer.inlineCallbacks def get_content_claim(self, stream_hash, include_supports=True): diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index 39b970dcd..96b56c0ab 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -127,6 +127,7 @@ class EncryptedFileManager(object): try: # restore will raise an Exception if status is unknown lbry_file.restore(file_info['status']) + self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info self.lbry_files.append(lbry_file) except Exception: log.warning("Failed to start %i", file_info.get('rowid')) @@ -171,6 +172,8 @@ class EncryptedFileManager(object): stream_metadata['suggested_file_name'] ) lbry_file.restore(status) + yield lbry_file.get_claim_info() + self.storage.content_claim_callbacks[stream_hash] = lbry_file.get_claim_info self.lbry_files.append(lbry_file) defer.returnValue(lbry_file) @@ -195,8 +198,9 @@ class EncryptedFileManager(object): rowid, stream_hash, payment_rate_manager, sd_hash, key, stream_name, file_name, download_directory, stream_metadata['suggested_file_name'] ) - lbry_file.get_claim_info(include_supports=False) lbry_file.restore(status) + yield lbry_file.get_claim_info(include_supports=False) + 
self.storage.content_claim_callbacks[stream_hash] = lbry_file.get_claim_info self.lbry_files.append(lbry_file) defer.returnValue(lbry_file) @@ -220,6 +224,9 @@ class EncryptedFileManager(object): self.lbry_files.remove(lbry_file) + if lbry_file.stream_hash in self.storage.content_claim_callbacks: + del self.storage.content_claim_callbacks[lbry_file.stream_hash] + yield lbry_file.delete_data() yield self.session.storage.delete_stream(lbry_file.stream_hash)