diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index 111c1bfdc..d2fc3c94e 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -5,6 +5,7 @@ from decimal import Decimal from zope.interface import implements from twisted.internet import threads, reactor, defer, task from twisted.python.failure import Failure +from twisted.internet.error import ConnectionAborted from lbryum import wallet as lbryum_wallet from lbryum.network import Network @@ -19,11 +20,11 @@ from lbryschema.error import DecodeError from lbryschema.decode import smart_decode from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet +from lbrynet.core.utils import DeferredDict from lbrynet.core.client.ClientRequest import ClientRequest from lbrynet.core.Error import InsufficientFundsError, UnknownNameError from lbrynet.core.Error import UnknownClaimID, UnknownURI, NegativeFundsError, UnknownOutpoint from lbrynet.core.Error import DownloadCanceledError, RequestCanceledError -from twisted.internet.error import ConnectionAborted log = logging.getLogger(__name__) @@ -83,12 +84,15 @@ class Wallet(object): self._manage_count = 0 self._balance_refresh_time = 3 self._batch_count = 20 + self._pending_claim_checker = task.LoopingCall(self.fetch_and_save_heights_for_pending_claims) def start(self): log.info("Starting wallet.") + def start_manage(): self.stopped = False self.manage() + self._pending_claim_checker.start(30) return True d = self._start() @@ -102,6 +106,9 @@ class Wallet(object): def stop(self): log.info("Stopping wallet.") self.stopped = True + + if self._pending_claim_checker.running: + self._pending_claim_checker.stop() # If self.next_manage_call is None, then manage is currently running or else # start has not been called, so set stopped and do nothing else. if self.next_manage_call is not None: @@ -315,6 +322,19 @@ class Wallet(object): ###### + @defer.inlineCallbacks + def fetch_and_save_heights_for_pending_claims(self): + pending_outpoints = yield self.storage.get_pending_claim_outpoints() + if pending_outpoints: + tx_heights = yield DeferredDict({txid: self.get_height_for_txid(txid) for txid in pending_outpoints}, + consumeErrors=True) + outpoint_heights = {} + for txid, outputs in pending_outpoints.iteritems(): + if txid in tx_heights: + for nout in outputs: + outpoint_heights["%s:%i" % (txid, nout)] = tx_heights[txid] + yield self.storage.save_claim_tx_heights(outpoint_heights) + @defer.inlineCallbacks def get_claim_by_claim_id(self, claim_id, check_expire=True): claim = yield self._get_claim_by_claimid(claim_id) @@ -765,6 +785,9 @@ class Wallet(object): def get_max_usable_balance_for_claim(self, claim_name): return defer.fail(NotImplementedError()) + def get_height_for_txid(self, txid): + return defer.fail(NotImplementedError()) + def _start(self): return defer.fail(NotImplementedError()) @@ -1157,6 +1180,9 @@ class LBRYumWallet(Wallet): def claim_renew(self, txid, nout): return self._run_cmd_as_defer_succeed('renewclaim', txid, nout) + def get_height_for_txid(self, txid): + return self._run_cmd_as_defer_to_thread('gettransactionheight', txid) + def decrypt_wallet(self): if not self.wallet.use_encryption: return False diff --git a/lbrynet/core/utils.py b/lbrynet/core/utils.py index 2d295f718..ae67c9885 100644 --- a/lbrynet/core/utils.py +++ b/lbrynet/core/utils.py @@ -7,7 +7,7 @@ import string import json import pkg_resources - +from twisted.internet import defer from lbryschema.claim import ClaimDict from lbrynet.core.cryptoutils import get_lbry_hash_obj @@ -146,3 +146,18 @@ def get_sd_hash(stream_info): def json_dumps_pretty(obj, **kwargs): return json.dumps(obj, sort_keys=True, indent=2, separators=(',', ': '), **kwargs) + + +@defer.inlineCallbacks +def DeferredDict(d, consumeErrors=False): + keys = [] + dl = [] + response = {} + for k, v in d.iteritems(): + keys.append(k) + dl.append(v) + results = yield defer.DeferredList(dl, consumeErrors=consumeErrors) + for k, (success, result) in zip(keys, results): + if success: + response[k] = result + defer.returnValue(response) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 5353bef98..308414cf7 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -1638,9 +1638,9 @@ class Daemon(AuthJSONRPCServer): if not resolved or 'value' not in resolved: if 'claim' not in resolved: - raise Exception( - "Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", "")) - ) + raise Exception( + "Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", "")) + ) else: resolved = resolved['claim'] txid, nout, name = resolved['txid'], resolved['nout'], resolved['name'] diff --git a/lbrynet/daemon/Publisher.py b/lbrynet/daemon/Publisher.py index da565db44..283e478a9 100644 --- a/lbrynet/daemon/Publisher.py +++ b/lbrynet/daemon/Publisher.py @@ -42,15 +42,13 @@ class Publisher(object): claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address) # check if we have a file already for this claim (if this is a publish update with a new stream) - old_stream_hashes = yield self.session.storage.get_stream_hashes_for_claim_id(claim_out['claim_id']) + old_stream_hashes = yield self.session.storage.get_old_stream_hashes_for_claim_id(claim_out['claim_id'], + self.lbry_file.stream_hash) if old_stream_hashes: - lbry_files = list(self.lbry_file_manager.lbry_files) - for lbry_file in lbry_files: - s_h = lbry_file.stream_hash - if s_h in old_stream_hashes: - yield self.lbry_file_manager.delete_lbry_file(lbry_file, delete_file=False) - old_stream_hashes.remove(s_h) - log.info("Removed old stream for claim update: %s", s_h) + for lbry_file in filter(lambda l: l.stream_hash in old_stream_hashes, + list(self.lbry_file_manager.lbry_files)): + yield self.lbry_file_manager.delete_lbry_file(lbry_file, delete_file=False) + log.info("Removed old stream for claim update: %s", lbry_file.stream_hash) yield self.session.storage.save_content_claim( self.lbry_file.stream_hash, "%s:%i" % (claim_out['txid'], claim_out['nout']) diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py index 9905e0197..dcf305ae1 100644 --- a/lbrynet/database/storage.py +++ b/lbrynet/database/storage.py @@ -579,12 +579,12 @@ class SQLiteStorage(object): log.warning("claim %s contains the same stream as the one already downloaded from claim %s", claim_id, known_claim_id) - def get_stream_hashes_for_claim_id(self, claim_id): + def get_old_stream_hashes_for_claim_id(self, claim_id, new_stream_hash): return self.run_and_return_list( "select f.stream_hash from file f " "inner join content_claim cc on f.stream_hash=cc.stream_hash " - "inner join claim c on c.claim_outpoint=cc.claim_outpoint and c.claim_id=?", - claim_id + "inner join claim c on c.claim_outpoint=cc.claim_outpoint and c.claim_id=? " + "where f.stream_hash!=?", claim_id, new_stream_hash ) @defer.inlineCallbacks @@ -670,7 +670,7 @@ class SQLiteStorage(object): def _get_claim(transaction): claim_info = transaction.execute( - "select * from claim where claim_id=? order by height, rowid desc", (claim_id, ) + "select * from claim where claim_id=? order by rowid desc", (claim_id, ) ).fetchone() result = _claim_response(*claim_info) if result['channel_claim_id']: @@ -701,3 +701,25 @@ class SQLiteStorage(object): ).fetchall() ] return self.db.runInteraction(_get_unknown_certificate_claim_ids) + + @defer.inlineCallbacks + def get_pending_claim_outpoints(self): + claim_outpoints = yield self.run_and_return_list("select claim_outpoint from claim where height=-1") + results = {} # {txid: [nout, ...]} + for outpoint_str in claim_outpoints: + txid, nout = outpoint_str.split(":") + outputs = results.get(txid, []) + outputs.append(int(nout)) + results[txid] = outputs + if results: + log.debug("missing transaction heights for %i claims", len(results)) + defer.returnValue(results) + + def save_claim_tx_heights(self, claim_tx_heights): + def _save_claim_heights(transaction): + for outpoint, height in claim_tx_heights.iteritems(): + transaction.execute( + "update claim set height=? where claim_outpoint=? and height=-1", + (height, outpoint) + ) + return self.db.runInteraction(_save_claim_heights) diff --git a/lbrynet/tests/unit/database/test_SQLiteStorage.py b/lbrynet/tests/unit/database/test_SQLiteStorage.py index 6288fdcf2..dbf1b7c54 100644 --- a/lbrynet/tests/unit/database/test_SQLiteStorage.py +++ b/lbrynet/tests/unit/database/test_SQLiteStorage.py @@ -296,8 +296,9 @@ class ContentClaimStorageTests(StorageTest): stored_content_claim = yield self.storage.get_content_claim(stream_hash) self.assertDictEqual(stored_content_claim, fake_claim_info) - stream_hashes = yield self.storage.get_stream_hashes_for_claim_id(fake_claim_info['claim_id']) - self.assertListEqual(stream_hashes, [stream_hash]) + stream_hashes = yield self.storage.get_old_stream_hashes_for_claim_id(fake_claim_info['claim_id'], + stream_hash) + self.assertListEqual(stream_hashes, []) # test that we can't associate a claim update with a new stream to the file second_stream_hash, second_sd_hash = random_lbry_hash(), random_lbry_hash()