Merge branch 'ssl'

This commit is contained in:
Jack Robison 2018-09-26 11:19:48 -04:00
commit 3d36e36173
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
10 changed files with 242 additions and 164 deletions

View file

@ -278,6 +278,7 @@ ADJUSTABLE_SETTINGS = {
'share_usage_data': (bool, True), # whether to share usage stats and diagnostic info with LBRY 'share_usage_data': (bool, True), # whether to share usage stats and diagnostic info with LBRY
'peer_search_timeout': (int, 60), 'peer_search_timeout': (int, 60),
'use_auth_http': (bool, False), 'use_auth_http': (bool, False),
'use_https': (bool, False),
'use_upnp': (bool, True), 'use_upnp': (bool, True),
'use_keyring': (bool, False), 'use_keyring': (bool, False),
'wallet': (str, LBRYUM_WALLET), 'wallet': (str, LBRYUM_WALLET),
@ -573,11 +574,14 @@ class Config:
""" """
return os.path.join(self.ensure_data_dir(), self['LOG_FILE_NAME']) return os.path.join(self.ensure_data_dir(), self['LOG_FILE_NAME'])
def get_api_connection_string(self): def get_api_connection_string(self, user: str = None, password: str = None) -> str:
return 'http://%s:%i/%s' % (self['api_host'], self['api_port'], self['API_ADDRESS']) return 'http%s://%s%s:%i/%s' % (
"" if not self['use_https'] else "s",
def get_ui_address(self): "" if not (user and password) else "%s:%s@" % (user, password),
return 'http://%s:%i' % (self['api_host'], self['api_port']) self['api_host'],
self['api_port'],
self['API_ADDRESS']
)
def get_db_revision_filename(self): def get_db_revision_filename(self):
return os.path.join(self.ensure_data_dir(), self['DB_REVISION_FILE_NAME']) return os.path.join(self.ensure_data_dir(), self['DB_REVISION_FILE_NAME'])

View file

@ -215,8 +215,8 @@ class Daemon(AuthJSONRPCServer):
Checker.INTERNET_CONNECTION[1]) Checker.INTERNET_CONNECTION[1])
} }
AuthJSONRPCServer.__init__(self, analytics_manager=analytics_manager, component_manager=component_manager, AuthJSONRPCServer.__init__(self, analytics_manager=analytics_manager, component_manager=component_manager,
use_authentication=conf.settings['use_auth_http'], to_skip=to_skip, use_authentication=conf.settings['use_auth_http'],
looping_calls=looping_calls) use_https=conf.settings['use_https'], to_skip=to_skip, looping_calls=looping_calls)
self.is_first_run = is_first_run() self.is_first_run = is_first_run()
# TODO: move this to a component # TODO: move this to a component

View file

@ -3,7 +3,7 @@ from zope.interface import implementer
from twisted.cred import portal, checkers, credentials, error as cred_error from twisted.cred import portal, checkers, credentials, error as cred_error
from twisted.internet import defer from twisted.internet import defer
from twisted.web import resource from twisted.web import resource
from lbrynet.daemon.auth.util import load_api_keys from .keyring import Keyring
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -24,29 +24,11 @@ class HttpPasswordRealm:
class PasswordChecker: class PasswordChecker:
credentialInterfaces = (credentials.IUsernamePassword,) credentialInterfaces = (credentials.IUsernamePassword,)
def __init__(self, passwords): def __init__(self, keyring: Keyring):
self.passwords = passwords self.api_key = keyring.api_key
@classmethod
def load_file(cls, key_path):
keys = load_api_keys(key_path)
return cls.load(keys)
@classmethod
def load(cls, password_dict):
passwords = {key: password_dict[key].secret for key in password_dict}
log.info("Loaded %i api key(s)", len(passwords))
return cls(passwords)
def requestAvatarId(self, creds): def requestAvatarId(self, creds):
password_dict_bytes = {} if creds.checkPassword(self.api_key.secret.encode()) and creds.username == self.api_key.name.encode():
for api in self.passwords:
password_dict_bytes.update({api.encode(): self.passwords[api].encode()})
if creds.username in password_dict_bytes:
pw = password_dict_bytes.get(creds.username)
pw_match = creds.checkPassword(pw)
if pw_match:
return defer.succeed(creds.username) return defer.succeed(creds.username)
log.warning('Incorrect username or password') log.warning('Incorrect username or password')
return defer.fail(cred_error.UnauthorizedLogin('Incorrect username or password')) return defer.fail(cred_error.UnauthorizedLogin('Incorrect username or password'))

