diff --git a/lbrynet/blob/CryptBlob.py b/lbrynet/blob/CryptBlob.py index 2a3b103d2..7538b11d8 100644 --- a/lbrynet/blob/CryptBlob.py +++ b/lbrynet/blob/CryptBlob.py @@ -75,7 +75,7 @@ class StreamBlobDecryptor: def finish_decrypt(): bytes_left = len(self.buff) % (AES.block_size // 8) if bytes_left != 0: - log.warning(self.buff[-1 * (AES.block_size // 8):].encode('hex')) + log.warning(binascii.hexlify(self.buff[-1 * (AES.block_size // 8):]).decode()) raise Exception("blob %s has incorrect padding: %i bytes left" % (self.blob.blob_hash, bytes_left)) data_to_decrypt, self.buff = self.buff, b'' diff --git a/lbrynet/extras/daemon/Downloader.py b/lbrynet/extras/daemon/Downloader.py index f4badd0ad..6e3b7a04a 100644 --- a/lbrynet/extras/daemon/Downloader.py +++ b/lbrynet/extras/daemon/Downloader.py @@ -1,7 +1,6 @@ import logging import os from twisted.internet import defer -from twisted.internet.task import LoopingCall from lbrynet import conf from lbrynet.schema.fee import Fee @@ -34,8 +33,11 @@ log = logging.getLogger(__name__) class GetStream: def __init__(self, sd_identifier, wallet, exchange_rate_manager, blob_manager, peer_finder, rate_limiter, - payment_rate_manager, storage, max_key_fee, disable_max_key_fee, data_rate=None, timeout=None): - + payment_rate_manager, storage, max_key_fee, disable_max_key_fee, data_rate=None, timeout=None, + reactor=None): + if not reactor: + from twisted.internet import reactor + self.reactor = reactor self.timeout = timeout or conf.settings['download_timeout'] self.data_rate = data_rate or conf.settings['data_rate'] self.max_key_fee = max_key_fee or conf.settings['max_key_fee'][1] @@ -53,44 +55,17 @@ class GetStream: self.sd_identifier = sd_identifier self.storage = storage self.downloader = None - self.checker = LoopingCall(self.check_status) # fired when the download is complete self.finished_deferred = None # fired after the metadata and the first data blob have been downloaded self.data_downloading_deferred = defer.Deferred(None) + self.wrote_data = False @property def download_path(self): return os.path.join(self.download_directory, self.downloader.file_name) - def _check_status(self, status): - if status.num_completed > 0 and not self.data_downloading_deferred.called: - self.data_downloading_deferred.callback(True) - if self.data_downloading_deferred.called: - safe_stop_looping_call(self.checker) - else: - log.debug("Waiting for stream data (%i seconds)", self.timeout_counter) - - def check_status(self): - """ - Check if we've got the first data blob in the stream yet - """ - self.timeout_counter += 1 - if self.timeout_counter > self.timeout: - if not self.data_downloading_deferred.called: - if self.downloader: - err = DownloadDataTimeout(self.sd_hash) - else: - err = DownloadSDTimeout(self.sd_hash) - self.data_downloading_deferred.errback(err) - safe_stop_looping_call(self.checker) - elif self.downloader: - d = self.downloader.status() - d.addCallback(self._check_status) - else: - log.debug("Waiting for stream descriptor (%i seconds)", self.timeout_counter) - def convert_max_fee(self): currency, amount = self.max_key_fee['currency'], self.max_key_fee['amount'] return self.exchange_rate_manager.convert_currency(currency, "LBC", amount) @@ -151,18 +126,13 @@ class GetStream: else: defer.returnValue(None) - @defer.inlineCallbacks def finish(self, results, name): self.set_status(DOWNLOAD_STOPPED_CODE, name) log.info("Finished downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) - safe_stop_looping_call(self.checker) - status = yield self.downloader.status() - self._check_status(status) - defer.returnValue(self.download_path) + return defer.succeed(self.download_path) def fail(self, err): - safe_stop_looping_call(self.checker) raise err @defer.inlineCallbacks @@ -194,8 +164,10 @@ class GetStream: self.downloader = yield self._create_downloader(sd_blob, file_name=file_name) yield self.pay_key_fee(key_fee, name) yield self.storage.save_content_claim(self.downloader.stream_hash, "%s:%i" % (txid, nout)) - log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) self.finished_deferred = self.downloader.start() + self.downloader.download_manager.progress_manager.wrote_first_data.addCallback( + self.data_downloading_deferred.callback + ) self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail) @defer.inlineCallbacks @@ -209,18 +181,18 @@ class GetStream: downloader - instance of ManagedEncryptedFileDownloader finished_deferred - deferred callbacked when download is finished """ + self.set_status(INITIALIZING_CODE, name) key_fee = yield self._initialize(stream_info) - - safe_start_looping_call(self.checker, 1) self.set_status(DOWNLOAD_METADATA_CODE, name) try: sd_blob = yield self._download_sd_blob() yield self._download(sd_blob, name, key_fee, txid, nout, file_name) self.set_status(DOWNLOAD_RUNNING_CODE, name) + log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) + self.data_downloading_deferred.addTimeout(self.timeout, self.reactor) yield self.data_downloading_deferred except (DownloadDataTimeout, InvalidStreamDescriptorError) as err: - safe_stop_looping_call(self.checker) raise err defer.returnValue((self.downloader, self.finished_deferred)) diff --git a/lbrynet/p2p/client/StreamProgressManager.py b/lbrynet/p2p/client/StreamProgressManager.py index f7749b666..b1c396d0c 100644 --- a/lbrynet/p2p/client/StreamProgressManager.py +++ b/lbrynet/p2p/client/StreamProgressManager.py @@ -1,15 +1,16 @@ import logging -from twisted.internet import defer +from twisted.internet import defer, task log = logging.getLogger(__name__) -class StreamProgressManager: - #implements(IProgressManager) - - def __init__(self, finished_callback, blob_manager, - download_manager, delete_blob_after_finished=False): +class FullStreamProgressManager: + def __init__(self, finished_callback, blob_manager, download_manager, + delete_blob_after_finished: bool = False, reactor: task.Clock = None): + if not reactor: + from twisted.internet import reactor + self.reactor = reactor self.finished_callback = finished_callback self.blob_manager = blob_manager self.delete_blob_after_finished = delete_blob_after_finished @@ -19,15 +20,11 @@ class StreamProgressManager: self.stopped = True self._next_try_to_output_call = None self.outputting_d = None - - ######### IProgressManager ######### + self.wrote_first_data = defer.Deferred() def start(self): - - from twisted.internet import reactor - self.stopped = False - self._next_try_to_output_call = reactor.callLater(0, self._try_to_output) + self._next_try_to_output_call = self.reactor.callLater(0, self._try_to_output) return defer.succeed(True) def stop(self): @@ -37,64 +34,9 @@ class StreamProgressManager: self._next_try_to_output_call = None return self._stop_outputting() - def blob_downloaded(self, blob, blob_num): - if self.outputting_d is None: - self._output_loop() - - ######### internal ######### - - def _finished_outputting(self): - self.finished_callback(True) - - def _try_to_output(self): - - from twisted.internet import reactor - - self._next_try_to_output_call = reactor.callLater(1, self._try_to_output) - if self.outputting_d is None: - self._output_loop() - - def _output_loop(self): - pass - - def _stop_outputting(self): - if self.outputting_d is not None: - return self.outputting_d - return defer.succeed(None) - - def _finished_with_blob(self, blob_num): - log.debug("In _finished_with_blob, blob_num = %s", str(blob_num)) - if self.delete_blob_after_finished is True: - log.debug("delete_blob_after_finished is True") - blobs = self.download_manager.blobs - if blob_num in blobs: - log.debug("Telling the blob manager, %s, to delete blob %s", - self.blob_manager, blobs[blob_num].blob_hash) - self.blob_manager.delete_blobs([blobs[blob_num].blob_hash]) - else: - log.debug("Blob number %s was not in blobs", str(blob_num)) - else: - log.debug("delete_blob_after_finished is False") - - -class FullStreamProgressManager(StreamProgressManager): - def __init__(self, finished_callback, blob_manager, - download_manager, delete_blob_after_finished=False): - super().__init__(finished_callback, blob_manager, download_manager, - delete_blob_after_finished) - self.outputting_d = None - - ######### IProgressManager ######### - - def _done(self, i, blobs): - """Return true if `i` is a blob number we don't have""" - return ( - i not in blobs or - ( - not blobs[i].get_is_verified() and - i not in self.provided_blob_nums - ) - ) + # def blob_downloaded(self, blob, blob_num): + # if self.outputting_d is None: + # self._output_loop() def stream_position(self): blobs = self.download_manager.blobs @@ -113,12 +55,46 @@ class FullStreamProgressManager(StreamProgressManager): if not b.get_is_verified() and not n in self.provided_blob_nums ] - ######### internal ######### + def _finished_outputting(self): + self.finished_callback(True) + + def _try_to_output(self): + self._next_try_to_output_call = self.reactor.callLater(1, self._try_to_output) + if self.outputting_d is None: + self._output_loop() + + def _stop_outputting(self): + if self.outputting_d is not None: + return self.outputting_d + return defer.succeed(None) + + def _finished_with_blob(self, blob_num: int) -> None: + if blob_num == 0 and not self.wrote_first_data.called: + self.wrote_first_data.callback(True) + log.debug("In _finished_with_blob, blob_num = %s", str(blob_num)) + if self.delete_blob_after_finished is True: + log.debug("delete_blob_after_finished is True") + blobs = self.download_manager.blobs + if blob_num in blobs: + log.debug("Telling the blob manager, %s, to delete blob %s", + self.blob_manager, blobs[blob_num].blob_hash) + self.blob_manager.delete_blobs([blobs[blob_num].blob_hash]) + else: + log.debug("Blob number %s was not in blobs", str(blob_num)) + else: + log.debug("delete_blob_after_finished is False") + + def _done(self, i: int, blobs: list) -> bool: + """Return true if `i` is a blob number we don't have""" + return ( + i not in blobs or + ( + not blobs[i].get_is_verified() and + i not in self.provided_blob_nums + ) + ) def _output_loop(self): - - from twisted.internet import reactor - if self.stopped: if self.outputting_d is not None: self.outputting_d.callback(True) @@ -139,7 +115,7 @@ class FullStreamProgressManager(StreamProgressManager): self.outputting_d.callback(True) self.outputting_d = None else: - reactor.callLater(0, self._output_loop) + self.reactor.callLater(0, self._output_loop) current_blob_num = self.last_blob_outputted + 1