refactor connectionmanager to use inlineCallbacks

This commit is contained in:
Job Evers-Meltzer 2016-12-30 08:27:50 -06:00
parent 4eb10b56c1
commit 0b53fde352

View file

@ -72,13 +72,19 @@ class ConnectionManager(object):
closing_deferreds.append(close_connection(peer)) closing_deferreds.append(close_connection(peer))
return defer.DeferredList(closing_deferreds) return defer.DeferredList(closing_deferreds)
@defer.inlineCallbacks
def get_next_request(self, peer, protocol): def get_next_request(self, peer, protocol):
log.debug("Trying to get the next request for peer %s", peer) log.debug("Trying to get the next request for peer %s", peer)
if not peer in self._peer_connections or self.stopped is True: if not peer in self._peer_connections or self.stopped is True:
log.debug("The peer has already been told to shut down.") log.debug("The peer has already been told to shut down.")
return defer.succeed(False) defer.returnValue(False)
requests = yield self._send_primary_requests(peer, protocol)
have_request = any(r[1] for r in requests if r[0] is True)
if have_request:
yield self._send_secondary_requests(peer, protocol)
defer.returnValue(have_request)
def _send_primary_requests(self, peer, protocol):
def handle_error(err): def handle_error(err):
err.trap(InsufficientFundsError) err.trap(InsufficientFundsError)
@ -97,34 +103,20 @@ class ConnectionManager(object):
self._peer_connections[peer].request_creators.append(request_creator) self._peer_connections[peer].request_creators.append(request_creator)
return request_sent return request_sent
def check_requests(requests):
have_request = True in [r[1] for r in requests if r[0] is True]
return have_request
def get_secondary_requests_if_necessary(have_request):
if have_request is True:
ds = [] ds = []
for s_r_c in self._secondary_request_creators:
d = s_r_c.send_next_request(peer, protocol)
ds.append(d)
dl = defer.DeferredList(ds)
else:
dl = defer.succeed(None)
dl.addCallback(lambda _: have_request)
return dl
ds = []
for p_r_c in self._primary_request_creators: for p_r_c in self._primary_request_creators:
d = p_r_c.send_next_request(peer, protocol) d = p_r_c.send_next_request(peer, protocol)
d.addErrback(handle_error) d.addErrback(handle_error)
d.addCallback(check_if_request_sent, p_r_c) d.addCallback(check_if_request_sent, p_r_c)
ds.append(d) ds.append(d)
return defer.DeferredList(ds, fireOnOneErrback=True)
dl = defer.DeferredList(ds, fireOnOneErrback=True) def _send_secondary_requests(self, peer, protocol):
dl.addCallback(check_requests) ds = [
dl.addCallback(get_secondary_requests_if_necessary) s_r_c.send_next_request(peer, protocol)
return dl for s_r_c in self._secondary_request_creators
]
return defer.DeferredList(ds)
def protocol_disconnected(self, peer, protocol): def protocol_disconnected(self, peer, protocol):
if peer in self._peer_connections: if peer in self._peer_connections:
@ -147,10 +139,11 @@ class ConnectionManager(object):
return sorted(self._primary_request_creators, key=count_peers) return sorted(self._primary_request_creators, key=count_peers)
def _connect_to_peer(self, peer): def _connect_to_peer(self, peer):
if peer is None or self.stopped:
return
from twisted.internet import reactor from twisted.internet import reactor
if peer is not None and self.stopped is False:
log.debug("Trying to connect to %s", peer) log.debug("Trying to connect to %s", peer)
factory = ClientProtocolFactory(peer, self.rate_limiter, self) factory = ClientProtocolFactory(peer, self.rate_limiter, self)
self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:], self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:],
@ -158,24 +151,34 @@ class ConnectionManager(object):
connection = reactor.connectTCP(peer.host, peer.port, factory) connection = reactor.connectTCP(peer.host, peer.port, factory)
self._peer_connections[peer].connection = connection self._peer_connections[peer].connection = connection
@defer.inlineCallbacks
def _manage(self): def _manage(self):
from twisted.internet import reactor from twisted.internet import reactor
if len(self._peer_connections) < conf.settings.max_connections_per_stream:
try:
ordered_request_creators = self._rank_request_creator_connections()
peers = yield self._get_new_peers(ordered_request_creators)
peer = self._pick_best_peer(peers)
yield self._connect_to_peer(peer)
except Exception:
# log this otherwise it will just end up as an unhandled error in deferred
log.exception('Something bad happened picking a peer')
self._next_manage_call = reactor.callLater(1, self._manage)
def get_new_peers(request_creators): @defer.inlineCallbacks
def _get_new_peers(self, request_creators):
log.debug("Trying to get a new peer to connect to") log.debug("Trying to get a new peer to connect to")
if len(request_creators) > 0: if not request_creators:
defer.returnValue(None)
log.debug("Got a creator to check: %s", request_creators[0]) log.debug("Got a creator to check: %s", request_creators[0])
d = request_creators[0].get_new_peers() new_peers = yield request_creators[0].get_new_peers()
d.addCallback(lambda h: h if h is not None else get_new_peers(request_creators[1:])) if not new_peers:
return d new_peers = yield self._get_new_peers(request_creators[1:])
else: defer.returnValue(new_peers)
return defer.succeed(None)
def pick_best_peer(peers): def _pick_best_peer(self, peers):
# TODO: Eventually rank them based on past performance/reputation. For now # TODO: Eventually rank them based on past performance/reputation. For now
# TODO: just pick the first to which we don't have an open connection # TODO: just pick the first to which we don't have an open connection
log.debug("Got a list of peers to choose from: %s", peers) log.debug("Got a list of peers to choose from: %s", peers)
if peers is None: if peers is None:
return None return None
@ -185,11 +188,3 @@ class ConnectionManager(object):
return peer return peer
log.debug("Couldn't find a good peer to connect to") log.debug("Couldn't find a good peer to connect to")
return None return None
if len(self._peer_connections) < conf.settings.max_connections_per_stream:
ordered_request_creators = self._rank_request_creator_connections()
d = get_new_peers(ordered_request_creators)
d.addCallback(pick_best_peer)
d.addCallback(self._connect_to_peer)
self._next_manage_call = reactor.callLater(1, self._manage)