View file

@ -1,18 +1,17 @@
import os
import json import json
import aiohttp import aiohttp
import logging import logging
from urllib.parse import urlparse from urllib.parse import urlparse
from lbrynet import conf from lbrynet import conf
from lbrynet.daemon.auth.util import load_api_keys, APIKey, API_KEY_NAME, get_auth_message from lbrynet.daemon.auth.keyring import Keyring, APIKey
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
USER_AGENT = "AuthServiceProxy/0.1" USER_AGENT = "AuthServiceProxy/0.1"
TWISTED_SECURE_SESSION = "TWISTED_SECURE_SESSION"
TWISTED_SESSION = "TWISTED_SESSION" TWISTED_SESSION = "TWISTED_SESSION"
LBRY_SECRET = "LBRY_SECRET" LBRY_SECRET = "LBRY_SECRET"
HTTP_TIMEOUT = 30 HTTP_TIMEOUT = 30
SCHEME = "http"
class JSONRPCException(Exception): class JSONRPCException(Exception):
@ -26,7 +25,6 @@ class UnAuthAPIClient:
self.host = host self.host = host
self.port = port self.port = port
self.session = session self.session = session
self.scheme = SCHEME
def __getattr__(self, method): def __getattr__(self, method):
async def f(*args, **kwargs): async def f(*args, **kwargs):
@ -39,12 +37,15 @@ class UnAuthAPIClient:
url_fragment = urlparse(url) url_fragment = urlparse(url)
host = url_fragment.hostname host = url_fragment.hostname
port = url_fragment.port port = url_fragment.port
session = aiohttp.ClientSession() connector = aiohttp.TCPConnector(
ssl=None if not conf.settings['use_https'] else Keyring.load_from_disk().ssl_context
)
session = aiohttp.ClientSession(connector=connector)
return cls(host, port, session) return cls(host, port, session)
async def call(self, method, params=None): async def call(self, method, params=None):
message = {'method': method, 'params': params} message = {'method': method, 'params': params}
async with self.session.get('{}://{}:{}'.format(self.scheme, self.host, self.port), json=message) as resp: async with self.session.get(conf.settings.get_api_connection_string(), json=message) as resp:
return await resp.json() return await resp.json()
@ -76,7 +77,7 @@ class AuthAPIClient:
'params': params, 'params': params,
'id': self.__id_count 'id': self.__id_count
} }
to_auth = get_auth_message(pre_auth_post_data) to_auth = json.dumps(pre_auth_post_data, sort_keys=True)
auth_msg = self.__api_key.get_hmac(to_auth).decode() auth_msg = self.__api_key.get_hmac(to_auth).decode()
pre_auth_post_data.update({'hmac': auth_msg}) pre_auth_post_data.update({'hmac': auth_msg})
post_data = json.dumps(pre_auth_post_data) post_data = json.dumps(pre_auth_post_data)
@ -100,14 +101,11 @@ class AuthAPIClient:
@classmethod @classmethod
async def get_client(cls, key_name=None): async def get_client(cls, key_name=None):
api_key_name = key_name or API_KEY_NAME api_key_name = key_name or "api"
keyring = Keyring.load_from_disk()
pw_path = os.path.join(conf.settings['data_dir'], ".api_keys") api_key = keyring.api_key
keys = load_api_keys(pw_path) login_url = conf.settings.get_api_connection_string(api_key_name, api_key.secret)
api_key = keys.get(api_key_name, False)
login_url = "http://{}:{}@{}:{}".format(api_key_name, api_key.secret, conf.settings['api_host'],
conf.settings['api_port'])
url = urlparse(login_url) url = urlparse(login_url)
headers = { headers = {
@ -115,14 +113,13 @@ class AuthAPIClient:
'User-Agent': USER_AGENT, 'User-Agent': USER_AGENT,
'Content-type': 'application/json' 'Content-type': 'application/json'
} }
connector = aiohttp.TCPConnector(ssl=None if not conf.settings['use_https'] else keyring.ssl_context)
session = aiohttp.ClientSession() session = aiohttp.ClientSession(connector=connector)
async with session.post(login_url, headers=headers) as r: async with session.post(login_url, headers=headers) as r:
cookies = r.cookies cookies = r.cookies
uid = cookies.get(TWISTED_SECURE_SESSION if conf.settings['use_https'] else TWISTED_SESSION).value
uid = cookies.get(TWISTED_SESSION).value api_key = APIKey.create(seed=uid.encode())
api_key = APIKey.new(seed=uid.encode())
return cls(api_key, session, cookies, url, login_url) return cls(api_key, session, cookies, url, login_url)

View file

@ -1,16 +1,28 @@
import logging import logging
import os
from twisted.web import server, guard, resource from twisted.web import server, guard, resource
from twisted.cred import portal from twisted.cred import portal
from lbrynet import conf from lbrynet import conf
from .auth import PasswordChecker, HttpPasswordRealm from .auth import PasswordChecker, HttpPasswordRealm
from .util import initialize_api_key_file from ..auth.keyring import Keyring
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class HTTPJSONRPCFactory(server.Site):
def __init__(self, resource, keyring, requestFactory=None, *args, **kwargs):
super().__init__(resource, requestFactory=requestFactory, *args, **kwargs)
self.use_ssl = False
class HTTPSJSONRPCFactory(server.Site):
def __init__(self, resource, keyring, requestFactory=None, *args, **kwargs):
super().__init__(resource, requestFactory=requestFactory, *args, **kwargs)
self.options = keyring.private_certificate.options()
self.use_ssl = True
class AuthJSONRPCResource(resource.Resource): class AuthJSONRPCResource(resource.Resource):
def __init__(self, protocol): def __init__(self, protocol):
resource.Resource.__init__(self) resource.Resource.__init__(self)
@ -22,17 +34,17 @@ class AuthJSONRPCResource(resource.Resource):
request.setHeader('expires', '0') request.setHeader('expires', '0')
return self if name == '' else resource.Resource.getChild(self, name, request) return self if name == '' else resource.Resource.getChild(self, name, request)
def getServerFactory(self): def getServerFactory(self, keyring: Keyring, use_authentication: bool, use_https: bool) -> server.Site:
if conf.settings['use_auth_http']: factory_class = HTTPSJSONRPCFactory if use_https else HTTPJSONRPCFactory
if use_authentication:
log.info("Using authenticated API") log.info("Using authenticated API")
pw_path = os.path.join(conf.settings['data_dir'], ".api_keys") checker = PasswordChecker(keyring)
initialize_api_key_file(pw_path)
checker = PasswordChecker.load_file(pw_path)
realm = HttpPasswordRealm(self) realm = HttpPasswordRealm(self)
portal_to_realm = portal.Portal(realm, [checker, ]) portal_to_realm = portal.Portal(realm, [checker, ])
factory = guard.BasicCredentialFactory('Login to lbrynet api') root = guard.HTTPAuthSessionWrapper(
root = guard.HTTPAuthSessionWrapper(portal_to_realm, [factory, ]) portal_to_realm, [guard.BasicCredentialFactory('Login to lbrynet api'), ]
)
else: else:
log.info("Using non-authenticated API") log.info("Using non-authenticated API")
root = self root = self
return server.Site(root) return factory_class(root, keyring)

View file

