forked from LBRYCommunity/lbry-sdk
Merge branch 'connection_manager_improve_manage'
* connection_manager_improve_manage: add changelog decrease connection timeout connect to multiple peers at once in manage
This commit is contained in:
commit
7abfbc4c5f
3 changed files with 17 additions and 15 deletions
|
@ -15,6 +15,7 @@ at anytime.
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
* wallet_new_address API command always returns new address
|
* wallet_new_address API command always returns new address
|
||||||
|
* Improved ConnectionManager speed
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import random
|
||||||
import logging
|
import logging
|
||||||
from twisted.internet import defer, reactor
|
from twisted.internet import defer, reactor
|
||||||
from zope.interface import implements
|
from zope.interface import implements
|
||||||
|
@ -19,7 +20,8 @@ class PeerConnectionHandler(object):
|
||||||
|
|
||||||
class ConnectionManager(object):
|
class ConnectionManager(object):
|
||||||
implements(interfaces.IConnectionManager)
|
implements(interfaces.IConnectionManager)
|
||||||
MANAGE_CALL_INTERVAL_SEC = 1
|
MANAGE_CALL_INTERVAL_SEC = 5
|
||||||
|
TCP_CONNECT_TIMEOUT = 15
|
||||||
|
|
||||||
def __init__(self, downloader, rate_limiter,
|
def __init__(self, downloader, rate_limiter,
|
||||||
primary_request_creators, secondary_request_creators):
|
primary_request_creators, secondary_request_creators):
|
||||||
|
@ -150,8 +152,10 @@ class ConnectionManager(object):
|
||||||
conf.settings['max_connections_per_stream'])
|
conf.settings['max_connections_per_stream'])
|
||||||
ordered_request_creators = self._rank_request_creator_connections()
|
ordered_request_creators = self._rank_request_creator_connections()
|
||||||
peers = yield self._get_new_peers(ordered_request_creators)
|
peers = yield self._get_new_peers(ordered_request_creators)
|
||||||
peer = self._pick_best_peer(peers)
|
new_conns = conf.settings['max_connections_per_stream'] - len(self._peer_connections)
|
||||||
self._connect_to_peer(peer)
|
peers = self._pick_best_peers(peers, new_conns)
|
||||||
|
for peer in peers:
|
||||||
|
self._connect_to_peer(peer)
|
||||||
self._manage_deferred.callback(None)
|
self._manage_deferred.callback(None)
|
||||||
self._manage_deferred = None
|
self._manage_deferred = None
|
||||||
if not self.stopped and schedule_next_call:
|
if not self.stopped and schedule_next_call:
|
||||||
|
@ -179,7 +183,7 @@ class ConnectionManager(object):
|
||||||
new_peers = yield self._get_new_peers(request_creators[1:])
|
new_peers = yield self._get_new_peers(request_creators[1:])
|
||||||
defer.returnValue(new_peers)
|
defer.returnValue(new_peers)
|
||||||
|
|
||||||
def _pick_best_peer(self, peers):
|
def _pick_best_peers(self, peers, num_peers_to_pick):
|
||||||
# TODO: Eventually rank them based on past performance/reputation. For now
|
# TODO: Eventually rank them based on past performance/reputation. For now
|
||||||
# TODO: just pick the first to which we don't have an open connection
|
# TODO: just pick the first to which we don't have an open connection
|
||||||
log.debug("%s Got a list of peers to choose from: %s",
|
log.debug("%s Got a list of peers to choose from: %s",
|
||||||
|
@ -189,13 +193,11 @@ class ConnectionManager(object):
|
||||||
log.debug("%s List of connection states: %s", self._get_log_name(),
|
log.debug("%s List of connection states: %s", self._get_log_name(),
|
||||||
[p_c_h.connection.state for p_c_h in self._peer_connections.values()])
|
[p_c_h.connection.state for p_c_h in self._peer_connections.values()])
|
||||||
if peers is None:
|
if peers is None:
|
||||||
return None
|
return []
|
||||||
for peer in peers:
|
out = [peer for peer in peers if peer not in self._peer_connections]
|
||||||
if not peer in self._peer_connections:
|
random.shuffle(out)
|
||||||
log.debug("%s Got a good peer %s", self._get_log_name(), peer)
|
return out[0:num_peers_to_pick]
|
||||||
return peer
|
|
||||||
log.debug("%s Couldn't find a good peer to connect to", self._get_log_name())
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _connect_to_peer(self, peer):
|
def _connect_to_peer(self, peer):
|
||||||
if peer is None or self.stopped:
|
if peer is None or self.stopped:
|
||||||
|
@ -207,7 +209,8 @@ class ConnectionManager(object):
|
||||||
lambda c_was_made: self._peer_disconnected(c_was_made, peer))
|
lambda c_was_made: self._peer_disconnected(c_was_made, peer))
|
||||||
self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:],
|
self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:],
|
||||||
factory)
|
factory)
|
||||||
connection = reactor.connectTCP(peer.host, peer.port, factory)
|
connection = reactor.connectTCP(peer.host, peer.port, factory,
|
||||||
|
timeout=self.TCP_CONNECT_TIMEOUT)
|
||||||
self._peer_connections[peer].connection = connection
|
self._peer_connections[peer].connection = connection
|
||||||
|
|
||||||
def _peer_disconnected(self, connection_was_made, peer):
|
def _peer_disconnected(self, connection_was_made, peer):
|
||||||
|
|
|
@ -177,13 +177,11 @@ class TestIntegrationConnectionManager(unittest.TestCase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_parallel_connections(self):
|
def test_parallel_connections(self):
|
||||||
# Test to see that we make new connection for each manage call,
|
# Test to see that we make two new connections at a manage call,
|
||||||
# without it waiting for the connection to complete
|
# without it waiting for the connection to complete
|
||||||
test_peer2 = Peer(LOCAL_HOST, PEER_PORT+1)
|
test_peer2 = Peer(LOCAL_HOST, PEER_PORT+1)
|
||||||
self.primary_request_creator.peers_to_return = [self.TEST_PEER, test_peer2]
|
self.primary_request_creator.peers_to_return = [self.TEST_PEER, test_peer2]
|
||||||
yield self.connection_manager.manage(schedule_next_call=False)
|
yield self.connection_manager.manage(schedule_next_call=False)
|
||||||
self.assertEqual(1, self.connection_manager.num_peer_connections())
|
|
||||||
yield self.connection_manager.manage(schedule_next_call=False)
|
|
||||||
self.assertEqual(2, self.connection_manager.num_peer_connections())
|
self.assertEqual(2, self.connection_manager.num_peer_connections())
|
||||||
self.assertIn(self.TEST_PEER, self.connection_manager._peer_connections)
|
self.assertIn(self.TEST_PEER, self.connection_manager._peer_connections)
|
||||||
self.assertIn(test_peer2, self.connection_manager._peer_connections)
|
self.assertIn(test_peer2, self.connection_manager._peer_connections)
|
||||||
|
|
Loading…
Reference in a new issue