Merge pull request #331 from lbryio/fix_wallet_threading_issue

Fix wallet concurrency issue
This commit is contained in:
Umpei Kay Kurokawa 2016-12-15 10:57:11 -05:00 committed by GitHub
commit 27a09d4a4f
2 changed files with 79 additions and 124 deletions

View file

@ -14,10 +14,9 @@ from jsonschema import ValidationError
from decimal import Decimal from decimal import Decimal
from lbryum import SimpleConfig, Network from lbryum import SimpleConfig, Network
from lbryum.lbrycrd import COIN from lbryum.lbrycrd import COIN, RECOMMENDED_CLAIMTRIE_HASH_CONFIRMS
import lbryum.wallet import lbryum.wallet
from lbryum.commands import known_commands, Commands from lbryum.commands import known_commands, Commands
from lbryum.transaction import Transaction
from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet
from lbrynet.core.client.ClientRequest import ClientRequest from lbrynet.core.client.ClientRequest import ClientRequest
@ -93,7 +92,6 @@ class Wallet(object):
self._first_run = self._FIRST_RUN_UNKNOWN self._first_run = self._FIRST_RUN_UNKNOWN
def start(self): def start(self):
def start_manage(): def start_manage():
self.stopped = False self.stopped = False
self.manage() self.manage()
@ -500,11 +498,6 @@ class Wallet(object):
def support_claim(self, name, claim_id, amount): def support_claim(self, name, claim_id, amount):
return self._support_claim(name, claim_id, amount) return self._support_claim(name, claim_id, amount)
def get_tx(self, txid):
d = self._get_raw_tx(txid)
d.addCallback(self._get_decoded_tx)
return d
def get_block_info(self, height): def get_block_info(self, height):
d = self._get_blockhash(height) d = self._get_blockhash(height)
return d return d
@ -715,15 +708,9 @@ class Wallet(object):
def _check_first_run(self): def _check_first_run(self):
return defer.fail(NotImplementedError()) return defer.fail(NotImplementedError())
def _get_raw_tx(self, txid):
return defer.fail(NotImplementedError())
def _send_name_claim(self, name, val, amount): def _send_name_claim(self, name, val, amount):
return defer.fail(NotImplementedError()) return defer.fail(NotImplementedError())
def _get_decoded_tx(self, raw_tx):
return defer.fail(NotImplementedError())
def _abandon_claim(self, claim_outpoint): def _abandon_claim(self, claim_outpoint):
return defer.fail(NotImplementedError()) return defer.fail(NotImplementedError())
@ -751,9 +738,6 @@ class Wallet(object):
def _address_is_mine(self, address): def _address_is_mine(self, address):
return defer.fail(NotImplementedError()) return defer.fail(NotImplementedError())
def _get_transaction(self, txid):
return defer.fail(NotImplementedError())
def _start(self): def _start(self):
pass pass
@ -780,7 +764,6 @@ class LBRYumWallet(Wallet):
self.max_behind = 0 self.max_behind = 0
def _start(self): def _start(self):
network_start_d = defer.Deferred() network_start_d = defer.Deferred()
def setup_network(): def setup_network():
@ -789,6 +772,7 @@ class LBRYumWallet(Wallet):
alert.info("Loading the wallet...") alert.info("Loading the wallet...")
return defer.succeed(self.network.start()) return defer.succeed(self.network.start())
d = setup_network() d = setup_network()
def check_started(): def check_started():
@ -853,6 +837,7 @@ class LBRYumWallet(Wallet):
wallet.create_main_account() wallet.create_main_account()
wallet.synchronize() wallet.synchronize()
self.wallet = wallet self.wallet = wallet
return defer.succeed(True)
blockchain_caught_d = defer.Deferred() blockchain_caught_d = defer.Deferred()
@ -896,8 +881,7 @@ class LBRYumWallet(Wallet):
return defer.fail(err) return defer.fail(err)
self._catch_up_check = task.LoopingCall(check_caught_up) self._catch_up_check = task.LoopingCall(check_caught_up)
d = get_wallet()
d = threads.deferToThread(get_wallet)
d.addCallback(self._save_wallet) d.addCallback(self._save_wallet)
d.addCallback(lambda _: self.wallet.start_threads(self.network)) d.addCallback(lambda _: self.wallet.start_threads(self.network))
d.addCallback(lambda _: self._catch_up_check.start(.1)) d.addCallback(lambda _: self._catch_up_check.start(.1))
@ -908,24 +892,36 @@ class LBRYumWallet(Wallet):
def _get_cmd_runner(self): def _get_cmd_runner(self):
self.cmd_runner = Commands(self.config, self.wallet, self.network) self.cmd_runner = Commands(self.config, self.wallet, self.network)
def get_balance(self): # run commands as a defer.succeed,
cmd = known_commands['getbalance'] # lbryum commands should be run this way , unless if the command
# only makes a lbrum server query, use _run_cmd_as_defer_to_thread()
def _run_cmd_as_defer_succeed(self, command_name, *args):
cmd = known_commands[command_name]
func = getattr(self.cmd_runner, cmd.name) func = getattr(self.cmd_runner, cmd.name)
return defer.succeed(func(*args))
# run commands as a deferToThread, lbryum commands that only make
# queries to lbryum server should be run this way
def _run_cmd_as_defer_to_thread(self, command_name, *args):
cmd = known_commands[command_name]
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func, *args)
def get_balance(self):
accounts = None accounts = None
exclude_claimtrietx = True exclude_claimtrietx = True
d = threads.deferToThread(func, accounts, exclude_claimtrietx) d = self._run_cmd_as_defer_succeed('getbalance', accounts, exclude_claimtrietx)
d.addCallback(lambda result: Decimal(result['confirmed']) + Decimal(result.get('unconfirmed', 0.0))) d.addCallback(lambda result: Decimal(result['confirmed']) + Decimal(result.get('unconfirmed', 0.0)))
return d return d
def get_new_address(self): def get_new_address(self):
d = threads.deferToThread(self.wallet.create_new_address) d = defer.succeed(self.wallet.create_new_address())
d.addCallback(self._save_wallet) d.addCallback(self._save_wallet)
return d return d
def get_block(self, blockhash): def get_block(self, blockhash):
cmd = known_commands['getblock'] return self._run_cmd_as_defer_to_thread('getblock', blockhash)
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func, blockhash)
def get_most_recent_blocktime(self): def get_most_recent_blocktime(self):
header = self.network.get_header(self.network.get_local_height()) header = self.network.get_header(self.network.get_local_height())
@ -933,137 +929,112 @@ class LBRYumWallet(Wallet):
def get_best_blockhash(self): def get_best_blockhash(self):
height = self.network.get_local_height() height = self.network.get_local_height()
d = threads.deferToThread(self.network.blockchain.read_header, height) header = self.network.blockchain.read_header(height)
d.addCallback(lambda header: self.network.blockchain.hash_header(header)) return defer.succeed(self.network.blockchain.hash_header(header))
return d
def _get_blockhash(self, height): def _get_blockhash(self, height):
d = threads.deferToThread(self.network.blockchain.read_header, height) header = self.network.blockchain.read_header(height)
d.addCallback(lambda header: self.network.blockchain.hash_header(header)) return defer.succeed(self.network.blockchain.hash_header(header))
return d
def get_name_claims(self): def get_name_claims(self):
cmd = known_commands['getnameclaims'] return self._run_cmd_as_defer_succeed('getnameclaims')
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func)
def _check_first_run(self): def _check_first_run(self):
return defer.succeed(self.first_run) return defer.succeed(self.first_run)
def _get_raw_tx(self, txid): def _get_claims_for_name(self, name):
cmd = known_commands['gettransaction'] return self._run_cmd_as_defer_to_thread('getclaimsforname', name)
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func, txid)
def _get_transaction(self, txid):
def _add_confirms(tx):
tx['confirmations'] = self.wallet.get_confirmations(txid)
return tx
d = self._get_raw_tx(txid)
d.addCallback(self._get_decoded_tx)
d.addCallback(_add_confirms)
return d
def _send_name_claim(self, name, val, amount): def _send_name_claim(self, name, val, amount):
cmd = known_commands['claim'] broadcast = False
func = getattr(self.cmd_runner, cmd.name) log.debug("Name claim %s %s %f", name, val, amount)
return threads.deferToThread(func, name, json.dumps(val), amount) d = self._run_cmd_as_defer_succeed('claim', name, json.dumps(val), amount, broadcast)
d.addCallback(lambda claim_out: self._broadcast_claim_transaction(claim_out))
def _get_claims_for_name(self, name): return d
cmd = known_commands['getclaimsforname']
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func, name)
def _send_name_claim_update(self, name, claim_id, claim_outpoint, value, amount): def _send_name_claim_update(self, name, claim_id, claim_outpoint, value, amount):
metadata = json.dumps(value) metadata = json.dumps(value)
log.debug("Update %s %d %f %s %s '%s'", claim_outpoint['txid'], claim_outpoint['nout'], log.debug("Update %s %d %f %s %s '%s'", claim_outpoint['txid'], claim_outpoint['nout'],
amount, name, claim_id, metadata) amount, name, claim_id, metadata)
cmd = known_commands['update'] broadcast = False
func = getattr(self.cmd_runner, cmd.name) d = self._run_cmd_as_defer_succeed('update', claim_outpoint['txid'], claim_outpoint['nout'],
return threads.deferToThread(func, claim_outpoint['txid'], claim_outpoint['nout'], name, claim_id, metadata, amount) name, claim_id, metadata, amount, broadcast)
d.addCallback(lambda claim_out: self._broadcast_claim_transaction(claim_out))
return d
def _get_decoded_tx(self, raw_tx):
tx = Transaction(raw_tx)
decoded_tx = {}
decoded_tx['vout'] = []
for output in tx.outputs():
out = {}
out['value'] = Decimal(output[2]) / Decimal(COIN)
decoded_tx['vout'].append(out)
return decoded_tx
def _abandon_claim(self, claim_outpoint): def _abandon_claim(self, claim_outpoint):
log.debug("Abandon %s %s" % (claim_outpoint['txid'], claim_outpoint['nout'])) log.debug("Abandon %s %s" % (claim_outpoint['txid'], claim_outpoint['nout']))
cmd = known_commands['abandon'] broadcast = False
func = getattr(self.cmd_runner, cmd.name) d = self._run_cmd_as_defer_succeed('abandon', claim_outpoint['txid'], claim_outpoint['nout'], broadcast)
d = threads.deferToThread(func, claim_outpoint['txid'], claim_outpoint['nout']) d.addCallback(lambda claim_out: self._broadcast_claim_transaction(claim_out))
return d return d
def _support_claim(self, name, claim_id, amount): def _support_claim(self, name, claim_id, amount):
log.debug("Support %s %s %f" % (name, claim_id, amount)) log.debug("Support %s %s %f" % (name, claim_id, amount))
cmd = known_commands['support'] broadcast = False
func = getattr(self.cmd_runner, cmd.name) d = self._run_cmd_as_defer_succeed('support', name, claim_id, amount, broadcast)
d = threads.deferToThread(func, name, claim_id, amount) d.addCallback(lambda claim_out: self._broadcast_claim_transaction(claim_out))
return d return d
def _broadcast_claim_transaction(self, claim_out):
if 'success' not in claim_out:
raise Exception('Unexpected claim command output:{}'.format(claim_out))
if claim_out['success']:
d = self._broadcast_transaction(claim_out['tx'])
d.addCallback(lambda _: claim_out)
return d
else:
return defer.succeed(claim_out)
def _broadcast_transaction(self, raw_tx): def _broadcast_transaction(self, raw_tx):
def _log_tx(r): def _log_tx(r):
log.debug("Broadcast tx: %s", r) log.debug("Broadcast tx: %s", r)
return r return r
cmd = known_commands['broadcast'] d = self._run_cmd_as_defer_to_thread('broadcast', raw_tx)
func = getattr(self.cmd_runner, cmd.name)
d = threads.deferToThread(func, raw_tx)
d.addCallback(_log_tx) d.addCallback(_log_tx)
d.addCallback( d.addCallback(
lambda r: r if len(r) == 64 else defer.fail(Exception("Transaction rejected"))) lambda r: r if len(r) == 64 else defer.fail(Exception("Transaction rejected")))
d.addCallback(self._save_wallet)
return d return d
def _do_send_many(self, payments_to_send): def _do_send_many(self, payments_to_send):
log.warning("Doing send many. payments to send: %s", str(payments_to_send)) def broadcast_send_many(paytomany_out):
cmd = known_commands['paytomanyandsend'] if 'hex' not in paytomany_out:
func = getattr(self.cmd_runner, cmd.name) raise Exception('Unepxected paytomany output:{}'.format(paytomany_out))
return threads.deferToThread(func, payments_to_send.iteritems()) return self._broadcast_transaction(paytomany_out['hex'])
log.debug("Doing send many. payments to send: %s", str(payments_to_send))
d = self._run_cmd_as_defer_succeed('paytomany', payments_to_send.iteritems())
d.addCallback(lambda out: broadcast_send_many(out))
return d
def _get_value_for_name(self, name): def _get_value_for_name(self, name):
cmd = known_commands['getvalueforname'] block_header = self.network.blockchain.read_header(self.network.get_local_height() - RECOMMENDED_CLAIMTRIE_HASH_CONFIRMS)
func = getattr(self.cmd_runner, cmd.name) block_hash = self.network.blockchain.hash_header(block_header)
return threads.deferToThread(func, name) d = self._run_cmd_as_defer_to_thread('requestvalueforname', name, block_hash)
d.addCallback(lambda response: Commands._verify_proof(name, block_header['claim_trie_root'], response))
return d
def get_claims_from_tx(self, txid): def get_claims_from_tx(self, txid):
cmd = known_commands['getclaimsfromtx'] return self._run_cmd_as_defer_to_thread('getclaimsfromtx', txid)
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func, txid)
def _get_balance_for_address(self, address): def _get_balance_for_address(self, address):
return defer.succeed(Decimal(self.wallet.get_addr_received(address))/COIN) return defer.succeed(Decimal(self.wallet.get_addr_received(address))/COIN)
def get_nametrie(self): def get_nametrie(self):
cmd = known_commands['getclaimtrie'] return self._run_cmd_as_defer_to_thread('getclaimtrie')
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func)
def _get_history(self): def _get_history(self):
cmd = known_commands['history'] return self._run_cmd_as_defer_succeed('history')
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func)
def _address_is_mine(self, address): def _address_is_mine(self, address):
cmd = known_commands['ismine'] return self._run_cmd_as_defer_succeed('ismine', address)
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func, address)
def get_pub_keys(self, wallet): def get_pub_keys(self, wallet):
cmd = known_commands['getpubkeys'] return self._run_cmd_as_defer_succeed('getpubkyes', wallet)
func = getattr(self.cmd_runner, cmd.name)
return threads.deferToThread(func, wallet)
def _save_wallet(self, val): def _save_wallet(self, val):
d = threads.deferToThread(self.wallet.storage.write) self.wallet.storage.write()
d.addCallback(lambda _: val) return defer.succeed(val)
return d
class LBRYcrdAddressRequester(object): class LBRYcrdAddressRequester(object):

View file

@ -1886,22 +1886,6 @@ class Daemon(AuthJSONRPCServer):
d.addCallback(lambda r: self._render_response(r, OK_CODE)) d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d return d
def jsonrpc_get_transaction(self, p):
"""
Get a decoded transaction from a txid
Args:
txid: txid hex string
Returns:
JSON formatted transaction
"""
txid = p['txid']
d = self.session.wallet.get_transaction(txid)
d.addCallback(lambda r: self._render_response(r, OK_CODE))
return d
@AuthJSONRPCServer.auth_required @AuthJSONRPCServer.auth_required
def jsonrpc_address_is_mine(self, p): def jsonrpc_address_is_mine(self, p):
""" """