diff --git a/CHANGELOG.md b/CHANGELOG.md
index bd4fb7bec..7334a4dff 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,7 +21,7 @@ at anytime.
   *
 
 ### Changed
-  *
+  * save claims to sqlite in batches to speed up `resolve` queries for many uris
   *
 
 ### Added
diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py
index 2bc94a332..0b71ed59d 100644
--- a/lbrynet/core/Wallet.py
+++ b/lbrynet/core/Wallet.py
@@ -513,14 +513,29 @@ class Wallet(object):
 
     @defer.inlineCallbacks
     def save_claim(self, claim_info):
-        if 'value' in claim_info:
-            if claim_info['value']:
-                yield self.storage.save_claim(claim_info)
-        else:
-            if 'certificate' in claim_info and claim_info['certificate']['value']:
-                yield self.storage.save_claim(claim_info['certificate'])
-            if 'claim' in claim_info and claim_info['claim']['value']:
-                yield self.storage.save_claim(claim_info['claim'])
+        """Save a single claim or resolve result, see save_claims"""
+        yield self.save_claims([claim_info])
+
+    @defer.inlineCallbacks
+    def save_claims(self, claim_infos):
+        """
+        Save a batch of claims to storage with a single storage call
+
+        :param claim_infos: list of claim dictionaries, or resolve results holding them
+            under the 'certificate' and/or 'claim' keys; entries with an empty value
+            are skipped
+        """
+        to_save = []
+        for info in claim_infos:
+            if 'value' in info:
+                if info['value']:
+                    to_save.append(info)
+            else:
+                if 'certificate' in info and info['certificate']['value']:
+                    to_save.append(info['certificate'])
+                if 'claim' in info and info['claim']['value']:
+                    to_save.append(info['claim'])
+        yield self.storage.save_claims(to_save)
 
     @defer.inlineCallbacks
     def resolve(self, *uris, **kwargs):
@@ -528,16 +543,15 @@ class Wallet(object):
         page_size = kwargs.get('page_size', 10)
 
         result = {}
-
         batch_results = yield self._get_values_for_uris(page, page_size, *uris)
-
+        to_save = []
         for uri, resolve_results in batch_results.iteritems():
             try:
                 result[uri] = self._handle_claim_result(resolve_results)
-                yield self.save_claim(result[uri])
+                to_save.append(result[uri])
             except (UnknownNameError, UnknownClaimID, UnknownURI) as err:
                 result[uri] = {'error': err.message}
-
+        yield self.save_claims(to_save)
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -668,7 +682,7 @@ class Wallet(object):
             log.error(msg)
             raise Exception(msg)
         claim = self._process_claim_out(claim)
-        yield self.storage.save_claim(self._get_temp_claim_info(claim, name, bid), smart_decode(claim['value']))
+        yield self.storage.save_claims([self._get_temp_claim_info(claim, name, bid)])
         defer.returnValue(claim)
 
     @defer.inlineCallbacks
diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py
index 5ac07d6e4..00e1d9a4d 100644
--- a/lbrynet/database/storage.py
+++ b/lbrynet/database/storage.py
@@ -565,58 +565,93 @@ class SQLiteStorage(object):
     # # # # # # # # # claim functions # # # # # # # # #
 
     @defer.inlineCallbacks
