Adding connection manager tests, and modifying connection manager to be testable, fix bug where failed connection was not marked as down

This commit is contained in:
Kay Kurokawa 2017-01-27 11:18:40 -05:00
parent 11b1f8c7af
commit 4f87e87d3e
5 changed files with 270 additions and 40 deletions

View file

@ -8,8 +8,11 @@ can and probably will change functionality and break backwards compatability
at anytime. at anytime.
## [Unreleased] ## [Unreleased]
### Changed
* make connection manager unit testable
### Fixed ### Fixed
* Change EWOULDBLOCK error in DHT to warning. #481 * Change EWOULDBLOCK error in DHT to warning. #481
* mark peers as down if they fail the download protocol
## [0.8.3rc0] - 2017-02-10 ## [0.8.3rc0] - 2017-02-10
### Changed ### Changed

View file

@ -2,13 +2,17 @@ import datetime
from collections import defaultdict from collections import defaultdict
from lbrynet.core import utils from lbrynet.core import utils
# Do not create this object except through PeerManager
class Peer(object): class Peer(object):
def __init__(self, host, port): def __init__(self, host, port):
self.host = host self.host = host
self.port = port self.port = port
# If a peer is reported down, we wait until this time before reattempting connection
self.attempt_connection_at = None self.attempt_connection_at = None
# Number of times this Peer has been reported to be down, resets to 0 when it is up
self.down_count = 0 self.down_count = 0
# Number of successful connections (with full protocol completion) to this peer
self.success_count = 0
self.score = 0 self.score = 0
self.stats = defaultdict(float) # {string stat_type, float count} self.stats = defaultdict(float) # {string stat_type, float count}
@ -21,6 +25,9 @@ class Peer(object):
self.down_count = 0 self.down_count = 0
self.attempt_connection_at = None self.attempt_connection_at = None
def report_success(self):
    """Record one more successful connection to this peer."""
    self.success_count = self.success_count + 1
def report_down(self): def report_down(self):
self.down_count += 1 self.down_count += 1
timeout_time = datetime.timedelta(seconds=60 * self.down_count) timeout_time = datetime.timedelta(seconds=60 * self.down_count)

View file

@ -23,7 +23,6 @@ def encode_decimal(obj):
class ClientProtocol(Protocol): class ClientProtocol(Protocol):
implements(IRequestSender, IRateLimited) implements(IRequestSender, IRateLimited)
######### Protocol ######### ######### Protocol #########
def connectionMade(self): def connectionMade(self):
@ -73,7 +72,7 @@ class ClientProtocol(Protocol):
d.errback(err) d.errback(err)
if self._blob_download_request is not None: if self._blob_download_request is not None:
self._blob_download_request.cancel(err) self._blob_download_request.cancel(err)
self._connection_manager.protocol_disconnected(self.peer, self) self.factory.connection_was_made_deferred.callback(True)
######### IRequestSender ######### ######### IRequestSender #########
@ -132,8 +131,8 @@ class ClientProtocol(Protocol):
log.info( log.info(
"Closing the connection to %s due to having no further requests to send", "Closing the connection to %s due to having no further requests to send",
self.peer) self.peer)
self.peer.report_success()
self.transport.loseConnection() self.transport.loseConnection()
d = self._connection_manager.get_next_request(self.peer, self) d = self._connection_manager.get_next_request(self.peer, self)
d.addCallback(send_request_or_close) d.addCallback(send_request_or_close)
d.addErrback(self._handle_request_error) d.addErrback(self._handle_request_error)
@ -208,10 +207,11 @@ class ClientProtocol(Protocol):
log.info(result.value) log.info(result.value)
log.info("The connection to %s is closing due to an error: %s", log.info("The connection to %s is closing due to an error: %s",
self.peer, result.getTraceback()) self.peer, result.getTraceback())
self.peer.report_down()
if failed is False: if failed is False:
log.debug("Asking for another request from %s", self.peer) log.debug("Asking for another request from %s", self.peer)
from twisted.internet import reactor self._ask_for_request()
reactor.callLater(0, self._ask_for_request)
else: else:
log.debug("Not asking for another request from %s", self.peer) log.debug("Not asking for another request from %s", self.peer)
self.transport.loseConnection() self.transport.loseConnection()
@ -257,11 +257,15 @@ class ClientProtocolFactory(ClientFactory):
self.rate_limiter = rate_limiter self.rate_limiter = rate_limiter
self.connection_manager = connection_manager self.connection_manager = connection_manager
self.p = None self.p = None
# This defer fires and returns True when connection was
# made and completed, or fires and returns False if
# connection failed
self.connection_was_made_deferred = defer.Deferred()
def clientConnectionFailed(self, connector, reason): def clientConnectionFailed(self, connector, reason):
log.debug("Connection failed to %s: %s", self.peer, reason) log.debug("Connection failed to %s: %s", self.peer, reason)
self.peer.report_down() self.peer.report_down()
self.connection_manager.protocol_disconnected(self.peer, connector) self.connection_was_made_deferred.callback(False)
def buildProtocol(self, addr): def buildProtocol(self, addr):
p = self.protocol() p = self.protocol()

View file

@ -1,5 +1,5 @@
import logging import logging
from twisted.internet import defer from twisted.internet import defer, reactor
from zope.interface import implements from zope.interface import implements
from lbrynet import interfaces from lbrynet import interfaces
from lbrynet import conf from lbrynet import conf
@ -19,6 +19,8 @@ class PeerConnectionHandler(object):
class ConnectionManager(object): class ConnectionManager(object):
implements(interfaces.IConnectionManager) implements(interfaces.IConnectionManager)
callLater = reactor.callLater
MANAGE_CALL_INTERVAL_SEC = 1
def __init__(self, downloader, rate_limiter, def __init__(self, downloader, rate_limiter,
primary_request_creators, secondary_request_creators): primary_request_creators, secondary_request_creators):
@ -44,17 +46,18 @@ class ConnectionManager(object):
out = 'Connection Manager '+self.downloader.blob_hash out = 'Connection Manager '+self.downloader.blob_hash
return out return out
def start(self): def _start(self):
log.debug("%s starting", self._get_log_name())
from twisted.internet import reactor
self.stopped = False self.stopped = False
if self._next_manage_call is not None and self._next_manage_call.active() is True: if self._next_manage_call is not None and self._next_manage_call.active() is True:
self._next_manage_call.cancel() self._next_manage_call.cancel()
self._next_manage_call = reactor.callLater(0, self._manage)
def start(self):
log.debug("%s starting", self._get_log_name())
self._start()
self._next_manage_call = self.callLater(0, self.manage)
return defer.succeed(True) return defer.succeed(True)
@defer.inlineCallbacks @defer.inlineCallbacks
def stop(self): def stop(self):
log.debug("%s stopping", self._get_log_name()) log.debug("%s stopping", self._get_log_name())
@ -68,8 +71,10 @@ class ConnectionManager(object):
self._next_manage_call = None self._next_manage_call = None
yield self._close_peers() yield self._close_peers()
def _close_peers(self): def num_peer_connections(self):
return len(self._peer_connections)
def _close_peers(self):
def disconnect_peer(p): def disconnect_peer(p):
d = defer.Deferred() d = defer.Deferred()
self._connections_closing[p] = d self._connections_closing[p] = d
@ -137,37 +142,21 @@ class ConnectionManager(object):
] ]
return defer.DeferredList(ds) return defer.DeferredList(ds)
def protocol_disconnected(self, peer, protocol):
log.debug("%s protocol %s disconnected for %s",
self._get_log_name(), type(protocol).__name__, peer)
if peer in self._peer_connections:
del self._peer_connections[peer]
if peer in self._connections_closing:
d = self._connections_closing[peer]
del self._connections_closing[peer]
d.callback(True)
@defer.inlineCallbacks @defer.inlineCallbacks
def _manage(self): def manage(self, schedule_next_call=True):
self._manage_deferred = defer.Deferred() self._manage_deferred = defer.Deferred()
from twisted.internet import reactor
if len(self._peer_connections) < conf.settings['max_connections_per_stream']: if len(self._peer_connections) < conf.settings['max_connections_per_stream']:
log.debug("%s have %d connections, looking for %d", log.debug("%s have %d connections, looking for %d",
self._get_log_name(), len(self._peer_connections), self._get_log_name(), len(self._peer_connections),
conf.settings['max_connections_per_stream']) conf.settings['max_connections_per_stream'])
try:
ordered_request_creators = self._rank_request_creator_connections() ordered_request_creators = self._rank_request_creator_connections()
peers = yield self._get_new_peers(ordered_request_creators) peers = yield self._get_new_peers(ordered_request_creators)
peer = self._pick_best_peer(peers) peer = self._pick_best_peer(peers)
yield self._connect_to_peer(peer) self._connect_to_peer(peer)
except Exception:
# log this otherwise it will just end up as an unhandled error in deferred
log.exception('Something bad happened picking a peer')
self._manage_deferred.callback(None) self._manage_deferred.callback(None)
self._manage_deferred = None self._manage_deferred = None
if not self.stopped: if not self.stopped and schedule_next_call:
self._next_manage_call = reactor.callLater(1, self._manage) self._next_manage_call = self.callLater(self.MANAGE_CALL_INTERVAL_SEC, self.manage)
def _rank_request_creator_connections(self): def _rank_request_creator_connections(self):
"""Returns an ordered list of our request creators, ranked according """Returns an ordered list of our request creators, ranked according
@ -213,11 +202,24 @@ class ConnectionManager(object):
if peer is None or self.stopped: if peer is None or self.stopped:
return return
from twisted.internet import reactor
log.debug("%s Trying to connect to %s", self._get_log_name(), peer) log.debug("%s Trying to connect to %s", self._get_log_name(), peer)
factory = ClientProtocolFactory(peer, self.rate_limiter, self) factory = ClientProtocolFactory(peer, self.rate_limiter, self)
factory.connection_was_made_deferred.addCallback(
lambda c_was_made: self._peer_disconnected(c_was_made, peer))
self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:], self._peer_connections[peer] = PeerConnectionHandler(self._primary_request_creators[:],
factory) factory)
connection = reactor.connectTCP(peer.host, peer.port, factory) connection = reactor.connectTCP(peer.host, peer.port, factory)
self._peer_connections[peer].connection = connection self._peer_connections[peer].connection = connection
def _peer_disconnected(self, connection_was_made, peer):
    """Forget `peer` once its connection attempt has finished.

    Removes the peer from the active-connection table and, if a close of
    this peer was pending, fires the pending deferred with True.
    `connection_was_made` is passed through unchanged so that later
    callbacks in the chain still receive it.
    """
    log.debug("%s protocol disconnected for %s",
              self._get_log_name(), peer)
    self._peer_connections.pop(peer, None)
    try:
        closing = self._connections_closing.pop(peer)
    except KeyError:
        # nobody was waiting for this peer to close
        pass
    else:
        closing.callback(True)
    return connection_was_made

View file

@ -0,0 +1,214 @@
import sys
import time
import logging
from lbrynet.core import log_support
from lbrynet.core.client.ConnectionManager import ConnectionManager
from lbrynet.core.client.ClientRequest import ClientRequest
from lbrynet.core.server.ServerProtocol import ServerProtocol
from lbrynet.core.RateLimiter import RateLimiter
from lbrynet.core.Peer import Peer
from lbrynet.core.PeerManager import PeerManager
from lbrynet.core.Error import ConnectionClosedBeforeResponseError, NoResponseError
from twisted.trial import unittest
from twisted.internet import defer, reactor, task
from twisted.internet.task import deferLater
from twisted.internet.protocol import Protocol, ServerFactory
from lbrynet import conf
from lbrynet.interfaces import IQueryHandlerFactory, IQueryHandler, IRequestCreator
from zope.interface import implements
# TCP port used both for the test Peer objects and for the mock server listener
PEER_PORT = 5551
# Host used for the test Peer objects and as the interface the mock server binds to
LOCAL_HOST = '127.0.0.1'
class MocDownloader(object):
    """Minimal downloader stand-in handed to ConnectionManager in these tests."""

    def insufficient_funds(self):
        """Do nothing; present only so the method exists on the stub."""
        return None
class MocRequestCreator(object):
    """Request creator stub: returns a fixed peer list and sends one mock request."""
    implements(IRequestCreator)

    def __init__(self, peers_to_return):
        # value handed back verbatim by get_new_peers()
        self.peers_to_return = peers_to_return
        # flips to True once the single mock request has been queued
        self.sent_request = False

    def send_next_request(self, peer, protocol):
        """Queue a 'moc_request' on `protocol` the first time this is called.

        Returns a deferred firing True when a request was queued, and
        False on every call after the first.
        """
        if self.sent_request is True:
            return defer.succeed(False)

        moc_request = ClientRequest({'moc_request':0}, 'moc_request')
        response_d = protocol.add_request(moc_request)
        response_d.addErrback(self.request_err, peer)
        response_d.addCallback(self.request_success)
        self.sent_request = True
        return defer.succeed(True)

    def request_success(self, suc):
        """Ignore the response."""
        return None

    def request_err(self, err, peer):
        """Pass NoResponseError failures on; any other failure is dropped."""
        return err if isinstance(err.value, NoResponseError) else None

    def get_new_peers(self):
        """Return the peers this creator was configured with."""
        return self.peers_to_return
