batch insert/update claims to sqlite

This commit is contained in:
Jack Robison 2018-06-20 11:41:43 -04:00
parent 889b2facce
commit 61a7fd66bf
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 127 additions and 98 deletions

View file

@ -21,7 +21,7 @@ at anytime.
* *
### Changed ### Changed
* * save claims to sqlite in batches to speed up `resolve` queries for many uris
* *
### Added ### Added

View file

@ -513,14 +513,30 @@ class Wallet(object):
@defer.inlineCallbacks
def save_claim(self, claim_info):
    """
    Save a single claim, or the claims nested in a single resolve result.

    This is a thin wrapper around save_claims() so that the rules deciding
    which entries get written to storage are defined in exactly one place.

    :param claim_info: either a claim dict carrying a 'value' key, or a
        resolve result dict with optional 'certificate' and 'claim' entries
    :return: deferred that fires once the batch save has completed
    """
    yield self.save_claims([claim_info])
@defer.inlineCallbacks
def save_claims(self, claim_infos):
    """
    Collect the storable claims out of claim_infos and save them in one batch.

    An entry that has a 'value' key is kept as-is when that value is truthy.
    An entry without a 'value' key is inspected for nested 'certificate' and
    'claim' entries, each of which is kept when its own 'value' is truthy.

    :param claim_infos: iterable of claim dicts and/or resolve result dicts
    :return: deferred that fires once self.storage.save_claims() has finished
    """
    pending = []
    for entry in claim_infos:
        if 'value' in entry:
            # the entry is itself a claim; skip it when its value is empty
            if entry['value']:
                pending.append(entry)
            continue
        # the certificate (if any) is queued before the claim it accompanies
        for nested_key in ('certificate', 'claim'):
            if nested_key in entry and entry[nested_key]['value']:
                pending.append(entry[nested_key])
    yield self.storage.save_claims(pending)
@defer.inlineCallbacks @defer.inlineCallbacks
def resolve(self, *uris, **kwargs): def resolve(self, *uris, **kwargs):
@ -528,16 +544,15 @@ class Wallet(object):
page_size = kwargs.get('page_size', 10) page_size = kwargs.get('page_size', 10)
result = {} result = {}
batch_results = yield self._get_values_for_uris(page, page_size, *uris) batch_results = yield self._get_values_for_uris(page, page_size, *uris)
to_save = []
for uri, resolve_results in batch_results.iteritems(): for uri, resolve_results in batch_results.iteritems():
try: try:
result[uri] = self._handle_claim_result(resolve_results) result[uri] = self._handle_claim_result(resolve_results)
yield self.save_claim(result[uri]) to_save.append(result[uri])
except (UnknownNameError, UnknownClaimID, UnknownURI) as err: except (UnknownNameError, UnknownClaimID, UnknownURI) as err:
result[uri] = {'error': err.message} result[uri] = {'error': err.message}
yield self.save_claims(to_save)
defer.returnValue(result) defer.returnValue(result)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -668,7 +683,7 @@ class Wallet(object):
log.error(msg) log.error(msg)
raise Exception(msg) raise Exception(msg)
claim = self._process_claim_out(claim) claim = self._process_claim_out(claim)
yield self.storage.save_claim(self._get_temp_claim_info(claim, name, bid), smart_decode(claim['value'])) yield self.storage.save_claims([self._get_temp_claim_info(claim, name, bid)])
defer.returnValue(claim) defer.returnValue(claim)
@defer.inlineCallbacks @defer.inlineCallbacks

View file