@ -0,0 +1,130 @@
import os
import datetime
import hmac
import hashlib
import base58
from OpenSSL.crypto import FILETYPE_PEM
from ssl import create_default_context, SSLContext
from cryptography.hazmat.backends import default_backend
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.x509.name import NameOID, NameAttribute
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from twisted.internet import ssl
import logging
from lbrynet import conf
log = logging.getLogger(__name__)
def sha(x: bytes) -> str:
h = hashlib.sha256(x).digest()
return base58.b58encode(h).decode()
def generate_key(x: bytes = None) -> str:
if not x:
return sha(os.urandom(256))
else:
return sha(x)
class APIKey:
def __init__(self, secret: str, name: str):
self.secret = secret
self.name = name
@classmethod
def create(cls, seed=None, name=None):
secret = generate_key(seed)
return APIKey(secret, name)
def _raw_key(self) -> str:
return base58.b58decode(self.secret)
def get_hmac(self, message) -> str:
decoded_key = self._raw_key()
signature = hmac.new(decoded_key, message.encode(), hashlib.sha256)
return base58.b58encode(signature.digest())
def compare_hmac(self, message, token) -> bool:
decoded_token = base58.b58decode(token)
target = base58.b58decode(self.get_hmac(message))
try:
if len(decoded_token) != len(target):
return False
return hmac.compare_digest(decoded_token, target)
except:
return False
class Keyring:
encoding = serialization.Encoding.PEM
filetype = FILETYPE_PEM
def __init__(self, api_key: APIKey, public_certificate: str, private_certificate: ssl.PrivateCertificate = None):
self.api_key: APIKey = api_key
self.public_certificate: str = public_certificate
self.private_certificate: (ssl.PrivateCertificate or None) = private_certificate
self.ssl_context: SSLContext = create_default_context(cadata=self.public_certificate)
@classmethod
def load_from_disk(cls):
api_key_path = os.path.join(conf.settings['data_dir'], 'auth_token')
api_ssl_cert_path = os.path.join(conf.settings['data_dir'], 'api_ssl_cert.pem')
if not os.path.isfile(api_key_path) or not os.path.isfile(api_ssl_cert_path):
return
with open(api_key_path, 'rb') as f:
api_key = APIKey(f.read().decode(), "api")
with open(api_ssl_cert_path, 'rb') as f:
public_cert = f.read().decode()
return cls(api_key, public_cert)
@classmethod
def generate_and_save(cls):
dns = conf.settings['api_host']
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=4096,
backend=default_backend()
)
subject = issuer = x509.Name([
NameAttribute(NameOID.COUNTRY_NAME, "US"),
NameAttribute(NameOID.ORGANIZATION_NAME, "LBRY"),
NameAttribute(NameOID.COMMON_NAME, "LBRY API"),
])
alternative_name = x509.SubjectAlternativeName([x509.DNSName(dns)])
certificate = x509.CertificateBuilder(
subject_name=subject,
issuer_name=issuer,
public_key=private_key.public_key(),
serial_number=x509.random_serial_number(),
not_valid_before=datetime.datetime.utcnow(),
not_valid_after=datetime.datetime.utcnow() + datetime.timedelta(days=365),
extensions=[x509.Extension(oid=alternative_name.oid, critical=False, value=alternative_name)]
).sign(private_key, hashes.SHA256(), default_backend())
public_certificate = certificate.public_bytes(cls.encoding).decode()
private_certificate = ssl.PrivateCertificate.load(
public_certificate,
ssl.KeyPair.load(
private_key.private_bytes(
encoding=cls.encoding,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
).decode(),
cls.filetype
),
cls.filetype
)
auth_token = APIKey.create(seed=None, name="api")
with open(os.path.join(conf.settings['data_dir'], 'auth_token'), 'wb') as f:
f.write(auth_token.secret.encode())
with open(os.path.join(conf.settings['data_dir'], 'api_ssl_cert.pem'), 'wb') as f:
f.write(public_certificate.encode())
return cls(auth_token, public_certificate, private_certificate)

View file