-    def save_claim(self, claim_info, claim_dict=None):
-        outpoint = "%s:%i" % (claim_info['txid'], claim_info['nout'])
-        claim_id = claim_info['claim_id']
-        name = claim_info['name']
-        amount = int(COIN * claim_info['amount'])
-        height = claim_info['height']
-        address = claim_info['address']
-        sequence = claim_info['claim_sequence']
-        claim_dict = claim_dict or smart_decode(claim_info['value'])
-        serialized = claim_dict.serialized.encode('hex')
-
-        def _save_claim(transaction):
-            transaction.execute(
-                "insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
-                (outpoint, claim_id, name, amount, height, serialized, claim_dict.certificate_id, address, sequence)
-            )
-
-        yield self.db.runInteraction(_save_claim)
-
-        if 'supports' in claim_info:  # if this response doesn't have support info don't overwrite the existing
-            # support info
-            yield self.save_supports(claim_id, claim_info['supports'])
-
-        # check for content claim updates
-        if claim_dict.source_hash:
-            existing_file_stream_hash = yield self.run_and_return_one_or_none(
-                "select file.stream_hash from stream "
-                "inner join file on file.stream_hash=stream.stream_hash "
-                "where sd_hash=?", claim_dict.source_hash
-            )
-            if existing_file_stream_hash:
-                known_outpoint = yield self.run_and_return_one_or_none(
-                    "select claim_outpoint from content_claim where stream_hash=?", existing_file_stream_hash
-                )
-                known_claim_id = yield self.run_and_return_one_or_none(
-                    "select claim_id from claim "
-                    "inner join content_claim c3 ON claim.claim_outpoint=c3.claim_outpoint "
-                    "where c3.stream_hash=?", existing_file_stream_hash
-                )
-                if not known_claim_id:  # this is a claim matching one of our files that has
-                    # no associated claim yet
-                    log.info("discovered content claim %s for stream %s", claim_id, existing_file_stream_hash)
-                    yield self.save_content_claim(existing_file_stream_hash, outpoint)
-                elif known_claim_id and known_claim_id == claim_id:
-                    if known_outpoint != outpoint:  # this is an update for one of our files
-                        log.info("updating content claim %s for stream %s", claim_id, existing_file_stream_hash)
-                        yield self.save_content_claim(existing_file_stream_hash, outpoint)
-                    else:  # we're up to date already
-                        pass
-                else:  # this is a claim containing a clone of a file that we have
-                    log.warning("claim %s contains the same stream as the one already downloaded from claim %s",
-                                claim_id, known_claim_id)
+    def save_claims(self, claim_infos):
+        """
+        Save a batch of claims in a single database transaction
+
+        Supports included with a claim are saved too, and the content claim of a file
+        we have is set or updated when one of the claims contains its stream.
+
+        :param claim_infos: list of claim info dictionaries
+        :return: deferred, fired once the claims, supports and file updates are saved
+        """
+
+        def _save_claims(transaction):
+            supports_to_save = []
+            updated_streams = []
+            for claim_info in claim_infos:
+                outpoint = "%s:%i" % (claim_info['txid'], claim_info['nout'])
+                claim_id = claim_info['claim_id']
+                name = claim_info['name']
+                amount = int(COIN * claim_info['amount'])
+                height = claim_info['height']
+                address = claim_info['address']
+                sequence = claim_info['claim_sequence']
+                value = claim_info['value']
+                claim_dict = None
+                if not isinstance(value, dict):
+                    # not a decoded claim dictionary: decode it, as save_claim used to do
+                    claim_dict = smart_decode(value)
+                    certificate_id, source_hash = claim_dict.certificate_id, claim_dict.source_hash
+                else:
+                    try:
+                        certificate_id = value.get('publisherSignature', {}).get('certificateId')
+                    except AttributeError:
+                        certificate_id = None
+                    try:
+                        source = value.get('stream', {}).get('source', {})
+                        source_hash = source.get('source') if source.get('sourceType') == "lbry_sd_hash" else None
+                    except AttributeError:
+                        source_hash = None
+                serialized = claim_info.get('hex') or (claim_dict or smart_decode(value)).serialized.encode('hex')
+                transaction.execute(
+                    "insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
+                    (outpoint, claim_id, name, amount, height, serialized, certificate_id, address, sequence)
+                )
+                # if this response doesn't have support info don't overwrite the existing support info
+                if 'supports' in claim_info:
+                    supports_to_save.append((claim_id, claim_info['supports']))
+                if not source_hash:
+                    continue
+                stream_hash = transaction.execute(
+                    "select file.stream_hash from stream "
+                    "inner join file on file.stream_hash=stream.stream_hash where sd_hash=?", (source_hash, )
+                ).fetchone()
+                if not stream_hash:
+                    continue
+                stream_hash = stream_hash[0]
+                known_claim = transaction.execute(
+                    "select claim.claim_id, claim.claim_outpoint from claim "
+                    "inner join content_claim c3 ON claim.claim_outpoint=c3.claim_outpoint "
+                    "where c3.stream_hash=?", (stream_hash, )
+                ).fetchone()
+                if not known_claim:  # this is a claim matching one of our files that has no associated claim yet
+                    log.info("discovered content claim %s for stream %s", claim_id, stream_hash)
+                elif known_claim[0] != claim_id:  # this is a claim containing a clone of a file that we have
+                    log.warning("claim %s contains the same stream as the one already downloaded from claim %s",
+                                claim_id, known_claim[0])
+                    continue
+                elif known_claim[1] != outpoint:  # this is an update for one of our files
+                    log.info("updating content claim %s for stream %s", claim_id, stream_hash)
+                else:  # we're up to date already
+                    continue
+                self._save_content_claim(transaction, outpoint, stream_hash)
+                if stream_hash not in updated_streams:
+                    updated_streams.append(stream_hash)
+            return updated_streams, supports_to_save
+
+        updated_streams, supports_to_save = yield self.db.runInteraction(_save_claims)
+        # start the follow-up deferreds here rather than inside the interaction
+        # NOTE(review): assumes runInteraction runs _save_claims off the reactor thread (adbapi) - confirm
+        content_dl = [
+            self.content_claim_callbacks[stream_hash]() for stream_hash in updated_streams
+            if stream_hash in self.content_claim_callbacks
+        ]
+        support_dl = [self.save_supports(claim_id, supports) for claim_id, supports in supports_to_save]
+        if content_dl:
+            yield defer.DeferredList(content_dl)
+        if support_dl:
+            yield defer.DeferredList(support_dl)
 
     def get_old_stream_hashes_for_claim_id(self, claim_id, new_stream_hash):
         return self.run_and_return_list(
@@ -626,46 +661,57 @@ class SQLiteStorage(object):
             "where f.stream_hash!=?", claim_id, new_stream_hash
         )
 
+    @staticmethod
+    def _save_content_claim(transaction, claim_outpoint, stream_hash):
+        """
+        Associate a saved claim with a stream, inside a running database interaction
+
+        :param transaction: transaction object provided by runInteraction
+        :param claim_outpoint: (str) "txid:nout" outpoint of a claim already in the claim table
+        :param stream_hash: (str) stream hash of a stream already in the stream table
+        :raises Exception: if the claim or stream is not found, the claim has no stream
+            or a different sd hash, or the stream is associated with a different
+            claim id
+        """
+        # get the claim id and serialized metadata
+        claim_info = transaction.execute(
+            "select claim_id, serialized_metadata from claim where claim_outpoint=?", (claim_outpoint,)
+        ).fetchone()
+        if not claim_info:
+            raise Exception("claim not found")
+        new_claim_id, claim = claim_info[0], ClaimDict.deserialize(claim_info[1].decode('hex'))
+
+        # certificate claims should not be in the content_claim table
+        if not claim.is_stream:
+            raise Exception("claim does not contain a stream")
+
+        # get the known sd hash for this stream
+        known_sd_hash = transaction.execute(
+            "select sd_hash from stream where stream_hash=?", (stream_hash,)
+        ).fetchone()
+        if not known_sd_hash:
+            raise Exception("stream not found")
+        # check the claim contains the same sd hash
+        if known_sd_hash[0] != claim.source_hash:
+            raise Exception("stream mismatch")
+
+        # if there is a current claim associated to the file, check that the new claim is an update to it
+        current_associated_content = transaction.execute(
+            "select claim_outpoint from content_claim where stream_hash=?", (stream_hash,)
+        ).fetchone()
+        if current_associated_content:
+            current_associated_claim_id = transaction.execute(
+                "select claim_id from claim where claim_outpoint=?", current_associated_content
+            ).fetchone()[0]
+            if current_associated_claim_id != new_claim_id:
+                raise Exception("invalid stream update")
+
+        # update the claim associated to the file
+        transaction.execute("insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint))
+
     @defer.inlineCallbacks
     def save_content_claim(self, stream_hash, claim_outpoint):
-        def _save_content_claim(transaction):
-            # get the claim id and serialized metadata
-            claim_info = transaction.execute(
-                "select claim_id, serialized_metadata from claim where claim_outpoint=?", (claim_outpoint, )
-            ).fetchone()
-            if not claim_info:
-                raise Exception("claim not found")
-            new_claim_id, claim = claim_info[0], ClaimDict.deserialize(claim_info[1].decode('hex'))
-
-            # certificate claims should not be in the content_claim table
-            if not claim.is_stream:
-                raise Exception("claim does not contain a stream")
-
-            # get the known sd hash for this stream
-            known_sd_hash = transaction.execute(
-                "select sd_hash from stream where stream_hash=?", (stream_hash, )
-            ).fetchone()
-            if not known_sd_hash:
-                raise Exception("stream not found")
-            # check the claim contains the same sd hash
-            if known_sd_hash[0] != claim.source_hash:
-                raise Exception("stream mismatch")
-
-            # if there is a current claim associated to the file, check that the new claim is an update to it
-            current_associated_content = transaction.execute(
-                "select claim_outpoint from content_claim where stream_hash=?", (stream_hash, )
-            ).fetchone()
-            if current_associated_content:
-                current_associated_claim_id = transaction.execute(
-                    "select claim_id from claim where claim_outpoint=?", current_associated_content
-                ).fetchone()[0]
-                if current_associated_claim_id != new_claim_id:
-                    raise Exception("invalid stream update")
-
-            # update the claim associated to the file
-            transaction.execute("insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint))
-        yield self.db.runInteraction(_save_content_claim)
-
+        yield self.db.runInteraction(self._save_content_claim, claim_outpoint, stream_hash)
         # update corresponding ManagedEncryptedFileDownloader object
         if stream_hash in self.content_claim_callbacks:
             file_callback = self.content_claim_callbacks[stream_hash]
diff --git a/lbrynet/tests/unit/database/test_SQLiteStorage.py b/lbrynet/tests/unit/database/test_SQLiteStorage.py
index 3829b25ed..0e5328813 100644
--- a/lbrynet/tests/unit/database/test_SQLiteStorage.py
+++ b/lbrynet/tests/unit/database/test_SQLiteStorage.py
@@ -309,7 +309,7 @@ class ContentClaimStorageTests(StorageTest):
         yield manager.session.storage.save_published_file(
             stream_hash, file_name, download_directory, blob_data_rate
         )
-        yield self.storage.save_claim(fake_claim_info)
+        yield self.storage.save_claims([fake_claim_info])
         yield self.storage.save_content_claim(stream_hash, fake_outpoint)
         stored_content_claim = yield self.storage.get_content_claim(stream_hash)
         self.assertDictEqual(stored_content_claim, fake_claim_info)
@@ -332,7 +332,7 @@ class ContentClaimStorageTests(StorageTest):
         update_info['txid'] = "beef0000" * 12
         update_info['nout'] = 0
         second_outpoint = "%s:%i" % (update_info['txid'], update_info['nout'])
-        yield self.storage.save_claim(update_info)
+        yield self.storage.save_claims([update_info])
         yield self.storage.save_content_claim(stream_hash, second_outpoint)
         update_info_result = yield self.storage.get_content_claim(stream_hash)
         self.assertDictEqual(update_info_result, update_info)
@@ -343,7 +343,7 @@ class ContentClaimStorageTests(StorageTest):
         invalid_update_info['nout'] = 0
         invalid_update_info['claim_id'] = "beef0002" * 5
         invalid_update_outpoint = "%s:%i" % (invalid_update_info['txid'], invalid_update_info['nout'])
-        yield self.storage.save_claim(invalid_update_info)
+        yield self.storage.save_claims([invalid_update_info])
         try:
             yield self.storage.save_content_claim(stream_hash, invalid_update_outpoint)
             raise Exception("test failed")