lbry-sdk/torba/baseaccount.py

272 lines
10 KiB
Python

from typing import Dict
from twisted.internet import defer
from torba.mnemonic import Mnemonic
from torba.bip32 import PrivateKey, PubKey, from_extended_key_string
from torba.hash import double_sha256, aes_encrypt, aes_decrypt
class KeyManager(object):
__slots__ = 'account', 'public_key', 'chain_number'
def __init__(self, account, public_key, chain_number):
self.account = account
self.public_key = public_key
self.chain_number = chain_number
@property
def db(self):
return self.account.ledger.db
def _query_addresses(self, limit=None, max_used_times=None, order_by=None):
return self.db.get_addresses(
self.account, self.chain_number, limit, max_used_times, order_by
)
def ensure_address_gap(self): # type: () -> defer.Deferred
raise NotImplementedError
def get_address_records(self, limit=None, only_usable=False): # type: (int, bool) -> defer.Deferred
raise NotImplementedError
@defer.inlineCallbacks
def get_addresses(self, limit=None, only_usable=False): # type: (int, bool) -> defer.Deferred
records = yield self.get_address_records(limit=limit, only_usable=only_usable)
defer.returnValue([r['address'] for r in records])
@defer.inlineCallbacks
def get_or_create_usable_address(self): # type: () -> defer.Deferred
addresses = yield self.get_addresses(limit=1, only_usable=True)
if addresses:
defer.returnValue(addresses[0])
addresses = yield self.ensure_address_gap()
defer.returnValue(addresses[0])
class KeyChain(KeyManager):
""" Implements simple version of Bitcoin Hierarchical Deterministic key management. """
__slots__ = 'gap', 'maximum_uses_per_address'
def __init__(self, account, root_public_key, chain_number, gap, maximum_uses_per_address):
# type: ('BaseAccount', PubKey, int, int, int) -> None
super(KeyChain, self).__init__(account, root_public_key.child(chain_number), chain_number)
self.gap = gap
self.maximum_uses_per_address = maximum_uses_per_address
@defer.inlineCallbacks
def generate_keys(self, start, end):
new_keys = []
for index in range(start, end+1):
new_keys.append((index, self.public_key.child(index)))
yield self.db.add_keys(
self.account, self.chain_number, new_keys
)
defer.returnValue([key[1].address for key in new_keys])
@defer.inlineCallbacks
def ensure_address_gap(self):
addresses = yield self._query_addresses(self.gap, None, "position DESC")
existing_gap = 0
for address in addresses:
if address['used_times'] == 0:
existing_gap += 1
else:
break
if existing_gap == self.gap:
defer.returnValue([])
start = addresses[0]['position']+1 if addresses else 0
end = start + (self.gap - existing_gap)
new_keys = yield self.generate_keys(start, end-1)
defer.returnValue(new_keys)
def get_address_records(self, limit=None, only_usable=False):
return self._query_addresses(
limit, self.maximum_uses_per_address if only_usable else None,
"used_times ASC, position ASC"
)
class SingleKey(KeyManager):
""" Single Key manager always returns the same address for all operations. """
__slots__ = ()
def __init__(self, account, root_public_key, chain_number):
# type: ('BaseAccount', PubKey) -> None
super(SingleKey, self).__init__(account, root_public_key, chain_number)
@defer.inlineCallbacks
def ensure_address_gap(self):
exists = yield self.get_address_records()
if not exists:
yield self.db.add_keys(
self.account, self.chain_number, [(0, self.public_key)]
)
defer.returnValue([self.public_key.address])
defer.returnValue([])
def get_address_records(self, **kwargs):
return self._query_addresses()
class BaseAccount(object):
mnemonic_class = Mnemonic
private_key_class = PrivateKey
public_key_class = PubKey
def __init__(self, ledger, name, seed, encrypted, is_hd, private_key,
public_key, receiving_gap=20, change_gap=6,
receiving_maximum_uses_per_address=2, change_maximum_uses_per_address=2):
# type: (torba.baseledger.BaseLedger, str, str, bool, bool, PrivateKey, PubKey, int, int, int, int) -> None
self.ledger = ledger
self.name = name
self.seed = seed
self.encrypted = encrypted
self.private_key = private_key
self.public_key = public_key
if is_hd:
receiving, change = self.keychains = (
KeyChain(self, public_key, 0, receiving_gap, receiving_maximum_uses_per_address),
KeyChain(self, public_key, 1, change_gap, change_maximum_uses_per_address)
)
else:
self.keychains = SingleKey(self, public_key, 0),
receiving = change = self.keychains[0]
self.receiving = receiving # type: KeyManager
self.change = change # type: KeyManager
ledger.add_account(self)
@classmethod
def generate(cls, ledger, password, **kwargs): # type: (torba.baseledger.BaseLedger, str) -> BaseAccount
seed = cls.mnemonic_class().make_seed()
return cls.from_seed(ledger, seed, password, **kwargs)
@classmethod
def from_seed(cls, ledger, seed, password, is_hd=True, **kwargs):
# type: (torba.baseledger.BaseLedger, str, str) -> BaseAccount
private_key = cls.get_private_key_from_seed(ledger, seed, password)
return cls(
ledger=ledger, name='Account #{}'.format(private_key.public_key.address),
seed=seed, encrypted=False, is_hd=is_hd,
private_key=private_key,
public_key=private_key.public_key,
**kwargs
)
@classmethod
def get_private_key_from_seed(cls, ledger, seed, password):
# type: (torba.baseledger.BaseLedger, str, str) -> PrivateKey
return cls.private_key_class.from_seed(
ledger, cls.mnemonic_class.mnemonic_to_seed(seed, password)
)
@classmethod
def from_dict(cls, ledger, d): # type: (torba.baseledger.BaseLedger, Dict) -> BaseAccount
if not d['encrypted'] and d['private_key']:
private_key = from_extended_key_string(ledger, d['private_key'])
public_key = private_key.public_key
else:
private_key = d['private_key']
public_key = from_extended_key_string(ledger, d['public_key'])
kwargs = dict(
ledger=ledger,
name=d['name'],
seed=d['seed'],
encrypted=d['encrypted'],
private_key=private_key,
public_key=public_key,
is_hd=False
)
if d['is_hd']:
kwargs.update(dict(
receiving_gap=d['receiving_gap'],
change_gap=d['change_gap'],
receiving_maximum_uses_per_address=d['receiving_maximum_uses_per_address'],
change_maximum_uses_per_address=d['change_maximum_uses_per_address'],
is_hd=True
))
return cls(**kwargs)
def to_dict(self):
private_key = self.private_key
if not self.encrypted and self.private_key:
private_key = self.private_key.extended_key_string()
d = {
'ledger': self.ledger.get_id(),
'name': self.name,
'seed': self.seed,
'encrypted': self.encrypted,
'private_key': private_key,
'public_key': self.public_key.extended_key_string(),
'is_hd': False
}
if isinstance(self.receiving, KeyChain) and isinstance(self.change, KeyChain):
d.update({
'receiving_gap': self.receiving.gap,
'change_gap': self.change.gap,
'receiving_maximum_uses_per_address': self.receiving.maximum_uses_per_address,
'change_maximum_uses_per_address': self.change.maximum_uses_per_address,
'is_hd': True
})
return d
def decrypt(self, password):
assert self.encrypted, "Key is not encrypted."
secret = double_sha256(password)
self.seed = aes_decrypt(secret, self.seed)
self.private_key = from_extended_key_string(self.ledger, aes_decrypt(secret, self.private_key))
self.encrypted = False
def encrypt(self, password):
assert not self.encrypted, "Key is already encrypted."
secret = double_sha256(password)
self.seed = aes_encrypt(secret, self.seed)
self.private_key = aes_encrypt(secret, self.private_key.extended_key_string())
self.encrypted = True
@defer.inlineCallbacks
def ensure_address_gap(self):
addresses = []
for keychain in self.keychains:
new_addresses = yield keychain.ensure_address_gap()
addresses.extend(new_addresses)
defer.returnValue(addresses)
@defer.inlineCallbacks
def get_addresses(self, limit=None, max_used_times=None): # type: (int, int) -> defer.Deferred
records = yield self.get_address_records(limit, max_used_times)
defer.returnValue([r['address'] for r in records])
def get_address_records(self, limit=None, max_used_times=None): # type: (int, int) -> defer.Deferred
return self.ledger.db.get_addresses(self, None, limit, max_used_times)
def get_private_key(self, chain, index):
assert not self.encrypted, "Cannot get private key on encrypted wallet account."
if isinstance(self.receiving, SingleKey):
return self.private_key
else:
return self.private_key.child(chain).child(index)
def get_balance(self, confirmations=6, **constraints):
if confirmations == 0:
return self.ledger.db.get_balance_for_account(self, **constraints)
else:
height = self.ledger.headers.height - (confirmations-1)
return self.ledger.db.get_balance_for_account(
self, height__lte=height, height__not=-1, **constraints
)
def get_unspent_outputs(self, **constraints):
return self.ledger.db.get_utxos_for_account(self, **constraints)