forked from LBRYCommunity/lbry-sdk
adding timeout to ClientProtocol
This commit is contained in:
parent
fba91139e3
commit
824fbcc25a
3 changed files with 33 additions and 10 deletions
|
@ -19,7 +19,7 @@ at anytime.
|
||||||
*
|
*
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
*
|
* Added timeout to ClientProtocol
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
|
|
||||||
|
|
|
@ -3,8 +3,10 @@ import logging
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
from twisted.internet import error, defer
|
from twisted.internet import error, defer
|
||||||
from twisted.internet.protocol import Protocol, ClientFactory
|
from twisted.internet.protocol import Protocol, ClientFactory
|
||||||
|
from twisted.protocols.policies import TimeoutMixin
|
||||||
from twisted.python import failure
|
from twisted.python import failure
|
||||||
from lbrynet import conf
|
from lbrynet import conf
|
||||||
|
from lbrynet.core import utils
|
||||||
from lbrynet.core.Error import ConnectionClosedBeforeResponseError, NoResponseError
|
from lbrynet.core.Error import ConnectionClosedBeforeResponseError, NoResponseError
|
||||||
from lbrynet.core.Error import DownloadCanceledError, MisbehavingPeerError
|
from lbrynet.core.Error import DownloadCanceledError, MisbehavingPeerError
|
||||||
from lbrynet.core.Error import RequestCanceledError
|
from lbrynet.core.Error import RequestCanceledError
|
||||||
|
@ -21,9 +23,10 @@ def encode_decimal(obj):
|
||||||
raise TypeError(repr(obj) + " is not JSON serializable")
|
raise TypeError(repr(obj) + " is not JSON serializable")
|
||||||
|
|
||||||
|
|
||||||
class ClientProtocol(Protocol):
|
class ClientProtocol(Protocol, TimeoutMixin):
|
||||||
implements(IRequestSender, IRateLimited)
|
implements(IRequestSender, IRateLimited)
|
||||||
######### Protocol #########
|
######### Protocol #########
|
||||||
|
PROTOCOL_TIMEOUT = 30
|
||||||
|
|
||||||
def connectionMade(self):
|
def connectionMade(self):
|
||||||
log.debug("Connection made to %s", self.factory.peer)
|
log.debug("Connection made to %s", self.factory.peer)
|
||||||
|
@ -37,13 +40,15 @@ class ClientProtocol(Protocol):
|
||||||
self._next_request = {}
|
self._next_request = {}
|
||||||
self.connection_closed = False
|
self.connection_closed = False
|
||||||
self.connection_closing = False
|
self.connection_closing = False
|
||||||
|
# This needs to be set for TimeoutMixin
|
||||||
|
self.callLater = utils.call_later
|
||||||
self.peer.report_up()
|
self.peer.report_up()
|
||||||
|
|
||||||
self._ask_for_request()
|
self._ask_for_request()
|
||||||
|
|
||||||
def dataReceived(self, data):
|
def dataReceived(self, data):
|
||||||
log.debug("Data receieved from %s", self.peer)
|
log.debug("Data receieved from %s", self.peer)
|
||||||
|
self.setTimeout(None)
|
||||||
self._rate_limiter.report_dl_bytes(len(data))
|
self._rate_limiter.report_dl_bytes(len(data))
|
||||||
if self._downloading_blob is True:
|
if self._downloading_blob is True:
|
||||||
self._blob_download_request.write(data)
|
self._blob_download_request.write(data)
|
||||||
|
@ -60,8 +65,14 @@ class ClientProtocol(Protocol):
|
||||||
if self._downloading_blob is True and len(extra_data) != 0:
|
if self._downloading_blob is True and len(extra_data) != 0:
|
||||||
self._blob_download_request.write(extra_data)
|
self._blob_download_request.write(extra_data)
|
||||||
|
|
||||||
|
def timeoutConnection(self):
|
||||||
|
log.info("Connection timed out to %s", self.peer)
|
||||||
|
self.peer.report_down()
|
||||||
|
self.transport.abortConnection()
|
||||||
|
|
||||||
def connectionLost(self, reason):
|
def connectionLost(self, reason):
|
||||||
log.debug("Connection lost to %s: %s", self.peer, reason)
|
log.debug("Connection lost to %s: %s", self.peer, reason)
|
||||||
|
self.setTimeout(None)
|
||||||
self.connection_closed = True
|
self.connection_closed = True
|
||||||
if reason.check(error.ConnectionDone):
|
if reason.check(error.ConnectionDone):
|
||||||
err = failure.Failure(ConnectionClosedBeforeResponseError())
|
err = failure.Failure(ConnectionClosedBeforeResponseError())
|
||||||
|
@ -138,6 +149,7 @@ class ClientProtocol(Protocol):
|
||||||
d.addErrback(self._handle_request_error)
|
d.addErrback(self._handle_request_error)
|
||||||
|
|
||||||
def _send_request_message(self, request_msg):
|
def _send_request_message(self, request_msg):
|
||||||
|
self.setTimeout(self.PROTOCOL_TIMEOUT)
|
||||||
# TODO: compare this message to the last one. If they're the same,
|
# TODO: compare this message to the last one. If they're the same,
|
||||||
# TODO: incrementally delay this message.
|
# TODO: incrementally delay this message.
|
||||||
m = json.dumps(request_msg, default=encode_decimal)
|
m = json.dumps(request_msg, default=encode_decimal)
|
||||||
|
|
|
@ -3,9 +3,9 @@ import time
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from lbrynet.core import log_support
|
from lbrynet.core import log_support
|
||||||
#from lbrynet.core.client.ConnectionManager import ConnectionManager
|
|
||||||
from lbrynet.core.client.ClientRequest import ClientRequest
|
from lbrynet.core.client.ClientRequest import ClientRequest
|
||||||
from lbrynet.core.server.ServerProtocol import ServerProtocol
|
from lbrynet.core.server.ServerProtocol import ServerProtocol
|
||||||
|
from lbrynet.core.client.ClientProtocol import ClientProtocol
|
||||||
from lbrynet.core.RateLimiter import RateLimiter
|
from lbrynet.core.RateLimiter import RateLimiter
|
||||||
from lbrynet.core.Peer import Peer
|
from lbrynet.core.Peer import Peer
|
||||||
from lbrynet.core.PeerManager import PeerManager
|
from lbrynet.core.PeerManager import PeerManager
|
||||||
|
@ -71,8 +71,9 @@ class MocFunctionalQueryHandler(object):
|
||||||
def handle_queries(self, queries):
|
def handle_queries(self, queries):
|
||||||
if self.query_identifiers[0] in queries:
|
if self.query_identifiers[0] in queries:
|
||||||
if self.is_delayed:
|
if self.is_delayed:
|
||||||
out = deferLater(self.clock, 10, lambda: {'moc_request':0})
|
delay = ClientProtocol.PROTOCOL_TIMEOUT+1
|
||||||
self.clock.advance(10)
|
out = deferLater(self.clock, delay, lambda: {'moc_request':0})
|
||||||
|
self.clock.advance(delay)
|
||||||
return out
|
return out
|
||||||
if self.is_good:
|
if self.is_good:
|
||||||
return defer.succeed({'moc_request':0})
|
return defer.succeed({'moc_request':0})
|
||||||
|
@ -113,7 +114,6 @@ class MocServerProtocolFactory(ServerFactory):
|
||||||
self.query_handler_factories = {}
|
self.query_handler_factories = {}
|
||||||
self.peer_manager = PeerManager()
|
self.peer_manager = PeerManager()
|
||||||
|
|
||||||
|
|
||||||
class TestIntegrationConnectionManager(unittest.TestCase):
|
class TestIntegrationConnectionManager(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
|
||||||
|
@ -152,9 +152,7 @@ class TestIntegrationConnectionManager(unittest.TestCase):
|
||||||
self.assertEqual(0, self.TEST_PEER.down_count)
|
self.assertEqual(0, self.TEST_PEER.down_count)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_bad_server(self):
|
def test_server_with_improper_reply(self):
|
||||||
# test to see that if we setup a server that returns an improper reply
|
|
||||||
# we don't get a connection
|
|
||||||
self.server = MocServerProtocolFactory(self.clock, is_good=False)
|
self.server = MocServerProtocolFactory(self.clock, is_good=False)
|
||||||
self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST)
|
self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST)
|
||||||
yield self.connection_manager.manage(schedule_next_call=False)
|
yield self.connection_manager.manage(schedule_next_call=False)
|
||||||
|
@ -213,4 +211,17 @@ class TestIntegrationConnectionManager(unittest.TestCase):
|
||||||
self.assertEqual(0, self.connection_manager.num_peer_connections())
|
self.assertEqual(0, self.connection_manager.num_peer_connections())
|
||||||
self.assertEqual(None, self.connection_manager._next_manage_call)
|
self.assertEqual(None, self.connection_manager._next_manage_call)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_closed_connection_when_server_is_slow(self):
|
||||||
|
self.server = MocServerProtocolFactory(self.clock, has_moc_query_handler=True,is_delayed=True)
|
||||||
|
self.server_port = reactor.listenTCP(PEER_PORT, self.server, interface=LOCAL_HOST)
|
||||||
|
|
||||||
|
yield self.connection_manager.manage(schedule_next_call=False)
|
||||||
|
self.assertEqual(1, self.connection_manager.num_peer_connections())
|
||||||
|
connection_made = yield self.connection_manager._peer_connections[self.TEST_PEER].factory.connection_was_made_deferred
|
||||||
|
self.assertEqual(0, self.connection_manager.num_peer_connections())
|
||||||
|
self.assertEqual(True, connection_made)
|
||||||
|
self.assertEqual(0, self.TEST_PEER.success_count)
|
||||||
|
self.assertEqual(1, self.TEST_PEER.down_count)
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue