diff --git a/lbrynet/lbryfile/EncryptedFileMetadataManager.py b/lbrynet/lbryfile/EncryptedFileMetadataManager.py index bae6291b6..e9b2e7335 100644 --- a/lbrynet/lbryfile/EncryptedFileMetadataManager.py +++ b/lbrynet/lbryfile/EncryptedFileMetadataManager.py @@ -49,7 +49,7 @@ class DBEncryptedFileMetadataManager(object): def get_blobs_for_stream(self, stream_hash, start_blob=None, end_blob=None, count=None, reverse=False): - log.debug("Getting blobs for a stream. Count is %s", str(count)) + log.debug("Getting blobs for stream %s. Count is %s", stream_hash, count) def get_positions_of_start_and_end(): if start_blob is not None: diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 5eac402a7..295494087 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -70,6 +70,7 @@ STARTUP_STAGES = [ (WAITING_FOR_FIRST_RUN_CREDITS, 'Waiting for first run credits...') ] +# TODO: make this consistent with the stages in Downloader.py DOWNLOAD_METADATA_CODE = 'downloading_metadata' DOWNLOAD_TIMEOUT_CODE = 'timeout' DOWNLOAD_RUNNING_CODE = 'running' @@ -779,9 +780,9 @@ class Daemon(AuthJSONRPCServer): d.addCallback(BlobStreamDescriptorReader) d.addCallback(lambda blob: blob.get_info()) d.addCallback(cb) - return r + @defer.inlineCallbacks def _download_name(self, name, timeout=None, download_directory=None, file_name=None, stream_info=None, wait_for_write=True): """ @@ -795,14 +796,11 @@ class Daemon(AuthJSONRPCServer): if not stream_info: self.waiting_on[name] = True - d = self._resolve_name(name) - else: - d = defer.succeed(stream_info) - d.addCallback(helper.setup_stream) - d.addCallback(helper.wait_or_get_stream) - if not stream_info: - d.addCallback(helper.remove_from_wait) - return d + stream_info = yield self._resolve_name(name) + del self.waiting_on[name] + lbry_file = yield helper.setup_stream(stream_info) + sd_hash, file_path = yield helper.wait_or_get_stream(stream_info, lbry_file) + defer.returnValue((sd_hash, file_path)) def add_stream(self, name, timeout, download_directory, file_name, stream_info): """Makes, adds and starts a stream""" @@ -1400,8 +1398,6 @@ class Daemon(AuthJSONRPCServer): return self._render_response(None, BAD_REQUEST) d = self._resolve_name(name, force_refresh=force) - # TODO: this is the rpc call that returns a server.failure. - # what is up with that? d.addCallbacks( lambda info: self._render_response(info, OK_CODE), # TODO: Is server.failure a module? It looks like it: @@ -1484,6 +1480,7 @@ class Daemon(AuthJSONRPCServer): ) @AuthJSONRPCServer.auth_required + @defer.inlineCallbacks def jsonrpc_get(self, p): """Download stream from a LBRY uri. @@ -1493,28 +1490,40 @@ class Daemon(AuthJSONRPCServer): 'file_name': optional, a user specified name for the downloaded file 'stream_info': optional, specified stream info overrides name 'timeout': optional - 'wait_for_write': optional, defaults to True + 'wait_for_write': optional, defaults to True. When set, waits for the file to + only start to be written before returning any results. Returns: 'stream_hash': hex string 'path': path of download """ params = self._process_get_parameters(p) if not params.name: - return server.failure + # TODO: return a useful error message here, like "name argument is required" + defer.returnValue(server.failure) if params.name in self.waiting_on: - return server.failure - d = self._download_name(name=params.name, - timeout=params.timeout, - download_directory=params.download_directory, - stream_info=params.stream_info, - file_name=params.file_name, - wait_for_write=params.wait_for_write) - # TODO: downloading can timeout. Not sure what to do when that happens - d.addCallbacks( - get_output_callback(params), - lambda err: str(err)) - d.addCallback(lambda message: self._render_response(message, OK_CODE)) - return d + # TODO: return a useful error message here, like "already + # waiting for name to be resolved" + defer.returnValue(server.failure) + try: + sd_hash, file_path = yield self._download_name( + name=params.name, + timeout=params.timeout, + download_directory=params.download_directory, + stream_info=params.stream_info, + file_name=params.file_name, + wait_for_write=params.wait_for_write + ) + except Exception as e: + log.exception('Failed to get %s', params.name) + response = yield self._render_response(str(e), OK_CODE) + else: + # TODO: should stream_hash key be changed to sd_hash? + message = { + 'stream_hash': params.sd_hash if params.stream_info else sd_hash, + 'path': file_path + } + response = yield self._render_response(message, OK_CODE) + defer.returnValue(response) @AuthJSONRPCServer.auth_required def jsonrpc_stop_lbry_file(self, p): @@ -1722,6 +1731,7 @@ class Daemon(AuthJSONRPCServer): txid = p['txid'] nout = p['nout'] else: + # TODO: return a useful error message return server.failure def _disp(x): @@ -1916,6 +1926,7 @@ class Daemon(AuthJSONRPCServer): amount = p['amount'] address = p['address'] else: + # TODO: return a useful error message return server.failure reserved_points = self.session.wallet.reserve_points(address, amount) @@ -1957,6 +1968,7 @@ class Daemon(AuthJSONRPCServer): d = self.session.wallet.get_block_info(height) d.addCallback(lambda blockhash: self.session.wallet.get_block(blockhash)) else: + # TODO: return a useful error message return server.failure d.addCallback(lambda r: self._render_response(r, OK_CODE)) return d @@ -1974,6 +1986,7 @@ class Daemon(AuthJSONRPCServer): if 'txid' in p.keys(): txid = p['txid'] else: + # TODO: return a useful error message return server.failure d = self.session.wallet.get_claims_from_tx(txid) @@ -2318,15 +2331,6 @@ def get_sd_hash(stream_info): return stream_info.get('stream_hash') -def get_output_callback(params): - def callback(l): - return { - 'stream_hash': params.sd_hash if params.stream_info else l.sd_hash, - 'path': os.path.join(params.download_directory, l.file_name) - } - return callback - - class _DownloadNameHelper(object): def __init__(self, daemon, name, timeout=None, @@ -2342,52 +2346,54 @@ class _DownloadNameHelper(object): self.file_name = file_name self.wait_for_write = wait_for_write + @defer.inlineCallbacks def setup_stream(self, stream_info): - stream_hash = get_sd_hash(stream_info) - d = self.daemon._get_lbry_file_by_sd_hash(stream_hash) - d.addCallback(self._prepend_stream_info, stream_info) - return d + sd_hash = get_sd_hash(stream_info) + lbry_file = yield self.daemon._get_lbry_file_by_sd_hash(sd_hash) + if self._does_lbry_file_exists(lbry_file): + defer.returnValue(lbry_file) + else: + defer.returnValue(None) - def _prepend_stream_info(self, lbry_file, stream_info): - if lbry_file: - if os.path.isfile(os.path.join(self.download_directory, lbry_file.file_name)): - return defer.succeed((stream_info, lbry_file)) - return defer.succeed((stream_info, None)) + def _does_lbry_file_exists(self, lbry_file): + return lbry_file and os.path.isfile(self._full_path(lbry_file)) - def wait_or_get_stream(self, args): - stream_info, lbry_file = args + def _full_path(self, lbry_file): + return os.path.join(self.download_directory, lbry_file.file_name) + + @defer.inlineCallbacks + def wait_or_get_stream(self, stream_info, lbry_file): if lbry_file: log.debug('Wait on lbry_file') - return self._wait_on_lbry_file(lbry_file) + # returns the lbry_file + yield self._wait_on_lbry_file(lbry_file) + defer.returnValue((lbry_file.sd_hash, self._full_path(lbry_file))) else: log.debug('No lbry_file, need to get stream') - return self._get_stream(stream_info) + # returns an instance of ManagedEncryptedFileDownloaderFactory + sd_hash, file_path = yield self._get_stream(stream_info) + defer.returnValue((sd_hash, file_path)) + def _wait_on_lbry_file(self, f): + file_path = self._full_path(f) + written_bytes = self._get_written_bytes(file_path) + if written_bytes: + log.info("File has bytes: %s --> %s", f.sd_hash, file_path) + return defer.succeed(True) + return task.deferLater(reactor, 1, self._wait_on_lbry_file, f) + + @defer.inlineCallbacks def _get_stream(self, stream_info): - d = self.daemon.add_stream( + was_successful, sd_hash, download_path = yield self.daemon.add_stream( self.name, self.timeout, self.download_directory, self.file_name, stream_info) - - def _handle_timeout(args): - was_successful, _, _ = args - if not was_successful: - log.warning("lbry://%s timed out, removing from streams", self.name) - del self.daemon.streams[self.name] - - d.addCallback(_handle_timeout) - + if not was_successful: + log.warning("lbry://%s timed out, removing from streams", self.name) + del self.daemon.streams[self.name] + self.remove_from_wait("Timed out") + raise Exception("Timed out") if self.wait_for_write: - d.addCallback(lambda _: self._wait_for_write()) - - def _get_stream_for_return(): - stream = self.daemon.streams.get(self.name, None) - if stream: - return stream.downloader - else: - self.remove_from_wait("Timed out") - return defer.fail(Exception("Timed out")) - - d.addCallback(lambda _: _get_stream_for_return()) - return d + yield self._wait_for_write() + defer.returnValue((sd_hash, download_path)) def _wait_for_write(self): d = defer.succeed(None) @@ -2398,39 +2404,27 @@ class _DownloadNameHelper(object): def _has_downloader_wrote(self): stream = self.daemon.streams.get(self.name, False) if stream: - return self._get_written_bytes(stream.downloader.file_name) + file_path = self._full_path(stream.downloader) + return self._get_written_bytes(file_path) else: return False - def _wait_on_lbry_file(self, f): - written_bytes = self._get_written_bytes(f.file_name) - if written_bytes: - return defer.succeed(self._disp_file(f)) - return task.deferLater(reactor, 1, self._wait_on_lbry_file, f) + def _get_written_bytes(self, file_path): + """Returns the number of bytes written to `file_path`. - def _get_written_bytes(self, file_name): - """Returns the number of bytes written to `file_name`. - - Returns False if there were issues reading `file_name`. + Returns False if there were issues reading `file_path`. """ try: - file_path = os.path.join(self.download_directory, file_name) if os.path.isfile(file_path): - written_file = file(file_path) - written_file.seek(0, os.SEEK_END) - written_bytes = written_file.tell() - written_file.close() + with open(file_path) as written_file: + written_file.seek(0, os.SEEK_END) + written_bytes = written_file.tell() else: written_bytes = False except Exception: writen_bytes = False return written_bytes - def _disp_file(self, f): - file_path = os.path.join(self.download_directory, f.file_name) - log.info("Already downloaded: %s --> %s", f.sd_hash, file_path) - return f - def remove_from_wait(self, reason): if self.name in self.daemon.waiting_on: del self.daemon.waiting_on[self.name] diff --git a/lbrynet/lbrynet_daemon/Downloader.py b/lbrynet/lbrynet_daemon/Downloader.py index 3f59e175e..88eb10163 100644 --- a/lbrynet/lbrynet_daemon/Downloader.py +++ b/lbrynet/lbrynet_daemon/Downloader.py @@ -15,6 +15,7 @@ INITIALIZING_CODE = 'initializing' DOWNLOAD_METADATA_CODE = 'downloading_metadata' DOWNLOAD_TIMEOUT_CODE = 'timeout' DOWNLOAD_RUNNING_CODE = 'running' +# TODO: is this ever used? DOWNLOAD_STOPPED_CODE = 'stopped' STREAM_STAGES = [ (INITIALIZING_CODE, 'Initializing...'), @@ -46,7 +47,7 @@ class GetStream(object): self.payment_rate_manager = self.session.payment_rate_manager self.lbry_file_manager = lbry_file_manager self.sd_identifier = sd_identifier - self.stream_hash = None + self.sd_hash = None self.max_key_fee = max_key_fee self.stream_info = None self.stream_info_manager = None @@ -56,6 +57,8 @@ class GetStream(object): self.download_directory = download_directory self.download_path = None self.downloader = None + # fired after the metadata has been downloaded and the + # actual file has been started self.finished = defer.Deferred(None) self.checker = LoopingCall(self.check_status) self.code = STREAM_STAGES[0] @@ -63,10 +66,10 @@ class GetStream(object): def check_status(self): self.timeout_counter += 1 - # TODO: Why is this the stopping condition for the finished callback? + # download_path is set after the sd blob has been downloaded if self.download_path: self.checker.stop() - self.finished.callback((True, self.stream_hash, self.download_path)) + self.finished.callback((True, self.sd_hash, self.download_path)) elif self.timeout_counter >= self.timeout: log.info("Timeout downloading lbry://%s" % self.resolved_name) @@ -108,20 +111,20 @@ class GetStream(object): self.resolved_name = name self.stream_info = deepcopy(stream_info) self.description = self.stream_info['description'] - self.stream_hash = self.stream_info['sources']['lbry_sd_hash'] + self.sd_hash = self.stream_info['sources']['lbry_sd_hash'] if 'fee' in self.stream_info: self.fee = FeeValidator(self.stream_info['fee']) max_key_fee = self._convert_max_fee() converted_fee = self.exchange_rate_manager.to_lbc(self.fee).amount if converted_fee > self.wallet.wallet_balance: - log.warning("Insufficient funds to download lbry://%s", self.resolved_name) - return defer.fail(InsufficientFundsError()) + msg = "Insufficient funds to download lbry://{}. Need {:0.2f}, have {:0.2f}".format( + self.resolved_name, converted_fee, self.wallet.wallet_balance) + raise InsufficientFundsError(msg) if converted_fee > max_key_fee: - log.warning( - "Key fee %f above limit of %f didn't download lbry://%s", + msg = "Key fee {:0.2f} above limit of {:0.2f} didn't download lbry://{}".format( converted_fee, max_key_fee, self.resolved_name) - return defer.fail(KeyFeeAboveMaxAllowed()) + raise KeyFeeAboveMaxAllowed(msg) log.info( "Key fee %f below limit of %f, downloading lbry://%s", converted_fee, max_key_fee, self.resolved_name) @@ -130,7 +133,7 @@ class GetStream(object): self._d.addCallback(lambda _: _set_status(None, DOWNLOAD_METADATA_CODE)) self._d.addCallback(lambda _: download_sd_blob( - self.session, self.stream_hash, self.payment_rate_manager)) + self.session, self.sd_hash, self.payment_rate_manager)) self._d.addCallback(self.sd_identifier.get_metadata_for_sd_blob) self._d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE)) self._d.addCallback(get_downloader_factory) @@ -147,7 +150,7 @@ class GetStream(object): d = self._pay_key_fee() d.addCallback(lambda _: log.info( - "Downloading %s --> %s", self.stream_hash, self.downloader.file_name)) + "Downloading %s --> %s", self.sd_hash, self.downloader.file_name)) d.addCallback(lambda _: self.downloader.start()) def _pay_key_fee(self): @@ -155,6 +158,9 @@ class GetStream(object): fee_lbc = self.exchange_rate_manager.to_lbc(self.fee).amount reserved_points = self.wallet.reserve_points(self.fee.address, fee_lbc) if reserved_points is None: + log.warning('Unable to pay the key fee of %s for %s', fee_lbc, self.resolved_name) + # TODO: If we get here, nobody will know that there was an error + # as nobody actually cares about self._d return defer.fail(InsufficientFundsError()) return self.wallet.send_points_to_address(reserved_points, fee_lbc) return defer.succeed(None)