diff --git a/lbry/db/queries/txio.py b/lbry/db/queries/txio.py index e77f834d7..faccaaf88 100644 --- a/lbry/db/queries/txio.py +++ b/lbry/db/queries/txio.py @@ -2,7 +2,7 @@ import logging from datetime import date from typing import Tuple, List, Optional, Union -from sqlalchemy import union, func, text, between, distinct +from sqlalchemy import union, func, text, between, distinct, case from sqlalchemy.future import select, Select from ...blockchain.transaction import ( @@ -421,7 +421,7 @@ def rows_to_txos(rows: List[dict], include_tx=True) -> List[Output]: tx_ref=TXRefImmutable.from_hash(row['tx_hash'], row['height'], row['timestamp']), position=row['txo_position'], ) - txo.spent_height = bool(row['spent_height']) + txo.spent_height = row['spent_height'] if 'is_my_input' in row: txo.is_my_input = bool(row['is_my_input']) if 'is_my_output' in row: @@ -534,8 +534,47 @@ def get_txo_sum(**constraints): return result[0]['total'] or 0 -def get_balance(**constraints): - return get_txo_sum(spent_height=0, **constraints) +def get_balance(account_ids): + my_addresses = select(AccountAddress.c.address).where(in_account_ids(account_ids)) + query = ( + select( + func.sum(TXO.c.amount).label("total"), + func.sum(case( + [(TXO.c.txo_type != TXO_TYPES["other"], TXO.c.amount)], + else_=0 + )).label("reserved"), + func.sum(case( + [(where_txo_type_in(CLAIM_TYPE_CODES), TXO.c.amount)], + else_=0 + )).label("claims"), + func.sum(case( + [(where_txo_type_in(TXO_TYPES["support"]), TXO.c.amount)], + else_=0 + )).label("supports"), + func.sum(case( + [(where_txo_type_in(TXO_TYPES["support"]) & ( + (TXI.c.address.isnot(None)) & + (TXI.c.address.in_(my_addresses)) + ), TXO.c.amount)], + else_=0 + )).label("my_supports"), + ) + .where((TXO.c.spent_height == 0) & (TXO.c.address.in_(my_addresses))) + .select_from( + TXO.join(TXI, (TXI.c.position == 0) & (TXI.c.tx_hash == TXO.c.tx_hash), isouter=True) + ) + ) + result = context().fetchone(query) + return { + "total": result["total"], + "available": result["total"] - result["reserved"], + "reserved": result["reserved"], + "reserved_subtotals": { + "claims": result["claims"], + "supports": result["my_supports"], + "tips": result["supports"] - result["my_supports"] + } + } def get_report(account_ids): diff --git a/lbry/service/api.py b/lbry/service/api.py index 59c5113b6..91a36fca1 100644 --- a/lbry/service/api.py +++ b/lbry/service/api.py @@ -755,13 +755,12 @@ class API: async def wallet_balance( self, wallet_id: str = None, # balance for specific wallet, other than default wallet - confirmations=0 # only include transactions with this many confirmed blocks. ) -> dict: """ Return the balance of a wallet Usage: - wallet balance [] [--confirmations=] + wallet balance [] """ wallet = self.wallets.get_or_default(wallet_id) @@ -911,20 +910,17 @@ class API: self, account_id: str = None, # balance for specific account, default otherwise wallet_id: str = None, # restrict operation to specific wallet - confirmations=0 # required confirmations of transactions included ) -> dict: """ Return the balance of an account Usage: - account balance [] [--wallet_id=] [--confirmations=] + account balance [] [--wallet_id=] """ wallet = self.wallets.get_or_default(wallet_id) account = wallet.accounts.get_or_default(account_id) - balance = await account.get_detailed_balance( - confirmations=confirmations, reserved_subtotals=True, - ) + balance = await account.get_balance() return dict_values_to_lbc(balance) async def account_add( @@ -1953,10 +1949,11 @@ class API: holding_account = wallet.accounts.get_or_default(stream_dict.pop('account_id')) funding_accounts = wallet.accounts.get_or_all(tx_dict.pop('fund_account_id')) signing_channel = None - if 'channel_id' in stream_dict or 'channel_name' in stream_dict: + channel_id = stream_dict.pop('channel_id') + channel_name = stream_dict.pop('channel_name') + if channel_id or channel_name: signing_channel = await wallet.channels.get_for_signing_or_none( - channel_id=stream_dict.pop('channel_id', None), - channel_name=stream_dict.pop('channel_name', None) + channel_id=channel_id, channel_name=channel_name ) holding_address = await holding_account.get_valid_receiving_address( stream_dict.pop('claim_address', None) @@ -1999,103 +1996,63 @@ class API: {kwargs} """ - wallet = self.wallets.get_or_default(wallet_id) - assert not wallet.is_locked, "Cannot spend funds with locked wallet, unlock first." - funding_accounts = wallet.accounts.get_or_all(fund_account_id) - if account_id: - account = wallet.get_account_or_error(account_id) - accounts = [account] - else: - account = wallet.default_account - accounts = wallet.accounts + stream_dict, kwargs = pop_kwargs('stream_edit', extract_stream_edit(**stream_edit_and_tx_kwargs)) + tx_dict, kwargs = pop_kwargs('tx', extract_tx(**kwargs)) + assert_consumed_kwargs(kwargs) + wallet = self.wallets.get_or_default_for_spending(tx_dict.pop('wallet_id')) + holding_account = wallet.accounts.get_or_default(stream_dict.pop('account_id')) + funding_accounts = wallet.accounts.get_or_all(tx_dict.pop('fund_account_id')) + replace = stream_dict.pop('replace') - existing_claims = await self.ledger.get_claims( - wallet=wallet, accounts=accounts, claim_id=claim_id - ) - if len(existing_claims) != 1: - account_ids = ', '.join(f"'{account.id}'" for account in accounts) + old = await wallet.claims.get(claim_id=claim_id) + if not old.claim.is_stream: raise Exception( - f"Can't find the stream '{claim_id}' in account(s) {account_ids}." - ) - old_txo = existing_claims[0] - if not old_txo.claim.is_stream: - raise Exception( - f"A claim with id '{claim_id}' was found but it is not a stream claim." + f"A claim with id '{claim_id}' was found but " + f"it is not a stream claim." ) if bid is not None: - amount = self.get_dewies_or_error('bid', bid, positive_value=True) + amount = self.ledger.get_dewies_or_error('bid', bid, positive_value=True) else: - amount = old_txo.amount + amount = old.amount - if claim_address is not None: - self.valid_address_or_error(claim_address) + claim_address = stream_dict.pop('claim_address') + if claim_address: + holding_address = await holding_account.get_valid_receiving_address(claim_address) else: - claim_address = old_txo.get_address(account.ledger) + holding_address = old.get_address(self.ledger) - channel = None + signing_channel = None + channel_id = stream_dict.pop('channel_id') + channel_name = stream_dict.pop('channel_name') + clear_channel = stream_dict.pop('clear_channel') if channel_id or channel_name: - channel = await self.get_channel_or_error( - wallet, channel_account_id, channel_id, channel_name, for_signing=True) - elif old_txo.claim.is_signed and not clear_channel and not replace: - channel = old_txo.channel - - fee_address = self.get_fee_address(kwargs, claim_address) - if fee_address: - kwargs['fee_address'] = fee_address - - file_path, spec = await self._video_file_analyzer.verify_or_repair( - validate_file, optimize_file, file_path, ignore_non_video=True - ) - kwargs.update(spec) - - if replace: - claim = Claim() - claim.stream.message.source.CopyFrom( - old_txo.claim.stream.message.source + signing_channel = await wallet.channels.get_for_signing_or_none( + channel_id=channel_id, channel_name=channel_name ) - stream_type = old_txo.claim.stream.stream_type - if stream_type: - old_stream_type = getattr(old_txo.claim.stream.message, stream_type) - new_stream_type = getattr(claim.stream.message, stream_type) - new_stream_type.CopyFrom(old_stream_type) - claim.stream.update(file_path=file_path, **kwargs) - else: - claim = Claim.from_bytes(old_txo.claim.to_bytes()) - claim.stream.update(file_path=file_path, **kwargs) - tx = await Transaction.claim_update( - old_txo, claim, amount, claim_address, funding_accounts, funding_accounts[0], channel + elif old.claim.is_signed and not clear_channel and not replace: + signing_channel = old.channel + + kwargs['fee_address'] = self.ledger.get_fee_address(kwargs, holding_address) + + stream_dict.pop('validate_file') + stream_dict.pop('optimize_file') + # TODO: fix + #file_path, spec = await self._video_file_analyzer.verify_or_repair( + # validate_file, optimize_file, file_path, ignore_non_video=True + #) + #kwargs.update(spec) + class FakeManagedStream: + sd_hash = 'beef' + async def create_file_stream(path): + return FakeManagedStream() + tx, fs = await wallet.streams.update( + old=old, amount=amount, file_path=stream_dict.pop('file_path'), + create_file_stream=create_file_stream, replace=replace, + holding_address=holding_address, funding_accounts=funding_accounts, + signing_channel=signing_channel, **remove_nulls(stream_dict) ) - new_txo = tx.outputs[0] - - stream_hash = None - if not preview: - old_stream = self.stream_manager.streams.get(old_txo.claim.stream.source.sd_hash, None) - if file_path is not None: - if old_stream: - await self.stream_manager.delete_stream(old_stream, delete_file=False) - file_stream = await self.stream_manager.create_stream(file_path) - new_txo.claim.stream.source.sd_hash = file_stream.sd_hash - new_txo.script.generate() - stream_hash = file_stream.stream_hash - elif old_stream: - stream_hash = old_stream.stream_hash - - if channel: - new_txo.sign(channel) - await tx.sign(funding_accounts) - - if not preview: - await self.broadcast_or_release(tx, blocking) - await self.storage.save_claims([self._old_get_temp_claim_info( - tx, new_txo, claim_address, new_txo.claim, new_txo.claim_name, dewies_to_lbc(amount) - )]) - if stream_hash: - await self.storage.save_content_claim(stream_hash, new_txo.id) - self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('publish')) - else: - await account.ledger.release_tx(tx) - + await self.service.maybe_broadcast_or_release(tx, tx_dict['preview'], tx_dict['no_wait']) return tx async def stream_abandon( @@ -2403,33 +2360,21 @@ class API: {kwargs} """ - wallet = self.wallets.get_or_default(wallet_id) - assert not wallet.is_locked, "Cannot spend funds with locked wallet, unlock first." - funding_accounts = wallet.accounts.get_or_all(fund_account_id) - amount = self.ledger.get_dewies_or_error("amount", amount) - claim = await self.ledger.get_claim_by_claim_id(wallet.accounts, claim_id) - claim_address = claim.get_address(self.ledger.ledger) + tx_dict, kwargs = pop_kwargs('tx', extract_tx(**tx_kwargs)) + assert_consumed_kwargs(kwargs) + wallet = self.wallets.get_or_default_for_spending(tx_dict.pop('wallet_id')) + amount = self.ledger.get_dewies_or_error('amount', amount, positive_value=True) + funding_accounts = wallet.accounts.get_or_all(tx_dict.pop('fund_account_id')) + claim = await wallet.claims.get(claim_id=claim_id) + claim_address = claim.get_address(self.ledger) if not tip: - account = wallet.accounts.get_or_default(account_id) - claim_address = await account.receiving.get_or_create_usable_address() + holding_account = wallet.accounts.get_or_default(account_id) + claim_address = await holding_account.receiving.get_or_create_usable_address() - tx = await Transaction.support( + tx = await wallet.supports.create( claim.claim_name, claim_id, amount, claim_address, funding_accounts, funding_accounts[0] ) - - if not preview: - await self.broadcast_or_release(tx, blocking) - await self.storage.save_supports({claim_id: [{ - 'txid': tx.id, - 'nout': tx.position, - 'address': claim_address, - 'claim_id': claim_id, - 'amount': dewies_to_lbc(amount) - }]}) - self.component_manager.loop.create_task(self.analytics_manager.send_claim_action('new_support')) - else: - await self.ledger.release_tx(tx) - + await self.service.maybe_broadcast_or_release(tx, tx_dict['preview'], tx_dict['no_wait']) return tx async def support_list( diff --git a/lbry/wallet/account.py b/lbry/wallet/account.py index 1a7b065b1..ce6508958 100644 --- a/lbry/wallet/account.py +++ b/lbry/wallet/account.py @@ -431,14 +431,6 @@ class Account: def get_public_key(self, chain: int, index: int) -> PubKey: return self.address_managers[chain].get_public_key(index) - def get_balance(self, confirmations=0, include_claims=False, **constraints): - if not include_claims: - constraints.update({'txo_type__in': (TXO_TYPES['other'], TXO_TYPES['purchase'])}) - if confirmations > 0: - height = self.ledger.headers.height - (confirmations-1) - constraints.update({'height__lte': height, 'height__gt': 0}) - return self.db.get_balance(account=self, **constraints) - async def get_max_gap(self): change_gap = await self.change.get_max_gap() receiving_gap = await self.receiving.get_max_gap() @@ -497,35 +489,5 @@ class Account: gap_changed = True return gap_changed - def get_support_summary(self): - return self.db.get_supports_summary(account=self) - - async def get_detailed_balance(self, confirmations=0, reserved_subtotals=False): - tips_balance, supports_balance, claims_balance = 0, 0, 0 - get_total_balance = partial(self.get_balance, confirmations=confirmations, - include_claims=True) - total = await get_total_balance() - if reserved_subtotals: - claims_balance = await get_total_balance(txo_type__in=CLAIM_TYPE_CODES) - for txo in await self.get_support_summary(): - if confirmations > 0 and not 0 < txo.tx_ref.height <= self.ledger.headers.height - (confirmations - 1): - continue - if txo.is_my_input: - supports_balance += txo.amount - else: - tips_balance += txo.amount - reserved = claims_balance + supports_balance + tips_balance - else: - reserved = await self.get_balance( - confirmations=confirmations, include_claims=True, txo_type__gt=0 - ) - return { - 'total': total, - 'available': total - reserved, - 'reserved': reserved, - 'reserved_subtotals': { - 'claims': claims_balance, - 'supports': supports_balance, - 'tips': tips_balance - } if reserved_subtotals else None - } + async def get_balance(self, **constraints): + return await self.db.get_balance(account=self, **constraints) diff --git a/lbry/wallet/wallet.py b/lbry/wallet/wallet.py index 0ba899b7b..ba2c82161 100644 --- a/lbry/wallet/wallet.py +++ b/lbry/wallet/wallet.py @@ -388,11 +388,8 @@ class Wallet: f"Use --allow-duplicate-name flag to override." ) - async def get_balance(self): - balance = {"total": 0} - for account in self.accounts: - balance["total"] += await account.get_balance() - return balance + async def get_balance(self, **constraints): + return await self.db.get_balance(accounts=self.accounts, **constraints) class AccountListManager: @@ -533,7 +530,7 @@ class ClaimListManager(BaseListManager): else: updated_claim.clear_signature() return await self.wallet.create_transaction( - [Input.spend(previous_claim)], [updated_claim], funding_accounts, change_account, sign=False + [Input.spend(previous_claim)], [updated_claim], funding_accounts, change_account ) async def delete(self, claim_id=None, txid=None, nout=None): @@ -554,7 +551,7 @@ class ClaimListManager(BaseListManager): key, value, constraints = 'name', claim_name, {'claim_name': claim_name} else: raise ValueError(f"Couldn't find {self.name} because an {self.name}_id or name was not provided.") - claims = await self.list(**constraints) + claims = await self.list(is_spent=False, **constraints) if len(claims) == 1: return claims[0] elif len(claims) > 1: @@ -574,8 +571,9 @@ class ChannelListManager(ClaimListManager): __slots__ = () async def create( - self, name: str, amount: int, holding_account: Account, - funding_accounts: List[Account], save_key=True, **kwargs) -> Transaction: + self, name: str, amount: int, holding_account: Account, + funding_accounts: List[Account], save_key=True, **kwargs + ) -> Transaction: holding_address = await holding_account.receiving.get_or_create_usable_address() @@ -600,9 +598,10 @@ class ChannelListManager(ClaimListManager): return tx async def update( - self, old: Output, amount: int, new_signing_key: bool, replace: bool, - holding_account: Account, funding_accounts: List[Account], - save_key=True, **kwargs) -> Transaction: + self, old: Output, amount: int, new_signing_key: bool, replace: bool, + holding_account: Account, funding_accounts: List[Account], + save_key=True, **kwargs + ) -> Transaction: moving_accounts = False holding_address = old.get_address(self.wallet.ledger) @@ -664,11 +663,12 @@ class StreamListManager(ClaimListManager): __slots__ = () async def create( - self, name: str, amount: int, file_path: str, - create_file_stream: Callable[[str], Awaitable[ManagedStream]], - holding_address: str, funding_accounts: List[Account], - signing_channel: Optional[Output] = None, - preview=False, **kwargs) -> Tuple[Transaction, ManagedStream]: + self, name: str, amount: int, file_path: str, + create_file_stream: Callable[[str], Awaitable[ManagedStream]], + holding_address: str, funding_accounts: List[Account], + signing_channel: Optional[Output] = None, + preview=False, **kwargs + ) -> Tuple[Transaction, ManagedStream]: claim = Claim() claim.stream.update(file_path=file_path, sd_hash='0' * 96, **kwargs) @@ -702,6 +702,61 @@ class StreamListManager(ClaimListManager): return tx, file_stream + async def update( + self, old: Output, amount: int, file_path: str, + create_file_stream: Callable[[str], Awaitable[ManagedStream]], + holding_address: str, funding_accounts: List[Account], + signing_channel: Optional[Output] = None, + preview=False, replace=False, **kwargs + ) -> Tuple[Transaction, ManagedStream]: + + if replace: + claim = Claim() + # stream file metadata is not replaced + claim.stream.message.source.CopyFrom(old.claim.stream.message.source) + stream_type = old.claim.stream.stream_type + if stream_type: + old_stream_type = getattr(old.claim.stream.message, stream_type) + new_stream_type = getattr(claim.stream.message, stream_type) + new_stream_type.CopyFrom(old_stream_type) + else: + claim = Claim.from_bytes(old.claim.to_bytes()) + claim.stream.update(file_path=file_path, **kwargs) + + # before creating file stream, create TX to ensure we have enough LBC + tx = await super().update( + old, claim, amount, holding_address, + funding_accounts, funding_accounts[0], + signing_channel + ) + txo = tx.outputs[0] + + file_stream = None + try: + + # we have enough LBC to create TX, now try create the file stream + if not preview: + old_stream = None # TODO: get old stream + if file_path is not None: + if old_stream is not None: + # TODO: delete the old stream + pass + file_stream = await create_file_stream(file_path) + claim.stream.source.sd_hash = file_stream.sd_hash + txo.script.generate() + + # creating TX and file stream was successful, now sign all the things + if signing_channel is not None: + txo.sign(signing_channel) + await self.wallet.sign(tx) + + except Exception as e: + # creating file stream or something else went wrong, release txos + await self.wallet.db.release_tx(tx) + raise e + + return tx, file_stream + async def list(self, **constraints) -> Result[Output]: return await self.wallet.db.get_streams(wallet=self.wallet, **constraints)