From 1c8aad7a075c7195d32061a0ba3c66e82000ac34 Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Thu, 17 Oct 2019 21:52:20 -0400 Subject: [PATCH] fix for encrypted wallet with import read-only channel accounts --- .../tests/integration/test_wallet_commands.py | 10 +++ torba/tests/client_tests/unit/test_account.py | 11 +++ torba/torba/client/baseaccount.py | 67 +++++++++++++------ 3 files changed, 67 insertions(+), 21 deletions(-) diff --git a/lbry/tests/integration/test_wallet_commands.py b/lbry/tests/integration/test_wallet_commands.py index faf665408..6cdbe29c5 100644 --- a/lbry/tests/integration/test_wallet_commands.py +++ b/lbry/tests/integration/test_wallet_commands.py @@ -115,6 +115,16 @@ class WalletEncryptionAndSynchronization(CommandTestCase): self.assertEqual(daemon.jsonrpc_preference_get(ENCRYPT_ON_DISK), {'encrypt-on-disk': False}) self.assertWalletEncrypted(wallet.storage.path, False) + async def test_encryption_with_imported_channel(self): + daemon, daemon2 = self.daemon, self.daemon2 + channel = await self.channel_create() + exported = await daemon.jsonrpc_channel_export(self.get_claim_id(channel)) + await daemon2.jsonrpc_channel_import(exported) + self.assertTrue(daemon2.jsonrpc_wallet_encrypt('password')) + self.assertTrue(daemon2.jsonrpc_wallet_lock()) + self.assertTrue(daemon2.jsonrpc_wallet_unlock("password")) + self.assertEqual(daemon2.jsonrpc_wallet_status(), {'is_locked': False, 'is_encrypted': True}) + async def test_sync_with_encryption_and_password_change(self): daemon, daemon2 = self.daemon, self.daemon2 wallet, wallet2 = daemon.wallet_manager.default_wallet, daemon2.wallet_manager.default_wallet diff --git a/torba/tests/client_tests/unit/test_account.py b/torba/tests/client_tests/unit/test_account.py index 3afdfcaae..0987edc82 100644 --- a/torba/tests/client_tests/unit/test_account.py +++ b/torba/tests/client_tests/unit/test_account.py @@ -480,3 +480,14 @@ class AccountEncryptionTests(AsyncioTestCase): self.assertEqual(account.to_dict(encrypt_password=self.password)['private_key'], self.encrypted_account['private_key']) self.assertEqual(account.to_dict()['seed'], self.unencrypted_account['seed']) self.assertEqual(account.to_dict()['private_key'], self.unencrypted_account['private_key']) + + def test_encrypt_decrypt_read_only_account(self): + account_data = self.unencrypted_account.copy() + del account_data['seed'] + del account_data['private_key'] + account = self.ledger.account_class.from_dict(self.ledger, Wallet(), account_data) + encrypted = account.to_dict('password') + self.assertFalse(encrypted['seed']) + self.assertFalse(encrypted['private_key']) + account.encrypt('password') + account.decrypt('password') diff --git a/torba/torba/client/baseaccount.py b/torba/torba/client/baseaccount.py index 693a65d10..4002625df 100644 --- a/torba/torba/client/baseaccount.py +++ b/torba/torba/client/baseaccount.py @@ -297,10 +297,12 @@ class BaseAccount: if not self.encrypted and self.private_key: private_key_string = self.private_key.extended_key_string() if not self.encrypted and encrypt_password: - private_key_string = aes_encrypt( - encrypt_password, private_key_string, self.get_init_vector('private_key') - ) - seed = aes_encrypt(encrypt_password, self.seed, self.get_init_vector('seed')) + if private_key_string: + private_key_string = aes_encrypt( + encrypt_password, private_key_string, self.get_init_vector('private_key') + ) + if seed: + seed = aes_encrypt(encrypt_password, self.seed, self.get_init_vector('seed')) return { 'ledger': self.ledger.get_id(), 'name': self.name, @@ -344,37 +346,60 @@ class BaseAccount: def decrypt(self, password: str) -> bool: assert self.encrypted, "Key is not encrypted." + success = ( + self._decrypt_seed(password) and + self._decrypt_private_key_string(password) + ) + if success: + self.encrypted = False + return success + + def _decrypt_private_key_string(self, password: str) -> bool: + if not self.private_key_string: + return True + try: + private_key_string, pk_iv = aes_decrypt(password, self.private_key_string) + except ValueError: + # failed to remove padding, password is wrong + return False + if not private_key_string: + self.private_key = None + return True + try: + self.private_key = from_extended_key_string( + self.ledger, private_key_string + ) + except (TypeError, ValueError): + return False + self.init_vectors['private_key'] = pk_iv + self.encrypted = False + return True + + def _decrypt_seed(self, password: str) -> bool: + if not self.seed: + return True try: seed, seed_iv = aes_decrypt(password, self.seed) - pk_string, pk_iv = aes_decrypt(password, self.private_key_string) except ValueError: # failed to remove padding, password is wrong return False try: Mnemonic().mnemonic_decode(seed) except IndexError: # failed to decode the seed, this either means it decrypted and is invalid - # or that we hit an edge case where an incorrect password gave valid padding - return False - try: - private_key = from_extended_key_string( - self.ledger, pk_string - ) - except (TypeError, ValueError): + # or that we hit an edge case where an incorrect password gave valid padding return False self.seed = seed - self.private_key = private_key self.init_vectors['seed'] = seed_iv - self.init_vectors['private_key'] = pk_iv - self.encrypted = False return True def encrypt(self, password: str) -> bool: assert not self.encrypted, "Key is already encrypted." - assert isinstance(self.private_key, PrivateKey) - self.seed = aes_encrypt(password, self.seed, self.get_init_vector('seed')) - self.private_key_string = aes_encrypt( - password, self.private_key.extended_key_string(), self.get_init_vector('private_key') - ) - self.private_key = None + if self.seed: + self.seed = aes_encrypt(password, self.seed, self.get_init_vector('seed')) + if isinstance(self.private_key, PrivateKey): + self.private_key_string = aes_encrypt( + password, self.private_key.extended_key_string(), self.get_init_vector('private_key') + ) + self.private_key = None self.encrypted = True return True