Merge branch 'remove-updated-stream-on-publish'

This commit is contained in:
Jack Robison 2018-03-02 17:31:21 -05:00
commit e4acdba9e4
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
8 changed files with 156 additions and 25 deletions

View file

@ -5,6 +5,7 @@ from decimal import Decimal
from zope.interface import implements from zope.interface import implements
from twisted.internet import threads, reactor, defer, task from twisted.internet import threads, reactor, defer, task
from twisted.python.failure import Failure from twisted.python.failure import Failure
from twisted.internet.error import ConnectionAborted
from lbryum import wallet as lbryum_wallet from lbryum import wallet as lbryum_wallet
from lbryum.network import Network from lbryum.network import Network
@ -19,11 +20,11 @@ from lbryschema.error import DecodeError
from lbryschema.decode import smart_decode from lbryschema.decode import smart_decode
from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet
from lbrynet.core.utils import DeferredDict
from lbrynet.core.client.ClientRequest import ClientRequest from lbrynet.core.client.ClientRequest import ClientRequest
from lbrynet.core.Error import InsufficientFundsError, UnknownNameError from lbrynet.core.Error import InsufficientFundsError, UnknownNameError
from lbrynet.core.Error import UnknownClaimID, UnknownURI, NegativeFundsError, UnknownOutpoint from lbrynet.core.Error import UnknownClaimID, UnknownURI, NegativeFundsError, UnknownOutpoint
from lbrynet.core.Error import DownloadCanceledError, RequestCanceledError from lbrynet.core.Error import DownloadCanceledError, RequestCanceledError
from twisted.internet.error import ConnectionAborted
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -83,12 +84,15 @@ class Wallet(object):
self._manage_count = 0 self._manage_count = 0
self._balance_refresh_time = 3 self._balance_refresh_time = 3
self._batch_count = 20 self._batch_count = 20
self._pending_claim_checker = task.LoopingCall(self.fetch_and_save_heights_for_pending_claims)
def start(self): def start(self):
log.info("Starting wallet.") log.info("Starting wallet.")
def start_manage(): def start_manage():
self.stopped = False self.stopped = False
self.manage() self.manage()
self._pending_claim_checker.start(30)
return True return True
d = self._start() d = self._start()
@ -102,6 +106,9 @@ class Wallet(object):
def stop(self): def stop(self):
log.info("Stopping wallet.") log.info("Stopping wallet.")
self.stopped = True self.stopped = True
if self._pending_claim_checker.running:
self._pending_claim_checker.stop()
# If self.next_manage_call is None, then manage is currently running or else # If self.next_manage_call is None, then manage is currently running or else
# start has not been called, so set stopped and do nothing else. # start has not been called, so set stopped and do nothing else.
if self.next_manage_call is not None: if self.next_manage_call is not None:
@ -315,6 +322,19 @@ class Wallet(object):
###### ######
@defer.inlineCallbacks
def fetch_and_save_heights_for_pending_claims(self):
pending_outpoints = yield self.storage.get_pending_claim_outpoints()
if pending_outpoints:
tx_heights = yield DeferredDict({txid: self.get_height_for_txid(txid) for txid in pending_outpoints},
consumeErrors=True)
outpoint_heights = {}
for txid, outputs in pending_outpoints.iteritems():
if txid in tx_heights:
for nout in outputs:
outpoint_heights["%s:%i" % (txid, nout)] = tx_heights[txid]
yield self.storage.save_claim_tx_heights(outpoint_heights)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_claim_by_claim_id(self, claim_id, check_expire=True): def get_claim_by_claim_id(self, claim_id, check_expire=True):
claim = yield self._get_claim_by_claimid(claim_id) claim = yield self._get_claim_by_claimid(claim_id)
@ -410,12 +430,6 @@ class Wallet(object):
batch_results = yield self._get_values_for_uris(page, page_size, *uris) batch_results = yield self._get_values_for_uris(page, page_size, *uris)
for uri, resolve_results in batch_results.iteritems(): for uri, resolve_results in batch_results.iteritems():
claim_id = None
if resolve_results and 'claim' in resolve_results:
claim_id = resolve_results['claim']['claim_id']
certificate_id = None
if resolve_results and 'certificate' in resolve_results:
certificate_id = resolve_results['certificate']['claim_id']
try: try:
result[uri] = self._handle_claim_result(resolve_results) result[uri] = self._handle_claim_result(resolve_results)
yield self.save_claim(result[uri]) yield self.save_claim(result[uri])
@ -771,6 +785,9 @@ class Wallet(object):
def get_max_usable_balance_for_claim(self, claim_name): def get_max_usable_balance_for_claim(self, claim_name):
return defer.fail(NotImplementedError()) return defer.fail(NotImplementedError())
def get_height_for_txid(self, txid):
return defer.fail(NotImplementedError())
def _start(self): def _start(self):
return defer.fail(NotImplementedError()) return defer.fail(NotImplementedError())
@ -1163,6 +1180,9 @@ class LBRYumWallet(Wallet):
def claim_renew(self, txid, nout): def claim_renew(self, txid, nout):
return self._run_cmd_as_defer_succeed('renewclaim', txid, nout) return self._run_cmd_as_defer_succeed('renewclaim', txid, nout)
def get_height_for_txid(self, txid):
return self._run_cmd_as_defer_to_thread('gettransactionheight', txid)
def decrypt_wallet(self): def decrypt_wallet(self):
if not self.wallet.use_encryption: if not self.wallet.use_encryption:
return False return False

