Merge pull request #1264 from lbryio/faster-claim-sqlite

batch insert/update claims to sqlite
This commit is contained in:
Jack Robison 2018-06-20 17:06:29 -04:00 committed by GitHub
commit 3d5bef1595
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 127 additions and 98 deletions

View file

@ -21,7 +21,7 @@ at anytime.
*
### Changed
*
* save claims to sqlite in batches to speed up `resolve` queries for many uris
*
### Added

View file

@ -513,14 +513,30 @@ class Wallet(object):
@defer.inlineCallbacks
def save_claim(self, claim_info):
claims = []
if 'value' in claim_info:
if claim_info['value']:
yield self.storage.save_claim(claim_info)
claims.append(claim_info)
else:
if 'certificate' in claim_info and claim_info['certificate']['value']:
yield self.storage.save_claim(claim_info['certificate'])
claims.append(claim_info['certificate'])
if 'claim' in claim_info and claim_info['claim']['value']:
yield self.storage.save_claim(claim_info['claim'])
claims.append(claim_info['claim'])
yield self.storage.save_claims(claims)
@defer.inlineCallbacks
def save_claims(self, claim_infos):
to_save = []
for info in claim_infos:
if 'value' in info:
if info['value']:
to_save.append(info)
else:
if 'certificate' in info and info['certificate']['value']:
to_save.append(info['certificate'])
if 'claim' in info and info['claim']['value']:
to_save.append(info['claim'])
yield self.storage.save_claims(to_save)
@defer.inlineCallbacks
def resolve(self, *uris, **kwargs):
@ -528,16 +544,15 @@ class Wallet(object):
page_size = kwargs.get('page_size', 10)
result = {}
batch_results = yield self._get_values_for_uris(page, page_size, *uris)
to_save = []
for uri, resolve_results in batch_results.iteritems():
try:
result[uri] = self._handle_claim_result(resolve_results)
yield self.save_claim(result[uri])
to_save.append(result[uri])
except (UnknownNameError, UnknownClaimID, UnknownURI) as err:
result[uri] = {'error': err.message}
yield self.save_claims(to_save)
defer.returnValue(result)
@defer.inlineCallbacks
@ -668,7 +683,7 @@ class Wallet(object):
log.error(msg)
raise Exception(msg)
claim = self._process_claim_out(claim)
yield self.storage.save_claim(self._get_temp_claim_info(claim, name, bid), smart_decode(claim['value']))
yield self.storage.save_claims([self._get_temp_claim_info(claim, name, bid)])
defer.returnValue(claim)
@defer.inlineCallbacks

View file

