diff --git a/lbrynet/blob/CryptBlob.py b/lbrynet/blob/CryptBlob.py index 2a3b103d2..7538b11d8 100644 --- a/lbrynet/blob/CryptBlob.py +++ b/lbrynet/blob/CryptBlob.py @@ -75,7 +75,7 @@ class StreamBlobDecryptor: def finish_decrypt(): bytes_left = len(self.buff) % (AES.block_size // 8) if bytes_left != 0: - log.warning(self.buff[-1 * (AES.block_size // 8):].encode('hex')) + log.warning(binascii.hexlify(self.buff[-1 * (AES.block_size // 8):]).decode()) raise Exception("blob %s has incorrect padding: %i bytes left" % (self.blob.blob_hash, bytes_left)) data_to_decrypt, self.buff = self.buff, b'' diff --git a/lbrynet/blob/client/EncryptedFileDownloader.py b/lbrynet/blob/client/EncryptedFileDownloader.py index 48e7b6565..059382868 100644 --- a/lbrynet/blob/client/EncryptedFileDownloader.py +++ b/lbrynet/blob/client/EncryptedFileDownloader.py @@ -44,8 +44,19 @@ class EncryptedFileDownloader(CryptStreamDownloader): self.blob_manager, download_manager) def _start(self): + def check_start_succeeded(success): + if success: + self.starting = False + self.stopped = False + self.completed = False + return True + else: + return self._start_failed() + + self.download_manager = self._get_download_manager() d = self._setup_output() - d.addCallback(lambda _: CryptStreamDownloader._start(self)) + d.addCallback(lambda _: self.download_manager.start_downloading()) + d.addCallbacks(check_start_succeeded) return d def _setup_output(self): diff --git a/lbrynet/extras/daemon/Downloader.py b/lbrynet/extras/daemon/Downloader.py index f4badd0ad..45900e2f4 100644 --- a/lbrynet/extras/daemon/Downloader.py +++ b/lbrynet/extras/daemon/Downloader.py @@ -1,14 +1,12 @@ import logging import os from twisted.internet import defer -from twisted.internet.task import LoopingCall from lbrynet import conf from lbrynet.schema.fee import Fee from lbrynet.p2p.Error import InsufficientFundsError, KeyFeeAboveMaxAllowed, InvalidStreamDescriptorError -from lbrynet.p2p.Error import DownloadDataTimeout, DownloadCanceledError, DownloadSDTimeout -from lbrynet.utils import safe_start_looping_call, safe_stop_looping_call +from lbrynet.p2p.Error import DownloadDataTimeout, DownloadCanceledError from lbrynet.p2p.StreamDescriptor import download_sd_blob from lbrynet.blob.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory from torba.client.constants import COIN @@ -34,8 +32,11 @@ log = logging.getLogger(__name__) class GetStream: def __init__(self, sd_identifier, wallet, exchange_rate_manager, blob_manager, peer_finder, rate_limiter, - payment_rate_manager, storage, max_key_fee, disable_max_key_fee, data_rate=None, timeout=None): - + payment_rate_manager, storage, max_key_fee, disable_max_key_fee, data_rate=None, timeout=None, + reactor=None): + if not reactor: + from twisted.internet import reactor + self.reactor = reactor self.timeout = timeout or conf.settings['download_timeout'] self.data_rate = data_rate or conf.settings['data_rate'] self.max_key_fee = max_key_fee or conf.settings['max_key_fee'][1] @@ -53,44 +54,17 @@ class GetStream: self.sd_identifier = sd_identifier self.storage = storage self.downloader = None - self.checker = LoopingCall(self.check_status) # fired when the download is complete self.finished_deferred = None # fired after the metadata and the first data blob have been downloaded self.data_downloading_deferred = defer.Deferred(None) + self.wrote_data = False @property def download_path(self): return os.path.join(self.download_directory, self.downloader.file_name) - def _check_status(self, status): - if status.num_completed > 0 and not self.data_downloading_deferred.called: - self.data_downloading_deferred.callback(True) - if self.data_downloading_deferred.called: - safe_stop_looping_call(self.checker) - else: - log.debug("Waiting for stream data (%i seconds)", self.timeout_counter) - - def check_status(self): - """ - Check if we've got the first data blob in the stream yet - """ - self.timeout_counter += 1 - if self.timeout_counter > self.timeout: - if not self.data_downloading_deferred.called: - if self.downloader: - err = DownloadDataTimeout(self.sd_hash) - else: - err = DownloadSDTimeout(self.sd_hash) - self.data_downloading_deferred.errback(err) - safe_stop_looping_call(self.checker) - elif self.downloader: - d = self.downloader.status() - d.addCallback(self._check_status) - else: - log.debug("Waiting for stream descriptor (%i seconds)", self.timeout_counter) - def convert_max_fee(self): currency, amount = self.max_key_fee['currency'], self.max_key_fee['amount'] return self.exchange_rate_manager.convert_currency(currency, "LBC", amount) @@ -151,18 +125,13 @@ class GetStream: else: defer.returnValue(None) - @defer.inlineCallbacks def finish(self, results, name): self.set_status(DOWNLOAD_STOPPED_CODE, name) log.info("Finished downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) - safe_stop_looping_call(self.checker) - status = yield self.downloader.status() - self._check_status(status) - defer.returnValue(self.download_path) + return defer.succeed(self.download_path) def fail(self, err): - safe_stop_looping_call(self.checker) raise err @defer.inlineCallbacks @@ -194,8 +163,10 @@ class GetStream: self.downloader = yield self._create_downloader(sd_blob, file_name=file_name) yield self.pay_key_fee(key_fee, name) yield self.storage.save_content_claim(self.downloader.stream_hash, "%s:%i" % (txid, nout)) - log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) self.finished_deferred = self.downloader.start() + self.downloader.download_manager.progress_manager.wrote_first_data.addCallback( + self.data_downloading_deferred.callback + ) self.finished_deferred.addCallbacks(lambda result: self.finish(result, name), self.fail) @defer.inlineCallbacks @@ -211,16 +182,19 @@ class GetStream: """ self.set_status(INITIALIZING_CODE, name) key_fee = yield self._initialize(stream_info) - - safe_start_looping_call(self.checker, 1) self.set_status(DOWNLOAD_METADATA_CODE, name) try: sd_blob = yield self._download_sd_blob() yield self._download(sd_blob, name, key_fee, txid, nout, file_name) self.set_status(DOWNLOAD_RUNNING_CODE, name) - yield self.data_downloading_deferred + log.info("Downloading lbry://%s (%s) --> %s", name, self.sd_hash[:6], self.download_path) + self.data_downloading_deferred.addTimeout(self.timeout, self.reactor) + try: + yield self.data_downloading_deferred + self.wrote_data = True + except defer.TimeoutError: + raise DownloadDataTimeout("data download timed out") except (DownloadDataTimeout, InvalidStreamDescriptorError) as err: - safe_stop_looping_call(self.checker) raise err defer.returnValue((self.downloader, self.finished_deferred)) diff --git a/lbrynet/p2p/client/StreamProgressManager.py b/lbrynet/p2p/client/StreamProgressManager.py index f7749b666..b1c396d0c 100644 --- a/lbrynet/p2p/client/StreamProgressManager.py +++ b/lbrynet/p2p/client/StreamProgressManager.py @@ -1,15 +1,16 @@ import logging -from twisted.internet import defer +from twisted.internet import defer, task log = logging.getLogger(__name__) -class StreamProgressManager: - #implements(IProgressManager) - - def __init__(self, finished_callback, blob_manager, - download_manager, delete_blob_after_finished=False): +class FullStreamProgressManager: + def __init__(self, finished_callback, blob_manager, download_manager, + delete_blob_after_finished: bool = False, reactor: task.Clock = None): + if not reactor: + from twisted.internet import reactor + self.reactor = reactor self.finished_callback = finished_callback self.blob_manager = blob_manager self.delete_blob_after_finished = delete_blob_after_finished @@ -19,15 +20,11 @@ class StreamProgressManager: self.stopped = True self._next_try_to_output_call = None self.outputting_d = None - - ######### IProgressManager ######### + self.wrote_first_data = defer.Deferred() def start(self): - - from twisted.internet import reactor - self.stopped = False - self._next_try_to_output_call = reactor.callLater(0, self._try_to_output) + self._next_try_to_output_call = self.reactor.callLater(0, self._try_to_output) return defer.succeed(True) def stop(self): @@ -37,64 +34,9 @@ class StreamProgressManager: self._next_try_to_output_call = None return self._stop_outputting() - def blob_downloaded(self, blob, blob_num): - if self.outputting_d is None: - self._output_loop() - - ######### internal ######### - - def _finished_outputting(self): - self.finished_callback(True) - - def _try_to_output(self): - - from twisted.internet import reactor - - self._next_try_to_output_call = reactor.callLater(1, self._try_to_output) - if self.outputting_d is None: - self._output_loop() - - def _output_loop(self): - pass - - def _stop_outputting(self): - if self.outputting_d is not None: - return self.outputting_d - return defer.succeed(None) - - def _finished_with_blob(self, blob_num): - log.debug("In _finished_with_blob, blob_num = %s", str(blob_num)) - if self.delete_blob_after_finished is True: - log.debug("delete_blob_after_finished is True") - blobs = self.download_manager.blobs - if blob_num in blobs: - log.debug("Telling the blob manager, %s, to delete blob %s", - self.blob_manager, blobs[blob_num].blob_hash) - self.blob_manager.delete_blobs([blobs[blob_num].blob_hash]) - else: - log.debug("Blob number %s was not in blobs", str(blob_num)) - else: - log.debug("delete_blob_after_finished is False") - - -class FullStreamProgressManager(StreamProgressManager): - def __init__(self, finished_callback, blob_manager, - download_manager, delete_blob_after_finished=False): - super().__init__(finished_callback, blob_manager, download_manager, - delete_blob_after_finished) - self.outputting_d = None - - ######### IProgressManager ######### - - def _done(self, i, blobs): - """Return true if `i` is a blob number we don't have""" - return ( - i not in blobs or - ( - not blobs[i].get_is_verified() and - i not in self.provided_blob_nums - ) - ) + # def blob_downloaded(self, blob, blob_num): + # if self.outputting_d is None: + # self._output_loop() def stream_position(self): blobs = self.download_manager.blobs @@ -113,12 +55,46 @@ class FullStreamProgressManager(StreamProgressManager): if not b.get_is_verified() and not n in self.provided_blob_nums ] - ######### internal ######### + def _finished_outputting(self): + self.finished_callback(True) + + def _try_to_output(self): + self._next_try_to_output_call = self.reactor.callLater(1, self._try_to_output) + if self.outputting_d is None: + self._output_loop() + + def _stop_outputting(self): + if self.outputting_d is not None: + return self.outputting_d + return defer.succeed(None) + + def _finished_with_blob(self, blob_num: int) -> None: + if blob_num == 0 and not self.wrote_first_data.called: + self.wrote_first_data.callback(True) + log.debug("In _finished_with_blob, blob_num = %s", str(blob_num)) + if self.delete_blob_after_finished is True: + log.debug("delete_blob_after_finished is True") + blobs = self.download_manager.blobs + if blob_num in blobs: + log.debug("Telling the blob manager, %s, to delete blob %s", + self.blob_manager, blobs[blob_num].blob_hash) + self.blob_manager.delete_blobs([blobs[blob_num].blob_hash]) + else: + log.debug("Blob number %s was not in blobs", str(blob_num)) + else: + log.debug("delete_blob_after_finished is False") + + def _done(self, i: int, blobs: list) -> bool: + """Return true if `i` is a blob number we don't have""" + return ( + i not in blobs or + ( + not blobs[i].get_is_verified() and + i not in self.provided_blob_nums + ) + ) def _output_loop(self): - - from twisted.internet import reactor - if self.stopped: if self.outputting_d is not None: self.outputting_d.callback(True) @@ -139,7 +115,7 @@ class FullStreamProgressManager(StreamProgressManager): self.outputting_d.callback(True) self.outputting_d = None else: - reactor.callLater(0, self._output_loop) + self.reactor.callLater(0, self._output_loop) current_blob_num = self.last_blob_outputted + 1 diff --git a/tests/unit/lbrynet_daemon/test_Downloader.py b/tests/unit/lbrynet_daemon/test_Downloader.py index 625179518..36a5e97a5 100644 --- a/tests/unit/lbrynet_daemon/test_Downloader.py +++ b/tests/unit/lbrynet_daemon/test_Downloader.py @@ -9,6 +9,7 @@ from lbrynet.p2p.Error import DownloadDataTimeout, DownloadSDTimeout from lbrynet.p2p.StreamDescriptor import StreamDescriptorIdentifier from lbrynet.p2p.BlobManager import DiskBlobManager from lbrynet.p2p.RateLimiter import DummyRateLimiter +from lbrynet.p2p.client.DownloadManager import DownloadManager from lbrynet.extras.daemon import Downloader from lbrynet.extras.daemon import ExchangeRateManager from lbrynet.extras.daemon.storage import SQLiteStorage @@ -24,7 +25,7 @@ class MocDownloader: def __init__(self): self.finish_deferred = defer.Deferred(None) self.stop_called = False - + self.file_name = 'test' self.name = 'test' self.num_completed = 0 self.num_known = 1 @@ -60,8 +61,10 @@ def moc_download(self, sd_blob, name, txid, nout, key_fee, file_name): self.downloader.start() -def moc_pay_key_fee(self, key_fee, name): - self.pay_key_fee_called = True +def moc_pay_key_fee(d): + def _moc_pay_key_fee(self, key_fee, name): + d.callback(True) + return _moc_pay_key_fee class GetStreamTests(unittest.TestCase): @@ -83,10 +86,7 @@ class GetStreamTests(unittest.TestCase): sd_identifier, wallet, exchange_rate_manager, blob_manager, peer_finder, DummyRateLimiter(), prm, storage, max_key_fee, disable_max_key_fee, timeout=3, data_rate=data_rate ) - getstream.pay_key_fee_called = False - - self.clock = task.Clock() - getstream.checker.clock = self.clock + getstream.download_manager = mock.Mock(spec=DownloadManager) return getstream @defer.inlineCallbacks @@ -112,16 +112,18 @@ class GetStreamTests(unittest.TestCase): def download_sd_blob(self): raise DownloadSDTimeout(self) + called_pay_fee = defer.Deferred() + getstream = self.init_getstream_with_mocs() getstream._initialize = types.MethodType(moc_initialize, getstream) getstream._download_sd_blob = types.MethodType(download_sd_blob, getstream) getstream._download = types.MethodType(moc_download, getstream) - getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) + getstream.pay_key_fee = types.MethodType(moc_pay_key_fee(called_pay_fee), getstream) name = 'test' stream_info = None with self.assertRaises(DownloadSDTimeout): yield getstream.start(stream_info, name, "deadbeef" * 12, 0) - self.assertFalse(getstream.pay_key_fee_called) + self.assertFalse(called_pay_fee.called) @defer.inlineCallbacks def test_timeout(self): @@ -129,20 +131,18 @@ class GetStreamTests(unittest.TestCase): test that timeout (set to 3 here) exception is raised when download times out while downloading first blob, and key fee is paid """ + called_pay_fee = defer.Deferred() + getstream = self.init_getstream_with_mocs() getstream._initialize = types.MethodType(moc_initialize, getstream) getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) getstream._download = types.MethodType(moc_download, getstream) - getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) + getstream.pay_key_fee = types.MethodType(moc_pay_key_fee(called_pay_fee), getstream) name = 'test' stream_info = None start = getstream.start(stream_info, name, "deadbeef" * 12, 0) - self.clock.advance(1) - self.clock.advance(1) - self.clock.advance(1) with self.assertRaises(DownloadDataTimeout): yield start - self.assertTrue(getstream.pay_key_fee_called) @defer.inlineCallbacks def test_finish_one_blob(self): @@ -150,20 +150,22 @@ class GetStreamTests(unittest.TestCase): test that if we have 1 completed blob, start() returns and key fee is paid """ + called_pay_fee = defer.Deferred() + getstream = self.init_getstream_with_mocs() getstream._initialize = types.MethodType(moc_initialize, getstream) getstream._download_sd_blob = types.MethodType(moc_download_sd_blob, getstream) getstream._download = types.MethodType(moc_download, getstream) - getstream.pay_key_fee = types.MethodType(moc_pay_key_fee, getstream) + getstream.pay_key_fee = types.MethodType(moc_pay_key_fee(called_pay_fee), getstream) name = 'test' stream_info = None - start = getstream.start(stream_info, name, "deadbeef" * 12, 0) - getstream.downloader.num_completed = 1 - self.clock.advance(1) + self.assertFalse(getstream.wrote_data) + getstream.data_downloading_deferred.callback(None) + yield getstream.start(stream_info, name, "deadbeef" * 12, 0) + self.assertTrue(getstream.wrote_data) - downloader, f_deferred = yield start - self.assertTrue(getstream.pay_key_fee_called) + # self.assertTrue(getstream.pay_key_fee_called) # @defer.inlineCallbacks # def test_finish_stopped_downloader(self):