View file

@ -7,7 +7,7 @@ import string
import json import json
import pkg_resources import pkg_resources
from twisted.internet import defer
from lbryschema.claim import ClaimDict from lbryschema.claim import ClaimDict
from lbrynet.core.cryptoutils import get_lbry_hash_obj from lbrynet.core.cryptoutils import get_lbry_hash_obj
@ -146,3 +146,18 @@ def get_sd_hash(stream_info):
def json_dumps_pretty(obj, **kwargs): def json_dumps_pretty(obj, **kwargs):
return json.dumps(obj, sort_keys=True, indent=2, separators=(',', ': '), **kwargs) return json.dumps(obj, sort_keys=True, indent=2, separators=(',', ': '), **kwargs)
@defer.inlineCallbacks
def DeferredDict(d, consumeErrors=False):
keys = []
dl = []
response = {}
for k, v in d.iteritems():
keys.append(k)
dl.append(v)
results = yield defer.DeferredList(dl, consumeErrors=consumeErrors)
for k, (success, result) in zip(keys, results):
if success:
response[k] = result
defer.returnValue(response)

View file

@ -1626,6 +1626,10 @@ class Daemon(AuthJSONRPCServer):
timeout = timeout if timeout is not None else self.download_timeout timeout = timeout if timeout is not None else self.download_timeout
parsed_uri = parse_lbry_uri(uri)
if parsed_uri.is_channel and not parsed_uri.path:
raise Exception("cannot download a channel claim, specify a /path")
resolved_result = yield self.session.wallet.resolve(uri) resolved_result = yield self.session.wallet.resolve(uri)
if resolved_result and uri in resolved_result: if resolved_result and uri in resolved_result:
resolved = resolved_result[uri] resolved = resolved_result[uri]
@ -1634,11 +1638,9 @@ class Daemon(AuthJSONRPCServer):
if not resolved or 'value' not in resolved: if not resolved or 'value' not in resolved:
if 'claim' not in resolved: if 'claim' not in resolved:
if 'certificate' in resolved: raise Exception(
raise Exception("Cannot use get on channels") "Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", ""))
else: )
raise Exception(
"Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", "")))
else: else:
resolved = resolved['claim'] resolved = resolved['claim']
txid, nout, name = resolved['txid'], resolved['nout'], resolved['name'] txid, nout, name = resolved['txid'], resolved['nout'], resolved['name']
@ -2799,6 +2801,7 @@ class Daemon(AuthJSONRPCServer):
Returns: Returns:
(bool) true if successful (bool) true if successful
""" """
if announce_all: if announce_all:
yield self.session.blob_manager.immediate_announce_all_blobs() yield self.session.blob_manager.immediate_announce_all_blobs()
else: else:
@ -2812,11 +2815,9 @@ class Daemon(AuthJSONRPCServer):
else: else:
raise Exception('single argument must be specified') raise Exception('single argument must be specified')
if not blob_hash: if not blob_hash:
blobs = yield self.storage.get_blobs_for_stream(stream_hash) blobs = yield self.storage.get_blobs_for_stream(stream_hash, only_completed=True)
blob_hashes.extend([blob.blob_hash for blob in blobs if blob.get_is_verified()]) blob_hashes.extend([blob.blob_hash for blob in blobs])
yield self.session.blob_manager._immediate_announce(blob_hashes) yield self.session.blob_manager._immediate_announce(blob_hashes)
response = yield self._render_response(True) response = yield self._render_response(True)
defer.returnValue(response) defer.returnValue(response)

View file

@ -183,7 +183,6 @@ class GetStream(object):
self.downloader = yield self._create_downloader(sd_blob, file_name=file_name) self.downloader = yield self._create_downloader(sd_blob, file_name=file_name)
yield self.pay_key_fee(key_fee, name) yield self.pay_key_fee(key_fee, name)
yield self.session.storage.save_content_claim(self.downloader.stream_hash, "%s:%i" % (txid, nout)) yield self.session.storage.save_content_claim(self.downloader.stream_hash, "%s:%i" % (txid, nout))
yield self.downloader.get_claim_info()
log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path)
self.finished_deferred = self.downloader.start() self.finished_deferred = self.downloader.start()
self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail) self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail)

View file

@ -40,10 +40,19 @@ class Publisher(object):
claim_dict['stream']['source']['contentType'] = get_content_type(file_path) claim_dict['stream']['source']['contentType'] = get_content_type(file_path)
claim_dict['stream']['source']['version'] = "_0_0_1" # need current version here claim_dict['stream']['source']['version'] = "_0_0_1" # need current version here
claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address) claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address)
# check if we have a file already for this claim (if this is a publish update with a new stream)
old_stream_hashes = yield self.session.storage.get_old_stream_hashes_for_claim_id(claim_out['claim_id'],
self.lbry_file.stream_hash)
if old_stream_hashes:
for lbry_file in filter(lambda l: l.stream_hash in old_stream_hashes,
list(self.lbry_file_manager.lbry_files)):
yield self.lbry_file_manager.delete_lbry_file(lbry_file, delete_file=False)
log.info("Removed old stream for claim update: %s", lbry_file.stream_hash)
yield self.session.storage.save_content_claim( yield self.session.storage.save_content_claim(
self.lbry_file.stream_hash, "%s:%i" % (claim_out['txid'], claim_out['nout']) self.lbry_file.stream_hash, "%s:%i" % (claim_out['txid'], claim_out['nout'])
) )
yield self.lbry_file.get_claim_info()
defer.returnValue(claim_out) defer.returnValue(claim_out)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -51,6 +60,7 @@ class Publisher(object):
"""Make a claim without creating a lbry file""" """Make a claim without creating a lbry file"""
claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address) claim_out = yield self.make_claim(name, bid, claim_dict, claim_address, change_address)
yield self.session.storage.save_content_claim(stream_hash, "%s:%i" % (claim_out['txid'], claim_out['nout'])) yield self.session.storage.save_content_claim(stream_hash, "%s:%i" % (claim_out['txid'], claim_out['nout']))
self.lbry_file = [f for f in self.lbry_file_manager.lbry_files if f.stream_hash == stream_hash][0]
defer.returnValue(claim_out) defer.returnValue(claim_out)
@defer.inlineCallbacks @defer.inlineCallbacks

View file

