fix updating content claims for existing files

-update lbry file attributes as soon as a change to the content claim occurs
This commit is contained in:
Jack Robison 2018-02-28 14:20:33 -05:00
parent d1240541c8
commit 76cb674ebb
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 45 additions and 4 deletions

View file

@@ -722,7 +722,6 @@ class Daemon(AuthJSONRPCServer):
             d = reupload.reflect_stream(publisher.lbry_file)
             d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name),
                            log.exception)
-        yield publisher.lbry_file.get_claim_info()
         self.analytics_manager.send_claim_action('publish')
         log.info("Success! Published to lbry://%s txid: %s nout: %d", name, claim_out['txid'],
                  claim_out['nout'])

View file

@@ -183,7 +183,6 @@ class GetStream(object):
         self.downloader = yield self._create_downloader(sd_blob, file_name=file_name)
         yield self.pay_key_fee(key_fee, name)
         yield self.session.storage.save_content_claim(self.downloader.stream_hash, "%s:%i" % (txid, nout))
-        yield self.downloader.get_claim_info()
         log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path)
         self.finished_deferred = self.downloader.start()
         self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail)

View file

@@ -186,6 +186,11 @@ class SQLiteStorage(object):
         self.db = SqliteConnection(self._db_path)
         self.db.set_reactor(reactor)

+        # used to refresh the claim attributes on a ManagedEncryptedFileDownloader when a
+        # change to the associated content claim occurs. these are added by the file manager
+        # when it loads each file
+        self.content_claim_callbacks = {}  # {<stream_hash>: <callable returning a deferred>}
+
     def setup(self):
         def _create_tables(transaction):
             transaction.executescript(self.CREATE_TABLES_QUERY)
@@ -537,12 +542,37 @@ class SQLiteStorage(object):
                 "insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
                 (outpoint, claim_id, name, amount, height, serialized, claim_dict.certificate_id, address, sequence)
             )

         yield self.db.runInteraction(_save_claim)

         if 'supports' in claim_info:  # if this response doesn't have support info don't overwrite the existing
                                       # support info
             yield self.save_supports(claim_id, claim_info['supports'])

+        # check for content claim updates
+        if claim_dict.source_hash:
+            existing_file_stream_hash = yield self.run_and_return_one_or_none(
+                "select file.stream_hash from stream "
+                "inner join file on file.stream_hash=stream.stream_hash "
+                "where sd_hash=?", claim_dict.source_hash
+            )
+            if existing_file_stream_hash:
+                known_outpoint = yield self.run_and_return_one_or_none(
+                    "select claim_outpoint from content_claim where stream_hash=?", existing_file_stream_hash
+                )
+                known_claim_id = yield self.run_and_return_one_or_none(
+                    "select claim_id from claim "
+                    "inner join content_claim c3 ON claim.claim_outpoint=c3.claim_outpoint "
+                    "where c3.stream_hash=?", existing_file_stream_hash
+                )
+                if not known_claim_id:
+                    log.info("discovered content claim %s for stream %s", claim_id, existing_file_stream_hash)
+                    yield self.save_content_claim(existing_file_stream_hash, outpoint)
+                elif known_claim_id and known_claim_id == claim_id:
+                    if known_outpoint != outpoint:
+                        log.info("updating content claim %s for stream %s", claim_id, existing_file_stream_hash)
+                        yield self.save_content_claim(existing_file_stream_hash, outpoint)
+
def get_stream_hashes_for_claim_id(self, claim_id): def get_stream_hashes_for_claim_id(self, claim_id):
return self.run_and_return_list( return self.run_and_return_list(
"select f.stream_hash from file f " "select f.stream_hash from file f "
@@ -551,6 +581,7 @@ class SQLiteStorage(object):
             claim_id
         )

+    @defer.inlineCallbacks
     def save_content_claim(self, stream_hash, claim_outpoint):
         def _save_content_claim(transaction):
             # get the claim id and serialized metadata
@@ -588,7 +619,12 @@ class SQLiteStorage(object):
             # update the claim associated to the file
             transaction.execute("insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint))

-        return self.db.runInteraction(_save_content_claim)
+        yield self.db.runInteraction(_save_content_claim)
+
+        # update corresponding ManagedEncryptedFileDownloader object
+        if stream_hash in self.content_claim_callbacks:
+            file_callback = self.content_claim_callbacks[stream_hash]
+            yield file_callback()

     @defer.inlineCallbacks
     def get_content_claim(self, stream_hash, include_supports=True):

View file

@@ -127,6 +127,7 @@ class EncryptedFileManager(object):
             try:
                 # restore will raise an Exception if status is unknown
                 lbry_file.restore(file_info['status'])
+                self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info
                 self.lbry_files.append(lbry_file)
             except Exception:
                 log.warning("Failed to start %i", file_info.get('rowid'))
@@ -171,6 +172,8 @@ class EncryptedFileManager(object):
             stream_metadata['suggested_file_name']
         )
         lbry_file.restore(status)
+        yield lbry_file.get_claim_info()
+        self.storage.content_claim_callbacks[stream_hash] = lbry_file.get_claim_info
         self.lbry_files.append(lbry_file)
         defer.returnValue(lbry_file)
@@ -195,8 +198,9 @@ class EncryptedFileManager(object):
             rowid, stream_hash, payment_rate_manager, sd_hash, key, stream_name, file_name, download_directory,
             stream_metadata['suggested_file_name']
         )
-        lbry_file.get_claim_info(include_supports=False)
         lbry_file.restore(status)
+        yield lbry_file.get_claim_info(include_supports=False)
+        self.storage.content_claim_callbacks[stream_hash] = lbry_file.get_claim_info
         self.lbry_files.append(lbry_file)
         defer.returnValue(lbry_file)
@@ -220,6 +224,9 @@ class EncryptedFileManager(object):
         self.lbry_files.remove(lbry_file)
+        if lbry_file.stream_hash in self.storage.content_claim_callbacks:
+            del self.storage.content_claim_callbacks[lbry_file.stream_hash]
         yield lbry_file.delete_data()
         yield self.session.storage.delete_stream(lbry_file.stream_hash)