@ -18,13 +18,14 @@ from lbrynet.core import utils
from lbrynet.core.Error import ComponentsNotStarted, ComponentStartConditionNotMet from lbrynet.core.Error import ComponentsNotStarted, ComponentStartConditionNotMet
from lbrynet.core.looping_call_manager import LoopingCallManager from lbrynet.core.looping_call_manager import LoopingCallManager
from lbrynet.daemon.ComponentManager import ComponentManager from lbrynet.daemon.ComponentManager import ComponentManager
from .util import APIKey, get_auth_message, LBRY_SECRET from .keyring import APIKey, Keyring
from .undecorated import undecorated from .undecorated import undecorated
from .factory import AuthJSONRPCResource from .factory import AuthJSONRPCResource
from lbrynet.daemon.json_response_encoder import JSONResponseEncoder from lbrynet.daemon.json_response_encoder import JSONResponseEncoder
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
EMPTY_PARAMS = [{}] EMPTY_PARAMS = [{}]
LBRY_SECRET = "LBRY_SECRET"
class JSONRPCError: class JSONRPCError:
@ -186,8 +187,8 @@ class AuthJSONRPCServer(AuthorizedBase):
allowed_during_startup = [] allowed_during_startup = []
component_attributes = {} component_attributes = {}
def __init__(self, analytics_manager=None, component_manager=None, use_authentication=None, to_skip=None, def __init__(self, analytics_manager=None, component_manager=None, use_authentication=None, use_https=None,
looping_calls=None, reactor=None): to_skip=None, looping_calls=None, reactor=None):
if not reactor: if not reactor:
from twisted.internet import reactor from twisted.internet import reactor
self.analytics_manager = analytics_manager or analytics.Manager.new_instance() self.analytics_manager = analytics_manager or analytics.Manager.new_instance()
@ -199,11 +200,13 @@ class AuthJSONRPCServer(AuthorizedBase):
self.looping_call_manager = LoopingCallManager({n: lc for n, (lc, t) in (looping_calls or {}).items()}) self.looping_call_manager = LoopingCallManager({n: lc for n, (lc, t) in (looping_calls or {}).items()})
self._looping_call_times = {n: t for n, (lc, t) in (looping_calls or {}).items()} self._looping_call_times = {n: t for n, (lc, t) in (looping_calls or {}).items()}
self._use_authentication = use_authentication or conf.settings['use_auth_http'] self._use_authentication = use_authentication or conf.settings['use_auth_http']
self._use_https = use_https or conf.settings['use_https']
self.listening_port = None self.listening_port = None
self._component_setup_deferred = None self._component_setup_deferred = None
self.announced_startup = False self.announced_startup = False
self.sessions = {} self.sessions = {}
self.server = None self.server = None
self.keyring = Keyring.generate_and_save()
@defer.inlineCallbacks @defer.inlineCallbacks
def start_listening(self): def start_listening(self):
@ -211,6 +214,13 @@ class AuthJSONRPCServer(AuthorizedBase):
try: try:
self.server = self.get_server_factory() self.server = self.get_server_factory()
if self.server.use_ssl:
log.info("Using SSL")
self.listening_port = reactor.listenSSL(
conf.settings['api_port'], self.server, self.server.options, interface=conf.settings['api_host']
)
else:
log.info("Not using SSL")
self.listening_port = reactor.listenTCP( self.listening_port = reactor.listenTCP(
conf.settings['api_port'], self.server, interface=conf.settings['api_host'] conf.settings['api_port'], self.server, interface=conf.settings['api_host']
) )
@ -274,7 +284,7 @@ class AuthJSONRPCServer(AuthorizedBase):
return d return d
def get_server_factory(self): def get_server_factory(self):
return AuthJSONRPCResource(self).getServerFactory() return AuthJSONRPCResource(self).getServerFactory(self.keyring, self._use_authentication, self._use_https)
def _set_headers(self, request, data, update_secret=False): def _set_headers(self, request, data, update_secret=False):
if conf.settings['allowed_origin']: if conf.settings['allowed_origin']:
@ -460,7 +470,7 @@ class AuthJSONRPCServer(AuthorizedBase):
@return: secret @return: secret
""" """
log.info("Started new api session") log.info("Started new api session")
token = APIKey.new(seed=session_id) token = APIKey.create(seed=session_id)
self.sessions.update({session_id: token}) self.sessions.update({session_id: token})
def _unregister_user_session(self, session_id): def _unregister_user_session(self, session_id):
@ -565,13 +575,13 @@ class AuthJSONRPCServer(AuthorizedBase):
def _verify_token(self, session_id, message, token): def _verify_token(self, session_id, message, token):
if token is None: if token is None:
raise InvalidAuthenticationToken('Authentication token not found') raise InvalidAuthenticationToken('Authentication token not found')
to_auth = get_auth_message(message) to_auth = json.dumps(message, sort_keys=True)
api_key = self.sessions.get(session_id) api_key = self.sessions.get(session_id)
if not api_key.compare_hmac(to_auth, token): if not api_key.compare_hmac(to_auth, token):
raise InvalidAuthenticationToken('Invalid authentication token') raise InvalidAuthenticationToken('Invalid authentication token')
def _update_session_secret(self, session_id): def _update_session_secret(self, session_id):
self.sessions.update({session_id: APIKey.new(name=session_id)}) self.sessions.update({session_id: APIKey.create(name=session_id)})
def _callback_render(self, result, request, id_, auth_required=False): def _callback_render(self, result, request, id_, auth_required=False):
try: try:

View file

