forked from LBRYCommunity/lbry-sdk
Merge branch 'use-shared-queued-deferredsemaphore'
This commit is contained in:
commit
81a44fa824
2 changed files with 7 additions and 23 deletions
|
@ -17,6 +17,7 @@ at anytime.
|
||||||
* Fixed external IP detection via jsonip.com (avoid detecting IPv6)
|
* Fixed external IP detection via jsonip.com (avoid detecting IPv6)
|
||||||
* Fixed failing ConnectionManager unit test for parallel connections
|
* Fixed failing ConnectionManager unit test for parallel connections
|
||||||
*
|
*
|
||||||
|
* Fixed race condition between `publish` and `channel_new`
|
||||||
|
|
||||||
### Deprecated
|
### Deprecated
|
||||||
*
|
*
|
||||||
|
@ -24,7 +25,7 @@ at anytime.
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
* Moved BLOB_SIZE from conf.py to MAX_BLOB_SIZE in blob/blob_file.py
|
* Moved BLOB_SIZE from conf.py to MAX_BLOB_SIZE in blob/blob_file.py
|
||||||
*
|
* Use shared deferredSemaphore for api methods decorated with `@AuthJSONRPCServer.queued`
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
* Added `utxo_list` command to list unspent transaction outputs
|
* Added `utxo_list` command to list unspent transaction outputs
|
||||||
|
|
|
@ -191,12 +191,12 @@ class AuthJSONRPCServer(AuthorizedBase):
|
||||||
allowed_during_startup = []
|
allowed_during_startup = []
|
||||||
|
|
||||||
def __init__(self, use_authentication=None):
|
def __init__(self, use_authentication=None):
|
||||||
self._call_lock = {}
|
|
||||||
self._use_authentication = (
|
self._use_authentication = (
|
||||||
use_authentication if use_authentication is not None else conf.settings['use_auth_http']
|
use_authentication if use_authentication is not None else conf.settings['use_auth_http']
|
||||||
)
|
)
|
||||||
self.announced_startup = False
|
self.announced_startup = False
|
||||||
self.sessions = {}
|
self.sessions = {}
|
||||||
|
self._queued_lock = defer.DeferredSemaphore(1)
|
||||||
|
|
||||||
def setup(self):
|
def setup(self):
|
||||||
return NotImplementedError()
|
return NotImplementedError()
|
||||||
|
@ -319,7 +319,7 @@ class AuthJSONRPCServer(AuthorizedBase):
|
||||||
reply_with_next_secret = True
|
reply_with_next_secret = True
|
||||||
|
|
||||||
try:
|
try:
|
||||||
function = self._get_jsonrpc_method(function_name)
|
fn = self._get_jsonrpc_method(function_name)
|
||||||
except UnknownAPIMethodError as err:
|
except UnknownAPIMethodError as err:
|
||||||
log.warning('Failed to get function %s: %s', function_name, err)
|
log.warning('Failed to get function %s: %s', function_name, err)
|
||||||
self._render_error(
|
self._render_error(
|
||||||
|
@ -350,7 +350,7 @@ class AuthJSONRPCServer(AuthorizedBase):
|
||||||
# d = defer.maybeDeferred(function, *args) # if we want to support positional args too
|
# d = defer.maybeDeferred(function, *args) # if we want to support positional args too
|
||||||
raise ValueError('Args must be a dict')
|
raise ValueError('Args must be a dict')
|
||||||
|
|
||||||
params_error, erroneous_params = self._check_params(function, args_dict)
|
params_error, erroneous_params = self._check_params(fn, args_dict)
|
||||||
if params_error is not None:
|
if params_error is not None:
|
||||||
params_error_message = '{} for {} command: {}'.format(
|
params_error_message = '{} for {} command: {}'.format(
|
||||||
params_error, function_name, ', '.join(erroneous_params)
|
params_error, function_name, ', '.join(erroneous_params)
|
||||||
|
@ -363,26 +363,9 @@ class AuthJSONRPCServer(AuthorizedBase):
|
||||||
return server.NOT_DONE_YET
|
return server.NOT_DONE_YET
|
||||||
|
|
||||||
if is_queued:
|
if is_queued:
|
||||||
d_lock = self._call_lock.get(function_name, False)
|
d = self._queued_lock.run(fn, self, **args_dict)
|
||||||
if not d_lock:
|
|
||||||
d = defer.maybeDeferred(function, self, **args_dict)
|
|
||||||
self._call_lock[function_name] = finished_deferred
|
|
||||||
|
|
||||||
def _del_lock(*args):
|
|
||||||
if function_name in self._call_lock:
|
|
||||||
del self._call_lock[function_name]
|
|
||||||
if args:
|
|
||||||
return args
|
|
||||||
|
|
||||||
finished_deferred.addCallback(_del_lock)
|
|
||||||
|
|
||||||
else:
|
|
||||||
log.info("queued %s", function_name)
|
|
||||||
d = d_lock
|
|
||||||
d.addBoth(lambda _: log.info("running %s from queue", function_name))
|
|
||||||
d.addCallback(lambda _: defer.maybeDeferred(function, self, **args_dict))
|
|
||||||
else:
|
else:
|
||||||
d = defer.maybeDeferred(function, self, **args_dict)
|
d = defer.maybeDeferred(fn, self, **args_dict)
|
||||||
|
|
||||||
# finished_deferred will callback when the request is finished
|
# finished_deferred will callback when the request is finished
|
||||||
# and errback if something went wrong. If the errback is
|
# and errback if something went wrong. If the errback is
|
||||||
|
|
Loading…
Reference in a new issue