From 1b2c46e224702236264c09146faaf2549d4728ed Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Wed, 11 Jan 2017 11:33:39 -0600 Subject: [PATCH 01/13] make _download_manager private --- lbrynet/core/client/BlobRequester.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index fbce3029e..53c5866a0 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -46,7 +46,7 @@ class BlobRequester(object): self.peer_finder = peer_finder self.payment_rate_manager = payment_rate_manager self.wallet = wallet - self.download_manager = download_manager + self._download_manager = download_manager self._peers = defaultdict(int) # {Peer: score} self._available_blobs = defaultdict(list) # {Peer: [blob_hash]} self._unavailable_blobs = defaultdict(list) # {Peer: [blob_hash]}} @@ -159,12 +159,12 @@ class BlobRequester(object): return False def _blobs_to_download(self): - needed_blobs = self.download_manager.needed_blobs() + needed_blobs = self._download_manager.needed_blobs() return sorted(needed_blobs, key=lambda b: b.is_downloading()) def _blobs_without_sources(self): return [ - b for b in self.download_manager.needed_blobs() + b for b in self._download_manager.needed_blobs() if not self._hash_available(b.blob_hash) ] From 8075ced1af88ba301fd34f033d73043edc5ce816 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Wed, 11 Jan 2017 11:35:01 -0600 Subject: [PATCH 02/13] bug fix for file_seed cannot return a deferred in inlineCallbacks, it must be a value --- lbrynet/lbrynet_daemon/Daemon.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index e5ee1bb51..829bc1bd3 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -1677,7 +1677,8 @@ class Daemon(AuthJSONRPCServer): msg = ( "File was already being seeded" if status == 'start' else "File was already stopped" ) - defer.returnValue(self._render_response(msg)) + response = yield self._render_response(msg) + defer.returnValue(response) @AuthJSONRPCServer.auth_required def jsonrpc_delete_lbry_file(self, p): From 11510191863d6ff8f1af4dddb5d2b8af30f2b001 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Wed, 11 Jan 2017 11:35:53 -0600 Subject: [PATCH 03/13] Improve downloader error handling Instead of having an error cause a timeout, send the error directly to the callback --- lbrynet/lbrynet_daemon/Downloader.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lbrynet/lbrynet_daemon/Downloader.py b/lbrynet/lbrynet_daemon/Downloader.py index f17fb34e3..44cc674e2 100644 --- a/lbrynet/lbrynet_daemon/Downloader.py +++ b/lbrynet/lbrynet_daemon/Downloader.py @@ -85,9 +85,10 @@ class GetStream(object): return self.exchange_rate_manager.to_lbc(self.max_key_fee).amount def start(self, stream_info, name): - def _cause_timeout(err): - log.info('Cancelling download') - self.timeout_counter = self.timeout * 2 + def _cancel(err): + if self.checker: + self.checker.stop() + self.finished.errback(err) def _set_status(x, status): log.info("Download lbry://%s status changed to %s" % (self.resolved_name, status)) @@ -138,7 +139,7 @@ class GetStream(object): self._d.addCallback(lambda r: _set_status(r, DOWNLOAD_RUNNING_CODE)) self._d.addCallback(get_downloader_factory) self._d.addCallback(make_downloader) - self._d.addCallbacks(self._start_download, _cause_timeout) + self._d.addCallbacks(self._start_download, _cancel) self._d.callback(None) return self.finished From 6035a846752bdedf5466aa4d6c47eb37fe33c58a Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Wed, 11 Jan 2017 11:37:44 -0600 Subject: [PATCH 04/13] bug fix for stopping downloads deferredlist returns a tuple of (success, result) and the previous code was looking at the entire tuple not the success values. --- lbrynet/core/client/DownloadManager.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lbrynet/core/client/DownloadManager.py b/lbrynet/core/client/DownloadManager.py index 78f6a006e..0b1e3adb6 100644 --- a/lbrynet/core/client/DownloadManager.py +++ b/lbrynet/core/client/DownloadManager.py @@ -18,7 +18,6 @@ class DownloadManager(object): self.progress_manager = None self.blob_handler = None self.connection_manager = None - self.blobs = {} self.blob_infos = {} @@ -59,8 +58,8 @@ class DownloadManager(object): d1.addBoth(check_stop, "progress manager") d2 = self.connection_manager.stop() d2.addBoth(check_stop, "connection manager") - dl = defer.DeferredList([d1, d2]) - dl.addCallback(lambda xs: False not in xs) + dl = defer.DeferredList([d1, d2], fireOnOneErrback=True, consumeErrors=True) + dl.addCallback(lambda results: all([success for success, val in results])) return dl def add_blobs_to_download(self, blob_infos): From 2449604844be835c564ee3b8d29c84555ac839ff Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Fri, 13 Jan 2017 08:18:07 -0600 Subject: [PATCH 05/13] fixup: bug fix for stopping downloads --- lbrynet/core/client/DownloadManager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/core/client/DownloadManager.py b/lbrynet/core/client/DownloadManager.py index 0b1e3adb6..dedd2e3df 100644 --- a/lbrynet/core/client/DownloadManager.py +++ b/lbrynet/core/client/DownloadManager.py @@ -58,7 +58,7 @@ class DownloadManager(object): d1.addBoth(check_stop, "progress manager") d2 = self.connection_manager.stop() d2.addBoth(check_stop, "connection manager") - dl = defer.DeferredList([d1, d2], fireOnOneErrback=True, consumeErrors=True) + dl = defer.DeferredList([d1, d2], consumeErrors=True) dl.addCallback(lambda results: all([success for success, val in results])) return dl From 15c5075d38d3d2066306622985ba46f4401511f1 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Wed, 11 Jan 2017 11:40:53 -0600 Subject: [PATCH 06/13] refactor closing peers --- lbrynet/core/client/ConnectionManager.py | 37 ++++++++++++------------ 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py index a3331b31f..b04d37ed3 100644 --- a/lbrynet/core/client/ConnectionManager.py +++ b/lbrynet/core/client/ConnectionManager.py @@ -46,30 +46,29 @@ class ConnectionManager(object): if self._next_manage_call is not None and self._next_manage_call.active() is True: self._next_manage_call.cancel() self._next_manage_call = None - closing_deferreds = [] - for peer in self._peer_connections.keys(): - def close_connection(p): - log.info( - "Abruptly closing a connection to %s due to downloading being paused", p) + return self._close_peers() - if self._peer_connections[p].factory.p is not None: - d = self._peer_connections[p].factory.p.cancel_requests() - else: - d = defer.succeed(True) + def _close_peers(self): - def disconnect_peer(): - d = defer.Deferred() - self._connections_closing[p] = d - self._peer_connections[p].connection.disconnect() - if p in self._peer_connections: - del self._peer_connections[p] - return d + def disconnect_peer(p): + d = defer.Deferred() + self._connections_closing[p] = d + self._peer_connections[p].connection.disconnect() + if p in self._peer_connections: + del self._peer_connections[p] + return d - d.addBoth(lambda _: disconnect_peer()) - return d + def close_connection(p): + log.info("Abruptly closing a connection to %s due to downloading being paused", p) + if self._peer_connections[p].factory.p is not None: + d = self._peer_connections[p].factory.p.cancel_requests() + else: + d = defer.succeed(True) + d.addBoth(lambda _: disconnect_peer(p)) + return d - closing_deferreds.append(close_connection(peer)) + closing_deferreds = [close_connection(peer) for peer in self._peer_connections.keys()] return defer.DeferredList(closing_deferreds) @defer.inlineCallbacks From f535d969296cb533a7571baa90431099add46eb4 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Wed, 11 Jan 2017 11:42:07 -0600 Subject: [PATCH 07/13] switch stop to inlineCallback --- lbrynet/core/client/ConnectionManager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py index b04d37ed3..81b436e03 100644 --- a/lbrynet/core/client/ConnectionManager.py +++ b/lbrynet/core/client/ConnectionManager.py @@ -41,13 +41,13 @@ class ConnectionManager(object): self._next_manage_call = reactor.callLater(0, self._manage) return defer.succeed(True) + @defer.inlineCallbacks def stop(self): self.stopped = True - if self._next_manage_call is not None and self._next_manage_call.active() is True: + if self._next_manage_call and self._next_manage_call.active(): self._next_manage_call.cancel() self._next_manage_call = None - - return self._close_peers() + yield self._close_peers() def _close_peers(self): From 0bb62515a8881d2f8f32e89ddf0f99be0f338b4e Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Wed, 11 Jan 2017 11:45:18 -0600 Subject: [PATCH 08/13] reorder functions to match call order --- lbrynet/core/client/ConnectionManager.py | 50 ++++++++++++------------ 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py index 81b436e03..8a9899bb6 100644 --- a/lbrynet/core/client/ConnectionManager.py +++ b/lbrynet/core/client/ConnectionManager.py @@ -125,31 +125,6 @@ class ConnectionManager(object): del self._connections_closing[peer] d.callback(True) - def _rank_request_creator_connections(self): - """Returns an ordered list of our request creators, ranked according - to which has the least number of connections open that it - likes - """ - def count_peers(request_creator): - return len([ - p for p in self._peer_connections.itervalues() - if request_creator in p.request_creators]) - - return sorted(self._primary_request_creators, key=count_peers) - - def _connect_to_peer(self, peer): - if peer is None or self.stopped: - return - - from twisted.internet import reactor - - log.debug("Trying to connect to %s", peer) - factory = ClientProtocolFactory(peer, self.rate_limiter, self) - self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:], - factory) - connection = reactor.connectTCP(peer.host, peer.port, factory) - self._peer_connections[peer].connection = connection - @defer.inlineCallbacks def _manage(self): from twisted.internet import reactor @@ -164,6 +139,18 @@ class ConnectionManager(object): log.exception('Something bad happened picking a peer') self._next_manage_call = reactor.callLater(1, self._manage) + def _rank_request_creator_connections(self): + """Returns an ordered list of our request creators, ranked according + to which has the least number of connections open that it + likes + """ + def count_peers(request_creator): + return len([ + p for p in self._peer_connections.itervalues() + if request_creator in p.request_creators]) + + return sorted(self._primary_request_creators, key=count_peers) + @defer.inlineCallbacks def _get_new_peers(self, request_creators): log.debug("Trying to get a new peer to connect to") @@ -187,3 +174,16 @@ class ConnectionManager(object): return peer log.debug("Couldn't find a good peer to connect to") return None + + def _connect_to_peer(self, peer): + if peer is None or self.stopped: + return + + from twisted.internet import reactor + + log.debug("Trying to connect to %s", peer) + factory = ClientProtocolFactory(peer, self.rate_limiter, self) + self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:], + factory) + connection = reactor.connectTCP(peer.host, peer.port, factory) + self._peer_connections[peer].connection = connection From cb2bb6ee6b4a4c795162f8e3ef8c3a16cbd72dec Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Wed, 11 Jan 2017 11:45:37 -0600 Subject: [PATCH 09/13] bug fix: properly stop ConnectionManager It is possible (likely) that a manage call is in progress when `stop` is called. When that happens, _manage will continue to run, and schedule another call - and the manager won't actually stop, and will likely cause an error as other components have been torn down. This fix adds a deferred that gets created when a manage call starts and is fired when its done. At this points its safe to start the stopping process. Also add a check to not schedule another manage call if we're stopped This fixes https://app.asana.com/0/142330900434470/239832897034382 --- lbrynet/core/client/ConnectionManager.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py index 8a9899bb6..15298f9f1 100644 --- a/lbrynet/core/client/ConnectionManager.py +++ b/lbrynet/core/client/ConnectionManager.py @@ -29,6 +29,8 @@ class ConnectionManager(object): self._peer_connections = {} # {Peer: PeerConnectionHandler} self._connections_closing = {} # {Peer: deferred (fired when the connection is closed)} self._next_manage_call = None + # a deferred that gets fired when a _manage call is set + self._manage_deferred = None self.stopped = True def start(self): @@ -44,6 +46,10 @@ class ConnectionManager(object): @defer.inlineCallbacks def stop(self): self.stopped = True + # wait for the current manage call to finish + if self._manage_deferred: + yield self._manage_deferred + # in case we stopped between manage calls, cancel the next one if self._next_manage_call and self._next_manage_call.active(): self._next_manage_call.cancel() self._next_manage_call = None @@ -127,6 +133,8 @@ class ConnectionManager(object): @defer.inlineCallbacks def _manage(self): + self._manage_deferred = defer.Deferred() + from twisted.internet import reactor if len(self._peer_connections) < conf.settings.max_connections_per_stream: try: @@ -137,7 +145,10 @@ class ConnectionManager(object): except Exception: # log this otherwise it will just end up as an unhandled error in deferred log.exception('Something bad happened picking a peer') - self._next_manage_call = reactor.callLater(1, self._manage) + self._manage_deferred.callback(None) + self._manage_deferred = None + if not self.stopped: + self._next_manage_call = reactor.callLater(1, self._manage) def _rank_request_creator_connections(self): """Returns an ordered list of our request creators, ranked according From 488c04543d81a792478c07fe7c6cfa553947c5a0 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Wed, 11 Jan 2017 11:52:38 -0600 Subject: [PATCH 10/13] add TODO comment --- lbrynet/cryptstream/client/CryptStreamDownloader.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lbrynet/cryptstream/client/CryptStreamDownloader.py b/lbrynet/cryptstream/client/CryptStreamDownloader.py index ae312ad59..3dadc3b14 100644 --- a/lbrynet/cryptstream/client/CryptStreamDownloader.py +++ b/lbrynet/cryptstream/client/CryptStreamDownloader.py @@ -147,6 +147,10 @@ class CryptStreamDownloader(object): def _get_download_manager(self): assert self.blob_requester is None download_manager = DownloadManager(self.blob_manager, self.upload_allowed) + # TODO: can we get rid of these circular references. I'm not + # smart enough to handle thinking about the interactions + # between them and have hope that there is a simpler way + # to accomplish what we want download_manager.blob_info_finder = self._get_metadata_handler(download_manager) download_manager.progress_manager = self._get_progress_manager(download_manager) download_manager.blob_handler = self._get_blob_handler(download_manager) From 9366a6e561f0425140187190ba5729eabc40422d Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Wed, 11 Jan 2017 11:53:18 -0600 Subject: [PATCH 11/13] bug fix: reset blob_requester between start/stop --- lbrynet/cryptstream/client/CryptStreamDownloader.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lbrynet/cryptstream/client/CryptStreamDownloader.py b/lbrynet/cryptstream/client/CryptStreamDownloader.py index 3dadc3b14..657b8ca5e 100644 --- a/lbrynet/cryptstream/client/CryptStreamDownloader.py +++ b/lbrynet/cryptstream/client/CryptStreamDownloader.py @@ -165,6 +165,7 @@ class CryptStreamDownloader(object): self.download_manager.progress_manager = None self.download_manager.blob_handler = None self.download_manager.wallet_info_exchanger = None + self.blob_requester = None self.download_manager.connection_manager = None self.download_manager = None From a9261b20323c84d38af49f04fc8d9581955cc923 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Wed, 11 Jan 2017 11:53:43 -0600 Subject: [PATCH 12/13] change stop to be an inlineCallback --- .../EncryptedFileDownloader.py | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py index e2a1a4bc5..b091022e4 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py +++ b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py @@ -42,7 +42,11 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): self.claim_id = None self.rowid = rowid self.lbry_file_manager = lbry_file_manager - self.saving_status = False + self._saving_status = False + + @property + def saving_status(self): + return self._saving_status def restore(self): d = self.stream_info_manager._get_sd_blob_hashes_for_stream(self.stream_hash) @@ -96,18 +100,15 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): reflector_server = random.choice(conf.settings.reflector_servers) return reupload.check_and_restore_availability(self, reflector_server) + @defer.inlineCallbacks def stop(self, err=None, change_status=True): - - def set_saving_status_done(): - self.saving_status = False - + log.debug('Stopping download for %s', self.sd_hash) # EncryptedFileSaver deletes metadata when it's stopped. We don't want that here. - d = EncryptedFileDownloader.stop(self, err=err) + yield EncryptedFileDownloader.stop(self, err=err) if change_status is True: - self.saving_status = True - d.addCallback(lambda _: self._save_status()) - d.addCallback(lambda _: set_saving_status_done()) - return d + self._saving_status = True + status = yield self._save_status() + self._saving_status = False def status(self): def find_completed_blobhashes(blobs): From e61260a263497b103aa05ec6db5aad207504d14e Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer Date: Fri, 13 Jan 2017 08:16:42 -0600 Subject: [PATCH 13/13] merge with change stop to be an inlineCallback --- lbrynet/lbryfilemanager/EncryptedFileDownloader.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py index b091022e4..58d9f04dd 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py +++ b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py @@ -106,9 +106,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): # EncryptedFileSaver deletes metadata when it's stopped. We don't want that here. yield EncryptedFileDownloader.stop(self, err=err) if change_status is True: - self._saving_status = True status = yield self._save_status() - self._saving_status = False def status(self): def find_completed_blobhashes(blobs): @@ -159,14 +157,17 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): else: return "Download stopped" + @defer.inlineCallbacks def _save_status(self): + self._saving_status = True if self.completed is True: - s = ManagedEncryptedFileDownloader.STATUS_FINISHED + status = ManagedEncryptedFileDownloader.STATUS_FINISHED elif self.stopped is True: - s = ManagedEncryptedFileDownloader.STATUS_STOPPED + status = ManagedEncryptedFileDownloader.STATUS_STOPPED else: - s = ManagedEncryptedFileDownloader.STATUS_RUNNING - return self.lbry_file_manager.change_lbry_file_status(self, s) + status = ManagedEncryptedFileDownloader.STATUS_RUNNING + yield self.lbry_file_manager.change_lbry_file_status(self, status) + self._saving_status = False def _get_progress_manager(self, download_manager): return FullStreamProgressManager(self._finished_downloading,