diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index 3b4ac0b7c..86f127f01 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -14,7 +14,7 @@ from jsonschema import ValidationError from decimal import Decimal from lbryum import SimpleConfig, Network -from lbryum.lbrycrd import COIN +from lbryum.lbrycrd import COIN, RECOMMENDED_CLAIMTRIE_HASH_CONFIRMS import lbryum.wallet from lbryum.commands import known_commands, Commands @@ -92,7 +92,6 @@ class Wallet(object): self._first_run = self._FIRST_RUN_UNKNOWN def start(self): - def start_manage(): self.stopped = False self.manage() @@ -765,7 +764,6 @@ class LBRYumWallet(Wallet): self.max_behind = 0 def _start(self): - network_start_d = defer.Deferred() def setup_network(): @@ -774,6 +772,7 @@ class LBRYumWallet(Wallet): alert.info("Loading the wallet...") return defer.succeed(self.network.start()) + d = setup_network() def check_started(): @@ -838,6 +837,7 @@ class LBRYumWallet(Wallet): wallet.create_main_account() wallet.synchronize() self.wallet = wallet + return defer.succeed(True) blockchain_caught_d = defer.Deferred() @@ -881,8 +881,7 @@ class LBRYumWallet(Wallet): return defer.fail(err) self._catch_up_check = task.LoopingCall(check_caught_up) - - d = threads.deferToThread(get_wallet) + d = get_wallet() d.addCallback(self._save_wallet) d.addCallback(lambda _: self.wallet.start_threads(self.network)) d.addCallback(lambda _: self._catch_up_check.start(.1)) @@ -893,24 +892,36 @@ class LBRYumWallet(Wallet): def _get_cmd_runner(self): self.cmd_runner = Commands(self.config, self.wallet, self.network) - def get_balance(self): - cmd = known_commands['getbalance'] + # run commands as a defer.succeed, + # lbryum commands should be run this way , unless if the command + # only makes a lbrum server query, use _run_cmd_as_defer_to_thread() + def _run_cmd_as_defer_succeed(self, command_name, *args): + cmd = known_commands[command_name] func = getattr(self.cmd_runner, cmd.name) + return defer.succeed(func(*args)) + + + # run commands as a deferToThread, lbryum commands that only make + # queries to lbryum server should be run this way + def _run_cmd_as_defer_to_thread(self, command_name, *args): + cmd = known_commands[command_name] + func = getattr(self.cmd_runner, cmd.name) + return threads.deferToThread(func, *args) + + def get_balance(self): accounts = None exclude_claimtrietx = True - d = threads.deferToThread(func, accounts, exclude_claimtrietx) + d = self._run_cmd_as_defer_succeed('getbalance', accounts, exclude_claimtrietx) d.addCallback(lambda result: Decimal(result['confirmed']) + Decimal(result.get('unconfirmed', 0.0))) return d def get_new_address(self): - d = threads.deferToThread(self.wallet.create_new_address) + d = defer.succeed(self.wallet.create_new_address()) d.addCallback(self._save_wallet) return d def get_block(self, blockhash): - cmd = known_commands['getblock'] - func = getattr(self.cmd_runner, cmd.name) - return threads.deferToThread(func, blockhash) + return self._run_cmd_as_defer_to_thread('getblock', blockhash) def get_most_recent_blocktime(self): header = self.network.get_header(self.network.get_local_height()) @@ -918,111 +929,112 @@ class LBRYumWallet(Wallet): def get_best_blockhash(self): height = self.network.get_local_height() - d = threads.deferToThread(self.network.blockchain.read_header, height) - d.addCallback(lambda header: self.network.blockchain.hash_header(header)) - return d + header = self.network.blockchain.read_header(height) + return defer.succeed(self.network.blockchain.hash_header(header)) def _get_blockhash(self, height): - d = threads.deferToThread(self.network.blockchain.read_header, height) - d.addCallback(lambda header: self.network.blockchain.hash_header(header)) - return d + header = self.network.blockchain.read_header(height) + return defer.succeed(self.network.blockchain.hash_header(header)) def get_name_claims(self): - cmd = known_commands['getnameclaims'] - func = getattr(self.cmd_runner, cmd.name) - return threads.deferToThread(func) + return self._run_cmd_as_defer_succeed('getnameclaims') def _check_first_run(self): return defer.succeed(self.first_run) - def _send_name_claim(self, name, val, amount): - cmd = known_commands['claim'] - func = getattr(self.cmd_runner, cmd.name) - return threads.deferToThread(func, name, json.dumps(val), amount) - def _get_claims_for_name(self, name): - cmd = known_commands['getclaimsforname'] - func = getattr(self.cmd_runner, cmd.name) - return threads.deferToThread(func, name) + return self._run_cmd_as_defer_to_thread('getclaimsforname', name) + + def _send_name_claim(self, name, val, amount): + broadcast = False + log.debug("Name claim %s %s %f", name, val, amount) + d = self._run_cmd_as_defer_succeed('claim', name, json.dumps(val), amount, broadcast) + d.addCallback(lambda claim_out: self._broadcast_claim_transaction(claim_out)) + return d def _send_name_claim_update(self, name, claim_id, claim_outpoint, value, amount): metadata = json.dumps(value) - log.debug("Update %s %d %f %s %s '%s'", claim_outpoint['txid'], claim_outpoint['nout'], + log.debug("Update %s %d %f %s %s '%s'", claim_outpoint['txid'], claim_outpoint['nout'], amount, name, claim_id, metadata) - cmd = known_commands['update'] - func = getattr(self.cmd_runner, cmd.name) - return threads.deferToThread(func, claim_outpoint['txid'], claim_outpoint['nout'], name, claim_id, metadata, amount) + broadcast = False + d = self._run_cmd_as_defer_succeed('update', claim_outpoint['txid'], claim_outpoint['nout'], + name, claim_id, metadata, amount, broadcast) + d.addCallback(lambda claim_out: self._broadcast_claim_transaction(claim_out)) + return d def _abandon_claim(self, claim_outpoint): log.debug("Abandon %s %s" % (claim_outpoint['txid'], claim_outpoint['nout'])) - cmd = known_commands['abandon'] - func = getattr(self.cmd_runner, cmd.name) - d = threads.deferToThread(func, claim_outpoint['txid'], claim_outpoint['nout']) + broadcast = False + d = self._run_cmd_as_defer_succeed('abandon', claim_outpoint['txid'], claim_outpoint['nout'], broadcast) + d.addCallback(lambda claim_out: self._broadcast_claim_transaction(claim_out)) return d def _support_claim(self, name, claim_id, amount): log.debug("Support %s %s %f" % (name, claim_id, amount)) - cmd = known_commands['support'] - func = getattr(self.cmd_runner, cmd.name) - d = threads.deferToThread(func, name, claim_id, amount) + broadcast = False + d = self._run_cmd_as_defer_succeed('support', name, claim_id, amount, broadcast) + d.addCallback(lambda claim_out: self._broadcast_claim_transaction(claim_out)) return d + def _broadcast_claim_transaction(self, claim_out): + if 'success' not in claim_out: + raise Exception('Unexpected claim command output:{}'.format(claim_out)) + if claim_out['success']: + d = self._broadcast_transaction(claim_out['tx']) + d.addCallback(lambda _: claim_out) + return d + else: + return claim_out + def _broadcast_transaction(self, raw_tx): def _log_tx(r): log.debug("Broadcast tx: %s", r) return r - cmd = known_commands['broadcast'] - func = getattr(self.cmd_runner, cmd.name) - d = threads.deferToThread(func, raw_tx) + d = self._run_cmd_as_defer_to_thread('broadcast', raw_tx) d.addCallback(_log_tx) d.addCallback( lambda r: r if len(r) == 64 else defer.fail(Exception("Transaction rejected"))) - d.addCallback(self._save_wallet) return d def _do_send_many(self, payments_to_send): - log.warning("Doing send many. payments to send: %s", str(payments_to_send)) - cmd = known_commands['paytomanyandsend'] - func = getattr(self.cmd_runner, cmd.name) - return threads.deferToThread(func, payments_to_send.iteritems()) + def broadcast_send_many(paytomany_out): + if 'hex' not in paytomany_out: + raise Exception('Unepxected paytomany output:{}'.format(paytomany_out)) + return self._broadcast_transaction(paytomany_out['hex']) + log.debug("Doing send many. payments to send: %s", str(payments_to_send)) + d = self._run_cmd_as_defer_succeed('paytomany', payments_to_send.iteritems()) + d.addCallback(lambda out: broadcast_send_many(out)) + return d def _get_value_for_name(self, name): - cmd = known_commands['getvalueforname'] - func = getattr(self.cmd_runner, cmd.name) - return threads.deferToThread(func, name) + block_header = self.network.blockchain.read_header(self.network.get_local_height() - RECOMMENDED_CLAIMTRIE_HASH_CONFIRMS) + block_hash = self.network.blockchain.hash_header(block_header) + d = self._run_cmd_as_defer_to_thread('requestvalueforname', name, block_hash) + d.addCallback(lambda response: Commands._verify_proof(name, block_header['claim_trie_root'], response)) + return d def get_claims_from_tx(self, txid): - cmd = known_commands['getclaimsfromtx'] - func = getattr(self.cmd_runner, cmd.name) - return threads.deferToThread(func, txid) + return self._run_cmd_as_defer_to_thread('getclaimsfromtx', txid) def _get_balance_for_address(self, address): return defer.succeed(Decimal(self.wallet.get_addr_received(address))/COIN) def get_nametrie(self): - cmd = known_commands['getclaimtrie'] - func = getattr(self.cmd_runner, cmd.name) - return threads.deferToThread(func) + return self._run_cmd_as_defer_to_thread('getclaimtrie') def _get_history(self): - cmd = known_commands['history'] - func = getattr(self.cmd_runner, cmd.name) - return threads.deferToThread(func) + return self._run_cmd_as_defer_succeed('history') def _address_is_mine(self, address): - cmd = known_commands['ismine'] - func = getattr(self.cmd_runner, cmd.name) - return threads.deferToThread(func, address) + return self._run_cmd_as_defer_succeed('ismine', address) def get_pub_keys(self, wallet): - cmd = known_commands['getpubkeys'] - func = getattr(self.cmd_runner, cmd.name) - return threads.deferToThread(func, wallet) + return self._run_cmd_as_defer_succeed('getpubkyes', wallet) def _save_wallet(self, val): - d = threads.deferToThread(self.wallet.storage.write) - d.addCallback(lambda _: val) - return d + self.wallet.storage.write() + return defer.succeed(val) + class LBRYcrdAddressRequester(object):