lbry-sdk/lbrynet/lbrynet_daemon/auth/util.py

93 lines
2.4 KiB
Python
Raw Normal View History

import base58
import hmac
import hashlib
import yaml
import os
import logging
log = logging.getLogger(__name__)
API_KEY_NAME = "api"
def sha(x):
    """Return the base58 encoding of the SHA-256 digest of ``x``."""
    return base58.b58encode(hashlib.sha256(x).digest())
def generate_key(x=None):
    """Derive a key by passing a seed through ``sha``.

    When ``x`` is None the seed is 256 bytes read from ``os.urandom``;
    otherwise ``x`` itself is used as the seed.
    """
    seed = os.urandom(256) if x is None else x
    return sha(seed)
2016-09-21 09:49:52 +02:00
class APIKey(object):
    """A named secret used to sign and verify messages with HMAC-SHA256.

    Attributes:
        secret: key material; ``_raw_key`` base58-decodes it before use.
        name: identifier of the key.
        expiration: stored as given; nothing in this class interprets it.
    """

    def __init__(self, secret, name, expiration=None):
        self.secret = secret
        self.name = name
        self.expiration = expiration

    @classmethod
    def new(cls, seed=None, name=None, expiration=None):
        """Create a key whose secret comes from ``generate_key(seed)``.

        If ``name`` is falsy, the key is named after ``sha(secret)``.
        """
        secret = generate_key(seed)
        key_name = name if name else sha(secret)
        # Construct through ``cls`` so a subclass calling ``new`` receives
        # an instance of its own type rather than a plain APIKey.
        return cls(secret, key_name, expiration)

    def _raw_key(self):
        """Return the base58-decoded form of ``self.secret``."""
        return base58.b58decode(self.secret)

    def get_hmac(self, message):
        """Return the base58-encoded HMAC-SHA256 of ``message`` under this key."""
        decoded_key = self._raw_key()
        signature = hmac.new(decoded_key, message, hashlib.sha256)
        return base58.b58encode(signature.digest())

    def compare_hmac(self, message, token):
        """Return True if ``token`` is the HMAC this key yields for ``message``.

        ``token`` is base58-decoded before the ``try`` block, so an error
        raised while decoding it propagates to the caller.
        """
        decoded_token = base58.b58decode(token)
        target = base58.b58decode(self.get_hmac(message))
        # An explicit check instead of ``assert``: assertions are removed
        # when Python runs with -O, which would silently drop the test.
        if len(decoded_token) != len(target):
            return False
        try:
            return hmac.compare_digest(decoded_token, target)
        except TypeError:
            # compare_digest rejects unsupported operand types; treat that
            # as a failed comparison without masking unrelated exceptions
            # (e.g. KeyboardInterrupt) the way a bare ``except`` would.
            return False
def load_api_keys(path):
    """Load API keys from the YAML file at ``path``.

    Each top-level entry is expected to map a key name to a mapping with
    'secret' and 'expiration' items.

    Returns:
        dict mapping each key name to an ``APIKey``; empty if the file
        holds no YAML document.

    Raises:
        Exception: if ``path`` is not an existing file.
        KeyError: if an entry lacks 'secret' or 'expiration'.
    """
    if not os.path.isfile(path):
        raise Exception("Invalid api key path")

    with open(path, "r") as f:
        # safe_load instead of yaml.load: the key file is written with
        # yaml.safe_dump, so only plain YAML types are ever needed, and
        # the full loader can construct arbitrary Python objects from a
        # tampered file. An empty file loads as None, hence the fallback.
        data = yaml.safe_load(f.read()) or {}

    keys_for_return = {}
    for key_name in data:
        entry = data[key_name]
        keys_for_return[key_name] = APIKey(
            entry['secret'], key_name, entry['expiration'])
    return keys_for_return
def save_api_keys(keys, path):
    """Write ``keys`` to ``path`` as YAML.

    Args:
        keys: mapping whose values expose ``name``, ``secret`` and
            ``expiration`` attributes.
        path: destination file; overwritten if it exists.

    The file stores one entry per key, indexed by the key's ``name``
    attribute, each holding its 'secret' and 'expiration'.
    """
    # Build and serialize the payload BEFORE opening the file: mode "w"
    # truncates on open, so failing afterwards would destroy the key file
    # that is already on disk.
    key_dict = {}
    for key_name in keys:
        key = keys[key_name]
        key_dict[key.name] = {'secret': key.secret,
                              'expiration': key.expiration}
    data = yaml.safe_dump(key_dict)

    with open(path, "w") as f:
        f.write(data)
def initialize_api_key_file(key_path):
    """Create the key file at ``key_path`` if no file exists there yet.

    The new file holds a single freshly generated key named
    ``API_KEY_NAME``. An existing file is left untouched.
    """
    if os.path.isfile(key_path):
        return
    api_key = APIKey.new(name=API_KEY_NAME)
    save_api_keys({api_key.name: api_key}, key_path)
def get_auth_message(message_dict):
    """Return the byte string that is signed for a request.

    The result is the request's 'method' followed by ``str()`` of its
    'id', as bytes. A missing 'id' contributes the text "None".

    The hex encode/decode round trip this replaces was a no-op
    concatenation that relied on the Python 2-only ``'hex'`` str codec;
    joining the byte strings directly gives the same bytes and also runs
    on Python 3. Text values are encoded as UTF-8.

    Raises:
        AttributeError: if 'method' is missing (``None`` has no ``encode``).
    """
    parts = []
    for value in (message_dict.get('method'), str(message_dict.get('id'))):
        if not isinstance(value, bytes):
            value = value.encode('utf-8')
        parts.append(value)
    return b"".join(parts)