combine StreamProgressManager and FullStreamProgressManager

-add wrote_first_data Deferred to FullStreamProgressManager
This commit is contained in:
Jack Robison 2018-09-21 16:46:49 -04:00
parent fc69366092
commit 9dc4608a62
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 65 additions and 117 deletions

View file

@ -75,7 +75,7 @@ class StreamBlobDecryptor:
def finish_decrypt(): def finish_decrypt():
bytes_left = len(self.buff) % (AES.block_size // 8) bytes_left = len(self.buff) % (AES.block_size // 8)
if bytes_left != 0: if bytes_left != 0:
log.warning(self.buff[-1 * (AES.block_size // 8):].encode('hex')) log.warning(binascii.hexlify(self.buff[-1 * (AES.block_size // 8):]).decode())
raise Exception("blob %s has incorrect padding: %i bytes left" % raise Exception("blob %s has incorrect padding: %i bytes left" %
(self.blob.blob_hash, bytes_left)) (self.blob.blob_hash, bytes_left))
data_to_decrypt, self.buff = self.buff, b'' data_to_decrypt, self.buff = self.buff, b''

View file

@ -1,7 +1,6 @@
import logging import logging
import os import os
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.task import LoopingCall
from lbrynet import conf from lbrynet import conf
from lbrynet.schema.fee import Fee from lbrynet.schema.fee import Fee
@ -34,8 +33,11 @@ log = logging.getLogger(__name__)
class GetStream: class GetStream:
def __init__(self, sd_identifier, wallet, exchange_rate_manager, blob_manager, peer_finder, rate_limiter, def __init__(self, sd_identifier, wallet, exchange_rate_manager, blob_manager, peer_finder, rate_limiter,
payment_rate_manager, storage, max_key_fee, disable_max_key_fee, data_rate=None, timeout=None): payment_rate_manager, storage, max_key_fee, disable_max_key_fee, data_rate=None, timeout=None,
reactor=None):
if not reactor:
from twisted.internet import reactor
self.reactor = reactor
self.timeout = timeout or conf.settings['download_timeout'] self.timeout = timeout or conf.settings['download_timeout']
self.data_rate = data_rate or conf.settings['data_rate'] self.data_rate = data_rate or conf.settings['data_rate']
self.max_key_fee = max_key_fee or conf.settings['max_key_fee'][1] self.max_key_fee = max_key_fee or conf.settings['max_key_fee'][1]
@ -53,44 +55,17 @@ class GetStream:
self.sd_identifier = sd_identifier self.sd_identifier = sd_identifier
self.storage = storage self.storage = storage
self.downloader = None self.downloader = None
self.checker = LoopingCall(self.check_status)
# fired when the download is complete # fired when the download is complete
self.finished_deferred = None self.finished_deferred = None
# fired after the metadata and the first data blob have been downloaded # fired after the metadata and the first data blob have been downloaded
self.data_downloading_deferred = defer.Deferred(None) self.data_downloading_deferred = defer.Deferred(None)
self.wrote_data = False
@property @property
def download_path(self): def download_path(self):
return os.path.join(self.download_directory, self.downloader.file_name) return os.path.join(self.download_directory, self.downloader.file_name)
def _check_status(self, status):
if status.num_completed > 0 and not self.data_downloading_deferred.called:
self.data_downloading_deferred.callback(True)
if self.data_downloading_deferred.called:
safe_stop_looping_call(self.checker)
else:
log.debug("Waiting for stream data (%i seconds)", self.timeout_counter)
def check_status(self):
"""
Check if we've got the first data blob in the stream yet
"""
self.timeout_counter += 1
if self.timeout_counter > self.timeout:
if not self.data_downloading_deferred.called:
if self.downloader:
err = DownloadDataTimeout(self.sd_hash)
else:
err = DownloadSDTimeout(self.sd_hash)
self.data_downloading_deferred.errback(err)
safe_stop_looping_call(self.checker)
elif self.downloader:
d = self.downloader.status()
d.addCallback(self._check_status)
else:
log.debug("Waiting for stream descriptor (%i seconds)", self.timeout_counter)
def convert_max_fee(self): def convert_max_fee(self):
currency, amount = self.max_key_fee['currency'], self.max_key_fee['amount'] currency, amount = self.max_key_fee['currency'], self.max_key_fee['amount']
return self.exchange_rate_manager.convert_currency(currency, "LBC", amount) return self.exchange_rate_manager.convert_currency(currency, "LBC", amount)
@ -151,18 +126,13 @@ class GetStream:
else: else:
defer.returnValue(None) defer.returnValue(None)
@defer.inlineCallbacks
def finish(self, results, name): def finish(self, results, name):
self.set_status(DOWNLOAD_STOPPED_CODE, name) self.set_status(DOWNLOAD_STOPPED_CODE, name)
log.info("Finished downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], log.info("Finished downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6],
self.download_path) self.download_path)
safe_stop_looping_call(self.checker) return defer.succeed(self.download_path)
status = yield self.downloader.status()
self._check_status(status)
defer.returnValue(self.download_path)
def fail(self, err): def fail(self, err):
safe_stop_looping_call(self.checker)
raise err raise err
@defer.inlineCallbacks @defer.inlineCallbacks
@ -194,8 +164,10 @@ class GetStream:
self.downloader = yield self._create_downloader(sd_blob, file_name=file_name) self.downloader = yield self._create_downloader(sd_blob, file_name=file_name)
yield self.pay_key_fee(key_fee, name) yield self.pay_key_fee(key_fee, name)
yield self.storage.save_content_claim(self.downloader.stream_hash, "%s:%i" % (txid, nout)) yield self.storage.save_content_claim(self.downloader.stream_hash, "%s:%i" % (txid, nout))
log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path)
self.finished_deferred = self.downloader.start() self.finished_deferred = self.downloader.start()
self.downloader.download_manager.progress_manager.wrote_first_data.addCallback(
self.data_downloading_deferred.callback
)
self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail) self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -209,18 +181,18 @@ class GetStream:
downloader - instance of ManagedEncryptedFileDownloader downloader - instance of ManagedEncryptedFileDownloader
finished_deferred - deferred callbacked when download is finished finished_deferred - deferred callbacked when download is finished
""" """
self.set_status(INITIALIZING_CODE, name) self.set_status(INITIALIZING_CODE, name)
key_fee = yield self._initialize(stream_info) key_fee = yield self._initialize(stream_info)
safe_start_looping_call(self.checker, 1)
self.set_status(DOWNLOAD_METADATA_CODE, name) self.set_status(DOWNLOAD_METADATA_CODE, name)
try: try:
sd_blob = yield self._download_sd_blob() sd_blob = yield self._download_sd_blob()
yield self._download(sd_blob, name, key_fee, txid, nout, file_name) yield self._download(sd_blob, name, key_fee, txid, nout, file_name)
self.set_status(DOWNLOAD_RUNNING_CODE, name) self.set_status(DOWNLOAD_RUNNING_CODE, name)
log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path)
self.data_downloading_deferred.addTimeout(self.timeout, self.reactor)
yield self.data_downloading_deferred yield self.data_downloading_deferred
except (DownloadDataTimeout, InvalidStreamDescriptorError) as err: except (DownloadDataTimeout, InvalidStreamDescriptorError) as err:
safe_stop_looping_call(self.checker)
raise err raise err
defer.returnValue((self.downloader, self.finished_deferred)) defer.returnValue((self.downloader, self.finished_deferred))