@ -565,58 +565,70 @@ class SQLiteStorage(object):
# # # # # # # # # claim functions # # # # # # # # # # # # # # # # # # claim functions # # # # # # # # #
@defer.inlineCallbacks
def save_claims(self, claim_infos):
    """
    Insert or replace a batch of claims using a single database interaction.

    When a claim's stream source is an sd hash belonging to a file that is
    already stored, the content claim associated with that file is created
    or updated in the same transaction.

    :param claim_infos: iterable of claim info dicts; each one is read for
        'txid', 'nout', 'claim_id', 'name', 'amount', 'height', 'address',
        'claim_sequence' and 'value', and optionally 'hex' and 'supports'
    :return: deferred that fires after the claims have been written and the
        follow-up support saves and content claim callbacks have fired
    """

    def _save_claims(transaction):
        content_claims_to_update = []
        supports_to_save = []
        for claim_info in claim_infos:
            outpoint = "%s:%i" % (claim_info['txid'], claim_info['nout'])
            claim_id = claim_info['claim_id']
            name = claim_info['name']
            amount = int(COIN * claim_info['amount'])
            height = claim_info['height']
            address = claim_info['address']
            sequence = claim_info['claim_sequence']
            value = claim_info['value']
            # value may not be a dict; anything without .get() raises
            # AttributeError and is treated as having no certificate/source
            try:
                # NOTE(review): the key used to be 'content_claims_to_update',
                # which is the name of a local list in this function and looks
                # like a rename slip; 'publisherSignature' is presumably the
                # intended claim field - confirm against the claim schema
                certificate_id = value.get('publisherSignature', {}).get('certificateId')
            except AttributeError:
                certificate_id = None
            try:
                source = value.get('stream', {}).get('source', {})
                if source.get('sourceType') == "lbry_sd_hash":
                    source_hash = source.get('source')
                else:
                    source_hash = None
            except AttributeError:
                source_hash = None
            serialized = claim_info.get('hex') or smart_decode(value).serialized.encode('hex')
            transaction.execute(
                "insert or replace into claim values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
                (outpoint, claim_id, name, amount, height, serialized, certificate_id, address, sequence)
            )
            if 'supports' in claim_info:
                # if this response doesn't have support info don't overwrite
                # the existing support info. Only record the work here: the
                # save itself is started after this interaction has returned
                supports_to_save.append((claim_id, claim_info['supports']))
            if not source_hash:
                continue
            stream_row = transaction.execute(
                "select file.stream_hash from stream "
                "inner join file on file.stream_hash=stream.stream_hash where sd_hash=?", (source_hash, )
            ).fetchone()
            if not stream_row:
                continue
            stream_hash = stream_row[0]
            # fetch the rows: a bare cursor is always truthy and never equal
            # to an outpoint string, which defeated the comparisons below
            known_outpoint_row = transaction.execute(
                "select claim_outpoint from content_claim where stream_hash=?", (stream_hash, )
            ).fetchone()
            known_claim_row = transaction.execute(
                "select claim_id from claim "
                "inner join content_claim c3 ON claim.claim_outpoint=c3.claim_outpoint "
                "where c3.stream_hash=?", (stream_hash, )
            ).fetchone()
            if not known_claim_row:
                # this is a claim matching one of our files that has no associated claim yet
                log.info("discovered content claim %s for stream %s", claim_id, stream_hash)
                content_claims_to_update.append((stream_hash, outpoint))
            elif known_claim_row[0] != claim_id:
                # a different claim containing a clone of a file that we have;
                # queueing it would make _save_content_claim raise and abort
                # the whole batch
                log.warning("claim %s contains the same stream as the one already downloaded from claim %s",
                            claim_id, known_claim_row[0])
            elif not known_outpoint_row or known_outpoint_row[0] != outpoint:
                # this is an update for one of our files
                log.info("updating content claim %s for stream %s", claim_id, stream_hash)
                content_claims_to_update.append((stream_hash, outpoint))
        updated_stream_hashes = []
        for stream_hash, outpoint in content_claims_to_update:
            self._save_content_claim(transaction, outpoint, stream_hash)
            updated_stream_hashes.append(stream_hash)
        return updated_stream_hashes, supports_to_save

    updated_stream_hashes, supports_to_save = yield self.db.runInteraction(_save_claims)
    # Start the follow-up work only now that the interaction has finished, so
    # the callbacks are not invoked from inside the transaction function
    content_dl = [
        self.content_claim_callbacks[stream_hash]()
        for stream_hash in updated_stream_hashes
        if stream_hash in self.content_claim_callbacks
    ]
    support_dl = [self.save_supports(claim_id, supports) for claim_id, supports in supports_to_save]
    if content_dl:
        yield defer.DeferredList(content_dl)
    if support_dl:
        yield defer.DeferredList(support_dl)