@ -186,6 +186,11 @@ class SQLiteStorage(object):
self.db = SqliteConnection(self._db_path) self.db = SqliteConnection(self._db_path)
self.db.set_reactor(reactor) self.db.set_reactor(reactor)
# used to refresh the claim attributes on a ManagedEncryptedFileDownloader when a
# change to the associated content claim occurs. these are added by the file manager
# when it loads each file
self.content_claim_callbacks = {} # {<stream_hash>: <callable returning a deferred>}
def setup(self): def setup(self):
def _create_tables(transaction): def _create_tables(transaction):
transaction.executescript(self.CREATE_TABLES_QUERY) transaction.executescript(self.CREATE_TABLES_QUERY)
@ -384,11 +389,14 @@ class SQLiteStorage(object):
stream_hash, blob_num stream_hash, blob_num
) )
def get_blobs_for_stream(self, stream_hash): def get_blobs_for_stream(self, stream_hash, only_completed=False):
def _get_blobs_for_stream(transaction): def _get_blobs_for_stream(transaction):
crypt_blob_infos = [] crypt_blob_infos = []
stream_blobs = transaction.execute("select blob_hash, position, iv from stream_blob " if only_completed:
"where stream_hash=?", (stream_hash, )).fetchall() query = "select blob_hash, position, iv from stream_blob where stream_hash=? and status='finished'"
else:
query = "select blob_hash, position, iv from stream_blob where stream_hash=?"
stream_blobs = transaction.execute(query, (stream_hash, )).fetchall()
if stream_blobs: if stream_blobs:
for blob_hash, position, iv in stream_blobs: for blob_hash, position, iv in stream_blobs:
if blob_hash is not None: if blob_hash is not None:
@ -537,12 +545,52 @@ class SQLiteStorage(object):
"insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?)", "insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(outpoint, claim_id, name, amount, height, serialized, claim_dict.certificate_id, address, sequence) (outpoint, claim_id, name, amount, height, serialized, claim_dict.certificate_id, address, sequence)
) )
yield self.db.runInteraction(_save_claim) yield self.db.runInteraction(_save_claim)
if 'supports' in claim_info: # if this response doesn't have support info don't overwrite the existing if 'supports' in claim_info: # if this response doesn't have support info don't overwrite the existing
# support info # support info
yield self.save_supports(claim_id, claim_info['supports']) yield self.save_supports(claim_id, claim_info['supports'])
# check for content claim updates
if claim_dict.source_hash:
existing_file_stream_hash = yield self.run_and_return_one_or_none(
"select file.stream_hash from stream "
"inner join file on file.stream_hash=stream.stream_hash "
"where sd_hash=?", claim_dict.source_hash
)
if existing_file_stream_hash:
known_outpoint = yield self.run_and_return_one_or_none(
"select claim_outpoint from content_claim where stream_hash=?", existing_file_stream_hash
)
known_claim_id = yield self.run_and_return_one_or_none(
"select claim_id from claim "
"inner join content_claim c3 ON claim.claim_outpoint=c3.claim_outpoint "
"where c3.stream_hash=?", existing_file_stream_hash
)
if not known_claim_id: # this is a claim matching one of our files that has
# no associated claim yet
log.info("discovered content claim %s for stream %s", claim_id, existing_file_stream_hash)
yield self.save_content_claim(existing_file_stream_hash, outpoint)
elif known_claim_id and known_claim_id == claim_id:
if known_outpoint != outpoint: # this is an update for one of our files
log.info("updating content claim %s for stream %s", claim_id, existing_file_stream_hash)
yield self.save_content_claim(existing_file_stream_hash, outpoint)
else: # we're up to date already
pass
else: # this is a claim containing a clone of a file that we have
log.warning("claim %s contains the same stream as the one already downloaded from claim %s",
claim_id, known_claim_id)
def get_old_stream_hashes_for_claim_id(self, claim_id, new_stream_hash):
return self.run_and_return_list(
"select f.stream_hash from file f "
"inner join content_claim cc on f.stream_hash=cc.stream_hash "
"inner join claim c on c.claim_outpoint=cc.claim_outpoint and c.claim_id=? "
"where f.stream_hash!=?", claim_id, new_stream_hash
)
@defer.inlineCallbacks
def save_content_claim(self, stream_hash, claim_outpoint): def save_content_claim(self, stream_hash, claim_outpoint):
def _save_content_claim(transaction): def _save_content_claim(transaction):
# get the claim id and serialized metadata # get the claim id and serialized metadata
@ -580,7 +628,12 @@ class SQLiteStorage(object):
# update the claim associated to the file # update the claim associated to the file
transaction.execute("insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint)) transaction.execute("insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint))
return self.db.runInteraction(_save_content_claim) yield self.db.runInteraction(_save_content_claim)
# update corresponding ManagedEncryptedFileDownloader object
if stream_hash in self.content_claim_callbacks:
file_callback = self.content_claim_callbacks[stream_hash]
yield file_callback()
@defer.inlineCallbacks @defer.inlineCallbacks
def get_content_claim(self, stream_hash, include_supports=True): def get_content_claim(self, stream_hash, include_supports=True):
@ -620,7 +673,7 @@ class SQLiteStorage(object):
def _get_claim(transaction): def _get_claim(transaction):
claim_info = transaction.execute( claim_info = transaction.execute(
"select * from claim where claim_id=? order by height, rowid desc", (claim_id, ) "select * from claim where claim_id=? order by rowid desc", (claim_id, )
).fetchone() ).fetchone()
result = _claim_response(*claim_info) result = _claim_response(*claim_info)
if result['channel_claim_id']: if result['channel_claim_id']:
@ -651,3 +704,25 @@ class SQLiteStorage(object):
).fetchall() ).fetchall()
] ]
return self.db.runInteraction(_get_unknown_certificate_claim_ids) return self.db.runInteraction(_get_unknown_certificate_claim_ids)
@defer.inlineCallbacks
def get_pending_claim_outpoints(self):
claim_outpoints = yield self.run_and_return_list("select claim_outpoint from claim where height=-1")
results = {} # {txid: [nout, ...]}
for outpoint_str in claim_outpoints:
txid, nout = outpoint_str.split(":")
outputs = results.get(txid, [])
outputs.append(int(nout))
results[txid] = outputs
if results:
log.debug("missing transaction heights for %i claims", len(results))
defer.returnValue(results)
def save_claim_tx_heights(self, claim_tx_heights):
def _save_claim_heights(transaction):
for outpoint, height in claim_tx_heights.iteritems():
transaction.execute(
"update claim set height=? where claim_outpoint=? and height=-1",
(height, outpoint)
)
return self.db.runInteraction(_save_claim_heights)

