diff --git a/lbrynet/analytics/events.py b/lbrynet/analytics/events.py index 310ee1ce1..353e93073 100644 --- a/lbrynet/analytics/events.py +++ b/lbrynet/analytics/events.py @@ -48,13 +48,18 @@ class Events(object): def heartbeat(self): return self._event('Heartbeat') - def download_started(self, name, stream_info=None): - properties = { - 'name': name, - 'stream_info': get_sd_hash(stream_info) - } + def download_started(self, *args, **kwargs): + properties = download_properties(*args, **kwargs) return self._event('Download Started', properties) + def download_errored(self, *args, **kwargs): + properties = download_properties(*args, **kwargs) + return self._event('Download Errored', properties) + + def download_finished(self, *args, **kwargs): + properties = download_properties(*args, **kwargs) + return self._event('Download Finished', properties) + def error(self, message, sd_hash=None): properties = { 'message': message, @@ -110,3 +115,11 @@ def make_context(platform, wallet): 'version': '1.0.0' }, } + + +def download_properties(id_, name, stream_info=None): + return { + 'download_id': id_, + 'name': name, + 'stream_info': get_sd_hash(stream_info) + } diff --git a/lbrynet/analytics/manager.py b/lbrynet/analytics/manager.py index 5af578b61..6e11c805c 100644 --- a/lbrynet/analytics/manager.py +++ b/lbrynet/analytics/manager.py @@ -70,8 +70,16 @@ class Manager(object): event = self.events_generator.server_startup_error(message) self.analytics_api.track(event) - def send_download_started(self, name, stream_info=None): - event = self.events_generator.download_started(name, stream_info) + def send_download_started(self, id_, name, stream_info=None): + event = self.events_generator.download_started(id_, name, stream_info) + self.analytics_api.track(event) + + def send_download_errored(self, id_, name, stream_info=None): + event = self.events_generator.download_errored(id_, name, stream_info) + self.analytics_api.track(event) + + def send_download_finished(self, id_, name, stream_info=None): + event = self.events_generator.download_finished(id_, name, stream_info) self.analytics_api.track(event) def send_error(self, message, sd_hash=None): diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index 2d9e3d0f5..f95162cf3 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -176,11 +176,12 @@ class ClientProtocol(Protocol): def _handle_response(self, response): ds = [] - log.debug("Handling a response. Current expected responses: %s", self._response_deferreds) + log.debug( + "Handling a response from %s. Expected responses: %s. Actual responses: %s", + self.peer, self._response_deferreds.keys(), response.keys()) for key, val in response.items(): if key in self._response_deferreds: - d = self._response_deferreds[key] - del self._response_deferreds[key] + d = self._response_deferreds.pop(key) d.callback({key: val}) ds.append(d) for k, d in self._response_deferreds.items(): @@ -194,6 +195,7 @@ class ClientProtocol(Protocol): d.addErrback(self._handle_response_error) ds.append(d) + # TODO: are we sure we want to consume errors here dl = defer.DeferredList(ds, consumeErrors=True) def get_next_request(results): diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py index d8c6170fc..a3331b31f 100644 --- a/lbrynet/core/client/ConnectionManager.py +++ b/lbrynet/core/client/ConnectionManager.py @@ -72,13 +72,19 @@ class ConnectionManager(object): closing_deferreds.append(close_connection(peer)) return defer.DeferredList(closing_deferreds) + @defer.inlineCallbacks def get_next_request(self, peer, protocol): - log.debug("Trying to get the next request for peer %s", peer) - if not peer in self._peer_connections or self.stopped is True: log.debug("The peer has already been told to shut down.") - return defer.succeed(False) + defer.returnValue(False) + requests = yield self._send_primary_requests(peer, protocol) + have_request = any(r[1] for r in requests if r[0] is True) + if have_request: + yield self._send_secondary_requests(peer, protocol) + defer.returnValue(have_request) + + def _send_primary_requests(self, peer, protocol): def handle_error(err): err.trap(InsufficientFundsError) @@ -97,34 +103,20 @@ class ConnectionManager(object): self._peer_connections[peer].request_creators.append(request_creator) return request_sent - def check_requests(requests): - have_request = True in [r[1] for r in requests if r[0] is True] - return have_request - - def get_secondary_requests_if_necessary(have_request): - if have_request is True: - ds = [] - for s_r_c in self._secondary_request_creators: - d = s_r_c.send_next_request(peer, protocol) - ds.append(d) - dl = defer.DeferredList(ds) - else: - dl = defer.succeed(None) - dl.addCallback(lambda _: have_request) - return dl - ds = [] - for p_r_c in self._primary_request_creators: d = p_r_c.send_next_request(peer, protocol) d.addErrback(handle_error) d.addCallback(check_if_request_sent, p_r_c) ds.append(d) + return defer.DeferredList(ds, fireOnOneErrback=True) - dl = defer.DeferredList(ds, fireOnOneErrback=True) - dl.addCallback(check_requests) - dl.addCallback(get_secondary_requests_if_necessary) - return dl + def _send_secondary_requests(self, peer, protocol): + ds = [ + s_r_c.send_next_request(peer, protocol) + for s_r_c in self._secondary_request_creators + ] + return defer.DeferredList(ds) def protocol_disconnected(self, peer, protocol): if peer in self._peer_connections: @@ -147,49 +139,52 @@ class ConnectionManager(object): return sorted(self._primary_request_creators, key=count_peers) def _connect_to_peer(self, peer): + if peer is None or self.stopped: + return from twisted.internet import reactor - if peer is not None and self.stopped is False: - log.debug("Trying to connect to %s", peer) - factory = ClientProtocolFactory(peer, self.rate_limiter, self) - self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:], - factory) - connection = reactor.connectTCP(peer.host, peer.port, factory) - self._peer_connections[peer].connection = connection + log.debug("Trying to connect to %s", peer) + factory = ClientProtocolFactory(peer, self.rate_limiter, self) + self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:], + factory) + connection = reactor.connectTCP(peer.host, peer.port, factory) + self._peer_connections[peer].connection = connection + @defer.inlineCallbacks def _manage(self): - from twisted.internet import reactor - - def get_new_peers(request_creators): - log.debug("Trying to get a new peer to connect to") - if len(request_creators) > 0: - log.debug("Got a creator to check: %s", request_creators[0]) - d = request_creators[0].get_new_peers() - d.addCallback(lambda h: h if h is not None else get_new_peers(request_creators[1:])) - return d - else: - return defer.succeed(None) - - def pick_best_peer(peers): - # TODO: Eventually rank them based on past performance/reputation. For now - # TODO: just pick the first to which we don't have an open connection - - log.debug("Got a list of peers to choose from: %s", peers) - if peers is None: - return None - for peer in peers: - if not peer in self._peer_connections: - log.debug("Got a good peer. Returning peer %s", peer) - return peer - log.debug("Couldn't find a good peer to connect to") - return None - if len(self._peer_connections) < conf.settings.max_connections_per_stream: - ordered_request_creators = self._rank_request_creator_connections() - d = get_new_peers(ordered_request_creators) - d.addCallback(pick_best_peer) - d.addCallback(self._connect_to_peer) - + try: + ordered_request_creators = self._rank_request_creator_connections() + peers = yield self._get_new_peers(ordered_request_creators) + peer = self._pick_best_peer(peers) + yield self._connect_to_peer(peer) + except Exception: + # log this otherwise it will just end up as an unhandled error in deferred + log.exception('Something bad happened picking a peer') self._next_manage_call = reactor.callLater(1, self._manage) + + @defer.inlineCallbacks + def _get_new_peers(self, request_creators): + log.debug("Trying to get a new peer to connect to") + if not request_creators: + defer.returnValue(None) + log.debug("Got a creator to check: %s", request_creators[0]) + new_peers = yield request_creators[0].get_new_peers() + if not new_peers: + new_peers = yield self._get_new_peers(request_creators[1:]) + defer.returnValue(new_peers) + + def _pick_best_peer(self, peers): + # TODO: Eventually rank them based on past performance/reputation. For now + # TODO: just pick the first to which we don't have an open connection + log.debug("Got a list of peers to choose from: %s", peers) + if peers is None: + return None + for peer in peers: + if not peer in self._peer_connections: + log.debug("Got a good peer. Returning peer %s", peer) + return peer + log.debug("Couldn't find a good peer to connect to") + return None diff --git a/lbrynet/core/client/DownloadManager.py b/lbrynet/core/client/DownloadManager.py index 689a607df..78f6a006e 100644 --- a/lbrynet/core/client/DownloadManager.py +++ b/lbrynet/core/client/DownloadManager.py @@ -14,7 +14,6 @@ class DownloadManager(object): def __init__(self, blob_manager, upload_allowed): self.blob_manager = blob_manager self.upload_allowed = upload_allowed - self.blob_requester = None self.blob_info_finder = None self.progress_manager = None self.blob_handler = None diff --git a/lbrynet/core/log_support.py b/lbrynet/core/log_support.py index 0c228e8a6..72f96da14 100644 --- a/lbrynet/core/log_support.py +++ b/lbrynet/core/log_support.py @@ -369,7 +369,7 @@ class Logger(logging.Logger): self.name, level, fn, lno, msg, msg_args, exc_info, func, msg_kwargs) self.handle(record) if callback: - callback(err, *args, **kwargs) + return callback(err, *args, **kwargs) return _fail def trace(self, msg, *args, **kwargs): diff --git a/lbrynet/core/utils.py b/lbrynet/core/utils.py index 08d5e813f..b55f6c208 100644 --- a/lbrynet/core/utils.py +++ b/lbrynet/core/utils.py @@ -4,6 +4,7 @@ import logging import random import os import socket +import string import sys import pkg_resources @@ -92,3 +93,7 @@ def setup_certs_for_windows(): if getattr(sys, 'frozen', False) and os.name == "nt": cert_path = os.path.join(os.path.dirname(sys.executable), "cacert.pem") os.environ["REQUESTS_CA_BUNDLE"] = cert_path + + +def random_string(length=10, chars=string.ascii_lowercase): + return ''.join([random.choice(chars) for _ in range(length)]) diff --git a/lbrynet/cryptstream/client/CryptStreamDownloader.py b/lbrynet/cryptstream/client/CryptStreamDownloader.py index 9b1e351c7..ae312ad59 100644 --- a/lbrynet/cryptstream/client/CryptStreamDownloader.py +++ b/lbrynet/cryptstream/client/CryptStreamDownloader.py @@ -62,19 +62,16 @@ class CryptStreamDownloader(object): self.payment_rate_manager = payment_rate_manager self.wallet = wallet self.upload_allowed = upload_allowed - self.key = None self.stream_name = None - self.completed = False self.stopped = True self.stopping = False self.starting = False - self.download_manager = None self.finished_deferred = None - self.points_paid = 0.0 + self.blob_requester = None def __str__(self): return str(self.stream_name) @@ -86,7 +83,6 @@ class CryptStreamDownloader(object): return self.stop() def start(self): - if self.starting is True: raise CurrentlyStartingError() if self.stopping is True: @@ -97,30 +93,24 @@ class CryptStreamDownloader(object): self.starting = True self.completed = False self.finished_deferred = defer.Deferred() - fd = self.finished_deferred d = self._start() - d.addCallback(lambda _: fd) + d.addCallback(lambda _: self.finished_deferred) return d + @defer.inlineCallbacks def stop(self, err=None): - - def check_if_stop_succeeded(success): - self.stopping = False - if success is True: - self.stopped = True - self._remove_download_manager() - return success - if self.stopped is True: raise AlreadyStoppedError() if self.stopping is True: raise CurrentlyStoppingError() assert self.download_manager is not None self.stopping = True - d = self.download_manager.stop_downloading() - d.addCallback(check_if_stop_succeeded) - d.addCallback(lambda _: self._fire_completed_deferred(err)) - return d + success = yield self.download_manager.stop_downloading() + self.stopping = False + if success is True: + self.stopped = True + self._remove_download_manager() + yield self._fire_completed_deferred(err) def _start_failed(self): @@ -155,20 +145,19 @@ class CryptStreamDownloader(object): return d def _get_download_manager(self): + assert self.blob_requester is None download_manager = DownloadManager(self.blob_manager, self.upload_allowed) download_manager.blob_info_finder = self._get_metadata_handler(download_manager) - download_manager.blob_requester = self._get_blob_requester(download_manager) download_manager.progress_manager = self._get_progress_manager(download_manager) download_manager.blob_handler = self._get_blob_handler(download_manager) download_manager.wallet_info_exchanger = self.wallet.get_info_exchanger() + # blob_requester needs to be set before the connection manager is setup + self.blob_requester = self._get_blob_requester(download_manager) download_manager.connection_manager = self._get_connection_manager(download_manager) - #return DownloadManager(self.blob_manager, self.blob_requester, self.metadata_handler, - # self.progress_manager, self.blob_handler, self.connection_manager) return download_manager def _remove_download_manager(self): self.download_manager.blob_info_finder = None - self.download_manager.blob_requester = None self.download_manager.progress_manager = None self.download_manager.blob_handler = None self.download_manager.wallet_info_exchanger = None @@ -176,7 +165,7 @@ class CryptStreamDownloader(object): self.download_manager = None def _get_primary_request_creators(self, download_manager): - return [download_manager.blob_requester] + return [self.blob_requester] def _get_secondary_request_creators(self, download_manager): return [download_manager.wallet_info_exchanger] @@ -210,7 +199,8 @@ class CryptStreamDownloader(object): if err is not None: d.errback(err) else: - d.callback(self._get_finished_deferred_callback_value()) + value = self._get_finished_deferred_callback_value() + d.callback(value) else: log.debug("Not firing the completed deferred because d is None") diff --git a/lbrynet/lbryfile/EncryptedFileMetadataManager.py b/lbrynet/lbryfile/EncryptedFileMetadataManager.py index bae6291b6..e9b2e7335 100644 --- a/lbrynet/lbryfile/EncryptedFileMetadataManager.py +++ b/lbrynet/lbryfile/EncryptedFileMetadataManager.py @@ -49,7 +49,7 @@ class DBEncryptedFileMetadataManager(object): def get_blobs_for_stream(self, stream_hash, start_blob=None, end_blob=None, count=None, reverse=False): - log.debug("Getting blobs for a stream. Count is %s", str(count)) + log.debug("Getting blobs for stream %s. Count is %s", stream_hash, count) def get_positions_of_start_and_end(): if start_blob is not None: diff --git a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py index 527bcfdc3..e2a1a4bc5 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py +++ b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py @@ -137,32 +137,20 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): d.addCallback(make_full_status) return d + @defer.inlineCallbacks def _start(self): - - d = EncryptedFileSaver._start(self) - d.addCallback( - lambda _: self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash)) - - def _save_sd_hash(sd_hash): - if len(sd_hash): - self.sd_hash = sd_hash[0] - d = self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash) - else: - d = defer.succeed(None) - - return d - - def _save_claim(name, txid, nout): - self.uri = name - self.txid = txid - self.nout = nout - return defer.succeed(None) - - d.addCallback(_save_sd_hash) - d.addCallback(lambda r: _save_claim(r[0], r[1], r[2]) if r else None) - d.addCallback(lambda _: self._save_status()) - - return d + yield EncryptedFileSaver._start(self) + sd_hash = yield self.stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash) + if len(sd_hash): + self.sd_hash = sd_hash[0] + maybe_metadata = yield self.wallet.get_claim_metadata_for_sd_hash(self.sd_hash) + if maybe_metadata: + name, txid, nout = maybe_metadata + self.uri = name + self.txid = txid + self.nout = nout + status = yield self._save_status() + defer.returnValue(status) def _get_finished_deferred_callback_value(self): if self.completed is True: diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index 9416facca..b7d1a81e4 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -70,6 +70,7 @@ STARTUP_STAGES = [ (WAITING_FOR_FIRST_RUN_CREDITS, 'Waiting for first run credits...') ] +# TODO: make this consistent with the stages in Downloader.py DOWNLOAD_METADATA_CODE = 'downloading_metadata' DOWNLOAD_TIMEOUT_CODE = 'timeout' DOWNLOAD_RUNNING_CODE = 'running' @@ -778,9 +779,9 @@ class Daemon(AuthJSONRPCServer): d.addCallback(BlobStreamDescriptorReader) d.addCallback(lambda blob: blob.get_info()) d.addCallback(cb) - return r + @defer.inlineCallbacks def _download_name(self, name, timeout=None, download_directory=None, file_name=None, stream_info=None, wait_for_write=True): """ @@ -788,20 +789,17 @@ class Daemon(AuthJSONRPCServer): If it already exists in the file manager, return the existing lbry file """ timeout = timeout if timeout is not None else conf.settings.download_timeout - self.analytics_manager.send_download_started(name, stream_info) + helper = _DownloadNameHelper( self, name, timeout, download_directory, file_name, wait_for_write) if not stream_info: self.waiting_on[name] = True - d = self._resolve_name(name) - else: - d = defer.succeed(stream_info) - d.addCallback(helper._setup_stream) - d.addCallback(helper.wait_or_get_stream) - if not stream_info: - d.addCallback(helper._remove_from_wait) - return d + stream_info = yield self._resolve_name(name) + del self.waiting_on[name] + lbry_file = yield helper.setup_stream(stream_info) + sd_hash, file_path = yield helper.wait_or_get_stream(stream_info, lbry_file) + defer.returnValue((sd_hash, file_path)) def add_stream(self, name, timeout, download_directory, file_name, stream_info): """Makes, adds and starts a stream""" @@ -1399,8 +1397,6 @@ class Daemon(AuthJSONRPCServer): return self._render_response(None, BAD_REQUEST) d = self._resolve_name(name, force_refresh=force) - # TODO: this is the rpc call that returns a server.failure. - # what is up with that? d.addCallbacks( lambda info: self._render_response(info, OK_CODE), # TODO: Is server.failure a module? It looks like it: @@ -1483,6 +1479,7 @@ class Daemon(AuthJSONRPCServer): ) @AuthJSONRPCServer.auth_required + @defer.inlineCallbacks def jsonrpc_get(self, p): """Download stream from a LBRY uri. @@ -1492,28 +1489,63 @@ class Daemon(AuthJSONRPCServer): 'file_name': optional, a user specified name for the downloaded file 'stream_info': optional, specified stream info overrides name 'timeout': optional - 'wait_for_write': optional, defaults to True + 'wait_for_write': optional, defaults to True. When set, waits for the file to + only start to be written before returning any results. Returns: 'stream_hash': hex string 'path': path of download """ params = self._process_get_parameters(p) if not params.name: - return server.failure + # TODO: return a useful error message here, like "name argument is required" + defer.returnValue(server.failure) if params.name in self.waiting_on: - return server.failure - d = self._download_name(name=params.name, - timeout=params.timeout, - download_directory=params.download_directory, - stream_info=params.stream_info, - file_name=params.file_name, - wait_for_write=params.wait_for_write) - # TODO: downloading can timeout. Not sure what to do when that happens - d.addCallbacks( - get_output_callback(params), - lambda err: str(err)) - d.addCallback(lambda message: self._render_response(message, OK_CODE)) - return d + # TODO: return a useful error message here, like "already + # waiting for name to be resolved" + defer.returnValue(server.failure) + name = params.name + stream_info = params.stream_info + + # first check if we already have this + lbry_file = yield self._get_lbry_file(FileID.NAME, name, return_json=False) + if lbry_file: + log.info('Already have a file for %s', name) + message = { + 'stream_hash': params.sd_hash if params.stream_info else lbry_file.sd_hash, + 'path': os.path.join(lbry_file.download_directory, lbry_file.file_name) + } + response = yield self._render_response(message, OK_CODE) + defer.returnValue(response) + + download_id = utils.random_string() + self.analytics_manager.send_download_started(download_id, name, stream_info) + try: + sd_hash, file_path = yield self._download_name( + name=params.name, + timeout=params.timeout, + download_directory=params.download_directory, + stream_info=params.stream_info, + file_name=params.file_name, + wait_for_write=params.wait_for_write + ) + except Exception as e: + self.analytics_manager.send_download_errored(download_id, name, stream_info) + log.exception('Failed to get %s', params.name) + response = yield self._render_response(str(e), OK_CODE) + else: + # TODO: should stream_hash key be changed to sd_hash? + message = { + 'stream_hash': params.sd_hash if params.stream_info else sd_hash, + 'path': file_path + } + stream = self.streams.get(name) + if stream: + stream.downloader.finished_deferred.addCallback( + lambda _: self.analytics_manager.send_download_finished( + download_id, name, stream_info) + ) + response = yield self._render_response(message, OK_CODE) + defer.returnValue(response) @AuthJSONRPCServer.auth_required def jsonrpc_stop_lbry_file(self, p): @@ -1721,6 +1753,7 @@ class Daemon(AuthJSONRPCServer): txid = p['txid'] nout = p['nout'] else: + # TODO: return a useful error message return server.failure def _disp(x): @@ -1915,6 +1948,7 @@ class Daemon(AuthJSONRPCServer): amount = p['amount'] address = p['address'] else: + # TODO: return a useful error message return server.failure reserved_points = self.session.wallet.reserve_points(address, amount) @@ -1956,6 +1990,7 @@ class Daemon(AuthJSONRPCServer): d = self.session.wallet.get_block_info(height) d.addCallback(lambda blockhash: self.session.wallet.get_block(blockhash)) else: + # TODO: return a useful error message return server.failure d.addCallback(lambda r: self._render_response(r, OK_CODE)) return d @@ -1973,6 +2008,7 @@ class Daemon(AuthJSONRPCServer): if 'txid' in p.keys(): txid = p['txid'] else: + # TODO: return a useful error message return server.failure d = self.session.wallet.get_claims_from_tx(txid) @@ -2317,15 +2353,6 @@ def get_sd_hash(stream_info): return stream_info.get('stream_hash') -def get_output_callback(params): - def callback(l): - return { - 'stream_hash': params.sd_hash if params.stream_info else l.sd_hash, - 'path': os.path.join(params.download_directory, l.file_name) - } - return callback - - class _DownloadNameHelper(object): def __init__(self, daemon, name, timeout=None, @@ -2341,102 +2368,89 @@ class _DownloadNameHelper(object): self.file_name = file_name self.wait_for_write = wait_for_write - def _setup_stream(self, stream_info): - stream_hash = get_sd_hash(stream_info) - d = self.daemon._get_lbry_file_by_sd_hash(stream_hash) - d.addCallback(self._prepend_stream_info, stream_info) - return d + @defer.inlineCallbacks + def setup_stream(self, stream_info): + sd_hash = get_sd_hash(stream_info) + lbry_file = yield self.daemon._get_lbry_file_by_sd_hash(sd_hash) + if self._does_lbry_file_exists(lbry_file): + defer.returnValue(lbry_file) + else: + defer.returnValue(None) - def _prepend_stream_info(self, lbry_file, stream_info): - if lbry_file: - if os.path.isfile(os.path.join(self.download_directory, lbry_file.file_name)): - return defer.succeed((stream_info, lbry_file)) - return defer.succeed((stream_info, None)) + def _does_lbry_file_exists(self, lbry_file): + return lbry_file and os.path.isfile(self._full_path(lbry_file)) - def wait_or_get_stream(self, args): - stream_info, lbry_file = args + def _full_path(self, lbry_file): + return os.path.join(self.download_directory, lbry_file.file_name) + + @defer.inlineCallbacks + def wait_or_get_stream(self, stream_info, lbry_file): if lbry_file: log.debug('Wait on lbry_file') - return self._wait_on_lbry_file(lbry_file) + # returns the lbry_file + yield self._wait_on_lbry_file(lbry_file) + defer.returnValue((lbry_file.sd_hash, self._full_path(lbry_file))) else: log.debug('No lbry_file, need to get stream') - return self._get_stream(stream_info) + # returns an instance of ManagedEncryptedFileDownloaderFactory + sd_hash, file_path = yield self._get_stream(stream_info) + defer.returnValue((sd_hash, file_path)) + def _wait_on_lbry_file(self, f): + file_path = self._full_path(f) + written_bytes = self._get_written_bytes(file_path) + if written_bytes: + log.info("File has bytes: %s --> %s", f.sd_hash, file_path) + return defer.succeed(True) + return task.deferLater(reactor, 1, self._wait_on_lbry_file, f) + + @defer.inlineCallbacks def _get_stream(self, stream_info): - d = self.daemon.add_stream( + was_successful, sd_hash, download_path = yield self.daemon.add_stream( self.name, self.timeout, self.download_directory, self.file_name, stream_info) - - def _handle_timeout(args): - was_successful, _, _ = args - if not was_successful: - log.warning("lbry://%s timed out, removing from streams", self.name) - del self.daemon.streams[self.name] - - d.addCallback(_handle_timeout) - + if not was_successful: + log.warning("lbry://%s timed out, removing from streams", self.name) + del self.daemon.streams[self.name] + self.remove_from_wait("Timed out") + raise Exception("Timed out") if self.wait_for_write: - d.addCallback(lambda _: self._wait_for_write()) - - def _get_stream_for_return(): - stream = self.daemon.streams.get(self.name, None) - if stream: - return stream.downloader - else: - self._remove_from_wait("Timed out") - return defer.fail(Exception("Timed out")) - - d.addCallback(lambda _: _get_stream_for_return()) - return d + yield self._wait_for_write() + defer.returnValue((sd_hash, download_path)) def _wait_for_write(self): d = defer.succeed(None) - if not self.has_downloader_wrote(): + if not self._has_downloader_wrote(): d.addCallback(lambda _: reactor.callLater(1, self._wait_for_write)) return d - def has_downloader_wrote(self): + def _has_downloader_wrote(self): stream = self.daemon.streams.get(self.name, False) if stream: - downloader = stream.downloader + file_path = self._full_path(stream.downloader) + return self._get_written_bytes(file_path) else: - downloader = False - if not downloader: return False - return self.get_written_bytes(downloader.file_name) - def _wait_on_lbry_file(self, f): - written_bytes = self.get_written_bytes(f.file_name) - if written_bytes: - return defer.succeed(self._disp_file(f)) - return task.deferLater(reactor, 1, self._wait_on_lbry_file, f) + def _get_written_bytes(self, file_path): + """Returns the number of bytes written to `file_path`. - def get_written_bytes(self, file_name): - """Returns the number of bytes written to `file_name`. - - Returns False if there were issues reading `file_name`. + Returns False if there were issues reading `file_path`. """ try: - file_path = os.path.join(self.download_directory, file_name) if os.path.isfile(file_path): - written_file = file(file_path) - written_file.seek(0, os.SEEK_END) - written_bytes = written_file.tell() - written_file.close() + with open(file_path) as written_file: + written_file.seek(0, os.SEEK_END) + written_bytes = written_file.tell() else: written_bytes = False except Exception: writen_bytes = False return written_bytes - def _disp_file(self, f): - file_path = os.path.join(self.download_directory, f.file_name) - log.info("Already downloaded: %s --> %s", f.sd_hash, file_path) - return f - - def _remove_from_wait(self, r): + def remove_from_wait(self, reason): if self.name in self.daemon.waiting_on: del self.daemon.waiting_on[self.name] - return r + return reason class _ResolveNameHelper(object): diff --git a/lbrynet/lbrynet_daemon/Downloader.py b/lbrynet/lbrynet_daemon/Downloader.py index e4ec76a3a..88eb10163 100644 --- a/lbrynet/lbrynet_daemon/Downloader.py +++ b/lbrynet/lbrynet_daemon/Downloader.py @@ -15,6 +15,7 @@ INITIALIZING_CODE = 'initializing' DOWNLOAD_METADATA_CODE = 'downloading_metadata' DOWNLOAD_TIMEOUT_CODE = 'timeout' DOWNLOAD_RUNNING_CODE = 'running' +# TODO: is this ever used? DOWNLOAD_STOPPED_CODE = 'stopped' STREAM_STAGES = [ (INITIALIZING_CODE, 'Initializing...'), @@ -46,16 +47,18 @@ class GetStream(object): self.payment_rate_manager = self.session.payment_rate_manager self.lbry_file_manager = lbry_file_manager self.sd_identifier = sd_identifier - self.stream_hash = None + self.sd_hash = None self.max_key_fee = max_key_fee self.stream_info = None self.stream_info_manager = None - self.d = defer.Deferred(None) + self._d = defer.Deferred(None) self.timeout = timeout self.timeout_counter = 0 self.download_directory = download_directory self.download_path = None self.downloader = None + # fired after the metadata has been downloaded and the + # actual file has been started self.finished = defer.Deferred(None) self.checker = LoopingCall(self.check_status) self.code = STREAM_STAGES[0] @@ -63,15 +66,15 @@ class GetStream(object): def check_status(self): self.timeout_counter += 1 - # TODO: Why is this the stopping condition for the finished callback? + # download_path is set after the sd blob has been downloaded if self.download_path: self.checker.stop() - self.finished.callback((True, self.stream_hash, self.download_path)) + self.finished.callback((True, self.sd_hash, self.download_path)) elif self.timeout_counter >= self.timeout: log.info("Timeout downloading lbry://%s" % self.resolved_name) self.checker.stop() - self.d.cancel() + self._d.cancel() self.code = STREAM_STAGES[4] self.finished.callback((False, None, None)) @@ -108,35 +111,35 @@ class GetStream(object): self.resolved_name = name self.stream_info = deepcopy(stream_info) self.description = self.stream_info['description'] - self.stream_hash = self.stream_info['sources']['lbry_sd_hash'] + self.sd_hash = self.stream_info['sources']['lbry_sd_hash'] if 'fee' in self.stream_info: self.fee = FeeValidator(self.stream_info['fee']) max_key_fee = self._convert_max_fee() converted_fee = self.exchange_rate_manager.to_lbc(self.fee).amount if converted_fee > self.wallet.wallet_balance: - log.warning("Insufficient funds to download lbry://%s", self.resolved_name) - return defer.fail(InsufficientFundsError()) + msg = "Insufficient funds to download lbry://{}. Need {:0.2f}, have {:0.2f}".format( + self.resolved_name, converted_fee, self.wallet.wallet_balance) + raise InsufficientFundsError(msg) if converted_fee > max_key_fee: - log.warning( - "Key fee %f above limit of %f didn't download lbry://%s", + msg = "Key fee {:0.2f} above limit of {:0.2f} didn't download lbry://{}".format( converted_fee, max_key_fee, self.resolved_name) - return defer.fail(KeyFeeAboveMaxAllowed()) + raise KeyFeeAboveMaxAllowed(msg) log.info( "Key fee %f below limit of %f, downloading lbry://%s", converted_fee, max_key_fee, self.resolved_name) self.checker.start(1) - self.d.addCallback(lambda _: _set_status(None, DOWNLOAD_METADATA_CODE)) - self.d.addCallback(lambda _: download_sd_blob( - self.session, self.stream_hash, self.payment_rate_manager)) - self.d.addCallback(self.sd_identifier.get_metadata_for_sd_blob) - self.d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE)) - self.d.addCallback(get_downloader_factory) - self.d.addCallback(make_downloader) - self.d.addCallbacks(self._start_download, _cause_timeout) - self.d.callback(None) + self._d.addCallback(lambda _: _set_status(None, DOWNLOAD_METADATA_CODE)) + self._d.addCallback(lambda _: download_sd_blob( + self.session, self.sd_hash, self.payment_rate_manager)) + self._d.addCallback(self.sd_identifier.get_metadata_for_sd_blob) + self._d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE)) + self._d.addCallback(get_downloader_factory) + self._d.addCallback(make_downloader) + self._d.addCallbacks(self._start_download, _cause_timeout) + self._d.callback(None) return self.finished @@ -147,7 +150,7 @@ class GetStream(object): d = self._pay_key_fee() d.addCallback(lambda _: log.info( - "Downloading %s --> %s", self.stream_hash, self.downloader.file_name)) + "Downloading %s --> %s", self.sd_hash, self.downloader.file_name)) d.addCallback(lambda _: self.downloader.start()) def _pay_key_fee(self): @@ -155,6 +158,9 @@ class GetStream(object): fee_lbc = self.exchange_rate_manager.to_lbc(self.fee).amount reserved_points = self.wallet.reserve_points(self.fee.address, fee_lbc) if reserved_points is None: + log.warning('Unable to pay the key fee of %s for %s', fee_lbc, self.resolved_name) + # TODO: If we get here, nobody will know that there was an error + # as nobody actually cares about self._d return defer.fail(InsufficientFundsError()) return self.wallet.send_points_to_address(reserved_points, fee_lbc) return defer.succeed(None) diff --git a/tests/functional/test_misc.py b/tests/functional/test_misc.py index 17ea1a7f8..1e0efc37e 100644 --- a/tests/functional/test_misc.py +++ b/tests/functional/test_misc.py @@ -652,7 +652,8 @@ class TestTransfer(TestCase): options = metadata.options factories = metadata.factories chosen_options = [ - o.default_value for o in options.get_downloader_options(info_validator, prm)] + o.default_value for o in options.get_downloader_options(info_validator, prm) + ] return factories[0].make_downloader(metadata, chosen_options, prm) def download_file(sd_hash): @@ -669,17 +670,14 @@ class TestTransfer(TestCase): hashsum.update(f.read()) self.assertEqual(hashsum.hexdigest(), "4ca2aafb4101c1e42235aad24fbb83be") + @defer.inlineCallbacks def start_transfer(sd_hash): - logging.debug("Starting the transfer") - - d = self.session.setup() - d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) - d.addCallback(lambda _: self.lbry_file_manager.setup()) - d.addCallback(lambda _: download_file(sd_hash)) - d.addCallback(lambda _: check_md5_sum()) - - return d + yield self.session.setup() + yield add_lbry_file_to_sd_identifier(sd_identifier) + yield self.lbry_file_manager.setup() + yield download_file(sd_hash) + yield check_md5_sum() def stop(arg): if isinstance(arg, Failure): @@ -914,25 +912,30 @@ class TestTransfer(TestCase): self.stream_info_manager = DBEncryptedFileMetadataManager(self.session.db_dir) self.lbry_file_manager = EncryptedFileManager(self.session, self.stream_info_manager, sd_identifier) + @defer.inlineCallbacks def make_downloader(metadata, prm): info_validator = metadata.validator options = metadata.options factories = metadata.factories - chosen_options = [o.default_value for o in options.get_downloader_options(info_validator, prm)] - return factories[0].make_downloader(metadata, chosen_options, prm) + chosen_options = [ + o.default_value for o in options.get_downloader_options(info_validator, prm) + ] + downloader = yield factories[0].make_downloader(metadata, chosen_options, prm) + defer.returnValue(downloader) def append_downloader(downloader): downloaders.append(downloader) return downloader + @defer.inlineCallbacks def download_file(sd_hash): prm = self.session.payment_rate_manager - d = download_sd_blob(self.session, sd_hash, prm) - d.addCallback(sd_identifier.get_metadata_for_sd_blob) - d.addCallback(make_downloader, prm) - d.addCallback(append_downloader) - d.addCallback(lambda downloader: downloader.start()) - return d + sd_blob = yield download_sd_blob(self.session, sd_hash, prm) + metadata = yield sd_identifier.get_metadata_for_sd_blob(sd_blob) + downloader = yield make_downloader(metadata, prm) + downloaders.append(downloader) + finished_value = yield downloader.start() + defer.returnValue(finished_value) def check_md5_sum(): f = open('test_file') @@ -959,20 +962,18 @@ class TestTransfer(TestCase): d.addCallback(check_status_report) return d + @defer.inlineCallbacks def start_transfer(sd_hash): logging.debug("Starting the transfer") - - d = self.session.setup() - d.addCallback(lambda _: self.stream_info_manager.setup()) - d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) - d.addCallback(lambda _: self.lbry_file_manager.setup()) - d.addCallback(lambda _: download_file(sd_hash)) - d.addCallback(lambda _: check_md5_sum()) - d.addCallback(lambda _: download_file(sd_hash)) - d.addCallback(lambda _: delete_lbry_file()) - d.addCallback(lambda _: check_lbry_file()) - - return d + yield self.session.setup() + yield self.stream_info_manager.setup() + yield add_lbry_file_to_sd_identifier(sd_identifier) + yield self.lbry_file_manager.setup() + yield download_file(sd_hash) + yield check_md5_sum() + yield download_file(sd_hash) + yield delete_lbry_file() + yield check_lbry_file() def stop(arg): if isinstance(arg, Failure): diff --git a/tests/unit/analytics/test_events.py b/tests/unit/analytics/test_events.py index d9baf2e59..06b9bcd92 100644 --- a/tests/unit/analytics/test_events.py +++ b/tests/unit/analytics/test_events.py @@ -22,7 +22,7 @@ class EventsTest(unittest.TestCase): self.assertEqual(desired_result, result) def test_download_started(self): - result = self.event_generator.download_started('great gatsby') + result = self.event_generator.download_started('1', 'great gatsby') desired_result = { 'context': 'any valid json datatype', 'event': 'Download Started', @@ -31,6 +31,7 @@ class EventsTest(unittest.TestCase): 'session_id': 'session456', 'name': 'great gatsby', 'stream_info': None, + 'download_id': '1' }, 'timestamp': '2016-01-01T00:00:00Z', 'userId': 'lbry'