def get_old_stream_hashes_for_claim_id(self, claim_id, new_stream_hash): def get_old_stream_hashes_for_claim_id(self, claim_id, new_stream_hash):
return self.run_and_return_list( return self.run_and_return_list(
@ -626,46 +638,47 @@ class SQLiteStorage(object):
"where f.stream_hash!=?", claim_id, new_stream_hash "where f.stream_hash!=?", claim_id, new_stream_hash
) )
@staticmethod
def _save_content_claim(transaction, claim_outpoint, stream_hash):
    """
    Associate the claim at claim_outpoint with the stream stream_hash.

    Runs entirely on the given transaction, so it can be composed into a
    larger interaction.

    :param transaction: database transaction/cursor providing execute()
    :param claim_outpoint: outpoint string identifying a row in the claim table
    :param stream_hash: stream hash identifying a row in the stream table
    :raises Exception: "claim not found", "claim does not contain a stream",
        "stream not found", "stream mismatch" or "invalid stream update"
        when the corresponding validation step fails
    """

    def first_row(query, parameters):
        # run a query and hand back its first row (None when there is none)
        return transaction.execute(query, parameters).fetchone()

    # the claim must already be stored; load its id and serialized metadata
    claim_row = first_row(
        "select claim_id, serialized_metadata from claim where claim_outpoint=?", (claim_outpoint,)
    )
    if not claim_row:
        raise Exception("claim not found")
    new_claim_id = claim_row[0]
    claim = ClaimDict.deserialize(claim_row[1].decode('hex'))

    # only stream claims belong in content_claim, certificates do not
    if not claim.is_stream:
        raise Exception("claim does not contain a stream")

    # the stream must be known, and the claim has to point at its sd hash
    sd_hash_row = first_row("select sd_hash from stream where stream_hash=?", (stream_hash,))
    if not sd_hash_row:
        raise Exception("stream not found")
    if sd_hash_row[0] != claim.source_hash:
        raise Exception("stream mismatch")

    # when the file already has a claim, the new one must be an update of
    # that same claim (i.e. share its claim id)
    associated_row = first_row(
        "select claim_outpoint from content_claim where stream_hash=?", (stream_hash,)
    )
    if associated_row:
        # the fetched row is a one-item sequence and doubles as the parameters
        associated_claim_id = first_row(
            "select claim_id from claim where claim_outpoint=?", associated_row
        )[0]
        if associated_claim_id != new_claim_id:
            raise Exception("invalid stream update")

    # all checks passed: point the file at the new claim outpoint
    transaction.execute("insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint))
@defer.inlineCallbacks @defer.inlineCallbacks
def save_content_claim(self, stream_hash, claim_outpoint): def save_content_claim(self, stream_hash, claim_outpoint):
def _save_content_claim(transaction): yield self.db.runInteraction(self._save_content_claim, claim_outpoint, stream_hash)
# get the claim id and serialized metadata
claim_info = transaction.execute(
"select claim_id, serialized_metadata from claim where claim_outpoint=?", (claim_outpoint, )
).fetchone()
if not claim_info:
raise Exception("claim not found")
new_claim_id, claim = claim_info[0], ClaimDict.deserialize(claim_info[1].decode('hex'))
# certificate claims should not be in the content_claim table
if not claim.is_stream:
raise Exception("claim does not contain a stream")
# get the known sd hash for this stream
known_sd_hash = transaction.execute(
"select sd_hash from stream where stream_hash=?", (stream_hash, )
).fetchone()
if not known_sd_hash:
raise Exception("stream not found")
# check the claim contains the same sd hash
if known_sd_hash[0] != claim.source_hash:
raise Exception("stream mismatch")
# if there is a current claim associated to the file, check that the new claim is an update to it
current_associated_content = transaction.execute(
"select claim_outpoint from content_claim where stream_hash=?", (stream_hash, )
).fetchone()
if current_associated_content:
current_associated_claim_id = transaction.execute(
"select claim_id from claim where claim_outpoint=?", current_associated_content
).fetchone()[0]
if current_associated_claim_id != new_claim_id:
raise Exception("invalid stream update")
# update the claim associated to the file
transaction.execute("insert or replace into content_claim values (?, ?)", (stream_hash, claim_outpoint))
yield self.db.runInteraction(_save_content_claim)
# update corresponding ManagedEncryptedFileDownloader object # update corresponding ManagedEncryptedFileDownloader object
if stream_hash in self.content_claim_callbacks: if stream_hash in self.content_claim_callbacks:
file_callback = self.content_claim_callbacks[stream_hash] file_callback = self.content_claim_callbacks[stream_hash]

View file

@ -67,6 +67,7 @@ fake_claim_info = {
} }
class FakeAnnouncer(object): class FakeAnnouncer(object):
def __init__(self): def __init__(self):
self._queue_size = 0 self._queue_size = 0
@ -309,7 +310,7 @@ class ContentClaimStorageTests(StorageTest):
yield manager.session.storage.save_published_file( yield manager.session.storage.save_published_file(
stream_hash, file_name, download_directory, blob_data_rate stream_hash, file_name, download_directory, blob_data_rate
) )
yield self.storage.save_claim(fake_claim_info) yield self.storage.save_claims([fake_claim_info])
yield self.storage.save_content_claim(stream_hash, fake_outpoint) yield self.storage.save_content_claim(stream_hash, fake_outpoint)
stored_content_claim = yield self.storage.get_content_claim(stream_hash) stored_content_claim = yield self.storage.get_content_claim(stream_hash)
self.assertDictEqual(stored_content_claim, fake_claim_info) self.assertDictEqual(stored_content_claim, fake_claim_info)
@ -332,7 +333,7 @@ class ContentClaimStorageTests(StorageTest):
update_info['txid'] = "beef0000" * 12 update_info['txid'] = "beef0000" * 12
update_info['nout'] = 0 update_info['nout'] = 0
second_outpoint = "%s:%i" % (update_info['txid'], update_info['nout']) second_outpoint = "%s:%i" % (update_info['txid'], update_info['nout'])
yield self.storage.save_claim(update_info) yield self.storage.save_claims([update_info])
yield self.storage.save_content_claim(stream_hash, second_outpoint) yield self.storage.save_content_claim(stream_hash, second_outpoint)
update_info_result = yield self.storage.get_content_claim(stream_hash) update_info_result = yield self.storage.get_content_claim(stream_hash)
self.assertDictEqual(update_info_result, update_info) self.assertDictEqual(update_info_result, update_info)
@ -343,8 +344,8 @@ class ContentClaimStorageTests(StorageTest):
invalid_update_info['nout'] = 0 invalid_update_info['nout'] = 0
invalid_update_info['claim_id'] = "beef0002" * 5 invalid_update_info['claim_id'] = "beef0002" * 5
invalid_update_outpoint = "%s:%i" % (invalid_update_info['txid'], invalid_update_info['nout']) invalid_update_outpoint = "%s:%i" % (invalid_update_info['txid'], invalid_update_info['nout'])
yield self.storage.save_claim(invalid_update_info)
try: try:
yield self.storage.save_claims([invalid_update_info])
yield self.storage.save_content_claim(stream_hash, invalid_update_outpoint) yield self.storage.save_content_claim(stream_hash, invalid_update_outpoint)
raise Exception("test failed") raise Exception("test failed")
except Exception as err: except Exception as err: