diff --git a/lbrynet/core/Peer.py b/lbrynet/core/Peer.py index a65f7048a..3705f6e68 100644 --- a/lbrynet/core/Peer.py +++ b/lbrynet/core/Peer.py @@ -2,17 +2,13 @@ import datetime from collections import defaultdict from lbrynet.core import utils -# Do not create this object except through PeerManager + class Peer(object): def __init__(self, host, port): self.host = host self.port = port - # If a peer is reported down, we wait till this time till reattempting connection self.attempt_connection_at = None - # Number of times this Peer has been reported to be down, resets to 0 when it is up self.down_count = 0 - # Number of succesful connections (with full protocol completion) to this peer - self.success_count = 0 self.score = 0 self.stats = defaultdict(float) # {string stat_type, float count} @@ -25,9 +21,6 @@ class Peer(object): self.down_count = 0 self.attempt_connection_at = None - def report_success(self): - self.success_count += 1 - def report_down(self): self.down_count += 1 timeout_time = datetime.timedelta(seconds=60 * self.down_count) diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index ba4153280..dc0440cb9 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -23,6 +23,7 @@ def encode_decimal(obj): class ClientProtocol(Protocol): implements(IRequestSender, IRateLimited) + ######### Protocol ######### def connectionMade(self): @@ -73,7 +74,6 @@ class ClientProtocol(Protocol): if self._blob_download_request is not None: self._blob_download_request.cancel(err) self._connection_manager.protocol_disconnected(self.peer, self) - self.factory.deferred.callback(True) ######### IRequestSender ######### @@ -132,8 +132,8 @@ class ClientProtocol(Protocol): log.info( "Closing the connection to %s due to having no further requests to send", self.peer) - self.peer.report_success() self.transport.loseConnection() + d = self._connection_manager.get_next_request(self.peer, self) d.addCallback(send_request_or_close) d.addErrback(self._handle_request_error) @@ -208,11 +208,10 @@ class ClientProtocol(Protocol): log.info(result.value) log.info("The connection to %s is closing due to an error: %s", self.peer, result.getTraceback()) - - self.peer.report_down() if failed is False: log.debug("Asking for another request from %s", self.peer) - self._ask_for_request() + from twisted.internet import reactor + reactor.callLater(0, self._ask_for_request) else: log.debug("Not asking for another request from %s", self.peer) self.transport.loseConnection() @@ -258,13 +257,11 @@ class ClientProtocolFactory(ClientFactory): self.rate_limiter = rate_limiter self.connection_manager = connection_manager self.p = None - self.deferred = defer.Deferred() def clientConnectionFailed(self, connector, reason): log.debug("Connection failed to %s: %s", self.peer, reason) self.peer.report_down() self.connection_manager.protocol_disconnected(self.peer, connector) - self.deferred.errback(reason) def buildProtocol(self, addr): p = self.protocol() diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py index 2b9cb8842..a3dc48401 100644 --- a/lbrynet/core/client/ConnectionManager.py +++ b/lbrynet/core/client/ConnectionManager.py @@ -1,5 +1,5 @@ import logging -from twisted.internet import defer, reactor +from twisted.internet import defer from zope.interface import implements from lbrynet import interfaces from lbrynet import conf @@ -19,8 +19,6 @@ class PeerConnectionHandler(object): class ConnectionManager(object): implements(interfaces.IConnectionManager) - callLater = reactor.callLater - MANAGE_CALL_INTERVAL_SEC = 1 def __init__(self, downloader, rate_limiter, primary_request_creators, secondary_request_creators): @@ -46,17 +44,16 @@ class ConnectionManager(object): out = 'Connection Manager '+self.downloader.blob_hash return out - def _start(self): - self.stopped = False - if self._next_manage_call is not None and self._next_manage_call.active() is True: - self._next_manage_call.cancel() - def start(self): log.debug("%s starting", self._get_log_name()) - self._start() - self._next_manage_call = self.callLater(0, self.manage) - return defer.succeed(True) + from twisted.internet import reactor + self.stopped = False + + if self._next_manage_call is not None and self._next_manage_call.active() is True: + self._next_manage_call.cancel() + self._next_manage_call = reactor.callLater(0, self._manage) + return defer.succeed(True) @defer.inlineCallbacks def stop(self): @@ -71,10 +68,8 @@ class ConnectionManager(object): self._next_manage_call = None yield self._close_peers() - def num_peer_connections(self): - return len(self._peer_connections) - def _close_peers(self): + def disconnect_peer(p): d = defer.Deferred() self._connections_closing[p] = d @@ -152,27 +147,27 @@ class ConnectionManager(object): del self._connections_closing[peer] d.callback(True) - def manage(self): - self._manage_deferred = self._manage() - if not self.stopped: - self._next_manage_call = self.callLater(self.MANAGE_CALL_INTERVAL_SEC, self.manage) - - @defer.inlineCallbacks def _manage(self): + self._manage_deferred = defer.Deferred() + + from twisted.internet import reactor if len(self._peer_connections) < conf.settings['max_connections_per_stream']: log.debug("%s have %d connections, looking for %d", self._get_log_name(), len(self._peer_connections), conf.settings['max_connections_per_stream']) - ordered_request_creators = self._rank_request_creator_connections() - peers = yield self._get_new_peers(ordered_request_creators) - peer = self._pick_best_peer(peers) try: + ordered_request_creators = self._rank_request_creator_connections() + peers = yield self._get_new_peers(ordered_request_creators) + peer = self._pick_best_peer(peers) yield self._connect_to_peer(peer) except Exception: # log this otherwise it will just end up as an unhandled error in deferred - # Can happen if connection fails with ConnectionRefusedError - log.exception('Something bad happened connecting to a peer') + log.exception('Something bad happened picking a peer') + self._manage_deferred.callback(None) + self._manage_deferred = None + if not self.stopped: + self._next_manage_call = reactor.callLater(1, self._manage) def _rank_request_creator_connections(self): """Returns an ordered list of our request creators, ranked according @@ -218,10 +213,11 @@ class ConnectionManager(object): if peer is None or self.stopped: return + from twisted.internet import reactor + log.debug("%s Trying to connect to %s", self._get_log_name(), peer) factory = ClientProtocolFactory(peer, self.rate_limiter, self) self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:], factory) connection = reactor.connectTCP(peer.host, peer.port, factory) self._peer_connections[peer].connection = connection - return factory.deferred diff --git a/tests/unit/core/client/test_ConnectionManager.py b/tests/unit/core/client/test_ConnectionManager.py deleted file mode 100644 index 1746e9e93..000000000 --- a/tests/unit/core/client/test_ConnectionManager.py +++ /dev/null @@ -1,188 +0,0 @@ -import sys -import time -import logging - -from lbrynet.core import log_support -from lbrynet.core.client.ConnectionManager import ConnectionManager -from lbrynet.core.client.ClientRequest import ClientRequest -from lbrynet.core.server.ServerProtocol import ServerProtocol -from lbrynet.core.RateLimiter import RateLimiter -from lbrynet.core.Peer import Peer -from lbrynet.core.PeerManager import PeerManager -from lbrynet.core.Error import ConnectionClosedBeforeResponseError, NoResponseError - -from twisted.trial import unittest -from twisted.internet import defer,reactor,task -from twisted.internet.task import deferLater -from twisted.internet.protocol import Protocol,ServerFactory -from lbrynet import conf -from lbrynet.interfaces import IQueryHandlerFactory,IQueryHandler,IRequestCreator - -from zope.interface import implements -conf.initialize_settings() - -# Set up logging if debugging is necessary for writing more tests -LOG_TO_CONSOLE = False -if LOG_TO_CONSOLE: - handler = logging.StreamHandler(sys.stdout) - logging.basicConfig() - logger = logging.getLogger() - logger.setLevel(0) - log_support.configure_console() - log_support.configure_twisted() - - -PEER_PORT = 5551 -LOCAL_HOST = '127.0.0.1' - -class MocDownloader(object): - def insufficient_funds(self): - pass - -class MocRequestCreator(object): - implements(IRequestCreator) - def __init__(self, peer_to_return): - self.peer_to_return = peer_to_return - self.sent_request = False - - def send_next_request(self, peer, protocol): - if self.sent_request is True: - return defer.succeed(False) - response_identifier = 'moc_request' - r_dict = {'moc_request':0} - request=ClientRequest(r_dict,response_identifier) - d = protocol.add_request(request) # ClientRequest here - """ - If connection timed out, goes here with twisted.internet.error.ConnectionAborted - a) clientprotocol, timeoutConnection - b) clientprotocol connectionLost - this adds errback to any outstanding responses - c) if there is no errback handler, error gets propagated - If bad response returns lbrynet.core.Error.NoResponseError - a) clientprotocol._handle_response - if there is no response deferreds for it adds errback NoResponseError - if response has errback, it calls transport.loseConnection() - Note that the response handler could possibly swallow the error here - """ - d.addErrback(self.request_err,peer) - d.addCallback(self.request_success) - self.sent_request = True - return defer.succeed(True) - - def request_success(self,suc): - pass - - def request_err(self,err,peer): - if isinstance(err.value,NoResponseError): - return err - - def get_new_peers(self): - return [self.peer_to_return] - -class MocFunctionalQueryHandler(object): - implements(IQueryHandler) - - def __init__(self, clock, is_good=True,is_delayed=False): - self.query_identifiers = [ 'moc_request' ] - self.is_good = is_good - self.is_delayed = is_delayed - self.clock = clock - - def register_with_request_handler(self, request_handler, peer): - request_handler.register_query_handler(self, self.query_identifiers) - - def handle_queries(self, queries): - if self.query_identifiers[0] in queries: - if self.is_delayed: - out = deferLater(self.clock, 10, lambda: {'moc_request':0}) - self.clock.advance(10) - return out - if self.is_good: - return defer.succeed({'moc_request':0}) - else: - return defer.succeed({'bad_request':0}) - else: - return defer.succeed({}) - - -class MocQueryHandlerFactory(object): - implements(IQueryHandlerFactory) - # is is_good, the query handler works as expectd, - # is is_delayed, the query handler will delay its resposne - def __init__(self,clock,is_good=True,is_delayed=False): - self.is_good = is_good - self.is_delayed = is_delayed - self.clock = clock - def build_query_handler(self): - return MocFunctionalQueryHandler(self.clock,self.is_good,self.is_delayed) - - def get_primary_query_identifier(self): - return 'moc_query' - - def get_description(self): - return "This is a Moc Query" - - -class MocServerProtocolFactory(ServerFactory): - protocol = ServerProtocol - def __init__(self, clock, is_good=True, is_delayed= False, has_moc_query_handler=True): - self.rate_limiter = RateLimiter() - query_handler_factory = MocQueryHandlerFactory(clock, is_good,is_delayed) - if has_moc_query_handler: - self.query_handler_factories = { - query_handler_factory.get_primary_query_identifier():query_handler_factory - } - else: - self.query_handler_factories = {} - self.peer_manager = PeerManager() - - -# Setup a server so that ConnectionManager can connect to it -class TestIntegrationConnectionManager(unittest.TestCase): - def setUp(self): - self.TEST_PEER = Peer(LOCAL_HOST,PEER_PORT) - self.downloader = MocDownloader() - self.rate_limiter = RateLimiter() - self.primary_request_creator = MocRequestCreator(self.TEST_PEER) - self.connection_manager = ConnectionManager(self.downloader, self.rate_limiter, - [self.primary_request_creator], []) - - self.clock = task.Clock() - self.connection_manager.callLater = self.clock.callLater - self.connection_manager._start() - self.server_port = None - - def tearDown(self): - if self.server_port is not None: - self.server_port.stopListening() - self.connection_manager.stop() - - - @defer.inlineCallbacks - def test_success(self): - # test to see that if we setup a server, we get a connection - self.server = MocServerProtocolFactory(self.clock) - self.server_port = reactor.listenTCP(PEER_PORT,self.server,interface=LOCAL_HOST) - - d = yield self.connection_manager._manage() - self.assertEqual(1, self.TEST_PEER.success_count) - - - @defer.inlineCallbacks - def test_bad_server(self): - # test to see that if we setup a server that returns an improper reply - # we don't get a connection - self.server = MocServerProtocolFactory(self.clock, is_good=False) - self.server_port = reactor.listenTCP(PEER_PORT,self.server,interface=LOCAL_HOST) - - d = yield self.connection_manager._manage() - self.assertEqual(0, self.TEST_PEER.success_count) - self.assertEqual(1, self.TEST_PEER.down_count) - - @defer.inlineCallbacks - def test_non_existing_server(self): - # Test to see that if we don't setup a server, we don't get a connection - d = yield self.connection_manager._manage() - self.assertEqual(0, self.connection_manager.num_peer_connections()) - self.assertEqual(0,self.TEST_PEER.success_count) - self.assertEqual(1,self.TEST_PEER.down_count) -