use shared deferredSemaphore for daemon methods decorated with 'queued'

fixes race condition between publish and channel_new
This commit is contained in:
Jack Robison 2017-11-04 20:25:19 -04:00
parent 5c2d1ec3c2
commit 36782e0878
No known key found for this signature in database
GPG key ID: 284699E7404E3CFF

View file

@ -191,12 +191,12 @@ class AuthJSONRPCServer(AuthorizedBase):
allowed_during_startup = []
def __init__(self, use_authentication=None):
self._call_lock = {}
self._use_authentication = (
use_authentication if use_authentication is not None else conf.settings['use_auth_http']
)
self.announced_startup = False
self.sessions = {}
self._queued_lock = defer.DeferredSemaphore(1)
def setup(self):
return NotImplementedError()
@ -363,24 +363,7 @@ class AuthJSONRPCServer(AuthorizedBase):
return server.NOT_DONE_YET
if is_queued:
d_lock = self._call_lock.get(function_name, False)
if not d_lock:
d = defer.maybeDeferred(function, self, **args_dict)
self._call_lock[function_name] = finished_deferred
def _del_lock(*args):
if function_name in self._call_lock:
del self._call_lock[function_name]
if args:
return args
finished_deferred.addCallback(_del_lock)
else:
log.info("queued %s", function_name)
d = d_lock
d.addBoth(lambda _: log.info("running %s from queue", function_name))
d.addCallback(lambda _: defer.maybeDeferred(function, self, **args_dict))
d = self._queued_lock.run(function, self, **args_dict)
else:
d = defer.maybeDeferred(function, self, **args_dict)