forked from LBRYCommunity/lbry-sdk
fix for encrypted wallet with import read-only channel accounts
This commit is contained in:
parent
7743a6a71d
commit
1c8aad7a07
3 changed files with 67 additions and 21 deletions
|
@ -115,6 +115,16 @@ class WalletEncryptionAndSynchronization(CommandTestCase):
|
||||||
self.assertEqual(daemon.jsonrpc_preference_get(ENCRYPT_ON_DISK), {'encrypt-on-disk': False})
|
self.assertEqual(daemon.jsonrpc_preference_get(ENCRYPT_ON_DISK), {'encrypt-on-disk': False})
|
||||||
self.assertWalletEncrypted(wallet.storage.path, False)
|
self.assertWalletEncrypted(wallet.storage.path, False)
|
||||||
|
|
||||||
|
async def test_encryption_with_imported_channel(self):
|
||||||
|
daemon, daemon2 = self.daemon, self.daemon2
|
||||||
|
channel = await self.channel_create()
|
||||||
|
exported = await daemon.jsonrpc_channel_export(self.get_claim_id(channel))
|
||||||
|
await daemon2.jsonrpc_channel_import(exported)
|
||||||
|
self.assertTrue(daemon2.jsonrpc_wallet_encrypt('password'))
|
||||||
|
self.assertTrue(daemon2.jsonrpc_wallet_lock())
|
||||||
|
self.assertTrue(daemon2.jsonrpc_wallet_unlock("password"))
|
||||||
|
self.assertEqual(daemon2.jsonrpc_wallet_status(), {'is_locked': False, 'is_encrypted': True})
|
||||||
|
|
||||||
async def test_sync_with_encryption_and_password_change(self):
|
async def test_sync_with_encryption_and_password_change(self):
|
||||||
daemon, daemon2 = self.daemon, self.daemon2
|
daemon, daemon2 = self.daemon, self.daemon2
|
||||||
wallet, wallet2 = daemon.wallet_manager.default_wallet, daemon2.wallet_manager.default_wallet
|
wallet, wallet2 = daemon.wallet_manager.default_wallet, daemon2.wallet_manager.default_wallet
|
||||||
|
|
|
@ -480,3 +480,14 @@ class AccountEncryptionTests(AsyncioTestCase):
|
||||||
self.assertEqual(account.to_dict(encrypt_password=self.password)['private_key'], self.encrypted_account['private_key'])
|
self.assertEqual(account.to_dict(encrypt_password=self.password)['private_key'], self.encrypted_account['private_key'])
|
||||||
self.assertEqual(account.to_dict()['seed'], self.unencrypted_account['seed'])
|
self.assertEqual(account.to_dict()['seed'], self.unencrypted_account['seed'])
|
||||||
self.assertEqual(account.to_dict()['private_key'], self.unencrypted_account['private_key'])
|
self.assertEqual(account.to_dict()['private_key'], self.unencrypted_account['private_key'])
|
||||||
|
|
||||||
|
def test_encrypt_decrypt_read_only_account(self):
|
||||||
|
account_data = self.unencrypted_account.copy()
|
||||||
|
del account_data['seed']
|
||||||
|
del account_data['private_key']
|
||||||
|
account = self.ledger.account_class.from_dict(self.ledger, Wallet(), account_data)
|
||||||
|
encrypted = account.to_dict('password')
|
||||||
|
self.assertFalse(encrypted['seed'])
|
||||||
|
self.assertFalse(encrypted['private_key'])
|
||||||
|
account.encrypt('password')
|
||||||
|
account.decrypt('password')
|
||||||
|
|
|
@ -297,10 +297,12 @@ class BaseAccount:
|
||||||
if not self.encrypted and self.private_key:
|
if not self.encrypted and self.private_key:
|
||||||
private_key_string = self.private_key.extended_key_string()
|
private_key_string = self.private_key.extended_key_string()
|
||||||
if not self.encrypted and encrypt_password:
|
if not self.encrypted and encrypt_password:
|
||||||
private_key_string = aes_encrypt(
|
if private_key_string:
|
||||||
encrypt_password, private_key_string, self.get_init_vector('private_key')
|
private_key_string = aes_encrypt(
|
||||||
)
|
encrypt_password, private_key_string, self.get_init_vector('private_key')
|
||||||
seed = aes_encrypt(encrypt_password, self.seed, self.get_init_vector('seed'))
|
)
|
||||||
|
if seed:
|
||||||
|
seed = aes_encrypt(encrypt_password, self.seed, self.get_init_vector('seed'))
|
||||||
return {
|
return {
|
||||||
'ledger': self.ledger.get_id(),
|
'ledger': self.ledger.get_id(),
|
||||||
'name': self.name,
|
'name': self.name,
|
||||||
|
@ -344,37 +346,60 @@ class BaseAccount:
|
||||||
|
|
||||||
def decrypt(self, password: str) -> bool:
|
def decrypt(self, password: str) -> bool:
|
||||||
assert self.encrypted, "Key is not encrypted."
|
assert self.encrypted, "Key is not encrypted."
|
||||||
|
success = (
|
||||||
|
self._decrypt_seed(password) and
|
||||||
|
self._decrypt_private_key_string(password)
|
||||||
|
)
|
||||||
|
if success:
|
||||||
|
self.encrypted = False
|
||||||
|
return success
|
||||||
|
|
||||||
|
def _decrypt_private_key_string(self, password: str) -> bool:
|
||||||
|
if not self.private_key_string:
|
||||||
|
return True
|
||||||
|
try:
|
||||||
|
private_key_string, pk_iv = aes_decrypt(password, self.private_key_string)
|
||||||
|
except ValueError:
|
||||||
|
# failed to remove padding, password is wrong
|
||||||
|
return False
|
||||||
|
if not private_key_string:
|
||||||
|
self.private_key = None
|
||||||
|
return True
|
||||||
|
try:
|
||||||
|
self.private_key = from_extended_key_string(
|
||||||
|
self.ledger, private_key_string
|
||||||
|
)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return False
|
||||||
|
self.init_vectors['private_key'] = pk_iv
|
||||||
|
self.encrypted = False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _decrypt_seed(self, password: str) -> bool:
|
||||||
|
if not self.seed:
|
||||||
|
return True
|
||||||
try:
|
try:
|
||||||
seed, seed_iv = aes_decrypt(password, self.seed)
|
seed, seed_iv = aes_decrypt(password, self.seed)
|
||||||
pk_string, pk_iv = aes_decrypt(password, self.private_key_string)
|
|
||||||
except ValueError: # failed to remove padding, password is wrong
|
except ValueError: # failed to remove padding, password is wrong
|
||||||
return False
|
return False
|
||||||
try:
|
try:
|
||||||
Mnemonic().mnemonic_decode(seed)
|
Mnemonic().mnemonic_decode(seed)
|
||||||
except IndexError: # failed to decode the seed, this either means it decrypted and is invalid
|
except IndexError: # failed to decode the seed, this either means it decrypted and is invalid
|
||||||
# or that we hit an edge case where an incorrect password gave valid padding
|
# or that we hit an edge case where an incorrect password gave valid padding
|
||||||
return False
|
|
||||||
try:
|
|
||||||
private_key = from_extended_key_string(
|
|
||||||
self.ledger, pk_string
|
|
||||||
)
|
|
||||||
except (TypeError, ValueError):
|
|
||||||
return False
|
return False
|
||||||
self.seed = seed
|
self.seed = seed
|
||||||
self.private_key = private_key
|
|
||||||
self.init_vectors['seed'] = seed_iv
|
self.init_vectors['seed'] = seed_iv
|
||||||
self.init_vectors['private_key'] = pk_iv
|
|
||||||
self.encrypted = False
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def encrypt(self, password: str) -> bool:
|
def encrypt(self, password: str) -> bool:
|
||||||
assert not self.encrypted, "Key is already encrypted."
|
assert not self.encrypted, "Key is already encrypted."
|
||||||
assert isinstance(self.private_key, PrivateKey)
|
if self.seed:
|
||||||
self.seed = aes_encrypt(password, self.seed, self.get_init_vector('seed'))
|
self.seed = aes_encrypt(password, self.seed, self.get_init_vector('seed'))
|
||||||
self.private_key_string = aes_encrypt(
|
if isinstance(self.private_key, PrivateKey):
|
||||||
password, self.private_key.extended_key_string(), self.get_init_vector('private_key')
|
self.private_key_string = aes_encrypt(
|
||||||
)
|
password, self.private_key.extended_key_string(), self.get_init_vector('private_key')
|
||||||
self.private_key = None
|
)
|
||||||
|
self.private_key = None
|
||||||
self.encrypted = True
|
self.encrypted = True
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue