connect to multiple peers at once in manage

This commit is contained in:
Kay Kurokawa 2017-03-23 14:12:09 -04:00 committed by Alex Grintsvayg
parent 23f1cf9c82
commit 3af5e9dc5c
2 changed files with 13 additions and 14 deletions

View file

@ -1,3 +1,4 @@
import random
import logging
from twisted.internet import defer, reactor
from zope.interface import implements
@ -19,7 +20,7 @@ class PeerConnectionHandler(object):
class ConnectionManager(object):
implements(interfaces.IConnectionManager)
MANAGE_CALL_INTERVAL_SEC = 1
MANAGE_CALL_INTERVAL_SEC = 5
def __init__(self, downloader, rate_limiter,
primary_request_creators, secondary_request_creators):
@ -150,7 +151,9 @@ class ConnectionManager(object):
conf.settings['max_connections_per_stream'])
ordered_request_creators = self._rank_request_creator_connections()
peers = yield self._get_new_peers(ordered_request_creators)
peer = self._pick_best_peer(peers)
new_conns = conf.settings['max_connections_per_stream'] - len(self._peer_connections)
peers = self._pick_best_peers(peers, new_conns)
for peer in peers:
self._connect_to_peer(peer)
self._manage_deferred.callback(None)
self._manage_deferred = None
@ -179,7 +182,7 @@ class ConnectionManager(object):
new_peers = yield self._get_new_peers(request_creators[1:])
defer.returnValue(new_peers)
def _pick_best_peer(self, peers):
def _pick_best_peers(self, peers, num_peers_to_pick):
# TODO: Eventually rank them based on past performance/reputation. For now
# TODO: just pick the first to which we don't have an open connection
log.debug("%s Got a list of peers to choose from: %s",
@ -189,13 +192,11 @@ class ConnectionManager(object):
log.debug("%s List of connection states: %s", self._get_log_name(),
[p_c_h.connection.state for p_c_h in self._peer_connections.values()])
if peers is None:
return None
for peer in peers:
if not peer in self._peer_connections:
log.debug("%s Got a good peer %s", self._get_log_name(), peer)
return peer
log.debug("%s Couldn't find a good peer to connect to", self._get_log_name())
return None
return []
out = [peer for peer in peers if peer not in self._peer_connections]
random.shuffle(out)
return out[0:num_peers_to_pick]
def _connect_to_peer(self, peer):
if peer is None or self.stopped:

View file

@ -177,13 +177,11 @@ class TestIntegrationConnectionManager(unittest.TestCase):
@defer.inlineCallbacks
def test_parallel_connections(self):
# Test to see that we make new connection for each manage call,
# Test to see that we make two new connections at a manage call,
# without it waiting for the connection to complete
test_peer2 = Peer(LOCAL_HOST, PEER_PORT+1)
self.primary_request_creator.peers_to_return = [self.TEST_PEER, test_peer2]
yield self.connection_manager.manage(schedule_next_call=False)
self.assertEqual(1, self.connection_manager.num_peer_connections())
yield self.connection_manager.manage(schedule_next_call=False)
self.assertEqual(2, self.connection_manager.num_peer_connections())
self.assertIn(self.TEST_PEER, self.connection_manager._peer_connections)
self.assertIn(test_peer2, self.connection_manager._peer_connections)