@ -565,58 +565,70 @@ class SQLiteStorage(object):
# # # # # # # # # claim functions # # # # # # # # #
@defer.inlineCallbacks
def save_claim(self, claim_info, claim_dict=None):
outpoint = "%s:%i" % (claim_info['txid'], claim_info['nout'])
claim_id = claim_info['claim_id']
name = claim_info['name']
amount = int(COIN * claim_info['amount'])
height = claim_info['height']
address = claim_info['address']
sequence = claim_info['claim_sequence']
claim_dict = claim_dict or smart_decode(claim_info['value'])
serialized = claim_dict.serialized.encode('hex')
def _save_claim(transaction):
transaction.execute(
"insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(outpoint, claim_id, name, amount, height, serialized, claim_dict.certificate_id, address, sequence)
)
yield self.db.runInteraction(_save_claim)
if 'supports' in claim_info: # if this response doesn't have support info don't overwrite the existing
# support info
yield self.save_supports(claim_id, claim_info['supports'])
# check for content claim updates
if claim_dict.source_hash:
existing_file_stream_hash = yield self.run_and_return_one_or_none(
"select file.stream_hash from stream "
"inner join file on file.stream_hash=stream.stream_hash "
"where sd_hash=?", claim_dict.source_hash
)
if existing_file_stream_hash:
known_outpoint = yield self.run_and_return_one_or_none(
"select claim_outpoint from content_claim where stream_hash=?", existing_file_stream_hash
def save_claims(self, claim_infos):
def _save_claims(transaction):
content_claims_to_update = []
support_callbacks = []
for claim_info in claim_infos:
outpoint = "%s:%i" % (claim_info['txid'], claim_info['nout'])
claim_id = claim_info['claim_id']
name = claim_info['name']
amount = int(COIN * claim_info['amount'])
height = claim_info['height']
address = claim_info['address']
sequence = claim_info['claim_sequence']
try:
certificate_id = claim_info['value'].get('content_claims_to_update', {}).get('certificateId')
except AttributeError:
certificate_id = None
try:
if claim_info['value'].get('stream', {}).get('source', {}).get('sourceType') == "lbry_sd_hash":
source_hash = claim_info['value'].get('stream', {}).get('source', {}).get('source')
else:
source_hash = None
except AttributeError:
source_hash = None
serialized = claim_info.get('hex') or smart_decode(claim_info['value']).serialized.encode('hex')
transaction.execute(
"insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(outpoint, claim_id, name, amount, height, serialized, certificate_id, address, sequence)
)
known_claim_id = yield self.run_and_return_one_or_none(
if 'supports' in claim_info: # if this response doesn't have support info don't overwrite the existing
# support info
support_callbacks.append(self.save_supports(claim_id, claim_info['supports']))
if not source_hash:
continue
stream_hash = transaction.execute(
"select file.stream_hash from stream "
"inner join file on file.stream_hash=stream.stream_hash where sd_hash=?", (source_hash, )
).fetchone()
if not stream_hash:
continue
stream_hash = stream_hash[0]
known_outpoint = transaction.execute(
"select claim_outpoint from content_claim where stream_hash=?", (stream_hash, )
)
known_claim_id = transaction.execute(
"select claim_id from claim "
"inner join content_claim c3 ON claim.claim_outpoint=c3.claim_outpoint "
"where c3.stream_hash=?", existing_file_stream_hash
"where c3.stream_hash=?", (stream_hash, )
)
if not known_claim_id: # this is a claim matching one of our files that has
# no associated claim yet
log.info("discovered content claim %s for stream %s", claim_id, existing_file_stream_hash)
yield self.save_content_claim(existing_file_stream_hash, outpoint)
elif known_claim_id and known_claim_id == claim_id:
if known_outpoint != outpoint: # this is an update for one of our files
log.info("updating content claim %s for stream %s", claim_id, existing_file_stream_hash)
yield self.save_content_claim(existing_file_stream_hash, outpoint)
else: # we're up to date already
pass
else: # this is a claim containing a clone of a file that we have
log.warning("claim %s contains the same stream as the one already downloaded from claim %s",
claim_id, known_claim_id)
if not known_claim_id:
content_claims_to_update.append((stream_hash, outpoint))
elif known_outpoint != outpoint:
content_claims_to_update.append((stream_hash, outpoint))
update_file_callbacks = []
for stream_hash, outpoint in content_claims_to_update:
self._save_content_claim(transaction, outpoint, stream_hash)
if stream_hash in self.content_claim_callbacks:
update_file_callbacks.append(self.content_claim_callbacks[stream_hash]())
return update_file_callbacks, support_callbacks
content_dl, support_dl = yield self.db.runInteraction(_save_claims)
if content_dl:
yield defer.DeferredList(content_dl)
if support_dl:
yield defer.DeferredList(support_dl)
def get_old_stream_hashes_for_claim_id(self, claim_id, new_stream_hash):
return self.run_and_return_list(
@ -626,46 +638,47 @@ class SQLiteStorage(object):
"where f.stream_hash!=?", claim_id, new_stream_hash
)
@staticmethod
def _save_content_claim(transaction, claim_outpoint, stream_hash):
# get the claim id and serialized metadata
claim_info = transaction.execute(
"select claim_id, serialized_metadata from claim where claim_outpoint=?", (claim_outpoint,)
).fetchone()
if not claim_info:
raise Exception("claim not found")
new_claim_id, claim = claim_info[0], ClaimDict.deserialize(claim_info[1].decode('hex'))
# certificate claims should not be in the content_claim table
if not claim.is_stream:
raise Exception("claim does not contain a stream")
# get the known sd hash for this stream
known_sd_hash = transaction.execute(
"select sd_hash from stream where stream_hash=?", (stream_hash,)
).fetchone()
if not known_sd_hash:
raise Exception("stream not found")
# check the claim contains the same sd hash
if known_sd_hash[0] != claim.source_hash:
raise Exception("stream mismatch")
# if there is a current claim associated to the file, check that the new claim is an update to it
current_associated_content = transaction.execute(
"select claim_outpoint from content_claim where stream_hash=?", (stream_hash,)
).fetchone()
if current_associated_content:
current_associated_claim_id = transaction.execute(
"select claim_id from claim where claim_outpoint=?", current_associated_content
).fetchone()[0]
if current_associated_claim_id != new_claim_id:
raise Exception("invalid stream update")
# update the claim associated to the file
transaction.execute("insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint))
@defer.inlineCallbacks
def save_content_claim(self, stream_hash, claim_outpoint):
def _save_content_claim(transaction):
# get the claim id and serialized metadata
claim_info = transaction.execute(
"select claim_id, serialized_metadata from claim where claim_outpoint=?", (claim_outpoint, )
).fetchone()
if not claim_info:
raise Exception("claim not found")
new_claim_id, claim = claim_info[0], ClaimDict.deserialize(claim_info[1].decode('hex'))
# certificate claims should not be in the content_claim table
if not claim.is_stream:
raise Exception("claim does not contain a stream")
# get the known sd hash for this stream
known_sd_hash = transaction.execute(
"select sd_hash from stream where stream_hash=?", (stream_hash, )
).fetchone()
if not known_sd_hash:
raise Exception("stream not found")
# check the claim contains the same sd hash
if known_sd_hash[0] != claim.source_hash:
raise Exception("stream mismatch")
# if there is a current claim associated to the file, check that the new claim is an update to it
current_associated_content = transaction.execute(
"select claim_outpoint from content_claim where stream_hash=?", (stream_hash, )
).fetchone()
if current_associated_content:
current_associated_claim_id = transaction.execute(
"select claim_id from claim where claim_outpoint=?", current_associated_content
).fetchone()[0]
if current_associated_claim_id != new_claim_id:
raise Exception("invalid stream update")
# update the claim associated to the file
transaction.execute("insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint))
yield self.db.runInteraction(_save_content_claim)
yield self.db.runInteraction(self._save_content_claim, claim_outpoint, stream_hash)
# update corresponding ManagedEncryptedFileDownloader object
if stream_hash in self.content_claim_callbacks:
file_callback = self.content_claim_callbacks[stream_hash]

View file

@ -67,6 +67,7 @@ fake_claim_info = {
}
class FakeAnnouncer(object):
def __init__(self):
self._queue_size = 0
@ -309,7 +310,7 @@ class ContentClaimStorageTests(StorageTest):
yield manager.session.storage.save_published_file(
stream_hash, file_name, download_directory, blob_data_rate
)
yield self.storage.save_claim(fake_claim_info)
yield self.storage.save_claims([fake_claim_info])
yield self.storage.save_content_claim(stream_hash, fake_outpoint)
stored_content_claim = yield self.storage.get_content_claim(stream_hash)
self.assertDictEqual(stored_content_claim, fake_claim_info)
@ -332,7 +333,7 @@ class ContentClaimStorageTests(StorageTest):
update_info['txid'] = "beef0000" * 12
update_info['nout'] = 0
second_outpoint = "%s:%i" % (update_info['txid'], update_info['nout'])
yield self.storage.save_claim(update_info)
yield self.storage.save_claims([update_info])
yield self.storage.save_content_claim(stream_hash, second_outpoint)
update_info_result = yield self.storage.get_content_claim(stream_hash)
self.assertDictEqual(update_info_result, update_info)
@ -343,8 +344,8 @@ class ContentClaimStorageTests(StorageTest):
invalid_update_info['nout'] = 0
invalid_update_info['claim_id'] = "beef0002" * 5
invalid_update_outpoint = "%s:%i" % (invalid_update_info['txid'], invalid_update_info['nout'])
yield self.storage.save_claim(invalid_update_info)
try:
yield self.storage.save_claims([invalid_update_info])
yield self.storage.save_content_claim(stream_hash, invalid_update_outpoint)
raise Exception("test failed")
except Exception as err: