work on resolving threading issues with lbryum in Wallet.py

This commit is contained in:
Kay Kurokawa 2016-12-12 14:00:23 -05:00
parent 965f8f1238
commit 6f60b8b827

View file

@ -14,7 +14,7 @@ from jsonschema import ValidationError
from decimal import Decimal from decimal import Decimal
from lbryum import SimpleConfig, Network from lbryum import SimpleConfig, Network
from lbryum.lbrycrd import COIN from lbryum.lbrycrd import COIN, RECOMMENDED_CLAIMTRIE_HASH_CONFIRMS
import lbryum.wallet import lbryum.wallet
from lbryum.commands import known_commands, Commands from lbryum.commands import known_commands, Commands
@ -92,7 +92,6 @@ class Wallet(object):
self._first_run = self._FIRST_RUN_UNKNOWN self._first_run = self._FIRST_RUN_UNKNOWN
def start(self): def start(self):
def start_manage(): def start_manage():
self.stopped = False self.stopped = False
self.manage() self.manage()
@ -765,7 +764,6 @@ class LBRYumWallet(Wallet):
self.max_behind = 0 self.max_behind = 0
def _start(self): def _start(self):
network_start_d = defer.Deferred() network_start_d = defer.Deferred()
def setup_network(): def setup_network():
@ -774,6 +772,7 @@ class LBRYumWallet(Wallet):
alert.info("Loading the wallet...") alert.info("Loading the wallet...")
return defer.succeed(self.network.start()) return defer.succeed(self.network.start())
d = setup_network() d = setup_network()
def check_started(): def check_started():
@ -838,6 +837,7 @@ class LBRYumWallet(Wallet):
wallet.create_main_account() wallet.create_main_account()
wallet.synchronize() wallet.synchronize()
self.wallet = wallet self.wallet = wallet
return defer.succeed(True)
blockchain_caught_d = defer.Deferred() blockchain_caught_d = defer.Deferred()
@ -881,8 +881,7 @@ class LBRYumWallet(Wallet):
return defer.fail(err) return defer.fail(err)
self._catch_up_check = task.LoopingCall(check_caught_up) self._catch_up_check = task.LoopingCall(check_caught_up)
d = get_wallet()
d = threads.deferToThread(get_wallet)
d.addCallback(self._save_wallet) d.addCallback(self._save_wallet)
d.addCallback(lambda _: self.wallet.start_threads(self.network)) d.addCallback(lambda _: self.wallet.start_threads(self.network))
d.addCallback(lambda _: self._catch_up_check.start(.1)) d.addCallback(lambda _: self._catch_up_check.start(.1))
@ -893,24 +892,36 @@ class LBRYumWallet(Wallet):
def _get_cmd_runner(self): def _get_cmd_runner(self):
self.cmd_runner = Commands(self.config, self.wallet, self.network) self.cmd_runner = Commands(self.config, self.wallet, self.network)
def get_balance(self): # run commands as a defer.succeed,
cmd = known_commands['getbalance'] # lbryum commands should be run this way , unless if the command
# only makes a lbrum server query, use _run_cmd_as_defer_to_thread()
def _run_cmd_as_defer_succeed(self, command_name, *args):
cmd = known_commands[command_name]
func = getattr(self.cmd_runner, cmd.name) func = getattr(self.cmd_runner, cmd.name)
return defer.succeed(func(*args))
# run commands as a deferToThread, lbryum commands that only make
# queries to lbryum server should be run this way
def _run_cmd_as_defer_to_thread(self, command_name, *args):
cmd = known_commands[command_name]
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func, *args)
def get_balance(self):
accounts = None accounts = None
exclude_claimtrietx = True exclude_claimtrietx = True
d = threads.deferToThread(func, accounts, exclude_claimtrietx) d = self._run_cmd_as_defer_succeed('getbalance', accounts, exclude_claimtrietx)
d.addCallback(lambda result: Decimal(result['confirmed']) + Decimal(result.get('unconfirmed', 0.0))) d.addCallback(lambda result: Decimal(result['confirmed']) + Decimal(result.get('unconfirmed', 0.0)))
return d return d
def get_new_address(self): def get_new_address(self):
d = threads.deferToThread(self.wallet.create_new_address) d = defer.succeed(self.wallet.create_new_address())
d.addCallback(self._save_wallet) d.addCallback(self._save_wallet)
return d return d
def get_block(self, blockhash): def get_block(self, blockhash):
cmd = known_commands['getblock'] return self._run_cmd_as_defer_to_thread('getblock', blockhash)
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func, blockhash)
def get_most_recent_blocktime(self): def get_most_recent_blocktime(self):
header = self.network.get_header(self.network.get_local_height()) header = self.network.get_header(self.network.get_local_height())
@ -918,111 +929,112 @@ class LBRYumWallet(Wallet):
def get_best_blockhash(self): def get_best_blockhash(self):
height = self.network.get_local_height() height = self.network.get_local_height()
d = threads.deferToThread(self.network.blockchain.read_header, height) header = self.network.blockchain.read_header(height)
d.addCallback(lambda header: self.network.blockchain.hash_header(header)) return defer.succeed(self.network.blockchain.hash_header(header))
return d
def _get_blockhash(self, height): def _get_blockhash(self, height):
d = threads.deferToThread(self.network.blockchain.read_header, height) header = self.network.blockchain.read_header(height)
d.addCallback(lambda header: self.network.blockchain.hash_header(header)) return defer.succeed(self.network.blockchain.hash_header(header))
return d
def get_name_claims(self): def get_name_claims(self):
cmd = known_commands['getnameclaims'] return self._run_cmd_as_defer_succeed('getnameclaims')
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func)
def _check_first_run(self): def _check_first_run(self):
return defer.succeed(self.first_run) return defer.succeed(self.first_run)
def _send_name_claim(self, name, val, amount):
cmd = known_commands['claim']
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func, name, json.dumps(val), amount)
def _get_claims_for_name(self, name): def _get_claims_for_name(self, name):
cmd = known_commands['getclaimsforname'] return self._run_cmd_as_defer_to_thread('getclaimsforname', name)
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func, name) def _send_name_claim(self, name, val, amount):
broadcast = False
log.debug("Name claim %s %s %f", name, val, amount)
d = self._run_cmd_as_defer_succeed('claim', name, json.dumps(val), amount, broadcast)
d.addCallback(lambda claim_out: self._broadcast_claim_transaction(claim_out))
return d
def _send_name_claim_update(self, name, claim_id, claim_outpoint, value, amount): def _send_name_claim_update(self, name, claim_id, claim_outpoint, value, amount):
metadata = json.dumps(value) metadata = json.dumps(value)
log.debug("Update %s %d %f %s %s '%s'", claim_outpoint['txid'], claim_outpoint['nout'], log.debug("Update %s %d %f %s %s '%s'", claim_outpoint['txid'], claim_outpoint['nout'],
amount, name, claim_id, metadata) amount, name, claim_id, metadata)
cmd = known_commands['update'] broadcast = False
func = getattr(self.cmd_runner, cmd.name) d = self._run_cmd_as_defer_succeed('update', claim_outpoint['txid'], claim_outpoint['nout'],
return threads.deferToThread(func, claim_outpoint['txid'], claim_outpoint['nout'], name, claim_id, metadata, amount) name, claim_id, metadata, amount, broadcast)
d.addCallback(lambda claim_out: self._broadcast_claim_transaction(claim_out))
return d
def _abandon_claim(self, claim_outpoint): def _abandon_claim(self, claim_outpoint):
log.debug("Abandon %s %s" % (claim_outpoint['txid'], claim_outpoint['nout'])) log.debug("Abandon %s %s" % (claim_outpoint['txid'], claim_outpoint['nout']))
cmd = known_commands['abandon'] broadcast = False
func = getattr(self.cmd_runner, cmd.name) d = self._run_cmd_as_defer_succeed('abandon', claim_outpoint['txid'], claim_outpoint['nout'], broadcast)
d = threads.deferToThread(func, claim_outpoint['txid'], claim_outpoint['nout']) d.addCallback(lambda claim_out: self._broadcast_claim_transaction(claim_out))
return d return d
def _support_claim(self, name, claim_id, amount): def _support_claim(self, name, claim_id, amount):
log.debug("Support %s %s %f" % (name, claim_id, amount)) log.debug("Support %s %s %f" % (name, claim_id, amount))
cmd = known_commands['support'] broadcast = False
func = getattr(self.cmd_runner, cmd.name) d = self._run_cmd_as_defer_succeed('support', name, claim_id, amount, broadcast)
d = threads.deferToThread(func, name, claim_id, amount) d.addCallback(lambda claim_out: self._broadcast_claim_transaction(claim_out))
return d return d
def _broadcast_claim_transaction(self, claim_out):
if 'success' not in claim_out:
raise Exception('Unexpected claim command output:{}'.format(claim_out))
if claim_out['success']:
d = self._broadcast_transaction(claim_out['tx'])
d.addCallback(lambda _: claim_out)
return d
else:
return claim_out
def _broadcast_transaction(self, raw_tx): def _broadcast_transaction(self, raw_tx):
def _log_tx(r): def _log_tx(r):
log.debug("Broadcast tx: %s", r) log.debug("Broadcast tx: %s", r)
return r return r
cmd = known_commands['broadcast'] d = self._run_cmd_as_defer_to_thread('broadcast', raw_tx)
func = getattr(self.cmd_runner, cmd.name)
d = threads.deferToThread(func, raw_tx)
d.addCallback(_log_tx) d.addCallback(_log_tx)
d.addCallback( d.addCallback(
lambda r: r if len(r) == 64 else defer.fail(Exception("Transaction rejected"))) lambda r: r if len(r) == 64 else defer.fail(Exception("Transaction rejected")))
d.addCallback(self._save_wallet)
return d return d
def _do_send_many(self, payments_to_send): def _do_send_many(self, payments_to_send):
log.warning("Doing send many. payments to send: %s", str(payments_to_send)) def broadcast_send_many(paytomany_out):
cmd = known_commands['paytomanyandsend'] if 'hex' not in paytomany_out:
func = getattr(self.cmd_runner, cmd.name) raise Exception('Unepxected paytomany output:{}'.format(paytomany_out))
return threads.deferToThread(func, payments_to_send.iteritems()) return self._broadcast_transaction(paytomany_out['hex'])
log.debug("Doing send many. payments to send: %s", str(payments_to_send))
d = self._run_cmd_as_defer_succeed('paytomany', payments_to_send.iteritems())
d.addCallback(lambda out: broadcast_send_many(out))
return d
def _get_value_for_name(self, name): def _get_value_for_name(self, name):
cmd = known_commands['getvalueforname'] block_header = self.network.blockchain.read_header(self.network.get_local_height() - RECOMMENDED_CLAIMTRIE_HASH_CONFIRMS)
func = getattr(self.cmd_runner, cmd.name) block_hash = self.network.blockchain.hash_header(block_header)
return threads.deferToThread(func, name) d = self._run_cmd_as_defer_to_thread('requestvalueforname', name, block_hash)
d.addCallback(lambda response: Commands._verify_proof(name, block_header['claim_trie_root'], response))
return d
def get_claims_from_tx(self, txid): def get_claims_from_tx(self, txid):
cmd = known_commands['getclaimsfromtx'] return self._run_cmd_as_defer_to_thread('getclaimsfromtx', txid)
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func, txid)
def _get_balance_for_address(self, address): def _get_balance_for_address(self, address):
return defer.succeed(Decimal(self.wallet.get_addr_received(address))/COIN) return defer.succeed(Decimal(self.wallet.get_addr_received(address))/COIN)
def get_nametrie(self): def get_nametrie(self):
cmd = known_commands['getclaimtrie'] return self._run_cmd_as_defer_to_thread('getclaimtrie')
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func)
def _get_history(self): def _get_history(self):
cmd = known_commands['history'] return self._run_cmd_as_defer_succeed('history')
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func)
def _address_is_mine(self, address): def _address_is_mine(self, address):
cmd = known_commands['ismine'] return self._run_cmd_as_defer_succeed('ismine', address)
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func, address)
def get_pub_keys(self, wallet): def get_pub_keys(self, wallet):
cmd = known_commands['getpubkeys'] return self._run_cmd_as_defer_succeed('getpubkyes', wallet)
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func, wallet)
def _save_wallet(self, val): def _save_wallet(self, val):
d = threads.deferToThread(self.wallet.storage.write) self.wallet.storage.write()
d.addCallback(lambda _: val) return defer.succeed(val)
return d
class LBRYcrdAddressRequester(object): class LBRYcrdAddressRequester(object):