forked from LBRYCommunity/lbry-sdk
Create an optional way of downloading by head blob first in ConnectionManager
This commit is contained in:
parent
33843b3a19
commit
a31b6b192f
4 changed files with 88 additions and 37 deletions
|
@ -255,6 +255,7 @@ ADJUSTABLE_SETTINGS = {
|
|||
'known_dht_nodes': (list, DEFAULT_DHT_NODES, server_port),
|
||||
'lbryum_wallet_dir': (str, default_lbryum_dir),
|
||||
'max_connections_per_stream': (int, 5),
|
||||
'seek_head_blob_first': (bool, False),
|
||||
# TODO: writing json on the cmd line is a pain, come up with a nicer
|
||||
# parser for this data structure. maybe 'USD:25'
|
||||
'max_key_fee': (json.loads, {'currency': 'USD', 'amount': 50.0}),
|
||||
|
|
|
@ -64,10 +64,19 @@ class BlobRequester(object):
|
|||
return defer.succeed(False)
|
||||
return self._send_next_request(peer, protocol)
|
||||
|
||||
def get_new_peers(self):
|
||||
d = self._get_hash_for_peer_search()
|
||||
d.addCallback(self._find_peers_for_hash)
|
||||
return d
|
||||
@defer.inlineCallbacks
|
||||
def get_new_peers_for_head_blob(self):
|
||||
""" look for peers for the head blob """
|
||||
head_blob_hash = self._download_manager.get_head_blob_hash()
|
||||
peers = yield self._find_peers_for_hash(head_blob_hash)
|
||||
defer.returnValue(peers)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_new_peers_for_next_unavailable(self):
|
||||
""" look for peers for the next unavailable blob """
|
||||
blob_hash = yield self._get_hash_for_peer_search()
|
||||
peers = yield self._find_peers_for_hash(blob_hash)
|
||||
defer.returnValue(peers)
|
||||
|
||||
######### internal calls #########
|
||||
def should_send_next_request(self, peer):
|
||||
|
|
|
@ -25,10 +25,12 @@ class ConnectionManager(object):
|
|||
|
||||
def __init__(self, downloader, rate_limiter,
|
||||
primary_request_creators, secondary_request_creators):
|
||||
|
||||
self.downloader = downloader
|
||||
self.rate_limiter = rate_limiter
|
||||
self._primary_request_creators = primary_request_creators
|
||||
self._secondary_request_creators = secondary_request_creators
|
||||
self.seek_head_blob_first = conf.settings['seek_head_blob_first']
|
||||
self._peer_connections = {} # {Peer: PeerConnectionHandler}
|
||||
self._connections_closing = {} # {Peer: deferred (fired when the connection is closed)}
|
||||
self._next_manage_call = None
|
||||
|
@ -150,10 +152,7 @@ class ConnectionManager(object):
|
|||
log.debug("%s have %d connections, looking for %d",
|
||||
self._get_log_name(), len(self._peer_connections),
|
||||
conf.settings['max_connections_per_stream'])
|
||||
ordered_request_creators = self._rank_request_creator_connections()
|
||||
peers = yield self._get_new_peers(ordered_request_creators)
|
||||
new_conns = conf.settings['max_connections_per_stream'] - len(self._peer_connections)
|
||||
peers = self._pick_best_peers(peers, new_conns)
|
||||
peers = yield self._get_new_peers()
|
||||
for peer in peers:
|
||||
self._connect_to_peer(peer)
|
||||
self._manage_deferred.callback(None)
|
||||
|
@ -161,42 +160,46 @@ class ConnectionManager(object):
|
|||
if not self.stopped and schedule_next_call:
|
||||
self._next_manage_call = utils.call_later(self.MANAGE_CALL_INTERVAL_SEC, self.manage)
|
||||
|
||||
def _rank_request_creator_connections(self):
|
||||
"""Returns an ordered list of our request creators, ranked according
|
||||
to which has the least number of connections open that it
|
||||
likes
|
||||
"""
|
||||
def count_peers(request_creator):
|
||||
return len([
|
||||
p for p in self._peer_connections.itervalues()
|
||||
if request_creator in p.request_creators])
|
||||
|
||||
return sorted(self._primary_request_creators, key=count_peers)
|
||||
def return_shuffled_peers_not_connected_to(self, peers, new_conns_needed):
|
||||
if peers is None:
|
||||
# can happen if there is some error in the lookup
|
||||
return []
|
||||
out = [peer for peer in peers if peer not in self._peer_connections]
|
||||
random.shuffle(out)
|
||||
out = out[0:new_conns_needed]
|
||||
return out
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _get_new_peers(self, request_creators):
|
||||
def _get_new_peers(self):
|
||||
new_conns_needed = conf.settings['max_connections_per_stream'] - len(self._peer_connections)
|
||||
if new_conns_needed < 1:
|
||||
defer.returnValue([])
|
||||
# we always get the peer from the first request creator
|
||||
# must be a type BlobRequester...
|
||||
request_creator = self._primary_request_creators[0]
|
||||
log.debug("%s Trying to get a new peer to connect to", self._get_log_name())
|
||||
if not request_creators:
|
||||
defer.returnValue(None)
|
||||
new_peers = yield request_creators[0].get_new_peers()
|
||||
if not new_peers:
|
||||
new_peers = yield self._get_new_peers(request_creators[1:])
|
||||
defer.returnValue(new_peers)
|
||||
|
||||
def _pick_best_peers(self, peers, num_peers_to_pick):
|
||||
# TODO: Eventually rank them based on past performance/reputation. For now
|
||||
# TODO: just pick the first to which we don't have an open connection
|
||||
# find peers for the head blob if configured to do so
|
||||
if self.seek_head_blob_first is True:
|
||||
peers = yield request_creator.get_new_peers_for_head_blob()
|
||||
peers = self.return_shuffled_peers_not_connected_to(peers, new_conns_needed)
|
||||
else:
|
||||
peers = []
|
||||
|
||||
# we didn't find any new peers on the head blob,
|
||||
# we have to look for the first unavailable blob
|
||||
if len(peers) == 0:
|
||||
peers = yield request_creator.get_new_peers_for_next_unavailable()
|
||||
peers = self.return_shuffled_peers_not_connected_to(peers, new_conns_needed)
|
||||
|
||||
log.debug("%s Got a list of peers to choose from: %s",
|
||||
self._get_log_name(), peers)
|
||||
log.debug("%s Current connections: %s",
|
||||
self._get_log_name(), self._peer_connections.keys())
|
||||
log.debug("%s List of connection states: %s", self._get_log_name(),
|
||||
[p_c_h.connection.state for p_c_h in self._peer_connections.values()])
|
||||
if peers is None:
|
||||
return []
|
||||
out = [peer for peer in peers if peer not in self._peer_connections]
|
||||
random.shuffle(out)
|
||||
return out[0:num_peers_to_pick]
|
||||
|
||||
defer.returnValue(peers)
|
||||
|
||||
|
||||
def _connect_to_peer(self, peer):
|
||||
|
|
|
@ -30,8 +30,9 @@ class MocDownloader(object):
|
|||
|
||||
class MocRequestCreator(object):
|
||||
implements(IRequestCreator)
|
||||
def __init__(self, peers_to_return):
|
||||
def __init__(self, peers_to_return, peers_to_return_head_blob=[]):
|
||||
self.peers_to_return = peers_to_return
|
||||
self.peers_to_return_head_blob = peers_to_return_head_blob
|
||||
self.sent_request = False
|
||||
|
||||
def send_next_request(self, peer, protocol):
|
||||
|
@ -53,9 +54,12 @@ class MocRequestCreator(object):
|
|||
if isinstance(err.value, NoResponseError):
|
||||
return err
|
||||
|
||||
def get_new_peers(self):
|
||||
def get_new_peers_for_next_unavailable(self):
|
||||
return self.peers_to_return
|
||||
|
||||
def get_new_peers_for_head_blob(self):
|
||||
return self.peers_to_return_head_blob
|
||||
|
||||
class MocFunctionalQueryHandler(object):
|
||||
implements(IQueryHandler)
|
||||
|
||||
|
@ -125,12 +129,17 @@ class TestIntegrationConnectionManager(unittest.TestCase):
|
|||
self.primary_request_creator = MocRequestCreator([self.TEST_PEER])
|
||||
self.clock = task.Clock()
|
||||
utils.call_later = self.clock.callLater
|
||||
self.server_port = None
|
||||
|
||||
def _init_connection_manager(self, seek_head_blob_first=False):
|
||||
# this import is required here so utils.call_later is replaced by self.clock.callLater
|
||||
from lbrynet.core.client.ConnectionManager import ConnectionManager
|
||||
self.connection_manager = ConnectionManager(self.downloader, self.rate_limiter,
|
||||
[self.primary_request_creator], [])
|
||||
|
||||
|
||||
self.connection_manager.seek_head_blob_first = seek_head_blob_first
|
||||
self.connection_manager._start()
|
||||
self.server_port = None
|
||||
|
||||
def tearDown(self):
|
||||
if self.server_port is not None:
|
||||
|
@ -140,6 +149,7 @@ class TestIntegrationConnectionManager(unittest.TestCase):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def test_success(self):
|
||||
self._init_connection_manager()
|
||||
# test to see that if we setup a server, we get a connection
|
||||
self.server = MocServerProtocolFactory(self.clock)
|
||||
self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST)
|
||||
|
@ -153,6 +163,7 @@ class TestIntegrationConnectionManager(unittest.TestCase):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def test_server_with_improper_reply(self):
|
||||
self._init_connection_manager()
|
||||
self.server = MocServerProtocolFactory(self.clock, is_good=False)
|
||||
self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST)
|
||||
yield self.connection_manager.manage(schedule_next_call=False)
|
||||
|
@ -166,6 +177,8 @@ class TestIntegrationConnectionManager(unittest.TestCase):
|
|||
@defer.inlineCallbacks
|
||||
def test_non_existing_server(self):
|
||||
# Test to see that if we don't setup a server, we don't get a connection
|
||||
|
||||
self._init_connection_manager()
|
||||
yield self.connection_manager.manage(schedule_next_call=False)
|
||||
self.assertEqual(1, self.connection_manager.num_peer_connections())
|
||||
connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].factory.connection_was_made_deferred
|
||||
|
@ -180,6 +193,8 @@ class TestIntegrationConnectionManager(unittest.TestCase):
|
|||
def test_parallel_connections(self):
|
||||
# Test to see that we make two new connections at a manage call,
|
||||
# without it waiting for the connection to complete
|
||||
|
||||
self._init_connection_manager()
|
||||
test_peer2 = Peer(LOCAL_HOST, PEER_PORT+1)
|
||||
self.primary_request_creator.peers_to_return = [self.TEST_PEER, test_peer2]
|
||||
yield self.connection_manager.manage(schedule_next_call=False)
|
||||
|
@ -203,6 +218,7 @@ class TestIntegrationConnectionManager(unittest.TestCase):
|
|||
# test to see that when we call stop, the ConnectionManager waits for the
|
||||
# current manage call to finish, closes connections,
|
||||
# and removes scheduled manage calls
|
||||
self._init_connection_manager()
|
||||
self.connection_manager.manage(schedule_next_call=True)
|
||||
yield self.connection_manager.stop()
|
||||
self.assertEqual(0, self.TEST_PEER.success_count)
|
||||
|
@ -212,6 +228,7 @@ class TestIntegrationConnectionManager(unittest.TestCase):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def test_closed_connection_when_server_is_slow(self):
|
||||
self._init_connection_manager()
|
||||
self.server = MocServerProtocolFactory(self.clock, has_moc_query_handler=True,is_delayed=True)
|
||||
self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST)
|
||||
|
||||
|
@ -224,3 +241,24 @@ class TestIntegrationConnectionManager(unittest.TestCase):
|
|||
self.assertEqual(1, self.TEST_PEER.down_count)
|
||||
|
||||
|
||||
""" test header first seeks """
|
||||
@defer.inlineCallbacks
|
||||
def test_no_peer_for_head_blob(self):
|
||||
# test that if we can't find blobs for the head blob,
|
||||
# it looks at the next unavailable and makes connection
|
||||
self._init_connection_manager(seek_head_blob_first=True)
|
||||
self.server = MocServerProtocolFactory(self.clock)
|
||||
self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST)
|
||||
|
||||
self.primary_request_creator.peers_to_return_head_blob = []
|
||||
self.primary_request_creator.peers_to_return = [self.TEST_PEER]
|
||||
|
||||
yield self.connection_manager.manage(schedule_next_call=False)
|
||||
self.assertEqual(1, self.connection_manager.num_peer_connections())
|
||||
connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].factory.connection_was_made_deferred
|
||||
self.assertEqual(0, self.connection_manager.num_peer_connections())
|
||||
self.assertTrue(connection_made)
|
||||
self.assertEqual(1, self.TEST_PEER.success_count)
|
||||
self.assertEqual(0, self.TEST_PEER.down_count)
|
||||
|
||||
|
||||
|
|
Loading…
Reference in a new issue