@ -1,92 +0,0 @@
import base58
import hmac
import hashlib
import yaml
import os
import json
import logging
log = logging.getLogger(__name__)
API_KEY_NAME = "api"
LBRY_SECRET = "LBRY_SECRET"
def sha(x: bytes) -> bytes:
h = hashlib.sha256(x).digest()
return base58.b58encode(h)
def generate_key(x: bytes = None) -> bytes:
if x is None:
return sha(os.urandom(256))
else:
return sha(x)
class APIKey:
def __init__(self, secret, name, expiration=None):
self.secret = secret
self.name = name
self.expiration = expiration
@classmethod
def new(cls, seed=None, name=None, expiration=None):
secret = generate_key(seed)
key_name = name if name else sha(secret)
return APIKey(secret, key_name, expiration)
def _raw_key(self):
return base58.b58decode(self.secret)
def get_hmac(self, message):
decoded_key = self._raw_key()
signature = hmac.new(decoded_key, message.encode(), hashlib.sha256)
return base58.b58encode(signature.digest())
def compare_hmac(self, message, token):
decoded_token = base58.b58decode(token)
target = base58.b58decode(self.get_hmac(message))
try:
if len(decoded_token) != len(target):
return False
return hmac.compare_digest(decoded_token, target)
except:
return False
def load_api_keys(path):
if not os.path.isfile(path):
raise Exception("Invalid api key path")
with open(path, "r") as f:
data = yaml.load(f.read())
keys_for_return = {}
for key_name in data:
key = data[key_name]
secret = key['secret'].decode()
expiration = key['expiration']
keys_for_return.update({key_name: APIKey(secret, key_name, expiration)})
return keys_for_return
def save_api_keys(keys, path):
with open(path, "w") as f:
key_dict = {keys[key_name].name: {'secret': keys[key_name].secret,
'expiration': keys[key_name].expiration}
for key_name in keys}
data = yaml.safe_dump(key_dict)
f.write(data)
def initialize_api_key_file(key_path):
keys = {}
new_api_key = APIKey.new(name=API_KEY_NAME)
keys.update({new_api_key.name: new_api_key})
save_api_keys(keys, key_path)
def get_auth_message(message_dict):
return json.dumps(message_dict, sort_keys=True)

View file

@ -1,5 +1,3 @@
from unittest import skip
from lbrynet.core.client.ClientRequest import ClientRequest from lbrynet.core.client.ClientRequest import ClientRequest
from lbrynet.core.server.ServerProtocol import ServerProtocol from lbrynet.core.server.ServerProtocol import ServerProtocol
from lbrynet.core.client.ClientProtocol import ClientProtocol from lbrynet.core.client.ClientProtocol import ClientProtocol
@ -116,8 +114,8 @@ class MocServerProtocolFactory(ServerFactory):
self.peer_manager = PeerManager() self.peer_manager = PeerManager()
@skip('times out, needs to be refactored to work with py3')
class TestIntegrationConnectionManager(TestCase): class TestIntegrationConnectionManager(TestCase):
skip = 'times out, needs to be refactored to work with py3'
def setUp(self): def setUp(self):

View file

@ -1,4 +1,5 @@
import mock import mock
from twisted.internet import defer, reactor
from twisted.trial import unittest from twisted.trial import unittest
from lbrynet import conf from lbrynet import conf
from lbrynet.daemon.auth import server from lbrynet.daemon.auth import server
@ -14,6 +15,42 @@ class AuthJSONRPCServerTest(unittest.TestCase):
conf.initialize_settings(False) conf.initialize_settings(False)
self.server = server.AuthJSONRPCServer(True, use_authentication=False) self.server = server.AuthJSONRPCServer(True, use_authentication=False)
def test_listen_auth_https(self):
self.server._use_https = True
self.server._use_authentication = True
factory = self.server.get_server_factory()
listening_port = reactor.listenSSL(
conf.settings['api_port'], factory, factory.options, interface="localhost"
)
listening_port.stopListening()
def test_listen_no_auth_https(self):
self.server._use_https = True
self.server._use_authentication = False
factory = self.server.get_server_factory()
listening_port = reactor.listenSSL(
conf.settings['api_port'], factory, factory.options, interface="localhost"
)
listening_port.stopListening()
def test_listen_auth_http(self):
self.server._use_https = False
self.server._use_authentication = True
factory = self.server.get_server_factory()
listening_port = reactor.listenTCP(
conf.settings['api_port'], factory, interface="localhost"
)
listening_port.stopListening()
def test_listen_no_auth_http(self):
self.server._use_https = False
self.server._use_authentication = False
factory = self.server.get_server_factory()
listening_port = reactor.listenTCP(
conf.settings['api_port'], factory, interface="localhost"
)
listening_port.stopListening()
def test_get_server_port(self): def test_get_server_port(self):
self.assertSequenceEqual( self.assertSequenceEqual(
('example.com', 80), self.server.get_server_port('http://example.com')) ('example.com', 80), self.server.get_server_port('http://example.com'))