diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index fbce3029e..53c5866a0 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -46,7 +46,7 @@ class BlobRequester(object): self.peer_finder = peer_finder self.payment_rate_manager = payment_rate_manager self.wallet = wallet - self.download_manager = download_manager + self._download_manager = download_manager self._peers = defaultdict(int) # {Peer: score} self._available_blobs = defaultdict(list) # {Peer: [blob_hash]} self._unavailable_blobs = defaultdict(list) # {Peer: [blob_hash]}} @@ -159,12 +159,12 @@ class BlobRequester(object): return False def _blobs_to_download(self): - needed_blobs = self.download_manager.needed_blobs() + needed_blobs = self._download_manager.needed_blobs() return sorted(needed_blobs, key=lambda b: b.is_downloading()) def _blobs_without_sources(self): return [ - b for b in self.download_manager.needed_blobs() + b for b in self._download_manager.needed_blobs() if not self._hash_available(b.blob_hash) ] diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py index a3331b31f..15298f9f1 100644 --- a/lbrynet/core/client/ConnectionManager.py +++ b/lbrynet/core/client/ConnectionManager.py @@ -29,6 +29,8 @@ class ConnectionManager(object): self._peer_connections = {} # {Peer: PeerConnectionHandler} self._connections_closing = {} # {Peer: deferred (fired when the connection is closed)} self._next_manage_call = None + # a deferred that gets fired when a _manage call is set + self._manage_deferred = None self.stopped = True def start(self): @@ -41,35 +43,38 @@ class ConnectionManager(object): self._next_manage_call = reactor.callLater(0, self._manage) return defer.succeed(True) + @defer.inlineCallbacks def stop(self): self.stopped = True - if self._next_manage_call is not None and self._next_manage_call.active() is True: + # wait for the current manage call to finish + if self._manage_deferred: + yield self._manage_deferred + # in case we stopped between manage calls, cancel the next one + if self._next_manage_call and self._next_manage_call.active(): self._next_manage_call.cancel() self._next_manage_call = None - closing_deferreds = [] - for peer in self._peer_connections.keys(): + yield self._close_peers() - def close_connection(p): - log.info( - "Abruptly closing a connection to %s due to downloading being paused", p) + def _close_peers(self): - if self._peer_connections[p].factory.p is not None: - d = self._peer_connections[p].factory.p.cancel_requests() - else: - d = defer.succeed(True) + def disconnect_peer(p): + d = defer.Deferred() + self._connections_closing[p] = d + self._peer_connections[p].connection.disconnect() + if p in self._peer_connections: + del self._peer_connections[p] + return d - def disconnect_peer(): - d = defer.Deferred() - self._connections_closing[p] = d - self._peer_connections[p].connection.disconnect() - if p in self._peer_connections: - del self._peer_connections[p] - return d + def close_connection(p): + log.info("Abruptly closing a connection to %s due to downloading being paused", p) + if self._peer_connections[p].factory.p is not None: + d = self._peer_connections[p].factory.p.cancel_requests() + else: + d = defer.succeed(True) + d.addBoth(lambda _: disconnect_peer(p)) + return d - d.addBoth(lambda _: disconnect_peer()) - return d - - closing_deferreds.append(close_connection(peer)) + closing_deferreds = [close_connection(peer) for peer in self._peer_connections.keys()] return defer.DeferredList(closing_deferreds) @defer.inlineCallbacks @@ -126,6 +131,25 @@ class ConnectionManager(object): del self._connections_closing[peer] d.callback(True) + @defer.inlineCallbacks + def _manage(self): + self._manage_deferred = defer.Deferred() + + from twisted.internet import reactor + if len(self._peer_connections) < conf.settings.max_connections_per_stream: + try: + ordered_request_creators = self._rank_request_creator_connections() + peers = yield self._get_new_peers(ordered_request_creators) + peer = self._pick_best_peer(peers) + yield self._connect_to_peer(peer) + except Exception: + # log this otherwise it will just end up as an unhandled error in deferred + log.exception('Something bad happened picking a peer') + self._manage_deferred.callback(None) + self._manage_deferred = None + if not self.stopped: + self._next_manage_call = reactor.callLater(1, self._manage) + def _rank_request_creator_connections(self): """Returns an ordered list of our request creators, ranked according to which has the least number of connections open that it @@ -138,33 +162,6 @@ class ConnectionManager(object): return sorted(self._primary_request_creators, key=count_peers) - def _connect_to_peer(self, peer): - if peer is None or self.stopped: - return - - from twisted.internet import reactor - - log.debug("Trying to connect to %s", peer) - factory = ClientProtocolFactory(peer, self.rate_limiter, self) - self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:], - factory) - connection = reactor.connectTCP(peer.host, peer.port, factory) - self._peer_connections[peer].connection = connection - - @defer.inlineCallbacks - def _manage(self): - from twisted.internet import reactor - if len(self._peer_connections) < conf.settings.max_connections_per_stream: - try: - ordered_request_creators = self._rank_request_creator_connections() - peers = yield self._get_new_peers(ordered_request_creators) - peer = self._pick_best_peer(peers) - yield self._connect_to_peer(peer) - except Exception: - # log this otherwise it will just end up as an unhandled error in deferred - log.exception('Something bad happened picking a peer') - self._next_manage_call = reactor.callLater(1, self._manage) - @defer.inlineCallbacks def _get_new_peers(self, request_creators): log.debug("Trying to get a new peer to connect to") @@ -188,3 +185,16 @@ class ConnectionManager(object): return peer log.debug("Couldn't find a good peer to connect to") return None + + def _connect_to_peer(self, peer): + if peer is None or self.stopped: + return + + from twisted.internet import reactor + + log.debug("Trying to connect to %s", peer) + factory = ClientProtocolFactory(peer, self.rate_limiter, self) + self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:], + factory) + connection = reactor.connectTCP(peer.host, peer.port, factory) + self._peer_connections[peer].connection = connection diff --git a/lbrynet/core/client/DownloadManager.py b/lbrynet/core/client/DownloadManager.py index 78f6a006e..dedd2e3df 100644 --- a/lbrynet/core/client/DownloadManager.py +++ b/lbrynet/core/client/DownloadManager.py @@ -18,7 +18,6 @@ class DownloadManager(object): self.progress_manager = None self.blob_handler = None self.connection_manager = None - self.blobs = {} self.blob_infos = {} @@ -59,8 +58,8 @@ class DownloadManager(object): d1.addBoth(check_stop, "progress manager") d2 = self.connection_manager.stop() d2.addBoth(check_stop, "connection manager") - dl = defer.DeferredList([d1, d2]) - dl.addCallback(lambda xs: False not in xs) + dl = defer.DeferredList([d1, d2], consumeErrors=True) + dl.addCallback(lambda results: all([success for success, val in results])) return dl def add_blobs_to_download(self, blob_infos): diff --git a/lbrynet/cryptstream/client/CryptStreamDownloader.py b/lbrynet/cryptstream/client/CryptStreamDownloader.py index ae312ad59..657b8ca5e 100644 --- a/lbrynet/cryptstream/client/CryptStreamDownloader.py +++ b/lbrynet/cryptstream/client/CryptStreamDownloader.py @@ -147,6 +147,10 @@ class CryptStreamDownloader(object): def _get_download_manager(self): assert self.blob_requester is None download_manager = DownloadManager(self.blob_manager, self.upload_allowed) + # TODO: can we get rid of these circular references. I'm not + # smart enough to handle thinking about the interactions + # between them and have hope that there is a simpler way + # to accomplish what we want download_manager.blob_info_finder = self._get_metadata_handler(download_manager) download_manager.progress_manager = self._get_progress_manager(download_manager) download_manager.blob_handler = self._get_blob_handler(download_manager) @@ -161,6 +165,7 @@ class CryptStreamDownloader(object): self.download_manager.progress_manager = None self.download_manager.blob_handler = None self.download_manager.wallet_info_exchanger = None + self.blob_requester = None self.download_manager.connection_manager = None self.download_manager = None diff --git a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py index e2a1a4bc5..58d9f04dd 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py +++ b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py @@ -42,7 +42,11 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): self.claim_id = None self.rowid = rowid self.lbry_file_manager = lbry_file_manager - self.saving_status = False + self._saving_status = False + + @property + def saving_status(self): + return self._saving_status def restore(self): d = self.stream_info_manager._get_sd_blob_hashes_for_stream(self.stream_hash) @@ -96,18 +100,13 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): reflector_server = random.choice(conf.settings.reflector_servers) return reupload.check_and_restore_availability(self, reflector_server) + @defer.inlineCallbacks def stop(self, err=None, change_status=True): - - def set_saving_status_done(): - self.saving_status = False - + log.debug('Stopping download for %s', self.sd_hash) # EncryptedFileSaver deletes metadata when it's stopped. We don't want that here. - d = EncryptedFileDownloader.stop(self, err=err) + yield EncryptedFileDownloader.stop(self, err=err) if change_status is True: - self.saving_status = True - d.addCallback(lambda _: self._save_status()) - d.addCallback(lambda _: set_saving_status_done()) - return d + status = yield self._save_status() def status(self): def find_completed_blobhashes(blobs): @@ -158,14 +157,17 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): else: return "Download stopped" + @defer.inlineCallbacks def _save_status(self): + self._saving_status = True if self.completed is True: - s = ManagedEncryptedFileDownloader.STATUS_FINISHED + status = ManagedEncryptedFileDownloader.STATUS_FINISHED elif self.stopped is True: - s = ManagedEncryptedFileDownloader.STATUS_STOPPED + status = ManagedEncryptedFileDownloader.STATUS_STOPPED else: - s = ManagedEncryptedFileDownloader.STATUS_RUNNING - return self.lbry_file_manager.change_lbry_file_status(self, s) + status = ManagedEncryptedFileDownloader.STATUS_RUNNING + yield self.lbry_file_manager.change_lbry_file_status(self, status) + self._saving_status = False def _get_progress_manager(self, download_manager): return FullStreamProgressManager(self._finished_downloading, diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index e5ee1bb51..829bc1bd3 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -1677,7 +1677,8 @@ class Daemon(AuthJSONRPCServer): msg = ( "File was already being seeded" if status == 'start' else "File was already stopped" ) - defer.returnValue(self._render_response(msg)) + response = yield self._render_response(msg) + defer.returnValue(response) @AuthJSONRPCServer.auth_required def jsonrpc_delete_lbry_file(self, p): diff --git a/lbrynet/lbrynet_daemon/Downloader.py b/lbrynet/lbrynet_daemon/Downloader.py index f17fb34e3..44cc674e2 100644 --- a/lbrynet/lbrynet_daemon/Downloader.py +++ b/lbrynet/lbrynet_daemon/Downloader.py @@ -85,9 +85,10 @@ class GetStream(object): return self.exchange_rate_manager.to_lbc(self.max_key_fee).amount def start(self, stream_info, name): - def _cause_timeout(err): - log.info('Cancelling download') - self.timeout_counter = self.timeout * 2 + def _cancel(err): + if self.checker: + self.checker.stop() + self.finished.errback(err) def _set_status(x, status): log.info("Download lbry://%s status changed to %s" % (self.resolved_name, status)) @@ -138,7 +139,7 @@ class GetStream(object): self._d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE)) self._d.addCallback(get_downloader_factory) self._d.addCallback(make_downloader) - self._d.addCallbacks(self._start_download, _cause_timeout) + self._d.addCallbacks(self._start_download, _cancel) self._d.callback(None) return self.finished