added InvalidPasswordError code when password is invalid

This commit is contained in:
Lex Berezhny 2019-12-07 09:16:04 -05:00
parent 88263db831
commit 75d78bfa53
8 changed files with 116 additions and 41 deletions

View file

@ -1,2 +1,5 @@
generate:
python generate.py > __init__.py
python generate.py generate > __init__.py
analyze:
python generate.py analyze

View file

@ -53,6 +53,7 @@ Code | Name | Message
408 | Resolve | Failed to resolve '{url}'.
409 | ResolveTimeout | Failed to resolve '{url}' within the timeout.
410 | KeyFeeAboveMaxAllowed | {message}
411 | InvalidPassword | Password is invalid.
**5xx** | Blob | **Blobs**
500 | BlobNotFound | Blob not found.
501 | BlobPermissionDenied | Permission denied to read blob.

View file

@ -197,6 +197,12 @@ class KeyFeeAboveMaxAllowedError(WalletError):
super().__init__(f"{message}")
class InvalidPasswordError(WalletError):
def __init__(self):
super().__init__("Password is invalid.")
class BlobError(BaseError):
"""
**Blobs**

View file

@ -1,4 +1,7 @@
import re
import sys
import argparse
from pathlib import Path
from textwrap import fill, indent
@ -75,18 +78,19 @@ class ErrorClass:
))
def error_rows(lines):
lines = iter(lines)
for line in lines:
if line.startswith('## Exceptions Table'):
break
for line in lines:
if line.startswith('---:|'):
break
for line in lines:
if not line:
break
yield line
def get_errors():
with open('README.md', 'r') as readme:
lines = iter(readme.readlines())
for line in lines:
if line.startswith('## Exceptions Table'):
break
for line in lines:
if line.startswith('---:|'):
break
for line in lines:
if not line:
break
yield ErrorClass(*[c.strip() for c in line.split('|')])
def find_parent(stack, child):
@ -96,19 +100,50 @@ def find_parent(stack, child):
return parent
def main(out):
with open('README.md', 'r') as readme:
lines = readme.readlines()
out.write('from .base import BaseError\n')
stack = {}
for row in error_rows(lines):
error = ErrorClass(*[c.strip() for c in row.split('|')])
error.render(out, find_parent(stack, error))
if not error.is_leaf:
assert error.code not in stack, f"Duplicate code: {error.code}"
stack[error.code] = error
def generate(out):
out.write('from .base import BaseError\n')
stack = {}
for error in get_errors():
error.render(out, find_parent(stack, error))
if not error.is_leaf:
assert error.code not in stack, f"Duplicate code: {error.code}"
stack[error.code] = error
def analyze():
errors = {e.class_name: [] for e in get_errors() if e.is_leaf}
here = Path(__file__).absolute().parents[0]
module = here.parent
for file_path in module.glob('**/*.py'):
if here in file_path.parents:
continue
with open(file_path) as src_file:
src = src_file.read()
for error in errors.keys():
found = src.count(error)
if found > 0:
errors[error].append((file_path, found))
print('Unused Errors:\n')
for error, used in errors.items():
if used:
print(f' - {error}')
for use in used:
print(f' {use[0].relative_to(module.parent)} {use[1]}')
print('')
print('')
print('Unused Errors:')
for error, used in errors.items():
if not used:
print(f' - {error}')
if __name__ == "__main__":
import sys
main(sys.stdout)
parser = argparse.ArgumentParser()
parser.add_argument("action", choices=['generate', 'analyze'])
args = parser.parse_args()
if args.action == "analyze":
analyze()
elif args.action == "generate":
generate(sys.stdout)

View file

@ -4,6 +4,7 @@ import json
from torba.client.wallet import ENCRYPT_ON_DISK
from lbry.testcase import CommandTestCase
from lbry.wallet.dewies import dict_values_to_lbc
from lbry.error import InvalidPasswordError
class WalletCommands(CommandTestCase):
@ -303,7 +304,7 @@ class WalletEncryptionAndSynchronization(CommandTestCase):
# sync_apply doesn't save password if encrypt-on-disk is False
self.assertEqual(wallet2.encryption_password, None)
# need to use new password2 in sync_apply
with self.assertRaises(ValueError): # wrong password
with self.assertRaises(InvalidPasswordError):
await daemon.jsonrpc_sync_apply('password', data=data['data'], blocking=True)
await daemon.jsonrpc_sync_apply('password2', data=data['data'], blocking=True)
# sync_apply with new password2 also sets it as new local password

View file

@ -1,5 +1,6 @@
from unittest import TestCase, mock
from torba.client.hash import aes_decrypt, aes_encrypt, better_aes_decrypt, better_aes_encrypt
from torba.client.errors import InvalidPasswordError
class TestAESEncryptDecrypt(TestCase):
@ -34,9 +35,20 @@ class TestAESEncryptDecrypt(TestCase):
self.message
)
def test_decrypt_error(self):
with self.assertRaises(InvalidPasswordError):
aes_decrypt('notbubblegum', aes_encrypt('bubblegum', self.message))
def test_better_encrypt_decrypt(self):
self.assertEqual(
b'valuable value',
better_aes_decrypt(
'super secret',
better_aes_encrypt('super secret', b'valuable value')))
def test_better_decrypt_error(self):
with self.assertRaises(InvalidPasswordError):
better_aes_decrypt(
'super secret but wrong',
better_aes_encrypt('super secret', b'valuable value')
)

View file

@ -1,2 +1,8 @@
class InvalidPasswordError(Exception):
def __init__(self):
super().__init__("Password is invalid.")
class InsufficientFundsError(Exception):
pass

View file

@ -22,6 +22,7 @@ from cryptography.hazmat.backends import default_backend
from torba.client.util import bytes_to_int, int_to_bytes
from torba.client.constants import NULL_HASH32
from torba.client.errors import InvalidPasswordError
class TXRef:
@ -136,13 +137,18 @@ def aes_encrypt(secret: str, value: str, init_vector: bytes = None) -> str:
def aes_decrypt(secret: str, value: str) -> typing.Tuple[str, bytes]:
data = base64.b64decode(value.encode())
key = double_sha256(secret.encode())
init_vector, data = data[:16], data[16:]
decryptor = Cipher(AES(key), modes.CBC(init_vector), default_backend()).decryptor()
unpadder = PKCS7(AES.block_size).unpadder()
result = unpadder.update(decryptor.update(data)) + unpadder.finalize()
return result.decode(), init_vector
try:
data = base64.b64decode(value.encode())
key = double_sha256(secret.encode())
init_vector, data = data[:16], data[16:]
decryptor = Cipher(AES(key), modes.CBC(init_vector), default_backend()).decryptor()
unpadder = PKCS7(AES.block_size).unpadder()
result = unpadder.update(decryptor.update(data)) + unpadder.finalize()
return result.decode(), init_vector
except ValueError as e:
if e.args[0] == 'Invalid padding bytes.':
raise InvalidPasswordError()
raise
def better_aes_encrypt(secret: str, value: bytes) -> bytes:
@ -156,13 +162,18 @@ def better_aes_encrypt(secret: str, value: bytes) -> bytes:
def better_aes_decrypt(secret: str, value: bytes) -> bytes:
data = base64.b64decode(value)
_, scryp_n, scrypt_r, scrypt_p, data = data.split(b':', maxsplit=4)
init_vector, data = data[:16], data[16:]
key = scrypt(secret.encode(), init_vector, int(scryp_n), int(scrypt_r), int(scrypt_p))
decryptor = Cipher(AES(key), modes.CBC(init_vector), default_backend()).decryptor()
unpadder = PKCS7(AES.block_size).unpadder()
return unpadder.update(decryptor.update(data)) + unpadder.finalize()
try:
data = base64.b64decode(value)
_, scryp_n, scrypt_r, scrypt_p, data = data.split(b':', maxsplit=4)
init_vector, data = data[:16], data[16:]
key = scrypt(secret.encode(), init_vector, int(scryp_n), int(scrypt_r), int(scrypt_p))
decryptor = Cipher(AES(key), modes.CBC(init_vector), default_backend()).decryptor()
unpadder = PKCS7(AES.block_size).unpadder()
return unpadder.update(decryptor.update(data)) + unpadder.finalize()
except ValueError as e:
if e.args[0] == 'Invalid padding bytes.':
raise InvalidPasswordError()
raise
def scrypt(passphrase, salt, scrypt_n=1<<13, scrypt_r=16, scrypt_p=1):