Merge pull request #407 from lbryio/handle-closed-api-requests-rebase
handle dropped api requests
This commit is contained in:
commit
ddf24de16d
2 changed files with 60 additions and 52 deletions
|
@ -1486,20 +1486,7 @@ class Daemon(AuthJSONRPCServer):
|
|||
return self._render_response(None)
|
||||
|
||||
d = self._resolve_name(name, force_refresh=force)
|
||||
d.addCallbacks(
|
||||
lambda info: self._render_response(info),
|
||||
# TODO: Is server.failure a module? It looks like it:
|
||||
#
|
||||
# In [1]: import twisted.web.server
|
||||
# In [2]: twisted.web.server.failure
|
||||
# Out[2]: <module 'twisted.python.failure' from
|
||||
# '.../site-packages/twisted/python/failure.pyc'>
|
||||
#
|
||||
# If so, maybe we should return something else.
|
||||
errback=log.fail(lambda err: server.failure),
|
||||
errbackArgs=('Failed to resolve name',),
|
||||
errbackKeywords={'level': 'INFO'},
|
||||
)
|
||||
d.addCallback(self._render_response)
|
||||
return d
|
||||
|
||||
def jsonrpc_get_claim_info(self, p):
|
||||
|
|
|
@ -6,10 +6,12 @@ from zope.interface import implements
|
|||
from twisted.web import server, resource
|
||||
from twisted.internet import defer
|
||||
from twisted.python.failure import Failure
|
||||
|
||||
from twisted.internet.error import ConnectionDone, ConnectionLost
|
||||
from txjsonrpc import jsonrpclib
|
||||
from lbrynet.core.Error import InvalidAuthenticationToken, InvalidHeaderError
|
||||
|
||||
from lbrynet import conf
|
||||
from lbrynet.core.Error import InvalidAuthenticationToken, InvalidHeaderError
|
||||
from lbrynet.core import utils
|
||||
from lbrynet.lbrynet_daemon.auth.util import APIKey, get_auth_message
|
||||
from lbrynet.lbrynet_daemon.auth.client import LBRY_SECRET
|
||||
|
||||
|
@ -33,6 +35,18 @@ class JSONRPCException(Exception):
|
|||
return self.err.getTraceback()
|
||||
|
||||
|
||||
class UnknownAPIMethodError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class NotAllowedDuringStartupError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def trap(err, *to_trap):
|
||||
err.trap(*to_trap)
|
||||
|
||||
|
||||
class AuthorizedBase(object):
|
||||
def __init__(self):
|
||||
self.authorized_functions = []
|
||||
|
@ -93,6 +107,19 @@ class AuthJSONRPCServer(AuthorizedBase):
|
|||
def setup(self):
|
||||
return NotImplementedError()
|
||||
|
||||
def _set_headers(self, request, data, update_secret=False):
|
||||
if conf.settings.allowed_origin:
|
||||
request.setHeader("Access-Control-Allow-Origin", conf.settings.allowed_origin)
|
||||
request.setHeader("Content-Type", "text/json")
|
||||
request.setHeader("Content-Length", str(len(data)))
|
||||
if update_secret:
|
||||
session_id = request.getSession().uid
|
||||
request.setHeader(LBRY_SECRET, self.sessions.get(session_id).secret)
|
||||
|
||||
def _render_message(self, request, message):
|
||||
request.write(message)
|
||||
request.finish()
|
||||
|
||||
def _render_error(self, failure, request, id_,
|
||||
version=jsonrpclib.VERSION_2, response_code=FAILURE):
|
||||
err = JSONRPCException(Failure(failure), response_code)
|
||||
|
@ -100,14 +127,19 @@ class AuthJSONRPCServer(AuthorizedBase):
|
|||
self._set_headers(request, fault)
|
||||
if response_code != AuthJSONRPCServer.FAILURE:
|
||||
request.setResponseCode(response_code)
|
||||
request.write(fault)
|
||||
request.finish()
|
||||
self._render_message(request, fault)
|
||||
|
||||
def _handle_dropped_request(self, result, d, function_name):
|
||||
if not d.called:
|
||||
log.warning("Cancelling dropped api request %s", function_name)
|
||||
d.cancel()
|
||||
|
||||
def render(self, request):
|
||||
notify_finish = request.notifyFinish()
|
||||
time_in = utils.now()
|
||||
assert self._check_headers(request), InvalidHeaderError
|
||||
session = request.getSession()
|
||||
session_id = session.uid
|
||||
finished_deferred = request.notifyFinish()
|
||||
|
||||
if self._use_authentication:
|
||||
# if this is a new session, send a new secret and set the expiration
|
||||
|
@ -157,9 +189,9 @@ class AuthJSONRPCServer(AuthorizedBase):
|
|||
|
||||
try:
|
||||
function = self._get_jsonrpc_method(function_name)
|
||||
except AttributeError as err:
|
||||
log.warning("Unknown method: %s", function_name)
|
||||
self._render_error(err, request, id_, version)
|
||||
except (UnknownAPIMethodError, NotAllowedDuringStartupError) as err:
|
||||
log.warning('Failed to get function %s: %s', function_name, err)
|
||||
self._render_error(err, request, version)
|
||||
return server.NOT_DONE_YET
|
||||
|
||||
if args == EMPTY_PARAMS:
|
||||
|
@ -167,13 +199,22 @@ class AuthJSONRPCServer(AuthorizedBase):
|
|||
else:
|
||||
d = defer.maybeDeferred(function, *args)
|
||||
|
||||
# cancel the response if the connection is broken
|
||||
notify_finish.addErrback(self._response_failed, d)
|
||||
# finished_deferred will callback when the request is finished
|
||||
# and errback if something went wrong. If the errback is
|
||||
# called, cancel the deferred stack. This is to prevent
|
||||
# request.finish() from being called on a closed request.
|
||||
finished_deferred.addErrback(self._handle_dropped_request, d, function_name)
|
||||
|
||||
d.addCallback(self._callback_render, request, id_, version, reply_with_next_secret)
|
||||
d.addErrback(
|
||||
log.fail(self._render_error, request, id_, version=version),
|
||||
'Failed to process %s', function_name
|
||||
)
|
||||
# TODO: don't trap RuntimeError, which is presently caught to
|
||||
# handle deferredLists that won't peacefully cancel, namely
|
||||
# get_lbry_files
|
||||
d.addErrback(trap, ConnectionDone, ConnectionLost, defer.CancelledError, RuntimeError)
|
||||
d.addErrback(log.fail(self._render_error, request, version=version),
|
||||
'Failed to process %s', function_name)
|
||||
d.addBoth(lambda _: log.debug("%s took %f",
|
||||
function_name,
|
||||
(utils.now() - time_in).total_seconds()))
|
||||
return server.NOT_DONE_YET
|
||||
|
||||
def _register_user_session(self, session_id):
|
||||
|
@ -191,22 +232,6 @@ class AuthJSONRPCServer(AuthorizedBase):
|
|||
log.info("Unregister API session")
|
||||
del self.sessions[session_id]
|
||||
|
||||
def _response_failed(self, err, call):
|
||||
log.debug(err.getTraceback())
|
||||
|
||||
def _set_headers(self, request, data, update_secret=False):
|
||||
if conf.settings.allowed_origin:
|
||||
request.setHeader("Access-Control-Allow-Origin", conf.settings.allowed_origin)
|
||||
request.setHeader("Content-Type", "text/json")
|
||||
request.setHeader("Content-Length", str(len(data)))
|
||||
if update_secret:
|
||||
session_id = request.getSession().uid
|
||||
request.setHeader(LBRY_SECRET, self.sessions.get(session_id).secret)
|
||||
|
||||
def _render_message(self, request, message):
|
||||
request.write(message)
|
||||
request.finish()
|
||||
|
||||
def _check_headers(self, request):
|
||||
return (
|
||||
self._check_header_source(request, 'Origin') and
|
||||
|
@ -239,19 +264,15 @@ class AuthJSONRPCServer(AuthorizedBase):
|
|||
else:
|
||||
return server_port[0], 80
|
||||
|
||||
def _check_function_path(self, function_path):
|
||||
def _verify_method_is_callable(self, function_path):
|
||||
if function_path not in self.callable_methods:
|
||||
log.warning("Unknown method: %s", function_path)
|
||||
return False
|
||||
raise UnknownAPIMethodError(function_path)
|
||||
if not self.announced_startup:
|
||||
if function_path not in self.allowed_during_startup:
|
||||
log.warning("Cannot call %s during startup", function_path)
|
||||
return False
|
||||
return True
|
||||
raise NotAllowedDuringStartupError(function_path)
|
||||
|
||||
def _get_jsonrpc_method(self, function_path):
|
||||
if not self._check_function_path(function_path):
|
||||
raise AttributeError(function_path)
|
||||
self._verify_method_is_callable(function_path)
|
||||
return self.callable_methods.get(function_path)
|
||||
|
||||
def _initialize_session(self, session_id):
|
||||
|
|
Loading…
Reference in a new issue