From 91229aac6eec1b5002d8316a68b26bce349af8a6 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Tue, 27 Feb 2018 16:21:37 -0500
Subject: [PATCH 1/8] remove old stream on a publish update

---
 lbrynet/daemon/Publisher.py                       | 12 ++++++++++++
 lbrynet/database/storage.py                       |  8 ++++++++
 lbrynet/tests/unit/database/test_SQLiteStorage.py |  3 +++
 3 files changed, 23 insertions(+)

diff --git a/lbrynet/daemon/Publisher.py b/lbrynet/daemon/Publisher.py
index 645ef5875..c1354fd91 100644
--- a/lbrynet/daemon/Publisher.py
+++ b/lbrynet/daemon/Publisher.py
@@ -40,6 +40,18 @@ class Publisher(object):
         claim_dict['stream']['source']['contentType'] = get_content_type(file_path)
         claim_dict['stream']['source']['version'] = "_0_0_1"  # need current version here
         claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address)
+
+        # check if we have a file already for this claim (if this is a publish update with a new stream)
+        old_stream_hashes = yield self.session.storage.get_stream_hashes_for_claim_id(claim_out['claim_id'])
+        if old_stream_hashes:
+            lbry_files = list(self.lbry_file_manager.lbry_files)
+            for lbry_file in lbry_files:
+                s_h = lbry_file.stream_hash
+                if s_h in old_stream_hashes:
+                    yield self.lbry_file_manager.delete_lbry_file(lbry_file, delete_file=False)
+                    old_stream_hashes.remove(s_h)
+                    log.info("Removed old stream for claim update: %s", s_h)
+
         yield self.session.storage.save_content_claim(
             self.lbry_file.stream_hash, "%s:%i" % (claim_out['txid'], claim_out['nout'])
         )
diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py
index ed6b663a3..86b44ee63 100644
--- a/lbrynet/database/storage.py
+++ b/lbrynet/database/storage.py
@@ -543,6 +543,14 @@ class SQLiteStorage(object):
             # support info
             yield self.save_supports(claim_id, claim_info['supports'])
 
+    def get_stream_hashes_for_claim_id(self, claim_id):
+        return self.run_and_return_list(
+            "select f.stream_hash from file f "
+            "inner join content_claim cc on f.stream_hash=cc.stream_hash "
+            "inner join claim c on c.claim_outpoint=cc.claim_outpoint and c.claim_id=?",
+            claim_id
+        )
+
     def save_content_claim(self, stream_hash, claim_outpoint):
         def _save_content_claim(transaction):
             # get the claim id and serialized metadata
diff --git a/lbrynet/tests/unit/database/test_SQLiteStorage.py b/lbrynet/tests/unit/database/test_SQLiteStorage.py
index 72bb72b79..6288fdcf2 100644
--- a/lbrynet/tests/unit/database/test_SQLiteStorage.py
+++ b/lbrynet/tests/unit/database/test_SQLiteStorage.py
@@ -296,6 +296,9 @@ class ContentClaimStorageTests(StorageTest):
         stored_content_claim = yield self.storage.get_content_claim(stream_hash)
         self.assertDictEqual(stored_content_claim, fake_claim_info)
 
+        stream_hashes = yield self.storage.get_stream_hashes_for_claim_id(fake_claim_info['claim_id'])
+        self.assertListEqual(stream_hashes, [stream_hash])
+
         # test that we can't associate a claim update with a new stream to the file
         second_stream_hash, second_sd_hash = random_lbry_hash(), random_lbry_hash()
         yield self.make_and_store_fake_stream(blob_count=2, stream_hash=second_stream_hash, sd_hash=second_sd_hash)
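Note: the new `get_stream_hashes_for_claim_id` query joins file -> content_claim -> claim, so it returns every locally tracked stream currently bound to the claim being updated; `delete_file=False` then drops those entries from the file manager without touching data already written to disk. A standalone sketch of the same join (the sqlite3 wiring is illustrative; only the SQL comes from the patch):

    import sqlite3

    def stream_hashes_for_claim(db_path, claim_id):
        # same join as SQLiteStorage.get_stream_hashes_for_claim_id: files whose
        # content_claim row points at an outpoint belonging to this claim id
        conn = sqlite3.connect(db_path)
        try:
            rows = conn.execute(
                "select f.stream_hash from file f "
                "inner join content_claim cc on f.stream_hash=cc.stream_hash "
                "inner join claim c on c.claim_outpoint=cc.claim_outpoint and c.claim_id=?",
                (claim_id,)
            ).fetchall()
            return [stream_hash for (stream_hash,) in rows]
        finally:
            conn.close()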
From d1240541c896b58b216edba0e283b2c91e7e8509 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Tue, 27 Feb 2018 16:32:21 -0500
Subject: [PATCH 2/8] always update lbry file attributes after publishing

---
 lbrynet/daemon/Daemon.py    | 1 +
 lbrynet/daemon/Publisher.py | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py
index b461802f9..0ac782298 100644
--- a/lbrynet/daemon/Daemon.py
+++ b/lbrynet/daemon/Daemon.py
@@ -722,6 +722,7 @@ class Daemon(AuthJSONRPCServer):
             d = reupload.reflect_stream(publisher.lbry_file)
             d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name),
                            log.exception)
+        yield publisher.lbry_file.get_claim_info()
         self.analytics_manager.send_claim_action('publish')
         log.info("Success! Published to lbry://%s txid: %s nout: %d", name, claim_out['txid'],
                  claim_out['nout'])
diff --git a/lbrynet/daemon/Publisher.py b/lbrynet/daemon/Publisher.py
index c1354fd91..da565db44 100644
--- a/lbrynet/daemon/Publisher.py
+++ b/lbrynet/daemon/Publisher.py
@@ -55,7 +55,6 @@ class Publisher(object):
         yield self.session.storage.save_content_claim(
             self.lbry_file.stream_hash, "%s:%i" % (claim_out['txid'], claim_out['nout'])
         )
-        yield self.lbry_file.get_claim_info()
         defer.returnValue(claim_out)
 
     @defer.inlineCallbacks
@@ -63,6 +62,7 @@ class Publisher(object):
         """Make a claim without creating a lbry file"""
         claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address)
         yield self.session.storage.save_content_claim(stream_hash, "%s:%i" % (claim_out['txid'], claim_out['nout']))
+        self.lbry_file = [f for f in self.lbry_file_manager.lbry_files if f.stream_hash == stream_hash][0]
         defer.returnValue(claim_out)
 
     @defer.inlineCallbacks
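Note: `publish_stream` now resolves `self.lbry_file` with `[f for f in ... if f.stream_hash == stream_hash][0]`, which raises a bare IndexError if the manager has no file for that stream. A slightly more defensive equivalent (a sketch, not part of the patch):

    # hypothetical guard; the patch assumes a matching file always exists
    lbry_file = next(
        (f for f in self.lbry_file_manager.lbry_files if f.stream_hash == stream_hash), None)
    if lbry_file is None:
        raise Exception("no lbry file found for stream %s" % stream_hash)
    self.lbry_file = lbry_file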
From 76cb674ebbb9365c95fee9e2a33da00558a652e9 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Wed, 28 Feb 2018 14:20:33 -0500
Subject: [PATCH 3/8] fix updating content claims for existing files

-update lbry file attributes as soon as a change to the content claim
 occurs
---
 lbrynet/daemon/Daemon.py                     |  1 -
 lbrynet/daemon/Downloader.py                 |  1 -
 lbrynet/database/storage.py                  | 38 ++++++++++++++++++++++++++-
 lbrynet/file_manager/EncryptedFileManager.py |  9 ++++++++-
 4 files changed, 45 insertions(+), 4 deletions(-)

diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py
index 0ac782298..b461802f9 100644
--- a/lbrynet/daemon/Daemon.py
+++ b/lbrynet/daemon/Daemon.py
@@ -722,7 +722,6 @@ class Daemon(AuthJSONRPCServer):
             d = reupload.reflect_stream(publisher.lbry_file)
             d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name),
                            log.exception)
-        yield publisher.lbry_file.get_claim_info()
         self.analytics_manager.send_claim_action('publish')
         log.info("Success! Published to lbry://%s txid: %s nout: %d", name, claim_out['txid'],
                  claim_out['nout'])
diff --git a/lbrynet/daemon/Downloader.py b/lbrynet/daemon/Downloader.py
index 60fce734b..67873218a 100644
--- a/lbrynet/daemon/Downloader.py
+++ b/lbrynet/daemon/Downloader.py
@@ -183,7 +183,6 @@ class GetStream(object):
         self.downloader = yield self._create_downloader(sd_blob, file_name=file_name)
         yield self.pay_key_fee(key_fee, name)
         yield self.session.storage.save_content_claim(self.downloader.stream_hash, "%s:%i" % (txid, nout))
-        yield self.downloader.get_claim_info()
         log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path)
         self.finished_deferred = self.downloader.start()
         self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail)
diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py
index 86b44ee63..2715aa9cf 100644
--- a/lbrynet/database/storage.py
+++ b/lbrynet/database/storage.py
@@ -186,6 +186,11 @@ class SQLiteStorage(object):
         self.db = SqliteConnection(self._db_path)
         self.db.set_reactor(reactor)
 
+        # used to refresh the claim attributes on a ManagedEncryptedFileDownloader when a
+        # change to the associated content claim occurs. these are added by the file manager
+        # when it loads each file
+        self.content_claim_callbacks = {}  # {<stream_hash>: <callback>}
+
     def setup(self):
         def _create_tables(transaction):
             transaction.executescript(self.CREATE_TABLES_QUERY)
@@ -537,12 +542,37 @@ class SQLiteStorage(object):
                 "insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
                 (outpoint, claim_id, name, amount, height, serialized, claim_dict.certificate_id, address, sequence)
             )
+
         yield self.db.runInteraction(_save_claim)
+
         if 'supports' in claim_info:  # if this response doesn't have support info don't overwrite the existing
             # support info
             yield self.save_supports(claim_id, claim_info['supports'])
 
+        # check for content claim updates
+        if claim_dict.source_hash:
+            existing_file_stream_hash = yield self.run_and_return_one_or_none(
+                "select file.stream_hash from stream "
+                "inner join file on file.stream_hash=stream.stream_hash "
+                "where sd_hash=?", claim_dict.source_hash
+            )
+            if existing_file_stream_hash:
+                known_outpoint = yield self.run_and_return_one_or_none(
+                    "select claim_outpoint from content_claim where stream_hash=?", existing_file_stream_hash
+                )
+                known_claim_id = yield self.run_and_return_one_or_none(
+                    "select claim_id from claim "
+                    "inner join content_claim c3 ON claim.claim_outpoint=c3.claim_outpoint "
+                    "where c3.stream_hash=?", existing_file_stream_hash
+                )
+                if not known_claim_id:
+                    log.info("discovered content claim %s for stream %s", claim_id, existing_file_stream_hash)
+                    yield self.save_content_claim(existing_file_stream_hash, outpoint)
+                elif known_claim_id and known_claim_id == claim_id:
+                    if known_outpoint != outpoint:
+                        log.info("updating content claim %s for stream %s", claim_id, existing_file_stream_hash)
+                        yield self.save_content_claim(existing_file_stream_hash, outpoint)
+
     def get_stream_hashes_for_claim_id(self, claim_id):
         return self.run_and_return_list(
             "select f.stream_hash from file f "
             "inner join content_claim cc on f.stream_hash=cc.stream_hash "
             "inner join claim c on c.claim_outpoint=cc.claim_outpoint and c.claim_id=?",
@@ -551,6 +581,7 @@ class SQLiteStorage(object):
             claim_id
         )
 
+    @defer.inlineCallbacks
     def save_content_claim(self, stream_hash, claim_outpoint):
         def _save_content_claim(transaction):
             # get the claim id and serialized metadata
@@ -588,7 +619,12 @@ class SQLiteStorage(object):
 
             # update the claim associated to the file
             transaction.execute("insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint))
-        return self.db.runInteraction(_save_content_claim)
+        yield self.db.runInteraction(_save_content_claim)
+
+        # update corresponding ManagedEncryptedFileDownloader object
+        if stream_hash in self.content_claim_callbacks:
+            file_callback = self.content_claim_callbacks[stream_hash]
+            yield file_callback()
 
     @defer.inlineCallbacks
     def get_content_claim(self, stream_hash, include_supports=True):
diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py
index 39b970dcd..96b56c0ab 100644
--- a/lbrynet/file_manager/EncryptedFileManager.py
+++ b/lbrynet/file_manager/EncryptedFileManager.py
@@ -127,6 +127,7 @@ class EncryptedFileManager(object):
             try:
                 # restore will raise an Exception if status is unknown
                 lbry_file.restore(file_info['status'])
+                self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info
                 self.lbry_files.append(lbry_file)
             except Exception:
                 log.warning("Failed to start %i", file_info.get('rowid'))
@@ -171,6 +172,8 @@ class EncryptedFileManager(object):
             stream_metadata['suggested_file_name']
         )
         lbry_file.restore(status)
+        yield lbry_file.get_claim_info()
+        self.storage.content_claim_callbacks[stream_hash] = lbry_file.get_claim_info
         self.lbry_files.append(lbry_file)
         defer.returnValue(lbry_file)
 
@@ -195,8 +198,9 @@ class EncryptedFileManager(object):
             rowid, stream_hash, payment_rate_manager, sd_hash, key, stream_name, file_name, download_directory,
             stream_metadata['suggested_file_name']
         )
-        lbry_file.get_claim_info(include_supports=False)
         lbry_file.restore(status)
+        yield lbry_file.get_claim_info(include_supports=False)
+        self.storage.content_claim_callbacks[stream_hash] = lbry_file.get_claim_info
         self.lbry_files.append(lbry_file)
         defer.returnValue(lbry_file)
 
@@ -220,6 +224,9 @@ class EncryptedFileManager(object):
 
         self.lbry_files.remove(lbry_file)
 
+        if lbry_file.stream_hash in self.storage.content_claim_callbacks:
+            del self.storage.content_claim_callbacks[lbry_file.stream_hash]
+
         yield lbry_file.delete_data()
         yield self.session.storage.delete_stream(lbry_file.stream_hash)
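Note: patch 3 replaces the ad hoc `get_claim_info()` calls with a callback registry keyed by stream hash: the file manager registers each loaded file's `get_claim_info` in `storage.content_claim_callbacks`, and `save_content_claim` fires the matching callback after the content_claim row changes, so any open ManagedEncryptedFileDownloader refreshes its claim attributes immediately. The mechanism reduced to its core (illustrative class name; same shape as the patch):

    from twisted.internet import defer

    class ContentClaimNotifier(object):
        def __init__(self):
            self.content_claim_callbacks = {}  # {stream_hash: callable returning a deferred}

        @defer.inlineCallbacks
        def save_content_claim(self, stream_hash, claim_outpoint):
            # ... persist the (stream_hash, claim_outpoint) row here ...
            # then refresh the downloader object, if a file for this stream is loaded
            if stream_hash in self.content_claim_callbacks:
                yield self.content_claim_callbacks[stream_hash]()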
From 19583277965d2b75febda1992240838a42024d29 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Wed, 28 Feb 2018 14:49:17 -0500
Subject: [PATCH 4/8] comments and logging

---
 lbrynet/database/storage.py | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py
index 2715aa9cf..9905e0197 100644
--- a/lbrynet/database/storage.py
+++ b/lbrynet/database/storage.py
@@ -565,13 +565,19 @@ class SQLiteStorage(object):
                     "inner join content_claim c3 ON claim.claim_outpoint=c3.claim_outpoint "
                     "where c3.stream_hash=?", existing_file_stream_hash
                 )
-                if not known_claim_id:
+                if not known_claim_id:  # this is a claim matching one of our files that has
+                    # no associated claim yet
                     log.info("discovered content claim %s for stream %s", claim_id, existing_file_stream_hash)
                     yield self.save_content_claim(existing_file_stream_hash, outpoint)
                 elif known_claim_id and known_claim_id == claim_id:
-                    if known_outpoint != outpoint:
+                    if known_outpoint != outpoint:  # this is an update for one of our files
                         log.info("updating content claim %s for stream %s", claim_id, existing_file_stream_hash)
                         yield self.save_content_claim(existing_file_stream_hash, outpoint)
+                    else:  # we're up to date already
+                        pass
+                else:  # this is a claim containing a clone of a file that we have
+                    log.warning("claim %s contains the same stream as the one already downloaded from claim %s",
+                                claim_id, known_claim_id)
 
     def get_stream_hashes_for_claim_id(self, claim_id):
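Note: with the comments in place, the content-claim check in `save_claim` reduces to a small decision table (restated here for reference):

    # file matched by source_hash   | action
    # -------------------------------------------------------------------
    # no claim on record            -> adopt it: save_content_claim(...)
    # same claim id, new outpoint   -> claim update: re-point the file
    # same claim id, same outpoint  -> already up to date: do nothing
    # different claim id            -> duplicate stream: warn, leave as-is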
From 35426c7350b55826dd26ba0dfc520a3e3028b15d Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Wed, 28 Feb 2018 14:49:33 -0500
Subject: [PATCH 5/8] remove some unused code

---
 lbrynet/core/Wallet.py | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py
index e2824ec2b..111c1bfdc 100644
--- a/lbrynet/core/Wallet.py
+++ b/lbrynet/core/Wallet.py
@@ -410,12 +410,6 @@ class Wallet(object):
             batch_results = yield self._get_values_for_uris(page, page_size, *uris)
 
             for uri, resolve_results in batch_results.iteritems():
-                claim_id = None
-                if resolve_results and 'claim' in resolve_results:
-                    claim_id = resolve_results['claim']['claim_id']
-                certificate_id = None
-                if resolve_results and 'certificate' in resolve_results:
-                    certificate_id = resolve_results['certificate']['claim_id']
                 try:
                     result[uri] = self._handle_claim_result(resolve_results)
                     yield self.save_claim(result[uri])

From 6a4b65a796a8d997d7acc96ccac317b447e3cc43 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Wed, 28 Feb 2018 14:50:27 -0500
Subject: [PATCH 6/8] have `get` fail when given a channel uri before
 attempting to resolve

---
 lbrynet/daemon/Daemon.py | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py
index b461802f9..5353bef98 100644
--- a/lbrynet/daemon/Daemon.py
+++ b/lbrynet/daemon/Daemon.py
@@ -1626,6 +1626,10 @@ class Daemon(AuthJSONRPCServer):
 
         timeout = timeout if timeout is not None else self.download_timeout
 
+        parsed_uri = parse_lbry_uri(uri)
+        if parsed_uri.is_channel and not parsed_uri.path:
+            raise Exception("cannot download a channel claim, specify a /path")
+
         resolved_result = yield self.session.wallet.resolve(uri)
         if resolved_result and uri in resolved_result:
             resolved = resolved_result[uri]
@@ -1634,11 +1638,9 @@ class Daemon(AuthJSONRPCServer):
 
         if not resolved or 'value' not in resolved:
             if 'claim' not in resolved:
-                if 'certificate' in resolved:
-                    raise Exception("Cannot use get on channels")
-                else:
                     raise Exception(
-                        "Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", "")))
+                        "Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", ""))
+                    )
             else:
                 resolved = resolved['claim']
         txid, nout, name = resolved['txid'], resolved['nout'], resolved['name']
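Note: the guard in patch 6 rejects bare channel URIs before resolve is ever called; a URI must name a stream within the channel to be downloadable. Roughly, assuming lbryschema's parsing semantics for `is_channel` and `path`:

    from lbryschema.uri import parse_lbry_uri

    parse_lbry_uri("@channel")             # is_channel=True, no path -> `get` raises
    parse_lbry_uri("@channel/some-video")  # is_channel=True, path set -> allowed
    parse_lbry_uri("some-stream")          # is_channel=False          -> allowed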
From 01c4c6ed97be2386fea906202587e7b7d203a333 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Wed, 28 Feb 2018 14:59:12 -0500
Subject: [PATCH 7/8] fetch claim heights

---
 lbrynet/core/Wallet.py                        | 28 +++++++++++++++++++++++-
 lbrynet/core/utils.py                         | 17 ++++++++++++++-
 lbrynet/daemon/Daemon.py                      |  6 +++---
 lbrynet/daemon/Publisher.py                   | 14 ++++++--------
 lbrynet/database/storage.py                   | 30 ++++++++++++++++++++----
 .../tests/unit/database/test_SQLiteStorage.py |  5 +++--
 6 files changed, 81 insertions(+), 19 deletions(-)

diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py
index 111c1bfdc..d2fc3c94e 100644
--- a/lbrynet/core/Wallet.py
+++ b/lbrynet/core/Wallet.py
@@ -5,6 +5,7 @@
 from decimal import Decimal
 from zope.interface import implements
 from twisted.internet import threads, reactor, defer, task
 from twisted.python.failure import Failure
+from twisted.internet.error import ConnectionAborted
 
 from lbryum import wallet as lbryum_wallet
 from lbryum.network import Network
@@ -19,11 +20,11 @@ from lbryschema.error import DecodeError
 from lbryschema.decode import smart_decode
 
 from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet
+from lbrynet.core.utils import DeferredDict
 from lbrynet.core.client.ClientRequest import ClientRequest
 from lbrynet.core.Error import InsufficientFundsError, UnknownNameError
 from lbrynet.core.Error import UnknownClaimID, UnknownURI, NegativeFundsError, UnknownOutpoint
 from lbrynet.core.Error import DownloadCanceledError, RequestCanceledError
-from twisted.internet.error import ConnectionAborted
 
 log = logging.getLogger(__name__)
@@ -83,12 +84,15 @@ class Wallet(object):
         self._manage_count = 0
         self._balance_refresh_time = 3
         self._batch_count = 20
+        self._pending_claim_checker = task.LoopingCall(self.fetch_and_save_heights_for_pending_claims)
 
     def start(self):
         log.info("Starting wallet.")
+
         def start_manage():
             self.stopped = False
             self.manage()
+            self._pending_claim_checker.start(30)
             return True
 
         d = self._start()
@@ -102,6 +106,9 @@ class Wallet(object):
     def stop(self):
         log.info("Stopping wallet.")
         self.stopped = True
+
+        if self._pending_claim_checker.running:
+            self._pending_claim_checker.stop()
         # If self.next_manage_call is None, then manage is currently running or else
         # start has not been called, so set stopped and do nothing else.
         if self.next_manage_call is not None:
@@ -315,6 +322,19 @@ class Wallet(object):
 
     ######
 
+    @defer.inlineCallbacks
+    def fetch_and_save_heights_for_pending_claims(self):
+        pending_outpoints = yield self.storage.get_pending_claim_outpoints()
+        if pending_outpoints:
+            tx_heights = yield DeferredDict({txid: self.get_height_for_txid(txid) for txid in pending_outpoints},
+                                            consumeErrors=True)
+            outpoint_heights = {}
+            for txid, outputs in pending_outpoints.iteritems():
+                if txid in tx_heights:
+                    for nout in outputs:
+                        outpoint_heights["%s:%i" % (txid, nout)] = tx_heights[txid]
+            yield self.storage.save_claim_tx_heights(outpoint_heights)
+
     @defer.inlineCallbacks
     def get_claim_by_claim_id(self, claim_id, check_expire=True):
         claim = yield self._get_claim_by_claimid(claim_id)
@@ -765,6 +785,9 @@ class Wallet(object):
     def get_max_usable_balance_for_claim(self, claim_name):
         return defer.fail(NotImplementedError())
 
+    def get_height_for_txid(self, txid):
+        return defer.fail(NotImplementedError())
+
     def _start(self):
         return defer.fail(NotImplementedError())
@@ -1157,6 +1180,9 @@ class LBRYumWallet(Wallet):
     def claim_renew(self, txid, nout):
         return self._run_cmd_as_defer_succeed('renewclaim', txid, nout)
 
+    def get_height_for_txid(self, txid):
+        return self._run_cmd_as_defer_to_thread('gettransactionheight', txid)
+
     def decrypt_wallet(self):
         if not self.wallet.use_encryption:
             return False
diff --git a/lbrynet/core/utils.py b/lbrynet/core/utils.py
index 2d295f718..ae67c9885 100644
--- a/lbrynet/core/utils.py
+++ b/lbrynet/core/utils.py
@@ -7,7 +7,7 @@ import string
 import json
 import pkg_resources
-
+from twisted.internet import defer
 from lbryschema.claim import ClaimDict
 from lbrynet.core.cryptoutils import get_lbry_hash_obj
@@ -146,3 +146,18 @@ def get_sd_hash(stream_info):
 
 def json_dumps_pretty(obj, **kwargs):
     return json.dumps(obj, sort_keys=True, indent=2, separators=(',', ': '), **kwargs)
+
+
+@defer.inlineCallbacks
+def DeferredDict(d, consumeErrors=False):
+    keys = []
+    dl = []
+    response = {}
+    for k, v in d.iteritems():
+        keys.append(k)
+        dl.append(v)
+    results = yield defer.DeferredList(dl, consumeErrors=consumeErrors)
+    for k, (success, result) in zip(keys, results):
+        if success:
+            response[k] = result
+    defer.returnValue(response)
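Note: `DeferredDict` is to dicts what `defer.DeferredList` is to lists: it waits on every value and returns a plain dict holding only the keys whose deferreds fired successfully; with `consumeErrors=True`, failures are swallowed rather than left unhandled. A small usage sketch (the values are hypothetical):

    from twisted.internet import defer
    from lbrynet.core.utils import DeferredDict

    @defer.inlineCallbacks
    def example():
        result = yield DeferredDict({
            "a": defer.succeed(1),
            "b": defer.fail(ValueError("boom")),  # dropped from the result
        }, consumeErrors=True)
        assert result == {"a": 1}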
diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py
index 5353bef98..308414cf7 100644
--- a/lbrynet/daemon/Daemon.py
+++ b/lbrynet/daemon/Daemon.py
@@ -1638,9 +1638,9 @@ class Daemon(AuthJSONRPCServer):
 
         if not resolved or 'value' not in resolved:
             if 'claim' not in resolved:
-                    raise Exception(
-                        "Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", ""))
-                    )
+                raise Exception(
+                    "Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", ""))
+                )
             else:
                 resolved = resolved['claim']
         txid, nout, name = resolved['txid'], resolved['nout'], resolved['name']
diff --git a/lbrynet/daemon/Publisher.py b/lbrynet/daemon/Publisher.py
index da565db44..283e478a9 100644
--- a/lbrynet/daemon/Publisher.py
+++ b/lbrynet/daemon/Publisher.py
@@ -42,15 +42,13 @@ class Publisher(object):
         claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address)
 
         # check if we have a file already for this claim (if this is a publish update with a new stream)
-        old_stream_hashes = yield self.session.storage.get_stream_hashes_for_claim_id(claim_out['claim_id'])
+        old_stream_hashes = yield self.session.storage.get_old_stream_hashes_for_claim_id(claim_out['claim_id'],
+                                                                                          self.lbry_file.stream_hash)
         if old_stream_hashes:
-            lbry_files = list(self.lbry_file_manager.lbry_files)
-            for lbry_file in lbry_files:
-                s_h = lbry_file.stream_hash
-                if s_h in old_stream_hashes:
-                    yield self.lbry_file_manager.delete_lbry_file(lbry_file, delete_file=False)
-                    old_stream_hashes.remove(s_h)
-                    log.info("Removed old stream for claim update: %s", s_h)
+            for lbry_file in filter(lambda l: l.stream_hash in old_stream_hashes,
+                                    list(self.lbry_file_manager.lbry_files)):
+                yield self.lbry_file_manager.delete_lbry_file(lbry_file, delete_file=False)
+                log.info("Removed old stream for claim update: %s", lbry_file.stream_hash)
 
         yield self.session.storage.save_content_claim(
             self.lbry_file.stream_hash, "%s:%i" % (claim_out['txid'], claim_out['nout'])
diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py
index 9905e0197..dcf305ae1 100644
--- a/lbrynet/database/storage.py
+++ b/lbrynet/database/storage.py
@@ -579,12 +579,12 @@ class SQLiteStorage(object):
                     log.warning("claim %s contains the same stream as the one already downloaded from claim %s",
                                 claim_id, known_claim_id)
 
-    def get_stream_hashes_for_claim_id(self, claim_id):
+    def get_old_stream_hashes_for_claim_id(self, claim_id, new_stream_hash):
         return self.run_and_return_list(
             "select f.stream_hash from file f "
             "inner join content_claim cc on f.stream_hash=cc.stream_hash "
-            "inner join claim c on c.claim_outpoint=cc.claim_outpoint and c.claim_id=?",
-            claim_id
+            "inner join claim c on c.claim_outpoint=cc.claim_outpoint and c.claim_id=? "
+            "where f.stream_hash!=?", claim_id, new_stream_hash
         )
 
     @defer.inlineCallbacks
@@ -670,7 +670,7 @@ class SQLiteStorage(object):
 
         def _get_claim(transaction):
             claim_info = transaction.execute(
-                "select * from claim where claim_id=? order by height, rowid desc", (claim_id, )
+                "select * from claim where claim_id=? order by rowid desc", (claim_id, )
             ).fetchone()
             result = _claim_response(*claim_info)
             if result['channel_claim_id']:
@@ -701,3 +701,25 @@ class SQLiteStorage(object):
             ).fetchall()
         ]
         return self.db.runInteraction(_get_unknown_certificate_claim_ids)
+
+    @defer.inlineCallbacks
+    def get_pending_claim_outpoints(self):
+        claim_outpoints = yield self.run_and_return_list("select claim_outpoint from claim where height=-1")
+        results = {}  # {txid: [nout, ...]}
+        for outpoint_str in claim_outpoints:
+            txid, nout = outpoint_str.split(":")
+            outputs = results.get(txid, [])
+            outputs.append(int(nout))
+            results[txid] = outputs
+        if results:
+            log.debug("missing transaction heights for %i claims", len(results))
+        defer.returnValue(results)
+
+    def save_claim_tx_heights(self, claim_tx_heights):
+        def _save_claim_heights(transaction):
+            for outpoint, height in claim_tx_heights.iteritems():
+                transaction.execute(
+                    "update claim set height=? where claim_outpoint=? and height=-1",
+                    (height, outpoint)
+                )
+        return self.db.runInteraction(_save_claim_heights)
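Note: the two new storage helpers bracket the wallet's 30-second polling loop. `get_pending_claim_outpoints` groups outpoints whose height is the -1 sentinel by txid, the wallet fetches one height per transaction, and `save_claim_tx_heights` writes them back; the `and height=-1` guard in the update keeps a row from being clobbered if it was refreshed in the meantime. The round trip, with made-up values:

    # get_pending_claim_outpoints()        -> {"abcd1234...": [0, 1]}  # txid -> nouts
    # gettransactionheight("abcd1234...")  -> 410000                   # via lbryum
    # save_claim_tx_heights({"abcd1234...:0": 410000,
    #                        "abcd1234...:1": 410000})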
and height=-1", + (height, outpoint) + ) + return self.db.runInteraction(_save_claim_heights) diff --git a/lbrynet/tests/unit/database/test_SQLiteStorage.py b/lbrynet/tests/unit/database/test_SQLiteStorage.py index 6288fdcf2..dbf1b7c54 100644 --- a/lbrynet/tests/unit/database/test_SQLiteStorage.py +++ b/lbrynet/tests/unit/database/test_SQLiteStorage.py @@ -296,8 +296,9 @@ class ContentClaimStorageTests(StorageTest): stored_content_claim = yield self.storage.get_content_claim(stream_hash) self.assertDictEqual(stored_content_claim, fake_claim_info) - stream_hashes = yield self.storage.get_stream_hashes_for_claim_id(fake_claim_info['claim_id']) - self.assertListEqual(stream_hashes, [stream_hash]) + stream_hashes = yield self.storage.get_old_stream_hashes_for_claim_id(fake_claim_info['claim_id'], + stream_hash) + self.assertListEqual(stream_hashes, []) # test that we can't associate a claim update with a new stream to the file second_stream_hash, second_sd_hash = random_lbry_hash(), random_lbry_hash() From 7862ee67153214c29079028c35fae706dd5621e2 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Thu, 1 Mar 2018 16:42:52 -0500 Subject: [PATCH 8/8] fix blob_announce command --- lbrynet/daemon/Daemon.py | 7 +++---- lbrynet/database/storage.py | 9 ++++++--- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 308414cf7..c7016da02 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -2801,6 +2801,7 @@ class Daemon(AuthJSONRPCServer): Returns: (bool) true if successful """ + if announce_all: yield self.session.blob_manager.immediate_announce_all_blobs() else: @@ -2814,11 +2815,9 @@ class Daemon(AuthJSONRPCServer): else: raise Exception('single argument must be specified') if not blob_hash: - blobs = yield self.storage.get_blobs_for_stream(stream_hash) - blob_hashes.extend([blob.blob_hash for blob in blobs if blob.get_is_verified()]) - + blobs = yield self.storage.get_blobs_for_stream(stream_hash, only_completed=True) + blob_hashes.extend([blob.blob_hash for blob in blobs]) yield self.session.blob_manager._immediate_announce(blob_hashes) - response = yield self._render_response(True) defer.returnValue(response) diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py index dcf305ae1..3e69944b7 100644 --- a/lbrynet/database/storage.py +++ b/lbrynet/database/storage.py @@ -389,11 +389,14 @@ class SQLiteStorage(object): stream_hash, blob_num ) - def get_blobs_for_stream(self, stream_hash): + def get_blobs_for_stream(self, stream_hash, only_completed=False): def _get_blobs_for_stream(transaction): crypt_blob_infos = [] - stream_blobs = transaction.execute("select blob_hash, position, iv from stream_blob " - "where stream_hash=?", (stream_hash, )).fetchall() + if only_completed: + query = "select blob_hash, position, iv from stream_blob where stream_hash=? and status='finished'" + else: + query = "select blob_hash, position, iv from stream_blob where stream_hash=?" + stream_blobs = transaction.execute(query, (stream_hash, )).fetchall() if stream_blobs: for blob_hash, position, iv in stream_blobs: if blob_hash is not None: