From 4f87e87d3e74a739cdc4d53721c9c62560e35eed Mon Sep 17 00:00:00 2001
From: Kay Kurokawa <kay@lbry.io>
Date: Fri, 27 Jan 2017 11:18:40 -0500
Subject: [PATCH] Adding connection manager tests, and modifying connection
 manager to be testeable, fix bug where failed connection was not marked as
 down

---
 CHANGELOG.md                                  |   3 +
 lbrynet/core/Peer.py                          |   9 +-
 lbrynet/core/client/ClientProtocol.py         |  16 +-
 lbrynet/core/client/ConnectionManager.py      |  68 +++---
 .../core/client/test_ConnectionManager.py     | 214 ++++++++++++++++++
 5 files changed, 270 insertions(+), 40 deletions(-)
 create mode 100644 tests/unit/core/client/test_ConnectionManager.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index bf7adb8a4..89e8eabe2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,8 +8,11 @@ can and probably will change functionality and break backwards compatability
 at anytime.
 
 ## [Unreleased]
+### Changed
+ * make connection manager unit testeable
 ### Fixed
  * Change EWOULDBLOCK error in DHT to warning. #481
+ * mark peers as down if it fails download protocol
 
 ## [0.8.3rc0] - 2017-02-10
 ### Changed
diff --git a/lbrynet/core/Peer.py b/lbrynet/core/Peer.py
index 3705f6e68..a65f7048a 100644
--- a/lbrynet/core/Peer.py
+++ b/lbrynet/core/Peer.py
@@ -2,13 +2,17 @@ import datetime
 from collections import defaultdict
 from lbrynet.core import utils
 
-
+# Do not create this object except through PeerManager
 class Peer(object):
     def __init__(self, host, port):
         self.host = host
         self.port = port
+        # If a peer is reported down, we wait till this time till reattempting connection
         self.attempt_connection_at = None
+        # Number of times this Peer has been reported to be down, resets to 0 when it is up
         self.down_count = 0
+        # Number of succesful connections (with full protocol completion) to this peer
+        self.success_count = 0
         self.score = 0
         self.stats = defaultdict(float)  # {string stat_type, float count}
 
@@ -21,6 +25,9 @@ class Peer(object):
         self.down_count = 0
         self.attempt_connection_at = None
 
+    def report_success(self):
+        self.success_count += 1
+
     def report_down(self):
         self.down_count += 1
         timeout_time = datetime.timedelta(seconds=60 * self.down_count)
diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py
index dc0440cb9..c588f0f2a 100644
--- a/lbrynet/core/client/ClientProtocol.py
+++ b/lbrynet/core/client/ClientProtocol.py
@@ -23,7 +23,6 @@ def encode_decimal(obj):
 
 class ClientProtocol(Protocol):
     implements(IRequestSender, IRateLimited)
-
     ######### Protocol #########
 
     def connectionMade(self):
@@ -73,7 +72,7 @@ class ClientProtocol(Protocol):
             d.errback(err)
         if self._blob_download_request is not None:
             self._blob_download_request.cancel(err)
-        self._connection_manager.protocol_disconnected(self.peer, self)
+        self.factory.connection_was_made_deferred.callback(True)
 
     ######### IRequestSender #########
 
@@ -132,8 +131,8 @@ class ClientProtocol(Protocol):
                 log.info(
                     "Closing the connection to %s due to having no further requests to send",
                     self.peer)
+                self.peer.report_success()
                 self.transport.loseConnection()
-
         d = self._connection_manager.get_next_request(self.peer, self)
         d.addCallback(send_request_or_close)
         d.addErrback(self._handle_request_error)
@@ -208,10 +207,11 @@ class ClientProtocol(Protocol):
                         log.info(result.value)
                         log.info("The connection to %s is closing due to an error: %s",
                                  self.peer, result.getTraceback())
+
+                        self.peer.report_down()
             if failed is False:
                 log.debug("Asking for another request from %s", self.peer)
-                from twisted.internet import reactor
-                reactor.callLater(0, self._ask_for_request)
+                self._ask_for_request()
             else:
                 log.debug("Not asking for another request from %s", self.peer)
                 self.transport.loseConnection()
@@ -257,11 +257,15 @@ class ClientProtocolFactory(ClientFactory):
         self.rate_limiter = rate_limiter
         self.connection_manager = connection_manager
         self.p = None
+        # This defer fires and returns True when connection was
+        # made and completed, or fires and returns False if
+        # connection failed
+        self.connection_was_made_deferred = defer.Deferred()
 
     def clientConnectionFailed(self, connector, reason):
         log.debug("Connection failed to %s: %s", self.peer, reason)
         self.peer.report_down()
-        self.connection_manager.protocol_disconnected(self.peer, connector)
+        self.connection_was_made_deferred.callback(False)
 
     def buildProtocol(self, addr):
         p = self.protocol()
diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py
index a3dc48401..bb06a645b 100644
--- a/lbrynet/core/client/ConnectionManager.py
+++ b/lbrynet/core/client/ConnectionManager.py
@@ -1,5 +1,5 @@
 import logging
-from twisted.internet import defer
+from twisted.internet import defer, reactor
 from zope.interface import implements
 from lbrynet import interfaces
 from lbrynet import conf
@@ -19,6 +19,8 @@ class PeerConnectionHandler(object):
 
 class ConnectionManager(object):
     implements(interfaces.IConnectionManager)
+    callLater = reactor.callLater
+    MANAGE_CALL_INTERVAL_SEC = 1
 
     def __init__(self, downloader, rate_limiter,
                  primary_request_creators, secondary_request_creators):
@@ -44,17 +46,18 @@ class ConnectionManager(object):
             out = 'Connection Manager '+self.downloader.blob_hash
         return out
 
-    def start(self):
-        log.debug("%s starting", self._get_log_name())
-        from twisted.internet import reactor
-
+    def _start(self):
         self.stopped = False
-
         if self._next_manage_call is not None and self._next_manage_call.active() is True:
             self._next_manage_call.cancel()
-        self._next_manage_call = reactor.callLater(0, self._manage)
+
+    def start(self):
+        log.debug("%s starting", self._get_log_name())
+        self._start()
+        self._next_manage_call = self.callLater(0, self.manage)
         return defer.succeed(True)
 
+
     @defer.inlineCallbacks
     def stop(self):
         log.debug("%s stopping", self._get_log_name())
@@ -68,8 +71,10 @@ class ConnectionManager(object):
         self._next_manage_call = None
         yield self._close_peers()
 
-    def _close_peers(self):
+    def num_peer_connections(self):
+        return len(self._peer_connections)
 
+    def _close_peers(self):
         def disconnect_peer(p):
             d = defer.Deferred()
             self._connections_closing[p] = d
@@ -137,37 +142,21 @@ class ConnectionManager(object):
         ]
         return defer.DeferredList(ds)
 
-    def protocol_disconnected(self, peer, protocol):
-        log.debug("%s protocol %s disconnected for %s",
-                    self._get_log_name(), type(protocol).__name__, peer)
-        if peer in self._peer_connections:
-            del self._peer_connections[peer]
-        if peer in self._connections_closing:
-            d = self._connections_closing[peer]
-            del self._connections_closing[peer]
-            d.callback(True)
-
     @defer.inlineCallbacks
-    def _manage(self):
+    def manage(self, schedule_next_call=True):
         self._manage_deferred = defer.Deferred()
-
-        from twisted.internet import reactor
         if len(self._peer_connections) < conf.settings['max_connections_per_stream']:
             log.debug("%s have %d connections, looking for %d",
                         self._get_log_name(), len(self._peer_connections),
                         conf.settings['max_connections_per_stream'])
-            try:
-                ordered_request_creators = self._rank_request_creator_connections()
-                peers = yield self._get_new_peers(ordered_request_creators)
-                peer = self._pick_best_peer(peers)
-                yield self._connect_to_peer(peer)
-            except Exception:
-                # log this otherwise it will just end up as an unhandled error in deferred
-                log.exception('Something bad happened picking a peer')
+            ordered_request_creators = self._rank_request_creator_connections()
+            peers = yield self._get_new_peers(ordered_request_creators)
+            peer = self._pick_best_peer(peers)
+            self._connect_to_peer(peer)
         self._manage_deferred.callback(None)
         self._manage_deferred = None
