From 36782e087820b61c70c701490a6eb9392dcf5dba Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sat, 4 Nov 2017 20:25:19 -0400 Subject: [PATCH 1/3] use shared deferredSemaphore for daemon methods decorated with 'queued' fixes race condition between publish and channel_new --- lbrynet/daemon/auth/server.py | 21 ++------------------- 1 file changed, 2 insertions(+), 19 deletions(-) diff --git a/lbrynet/daemon/auth/server.py b/lbrynet/daemon/auth/server.py index c3171f0fe..c396d13a0 100644 --- a/lbrynet/daemon/auth/server.py +++ b/lbrynet/daemon/auth/server.py @@ -191,12 +191,12 @@ class AuthJSONRPCServer(AuthorizedBase): allowed_during_startup = [] def __init__(self, use_authentication=None): - self._call_lock = {} self._use_authentication = ( use_authentication if use_authentication is not None else conf.settings['use_auth_http'] ) self.announced_startup = False self.sessions = {} + self._queued_lock = defer.DeferredSemaphore(1) def setup(self): return NotImplementedError() @@ -363,24 +363,7 @@ class AuthJSONRPCServer(AuthorizedBase): return server.NOT_DONE_YET if is_queued: - d_lock = self._call_lock.get(function_name, False) - if not d_lock: - d = defer.maybeDeferred(function, self, **args_dict) - self._call_lock[function_name] = finished_deferred - - def _del_lock(*args): - if function_name in self._call_lock: - del self._call_lock[function_name] - if args: - return args - - finished_deferred.addCallback(_del_lock) - - else: - log.info("queued %s", function_name) - d = d_lock - d.addBoth(lambda _: log.info("running %s from queue", function_name)) - d.addCallback(lambda _: defer.maybeDeferred(function, self, **args_dict)) + d = self._queued_lock.run(function, self, **args_dict) else: d = defer.maybeDeferred(function, self, **args_dict) From a79a00180de64f1172dfa57f6d3e4fb795f14ab4 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sat, 4 Nov 2017 20:26:11 -0400 Subject: [PATCH 2/3] rename variable --- lbrynet/daemon/auth/server.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lbrynet/daemon/auth/server.py b/lbrynet/daemon/auth/server.py index c396d13a0..f2f936992 100644 --- a/lbrynet/daemon/auth/server.py +++ b/lbrynet/daemon/auth/server.py @@ -319,7 +319,7 @@ class AuthJSONRPCServer(AuthorizedBase): reply_with_next_secret = True try: - function = self._get_jsonrpc_method(function_name) + fn = self._get_jsonrpc_method(function_name) except UnknownAPIMethodError as err: log.warning('Failed to get function %s: %s', function_name, err) self._render_error( @@ -350,7 +350,7 @@ class AuthJSONRPCServer(AuthorizedBase): # d = defer.maybeDeferred(function, *args) # if we want to support positional args too raise ValueError('Args must be a dict') - params_error, erroneous_params = self._check_params(function, args_dict) + params_error, erroneous_params = self._check_params(fn, args_dict) if params_error is not None: params_error_message = '{} for {} command: {}'.format( params_error, function_name, ', '.join(erroneous_params) @@ -363,9 +363,9 @@ class AuthJSONRPCServer(AuthorizedBase): return server.NOT_DONE_YET if is_queued: - d = self._queued_lock.run(function, self, **args_dict) + d = self._queued_lock.run(fn, self, **args_dict) else: - d = defer.maybeDeferred(function, self, **args_dict) + d = defer.maybeDeferred(fn, self, **args_dict) # finished_deferred will callback when the request is finished # and errback if something went wrong. If the errback is From 43ba33fd80ea7a24e80991b2534f28c6957febe6 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Sat, 4 Nov 2017 20:28:10 -0400 Subject: [PATCH 3/3] changelog --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa8c2cb74..81de6ca14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,7 @@ at anytime. ### Fixed * Fixed amount of close nodes to add to list in case of extension to neighbouring k-buckets * Fixed external IP detection via jsonip.com (avoid detecting IPv6) - * + * Fixed race condition between `publish` and `channel_new` ### Deprecated * @@ -23,7 +23,7 @@ at anytime. ### Changed * Moved BLOB_SIZE from conf.py to MAX_BLOB_SIZE in blob/blob_file.py - * + * Use shared deferredSemaphore for api methods decorated with `@AuthJSONRPCServer.queued` ### Added * Added `utxo_list` command to list unspent transaction outputs