From 0b53fde3529bcb5e9ccd7336549010ee4fc72096 Mon Sep 17 00:00:00 2001 From: Job Evers-Meltzer <jobevers@users.noreply.github.com> Date: Fri, 30 Dec 2016 08:27:50 -0600 Subject: [PATCH] refactor connectionmanager to use inlineCallbacks --- lbrynet/core/client/ConnectionManager.py | 119 +++++++++++------------ 1 file changed, 57 insertions(+), 62 deletions(-) diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py index d8c6170fc..a3331b31f 100644 --- a/lbrynet/core/client/ConnectionManager.py +++ b/lbrynet/core/client/ConnectionManager.py @@ -72,13 +72,19 @@ class ConnectionManager(object): closing_deferreds.append(close_connection(peer)) return defer.DeferredList(closing_deferreds) + @defer.inlineCallbacks def get_next_request(self, peer, protocol): - log.debug("Trying to get the next request for peer %s", peer) - if not peer in self._peer_connections or self.stopped is True: log.debug("The peer has already been told to shut down.") - return defer.succeed(False) + defer.returnValue(False) + requests = yield self._send_primary_requests(peer, protocol) + have_request = any(r[1] for r in requests if r[0] is True) + if have_request: + yield self._send_secondary_requests(peer, protocol) + defer.returnValue(have_request) + + def _send_primary_requests(self, peer, protocol): def handle_error(err): err.trap(InsufficientFundsError) @@ -97,34 +103,20 @@ class ConnectionManager(object): self._peer_connections[peer].request_creators.append(request_creator) return request_sent - def check_requests(requests): - have_request = True in [r[1] for r in requests if r[0] is True] - return have_request - - def get_secondary_requests_if_necessary(have_request): - if have_request is True: - ds = [] - for s_r_c in self._secondary_request_creators: - d = s_r_c.send_next_request(peer, protocol) - ds.append(d) - dl = defer.DeferredList(ds) - else: - dl = defer.succeed(None) - dl.addCallback(lambda _: have_request) - return dl - ds = [] - for p_r_c in self._primary_request_creators: d = p_r_c.send_next_request(peer, protocol) d.addErrback(handle_error) d.addCallback(check_if_request_sent, p_r_c) ds.append(d) + return defer.DeferredList(ds, fireOnOneErrback=True) - dl = defer.DeferredList(ds, fireOnOneErrback=True) - dl.addCallback(check_requests) - dl.addCallback(get_secondary_requests_if_necessary) - return dl + def _send_secondary_requests(self, peer, protocol): + ds = [ + s_r_c.send_next_request(peer, protocol) + for s_r_c in self._secondary_request_creators + ] + return defer.DeferredList(ds) def protocol_disconnected(self, peer, protocol): if peer in self._peer_connections: @@ -147,49 +139,52 @@ class ConnectionManager(object): return sorted(self._primary_request_creators, key=count_peers) def _connect_to_peer(self, peer): + if peer is None or self.stopped: + return from twisted.internet import reactor - if peer is not None and self.stopped is False: - log.debug("Trying to connect to %s", peer) - factory = ClientProtocolFactory(peer, self.rate_limiter, self) - self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:], - factory) - connection = reactor.connectTCP(peer.host, peer.port, factory) - self._peer_connections[peer].connection = connection + log.debug("Trying to connect to %s", peer) + factory = ClientProtocolFactory(peer, self.rate_limiter, self) + self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:], + factory) + connection = reactor.connectTCP(peer.host, peer.port, factory) + self._peer_connections[peer].connection = connection + @defer.inlineCallbacks def _manage(self): - from twisted.internet import reactor - - def get_new_peers(request_creators): - log.debug("Trying to get a new peer to connect to") - if len(request_creators) > 0: - log.debug("Got a creator to check: %s", request_creators[0]) - d = request_creators[0].get_new_peers() - d.addCallback(lambda h: h if h is not None else get_new_peers(request_creators[1:])) - return d - else: - return defer.succeed(None) - - def pick_best_peer(peers): - # TODO: Eventually rank them based on past performance/reputation. For now - # TODO: just pick the first to which we don't have an open connection - - log.debug("Got a list of peers to choose from: %s", peers) - if peers is None: - return None - for peer in peers: - if not peer in self._peer_connections: - log.debug("Got a good peer. Returning peer %s", peer) - return peer - log.debug("Couldn't find a good peer to connect to") - return None - if len(self._peer_connections) < conf.settings.max_connections_per_stream: - ordered_request_creators = self._rank_request_creator_connections() - d = get_new_peers(ordered_request_creators) - d.addCallback(pick_best_peer) - d.addCallback(self._connect_to_peer) - + try: + ordered_request_creators = self._rank_request_creator_connections() + peers = yield self._get_new_peers(ordered_request_creators) + peer = self._pick_best_peer(peers) + yield self._connect_to_peer(peer) + except Exception: + # log this otherwise it will just end up as an unhandled error in deferred + log.exception('Something bad happened picking a peer') self._next_manage_call = reactor.callLater(1, self._manage) + + @defer.inlineCallbacks + def _get_new_peers(self, request_creators): + log.debug("Trying to get a new peer to connect to") + if not request_creators: + defer.returnValue(None) + log.debug("Got a creator to check: %s", request_creators[0]) + new_peers = yield request_creators[0].get_new_peers() + if not new_peers: + new_peers = yield self._get_new_peers(request_creators[1:]) + defer.returnValue(new_peers) + + def _pick_best_peer(self, peers): + # TODO: Eventually rank them based on past performance/reputation. For now + # TODO: just pick the first to which we don't have an open connection + log.debug("Got a list of peers to choose from: %s", peers) + if peers is None: + return None + for peer in peers: + if not peer in self._peer_connections: + log.debug("Got a good peer. Returning peer %s", peer) + return peer + log.debug("Couldn't find a good peer to connect to") + return None