batch-start the file manager
This commit is contained in:
parent
ca86af736e
commit
b6cedfec56
3 changed files with 55 additions and 11 deletions
|
@ -714,6 +714,44 @@ class SQLiteStorage(object):
|
||||||
)
|
)
|
||||||
defer.returnValue(result)
|
defer.returnValue(result)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_claims_from_stream_hashes(self, stream_hashes, include_supports=True):
|
||||||
|
def _batch_get_claim(transaction):
|
||||||
|
results = {}
|
||||||
|
bind = "({})".format(','.join('?' for _ in range(len(stream_hashes))))
|
||||||
|
claim_infos = transaction.execute(
|
||||||
|
"select content_claim.stream_hash, c.*, "
|
||||||
|
"case when c.channel_claim_id is not null then "
|
||||||
|
"(select claim_name from claim where claim_id==c.channel_claim_id) "
|
||||||
|
"else null end as channel_name from content_claim "
|
||||||
|
"inner join claim c on c.claim_outpoint=content_claim.claim_outpoint "
|
||||||
|
"and content_claim.stream_hash in {} order by c.rowid desc".format(bind),
|
||||||
|
tuple(stream_hashes)
|
||||||
|
).fetchall()
|
||||||
|
for claim_info in claim_infos:
|
||||||
|
channel_name = claim_info[-1]
|
||||||
|
stream_hash = claim_info[0]
|
||||||
|
result = _format_claim_response(*claim_info[1:-1])
|
||||||
|
if channel_name:
|
||||||
|
result['channel_name'] = channel_name
|
||||||
|
results[stream_hash] = result
|
||||||
|
return results
|
||||||
|
|
||||||
|
claims = yield self.db.runInteraction(_batch_get_claim)
|
||||||
|
if include_supports:
|
||||||
|
all_supports = {}
|
||||||
|
for support in (yield self.get_supports(*[claim['claim_id'] for claim in claims.values()])):
|
||||||
|
all_supports.setdefault(support['claim_id'], []).append(support)
|
||||||
|
for stream_hash in claims.keys():
|
||||||
|
claim = claims[stream_hash]
|
||||||
|
supports = all_supports.get(claim['claim_id'], [])
|
||||||
|
claim['supports'] = supports
|
||||||
|
claim['effective_amount'] = float(
|
||||||
|
sum([support['amount'] for support in supports]) + claim['amount']
|
||||||
|
)
|
||||||
|
claims[stream_hash] = claim
|
||||||
|
defer.returnValue(claims)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_claim(self, claim_outpoint, include_supports=True):
|
def get_claim(self, claim_outpoint, include_supports=True):
|
||||||
def _get_claim(transaction):
|
def _get_claim(transaction):
|
||||||
|
|
|
@ -56,18 +56,21 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver):
|
||||||
self.channel_name = None
|
self.channel_name = None
|
||||||
self.metadata = None
|
self.metadata = None
|
||||||
|
|
||||||
|
def set_claim_info(self, claim_info):
|
||||||
|
self.claim_id = claim_info['claim_id']
|
||||||
|
self.txid = claim_info['txid']
|
||||||
|
self.nout = claim_info['nout']
|
||||||
|
self.channel_claim_id = claim_info['channel_claim_id']
|
||||||
|
self.outpoint = "%s:%i" % (self.txid, self.nout)
|
||||||
|
self.claim_name = claim_info['name']
|
||||||
|
self.channel_name = claim_info['channel_name']
|
||||||
|
self.metadata = claim_info['value']['stream']['metadata']
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_claim_info(self, include_supports=True):
|
def get_claim_info(self, include_supports=True):
|
||||||
claim_info = yield self.storage.get_content_claim(self.stream_hash, include_supports)
|
claim_info = yield self.storage.get_content_claim(self.stream_hash, include_supports)
|
||||||
if claim_info:
|
if claim_info:
|
||||||
self.claim_id = claim_info['claim_id']
|
self.set_claim_info(claim_info)
|
||||||
self.txid = claim_info['txid']
|
|
||||||
self.nout = claim_info['nout']
|
|
||||||
self.channel_claim_id = claim_info['channel_claim_id']
|
|
||||||
self.outpoint = "%s:%i" % (self.txid, self.nout)
|
|
||||||
self.claim_name = claim_info['name']
|
|
||||||
self.channel_name = claim_info['channel_name']
|
|
||||||
self.metadata = claim_info['value']['stream']['metadata']
|
|
||||||
|
|
||||||
defer.returnValue(claim_info)
|
defer.returnValue(claim_info)
|
||||||
|
|
||||||
|
|
|
@ -97,13 +97,14 @@ class EncryptedFileManager(object):
|
||||||
)
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _start_lbry_file(self, file_info, payment_rate_manager, verify_stream):
|
def _start_lbry_file(self, file_info, payment_rate_manager, verify_stream, claim_info):
|
||||||
lbry_file = self._get_lbry_file(
|
lbry_file = self._get_lbry_file(
|
||||||
file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'],
|
file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'],
|
||||||
file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'],
|
file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'],
|
||||||
file_info['suggested_file_name']
|
file_info['suggested_file_name']
|
||||||
)
|
)
|
||||||
yield lbry_file.get_claim_info()
|
if claim_info:
|
||||||
|
lbry_file.set_claim_info(claim_info)
|
||||||
try:
|
try:
|
||||||
# verify if the stream is valid (we might have downloaded an invalid stream
|
# verify if the stream is valid (we might have downloaded an invalid stream
|
||||||
# in the past when the validation check didn't work. This runs after every
|
# in the past when the validation check didn't work. This runs after every
|
||||||
|
@ -130,13 +131,15 @@ class EncryptedFileManager(object):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _start_lbry_files(self, verify_streams):
|
def _start_lbry_files(self, verify_streams):
|
||||||
files = yield self.session.storage.get_all_lbry_files()
|
files = yield self.session.storage.get_all_lbry_files()
|
||||||
|
claim_infos = yield self.session.storage.get_claims_from_stream_hashes([file['stream_hash'] for file in files])
|
||||||
b_prm = self.session.base_payment_rate_manager
|
b_prm = self.session.base_payment_rate_manager
|
||||||
payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
|
payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
|
||||||
|
|
||||||
log.info("Starting %i files", len(files))
|
log.info("Starting %i files", len(files))
|
||||||
dl = []
|
dl = []
|
||||||
for file_info in files:
|
for file_info in files:
|
||||||
dl.append(self._start_lbry_file(file_info, payment_rate_manager, verify_streams))
|
claim_info = claim_infos.get(file_info['stream_hash'])
|
||||||
|
dl.append(self._start_lbry_file(file_info, payment_rate_manager, verify_streams, claim_info))
|
||||||
|
|
||||||
yield defer.DeferredList(dl)
|
yield defer.DeferredList(dl)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue