Added Authenticated API Client Integration Test

This commit is contained in:
hackrush 2018-08-10 16:06:05 +05:30 committed by Jack Robison
parent 083bf4f61d
commit 6a8963d807
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
2 changed files with 40 additions and 3 deletions

View file

@ -204,6 +204,7 @@ class AuthJSONRPCServer(AuthorizedBase):
self.looping_call_manager = LoopingCallManager({n: lc for n, (lc, t) in (looping_calls or {}).items()})
self._looping_call_times = {n: t for n, (lc, t) in (looping_calls or {}).items()}
self._use_authentication = use_authentication or conf.settings['use_auth_http']
self.listening_port = None
self._component_setup_deferred = None
self.announced_startup = False
self.sessions = {}
@ -213,7 +214,7 @@ class AuthJSONRPCServer(AuthorizedBase):
from twisted.internet import reactor, error as tx_error
try:
reactor.listenTCP(
self.listening_port = reactor.listenTCP(
conf.settings['api_port'], self.get_server_factory(), interface=conf.settings['api_host']
)
log.info("lbrynet API listening on TCP %s:%i", conf.settings['api_host'], conf.settings['api_port'])
@ -255,6 +256,7 @@ class AuthJSONRPCServer(AuthorizedBase):
# ignore INT/TERM signals once shutdown has started
signal.signal(signal.SIGINT, self._already_shutting_down)
signal.signal(signal.SIGTERM, self._already_shutting_down)
self.listening_port.stopListening()
self.looping_call_manager.shutdown()
if self.analytics_manager:
self.analytics_manager.shutdown()

View file

@ -1,6 +1,41 @@
import contextlib
import unittest
from io import StringIO
from twisted.internet import defer
from lbrynet import conf
from lbrynet import cli
from lbrynet.daemon.Components import DATABASE_COMPONENT, BLOB_COMPONENT, HEADERS_COMPONENT, WALLET_COMPONENT, \
DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, STREAM_IDENTIFIER_COMPONENT, FILE_MANAGER_COMPONENT, \
PEER_PROTOCOL_SERVER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, \
RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT
from lbrynet.daemon.Daemon import Daemon
class CLIIntegrationTest:
pass
class AuthCLIIntegrationTest(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
skip = [
DATABASE_COMPONENT, BLOB_COMPONENT, HEADERS_COMPONENT, WALLET_COMPONENT,
DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, STREAM_IDENTIFIER_COMPONENT, FILE_MANAGER_COMPONENT,
PEER_PROTOCOL_SERVER_COMPONENT, REFLECTOR_COMPONENT, UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT,
RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT
]
conf.initialize_settings(load_conf_file=False)
conf.settings['use_auth_http'] = True
conf.settings["components_to_skip"] = skip
conf.settings.initialize_post_conf_load()
Daemon.component_attributes = {}
self.daemon = Daemon()
yield self.daemon.start_listening()
def test_cli_status_command_with_auth(self):
actual_output = StringIO()
with contextlib.redirect_stdout(actual_output):
cli.main(["status"])
actual_output = actual_output.getvalue()
self.assertIn("connection_status", actual_output)
@defer.inlineCallbacks
def tearDown(self):
yield self.daemon._shutdown()