Merge pull request #819 from lbryio/client_seek_head_blob_only

Add option for clients to seek head blob first when downloading

commit 009080ba69
5 changed files with 111 additions and 57 deletions
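The feature is controlled by a single boolean setting, seek_head_blob_first, added to ADJUSTABLE_SETTINGS below with a default of False. The diff only confirms that the value is read through conf.settings['seek_head_blob_first']; the snippet here is a minimal sketch of toggling it from client code, and the initialize_settings call plus the item assignment are assumptions about the conf API rather than part of this change.

# Minimal sketch (assumed conf API, not part of this diff): enable head-blob-first
# downloads before starting the client. The diff itself only shows reads via
# conf.settings['seek_head_blob_first'].
from lbrynet import conf

conf.initialize_settings()                    # assumed settings initializer
conf.settings['seek_head_blob_first'] = True  # default introduced by this PR is False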
@@ -14,6 +14,7 @@ at anytime.
 
 ### Added
   * Added option to announce head blob only if seeding
+  * Added option to download by seeking head blob first
   *
 
 ### Fixed
@@ -255,6 +255,7 @@ ADJUSTABLE_SETTINGS = {
     'known_dht_nodes': (list, DEFAULT_DHT_NODES, server_port),
     'lbryum_wallet_dir': (str, default_lbryum_dir),
     'max_connections_per_stream': (int, 5),
+    'seek_head_blob_first': (bool, False),
     # TODO: writing json on the cmd line is a pain, come up with a nicer
     # parser for this data structure. maybe 'USD:25'
     'max_key_fee': (json.loads, {'currency': 'USD', 'amount': 50.0}),
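Each ADJUSTABLE_SETTINGS entry maps a setting name to a (type, default) tuple, sometimes with an extra field as in 'known_dht_nodes'. The sketch below is a generic illustration of how such a table can be resolved against user overrides; it is not lbrynet's actual settings loader.

# Generic illustration of resolving a (type, default) settings table against
# user overrides; hypothetical helper, not lbrynet's loader.
ADJUSTABLE_SETTINGS = {
    'max_connections_per_stream': (int, 5),
    'seek_head_blob_first': (bool, False),
}

def resolve_settings(overrides):
    resolved = {}
    for name, spec in ADJUSTABLE_SETTINGS.items():
        setting_type, default = spec[0], spec[1]
        value = overrides.get(name, default)
        # coerce only when the override is not already of the declared type
        if not isinstance(value, setting_type):
            value = setting_type(value)
        resolved[name] = value
    return resolved

print(resolve_settings({'seek_head_blob_first': True}))
# seek_head_blob_first resolves to True; max_connections_per_stream keeps its default of 5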
@@ -64,10 +64,24 @@ class BlobRequester(object):
             return defer.succeed(False)
         return self._send_next_request(peer, protocol)
 
-    def get_new_peers(self):
-        d = self._get_hash_for_peer_search()
-        d.addCallback(self._find_peers_for_hash)
-        return d
+    @defer.inlineCallbacks
+    def get_new_peers_for_head_blob(self):
+        """ look for peers for the head blob """
+        head_blob_hash = self._download_manager.get_head_blob_hash()
+        peers = yield self._find_peers_for_hash(head_blob_hash)
+        defer.returnValue(peers)
+
+    @defer.inlineCallbacks
+    def get_new_peers_for_next_unavailable(self):
+        """
+        Look for peers for the next unavailable blob, if we have
+        all blobs, return an empty list
+        """
+        blob_hash = yield self._get_hash_for_peer_search()
+        if blob_hash is None:
+            defer.returnValue([])
+        peers = yield self._find_peers_for_hash(blob_hash)
+        defer.returnValue(peers)
 
     ######### internal calls #########
     def should_send_next_request(self, peer):

@@ -103,6 +117,10 @@ class BlobRequester(object):
         return defer.succeed(sent_request)
 
     def _get_hash_for_peer_search(self):
+        """
+        Get next unavailable hash for blob,
+        returns None if there is nothing left to download
+        """
         r = None
         blobs_to_download = self._blobs_to_download()
         if blobs_to_download:

@@ -116,26 +134,23 @@ class BlobRequester(object):
         return defer.succeed(r)
 
     def _find_peers_for_hash(self, h):
-        if h is None:
-            return None
-        else:
-            d = self.peer_finder.find_peers_for_blob(h)
+        d = self.peer_finder.find_peers_for_blob(h)
 
         def choose_best_peers(peers):
             bad_peers = self._get_bad_peers()
             without_bad_peers = [p for p in peers if not p in bad_peers]
             without_maxed_out_peers = [
                 p for p in without_bad_peers if p not in self._maxed_out_peers]
             return without_maxed_out_peers
 
         d.addCallback(choose_best_peers)
 
         def lookup_failed(err):
             log.error("An error occurred looking up peers for a hash: %s", err.getTraceback())
             return []
 
         d.addErrback(lookup_failed)
         return d
 
     def _should_send_request_to(self, peer):
         if self._peers[peer] < -5.0:
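The old get_new_peers is split into two lookups: one that targets the stream's head blob and one that targets the next blob we still need. A short sketch of the caller-side flow is below; it mirrors the ConnectionManager change further down, and the standalone function name is only for illustration.

from twisted.internet import defer

# Sketch of how a caller consumes the two new BlobRequester methods
# (compare ConnectionManager._get_new_peers below); find_peers is an illustrative name.
@defer.inlineCallbacks
def find_peers(requester, seek_head_blob_first):
    peers = []
    if seek_head_blob_first:
        # try peers that host the stream's first blob, so playback can start sooner
        peers = yield requester.get_new_peers_for_head_blob()
    if not peers:
        # fall back to peers for the next blob we still need
        peers = yield requester.get_new_peers_for_next_unavailable()
    defer.returnValue(peers)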
@@ -25,6 +25,10 @@ class ConnectionManager(object):
 
     def __init__(self, downloader, rate_limiter,
                  primary_request_creators, secondary_request_creators):
+
+        self.seek_head_blob_first = conf.settings['seek_head_blob_first']
+        self.max_connections_per_stream = conf.settings['max_connections_per_stream']
+
         self.downloader = downloader
         self.rate_limiter = rate_limiter
         self._primary_request_creators = primary_request_creators

@@ -146,14 +150,11 @@ class ConnectionManager(object):
     @defer.inlineCallbacks
     def manage(self, schedule_next_call=True):
         self._manage_deferred = defer.Deferred()
-        if len(self._peer_connections) < conf.settings['max_connections_per_stream']:
+        if len(self._peer_connections) < self.max_connections_per_stream:
             log.debug("%s have %d connections, looking for %d",
                       self._get_log_name(), len(self._peer_connections),
-                      conf.settings['max_connections_per_stream'])
-            ordered_request_creators = self._rank_request_creator_connections()
-            peers = yield self._get_new_peers(ordered_request_creators)
-            new_conns = conf.settings['max_connections_per_stream'] - len(self._peer_connections)
-            peers = self._pick_best_peers(peers, new_conns)
+                      self.max_connections_per_stream)
+            peers = yield self._get_new_peers()
             for peer in peers:
                 self._connect_to_peer(peer)
         self._manage_deferred.callback(None)

@@ -161,46 +162,46 @@ class ConnectionManager(object):
         if not self.stopped and schedule_next_call:
             self._next_manage_call = utils.call_later(self.MANAGE_CALL_INTERVAL_SEC, self.manage)
 
-    def _rank_request_creator_connections(self):
-        """Returns an ordered list of our request creators, ranked according
-        to which has the least number of connections open that it
-        likes
-        """
-        def count_peers(request_creator):
-            return len([
-                p for p in self._peer_connections.itervalues()
-                if request_creator in p.request_creators])
-
-        return sorted(self._primary_request_creators, key=count_peers)
+    def return_shuffled_peers_not_connected_to(self, peers, new_conns_needed):
+        out = [peer for peer in peers if peer not in self._peer_connections]
+        random.shuffle(out)
+        return out[0:new_conns_needed]
 
     @defer.inlineCallbacks
-    def _get_new_peers(self, request_creators):
+    def _get_new_peers(self):
+        new_conns_needed = self.max_connections_per_stream - len(self._peer_connections)
+        if new_conns_needed < 1:
+            defer.returnValue([])
+        # we always get the peer from the first request creator
+        # must be a type BlobRequester...
+        request_creator = self._primary_request_creators[0]
         log.debug("%s Trying to get a new peer to connect to", self._get_log_name())
-        if not request_creators:
-            defer.returnValue(None)
-        new_peers = yield request_creators[0].get_new_peers()
-        if not new_peers:
-            new_peers = yield self._get_new_peers(request_creators[1:])
-        defer.returnValue(new_peers)
 
-    def _pick_best_peers(self, peers, num_peers_to_pick):
-        # TODO: Eventually rank them based on past performance/reputation. For now
-        # TODO: just pick the first to which we don't have an open connection
+        # find peers for the head blob if configured to do so
+        if self.seek_head_blob_first:
+            peers = yield request_creator.get_new_peers_for_head_blob()
+            peers = self.return_shuffled_peers_not_connected_to(peers, new_conns_needed)
+        else:
+            peers = []
+
+        # we didn't find any new peers on the head blob,
+        # we have to look for the first unavailable blob
+        if not peers:
+            peers = yield request_creator.get_new_peers_for_next_unavailable()
+            peers = self.return_shuffled_peers_not_connected_to(peers, new_conns_needed)
+
         log.debug("%s Got a list of peers to choose from: %s",
                   self._get_log_name(), peers)
         log.debug("%s Current connections: %s",
                   self._get_log_name(), self._peer_connections.keys())
         log.debug("%s List of connection states: %s", self._get_log_name(),
                   [p_c_h.connection.state for p_c_h in self._peer_connections.values()])
-        if peers is None:
-            return []
-        out = [peer for peer in peers if peer not in self._peer_connections]
-        random.shuffle(out)
-        return out[0:num_peers_to_pick]
+        defer.returnValue(peers)
 
     def _connect_to_peer(self, peer):
-        if peer is None or self.stopped:
+        if self.stopped:
             return
 
         log.debug("%s Trying to connect to %s", self._get_log_name(), peer)
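Peer selection that previously lived in _pick_best_peers now sits in the reusable helper return_shuffled_peers_not_connected_to: drop peers we already have connections to, shuffle the rest, and cap the result at the number of free connection slots. A self-contained illustration, written as a free function purely for demonstration:

import random

# Stand-alone version of the new helper, for illustration only: keep peers we are
# not yet connected to, randomise their order, and take at most new_conns_needed.
def return_shuffled_peers_not_connected_to(peers, connected_peers, new_conns_needed):
    out = [peer for peer in peers if peer not in connected_peers]
    random.shuffle(out)
    return out[0:new_conns_needed]

# Example: five candidates, one existing connection, room for two more connections.
candidates = ['peer1', 'peer2', 'peer3', 'peer4', 'peer5']
print(return_shuffled_peers_not_connected_to(candidates, {'peer3'}, 2))
# -> two of the four remaining peers, in random order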
@@ -30,8 +30,9 @@ class MocDownloader(object):
 
 class MocRequestCreator(object):
     implements(IRequestCreator)
-    def __init__(self, peers_to_return):
+    def __init__(self, peers_to_return, peers_to_return_head_blob=[]):
         self.peers_to_return = peers_to_return
+        self.peers_to_return_head_blob = peers_to_return_head_blob
         self.sent_request = False
 
     def send_next_request(self, peer, protocol):

@@ -53,9 +54,12 @@ class MocRequestCreator(object):
         if isinstance(err.value, NoResponseError):
             return err
 
-    def get_new_peers(self):
+    def get_new_peers_for_next_unavailable(self):
         return self.peers_to_return
 
+    def get_new_peers_for_head_blob(self):
+        return self.peers_to_return_head_blob
+
 class MocFunctionalQueryHandler(object):
     implements(IQueryHandler)
 

@@ -125,12 +129,15 @@ class TestIntegrationConnectionManager(unittest.TestCase):
         self.primary_request_creator = MocRequestCreator([self.TEST_PEER])
         self.clock = task.Clock()
         utils.call_later = self.clock.callLater
+        self.server_port = None
+
+    def _init_connection_manager(self, seek_head_blob_first=False):
+        # this import is required here so utils.call_later is replaced by self.clock.callLater
         from lbrynet.core.client.ConnectionManager import ConnectionManager
         self.connection_manager = ConnectionManager(self.downloader, self.rate_limiter,
                                                     [self.primary_request_creator], [])
+        self.connection_manager.seek_head_blob_first = seek_head_blob_first
         self.connection_manager._start()
-        self.server_port = None
 
     def tearDown(self):
         if self.server_port is not None:

@@ -140,6 +147,7 @@ class TestIntegrationConnectionManager(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_success(self):
+        self._init_connection_manager()
         # test to see that if we setup a server, we get a connection
         self.server = MocServerProtocolFactory(self.clock)
         self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST)

@@ -153,6 +161,7 @@ class TestIntegrationConnectionManager(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_server_with_improper_reply(self):
+        self._init_connection_manager()
         self.server = MocServerProtocolFactory(self.clock, is_good=False)
         self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST)
         yield self.connection_manager.manage(schedule_next_call=False)

@@ -166,6 +175,8 @@ class TestIntegrationConnectionManager(unittest.TestCase):
     @defer.inlineCallbacks
     def test_non_existing_server(self):
         # Test to see that if we don't setup a server, we don't get a connection
+
+        self._init_connection_manager()
         yield self.connection_manager.manage(schedule_next_call=False)
         self.assertEqual(1, self.connection_manager.num_peer_connections())
         connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].factory.connection_was_made_deferred

@@ -180,6 +191,8 @@ class TestIntegrationConnectionManager(unittest.TestCase):
     def test_parallel_connections(self):
         # Test to see that we make two new connections at a manage call,
         # without it waiting for the connection to complete
+
+        self._init_connection_manager()
         test_peer2 = Peer(LOCAL_HOST, PEER_PORT+1)
         self.primary_request_creator.peers_to_return = [self.TEST_PEER, test_peer2]
         yield self.connection_manager.manage(schedule_next_call=False)

@@ -203,6 +216,7 @@ class TestIntegrationConnectionManager(unittest.TestCase):
         # test to see that when we call stop, the ConnectionManager waits for the
         # current manage call to finish, closes connections,
         # and removes scheduled manage calls
+        self._init_connection_manager()
         self.connection_manager.manage(schedule_next_call=True)
         yield self.connection_manager.stop()
         self.assertEqual(0, self.TEST_PEER.success_count)

@@ -212,6 +226,7 @@ class TestIntegrationConnectionManager(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_closed_connection_when_server_is_slow(self):
+        self._init_connection_manager()
         self.server = MocServerProtocolFactory(self.clock, has_moc_query_handler=True,is_delayed=True)
         self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST)

@@ -224,3 +239,24 @@ class TestIntegrationConnectionManager(unittest.TestCase):
         self.assertEqual(1, self.TEST_PEER.down_count)
 
 
+    """ test head blob first seeks """
+    @defer.inlineCallbacks
+    def test_no_peer_for_head_blob(self):
+        # test that if we can't find peers for the head blob,
+        # it looks at the next unavailable blob and makes a connection
+        self._init_connection_manager(seek_head_blob_first=True)
+        self.server = MocServerProtocolFactory(self.clock)
+        self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST)
+
+        self.primary_request_creator.peers_to_return_head_blob = []
+        self.primary_request_creator.peers_to_return = [self.TEST_PEER]
+
+        yield self.connection_manager.manage(schedule_next_call=False)
+        self.assertEqual(1, self.connection_manager.num_peer_connections())
+        connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].factory.connection_was_made_deferred
+        self.assertEqual(0, self.connection_manager.num_peer_connections())
+        self.assertTrue(connection_made)
+        self.assertEqual(1, self.TEST_PEER.success_count)
+        self.assertEqual(0, self.TEST_PEER.down_count)