diff --git a/lbrynet/cli.py b/lbrynet/cli.py index 2a7feabe7..4972ef9b0 100644 --- a/lbrynet/cli.py +++ b/lbrynet/cli.py @@ -1,4 +1,16 @@ import sys +from twisted.internet import asyncioreactor +if 'twisted.internet.reactor' not in sys.modules: + asyncioreactor.install() +else: + from twisted.internet import reactor + if not isinstance(reactor, asyncioreactor.AsyncioSelectorReactor): + # pyinstaller hooks install the default reactor before + # any of our code runs, see kivy for similar problem: + # https://github.com/kivy/kivy/issues/4182 + del sys.modules['twisted.internet.reactor'] + asyncioreactor.install() + import json import asyncio from aiohttp.client_exceptions import ClientConnectorError diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py deleted file mode 100644 index 5e08edb74..000000000 --- a/lbrynet/core/Wallet.py +++ /dev/null @@ -1,1319 +0,0 @@ -# pylint: skip-file -from collections import defaultdict, deque -import datetime -import logging -from decimal import Decimal - -from zope.interface import implements -from twisted.internet import threads, reactor, defer, task -from twisted.python.failure import Failure -from twisted.internet.error import ConnectionAborted - -from lbryschema.uri import parse_lbry_uri -from lbryschema.claim import ClaimDict -from lbryschema.error import DecodeError -from lbryschema.decode import smart_decode - -from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet -from lbrynet.core.utils import DeferredDict -from lbrynet.core.client.ClientRequest import ClientRequest -from lbrynet.core.Error import InsufficientFundsError, UnknownNameError -from lbrynet.core.Error import UnknownClaimID, UnknownURI, NegativeFundsError, UnknownOutpoint -from lbrynet.core.Error import DownloadCanceledError, RequestCanceledError - -log = logging.getLogger(__name__) - - -class ReservedPoints: - def __init__(self, identifier, amount): - self.identifier = identifier - self.amount = amount - - -class ClaimOutpoint(dict): - def __init__(self, txid, nout): - if len(txid) != 64: - raise TypeError('{} is not a txid'.format(txid)) - self['txid'] = txid - self['nout'] = nout - - def __repr__(self): - return "{}:{}".format(self['txid'], self['nout']) - - def __eq__(self, compare): - if isinstance(compare, dict): - # TODO: lbryum returns nout's in dicts as "nOut" , need to fix this - if 'nOut' in compare: - return (self['txid'], self['nout']) == (compare['txid'], compare['nOut']) - elif 'nout' in compare: - return (self['txid'], self['nout']) == (compare['txid'], compare['nout']) - elif isinstance(compare, (str, unicode)): - return compare == self.__repr__() - else: - raise TypeError('cannot compare {}'.format(type(compare))) - - def __ne__(self, compare): - return not self.__eq__(compare) - - -class Wallet: - """This class implements the Wallet interface for the LBRYcrd payment system""" - implements(IWallet) - - def __init__(self, storage): - self.storage = storage - self.next_manage_call = None - self.wallet_balance = Decimal(0.0) - self.total_reserved_points = Decimal(0.0) - self.peer_addresses = {} # {Peer: string} - self.queued_payments = defaultdict(Decimal) # {address(string): amount(Decimal)} - self.expected_balances = defaultdict(Decimal) # {address(string): amount(Decimal)} - self.current_address_given_to_peer = {} # {Peer: address(string)} - # (Peer, address(string), amount(Decimal), time(datetime), count(int), - # incremental_amount(float)) - self.expected_balance_at_time = deque() - self.max_expected_payment_time = datetime.timedelta(minutes=3) - self.stopped = True - - self.manage_running = False - self._manage_count = 0 - self._balance_refresh_time = 3 - self._batch_count = 20 - self._pending_claim_checker = task.LoopingCall(self.fetch_and_save_heights_for_pending_claims) - - @defer.inlineCallbacks - def start(self): - log.info("Starting wallet.") - yield self._start() - self.stopped = False - self.manage() - self._pending_claim_checker.start(30) - defer.returnValue(True) - - @staticmethod - def log_stop_error(err): - log.error("An error occurred stopping the wallet: %s", err.getTraceback()) - - def stop(self): - log.info("Stopping wallet.") - self.stopped = True - - if self._pending_claim_checker.running: - self._pending_claim_checker.stop() - # If self.next_manage_call is None, then manage is currently running or else - # start has not been called, so set stopped and do nothing else. - if self.next_manage_call is not None: - self.next_manage_call.cancel() - self.next_manage_call = None - - d = self.manage(do_full=True) - d.addErrback(self.log_stop_error) - d.addCallback(lambda _: self._stop()) - d.addErrback(self.log_stop_error) - return d - - def manage(self, do_full=False): - self.next_manage_call = None - have_set_manage_running = [False] - self._manage_count += 1 - if self._manage_count % self._batch_count == 0: - self._manage_count = 0 - do_full = True - - def check_if_manage_running(): - - d = defer.Deferred() - - def fire_if_not_running(): - if self.manage_running is False: - self.manage_running = True - have_set_manage_running[0] = True - d.callback(True) - elif do_full is False: - d.callback(False) - else: - task.deferLater(reactor, 1, fire_if_not_running) - - fire_if_not_running() - return d - - d = check_if_manage_running() - - def do_manage(): - if do_full: - d = self._check_expected_balances() - d.addCallback(lambda _: self._send_payments()) - else: - d = defer.succeed(True) - - def log_error(err): - if isinstance(err, AttributeError): - log.warning("Failed to get an updated balance") - log.warning("Last balance update: %s", str(self.wallet_balance)) - - d.addCallbacks(lambda _: self.update_balance(), log_error) - return d - - d.addCallback(lambda should_run: do_manage() if should_run else None) - - def set_next_manage_call(): - if not self.stopped: - self.next_manage_call = reactor.callLater(self._balance_refresh_time, self.manage) - - d.addCallback(lambda _: set_next_manage_call()) - - def log_error(err): - log.error("Something went wrong during manage. Error message: %s", - err.getErrorMessage()) - return err - - d.addErrback(log_error) - - def set_manage_not_running(arg): - if have_set_manage_running[0] is True: - self.manage_running = False - return arg - - d.addBoth(set_manage_not_running) - return d - - @defer.inlineCallbacks - def update_balance(self): - """ obtain balance from lbryum wallet and set self.wallet_balance - """ - balance = yield self._update_balance() - if self.wallet_balance != balance: - log.debug("Got a new balance: %s", balance) - self.wallet_balance = balance - - def get_info_exchanger(self): - return LBRYcrdAddressRequester(self) - - def get_wallet_info_query_handler_factory(self): - return LBRYcrdAddressQueryHandlerFactory(self) - - def reserve_points(self, identifier, amount): - """Ensure a certain amount of points are available to be sent as - payment, before the service is rendered - - @param identifier: The peer to which the payment will ultimately be sent - - @param amount: The amount of points to reserve - - @return: A ReservedPoints object which is given to send_points - once the service has been rendered - """ - rounded_amount = Decimal(str(round(amount, 8))) - if rounded_amount < 0: - raise NegativeFundsError(rounded_amount) - if self.get_balance() >= rounded_amount: - self.total_reserved_points += rounded_amount - return ReservedPoints(identifier, rounded_amount) - return None - - def cancel_point_reservation(self, reserved_points): - """ - Return all of the points that were reserved previously for some ReservedPoints object - - @param reserved_points: ReservedPoints previously returned by reserve_points - - @return: None - """ - self.total_reserved_points -= reserved_points.amount - - def send_points(self, reserved_points, amount): - """ - Schedule a payment to be sent to a peer - - @param reserved_points: ReservedPoints object previously returned by reserve_points - - @param amount: amount of points to actually send, must be less than or equal to the - amount reserved in reserved_points - - @return: Deferred which fires when the payment has been scheduled - """ - rounded_amount = Decimal(str(round(amount, 8))) - peer = reserved_points.identifier - assert rounded_amount <= reserved_points.amount - assert peer in self.peer_addresses - self.queued_payments[self.peer_addresses[peer]] += rounded_amount - # make any unused points available - self.total_reserved_points -= (reserved_points.amount - rounded_amount) - log.debug("ordering that %s points be sent to %s", str(rounded_amount), - str(self.peer_addresses[peer])) - peer.update_stats('points_sent', amount) - return defer.succeed(True) - - def send_points_to_address(self, reserved_points, amount): - """ - Schedule a payment to be sent to an address - - @param reserved_points: ReservedPoints object previously returned by reserve_points - - @param amount: amount of points to actually send. must be less than or equal to the - amount reserved in reserved_points - - @return: Deferred which fires when the payment has been scheduled - """ - rounded_amount = Decimal(str(round(amount, 8))) - address = reserved_points.identifier - assert rounded_amount <= reserved_points.amount - self.queued_payments[address] += rounded_amount - self.total_reserved_points -= (reserved_points.amount - rounded_amount) - log.debug("Ordering that %s points be sent to %s", str(rounded_amount), - str(address)) - return defer.succeed(True) - - def add_expected_payment(self, peer, amount): - """Increase the number of points expected to be paid by a peer""" - rounded_amount = Decimal(str(round(amount, 8))) - assert peer in self.current_address_given_to_peer - address = self.current_address_given_to_peer[peer] - log.debug("expecting a payment at address %s in the amount of %s", - str(address), str(rounded_amount)) - self.expected_balances[address] += rounded_amount - expected_balance = self.expected_balances[address] - expected_time = datetime.datetime.now() + self.max_expected_payment_time - self.expected_balance_at_time.append( - (peer, address, expected_balance, expected_time, 0, amount)) - peer.update_stats('expected_points', amount) - - def update_peer_address(self, peer, address): - self.peer_addresses[peer] = address - - def get_unused_address_for_peer(self, peer): - def set_address_for_peer(address): - self.current_address_given_to_peer[peer] = address - return address - - d = self.get_least_used_address() - d.addCallback(set_address_for_peer) - return d - - def _send_payments(self): - payments_to_send = {} - for address, points in self.queued_payments.items(): - if points > 0: - log.debug("Should be sending %s points to %s", str(points), str(address)) - payments_to_send[address] = points - self.total_reserved_points -= points - else: - log.info("Skipping dust") - - del self.queued_payments[address] - - if payments_to_send: - log.debug("Creating a transaction with outputs %s", str(payments_to_send)) - d = self._do_send_many(payments_to_send) - d.addCallback(lambda txid: log.debug("Sent transaction %s", txid)) - return d - - log.debug("There were no payments to send") - return defer.succeed(True) - - ###### - - @defer.inlineCallbacks - def fetch_and_save_heights_for_pending_claims(self): - pending_outpoints = yield self.storage.get_pending_claim_outpoints() - if pending_outpoints: - tx_heights = yield DeferredDict({txid: self.get_height_for_txid(txid) for txid in pending_outpoints}, - consumeErrors=True) - outpoint_heights = {} - for txid, outputs in pending_outpoints.items(): - if txid in tx_heights: - for nout in outputs: - outpoint_heights["%s:%i" % (txid, nout)] = tx_heights[txid] - yield self.storage.save_claim_tx_heights(outpoint_heights) - - @defer.inlineCallbacks - def get_claim_by_claim_id(self, claim_id, check_expire=True): - claim = yield self._get_claim_by_claimid(claim_id) - try: - result = self._handle_claim_result(claim) - except (UnknownNameError, UnknownClaimID, UnknownURI) as err: - result = {'error': err.message} - defer.returnValue(result) - - @defer.inlineCallbacks - def get_my_claim(self, name): - my_claims = yield self.get_name_claims() - my_claim = False - for claim in my_claims: - if claim['name'] == name: - claim['value'] = ClaimDict.load_dict(claim['value']) - my_claim = claim - break - defer.returnValue(my_claim) - - def _decode_claim_result(self, claim): - if 'has_signature' in claim and claim['has_signature']: - if not claim['signature_is_valid']: - log.warning("lbry://%s#%s has an invalid signature", - claim['name'], claim['claim_id']) - try: - decoded = smart_decode(claim['value']) - claim_dict = decoded.claim_dict - claim['value'] = claim_dict - claim['hex'] = decoded.serialized.encode('hex') - except DecodeError: - claim['hex'] = claim['value'] - claim['value'] = None - claim['error'] = "Failed to decode value" - return claim - - def _handle_claim_result(self, results): - if not results: - #TODO: cannot determine what name we searched for here - # we should fix lbryum commands that return None - raise UnknownNameError("") - - if 'error' in results: - if results['error'] in ['name is not claimed', 'claim not found']: - if 'claim_id' in results: - raise UnknownClaimID(results['claim_id']) - elif 'name' in results: - raise UnknownNameError(results['name']) - elif 'uri' in results: - raise UnknownURI(results['uri']) - elif 'outpoint' in results: - raise UnknownOutpoint(results['outpoint']) - raise Exception(results['error']) - - # case where return value is {'certificate':{'txid', 'value',...},...} - if 'certificate' in results: - results['certificate'] = self._decode_claim_result(results['certificate']) - - # case where return value is {'claim':{'txid','value',...},...} - if 'claim' in results: - results['claim'] = self._decode_claim_result(results['claim']) - - # case where return value is {'txid','value',...} - # returned by queries that are not name resolve related - # (getclaimbyoutpoint, getclaimbyid, getclaimsfromtx) - elif 'value' in results: - results = self._decode_claim_result(results) - - # case where there is no 'certificate', 'value', or 'claim' key - elif 'certificate' not in results: - msg = 'result in unexpected format:{}'.format(results) - assert False, msg - - return results - - @defer.inlineCallbacks - def save_claim(self, claim_info): - claims = [] - if 'value' in claim_info: - if claim_info['value']: - claims.append(claim_info) - else: - if 'certificate' in claim_info and claim_info['certificate']['value']: - claims.append(claim_info['certificate']) - if 'claim' in claim_info and claim_info['claim']['value']: - claims.append(claim_info['claim']) - yield self.storage.save_claims(claims) - - @defer.inlineCallbacks - def save_claims(self, claim_infos): - to_save = [] - for info in claim_infos: - if 'value' in info: - if info['value']: - to_save.append(info) - else: - if 'certificate' in info and info['certificate']['value']: - to_save.append(info['certificate']) - if 'claim' in info and info['claim']['value']: - to_save.append(info['claim']) - yield self.storage.save_claims(to_save) - - @defer.inlineCallbacks - def resolve(self, *uris, **kwargs): - page = kwargs.get('page', 0) - page_size = kwargs.get('page_size', 10) - - result = {} - batch_results = yield self._get_values_for_uris(page, page_size, *uris) - to_save = [] - for uri, resolve_results in batch_results.items(): - try: - result[uri] = self._handle_claim_result(resolve_results) - to_save.append(result[uri]) - except (UnknownNameError, UnknownClaimID, UnknownURI) as err: - result[uri] = {'error': err.message} - yield self.save_claims(to_save) - defer.returnValue(result) - - @defer.inlineCallbacks - def get_claims_by_ids(self, *claim_ids): - claims = yield self._get_claims_by_claimids(*claim_ids) - for claim in claims.values(): - yield self.save_claim(claim) - defer.returnValue(claims) - - @defer.inlineCallbacks - def get_claim_by_outpoint(self, txid, nout, check_expire=True): - claim = yield self._get_claim_by_outpoint(txid, nout) - try: - result = self._handle_claim_result(claim) - yield self.save_claim(result) - except UnknownOutpoint as err: - result = {'error': err.message} - defer.returnValue(result) - - @defer.inlineCallbacks - def get_claim_by_name(self, name): - get_name_result = yield self._get_value_for_name(name) - result = self._handle_claim_result(get_name_result) - yield self.save_claim(result) - defer.returnValue(result) - - @defer.inlineCallbacks - def get_claims_for_name(self, name): - result = yield self._get_claims_for_name(name) - claims = result['claims'] - claims_for_return = [] - for claim in claims: - try: - decoded = smart_decode(claim['value']) - claim['value'] = decoded.claim_dict - claim['hex'] = decoded.serialized.encode('hex') - yield self.save_claim(claim) - claims_for_return.append(claim) - except DecodeError: - claim['hex'] = claim['value'] - claim['value'] = None - claim['error'] = "Failed to decode" - log.warning("Failed to decode claim value for lbry://%s#%s", claim['name'], - claim['claim_id']) - claims_for_return.append(claim) - - result['claims'] = claims_for_return - defer.returnValue(result) - - def _process_claim_out(self, claim_out): - claim_out.pop('success') - claim_out['fee'] = float(claim_out['fee']) - return claim_out - - @defer.inlineCallbacks - def claim_new_channel(self, channel_name, amount): - parsed_channel_name = parse_lbry_uri(channel_name) - if not parsed_channel_name.is_channel: - raise Exception("Invalid channel name") - elif (parsed_channel_name.path or parsed_channel_name.claim_id or - parsed_channel_name.bid_position or parsed_channel_name.claim_sequence): - raise Exception("New channel claim should have no fields other than name") - log.info("Preparing to make certificate claim for %s", channel_name) - channel_claim = yield self._claim_certificate(parsed_channel_name.name, amount) - if not channel_claim['success']: - msg = 'Claiming of channel {} failed: {}'.format(channel_name, channel_claim['reason']) - log.error(msg) - raise Exception(msg) - yield self.save_claim(self._get_temp_claim_info(channel_claim, channel_name, amount)) - defer.returnValue(channel_claim) - - @defer.inlineCallbacks - def channel_list(self): - certificates = yield self.get_certificates_for_signing() - results = [] - for claim in certificates: - formatted = self._handle_claim_result(claim) - results.append(formatted) - defer.returnValue(results) - - def _get_temp_claim_info(self, claim_result, name, bid): - # save the claim information with a height and sequence of 0, this will be reset upon next resolve - return { - "claim_id": claim_result['claim_id'], - "name": name, - "amount": bid, - "address": claim_result['claim_address'], - "txid": claim_result['txid'], - "nout": claim_result['nout'], - "value": claim_result['value'], - "height": -1, - "claim_sequence": -1, - } - - @defer.inlineCallbacks - def claim_name(self, name, bid, metadata, certificate_id=None, claim_address=None, - change_address=None): - """ - Claim a name, or update if name already claimed by user - - @param name: str, name to claim - @param bid: float, bid amount - @param metadata: ClaimDict compliant dict - @param certificate_id: str (optional), claim id of channel certificate - @param claim_address: str (optional), address to send claim to - @param change_address: str (optional), address to send change - - @return: Deferred which returns a dict containing below items - txid - txid of the resulting transaction - nout - nout of the resulting claim - fee - transaction fee paid to make claim - claim_id - claim id of the claim - """ - - decoded = ClaimDict.load_dict(metadata) - serialized = decoded.serialized - - if self.get_balance() <= bid: - amt = yield self.get_max_usable_balance_for_claim(name) - if bid > amt: - raise InsufficientFundsError() - - claim = yield self._send_name_claim(name, serialized.encode('hex'), - bid, certificate_id, claim_address, change_address) - - if not claim['success']: - msg = 'Claiming of name {} failed: {}'.format(name, claim['reason']) - log.error(msg) - raise Exception(msg) - claim = self._process_claim_out(claim) - yield self.storage.save_claims([self._get_temp_claim_info(claim, name, bid)]) - defer.returnValue(claim) - - @defer.inlineCallbacks - def abandon_claim(self, claim_id, txid, nout): - claim_out = yield self._abandon_claim(claim_id, txid, nout) - - if not claim_out['success']: - msg = 'Abandon of {}/{}:{} failed: {}'.format( - claim_id, txid, nout, claim_out['reason']) - raise Exception(msg) - - claim_out = self._process_claim_out(claim_out) - defer.returnValue(claim_out) - - def support_claim(self, name, claim_id, amount): - def _parse_support_claim_out(claim_out): - if not claim_out['success']: - msg = 'Support of {}:{} failed: {}'.format(name, claim_id, claim_out['reason']) - raise Exception(msg) - claim_out = self._process_claim_out(claim_out) - return defer.succeed(claim_out) - - if self.get_balance() < amount: - raise InsufficientFundsError() - - d = self._support_claim(name, claim_id, amount) - d.addCallback(lambda claim_out: _parse_support_claim_out(claim_out)) - return d - - @defer.inlineCallbacks - def tip_claim(self, claim_id, amount): - claim_out = yield self._tip_claim(claim_id, amount) - if claim_out: - result = self._process_claim_out(claim_out) - defer.returnValue(result) - else: - raise Exception("failed to send tip of %f to claim id %s" % (amount, claim_id)) - - def get_block_info(self, height): - d = self._get_blockhash(height) - return d - - def get_history(self): - d = self._get_history() - return d - - def address_is_mine(self, address): - d = self._address_is_mine(address) - return d - - def get_transaction(self, txid): - d = self._get_transaction(txid) - return d - - def wait_for_tx_in_wallet(self, txid): - return self._wait_for_tx_in_wallet(txid) - - def get_balance(self): - return self.wallet_balance - self.total_reserved_points - sum(self.queued_payments.values()) - - def _check_expected_balances(self): - now = datetime.datetime.now() - balances_to_check = [] - try: - while self.expected_balance_at_time[0][3] < now: - balances_to_check.append(self.expected_balance_at_time.popleft()) - except IndexError: - pass - ds = [] - for balance_to_check in balances_to_check: - log.debug("Checking balance of address %s", str(balance_to_check[1])) - d = self._get_balance_for_address(balance_to_check[1]) - d.addCallback(lambda bal: bal >= balance_to_check[2]) - ds.append(d) - dl = defer.DeferredList(ds) - - def handle_checks(results): - for balance, (success, result) in zip(balances_to_check, results): - peer = balance[0] - if success is True: - if result is False: - if balance[4] <= 1: # first or second strike, give them another chance - new_expected_balance = ( - balance[0], - balance[1], - balance[2], - datetime.datetime.now() + self.max_expected_payment_time, - balance[4] + 1, - balance[5] - ) - self.expected_balance_at_time.append(new_expected_balance) - peer.update_score(-5.0) - else: - peer.update_score(-50.0) - else: - if balance[4] == 0: - peer.update_score(balance[5]) - peer.update_stats('points_received', balance[5]) - else: - log.warning("Something went wrong checking a balance. Peer: %s, account: %s," - "expected balance: %s, expected time: %s, count: %s, error: %s", - str(balance[0]), str(balance[1]), str(balance[2]), str(balance[3]), - str(balance[4]), str(result.getErrorMessage())) - - dl.addCallback(handle_checks) - return dl - - # ======== Must be overridden ======== # - - def _get_blockhash(self, height): - return defer.fail(NotImplementedError()) - - def _get_transaction(self, txid): - return defer.fail(NotImplementedError()) - - def _wait_for_tx_in_wallet(self, txid): - return defer.fail(NotImplementedError()) - - def _update_balance(self): - return defer.fail(NotImplementedError()) - - def get_new_address(self): - return defer.fail(NotImplementedError()) - - def get_address_balance(self, address): - return defer.fail(NotImplementedError()) - - def get_block(self, blockhash): - return defer.fail(NotImplementedError()) - - def get_most_recent_blocktime(self): - return defer.fail(NotImplementedError()) - - def get_best_blockhash(self): - return defer.fail(NotImplementedError()) - - def get_name_claims(self): - return defer.fail(NotImplementedError()) - - def _get_claims_for_name(self, name): - return defer.fail(NotImplementedError()) - - def _claim_certificate(self, name, amount): - return defer.fail(NotImplementedError()) - - def _send_name_claim(self, name, val, amount, certificate_id=None, claim_address=None, - change_address=None): - return defer.fail(NotImplementedError()) - - def _abandon_claim(self, claim_id, txid, nout): - return defer.fail(NotImplementedError()) - - def _support_claim(self, name, claim_id, amount): - return defer.fail(NotImplementedError()) - - def _tip_claim(self, claim_id, amount): - return defer.fail(NotImplementedError()) - - def _do_send_many(self, payments_to_send): - return defer.fail(NotImplementedError()) - - def _get_value_for_name(self, name): - return defer.fail(NotImplementedError()) - - def get_claims_from_tx(self, txid): - return defer.fail(NotImplementedError()) - - def _get_balance_for_address(self, address): - return defer.fail(NotImplementedError()) - - def _get_history(self): - return defer.fail(NotImplementedError()) - - def _address_is_mine(self, address): - return defer.fail(NotImplementedError()) - - def _get_value_for_uri(self, uri): - return defer.fail(NotImplementedError()) - - def _get_certificate_claims(self): - return defer.fail(NotImplementedError()) - - def _get_claim_by_outpoint(self, txid, nout): - return defer.fail(NotImplementedError()) - - def _get_claim_by_claimid(self, claim_id): - return defer.fail(NotImplementedError()) - - def _get_claims_by_claimids(self, *claim_ids): - return defer.fail(NotImplementedError()) - - def _get_values_for_uris(self, page, page_size, *uris): - return defer.fail(NotImplementedError()) - - def claim_renew_all_before_expiration(self, height): - return defer.fail(NotImplementedError()) - - def claim_renew(self, txid, nout): - return defer.fail(NotImplementedError()) - - def send_claim_to_address(self, claim_id, destination, amount): - return defer.fail(NotImplementedError()) - - def import_certificate_info(self, serialized_certificate_info): - return defer.fail(NotImplementedError()) - - def export_certificate_info(self, certificate_claim_id): - return defer.fail(NotImplementedError()) - - def get_certificates_for_signing(self): - return defer.fail(NotImplementedError()) - - def get_unused_address(self): - return defer.fail(NotImplementedError()) - - def get_least_used_address(self, account=None, for_change=False, max_count=100): - return defer.fail(NotImplementedError()) - - def decrypt_wallet(self): - return defer.fail(NotImplementedError()) - - def encrypt_wallet(self, new_password, update_keyring=False): - return defer.fail(NotImplementedError()) - - def get_max_usable_balance_for_claim(self, claim_name): - return defer.fail(NotImplementedError()) - - def get_height_for_txid(self, txid): - return defer.fail(NotImplementedError()) - - def _start(self): - return defer.fail(NotImplementedError()) - - def _stop(self): - pass - - -class LBRYumWallet(Wallet): - def __init__(self, storage, config=None): - super().__init__(storage) - self._config = config - self.config = make_config(self._config) - self.network = None - self.wallet = None - self._cmd_runner = None - self.wallet_unlocked_d = defer.Deferred() - self.is_first_run = False - self.printed_retrieving_headers = False - self._start_check = None - self._catch_up_check = None - self._caught_up_counter = 0 - self._lag_counter = 0 - self.blocks_behind = 0 - self.catchup_progress = 0 - self.is_wallet_unlocked = None - - def _is_first_run(self): - return (not self.printed_retrieving_headers and - self.network.blockchain.retrieving_headers) - - def get_cmd_runner(self): - if self._cmd_runner is None: - self._cmd_runner = Commands(self.config, self.wallet, self.network) - - return self._cmd_runner - - def check_locked(self): - """ - Checks if the wallet is encrypted(locked) or not - - :return: (boolean) indicating whether the wallet is locked or not - """ - if not self._cmd_runner: - raise Exception("Command runner hasn't been initialized yet") - elif self._cmd_runner.locked: - log.info("Waiting for wallet password") - self.wallet_unlocked_d.addCallback(self.unlock) - return self.is_wallet_unlocked - - def unlock(self, password): - if self._cmd_runner and self._cmd_runner.locked: - try: - self._cmd_runner.unlock_wallet(password) - self.is_wallet_unlocked = True - log.info("Unlocked the wallet!") - except InvalidPassword: - log.warning("Incorrect password, try again") - self.wallet_unlocked_d = defer.Deferred() - self.wallet_unlocked_d.addCallback(self.unlock) - return defer.succeed(False) - return defer.succeed(True) - - def _start(self): - network_start_d = defer.Deferred() - - def setup_network(): - self.network = Network(self.config) - log.info("Loading the wallet") - return defer.succeed(self.network.start()) - - def check_started(): - if self.network.is_connecting(): - if self._is_first_run(): - log.info("Running the wallet for the first time. This may take a moment.") - self.printed_retrieving_headers = True - return False - self._start_check.stop() - self._start_check = None - if self.network.is_connected(): - network_start_d.callback(True) - else: - network_start_d.errback(ValueError("Failed to connect to network.")) - - self._start_check = task.LoopingCall(check_started) - - d = setup_network() - d.addCallback(lambda _: self._load_wallet()) - d.addCallback(lambda _: self._start_check.start(.1)) - d.addCallback(lambda _: network_start_d) - d.addCallback(lambda _: self._load_blockchain()) - d.addCallback(lambda _: log.info("Subscribing to addresses")) - d.addCallback(lambda _: self.wallet.wait_until_synchronized(lambda _: None)) - d.addCallback(lambda _: log.info("Synchronized wallet")) - d.addCallback(lambda _: self.get_cmd_runner()) - d.addCallbacks(lambda _: log.info("Set up lbryum command runner")) - return d - - def _stop(self): - if self._start_check is not None: - self._start_check.stop() - self._start_check = None - - if self._catch_up_check is not None: - if self._catch_up_check.running: - self._catch_up_check.stop() - self._catch_up_check = None - - d = defer.Deferred() - - def check_stopped(): - if self.network: - if self.network.is_connected(): - return False - stop_check.stop() - self.network = None - d.callback(True) - - if self.wallet: - self.wallet.stop_threads() - log.info("Stopped wallet") - if self.network: - self.network.stop() - log.info("Stopped connection to lbryum server") - - stop_check = task.LoopingCall(check_stopped) - stop_check.start(.1) - return d - - def _load_wallet(self): - path = self.config.get_wallet_path() - storage = lbryum_wallet.WalletStorage(path) - wallet = lbryum_wallet.Wallet(storage) - if not storage.file_exists: - self.is_first_run = True - seed = wallet.make_seed() - wallet.add_seed(seed, None) - wallet.create_master_keys(None) - wallet.create_main_account() - wallet.synchronize() - self.wallet = wallet - self.is_wallet_unlocked = not self.wallet.use_encryption - self._check_large_wallet() - return defer.succeed(True) - - def _check_large_wallet(self): - addr_count = len(self.wallet.addresses(include_change=False)) - if addr_count > 1000: - log.warning("Your wallet is excessively large (%i addresses), " - "please follow instructions here: " - "https://github.com/lbryio/lbry/issues/437 to reduce your wallet size", - addr_count) - else: - log.info("Wallet has %i addresses", addr_count) - - def _load_blockchain(self): - blockchain_caught_d = defer.Deferred() - - def on_update_callback(event, *args): - # This callback is called by lbryum when something chain - # related has happened - local_height = self.network.get_local_height() - remote_height = self.network.get_server_height() - updated_blocks_behind = self.network.get_blocks_behind() - log.info( - 'Local Height: %s, remote height: %s, behind: %s', - local_height, remote_height, updated_blocks_behind) - - self.blocks_behind = updated_blocks_behind - if local_height != remote_height: - return - - assert self.blocks_behind == 0 - self.network.unregister_callback(on_update_callback) - log.info("Wallet Loaded") - reactor.callFromThread(blockchain_caught_d.callback, True) - - self.network.register_callback(on_update_callback, ['updated']) - - d = defer.succeed(self.wallet.start_threads(self.network)) - d.addCallback(lambda _: blockchain_caught_d) - return d - - # run commands as a defer.succeed, - # lbryum commands should be run this way , unless if the command - # only makes a lbrum server query, use _run_cmd_as_defer_to_thread() - def _run_cmd_as_defer_succeed(self, command_name, *args, **kwargs): - cmd_runner = self.get_cmd_runner() - cmd = Commands.known_commands[command_name] - func = getattr(cmd_runner, cmd.name) - return defer.succeed(func(*args, **kwargs)) - - # run commands as a deferToThread, lbryum commands that only make - # queries to lbryum server should be run this way - # TODO: keep track of running threads and cancel them on `stop` - # otherwise the application will hang, waiting for threads to complete - def _run_cmd_as_defer_to_thread(self, command_name, *args, **kwargs): - cmd_runner = self.get_cmd_runner() - cmd = Commands.known_commands[command_name] - func = getattr(cmd_runner, cmd.name) - return threads.deferToThread(func, *args, **kwargs) - - def _update_balance(self): - accounts = None - exclude_claimtrietx = True - d = self._run_cmd_as_defer_succeed('getbalance', accounts, exclude_claimtrietx) - d.addCallback( - lambda result: Decimal(result['confirmed']) + Decimal(result.get('unconfirmed', 0.0))) - return d - - def get_max_usable_balance_for_claim(self, claim_name): - return self._run_cmd_as_defer_to_thread('get_max_spendable_amount_for_claim', claim_name) - - # Always create and return a brand new address - def get_new_address(self, for_change=False, account=None): - return defer.succeed(self.wallet.create_new_address(account=account, - for_change=for_change)) - - # Get the balance of a given address. - def get_address_balance(self, address, include_balance=False): - c, u, x = self.wallet.get_addr_balance(address) - if include_balance is False: - return Decimal(float(c) / COIN) - else: - return Decimal((float(c) + float(u) + float(x)) / COIN) - - @defer.inlineCallbacks - def create_addresses_with_balance(self, num_addresses, amount, broadcast=True): - addresses = self.wallet.get_unused_addresses(account=None) - if len(addresses) > num_addresses: - addresses = addresses[:num_addresses] - elif len(addresses) < num_addresses: - for i in range(len(addresses), num_addresses): - address = self.wallet.create_new_address(account=None) - addresses.append(address) - - outputs = [[address, amount] for address in addresses] - tx = yield self._run_cmd_as_defer_succeed('payto', outputs, broadcast=broadcast) - defer.returnValue(tx) - - # Return an address with no balance in it, if - # there is none, create a brand new address - @defer.inlineCallbacks - def get_unused_address(self): - addr = self.wallet.get_unused_address(account=None) - if addr is None: - addr = yield self.get_new_address() - defer.returnValue(addr) - - def get_least_used_address(self, account=None, for_change=False, max_count=100): - return defer.succeed(self.wallet.get_least_used_address(account, for_change, max_count)) - - def get_block(self, blockhash): - return self._run_cmd_as_defer_to_thread('getblock', blockhash) - - def get_most_recent_blocktime(self): - height = self.network.get_local_height() - if height < 0: - return defer.succeed(None) - header = self.network.get_header(self.network.get_local_height()) - return defer.succeed(header['timestamp']) - - def get_best_blockhash(self): - height = self.network.get_local_height() - if height < 0: - return defer.succeed(None) - header = self.network.blockchain.read_header(height) - return defer.succeed(self.network.blockchain.hash_header(header)) - - def _get_blockhash(self, height): - header = self.network.blockchain.read_header(height) - return defer.succeed(self.network.blockchain.hash_header(header)) - - def _get_transaction(self, txid): - return self._run_cmd_as_defer_to_thread("gettransaction", txid) - - def _wait_for_tx_in_wallet(self, txid): - return self._run_cmd_as_defer_to_thread("waitfortxinwallet", txid) - - def get_name_claims(self): - return self._run_cmd_as_defer_succeed('getnameclaims') - - def _get_claims_for_name(self, name): - return self._run_cmd_as_defer_to_thread('getclaimsforname', name) - - @defer.inlineCallbacks - def _send_name_claim(self, name, value, amount, - certificate_id=None, claim_address=None, change_address=None): - log.info("Send claim: %s for %s: %s ", name, amount, value) - claim_out = yield self._run_cmd_as_defer_succeed('claim', name, value, amount, - certificate_id=certificate_id, - claim_addr=claim_address, - change_addr=change_address) - defer.returnValue(claim_out) - - @defer.inlineCallbacks - def _abandon_claim(self, claim_id, txid, nout): - log.debug("Abandon %s" % claim_id) - tx_out = yield self._run_cmd_as_defer_succeed('abandon', claim_id, txid, nout) - defer.returnValue(tx_out) - - @defer.inlineCallbacks - def _support_claim(self, name, claim_id, amount): - log.debug("Support %s %s %f" % (name, claim_id, amount)) - claim_out = yield self._run_cmd_as_defer_succeed('support', name, claim_id, amount) - defer.returnValue(claim_out) - - @defer.inlineCallbacks - def _tip_claim(self, claim_id, amount): - log.debug("Tip %s %f", claim_id, amount) - claim_out = yield self._run_cmd_as_defer_succeed('sendwithsupport', claim_id, amount) - defer.returnValue(claim_out) - - def _do_send_many(self, payments_to_send): - def handle_payto_out(payto_out): - if not payto_out['success']: - raise Exception("Failed payto, reason:{}".format(payto_out['reason'])) - return payto_out['txid'] - - log.debug("Doing send many. payments to send: %s", str(payments_to_send)) - d = self._run_cmd_as_defer_succeed('payto', payments_to_send.items()) - d.addCallback(lambda out: handle_payto_out(out)) - return d - - def _get_value_for_name(self, name): - if not name: - raise Exception("No name given") - return self._run_cmd_as_defer_to_thread('getvalueforname', name) - - def _get_value_for_uri(self, uri): - if not uri: - raise Exception("No uri given") - return self._run_cmd_as_defer_to_thread('getvalueforuri', uri) - - def _get_values_for_uris(self, page, page_size, *uris): - return self._run_cmd_as_defer_to_thread('getvaluesforuris', False, page, page_size, - *uris) - - def _claim_certificate(self, name, amount): - return self._run_cmd_as_defer_succeed('claimcertificate', name, amount) - - def _get_certificate_claims(self): - return self._run_cmd_as_defer_succeed('getcertificateclaims') - - def get_claims_from_tx(self, txid): - return self._run_cmd_as_defer_to_thread('getclaimsfromtx', txid) - - def _get_claim_by_outpoint(self, txid, nout): - return self._run_cmd_as_defer_to_thread('getclaimbyoutpoint', txid, nout) - - def _get_claim_by_claimid(self, claim_id): - return self._run_cmd_as_defer_to_thread('getclaimbyid', claim_id) - - def _get_claims_by_claimids(self, *claim_ids): - return self._run_cmd_as_defer_to_thread('getclaimsbyids', claim_ids) - - def _get_balance_for_address(self, address): - return defer.succeed(Decimal(self.wallet.get_addr_received(address)) / COIN) - - def get_nametrie(self): - return self._run_cmd_as_defer_to_thread('getclaimtrie') - - def _get_history(self): - return self._run_cmd_as_defer_succeed('claimhistory') - - def _address_is_mine(self, address): - return self._run_cmd_as_defer_succeed('ismine', address) - - # returns a list of public keys associated with address - # (could be multiple public keys if a multisig address) - def get_pub_keys(self, address): - return self._run_cmd_as_defer_succeed('getpubkeys', address) - - def list_addresses(self): - return self._run_cmd_as_defer_succeed('listaddresses') - - def list_unspent(self): - return self._run_cmd_as_defer_succeed('listunspent') - - def send_claim_to_address(self, claim_id, destination, amount): - return self._run_cmd_as_defer_succeed('sendclaimtoaddress', claim_id, destination, amount) - - def import_certificate_info(self, serialized_certificate_info): - return self._run_cmd_as_defer_succeed('importcertificateinfo', serialized_certificate_info) - - def export_certificate_info(self, certificate_claim_id): - return self._run_cmd_as_defer_succeed('exportcertificateinfo', certificate_claim_id) - - def get_certificates_for_signing(self): - return self._run_cmd_as_defer_succeed('getcertificatesforsigning') - - def claim_renew_all_before_expiration(self, height): - return self._run_cmd_as_defer_succeed('renewclaimsbeforeexpiration', height) - - def claim_renew(self, txid, nout): - return self._run_cmd_as_defer_succeed('renewclaim', txid, nout) - - def get_height_for_txid(self, txid): - return self._run_cmd_as_defer_to_thread('gettransactionheight', txid) - - def decrypt_wallet(self): - if not self.wallet.use_encryption: - return False - if not self._cmd_runner: - return False - if self._cmd_runner.locked: - return False - self._cmd_runner.decrypt_wallet() - return not self.wallet.use_encryption - - def encrypt_wallet(self, new_password, update_keyring=False): - if not self._cmd_runner: - return False - if self._cmd_runner.locked: - return False - self._cmd_runner.update_password(new_password, update_keyring) - return not self.wallet.use_encryption - - -class LBRYcrdAddressRequester: - implements([IRequestCreator]) - - def __init__(self, wallet): - self.wallet = wallet - self._protocols = [] - - # ======== IRequestCreator ======== # - - def send_next_request(self, peer, protocol): - - if not protocol in self._protocols: - r = ClientRequest({'lbrycrd_address': True}, 'lbrycrd_address') - d = protocol.add_request(r) - d.addCallback(self._handle_address_response, peer, r, protocol) - d.addErrback(self._request_failed, peer) - self._protocols.append(protocol) - return defer.succeed(True) - else: - return defer.succeed(False) - - # ======== internal calls ======== # - - def _handle_address_response(self, response_dict, peer, request, protocol): - if request.response_identifier not in response_dict: - raise ValueError( - "Expected {} in response but did not get it".format(request.response_identifier)) - assert protocol in self._protocols, "Responding protocol is not in our list of protocols" - address = response_dict[request.response_identifier] - self.wallet.update_peer_address(peer, address) - - def _request_failed(self, err, peer): - if not err.check(DownloadCanceledError, RequestCanceledError, ConnectionAborted): - log.warning("A peer failed to send a valid public key response. Error: %s, peer: %s", - err.getErrorMessage(), str(peer)) - return err - - -class LBRYcrdAddressQueryHandlerFactory: - implements(IQueryHandlerFactory) - - def __init__(self, wallet): - self.wallet = wallet - - # ======== IQueryHandlerFactory ======== # - - def build_query_handler(self): - q_h = LBRYcrdAddressQueryHandler(self.wallet) - return q_h - - def get_primary_query_identifier(self): - return 'lbrycrd_address' - - def get_description(self): - return "LBRYcrd Address - an address for receiving payments via LBRYcrd" - - -class LBRYcrdAddressQueryHandler: - implements(IQueryHandler) - - def __init__(self, wallet): - self.wallet = wallet - self.query_identifiers = ['lbrycrd_address'] - self.address = None - self.peer = None - - # ======== IQueryHandler ======== # - - def register_with_request_handler(self, request_handler, peer): - self.peer = peer - request_handler.register_query_handler(self, self.query_identifiers) - - def handle_queries(self, queries): - - def create_response(address): - self.address = address - fields = {'lbrycrd_address': address} - return fields - - if self.query_identifiers[0] in queries: - d = self.wallet.get_unused_address_for_peer(self.peer) - d.addCallback(create_response) - return d - if self.address is None: - log.warning("Expected a request for an address, but did not receive one") - return defer.fail( - Failure(ValueError("Expected but did not receive an address request"))) - else: - return defer.succeed({}) - - -def make_config(config=None): - if config is None: - config = {} - return SimpleConfig(config) if isinstance(config, dict) else config diff --git a/lbrynet/core/client/ClientProtocol.py b/lbrynet/core/client/ClientProtocol.py index fac2eaa90..dc47a881d 100644 --- a/lbrynet/core/client/ClientProtocol.py +++ b/lbrynet/core/client/ClientProtocol.py @@ -73,7 +73,7 @@ class ClientProtocol(Protocol, TimeoutMixin): log.debug("Connection lost to %s: %s", self.peer, reason) self.setTimeout(None) self.connection_closed = True - if reason.check(error.ConnectionDone) or reason is None: + if reason is None or reason.check(error.ConnectionDone): err = failure.Failure(ConnectionClosedBeforeResponseError()) else: err = reason diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 9169b29f3..50867ebce 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -6,19 +6,7 @@ import urllib import json import textwrap -#import sys -#from twisted.internet import asyncioreactor -#if 'twisted.internet.reactor' not in sys.modules: -# asyncioreactor.install() -#else: -# from twisted.internet import reactor -# if not isinstance(reactor, asyncioreactor.AsyncioSelectorReactor): -# # pyinstaller hooks install the default reactor before -# # any of our code runs, see kivy for similar problem: -# # https://github.com/kivy/kivy/issues/4182 -# del sys.modules['twisted.internet.reactor'] -# asyncioreactor.install() - +from operator import itemgetter from binascii import hexlify, unhexlify from copy import deepcopy from decimal import Decimal, InvalidOperation @@ -92,6 +80,7 @@ DIRECTION_ASCENDING = 'asc' DIRECTION_DESCENDING = 'desc' DIRECTIONS = DIRECTION_ASCENDING, DIRECTION_DESCENDING + class IterableContainer: def __iter__(self): for attr in dir(self): @@ -251,7 +240,10 @@ class Daemon(AuthJSONRPCServer): @property def ledger(self): - return self.wallet.default_account.ledger + try: + return self.wallet.default_account.ledger + except AttributeError: + return None @defer.inlineCallbacks def setup(self): @@ -395,51 +387,15 @@ class Daemon(AuthJSONRPCServer): log.exception) self.analytics_manager.send_claim_action('publish') nout = 0 - log.info("Success! Published to lbry://%s txid: %s nout: %d", name, tx.id, nout) - defer.returnValue(self._txo_to_response(tx, nout)) - - def _txo_to_response(self, tx, nout): txo = tx.outputs[nout] - return { + log.info("Success! Published to lbry://%s txid: %s nout: %d", name, tx.id, nout) + defer.returnValue({ "success": True, - "txid": tx.id, - "nout": nout, - "tx": hexlify(tx.raw), - "fee": str(Decimal(tx.fee) / COIN), + "tx": tx, "claim_id": txo.claim_id, - "value": hexlify(txo.claim).decode(), - "claim_address": self.ledger.hash160_to_address(txo.script.values['pubkey_hash']) - } - - @defer.inlineCallbacks - def _resolve(self, *uris, **kwargs): - """Resolves a URI. Can check the cache first before going out to the blockchain and stores the result. - - Args: - name: the lbry:// to resolve - force_refresh: if True, always go out to the blockchain to resolve. - """ - - page = kwargs.get('page', 0) - page_size = kwargs.get('page_size', 10) - check_cache = kwargs.get('check_cache', False) # TODO: put caching back (was force_refresh parameter) - results = yield self.wallet.resolve(*uris, page=page, page_size=page_size) - self.save_claims((value for value in results.values() if 'error' not in value)) - yield defer.returnValue(results) - - @defer.inlineCallbacks - def save_claims(self, claim_infos): - to_save = [] - for info in claim_infos: - if 'value' in info: - if info['value']: - to_save.append(info) - else: - if 'certificate' in info and info['certificate']['value']: - to_save.append(info['certificate']) - if 'claim' in info and info['claim']['value']: - to_save.append(info['claim']) - yield self.storage.save_claims(to_save) + "claim_address": self.ledger.hash160_to_address(txo.script.values['pubkey_hash']), + "output": tx.outputs[nout] + }) def _get_or_download_sd_blob(self, blob, sd_hash): if blob: @@ -484,7 +440,7 @@ class Daemon(AuthJSONRPCServer): cost = self._get_est_cost_from_stream_size(size) - resolved = yield self._resolve(uri) + resolved = yield self.wallet.resolve(uri) if uri in resolved and 'claim' in resolved[uri]: claim = ClaimDict.load_dict(resolved[uri]['claim']['value']) @@ -531,7 +487,7 @@ class Daemon(AuthJSONRPCServer): Resolve a name and return the estimated stream cost """ - resolved = (yield self._resolve(uri))[uri] + resolved = (yield self.wallet.resolve(uri))[uri] if resolved: claim_response = resolved[uri] else: @@ -1012,7 +968,8 @@ class Daemon(AuthJSONRPCServer): Returns: (float) amount of lbry credits in wallet """ - assert address is None, "Limiting by address needs to be re-implemented in new wallet." + if address is not None: + raise NotImplementedError("Limiting by address needs to be re-implemented in new wallet.") dewies = yield self.wallet.default_account.get_balance( 0 if include_unconfirmed else 6 ) @@ -1046,7 +1003,6 @@ class Daemon(AuthJSONRPCServer): defer.returnValue(response) @requires(WALLET_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) - @defer.inlineCallbacks def jsonrpc_wallet_decrypt(self): """ Decrypt an encrypted wallet, this will remove the wallet password @@ -1060,13 +1016,9 @@ class Daemon(AuthJSONRPCServer): Returns: (bool) true if wallet is decrypted, otherwise false """ - - result = self.wallet.decrypt_wallet() - response = yield self._render_response(result) - defer.returnValue(response) + return defer.succeed(self.wallet.decrypt_wallet()) @requires(WALLET_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) - @defer.inlineCallbacks def jsonrpc_wallet_encrypt(self, new_password): """ Encrypt a wallet with a password, if the wallet is already encrypted this will update @@ -1081,13 +1033,11 @@ class Daemon(AuthJSONRPCServer): Returns: (bool) true if wallet is decrypted, otherwise false """ - - self.wallet.encrypt_wallet(new_password) - response = yield self._render_response(self.wallet.wallet.use_encryption) - defer.returnValue(response) + return defer.succeed(self.wallet.encrypt_wallet(new_password)) @defer.inlineCallbacks - def jsonrpc_stop(self): + @AuthJSONRPCServer.deprecated("stop") + def jsonrpc_daemon_stop(self): """ Stop lbrynet-daemon @@ -1100,11 +1050,24 @@ class Daemon(AuthJSONRPCServer): Returns: (string) Shutdown message """ + return self.jsonrpc_stop() + def jsonrpc_stop(self): + """ + Stop lbrynet + + Usage: + stop + + Options: + None + + Returns: + (string) Shutdown message + """ log.info("Shutting down lbrynet daemon") - response = yield self._render_response("Shutting down") reactor.callLater(0.1, reactor.fireSystemEvent, "shutdown") - defer.returnValue(response) + defer.returnValue("Shutting down") @requires(FILE_MANAGER_COMPONENT) @defer.inlineCallbacks @@ -1198,7 +1161,7 @@ class Daemon(AuthJSONRPCServer): try: name = parse_lbry_uri(name).name - metadata = yield self._resolve(name, check_cache=not force) + metadata = yield self.wallet.resolve(name, check_cache=not force) if name in metadata: metadata = metadata[name] except UnknownNameError: @@ -1337,7 +1300,7 @@ class Daemon(AuthJSONRPCServer): except URIParseError: results[u] = {"error": "%s is not a valid uri" % u} - resolved = yield self._resolve(*valid_uris, check_cache=not force) + resolved = yield self.wallet.resolve(*valid_uris, check_cache=not force) for resolved_uri in resolved: results[resolved_uri] = resolved[resolved_uri] @@ -1398,7 +1361,7 @@ class Daemon(AuthJSONRPCServer): if parsed_uri.is_channel and not parsed_uri.path: raise Exception("cannot download a channel claim, specify a /path") - resolved_result = yield self._resolve(uri) + resolved_result = yield self.wallet.resolve(uri) if resolved_result and uri in resolved_result: resolved = resolved_result[uri] else: @@ -1584,13 +1547,30 @@ class Daemon(AuthJSONRPCServer): 'claim_id' : (str) claim ID of the resulting claim } """ + try: + parsed = parse_lbry_uri(channel_name) + if not parsed.is_channel: + raise Exception("Cannot make a new channel for a non channel name") + if parsed.path: + raise Exception("Invalid channel uri") + except (TypeError, URIParseError): + raise Exception("Invalid channel name") + if amount <= 0: + raise Exception("Invalid amount") amount = int(amount * COIN) tx = yield self.wallet.claim_new_channel(channel_name, amount) self.wallet.save() - result = self._txo_to_response(tx, 0) self.analytics_manager.send_new_channel() - log.info("Claimed a new channel! Result: %s", result) - defer.returnValue(result) + nout = 0 + txo = tx.outputs[nout] + log.info("Claimed a new channel! lbry://%s txid: %s nout: %d", channel_name, tx.id, nout) + defer.returnValue({ + "success": True, + "tx": tx, + "claim_id": txo.claim_id, + "claim_address": self.ledger.hash160_to_address(txo.script.values['pubkey_hash']), + "output": txo + }) @requires(WALLET_COMPONENT) @defer.inlineCallbacks @@ -1764,6 +1744,11 @@ class Daemon(AuthJSONRPCServer): bid = int(bid * COIN) + for address in [claim_address, change_address]: + if address is not None: + # raises an error if the address is invalid + decode_address(address) + available = yield self.wallet.default_account.get_balance() if bid >= available: # TODO: add check for existing claim balance @@ -1880,8 +1865,7 @@ class Daemon(AuthJSONRPCServer): result = yield self._publish_stream(name, bid, claim_dict, file_path, certificate, claim_address, change_address) - response = yield self._render_response(result) - defer.returnValue(response) + defer.returnValue(result) @requires(WALLET_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) @defer.inlineCallbacks @@ -2179,7 +2163,7 @@ class Daemon(AuthJSONRPCServer): except URIParseError: results[chan_uri] = {"error": "%s is not a valid uri" % chan_uri} - resolved = yield self._resolve(*valid_uris, page=page, page_size=page_size) + resolved = yield self.wallet.resolve(*valid_uris, page=page, page_size=page_size) for u in resolved: if 'error' in resolved[u]: results[u] = resolved[u] @@ -2384,6 +2368,32 @@ class Daemon(AuthJSONRPCServer): d.addCallback(lambda address: self._render_response(address)) return d + @requires(WALLET_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) + @AuthJSONRPCServer.deprecated("wallet_send") + @defer.inlineCallbacks + def jsonrpc_send_amount_to_address(self, amount, address): + """ + Queue a payment of credits to an address + Usage: + send_amount_to_address ( | --amount=) (
| --address=
) + Options: + --amount= : (float) amount to send + --address=
: (str) address to send credits to + Returns: + (bool) true if payment successfully scheduled + """ + if amount < 0: + raise NegativeFundsError() + elif not amount: + raise NullFundsError() + + reserved_points = self.wallet.reserve_points(address, amount) + if reserved_points is None: + raise InsufficientFundsError() + yield self.wallet.send_points_to_address(reserved_points, amount) + self.analytics_manager.send_credits_sent() + defer.returnValue(True) + @requires(WALLET_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) @defer.inlineCallbacks def jsonrpc_wallet_send(self, amount, address=None, claim_id=None): @@ -2749,7 +2759,7 @@ class Daemon(AuthJSONRPCServer): """ if uri or stream_hash or sd_hash: if uri: - metadata = (yield self._resolve(uri))[uri] + metadata = (yield self.wallet.resolve(uri))[uri] sd_hash = utils.get_sd_hash(metadata) stream_hash = yield self.storage.get_stream_hash_for_sd_hash(sd_hash) elif stream_hash: @@ -2898,7 +2908,7 @@ class Daemon(AuthJSONRPCServer): hosts = {} for k, v in data_store.items(): - for contact, _ in v: + for contact in map(itemgetter(0), v): hosts.setdefault(contact, []).append(hexlify(k).decode()) contact_set = set() @@ -2907,7 +2917,7 @@ class Daemon(AuthJSONRPCServer): for i in range(len(self.dht_node._routingTable._buckets)): for contact in self.dht_node._routingTable._buckets[i]._contacts: - blobs = [hexlify(raw_hash).decode() for raw_hash in hosts.pop(contact)] if contact in hosts else [] + blobs = list(hosts.pop(contact)) if contact in hosts else [] blob_hashes.update(blobs) host = { "address": contact.address, @@ -2949,6 +2959,24 @@ class Daemon(AuthJSONRPCServer): return self._blob_availability(blob_hash, search_timeout, blob_timeout) + @requires(UPNP_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) + @AuthJSONRPCServer.deprecated("stream_availability") + def jsonrpc_get_availability(self, uri, sd_timeout=None, peer_timeout=None): + """ + Get stream availability for lbry uri + Usage: + get_availability ( | --uri=) [ | --sd_timeout=] + [ | --peer_timeout=] + Options: + --uri= : (str) check availability for this uri + --sd_timeout= : (int) sd blob download timeout + --peer_timeout= : (int) how long to look for peers + Returns: + (float) Peers per blob / total blobs + """ + + return self.jsonrpc_stream_availability(uri, peer_timeout, sd_timeout) + @requires(UPNP_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) @defer.inlineCallbacks def jsonrpc_stream_availability(self, uri, search_timeout=None, blob_timeout=None): @@ -3002,7 +3030,7 @@ class Daemon(AuthJSONRPCServer): } try: - resolved_result = (yield self._resolve(uri))[uri] + resolved_result = (yield self.wallet.resolve(uri))[uri] response['did_resolve'] = True except UnknownNameError: response['error'] = "Failed to resolve name" @@ -3229,10 +3257,9 @@ class Daemon(AuthJSONRPCServer): everything=False, outputs=1, broadcast=False): """ Transfer some amount (or --everything) to an account from another - account (can be the same account). Decimal amounts are interpreted - as LBC and non-decimal amounts are interpreted as dewies. You can - also spread the transfer across a number of --outputs (cannot be - used together with --everything). + account (can be the same account). Amounts are interpreted as LBC. + You can also spread the transfer across a number of --outputs (cannot + be used together with --everything). Usage: fund ( | --to_account=) @@ -3244,7 +3271,7 @@ class Daemon(AuthJSONRPCServer): Options: --to_account= : (str) send to this account --from_account= : (str) spend from this account - --amount= : (str) the amount to transfer (lbc or dewies) + --amount= : (str) the amount to transfer lbc --everything : (bool) transfer everything (excluding claims), default: false. --outputs= : (int) split payment across many outputs, default: 1. --broadcast : (bool) actually broadcast the transaction, default: false. @@ -3263,25 +3290,7 @@ class Daemon(AuthJSONRPCServer): return from_account.fund( to_account=to_account, amount=amount, everything=everything, outputs=outputs, broadcast=broadcast - ).addCallback(lambda tx: self.tx_to_json(tx, from_account.ledger)) - - @staticmethod - def tx_to_json(tx, ledger): - return { - 'txid': tx.id, - 'inputs': [ - {'amount': txi.amount, 'address': txi.txo_ref.txo.get_address(ledger)} - for txi in tx.inputs - ], - 'outputs': [ - {'amount': txo.amount, 'address': txo.get_address(ledger)} - for txo in tx.outputs - ], - 'total_input': tx.input_sum, - 'total_output': tx.input_sum, - 'total_fee': tx.fee, - 'xhex': hexlify(tx.raw).decode(), - } + ) def get_account_or_error(self, argument: str, account_name: str, lbc_only=False): for account in self.wallet.default_wallet.accounts: @@ -3306,9 +3315,9 @@ class Daemon(AuthJSONRPCServer): if '.' in amount: return int(Decimal(amount) * COIN) elif amount.isdigit(): - return int(amount) - elif isinstance(amount, int): - return amount + amount = int(amount) + if isinstance(amount, int): + return amount * COIN raise ValueError("Invalid value for '{}' argument: {}".format(argument, amount)) diff --git a/lbrynet/daemon/DaemonCLI.py b/lbrynet/daemon/DaemonCLI.py deleted file mode 100644 index 74dfcf0ad..000000000 --- a/lbrynet/daemon/DaemonCLI.py +++ /dev/null @@ -1,225 +0,0 @@ -# pylint: skip-file -import json -import os -import sys -import colorama -from docopt import docopt -from collections import OrderedDict -from lbrynet import conf -from lbrynet.core import utils -from lbrynet.daemon.auth.client import JSONRPCException, LBRYAPIClient, AuthAPIClient -from lbrynet.daemon.Daemon import Daemon -from lbrynet.core.system_info import get_platform -from jsonrpc.common import RPCError -from requests.exceptions import ConnectionError -from urllib2 import URLError, HTTPError -from httplib import UNAUTHORIZED - - -def remove_brackets(key): - if key.startswith("<") and key.endswith(">"): - return str(key[1:-1]) - return key - - -def set_kwargs(parsed_args): - kwargs = OrderedDict() - for key, arg in parsed_args.iteritems(): - if arg is None: - continue - elif key.startswith("--") and remove_brackets(key[2:]) not in kwargs: - k = remove_brackets(key[2:]) - elif remove_brackets(key) not in kwargs: - k = remove_brackets(key) - kwargs[k] = guess_type(arg, k) - return kwargs - - -def main(): - argv = sys.argv[1:] - - # check if a config file has been specified. If so, shift - # all the arguments so that the parsing can continue without - # noticing - if len(argv) and argv[0] == "--conf": - if len(argv) < 2: - print_error("No config file specified for --conf option") - print_help() - return - - conf.conf_file = argv[1] - argv = argv[2:] - - if len(argv): - method, args = argv[0], argv[1:] - else: - print_help() - return - - if method in ['help', '--help', '-h']: - if len(args) == 1: - print_help_for_command(args[0]) - else: - print_help() - return - - elif method in ['version', '--version']: - print(utils.json_dumps_pretty(get_platform(get_ip=False))) - return - - if method not in Daemon.callable_methods: - if method not in Daemon.deprecated_methods: - print_error("\"%s\" is not a valid command." % method) - return - new_method = Daemon.deprecated_methods[method].new_command - print_error("\"%s\" is deprecated, using \"%s\"." % (method, new_method)) - method = new_method - - fn = Daemon.callable_methods[method] - - parsed = docopt(fn.__doc__, args) - kwargs = set_kwargs(parsed) - colorama.init() - conf.initialize_settings() - - try: - api = LBRYAPIClient.get_client() - api.status() - except (URLError, ConnectionError) as err: - if isinstance(err, HTTPError) and err.code == UNAUTHORIZED: - api = AuthAPIClient.config() - # this can happen if the daemon is using auth with the --http-auth flag - # when the config setting is to not use it - try: - api.status() - except: - print_error("Daemon requires authentication, but none was provided.", - suggest_help=False) - return 1 - else: - print_error("Could not connect to daemon. Are you sure it's running?", - suggest_help=False) - return 1 - - # TODO: check if port is bound. Error if its not - - try: - result = api.call(method, kwargs) - if isinstance(result, basestring): - # printing the undumped string is prettier - print(result) - else: - print(utils.json_dumps_pretty(result)) - except (RPCError, KeyError, JSONRPCException, HTTPError) as err: - if isinstance(err, HTTPError): - error_body = err.read() - try: - error_data = json.loads(error_body) - except ValueError: - print( - "There was an error, and the response was not valid JSON.\n" + - "Raw JSONRPC response:\n" + error_body - ) - return 1 - - print_error(error_data['error']['message'] + "\n", suggest_help=False) - - if 'data' in error_data['error'] and 'traceback' in error_data['error']['data']: - print("Here's the traceback for the error you encountered:") - print("\n".join(error_data['error']['data']['traceback'])) - - print_help_for_command(method) - elif isinstance(err, RPCError): - print_error(err.msg, suggest_help=False) - # print_help_for_command(method) - else: - print_error("Something went wrong\n", suggest_help=False) - print(str(err)) - - return 1 - - -def guess_type(x, key=None): - if not isinstance(x, (unicode, str)): - return x - if key in ('uri', 'channel_name', 'name', 'file_name', 'download_directory'): - return x - if x in ('true', 'True', 'TRUE'): - return True - if x in ('false', 'False', 'FALSE'): - return False - if '.' in x: - try: - return float(x) - except ValueError: - # not a float - pass - try: - return int(x) - except ValueError: - return x - - -def print_help_suggestion(): - print("See `{} help` for more information.".format(os.path.basename(sys.argv[0]))) - - -def print_error(message, suggest_help=True): - error_style = colorama.Style.BRIGHT + colorama.Fore.RED - print(error_style + "ERROR: " + message + colorama.Style.RESET_ALL) - if suggest_help: - print_help_suggestion() - - -def print_help(): - print("\n".join([ - "NAME", - " lbrynet-cli - LBRY command line client.", - "", - "USAGE", - " lbrynet-cli [--conf ] []", - "", - "EXAMPLES", - " lbrynet-cli commands # list available commands", - " lbrynet-cli status # get daemon status", - " lbrynet-cli --conf ~/l1.conf status # like above but using ~/l1.conf as config file", - " lbrynet-cli resolve_name what # resolve a name", - " lbrynet-cli help resolve_name # get help for a command", - ])) - - -def print_help_for_command(command): - fn = Daemon.callable_methods.get(command) - if fn: - print("Help for %s method:\n%s" % (command, fn.__doc__)) - - -def wrap_list_to_term_width(l, width=None, separator=', ', prefix=''): - if width is None: - try: - _, width = os.popen('stty size', 'r').read().split() - width = int(width) - except: - pass - if not width: - width = 80 - - lines = [] - curr_line = '' - for item in l: - new_line = curr_line + item + separator - if len(new_line) + len(prefix) > width: - lines.append(curr_line) - curr_line = item + separator - else: - curr_line = new_line - lines.append(curr_line) - - ret = prefix + ("\n" + prefix).join(lines) - if ret.endswith(separator): - ret = ret[:-len(separator)] - return ret - - -if __name__ == '__main__': - sys.exit(main()) diff --git a/lbrynet/daemon/DaemonControl.py b/lbrynet/daemon/DaemonControl.py index 49807c3fb..65402531b 100644 --- a/lbrynet/daemon/DaemonControl.py +++ b/lbrynet/daemon/DaemonControl.py @@ -64,15 +64,6 @@ def start(argv=None, conf_path=None): log_support.configure_loggly_handler() log.debug('Final Settings: %s', conf.settings.get_current_settings_dict()) - # fixme: fix that, JSONRPCProxy is gone on py3 - #try: - # log.debug('Checking for an existing lbrynet daemon instance') - # JSONRPCProxy.from_url(conf.settings.get_api_connection_string()).status() - # log.info("lbrynet-daemon is already running") - # return - #except Exception: - # log.debug('No lbrynet instance found, continuing to start') - log.info("Starting lbrynet-daemon from command line") if test_internet_connection(): diff --git a/lbrynet/daemon/auth/server.py b/lbrynet/daemon/auth/server.py index e08562fb8..a012b5107 100644 --- a/lbrynet/daemon/auth/server.py +++ b/lbrynet/daemon/auth/server.py @@ -22,6 +22,7 @@ from lbrynet.daemon.ComponentManager import ComponentManager from .util import APIKey, get_auth_message, LBRY_SECRET from .undecorated import undecorated from .factory import AuthJSONRPCResource +from lbrynet.daemon.json_response_encoder import JSONResponseEncoder log = logging.getLogger(__name__) EMPTY_PARAMS = [{}] @@ -85,11 +86,6 @@ class JSONRPCError: return cls(message, code=code, traceback=traceback) -def default_decimal(obj): - if isinstance(obj, Decimal): - return float(obj) - - class UnknownAPIMethodError(Exception): pass @@ -109,7 +105,7 @@ def jsonrpc_dumps_pretty(obj, **kwargs): else: data = {"jsonrpc": "2.0", "result": obj, "id": id_} - return json.dumps(data, cls=jsonrpclib.JSONRPCEncoder, sort_keys=True, indent=2, **kwargs) + "\n" + return json.dumps(data, cls=JSONResponseEncoder, sort_keys=True, indent=2, **kwargs) + "\n" class JSONRPCServerType(type): @@ -314,7 +310,7 @@ class AuthJSONRPCServer(AuthorizedBase): # last resort, just cast it as a string error = JSONRPCError(str(failure)) - response_content = jsonrpc_dumps_pretty(error, id=id_) + response_content = jsonrpc_dumps_pretty(error, id=id_, ledger=self.ledger) self._set_headers(request, response_content) request.setResponseCode(200) self._render_message(request, response_content) @@ -575,7 +571,7 @@ class AuthJSONRPCServer(AuthorizedBase): def _callback_render(self, result, request, id_, auth_required=False): try: - message = jsonrpc_dumps_pretty(result, id=id_, default=default_decimal) + message = jsonrpc_dumps_pretty(result, id=id_, ledger=self.ledger) request.setResponseCode(200) self._set_headers(request, message, auth_required) self._render_message(request, message) diff --git a/lbrynet/daemon/json_response_encoder.py b/lbrynet/daemon/json_response_encoder.py new file mode 100644 index 000000000..622b60fa9 --- /dev/null +++ b/lbrynet/daemon/json_response_encoder.py @@ -0,0 +1,44 @@ +from decimal import Decimal +from binascii import hexlify +from datetime import datetime +from json import JSONEncoder +from wallet.transaction import Transaction, Output + + +class JSONResponseEncoder(JSONEncoder): + + def __init__(self, *args, ledger, **kwargs): + super().__init__(*args, **kwargs) + self.ledger = ledger + + def default(self, obj): + if isinstance(obj, Transaction): + return self.encode_transaction(obj) + if isinstance(obj, Output): + return self.encode_output(obj) + if isinstance(obj, datetime): + return obj.strftime("%Y%m%dT%H:%M:%S") + if isinstance(obj, Decimal): + return float(obj) + return super().default(obj) + + def encode_transaction(self, tx): + return { + 'txid': tx.id, + 'inputs': [self.encode_input(txo) for txo in tx.inputs], + 'outputs': [self.encode_output(txo) for txo in tx.outputs], + 'total_input': tx.input_sum, + 'total_output': tx.input_sum - tx.fee, + 'total_fee': tx.fee, + 'hex': hexlify(tx.raw).decode(), + } + + def encode_output(self, txo): + return { + 'nout': txo.position, + 'amount': txo.amount, + 'address': txo.get_address(self.ledger) + } + + def encode_input(self, txi): + return self.encode_output(txi.txo_ref.txo) diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py index 216faf421..c25c50271 100644 --- a/lbrynet/database/storage.py +++ b/lbrynet/database/storage.py @@ -656,6 +656,19 @@ class SQLiteStorage: if support_dl: yield defer.DeferredList(support_dl) + def save_claims_for_resolve(self, claim_infos): + to_save = [] + for info in claim_infos: + if 'value' in info: + if info['value']: + to_save.append(info) + else: + if 'certificate' in info and info['certificate']['value']: + to_save.append(info['certificate']) + if 'claim' in info and info['claim']['value']: + to_save.append(info['claim']) + return self.save_claims(to_save) + def get_old_stream_hashes_for_claim_id(self, claim_id, new_stream_hash): return self.run_and_return_list( "select f.stream_hash from file f " diff --git a/lbrynet/wallet/manager.py b/lbrynet/wallet/manager.py index f1028d867..4101586ba 100644 --- a/lbrynet/wallet/manager.py +++ b/lbrynet/wallet/manager.py @@ -147,11 +147,16 @@ class LbryWalletManager(BaseWalletManager): def get_info_exchanger(self): return LBRYcrdAddressRequester(self) + @defer.inlineCallbacks def resolve(self, *uris, **kwargs): page = kwargs.get('page', 0) page_size = kwargs.get('page_size', 10) + check_cache = kwargs.get('check_cache', False) # TODO: put caching back (was force_refresh parameter) ledger = self.default_account.ledger # type: MainNetLedger - return ledger.resolve(page, page_size, *uris) + results = ledger.resolve(page, page_size, *uris) + yield self.old_db.save_claims_for_resolve( + (value for value in results.values() if 'error' not in value)) + defer.returnValue(results) def get_name_claims(self): return defer.succeed([]) @@ -214,16 +219,6 @@ class LbryWalletManager(BaseWalletManager): @defer.inlineCallbacks def claim_new_channel(self, channel_name, amount): - try: - parsed = parse_lbry_uri(channel_name) - if not parsed.is_channel: - raise Exception("Cannot make a new channel for a non channel name") - if parsed.path: - raise Exception("Invalid channel uri") - except (TypeError, URIParseError): - raise Exception("Invalid channel name") - if amount <= 0: - raise Exception("Invalid amount") account = self.default_account address = yield account.receiving.get_or_create_usable_address() cert, key = generate_certificate()