From e03bba0b62dbdd82a3f75c20d63eef42e4ecd297 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 9 Jan 2017 14:03:29 -0500 Subject: [PATCH] handle dropped api requests --- lbrynet/lbrynet_daemon/Daemon.py | 15 +---- lbrynet/lbrynet_daemon/auth/server.py | 97 ++++++++++++++++----------- 2 files changed, 60 insertions(+), 52 deletions(-) diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index e5ee1bb51..fe0db41e7 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -1486,20 +1486,7 @@ class Daemon(AuthJSONRPCServer): return self._render_response(None) d = self._resolve_name(name, force_refresh=force) - d.addCallbacks( - lambda info: self._render_response(info), - # TODO: Is server.failure a module? It looks like it: - # - # In [1]: import twisted.web.server - # In [2]: twisted.web.server.failure - # Out[2]: - # - # If so, maybe we should return something else. - errback=log.fail(lambda err: server.failure), - errbackArgs=('Failed to resolve name',), - errbackKeywords={'level': 'INFO'}, - ) + d.addCallback(self._render_response) return d def jsonrpc_get_claim_info(self, p): diff --git a/lbrynet/lbrynet_daemon/auth/server.py b/lbrynet/lbrynet_daemon/auth/server.py index d96a25a45..60ceafb5f 100644 --- a/lbrynet/lbrynet_daemon/auth/server.py +++ b/lbrynet/lbrynet_daemon/auth/server.py @@ -6,10 +6,12 @@ from zope.interface import implements from twisted.web import server, resource from twisted.internet import defer from twisted.python.failure import Failure - +from twisted.internet.error import ConnectionDone, ConnectionLost from txjsonrpc import jsonrpclib -from lbrynet.core.Error import InvalidAuthenticationToken, InvalidHeaderError + from lbrynet import conf +from lbrynet.core.Error import InvalidAuthenticationToken, InvalidHeaderError +from lbrynet.core import utils from lbrynet.lbrynet_daemon.auth.util import APIKey, get_auth_message from lbrynet.lbrynet_daemon.auth.client import LBRY_SECRET @@ -33,6 +35,18 @@ class JSONRPCException(Exception): return self.err.getTraceback() +class UnknownAPIMethodError(Exception): + pass + + +class NotAllowedDuringStartupError(Exception): + pass + + +def trap(err, *to_trap): + err.trap(*to_trap) + + class AuthorizedBase(object): def __init__(self): self.authorized_functions = [] @@ -93,6 +107,19 @@ class AuthJSONRPCServer(AuthorizedBase): def setup(self): return NotImplementedError() + def _set_headers(self, request, data, update_secret=False): + if conf.settings.allowed_origin: + request.setHeader("Access-Control-Allow-Origin", conf.settings.allowed_origin) + request.setHeader("Content-Type", "text/json") + request.setHeader("Content-Length", str(len(data))) + if update_secret: + session_id = request.getSession().uid + request.setHeader(LBRY_SECRET, self.sessions.get(session_id).secret) + + def _render_message(self, request, message): + request.write(message) + request.finish() + def _render_error(self, failure, request, id_, version=jsonrpclib.VERSION_2, response_code=FAILURE): err = JSONRPCException(Failure(failure), response_code) @@ -100,14 +127,19 @@ class AuthJSONRPCServer(AuthorizedBase): self._set_headers(request, fault) if response_code != AuthJSONRPCServer.FAILURE: request.setResponseCode(response_code) - request.write(fault) - request.finish() + self._render_message(request, fault) + + def _handle_dropped_request(self, result, d, function_name): + if not d.called: + log.warning("Cancelling dropped api request %s", function_name) + d.cancel() def render(self, request): - notify_finish = request.notifyFinish() + time_in = utils.now() assert self._check_headers(request), InvalidHeaderError session = request.getSession() session_id = session.uid + finished_deferred = request.notifyFinish() if self._use_authentication: # if this is a new session, send a new secret and set the expiration @@ -157,9 +189,9 @@ class AuthJSONRPCServer(AuthorizedBase): try: function = self._get_jsonrpc_method(function_name) - except AttributeError as err: - log.warning("Unknown method: %s", function_name) - self._render_error(err, request, id_, version) + except (UnknownAPIMethodError, NotAllowedDuringStartupError) as err: + log.warning('Failed to get function %s: %s', function_name, err) + self._render_error(err, request, version) return server.NOT_DONE_YET if args == EMPTY_PARAMS: @@ -167,13 +199,22 @@ class AuthJSONRPCServer(AuthorizedBase): else: d = defer.maybeDeferred(function, *args) - # cancel the response if the connection is broken - notify_finish.addErrback(self._response_failed, d) + # finished_deferred will callback when the request is finished + # and errback if something went wrong. If the errback is + # called, cancel the deferred stack. This is to prevent + # request.finish() from being called on a closed request. + finished_deferred.addErrback(self._handle_dropped_request, d, function_name) + d.addCallback(self._callback_render, request, id_, version, reply_with_next_secret) - d.addErrback( - log.fail(self._render_error, request, id_, version=version), - 'Failed to process %s', function_name - ) + # TODO: don't trap RuntimeError, which is presently caught to + # handle deferredLists that won't peacefully cancel, namely + # get_lbry_files + d.addErrback(trap, ConnectionDone, ConnectionLost, defer.CancelledError, RuntimeError) + d.addErrback(log.fail(self._render_error, request, version=version), + 'Failed to process %s', function_name) + d.addBoth(lambda _: log.debug("%s took %f", + function_name, + (utils.now() - time_in).total_seconds())) return server.NOT_DONE_YET def _register_user_session(self, session_id): @@ -191,22 +232,6 @@ class AuthJSONRPCServer(AuthorizedBase): log.info("Unregister API session") del self.sessions[session_id] - def _response_failed(self, err, call): - log.debug(err.getTraceback()) - - def _set_headers(self, request, data, update_secret=False): - if conf.settings.allowed_origin: - request.setHeader("Access-Control-Allow-Origin", conf.settings.allowed_origin) - request.setHeader("Content-Type", "text/json") - request.setHeader("Content-Length", str(len(data))) - if update_secret: - session_id = request.getSession().uid - request.setHeader(LBRY_SECRET, self.sessions.get(session_id).secret) - - def _render_message(self, request, message): - request.write(message) - request.finish() - def _check_headers(self, request): return ( self._check_header_source(request, 'Origin') and @@ -239,19 +264,15 @@ class AuthJSONRPCServer(AuthorizedBase): else: return server_port[0], 80 - def _check_function_path(self, function_path): + def _verify_method_is_callable(self, function_path): if function_path not in self.callable_methods: - log.warning("Unknown method: %s", function_path) - return False + raise UnknownAPIMethodError(function_path) if not self.announced_startup: if function_path not in self.allowed_during_startup: - log.warning("Cannot call %s during startup", function_path) - return False - return True + raise NotAllowedDuringStartupError(function_path) def _get_jsonrpc_method(self, function_path): - if not self._check_function_path(function_path): - raise AttributeError(function_path) + self._verify_method_is_callable(function_path) return self.callable_methods.get(function_path) def _initialize_session(self, session_id):