diff --git a/lbrynet/core/Peer.py b/lbrynet/core/Peer.py index 3705f6e68..a65f7048a 100644 --- a/lbrynet/core/Peer.py +++ b/lbrynet/core/Peer.py @@ -2,13 +2,17 @@ import datetime from collections import defaultdict from lbrynet.core import utils - +# Do not create this object except through PeerManager class Peer(object): def __init__(self, host, port): self.host = host self.port = port + # If a peer is reported down, we wait till this time till reattempting connection self.attempt_connection_at = None + # Number of times this Peer has been reported to be down, resets to 0 when it is up self.down_count = 0 + # Number of succesful connections (with full protocol completion) to this peer + self.success_count = 0 self.score = 0 self.stats = defaultdict(float) # {string stat_type, float count} @@ -21,6 +25,9 @@ class Peer(object): self.down_count = 0 self.attempt_connection_at = None + def report_success(self): + self.success_count += 1 + def report_down(self): self.down_count += 1 timeout_time = datetime.timedelta(seconds=60 * self.down_count) diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index dc0440cb9..ba4153280 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -23,7 +23,6 @@ def encode_decimal(obj): class ClientProtocol(Protocol): implements(IRequestSender, IRateLimited) - ######### Protocol ######### def connectionMade(self): @@ -74,6 +73,7 @@ class ClientProtocol(Protocol): if self._blob_download_request is not None: self._blob_download_request.cancel(err) self._connection_manager.protocol_disconnected(self.peer, self) + self.factory.deferred.callback(True) ######### IRequestSender ######### @@ -132,8 +132,8 @@ class ClientProtocol(Protocol): log.info( "Closing the connection to %s due to having no further requests to send", self.peer) + self.peer.report_success() self.transport.loseConnection() - d = self._connection_manager.get_next_request(self.peer, self) d.addCallback(send_request_or_close) d.addErrback(self._handle_request_error) @@ -208,10 +208,11 @@ class ClientProtocol(Protocol): log.info(result.value) log.info("The connection to %s is closing due to an error: %s", self.peer, result.getTraceback()) + + self.peer.report_down() if failed is False: log.debug("Asking for another request from %s", self.peer) - from twisted.internet import reactor - reactor.callLater(0, self._ask_for_request) + self._ask_for_request() else: log.debug("Not asking for another request from %s", self.peer) self.transport.loseConnection() @@ -257,11 +258,13 @@ class ClientProtocolFactory(ClientFactory): self.rate_limiter = rate_limiter self.connection_manager = connection_manager self.p = None + self.deferred = defer.Deferred() def clientConnectionFailed(self, connector, reason): log.debug("Connection failed to %s: %s", self.peer, reason) self.peer.report_down() self.connection_manager.protocol_disconnected(self.peer, connector) + self.deferred.errback(reason) def buildProtocol(self, addr): p = self.protocol() diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py index a3dc48401..2b9cb8842 100644 --- a/lbrynet/core/client/ConnectionManager.py +++ b/lbrynet/core/client/ConnectionManager.py @@ -1,5 +1,5 @@ import logging -from twisted.internet import defer +from twisted.internet import defer, reactor from zope.interface import implements from lbrynet import interfaces from lbrynet import conf @@ -19,6 +19,8 @@ class PeerConnectionHandler(object): class ConnectionManager(object): implements(interfaces.IConnectionManager) + callLater = reactor.callLater + MANAGE_CALL_INTERVAL_SEC = 1 def __init__(self, downloader, rate_limiter, primary_request_creators, secondary_request_creators): @@ -44,17 +46,18 @@ class ConnectionManager(object): out = 'Connection Manager '+self.downloader.blob_hash return out - def start(self): - log.debug("%s starting", self._get_log_name()) - from twisted.internet import reactor - + def _start(self): self.stopped = False - if self._next_manage_call is not None and self._next_manage_call.active() is True: self._next_manage_call.cancel() - self._next_manage_call = reactor.callLater(0, self._manage) + + def start(self): + log.debug("%s starting", self._get_log_name()) + self._start() + self._next_manage_call = self.callLater(0, self.manage) return defer.succeed(True) + @defer.inlineCallbacks def stop(self): log.debug("%s stopping", self._get_log_name()) @@ -68,8 +71,10 @@ class ConnectionManager(object): self._next_manage_call = None yield self._close_peers() - def _close_peers(self): + def num_peer_connections(self): + return len(self._peer_connections) + def _close_peers(self): def disconnect_peer(p): d = defer.Deferred() self._connections_closing[p] = d @@ -147,27 +152,27 @@ class ConnectionManager(object): del self._connections_closing[peer] d.callback(True) + def manage(self): + self._manage_deferred = self._manage() + if not self.stopped: + self._next_manage_call = self.callLater(self.MANAGE_CALL_INTERVAL_SEC, self.manage) + + @defer.inlineCallbacks def _manage(self): - self._manage_deferred = defer.Deferred() - - from twisted.internet import reactor if len(self._peer_connections) < conf.settings['max_connections_per_stream']: log.debug("%s have %d connections, looking for %d", self._get_log_name(), len(self._peer_connections), conf.settings['max_connections_per_stream']) + ordered_request_creators = self._rank_request_creator_connections() + peers = yield self._get_new_peers(ordered_request_creators) + peer = self._pick_best_peer(peers) try: - ordered_request_creators = self._rank_request_creator_connections() - peers = yield self._get_new_peers(ordered_request_creators) - peer = self._pick_best_peer(peers) yield self._connect_to_peer(peer) except Exception: # log this otherwise it will just end up as an unhandled error in deferred - log.exception('Something bad happened picking a peer') - self._manage_deferred.callback(None) - self._manage_deferred = None - if not self.stopped: - self._next_manage_call = reactor.callLater(1, self._manage) + # Can happen if connection fails with ConnectionRefusedError + log.exception('Something bad happened connecting to a peer') def _rank_request_creator_connections(self): """Returns an ordered list of our request creators, ranked according @@ -213,11 +218,10 @@ class ConnectionManager(object): if peer is None or self.stopped: return - from twisted.internet import reactor - log.debug("%s Trying to connect to %s", self._get_log_name(), peer) factory = ClientProtocolFactory(peer, self.rate_limiter, self) self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:], factory) connection = reactor.connectTCP(peer.host, peer.port, factory) self._peer_connections[peer].connection = connection + return factory.deferred diff --git a/tests/unit/core/client/test_ConnectionManager.py b/tests/unit/core/client/test_ConnectionManager.py new file mode 100644 index 000000000..1746e9e93 --- /dev/null +++ b/tests/unit/core/client/test_ConnectionManager.py @@ -0,0 +1,188 @@ +import sys +import time +import logging + +from lbrynet.core import log_support +from lbrynet.core.client.ConnectionManager import ConnectionManager +from lbrynet.core.client.ClientRequest import ClientRequest +from lbrynet.core.server.ServerProtocol import ServerProtocol +from lbrynet.core.RateLimiter import RateLimiter +from lbrynet.core.Peer import Peer +from lbrynet.core.PeerManager import PeerManager +from lbrynet.core.Error import ConnectionClosedBeforeResponseError, NoResponseError + +from twisted.trial import unittest +from twisted.internet import defer,reactor,task +from twisted.internet.task import deferLater +from twisted.internet.protocol import Protocol,ServerFactory +from lbrynet import conf +from lbrynet.interfaces import IQueryHandlerFactory,IQueryHandler,IRequestCreator + +from zope.interface import implements +conf.initialize_settings() + +# Set up logging if debugging is necessary for writing more tests +LOG_TO_CONSOLE = False +if LOG_TO_CONSOLE: + handler = logging.StreamHandler(sys.stdout) + logging.basicConfig() + logger = logging.getLogger() + logger.setLevel(0) + log_support.configure_console() + log_support.configure_twisted() + + +PEER_PORT = 5551 +LOCAL_HOST = '127.0.0.1' + +class MocDownloader(object): + def insufficient_funds(self): + pass + +class MocRequestCreator(object): + implements(IRequestCreator) + def __init__(self, peer_to_return): + self.peer_to_return = peer_to_return + self.sent_request = False + + def send_next_request(self, peer, protocol): + if self.sent_request is True: + return defer.succeed(False) + response_identifier = 'moc_request' + r_dict = {'moc_request':0} + request=ClientRequest(r_dict,response_identifier) + d = protocol.add_request(request) # ClientRequest here + """ + If connection timed out, goes here with twisted.internet.error.ConnectionAborted + a) clientprotocol, timeoutConnection + b) clientprotocol connectionLost - this adds errback to any outstanding responses + c) if there is no errback handler, error gets propagated + If bad response returns lbrynet.core.Error.NoResponseError + a) clientprotocol._handle_response + if there is no response deferreds for it adds errback NoResponseError + if response has errback, it calls transport.loseConnection() + Note that the response handler could possibly swallow the error here + """ + d.addErrback(self.request_err,peer) + d.addCallback(self.request_success) + self.sent_request = True + return defer.succeed(True) + + def request_success(self,suc): + pass + + def request_err(self,err,peer): + if isinstance(err.value,NoResponseError): + return err + + def get_new_peers(self): + return [self.peer_to_return] + +class MocFunctionalQueryHandler(object): + implements(IQueryHandler) + + def __init__(self, clock, is_good=True,is_delayed=False): + self.query_identifiers = [ 'moc_request' ] + self.is_good = is_good + self.is_delayed = is_delayed + self.clock = clock + + def register_with_request_handler(self, request_handler, peer): + request_handler.register_query_handler(self, self.query_identifiers) + + def handle_queries(self, queries): + if self.query_identifiers[0] in queries: + if self.is_delayed: + out = deferLater(self.clock, 10, lambda: {'moc_request':0}) + self.clock.advance(10) + return out + if self.is_good: + return defer.succeed({'moc_request':0}) + else: + return defer.succeed({'bad_request':0}) + else: + return defer.succeed({}) + + +class MocQueryHandlerFactory(object): + implements(IQueryHandlerFactory) + # is is_good, the query handler works as expectd, + # is is_delayed, the query handler will delay its resposne + def __init__(self,clock,is_good=True,is_delayed=False): + self.is_good = is_good + self.is_delayed = is_delayed + self.clock = clock + def build_query_handler(self): + return MocFunctionalQueryHandler(self.clock,self.is_good,self.is_delayed) + + def get_primary_query_identifier(self): + return 'moc_query' + + def get_description(self): + return "This is a Moc Query" + + +class MocServerProtocolFactory(ServerFactory): + protocol = ServerProtocol + def __init__(self, clock, is_good=True, is_delayed= False, has_moc_query_handler=True): + self.rate_limiter = RateLimiter() + query_handler_factory = MocQueryHandlerFactory(clock, is_good,is_delayed) + if has_moc_query_handler: + self.query_handler_factories = { + query_handler_factory.get_primary_query_identifier():query_handler_factory + } + else: + self.query_handler_factories = {} + self.peer_manager = PeerManager() + + +# Setup a server so that ConnectionManager can connect to it +class TestIntegrationConnectionManager(unittest.TestCase): + def setUp(self): + self.TEST_PEER = Peer(LOCAL_HOST,PEER_PORT) + self.downloader = MocDownloader() + self.rate_limiter = RateLimiter() + self.primary_request_creator = MocRequestCreator(self.TEST_PEER) + self.connection_manager = ConnectionManager(self.downloader, self.rate_limiter, + [self.primary_request_creator], []) + + self.clock = task.Clock() + self.connection_manager.callLater = self.clock.callLater + self.connection_manager._start() + self.server_port = None + + def tearDown(self): + if self.server_port is not None: + self.server_port.stopListening() + self.connection_manager.stop() + + + @defer.inlineCallbacks + def test_success(self): + # test to see that if we setup a server, we get a connection + self.server = MocServerProtocolFactory(self.clock) + self.server_port = reactor.listenTCP(PEER_PORT,self.server,interface=LOCAL_HOST) + + d = yield self.connection_manager._manage() + self.assertEqual(1, self.TEST_PEER.success_count) + + + @defer.inlineCallbacks + def test_bad_server(self): + # test to see that if we setup a server that returns an improper reply + # we don't get a connection + self.server = MocServerProtocolFactory(self.clock, is_good=False) + self.server_port = reactor.listenTCP(PEER_PORT,self.server,interface=LOCAL_HOST) + + d = yield self.connection_manager._manage() + self.assertEqual(0, self.TEST_PEER.success_count) + self.assertEqual(1, self.TEST_PEER.down_count) + + @defer.inlineCallbacks + def test_non_existing_server(self): + # Test to see that if we don't setup a server, we don't get a connection + d = yield self.connection_manager._manage() + self.assertEqual(0, self.connection_manager.num_peer_connections()) + self.assertEqual(0,self.TEST_PEER.success_count) + self.assertEqual(1,self.TEST_PEER.down_count) +