View file

@ -127,6 +127,7 @@ class EncryptedFileManager(object):
try: try:
# restore will raise an Exception if status is unknown # restore will raise an Exception if status is unknown
lbry_file.restore(file_info['status']) lbry_file.restore(file_info['status'])
self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info
self.lbry_files.append(lbry_file) self.lbry_files.append(lbry_file)
except Exception: except Exception:
log.warning("Failed to start %i", file_info.get('rowid')) log.warning("Failed to start %i", file_info.get('rowid'))
@ -171,6 +172,8 @@ class EncryptedFileManager(object):
stream_metadata['suggested_file_name'] stream_metadata['suggested_file_name']
) )
lbry_file.restore(status) lbry_file.restore(status)
yield lbry_file.get_claim_info()
self.storage.content_claim_callbacks[stream_hash] = lbry_file.get_claim_info
self.lbry_files.append(lbry_file) self.lbry_files.append(lbry_file)
defer.returnValue(lbry_file) defer.returnValue(lbry_file)
@ -195,8 +198,9 @@ class EncryptedFileManager(object):
rowid, stream_hash, payment_rate_manager, sd_hash, key, stream_name, file_name, download_directory, rowid, stream_hash, payment_rate_manager, sd_hash, key, stream_name, file_name, download_directory,
stream_metadata['suggested_file_name'] stream_metadata['suggested_file_name']
) )
lbry_file.get_claim_info(include_supports=False)
lbry_file.restore(status) lbry_file.restore(status)
yield lbry_file.get_claim_info(include_supports=False)
self.storage.content_claim_callbacks[stream_hash] = lbry_file.get_claim_info
self.lbry_files.append(lbry_file) self.lbry_files.append(lbry_file)
defer.returnValue(lbry_file) defer.returnValue(lbry_file)
@ -220,6 +224,9 @@ class EncryptedFileManager(object):
self.lbry_files.remove(lbry_file) self.lbry_files.remove(lbry_file)
if lbry_file.stream_hash in self.storage.content_claim_callbacks:
del self.storage.content_claim_callbacks[lbry_file.stream_hash]
yield lbry_file.delete_data() yield lbry_file.delete_data()
yield self.session.storage.delete_stream(lbry_file.stream_hash) yield self.session.storage.delete_stream(lbry_file.stream_hash)

View file

@ -296,6 +296,10 @@ class ContentClaimStorageTests(StorageTest):
stored_content_claim = yield self.storage.get_content_claim(stream_hash) stored_content_claim = yield self.storage.get_content_claim(stream_hash)
self.assertDictEqual(stored_content_claim, fake_claim_info) self.assertDictEqual(stored_content_claim, fake_claim_info)
stream_hashes = yield self.storage.get_old_stream_hashes_for_claim_id(fake_claim_info['claim_id'],
stream_hash)
self.assertListEqual(stream_hashes, [])
# test that we can't associate a claim update with a new stream to the file # test that we can't associate a claim update with a new stream to the file
second_stream_hash, second_sd_hash = random_lbry_hash(), random_lbry_hash() second_stream_hash, second_sd_hash = random_lbry_hash(), random_lbry_hash()
yield self.make_and_store_fake_stream(blob_count=2, stream_hash=second_stream_hash, sd_hash=second_sd_hash) yield self.make_and_store_fake_stream(blob_count=2, stream_hash=second_stream_hash, sd_hash=second_sd_hash)