Refactor client, cli and test_cli, fix delayedCalls not expiring

This commit is contained in:
hackrush 2018-08-16 21:43:54 +05:30 committed by Jack Robison
parent 84c91c480f
commit 2b5e3204c0
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
4 changed files with 27 additions and 37 deletions

View file

@ -37,10 +37,7 @@ async def execute_command(method, params, conf_path=None):
# this actually executes the method # this actually executes the method
try: try:
resp = await api.call(method, params) resp = await api.call(method, params)
try:
await api.session.close() await api.session.close()
except AttributeError:
pass
print(json.dumps(resp["result"], indent=2)) print(json.dumps(resp["result"], indent=2))
except KeyError: except KeyError:
if resp["error"]["code"] == -32500: if resp["error"]["code"] == -32500:

View file

@ -22,9 +22,10 @@ class JSONRPCException(Exception):
class UnAuthAPIClient: class UnAuthAPIClient:
def __init__(self, host, port): def __init__(self, host, port, session):
self.host = host self.host = host
self.port = port self.port = port
self.session = session
self.scheme = SCHEME self.scheme = SCHEME
def __getattr__(self, method): def __getattr__(self, method):
@ -38,12 +39,12 @@ class UnAuthAPIClient:
url_fragment = urlparse(url) url_fragment = urlparse(url)
host = url_fragment.hostname host = url_fragment.hostname
port = url_fragment.port port = url_fragment.port
return cls(host, port) session = aiohttp.ClientSession()
return cls(host, port, session)
async def call(self, method, params=None): async def call(self, method, params=None):
message = {'method': method, 'params': params} message = {'method': method, 'params': params}
async with aiohttp.ClientSession() as session: async with self.session.get('{}://{}:{}'.format(self.scheme, self.host, self.port), json=message) as resp:
async with session.get('{}://{}:{}'.format(self.scheme, self.host, self.port), json=message) as resp:
return await resp.json() return await resp.json()

View file

@ -203,14 +203,16 @@ class AuthJSONRPCServer(AuthorizedBase):
self._component_setup_deferred = None self._component_setup_deferred = None
self.announced_startup = False self.announced_startup = False
self.sessions = {} self.sessions = {}
self.server = None
@defer.inlineCallbacks @defer.inlineCallbacks
def start_listening(self): def start_listening(self):
from twisted.internet import reactor, error as tx_error from twisted.internet import reactor, error as tx_error
try: try:
self.server = self.get_server_factory()
self.listening_port = reactor.listenTCP( self.listening_port = reactor.listenTCP(
conf.settings['api_port'], self.get_server_factory(), interface=conf.settings['api_host'] conf.settings['api_port'], self.server, interface=conf.settings['api_host']
) )
log.info("lbrynet API listening on TCP %s:%i", conf.settings['api_host'], conf.settings['api_port']) log.info("lbrynet API listening on TCP %s:%i", conf.settings['api_host'], conf.settings['api_port'])
yield self.setup() yield self.setup()
@ -254,6 +256,8 @@ class AuthJSONRPCServer(AuthorizedBase):
if self.listening_port: if self.listening_port:
self.listening_port.stopListening() self.listening_port.stopListening()
self.looping_call_manager.shutdown() self.looping_call_manager.shutdown()
for session in list(self.server.sessions.values()):
session.expire()
if self.analytics_manager: if self.analytics_manager:
self.analytics_manager.shutdown() self.analytics_manager.shutdown()
try: try:
@ -345,7 +349,6 @@ class AuthJSONRPCServer(AuthorizedBase):
def expire_session(): def expire_session():
self._unregister_user_session(session_id) self._unregister_user_session(session_id)
session.startCheckingExpiration()
session.notifyOnExpire(expire_session) session.notifyOnExpire(expire_session)
message = "OK" message = "OK"
request.setResponseCode(200) request.setResponseCode(200)

View file

@ -1,5 +1,5 @@
import contextlib import contextlib
import unittest from twisted.trial import unittest
from io import StringIO from io import StringIO
from twisted.internet import defer from twisted.internet import defer
@ -12,7 +12,9 @@ from lbrynet.daemon.Components import DATABASE_COMPONENT, BLOB_COMPONENT, HEADER
from lbrynet.daemon.Daemon import Daemon from lbrynet.daemon.Daemon import Daemon
class AuthCLIIntegrationTest(unittest.TestCase): class CLIIntegrationTest(unittest.TestCase):
USE_AUTH = False
@defer.inlineCallbacks @defer.inlineCallbacks
def setUp(self): def setUp(self):
skip = [ skip = [
@ -22,13 +24,21 @@ class AuthCLIIntegrationTest(unittest.TestCase):
RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT
] ]
conf.initialize_settings(load_conf_file=False) conf.initialize_settings(load_conf_file=False)
conf.settings['use_auth_http'] = True conf.settings['use_auth_http'] = self.USE_AUTH
conf.settings["components_to_skip"] = skip conf.settings["components_to_skip"] = skip
conf.settings.initialize_post_conf_load() conf.settings.initialize_post_conf_load()
Daemon.component_attributes = {} Daemon.component_attributes = {}
self.daemon = Daemon() self.daemon = Daemon()
yield self.daemon.start_listening() yield self.daemon.start_listening()
@defer.inlineCallbacks
def tearDown(self):
yield self.daemon._shutdown()
class AuthenticatedCLITest(CLIIntegrationTest):
USE_AUTH = True
def test_cli_status_command_with_auth(self): def test_cli_status_command_with_auth(self):
self.assertTrue(self.daemon._use_authentication) self.assertTrue(self.daemon._use_authentication)
actual_output = StringIO() actual_output = StringIO()
@ -37,26 +47,9 @@ class AuthCLIIntegrationTest(unittest.TestCase):
actual_output = actual_output.getvalue() actual_output = actual_output.getvalue()
self.assertIn("connection_status", actual_output) self.assertIn("connection_status", actual_output)
@defer.inlineCallbacks
def tearDown(self):
yield self.daemon._shutdown()
class UnauthenticatedCLITest(CLIIntegrationTest):
class UnAuthCLIIntegrationTest(unittest.TestCase): USE_AUTH = False
@defer.inlineCallbacks
def setUp(self):
skip = [
DATABASE_COMPONENT, BLOB_COMPONENT, HEADERS_COMPONENT, WALLET_COMPONENT,
DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, STREAM_IDENTIFIER_COMPONENT, FILE_MANAGER_COMPONENT,
PEER_PROTOCOL_SERVER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT,
RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT
]
conf.initialize_settings(load_conf_file=False)
conf.settings["components_to_skip"] = skip
conf.settings.initialize_post_conf_load()
Daemon.component_attributes = {}
self.daemon = Daemon()
yield self.daemon.start_listening()
def test_cli_status_command_with_auth(self): def test_cli_status_command_with_auth(self):
self.assertFalse(self.daemon._use_authentication) self.assertFalse(self.daemon._use_authentication)
@ -65,7 +58,3 @@ class UnAuthCLIIntegrationTest(unittest.TestCase):
cli.main(["status"]) cli.main(["status"])
actual_output = actual_output.getvalue() actual_output = actual_output.getvalue()
self.assertIn("connection_status", actual_output) self.assertIn("connection_status", actual_output)
@defer.inlineCallbacks
def tearDown(self):
yield self.daemon._shutdown()