a7ef8889dd
* Added new custom cli class using aiohttp * Proper error handling in CLI based on RPC error codes(PoC) * Auth API working * UnitTests
52 lines
1.7 KiB
Python
52 lines
1.7 KiB
Python
import logging
|
|
from zope.interface import implementer
|
|
from twisted.cred import portal, checkers, credentials, error as cred_error
|
|
from twisted.internet import defer
|
|
from twisted.web import resource
|
|
from lbrynet.daemon.auth.util import load_api_keys
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
@implementer(portal.IRealm)
class HttpPasswordRealm:
    """Realm that serves one fixed resource as the avatar for every login.

    The avatar id is only logged; the same wrapped resource is returned
    regardless of which id was authenticated.
    """

    def __init__(self, resource):
        """Remember the resource that will be handed out as the avatar."""
        self.resource = resource

    def requestAvatar(self, avatarId, mind, *interfaces):
        """Return the ``(interface, avatar, logout)`` triple for *avatarId*.

        Only ``IResource`` is supported: when it is among *interfaces* the
        wrapped resource is returned together with a logout callable that does
        nothing. Any other request raises ``NotImplementedError``.
        """
        log.debug("Processing request for %s", avatarId)

        # Here ``resource`` is the twisted.web module, not the constructor
        # argument (which is only visible inside ``__init__``).
        wanted = resource.IResource
        if wanted not in interfaces:
            raise NotImplementedError()

        def logout():
            # Nothing to release when the session ends.
            return None

        return (wanted, self.resource, logout)
|
|
|
|
|
|
@implementer(checkers.ICredentialsChecker)
class PasswordChecker:
    """Credentials checker validating username/password pairs against API keys.

    ``passwords`` maps each key name to its secret. Both are held as ``str``
    and encoded to bytes when compared with incoming credentials.
    """

    credentialInterfaces = (credentials.IUsernamePassword,)

    def __init__(self, passwords):
        """Store the mapping of key name to secret used for validation."""
        self.passwords = passwords

    @classmethod
    def load_file(cls, key_path):
        """Build a checker from whatever ``load_api_keys`` returns for *key_path*."""
        keys = load_api_keys(key_path)
        return cls.load(keys)

    @classmethod
    def load(cls, password_dict):
        """Build a checker from a mapping whose values expose a ``secret`` attribute."""
        passwords = {key: password_dict[key].secret for key in password_dict}
        log.info("Loaded %i api key(s)", len(passwords))
        return cls(passwords)

    def requestAvatarId(self, creds):
        """Check *creds* against the stored secrets.

        Returns a Deferred that fires with ``creds.username`` when the
        username is known and its password matches, and fails with
        ``UnauthorizedLogin`` otherwise.

        ``IUsernamePassword.checkPassword`` may return either a bool or a
        Deferred firing with a bool, so the result is normalised through
        ``defer.maybeDeferred`` rather than tested for truthiness directly
        (a Deferred object is always truthy and would be accepted as a match).
        An exception raised by ``checkPassword`` is delivered as a failed
        Deferred instead of being raised synchronously.
        """
        # Encoded on each call, so later changes to self.passwords are picked
        # up; credentials arrive as bytes while the stored keys are str.
        encoded = {
            name.encode(): self.passwords[name].encode()
            for name in self.passwords
        }

        username = creds.username
        if username not in encoded:
            return defer.fail(self._unauthorized())

        outcome = defer.maybeDeferred(creds.checkPassword, encoded[username])
        outcome.addCallback(self._finish_login, username)
        return outcome

    def _finish_login(self, matched, username):
        """Turn the password check result into an avatar id or a login failure."""
        if matched:
            return username
        raise self._unauthorized()

    @staticmethod
    def _unauthorized():
        """Log the rejected attempt and build the matching login error."""
        log.warning('Incorrect username or password')
        return cred_error.UnauthorizedLogin('Incorrect username or password')
|