diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index e2824ec2b..d2fc3c94e 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -5,6 +5,7 @@ from decimal import Decimal from zope.interface import implements from twisted.internet import threads, reactor, defer, task from twisted.python.failure import Failure +from twisted.internet.error import ConnectionAborted from lbryum import wallet as lbryum_wallet from lbryum.network import Network @@ -19,11 +20,11 @@ from lbryschema.error import DecodeError from lbryschema.decode import smart_decode from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet +from lbrynet.core.utils import DeferredDict from lbrynet.core.client.ClientRequest import ClientRequest from lbrynet.core.Error import InsufficientFundsError, UnknownNameError from lbrynet.core.Error import UnknownClaimID, UnknownURI, NegativeFundsError, UnknownOutpoint from lbrynet.core.Error import DownloadCanceledError, RequestCanceledError -from twisted.internet.error import ConnectionAborted log = logging.getLogger(__name__) @@ -83,12 +84,15 @@ class Wallet(object): self._manage_count = 0 self._balance_refresh_time = 3 self._batch_count = 20 + self._pending_claim_checker = task.LoopingCall(self.fetch_and_save_heights_for_pending_claims) def start(self): log.info("Starting wallet.") + def start_manage(): self.stopped = False self.manage() + self._pending_claim_checker.start(30) return True d = self._start() @@ -102,6 +106,9 @@ class Wallet(object): def stop(self): log.info("Stopping wallet.") self.stopped = True + + if self._pending_claim_checker.running: + self._pending_claim_checker.stop() # If self.next_manage_call is None, then manage is currently running or else # start has not been called, so set stopped and do nothing else. if self.next_manage_call is not None: @@ -315,6 +322,19 @@ class Wallet(object): ###### + @defer.inlineCallbacks + def fetch_and_save_heights_for_pending_claims(self): + pending_outpoints = yield self.storage.get_pending_claim_outpoints() + if pending_outpoints: + tx_heights = yield DeferredDict({txid: self.get_height_for_txid(txid) for txid in pending_outpoints}, + consumeErrors=True) + outpoint_heights = {} + for txid, outputs in pending_outpoints.iteritems(): + if txid in tx_heights: + for nout in outputs: + outpoint_heights["%s:%i" % (txid, nout)] = tx_heights[txid] + yield self.storage.save_claim_tx_heights(outpoint_heights) + @defer.inlineCallbacks def get_claim_by_claim_id(self, claim_id, check_expire=True): claim = yield self._get_claim_by_claimid(claim_id) @@ -410,12 +430,6 @@ class Wallet(object): batch_results = yield self._get_values_for_uris(page, page_size, *uris) for uri, resolve_results in batch_results.iteritems(): - claim_id = None - if resolve_results and 'claim' in resolve_results: - claim_id = resolve_results['claim']['claim_id'] - certificate_id = None - if resolve_results and 'certificate' in resolve_results: - certificate_id = resolve_results['certificate']['claim_id'] try: result[uri] = self._handle_claim_result(resolve_results) yield self.save_claim(result[uri]) @@ -771,6 +785,9 @@ class Wallet(object): def get_max_usable_balance_for_claim(self, claim_name): return defer.fail(NotImplementedError()) + def get_height_for_txid(self, txid): + return defer.fail(NotImplementedError()) + def _start(self): return defer.fail(NotImplementedError()) @@ -1163,6 +1180,9 @@ class LBRYumWallet(Wallet): def claim_renew(self, txid, nout): return self._run_cmd_as_defer_succeed('renewclaim', txid, nout) + def get_height_for_txid(self, txid): + return self._run_cmd_as_defer_to_thread('gettransactionheight', txid) + def decrypt_wallet(self): if not self.wallet.use_encryption: return False diff --git a/lbrynet/core/utils.py b/lbrynet/core/utils.py index 2d295f718..ae67c9885 100644 --- a/lbrynet/core/utils.py +++ b/lbrynet/core/utils.py @@ -7,7 +7,7 @@ import string import json import pkg_resources - +from twisted.internet import defer from lbryschema.claim import ClaimDict from lbrynet.core.cryptoutils import get_lbry_hash_obj @@ -146,3 +146,18 @@ def get_sd_hash(stream_info): def json_dumps_pretty(obj, **kwargs): return json.dumps(obj, sort_keys=True, indent=2, separators=(',', ': '), **kwargs) + + +@defer.inlineCallbacks +def DeferredDict(d, consumeErrors=False): + keys = [] + dl = [] + response = {} + for k, v in d.iteritems(): + keys.append(k) + dl.append(v) + results = yield defer.DeferredList(dl, consumeErrors=consumeErrors) + for k, (success, result) in zip(keys, results): + if success: + response[k] = result + defer.returnValue(response) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index b461802f9..c7016da02 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -1626,6 +1626,10 @@ class Daemon(AuthJSONRPCServer): timeout = timeout if timeout is not None else self.download_timeout + parsed_uri = parse_lbry_uri(uri) + if parsed_uri.is_channel and not parsed_uri.path: + raise Exception("cannot download a channel claim, specify a /path") + resolved_result = yield self.session.wallet.resolve(uri) if resolved_result and uri in resolved_result: resolved = resolved_result[uri] @@ -1634,11 +1638,9 @@ class Daemon(AuthJSONRPCServer): if not resolved or 'value' not in resolved: if 'claim' not in resolved: - if 'certificate' in resolved: - raise Exception("Cannot use get on channels") - else: - raise Exception( - "Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", ""))) + raise Exception( + "Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", "")) + ) else: resolved = resolved['claim'] txid, nout, name = resolved['txid'], resolved['nout'], resolved['name'] @@ -2799,6 +2801,7 @@ class Daemon(AuthJSONRPCServer): Returns: (bool) true if successful """ + if announce_all: yield self.session.blob_manager.immediate_announce_all_blobs() else: @@ -2812,11 +2815,9 @@ class Daemon(AuthJSONRPCServer): else: raise Exception('single argument must be specified') if not blob_hash: - blobs = yield self.storage.get_blobs_for_stream(stream_hash) - blob_hashes.extend([blob.blob_hash for blob in blobs if blob.get_is_verified()]) - + blobs = yield self.storage.get_blobs_for_stream(stream_hash, only_completed=True) + blob_hashes.extend([blob.blob_hash for blob in blobs]) yield self.session.blob_manager._immediate_announce(blob_hashes) - response = yield self._render_response(True) defer.returnValue(response) diff --git a/lbrynet/daemon/Downloader.py b/lbrynet/daemon/Downloader.py index 60fce734b..67873218a 100644 --- a/lbrynet/daemon/Downloader.py +++ b/lbrynet/daemon/Downloader.py @@ -183,7 +183,6 @@ class GetStream(object): self.downloader = yield self._create_downloader(sd_blob, file_name=file_name) yield self.pay_key_fee(key_fee, name) yield self.session.storage.save_content_claim(self.downloader.stream_hash, "%s:%i" % (txid, nout)) - yield self.downloader.get_claim_info() log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) self.finished_deferred = self.downloader.start() self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail) diff --git a/lbrynet/daemon/Publisher.py b/lbrynet/daemon/Publisher.py index 645ef5875..283e478a9 100644 --- a/lbrynet/daemon/Publisher.py +++ b/lbrynet/daemon/Publisher.py @@ -40,10 +40,19 @@ class Publisher(object): claim_dict['stream']['source']['contentType'] = get_content_type(file_path) claim_dict['stream']['source']['version'] = "_0_0_1" # need current version here claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address) + + # check if we have a file already for this claim (if this is a publish update with a new stream) + old_stream_hashes = yield self.session.storage.get_old_stream_hashes_for_claim_id(claim_out['claim_id'], + self.lbry_file.stream_hash) + if old_stream_hashes: + for lbry_file in filter(lambda l: l.stream_hash in old_stream_hashes, + list(self.lbry_file_manager.lbry_files)): + yield self.lbry_file_manager.delete_lbry_file(lbry_file, delete_file=False) + log.info("Removed old stream for claim update: %s", lbry_file.stream_hash) + yield self.session.storage.save_content_claim( self.lbry_file.stream_hash, "%s:%i" % (claim_out['txid'], claim_out['nout']) ) - yield self.lbry_file.get_claim_info() defer.returnValue(claim_out) @defer.inlineCallbacks @@ -51,6 +60,7 @@ class Publisher(object): """Make a claim without creating a lbry file""" claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address) yield self.session.storage.save_content_claim(stream_hash, "%s:%i" % (claim_out['txid'], claim_out['nout'])) + self.lbry_file = [f for f in self.lbry_file_manager.lbry_files if f.stream_hash == stream_hash][0] defer.returnValue(claim_out) @defer.inlineCallbacks diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py index ed6b663a3..3e69944b7 100644 --- a/lbrynet/database/storage.py +++ b/lbrynet/database/storage.py @@ -186,6 +186,11 @@ class SQLiteStorage(object): self.db = SqliteConnection(self._db_path) self.db.set_reactor(reactor) + # used to refresh the claim attributes on a ManagedEncryptedFileDownloader when a + # change to the associated content claim occurs. these are added by the file manager + # when it loads each file + self.content_claim_callbacks = {} # {: } + def setup(self): def _create_tables(transaction): transaction.executescript(self.CREATE_TABLES_QUERY) @@ -384,11 +389,14 @@ class SQLiteStorage(object): stream_hash, blob_num ) - def get_blobs_for_stream(self, stream_hash): + def get_blobs_for_stream(self, stream_hash, only_completed=False): def _get_blobs_for_stream(transaction): crypt_blob_infos = [] - stream_blobs = transaction.execute("select blob_hash, position, iv from stream_blob " - "where stream_hash=?", (stream_hash, )).fetchall() + if only_completed: + query = "select blob_hash, position, iv from stream_blob where stream_hash=? and status='finished'" + else: + query = "select blob_hash, position, iv from stream_blob where stream_hash=?" + stream_blobs = transaction.execute(query, (stream_hash, )).fetchall() if stream_blobs: for blob_hash, position, iv in stream_blobs: if blob_hash is not None: @@ -537,12 +545,52 @@ class SQLiteStorage(object): "insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?)", (outpoint, claim_id, name, amount, height, serialized, claim_dict.certificate_id, address, sequence) ) + yield self.db.runInteraction(_save_claim) if 'supports' in claim_info: # if this response doesn't have support info don't overwrite the existing # support info yield self.save_supports(claim_id, claim_info['supports']) + # check for content claim updates + if claim_dict.source_hash: + existing_file_stream_hash = yield self.run_and_return_one_or_none( + "select file.stream_hash from stream " + "inner join file on file.stream_hash=stream.stream_hash " + "where sd_hash=?", claim_dict.source_hash + ) + if existing_file_stream_hash: + known_outpoint = yield self.run_and_return_one_or_none( + "select claim_outpoint from content_claim where stream_hash=?", existing_file_stream_hash + ) + known_claim_id = yield self.run_and_return_one_or_none( + "select claim_id from claim " + "inner join content_claim c3 ON claim.claim_outpoint=c3.claim_outpoint " + "where c3.stream_hash=?", existing_file_stream_hash + ) + if not known_claim_id: # this is a claim matching one of our files that has + # no associated claim yet + log.info("discovered content claim %s for stream %s", claim_id, existing_file_stream_hash) + yield self.save_content_claim(existing_file_stream_hash, outpoint) + elif known_claim_id and known_claim_id == claim_id: + if known_outpoint != outpoint: # this is an update for one of our files + log.info("updating content claim %s for stream %s", claim_id, existing_file_stream_hash) + yield self.save_content_claim(existing_file_stream_hash, outpoint) + else: # we're up to date already + pass + else: # this is a claim containing a clone of a file that we have + log.warning("claim %s contains the same stream as the one already downloaded from claim %s", + claim_id, known_claim_id) + + def get_old_stream_hashes_for_claim_id(self, claim_id, new_stream_hash): + return self.run_and_return_list( + "select f.stream_hash from file f " + "inner join content_claim cc on f.stream_hash=cc.stream_hash " + "inner join claim c on c.claim_outpoint=cc.claim_outpoint and c.claim_id=? " + "where f.stream_hash!=?", claim_id, new_stream_hash + ) + + @defer.inlineCallbacks def save_content_claim(self, stream_hash, claim_outpoint): def _save_content_claim(transaction): # get the claim id and serialized metadata @@ -580,7 +628,12 @@ class SQLiteStorage(object): # update the claim associated to the file transaction.execute("insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint)) - return self.db.runInteraction(_save_content_claim) + yield self.db.runInteraction(_save_content_claim) + + # update corresponding ManagedEncryptedFileDownloader object + if stream_hash in self.content_claim_callbacks: + file_callback = self.content_claim_callbacks[stream_hash] + yield file_callback() @defer.inlineCallbacks def get_content_claim(self, stream_hash, include_supports=True): @@ -620,7 +673,7 @@ class SQLiteStorage(object): def _get_claim(transaction): claim_info = transaction.execute( - "select * from claim where claim_id=? order by height, rowid desc", (claim_id, ) + "select * from claim where claim_id=? order by rowid desc", (claim_id, ) ).fetchone() result = _claim_response(*claim_info) if result['channel_claim_id']: @@ -651,3 +704,25 @@ class SQLiteStorage(object): ).fetchall() ] return self.db.runInteraction(_get_unknown_certificate_claim_ids) + + @defer.inlineCallbacks + def get_pending_claim_outpoints(self): + claim_outpoints = yield self.run_and_return_list("select claim_outpoint from claim where height=-1") + results = {} # {txid: [nout, ...]} + for outpoint_str in claim_outpoints: + txid, nout = outpoint_str.split(":") + outputs = results.get(txid, []) + outputs.append(int(nout)) + results[txid] = outputs + if results: + log.debug("missing transaction heights for %i claims", len(results)) + defer.returnValue(results) + + def save_claim_tx_heights(self, claim_tx_heights): + def _save_claim_heights(transaction): + for outpoint, height in claim_tx_heights.iteritems(): + transaction.execute( + "update claim set height=? where claim_outpoint=? and height=-1", + (height, outpoint) + ) + return self.db.runInteraction(_save_claim_heights) diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index 39b970dcd..96b56c0ab 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -127,6 +127,7 @@ class EncryptedFileManager(object): try: # restore will raise an Exception if status is unknown lbry_file.restore(file_info['status']) + self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info self.lbry_files.append(lbry_file) except Exception: log.warning("Failed to start %i", file_info.get('rowid')) @@ -171,6 +172,8 @@ class EncryptedFileManager(object): stream_metadata['suggested_file_name'] ) lbry_file.restore(status) + yield lbry_file.get_claim_info() + self.storage.content_claim_callbacks[stream_hash] = lbry_file.get_claim_info self.lbry_files.append(lbry_file) defer.returnValue(lbry_file) @@ -195,8 +198,9 @@ class EncryptedFileManager(object): rowid, stream_hash, payment_rate_manager, sd_hash, key, stream_name, file_name, download_directory, stream_metadata['suggested_file_name'] ) - lbry_file.get_claim_info(include_supports=False) lbry_file.restore(status) + yield lbry_file.get_claim_info(include_supports=False) + self.storage.content_claim_callbacks[stream_hash] = lbry_file.get_claim_info self.lbry_files.append(lbry_file) defer.returnValue(lbry_file) @@ -220,6 +224,9 @@ class EncryptedFileManager(object): self.lbry_files.remove(lbry_file) + if lbry_file.stream_hash in self.storage.content_claim_callbacks: + del self.storage.content_claim_callbacks[lbry_file.stream_hash] + yield lbry_file.delete_data() yield self.session.storage.delete_stream(lbry_file.stream_hash) diff --git a/lbrynet/tests/unit/database/test_SQLiteStorage.py b/lbrynet/tests/unit/database/test_SQLiteStorage.py index 72bb72b79..dbf1b7c54 100644 --- a/lbrynet/tests/unit/database/test_SQLiteStorage.py +++ b/lbrynet/tests/unit/database/test_SQLiteStorage.py @@ -296,6 +296,10 @@ class ContentClaimStorageTests(StorageTest): stored_content_claim = yield self.storage.get_content_claim(stream_hash) self.assertDictEqual(stored_content_claim, fake_claim_info) + stream_hashes = yield self.storage.get_old_stream_hashes_for_claim_id(fake_claim_info['claim_id'], + stream_hash) + self.assertListEqual(stream_hashes, []) + # test that we can't associate a claim update with a new stream to the file second_stream_hash, second_sd_hash = random_lbry_hash(), random_lbry_hash() yield self.make_and_store_fake_stream(blob_count=2, stream_hash=second_stream_hash, sd_hash=second_sd_hash)