decorator for queued api commands

This commit is contained in:
Jack Robison 2017-05-10 01:23:42 -04:00
parent a49039117f
commit 6931d8e586
2 changed files with 33 additions and 1 deletions

View file

@ -1681,6 +1681,7 @@ class Daemon(AuthJSONRPCServer):
defer.returnValue(cost)
@AuthJSONRPCServer.auth_required
@AuthJSONRPCServer.queued
@defer.inlineCallbacks
def jsonrpc_channel_new(self, channel_name, amount):
"""
@ -1735,6 +1736,7 @@ class Daemon(AuthJSONRPCServer):
defer.returnValue(response)
@AuthJSONRPCServer.auth_required
@AuthJSONRPCServer.queued
@defer.inlineCallbacks
def jsonrpc_publish(self, name, bid, metadata=None, file_path=None, fee=None, title=None,
description=None, author=None, language=None, license=None,

View file

@ -114,6 +114,8 @@ class AuthorizedBase(object):
def __init__(self):
self.authorized_functions = []
self.callable_methods = {}
self._call_lock = {}
self._queued_methods = []
for methodname in dir(self):
if methodname.startswith("jsonrpc_"):
@ -121,12 +123,19 @@ class AuthorizedBase(object):
self.callable_methods.update({methodname.split("jsonrpc_")[1]: method})
if hasattr(method, '_auth_required'):
self.authorized_functions.append(methodname.split("jsonrpc_")[1])
if hasattr(method, '_queued'):
self._queued_methods.append(methodname.split("jsonrpc_")[1])
@staticmethod
def auth_required(f):
f._auth_required = True
return f
@staticmethod
def queued(f):
f._queued = True
return f
class AuthJSONRPCServer(AuthorizedBase):
"""Authorized JSONRPC server used as the base class for the LBRY API
@ -254,6 +263,7 @@ class AuthJSONRPCServer(AuthorizedBase):
id_ = None
try:
function_name = parsed.get('method')
is_queued = function_name in self._queued_methods
args = parsed.get('params', {})
id_ = parsed.get('id', None)
token = parsed.pop('hmac', None)
@ -324,7 +334,27 @@ class AuthJSONRPCServer(AuthorizedBase):
)
return server.NOT_DONE_YET
d = defer.maybeDeferred(function, **args_dict)
if is_queued:
d_lock = self._call_lock.get(function_name, False)
if not d_lock:
d = defer.maybeDeferred(function, **args_dict)
self._call_lock[function_name] = finished_deferred
def _del_lock(*args):
if function_name in self._call_lock:
del self._call_lock[function_name]
if args:
return args
finished_deferred.addCallback(_del_lock)
else:
log.info("queued %s", function_name)
d = d_lock
d.addBoth(lambda _: log.info("running %s from queue", function_name))
d.addCallback(lambda _: defer.maybeDeferred(function, **args_dict))
else:
d = defer.maybeDeferred(function, **args_dict)
# finished_deferred will callback when the request is finished
# and errback if something went wrong. If the errback is