From 4f87e87d3e74a739cdc4d53721c9c62560e35eed Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Fri, 27 Jan 2017 11:18:40 -0500 Subject: [PATCH] Adding connection manager tests, and modifying connection manager to be testable, fix bug where failed connection was not marked as down --- CHANGELOG.md | 3 + lbrynet/core/Peer.py | 9 +- lbrynet/core/client/ClientProtocol.py | 16 +- lbrynet/core/client/ConnectionManager.py | 68 +++--- .../core/client/test_ConnectionManager.py | 214 ++++++++++++++++++ 5 files changed, 270 insertions(+), 40 deletions(-) create mode 100644 tests/unit/core/client/test_ConnectionManager.py diff --git a/CHANGELOG.md b/CHANGELOG.md index bf7adb8a4..89e8eabe2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,8 +8,11 @@ can and probably will change functionality and break backwards compatability at anytime. ## [Unreleased] +### Changed + * make connection manager unit testable ### Fixed * Change EWOULDBLOCK error in DHT to warning. #481 + * mark peers as down if it fails download protocol ## [0.8.3rc0] - 2017-02-10 ### Changed diff --git a/lbrynet/core/Peer.py b/lbrynet/core/Peer.py index 3705f6e68..a65f7048a 100644 --- a/lbrynet/core/Peer.py +++ b/lbrynet/core/Peer.py @@ -2,13 +2,17 @@ import datetime from collections import defaultdict from lbrynet.core import utils - +# Do not create this object except through PeerManager class Peer(object): def __init__(self, host, port): self.host = host self.port = port + # If a peer is reported down, we wait till this time till reattempting connection self.attempt_connection_at = None + # Number of times this Peer has been reported to be down, resets to 0 when it is up self.down_count = 0 + # Number of successful connections (with full protocol completion) to this peer + self.success_count = 0 self.score = 0 self.stats = defaultdict(float) # {string stat_type, float count} @@ -21,6 +25,9 @@ class Peer(object): self.down_count = 0 self.attempt_connection_at = None + def report_success(self): + 
self.success_count += 1 + def report_down(self): self.down_count += 1 timeout_time = datetime.timedelta(seconds=60 * self.down_count) diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index dc0440cb9..c588f0f2a 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -23,7 +23,6 @@ def encode_decimal(obj): class ClientProtocol(Protocol): implements(IRequestSender, IRateLimited) - ######### Protocol ######### def connectionMade(self): @@ -73,7 +72,7 @@ class ClientProtocol(Protocol): d.errback(err) if self._blob_download_request is not None: self._blob_download_request.cancel(err) - self._connection_manager.protocol_disconnected(self.peer, self) + self.factory.connection_was_made_deferred.callback(True) ######### IRequestSender ######### @@ -132,8 +131,8 @@ class ClientProtocol(Protocol): log.info( "Closing the connection to %s due to having no further requests to send", self.peer) + self.peer.report_success() self.transport.loseConnection() - d = self._connection_manager.get_next_request(self.peer, self) d.addCallback(send_request_or_close) d.addErrback(self._handle_request_error) @@ -208,10 +207,11 @@ class ClientProtocol(Protocol): log.info(result.value) log.info("The connection to %s is closing due to an error: %s", self.peer, result.getTraceback()) + + self.peer.report_down() if failed is False: log.debug("Asking for another request from %s", self.peer) - from twisted.internet import reactor - reactor.callLater(0, self._ask_for_request) + self._ask_for_request() else: log.debug("Not asking for another request from %s", self.peer) self.transport.loseConnection() @@ -257,11 +257,15 @@ class ClientProtocolFactory(ClientFactory): self.rate_limiter = rate_limiter self.connection_manager = connection_manager self.p = None + # This defer fires and returns True when connection was + # made and completed, or fires and returns False if + # connection failed + 
self.connection_was_made_deferred = defer.Deferred() def clientConnectionFailed(self, connector, reason): log.debug("Connection failed to %s: %s", self.peer, reason) self.peer.report_down() - self.connection_manager.protocol_disconnected(self.peer, connector) + self.connection_was_made_deferred.callback(False) def buildProtocol(self, addr): p = self.protocol() diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py index a3dc48401..bb06a645b 100644 --- a/lbrynet/core/client/ConnectionManager.py +++ b/lbrynet/core/client/ConnectionManager.py @@ -1,5 +1,5 @@ import logging -from twisted.internet import defer +from twisted.internet import defer, reactor from zope.interface import implements from lbrynet import interfaces from lbrynet import conf @@ -19,6 +19,8 @@ class PeerConnectionHandler(object): class ConnectionManager(object): implements(interfaces.IConnectionManager) + callLater = reactor.callLater + MANAGE_CALL_INTERVAL_SEC = 1 def __init__(self, downloader, rate_limiter, primary_request_creators, secondary_request_creators): @@ -44,17 +46,18 @@ class ConnectionManager(object): out = 'Connection Manager '+self.downloader.blob_hash return out - def start(self): - log.debug("%s starting", self._get_log_name()) - from twisted.internet import reactor - + def _start(self): self.stopped = False - if self._next_manage_call is not None and self._next_manage_call.active() is True: self._next_manage_call.cancel() - self._next_manage_call = reactor.callLater(0, self._manage) + + def start(self): + log.debug("%s starting", self._get_log_name()) + self._start() + self._next_manage_call = self.callLater(0, self.manage) return defer.succeed(True) + @defer.inlineCallbacks def stop(self): log.debug("%s stopping", self._get_log_name()) @@ -68,8 +71,10 @@ class ConnectionManager(object): self._next_manage_call = None yield self._close_peers() - def _close_peers(self): + def num_peer_connections(self): + return len(self._peer_connections) + 
def _close_peers(self): def disconnect_peer(p): d = defer.Deferred() self._connections_closing[p] = d @@ -137,37 +142,21 @@ class ConnectionManager(object): ] return defer.DeferredList(ds) - def protocol_disconnected(self, peer, protocol): - log.debug("%s protocol %s disconnected for %s", - self._get_log_name(), type(protocol).__name__, peer) - if peer in self._peer_connections: - del self._peer_connections[peer] - if peer in self._connections_closing: - d = self._connections_closing[peer] - del self._connections_closing[peer] - d.callback(True) - @defer.inlineCallbacks - def _manage(self): + def manage(self, schedule_next_call=True): self._manage_deferred = defer.Deferred() - - from twisted.internet import reactor if len(self._peer_connections) < conf.settings['max_connections_per_stream']: log.debug("%s have %d connections, looking for %d", self._get_log_name(), len(self._peer_connections), conf.settings['max_connections_per_stream']) - try: - ordered_request_creators = self._rank_request_creator_connections() - peers = yield self._get_new_peers(ordered_request_creators) - peer = self._pick_best_peer(peers) - yield self._connect_to_peer(peer) - except Exception: - # log this otherwise it will just end up as an unhandled error in deferred - log.exception('Something bad happened picking a peer') + ordered_request_creators = self._rank_request_creator_connections() + peers = yield self._get_new_peers(ordered_request_creators) + peer = self._pick_best_peer(peers) + self._connect_to_peer(peer) self._manage_deferred.callback(None) self._manage_deferred = None - if not self.stopped: - self._next_manage_call = reactor.callLater(1, self._manage) + if not self.stopped and schedule_next_call: + self._next_manage_call = self.callLater(self.MANAGE_CALL_INTERVAL_SEC, self.manage) def _rank_request_creator_connections(self): """Returns an ordered list of our request creators, ranked according @@ -213,11 +202,24 @@ class ConnectionManager(object): if peer is None or 
self.stopped: return - from twisted.internet import reactor - log.debug("%s Trying to connect to %s", self._get_log_name(), peer) factory = ClientProtocolFactory(peer, self.rate_limiter, self) + factory.connection_was_made_deferred.addCallback( + lambda c_was_made: self._peer_disconnected(c_was_made, peer)) self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:], factory) connection = reactor.connectTCP(peer.host, peer.port, factory) self._peer_connections[peer].connection = connection + + def _peer_disconnected(self, connection_was_made, peer): + log.debug("%s protocol disconnected for %s", + self._get_log_name(), peer) + if peer in self._peer_connections: + del self._peer_connections[peer] + if peer in self._connections_closing: + d = self._connections_closing[peer] + del self._connections_closing[peer] + d.callback(True) + return connection_was_made + + diff --git a/tests/unit/core/client/test_ConnectionManager.py b/tests/unit/core/client/test_ConnectionManager.py new file mode 100644 index 000000000..0aee113d3 --- /dev/null +++ b/tests/unit/core/client/test_ConnectionManager.py @@ -0,0 +1,214 @@ +import sys +import time +import logging + +from lbrynet.core import log_support +from lbrynet.core.client.ConnectionManager import ConnectionManager +from lbrynet.core.client.ClientRequest import ClientRequest +from lbrynet.core.server.ServerProtocol import ServerProtocol +from lbrynet.core.RateLimiter import RateLimiter +from lbrynet.core.Peer import Peer +from lbrynet.core.PeerManager import PeerManager +from lbrynet.core.Error import ConnectionClosedBeforeResponseError, NoResponseError + +from twisted.trial import unittest +from twisted.internet import defer, reactor, task +from twisted.internet.task import deferLater +from twisted.internet.protocol import Protocol, ServerFactory +from lbrynet import conf +from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler, IRequestCreator + +from zope.interface import implements + 
+PEER_PORT = 5551 +LOCAL_HOST = '127.0.0.1' + +class MocDownloader(object): + def insufficient_funds(self): + pass + +class MocRequestCreator(object): + implements(IRequestCreator) + def __init__(self, peers_to_return): + self.peers_to_return = peers_to_return + self.sent_request = False + + def send_next_request(self, peer, protocol): + if self.sent_request is True: + return defer.succeed(False) + response_identifier = 'moc_request' + r_dict = {'moc_request':0} + request = ClientRequest(r_dict, response_identifier) + d = protocol.add_request(request) # ClientRequest here + d.addErrback(self.request_err, peer) + d.addCallback(self.request_success) + self.sent_request = True + return defer.succeed(True) + + def request_success(self, suc): + pass + + def request_err(self, err, peer): + if isinstance(err.value, NoResponseError): + return err + + def get_new_peers(self): + return self.peers_to_return + +class MocFunctionalQueryHandler(object): + implements(IQueryHandler) + + def __init__(self, clock, is_good=True, is_delayed=False): + self.query_identifiers = ['moc_request'] + self.is_good = is_good + self.is_delayed = is_delayed + self.clock = clock + + def register_with_request_handler(self, request_handler, peer): + request_handler.register_query_handler(self, self.query_identifiers) + + def handle_queries(self, queries): + if self.query_identifiers[0] in queries: + if self.is_delayed: + out = deferLater(self.clock, 10, lambda: {'moc_request':0}) + self.clock.advance(10) + return out + if self.is_good: + return defer.succeed({'moc_request':0}) + else: + return defer.succeed({'bad_request':0}) + else: + return defer.succeed({}) + + +class MocQueryHandlerFactory(object): + implements(IQueryHandlerFactory) + # if is_good, the query handler works as expected, + # if is_delayed, the query handler will delay its response + def __init__(self, clock, is_good=True, is_delayed=False): + self.is_good = is_good + self.is_delayed = is_delayed + self.clock = clock + def 
build_query_handler(self): + return MocFunctionalQueryHandler(self.clock, self.is_good, self.is_delayed) + + def get_primary_query_identifier(self): + return 'moc_query' + + def get_description(self): + return "This is a Moc Query" + + +class MocServerProtocolFactory(ServerFactory): + protocol = ServerProtocol + def __init__(self, clock, is_good=True, is_delayed=False, has_moc_query_handler=True): + self.rate_limiter = RateLimiter() + query_handler_factory = MocQueryHandlerFactory(clock, is_good, is_delayed) + if has_moc_query_handler: + self.query_handler_factories = { + query_handler_factory.get_primary_query_identifier():query_handler_factory + } + else: + self.query_handler_factories = {} + self.peer_manager = PeerManager() + + +class TestIntegrationConnectionManager(unittest.TestCase): + def setUp(self): + + conf.initialize_settings() + + self.TEST_PEER = Peer(LOCAL_HOST, PEER_PORT) + self.downloader = MocDownloader() + self.rate_limiter = RateLimiter() + self.primary_request_creator = MocRequestCreator([self.TEST_PEER]) + self.connection_manager = ConnectionManager(self.downloader, self.rate_limiter, + [self.primary_request_creator], []) + + self.clock = task.Clock() + self.connection_manager.callLater = self.clock.callLater + self.connection_manager._start() + self.server_port = None + + def tearDown(self): + if self.server_port is not None: + self.server_port.stopListening() + self.connection_manager.stop() + conf.settings = None + + @defer.inlineCallbacks + def test_success(self): + # test to see that if we setup a server, we get a connection + self.server = MocServerProtocolFactory(self.clock) + self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST) + yield self.connection_manager.manage(schedule_next_call=False) + self.assertEqual(1, self.connection_manager.num_peer_connections()) + connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].factory.connection_was_made_deferred + self.assertEqual(0, 
self.connection_manager.num_peer_connections()) + self.assertTrue(connection_made) + self.assertEqual(1, self.TEST_PEER.success_count) + self.assertEqual(0, self.TEST_PEER.down_count) + + @defer.inlineCallbacks + def test_bad_server(self): + # test to see that if we setup a server that returns an improper reply + # we don't get a connection + self.server = MocServerProtocolFactory(self.clock, is_good=False) + self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST) + yield self.connection_manager.manage(schedule_next_call=False) + self.assertEqual(1, self.connection_manager.num_peer_connections()) + connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].factory.connection_was_made_deferred + self.assertEqual(0, self.connection_manager.num_peer_connections()) + self.assertTrue(connection_made) + self.assertEqual(0, self.TEST_PEER.success_count) + self.assertEqual(1, self.TEST_PEER.down_count) + + @defer.inlineCallbacks + def test_non_existing_server(self): + # Test to see that if we don't setup a server, we don't get a connection + yield self.connection_manager.manage(schedule_next_call=False) + self.assertEqual(1, self.connection_manager.num_peer_connections()) + connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].factory.connection_was_made_deferred + self.assertEqual(0, self.connection_manager.num_peer_connections()) + self.assertFalse(connection_made) + self.assertEqual(0, self.connection_manager.num_peer_connections()) + self.assertEqual(0, self.TEST_PEER.success_count) + self.assertEqual(1, self.TEST_PEER.down_count) + + @defer.inlineCallbacks + def test_parallel_connections(self): + # Test to see that we make new connection for each manage call, + # without it waiting for the connection to complete + test_peer2 = Peer(LOCAL_HOST, PEER_PORT+1) + self.primary_request_creator.peers_to_return = [self.TEST_PEER, test_peer2] + yield 
self.connection_manager.manage(schedule_next_call=False) + self.assertEqual(1, self.connection_manager.num_peer_connections()) + yield self.connection_manager.manage(schedule_next_call=False) + self.assertEqual(2, self.connection_manager.num_peer_connections()) + self.assertIn(self.TEST_PEER, self.connection_manager._peer_connections) + self.assertIn(test_peer2, self.connection_manager._peer_connections) + connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].factory.connection_was_made_deferred + self.assertFalse(connection_made) + self.assertEqual(1, self.connection_manager.num_peer_connections()) + self.assertEqual(0, self.TEST_PEER.success_count) + self.assertEqual(1, self.TEST_PEER.down_count) + connection_made = yield self.connection_manager._peer_connections[test_peer2].factory.connection_was_made_deferred + self.assertFalse(connection_made) + self.assertEqual(0, self.connection_manager.num_peer_connections()) + self.assertEqual(0, test_peer2.success_count) + self.assertEqual(1, test_peer2.down_count) + + + @defer.inlineCallbacks + def test_stop(self): + # test to see that when we call stop, the ConnectionManager waits for the + # current manage call to finish, closes connections, + # and removes scheduled manage calls + self.connection_manager.manage(schedule_next_call=True) + yield self.connection_manager.stop() + self.assertEqual(0, self.TEST_PEER.success_count) + self.assertEqual(1, self.TEST_PEER.down_count) + self.assertEqual(0, self.connection_manager.num_peer_connections()) + self.assertEqual(None, self.connection_manager._next_manage_call) + +