class MocFunctionalQueryHandler(object):
    """Server-side query handler stub answering the 'moc_request' query.

    `is_good` selects between a proper reply and a reply with an
    unexpected key; `is_delayed` makes the reply go through
    deferLater on the supplied clock.
    """
    implements(IQueryHandler)

    def __init__(self, clock, is_good=True, is_delayed=False):
        self.query_identifiers = ['moc_request']
        self.is_good = is_good
        self.is_delayed = is_delayed
        self.clock = clock

    def register_with_request_handler(self, request_handler, peer):
        """Register this handler for its query identifiers."""
        request_handler.register_query_handler(self, self.query_identifiers)

    def handle_queries(self, queries):
        """Return a deferred with the reply dict for `queries`."""
        if self.query_identifiers[0] not in queries:
            # not a query for us: reply with nothing
            return defer.succeed({})

        if self.is_delayed:
            # schedule the reply 10 seconds out, then advance the clock by
            # the same amount
            delayed_reply = deferLater(self.clock, 10, lambda: {'moc_request':0})
            self.clock.advance(10)
            return delayed_reply

        if self.is_good:
            return defer.succeed({'moc_request':0})
        return defer.succeed({'bad_request':0})
class MocQueryHandlerFactory(object):
    """Factory producing MocFunctionalQueryHandler instances.

    If is_good, the query handler works as expected;
    if is_delayed, the query handler will delay its response.
    """
    implements(IQueryHandlerFactory)

    def __init__(self, clock, is_good=True, is_delayed=False):
        self.clock = clock
        self.is_good = is_good
        self.is_delayed = is_delayed

    def build_query_handler(self):
        """Build a handler configured like this factory."""
        handler = MocFunctionalQueryHandler(self.clock, self.is_good, self.is_delayed)
        return handler

    def get_primary_query_identifier(self):
        """Return the identifier this factory is registered under."""
        return 'moc_query'

    def get_description(self):
        """Return a human-readable description of the query."""
        return "This is a Moc Query"
class MocServerProtocolFactory(ServerFactory):
    """ServerFactory for ServerProtocol wired up with the mock query handler.

    With has_moc_query_handler=False no query handler factory is
    registered at all.
    """
    protocol = ServerProtocol

    def __init__(self, clock, is_good=True, is_delayed=False, has_moc_query_handler=True):
        self.rate_limiter = RateLimiter()
        moc_factory = MocQueryHandlerFactory(clock, is_good, is_delayed)
        handler_factories = {}
        if has_moc_query_handler:
            handler_factories[moc_factory.get_primary_query_identifier()] = moc_factory
        self.query_handler_factories = handler_factories
        self.peer_manager = PeerManager()
class TestIntegrationConnectionManager(unittest.TestCase):
    """Integration tests running ConnectionManager against a local TCP listener.

    The manager's callLater is replaced with a task.Clock so that no manage
    call is ever scheduled on the real reactor; each test invokes manage()
    by hand instead.
    """

    def setUp(self):
        conf.initialize_settings()
        self.TEST_PEER = Peer(LOCAL_HOST, PEER_PORT)
        self.downloader = MocDownloader()
        self.rate_limiter = RateLimiter()
        self.primary_request_creator = MocRequestCreator([self.TEST_PEER])
        self.connection_manager = ConnectionManager(self.downloader, self.rate_limiter,
                                                    [self.primary_request_creator], [])
        self.clock = task.Clock()
        self.connection_manager.callLater = self.clock.callLater
        self.connection_manager._start()
        self.server_port = None

    @defer.inlineCallbacks
    def tearDown(self):
        # Both calls return deferreds. Trial must wait on them, otherwise the
        # listening port may still be bound when the next test calls
        # listenTCP on the same port and the reactor can be left dirty.
        try:
            if self.server_port is not None:
                yield self.server_port.stopListening()
            yield self.connection_manager.stop()
        finally:
            conf.settings = None

    def _connection_was_made(self, peer):
        """Return the factory deferred that fires once the connection to `peer` ends."""
        handler = self.connection_manager._peer_connections[peer]
        return handler.factory.connection_was_made_deferred

    @defer.inlineCallbacks
    def test_success(self):
        # test to see that if we setup a server, we get a connection
        self.server = MocServerProtocolFactory(self.clock)
        self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST)
        yield self.connection_manager.manage(schedule_next_call=False)
        self.assertEqual(1, self.connection_manager.num_peer_connections())
        connection_made = yield self._connection_was_made(self.TEST_PEER)
        self.assertEqual(0, self.connection_manager.num_peer_connections())
        self.assertTrue(connection_made)
        self.assertEqual(1, self.TEST_PEER.success_count)
        self.assertEqual(0, self.TEST_PEER.down_count)

    @defer.inlineCallbacks
    def test_bad_server(self):
        # test to see that if we setup a server that returns an improper reply
        # we don't get a connection
        self.server = MocServerProtocolFactory(self.clock, is_good=False)
        self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST)
        yield self.connection_manager.manage(schedule_next_call=False)
        self.assertEqual(1, self.connection_manager.num_peer_connections())
        connection_made = yield self._connection_was_made(self.TEST_PEER)
        self.assertEqual(0, self.connection_manager.num_peer_connections())
        self.assertTrue(connection_made)
        self.assertEqual(0, self.TEST_PEER.success_count)
        self.assertEqual(1, self.TEST_PEER.down_count)

    @defer.inlineCallbacks
    def test_non_existing_server(self):
        # Test to see that if we don't setup a server, we don't get a connection
        yield self.connection_manager.manage(schedule_next_call=False)
        self.assertEqual(1, self.connection_manager.num_peer_connections())
        connection_made = yield self._connection_was_made(self.TEST_PEER)
        self.assertEqual(0, self.connection_manager.num_peer_connections())
        self.assertFalse(connection_made)
        self.assertEqual(0, self.TEST_PEER.success_count)
        self.assertEqual(1, self.TEST_PEER.down_count)

    @defer.inlineCallbacks
    def test_parallel_connections(self):
        # Test to see that we make new connection for each manage call,
        # without it waiting for the connection to complete
        test_peer2 = Peer(LOCAL_HOST, PEER_PORT+1)
        self.primary_request_creator.peers_to_return = [self.TEST_PEER, test_peer2]
        yield self.connection_manager.manage(schedule_next_call=False)
        self.assertEqual(1, self.connection_manager.num_peer_connections())
        yield self.connection_manager.manage(schedule_next_call=False)
        self.assertEqual(2, self.connection_manager.num_peer_connections())
        self.assertIn(self.TEST_PEER, self.connection_manager._peer_connections)
        self.assertIn(test_peer2, self.connection_manager._peer_connections)

        connection_made = yield self._connection_was_made(self.TEST_PEER)
        self.assertFalse(connection_made)
        self.assertEqual(1, self.connection_manager.num_peer_connections())
        self.assertEqual(0, self.TEST_PEER.success_count)
        self.assertEqual(1, self.TEST_PEER.down_count)

        connection_made = yield self._connection_was_made(test_peer2)
        self.assertFalse(connection_made)
        self.assertEqual(0, self.connection_manager.num_peer_connections())
        self.assertEqual(0, test_peer2.success_count)
        self.assertEqual(1, test_peer2.down_count)

    @defer.inlineCallbacks
    def test_stop(self):
        # test to see that when we call stop, the ConnectionManager waits for the
        # current manage call to finish, closes connections,
        # and removes scheduled manage calls
        # manage() is deliberately not yielded here: stop() must cope with a
        # manage call that is still in flight
        self.connection_manager.manage(schedule_next_call=True)
        yield self.connection_manager.stop()
        self.assertEqual(0, self.TEST_PEER.success_count)
        self.assertEqual(1, self.TEST_PEER.down_count)
        self.assertEqual(0, self.connection_manager.num_peer_connections())
        self.assertEqual(None, self.connection_manager._next_manage_call)