-        if not self.stopped:
-            self._next_manage_call = reactor.callLater(1, self._manage)
+        if not self.stopped and schedule_next_call:
+            self._next_manage_call = self.callLater(self.MANAGE_CALL_INTERVAL_SEC, self.manage)
 
     def _rank_request_creator_connections(self):
         """Returns an ordered list of our request creators, ranked according
@@ -213,11 +202,24 @@ class ConnectionManager(object):
         if peer is None or self.stopped:
             return
 
-        from twisted.internet import reactor
-
         log.debug("%s Trying to connect to %s", self._get_log_name(), peer)
         factory = ClientProtocolFactory(peer, self.rate_limiter, self)
+        factory.connection_was_made_deferred.addCallback(
+                lambda c_was_made: self._peer_disconnected(c_was_made, peer))
         self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:],
                                                              factory)
         connection = reactor.connectTCP(peer.host, peer.port, factory)
         self._peer_connections[peer].connection = connection
+
+    def _peer_disconnected(self, connection_was_made, peer):
+        log.debug("%s protocol disconnected for %s",
+                    self._get_log_name(), peer)
+        if peer in self._peer_connections:
+            del self._peer_connections[peer]
+        if peer in self._connections_closing:
+            d = self._connections_closing[peer]
+            del self._connections_closing[peer]
+            d.callback(True)
+        return connection_was_made
+
+
diff --git a/tests/unit/core/client/test_ConnectionManager.py b/tests/unit/core/client/test_ConnectionManager.py
new file mode 100644
index 000000000..0aee113d3
--- /dev/null
+++ b/tests/unit/core/client/test_ConnectionManager.py
@@ -0,0 +1,214 @@
+import sys
+import time
+import logging
+
+from lbrynet.core import log_support
+from lbrynet.core.client.ConnectionManager import ConnectionManager
+from lbrynet.core.client.ClientRequest import ClientRequest
+from lbrynet.core.server.ServerProtocol import ServerProtocol
+from lbrynet.core.RateLimiter import RateLimiter
+from lbrynet.core.Peer import Peer
+from lbrynet.core.PeerManager import PeerManager
+from lbrynet.core.Error import ConnectionClosedBeforeResponseError, NoResponseError
+
+from twisted.trial import unittest
+from twisted.internet import defer, reactor, task
+from twisted.internet.task import deferLater
+from twisted.internet.protocol import Protocol, ServerFactory
+from lbrynet import conf
+from lbrynet.interfaces  import IQueryHandlerFactory, IQueryHandler, IRequestCreator
+
+from zope.interface import implements
+
+PEER_PORT = 5551
+LOCAL_HOST = '127.0.0.1'
+
+class MocDownloader(object):
+    def insufficient_funds(self):
+        pass
+
+class MocRequestCreator(object):
+    implements(IRequestCreator)
+    def __init__(self, peers_to_return):
+        self.peers_to_return = peers_to_return
+        self.sent_request = False
+
+    def send_next_request(self, peer, protocol):
+        if self.sent_request is True:
+            return defer.succeed(False)
+        response_identifier = 'moc_request'
+        r_dict = {'moc_request':0}
+        request = ClientRequest(r_dict, response_identifier)
+        d = protocol.add_request(request) # ClientRequest here
+        d.addErrback(self.request_err, peer)
+        d.addCallback(self.request_success)
+        self.sent_request = True
+        return defer.succeed(True)
+
+    def request_success(self, suc):
+        pass
+
+    def request_err(self, err, peer):
+        if isinstance(err.value, NoResponseError):
+            return err
+
+    def get_new_peers(self):
+        return self.peers_to_return
+
+class MocFunctionalQueryHandler(object):
+    implements(IQueryHandler)
+
+    def __init__(self, clock, is_good=True, is_delayed=False):
+        self.query_identifiers = ['moc_request']
+        self.is_good = is_good
+        self.is_delayed = is_delayed
+        self.clock = clock
+
+    def register_with_request_handler(self, request_handler, peer):
+        request_handler.register_query_handler(self, self.query_identifiers)
+
+    def handle_queries(self, queries):
+        if self.query_identifiers[0] in queries:
+            if self.is_delayed:
+                out = deferLater(self.clock, 10, lambda: {'moc_request':0})
+                self.clock.advance(10)
+                return out
+            if self.is_good:
+                return defer.succeed({'moc_request':0})
+            else:
+                return defer.succeed({'bad_request':0})
+        else:
+            return defer.succeed({})
+
+
+class MocQueryHandlerFactory(object):
+    implements(IQueryHandlerFactory)
+    # is is_good, the query handler works as expectd,
+    # is is_delayed, the query handler will delay its resposne
+    def __init__(self, clock, is_good=True, is_delayed=False):
+        self.is_good = is_good
+        self.is_delayed = is_delayed
+        self.clock = clock
+    def build_query_handler(self):
+        return MocFunctionalQueryHandler(self.clock, self.is_good, self.is_delayed)
+
+    def get_primary_query_identifier(self):
+        return 'moc_query'
+
+    def get_description(self):
+        return "This is a Moc Query"
+
+
+class MocServerProtocolFactory(ServerFactory):
+    protocol = ServerProtocol
+    def __init__(self, clock, is_good=True, is_delayed=False, has_moc_query_handler=True):
+        self.rate_limiter = RateLimiter()
+        query_handler_factory = MocQueryHandlerFactory(clock, is_good, is_delayed)
+        if has_moc_query_handler:
+            self.query_handler_factories = {
+                query_handler_factory.get_primary_query_identifier():query_handler_factory
+            }
+        else:
+            self.query_handler_factories = {}
+        self.peer_manager = PeerManager()
+
+
+class TestIntegrationConnectionManager(unittest.TestCase):
+    def setUp(self):
+
+        conf.initialize_settings()
+
+        self.TEST_PEER = Peer(LOCAL_HOST, PEER_PORT)
+        self.downloader = MocDownloader()
+        self.rate_limiter = RateLimiter()
+        self.primary_request_creator = MocRequestCreator([self.TEST_PEER])
+        self.connection_manager = ConnectionManager(self.downloader, self.rate_limiter,
+                                                    [self.primary_request_creator], [])
+
+        self.clock = task.Clock()
+        self.connection_manager.callLater = self.clock.callLater
+        self.connection_manager._start()
+        self.server_port = None
+
+    def tearDown(self):
+        if self.server_port is not None:
+            self.server_port.stopListening()
+        self.connection_manager.stop()
+        conf.settings = None
+
+    @defer.inlineCallbacks
+    def test_success(self):
+        # test to see that if we setup a server, we get a connection
+        self.server = MocServerProtocolFactory(self.clock)
+        self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST)
+        yield self.connection_manager.manage(schedule_next_call=False)
+        self.assertEqual(1, self.connection_manager.num_peer_connections())
+        connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].factory.connection_was_made_deferred
+        self.assertEqual(0, self.connection_manager.num_peer_connections())
+        self.assertTrue(connection_made)
+        self.assertEqual(1, self.TEST_PEER.success_count)
+        self.assertEqual(0, self.TEST_PEER.down_count)
+
+    @defer.inlineCallbacks
+    def test_bad_server(self):
+        # test to see that if we setup a server that returns an improper reply
+        # we don't get a connection
+        self.server = MocServerProtocolFactory(self.clock, is_good=False)
+        self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST)
+        yield self.connection_manager.manage(schedule_next_call=False)
+        self.assertEqual(1, self.connection_manager.num_peer_connections())
+        connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].factory.connection_was_made_deferred
+        self.assertEqual(0, self.connection_manager.num_peer_connections())
+        self.assertTrue(connection_made)
+        self.assertEqual(0, self.TEST_PEER.success_count)
+        self.assertEqual(1, self.TEST_PEER.down_count)
+
+    @defer.inlineCallbacks
+    def test_non_existing_server(self):
+        # Test to see that if we don't setup a server, we don't get a connection
+        yield self.connection_manager.manage(schedule_next_call=False)
+        self.assertEqual(1, self.connection_manager.num_peer_connections())
+        connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].factory.connection_was_made_deferred
+        self.assertEqual(0, self.connection_manager.num_peer_connections())
+        self.assertFalse(connection_made)
+        self.assertEqual(0, self.connection_manager.num_peer_connections())
+        self.assertEqual(0, self.TEST_PEER.success_count)
+        self.assertEqual(1, self.TEST_PEER.down_count)
+
+    @defer.inlineCallbacks
+    def test_parallel_connections(self):
+        # Test to see that we make new connection for each manage call,
+        # without it waiting for the connection to complete
+        test_peer2 = Peer(LOCAL_HOST, PEER_PORT+1)
+        self.primary_request_creator.peers_to_return = [self.TEST_PEER, test_peer2]
+        yield self.connection_manager.manage(schedule_next_call=False)
+        self.assertEqual(1, self.connection_manager.num_peer_connections())
+        yield self.connection_manager.manage(schedule_next_call=False)
+        self.assertEqual(2, self.connection_manager.num_peer_connections())
+        self.assertIn(self.TEST_PEER, self.connection_manager._peer_connections)
+        self.assertIn(test_peer2, self.connection_manager._peer_connections)
+        connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].factory.connection_was_made_deferred
+        self.assertFalse(connection_made)
+        self.assertEqual(1, self.connection_manager.num_peer_connections())
+        self.assertEqual(0, self.TEST_PEER.success_count)
+        self.assertEqual(1, self.TEST_PEER.down_count)
+        connection_made = yield self.connection_manager._peer_connections[test_peer2].factory.connection_was_made_deferred
+        self.assertFalse(connection_made)
+        self.assertEqual(0, self.connection_manager.num_peer_connections())
+        self.assertEqual(0, test_peer2.success_count)
+        self.assertEqual(1, test_peer2.down_count)
+
+
+    @defer.inlineCallbacks
+    def test_stop(self):
+        # test to see that when we call stop, the ConnectionManager waits for the
+        # current manage call to finish, closes connections,
+        # and removes scheduled manage calls
+        self.connection_manager.manage(schedule_next_call=True)
+        yield self.connection_manager.stop()
+        self.assertEqual(0, self.TEST_PEER.success_count)
+        self.assertEqual(1, self.TEST_PEER.down_count)
+        self.assertEqual(0, self.connection_manager.num_peer_connections())
+        self.assertEqual(None, self.connection_manager._next_manage_call)
+
+