lbry-sdk/torba/client/hash.py

255 lines
7.3 KiB
Python
Raw Normal View History

2018-05-25 08:03:25 +02:00
# Copyright (c) 2016-2017, Neil Booth
# Copyright (c) 2018, LBRY Inc.
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
""" Cryptography hash functions and related classes. """
import os
import base64
import hashlib
import hmac
import typing
2018-05-25 08:03:25 +02:00
from binascii import hexlify, unhexlify
2019-03-11 21:55:56 +01:00
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
2018-05-25 08:03:25 +02:00
from cryptography.hazmat.primitives.ciphers import Cipher, modes
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.padding import PKCS7
from cryptography.hazmat.backends import default_backend
2018-11-04 06:55:50 +01:00
from torba.client.util import bytes_to_int, int_to_bytes
from torba.client.constants import NULL_HASH32
2018-05-25 08:03:25 +02:00
class TXRef:
__slots__ = '_id', '_hash'
def __init__(self):
self._id = None
self._hash = None
@property
def id(self):
return self._id
@property
def hash(self):
return self._hash
2018-10-11 03:29:29 +02:00
@property
def height(self):
return -1
@property
def is_null(self):
return self.hash == NULL_HASH32
class TXRefImmutable(TXRef):
2018-10-11 03:39:36 +02:00
__slots__ = ('_height',)
def __init__(self):
super().__init__()
self._height = -1
@classmethod
2018-10-11 03:29:29 +02:00
def from_hash(cls, tx_hash: bytes, height: int) -> 'TXRefImmutable':
ref = cls()
ref._hash = tx_hash
ref._id = hexlify(tx_hash[::-1]).decode()
2018-10-11 03:29:29 +02:00
ref._height = height
return ref
@classmethod
2018-10-11 03:29:29 +02:00
def from_id(cls, tx_id: str, height: int) -> 'TXRefImmutable':
ref = cls()
ref._id = tx_id
ref._hash = unhexlify(tx_id)[::-1]
2018-10-11 03:29:29 +02:00
ref._height = height
return ref
2018-10-11 03:29:29 +02:00
@property
def height(self):
return self._height
2018-05-25 08:03:25 +02:00
def sha256(x):
""" Simple wrapper of hashlib sha256. """
return hashlib.sha256(x).digest()
2018-05-25 08:03:25 +02:00
def sha512(x):
""" Simple wrapper of hashlib sha512. """
return hashlib.sha512(x).digest()
2018-05-25 08:03:25 +02:00
def ripemd160(x):
""" Simple wrapper of hashlib ripemd160. """
h = hashlib.new('ripemd160')
2018-05-25 08:03:25 +02:00
h.update(x)
return h.digest()
def double_sha256(x):
""" SHA-256 of SHA-256, as used extensively in bitcoin. """
return sha256(sha256(x))
def hmac_sha512(key, msg):
""" Use SHA-512 to provide an HMAC. """
return hmac.new(key, msg, hashlib.sha512).digest()
2018-05-25 08:03:25 +02:00
def hash160(x):
""" RIPEMD-160 of SHA-256.
Used to make bitcoin addresses from pubkeys. """
return ripemd160(sha256(x))
def hash_to_hex_str(x):
""" Convert a big-endian binary hash to displayed hex string.
Display form of a binary hash is reversed and converted to hex. """
return hexlify(reversed(x))
def hex_str_to_hash(x):
""" Convert a displayed hex string to a binary hash. """
return reversed(unhexlify(x))
2019-03-11 17:26:24 +01:00
def aes_encrypt(secret: str, value: str, init_vector: bytes = None) -> str:
2018-09-21 20:49:16 +02:00
if init_vector is not None:
assert len(init_vector) == 16
else:
2018-09-21 20:49:16 +02:00
init_vector = os.urandom(16)
key = double_sha256(secret.encode())
encryptor = Cipher(AES(key), modes.CBC(init_vector), default_backend()).encryptor()
2018-05-25 08:03:25 +02:00
padder = PKCS7(AES.block_size).padder()
2018-09-21 20:49:16 +02:00
padded_data = padder.update(value.encode()) + padder.finalize()
encrypted_data = encryptor.update(padded_data) + encryptor.finalize()
2018-09-21 20:49:16 +02:00
return base64.b64encode(init_vector + encrypted_data).decode()
2018-05-25 08:03:25 +02:00
def aes_decrypt(secret: str, value: str) -> typing.Tuple[str, bytes]:
2018-09-21 20:49:16 +02:00
data = base64.b64decode(value.encode())
key = double_sha256(secret.encode())
2018-05-25 08:03:25 +02:00
init_vector, data = data[:16], data[16:]
decryptor = Cipher(AES(key), modes.CBC(init_vector), default_backend()).decryptor()
unpadder = PKCS7(AES.block_size).unpadder()
result = unpadder.update(decryptor.update(data)) + unpadder.finalize()
return result.decode(), init_vector
2018-05-25 08:03:25 +02:00
2019-03-11 17:04:06 +01:00
def better_aes_encrypt(secret: str, value: bytes) -> bytes:
init_vector = os.urandom(16)
2019-03-11 21:55:56 +01:00
key = scrypt(secret.encode(), salt=init_vector)
2019-03-11 17:04:06 +01:00
encryptor = Cipher(AES(key), modes.CBC(init_vector), default_backend()).encryptor()
padder = PKCS7(AES.block_size).padder()
padded_data = padder.update(value) + padder.finalize()
encrypted_data = encryptor.update(padded_data) + encryptor.finalize()
2019-03-11 21:55:56 +01:00
return base64.b64encode(b's:8192:16:1:' + init_vector + encrypted_data)
2019-03-11 17:04:06 +01:00
def better_aes_decrypt(secret: str, value: bytes) -> bytes:
data = base64.b64decode(value)
2019-03-12 01:12:04 +01:00
_, scryp_n, scrypt_r, scrypt_p, data = data.split(b':', maxsplit=4)
2019-03-11 17:04:06 +01:00
init_vector, data = data[:16], data[16:]
2019-03-12 01:12:04 +01:00
key = scrypt(secret.encode(), init_vector, int(scryp_n), int(scrypt_r), int(scrypt_p))
2019-03-11 17:04:06 +01:00
decryptor = Cipher(AES(key), modes.CBC(init_vector), default_backend()).decryptor()
unpadder = PKCS7(AES.block_size).unpadder()
return unpadder.update(decryptor.update(data)) + unpadder.finalize()
2019-03-12 01:12:04 +01:00
def scrypt(passphrase, salt, scrypt_n=1<<13, scrypt_r=16, scrypt_p=1):
kdf = Scrypt(salt, length=32, n=scrypt_n, r=scrypt_r, p=scrypt_p, backend=default_backend())
2019-03-11 21:55:56 +01:00
return kdf.derive(passphrase)
2018-05-25 08:03:25 +02:00
class Base58Error(Exception):
""" Exception used for Base58 errors. """
class Base58:
2018-05-25 08:03:25 +02:00
""" Class providing base 58 functionality. """
chars = u'123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
assert len(chars) == 58
char_map = {c: n for n, c in enumerate(chars)}
@classmethod
def char_value(cls, c):
val = cls.char_map.get(c)
if val is None:
raise Base58Error('invalid base 58 character "{}"'.format(c))
return val
@classmethod
def decode(cls, txt):
""" Decodes txt into a big-endian bytearray. """
2018-07-26 20:35:44 +02:00
if isinstance(txt, memoryview):
2018-06-14 02:57:57 +02:00
txt = str(txt)
2018-07-26 20:35:44 +02:00
if isinstance(txt, bytes):
2018-05-25 08:03:25 +02:00
txt = txt.decode()
2018-07-26 20:35:44 +02:00
if not isinstance(txt, str):
2018-05-25 08:03:25 +02:00
raise TypeError('a string is required')
if not txt:
raise Base58Error('string cannot be empty')
value = 0
for c in txt:
value = value * 58 + cls.char_value(c)
result = int_to_bytes(value)
# Prepend leading zero bytes if necessary
count = 0
for c in txt:
if c != u'1':
break
count += 1
if count:
result = bytes((0,)) * count + result
2018-05-25 08:03:25 +02:00
return result
@classmethod
def encode(cls, be_bytes):
"""Converts a big-endian bytearray into a base58 string."""
value = bytes_to_int(be_bytes)
txt = u''
while value:
value, mod = divmod(value, 58)
txt += cls.chars[mod]
for byte in be_bytes:
if byte != 0:
break
txt += u'1'
return txt[::-1]
2018-05-25 08:03:25 +02:00
@classmethod
def decode_check(cls, txt, hash_fn=double_sha256):
""" Decodes a Base58Check-encoded string to a payload. The version prefixes it. """
be_bytes = cls.decode(txt)
result, check = be_bytes[:-4], be_bytes[-4:]
if check != hash_fn(result)[:4]:
raise Base58Error('invalid base 58 checksum for {}'.format(txt))
return result
@classmethod
def encode_check(cls, payload, hash_fn=double_sha256):
""" Encodes a payload bytearray (which includes the version byte(s))
into a Base58Check string."""
be_bytes = payload + hash_fn(payload)[:4]
return cls.encode(be_bytes)