View file

@ -1,15 +1,16 @@
import logging import logging
from twisted.internet import defer from twisted.internet import defer, task
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class StreamProgressManager: class FullStreamProgressManager:
#implements(IProgressManager) def __init__(self, finished_callback, blob_manager, download_manager,
delete_blob_after_finished: bool = False, reactor: task.Clock = None):
def __init__(self, finished_callback, blob_manager, if not reactor:
download_manager, delete_blob_after_finished=False): from twisted.internet import reactor
self.reactor = reactor
self.finished_callback = finished_callback self.finished_callback = finished_callback
self.blob_manager = blob_manager self.blob_manager = blob_manager
self.delete_blob_after_finished = delete_blob_after_finished self.delete_blob_after_finished = delete_blob_after_finished
@ -19,15 +20,11 @@ class StreamProgressManager:
self.stopped = True self.stopped = True
self._next_try_to_output_call = None self._next_try_to_output_call = None
self.outputting_d = None self.outputting_d = None
self.wrote_first_data = defer.Deferred()
######### IProgressManager #########
def start(self): def start(self):
from twisted.internet import reactor
self.stopped = False self.stopped = False
self._next_try_to_output_call = reactor.callLater(0, self._try_to_output) self._next_try_to_output_call = self.reactor.callLater(0, self._try_to_output)
return defer.succeed(True) return defer.succeed(True)
def stop(self): def stop(self):
@ -37,64 +34,9 @@ class StreamProgressManager:
self._next_try_to_output_call = None self._next_try_to_output_call = None
return self._stop_outputting() return self._stop_outputting()
def blob_downloaded(self, blob, blob_num): # def blob_downloaded(self, blob, blob_num):
if self.outputting_d is None: # if self.outputting_d is None:
self._output_loop() # self._output_loop()
######### internal #########
def _finished_outputting(self):
self.finished_callback(True)
def _try_to_output(self):
from twisted.internet import reactor
self._next_try_to_output_call = reactor.callLater(1, self._try_to_output)
if self.outputting_d is None:
self._output_loop()
def _output_loop(self):
pass
def _stop_outputting(self):
if self.outputting_d is not None:
return self.outputting_d
return defer.succeed(None)
def _finished_with_blob(self, blob_num):
log.debug("In _finished_with_blob, blob_num = %s", str(blob_num))
if self.delete_blob_after_finished is True:
log.debug("delete_blob_after_finished is True")
blobs = self.download_manager.blobs
if blob_num in blobs:
log.debug("Telling the blob manager, %s, to delete blob %s",
self.blob_manager, blobs[blob_num].blob_hash)
self.blob_manager.delete_blobs([blobs[blob_num].blob_hash])
else:
log.debug("Blob number %s was not in blobs", str(blob_num))
else:
log.debug("delete_blob_after_finished is False")
class FullStreamProgressManager(StreamProgressManager):
def __init__(self, finished_callback, blob_manager,
download_manager, delete_blob_after_finished=False):
super().__init__(finished_callback, blob_manager, download_manager,
delete_blob_after_finished)
self.outputting_d = None
######### IProgressManager #########
def _done(self, i, blobs):
"""Return true if `i` is a blob number we don't have"""
return (
i not in blobs or
(
not blobs[i].get_is_verified() and
i not in self.provided_blob_nums
)
)
def stream_position(self): def stream_position(self):
blobs = self.download_manager.blobs blobs = self.download_manager.blobs
@ -113,12 +55,46 @@ class FullStreamProgressManager(StreamProgressManager):
if not b.get_is_verified() and not n in self.provided_blob_nums if not b.get_is_verified() and not n in self.provided_blob_nums
] ]
######### internal ######### def _finished_outputting(self):
self.finished_callback(True)
def _try_to_output(self):
self._next_try_to_output_call = self.reactor.callLater(1, self._try_to_output)
if self.outputting_d is None:
self._output_loop()
def _stop_outputting(self):
if self.outputting_d is not None:
return self.outputting_d
return defer.succeed(None)
def _finished_with_blob(self, blob_num: int) -> None:
if blob_num == 0 and not self.wrote_first_data.called:
self.wrote_first_data.callback(True)
log.debug("In _finished_with_blob, blob_num = %s", str(blob_num))
if self.delete_blob_after_finished is True:
log.debug("delete_blob_after_finished is True")
blobs = self.download_manager.blobs
if blob_num in blobs:
log.debug("Telling the blob manager, %s, to delete blob %s",
self.blob_manager, blobs[blob_num].blob_hash)
self.blob_manager.delete_blobs([blobs[blob_num].blob_hash])
else:
log.debug("Blob number %s was not in blobs", str(blob_num))
else:
log.debug("delete_blob_after_finished is False")
def _done(self, i: int, blobs: list) -> bool:
"""Return true if `i` is a blob number we don't have"""
return (
i not in blobs or
(
not blobs[i].get_is_verified() and
i not in self.provided_blob_nums
)
)
def _output_loop(self): def _output_loop(self):
from twisted.internet import reactor
if self.stopped: if self.stopped:
if self.outputting_d is not None: if self.outputting_d is not None:
self.outputting_d.callback(True) self.outputting_d.callback(True)
@ -139,7 +115,7 @@ class FullStreamProgressManager(StreamProgressManager):
self.outputting_d.callback(True) self.outputting_d.callback(True)
self.outputting_d = None self.outputting_d = None
else: else:
reactor.callLater(0, self._output_loop) self.reactor.callLater(0, self._output_loop)
current_blob_num = self.last_blob_outputted + 1 current_blob_num = self.last_blob_outputted + 1