diff --git a/lbrynet/core/LBRYcrdWallet.py b/lbrynet/core/LBRYcrdWallet.py index 61de7f5c8..bc7c51a92 100644 --- a/lbrynet/core/LBRYcrdWallet.py +++ b/lbrynet/core/LBRYcrdWallet.py @@ -39,11 +39,11 @@ def _catch_connection_error(f): return w -class LBRYcrdWallet(object): +class LBRYWallet(object): """This class implements the LBRYWallet interface for the LBRYcrd payment system""" implements(ILBRYWallet) - def __init__(self, db_dir, wallet_dir=None, wallet_conf=None, lbrycrdd_path=None): + def __init__(self, db_dir): self.db_dir = db_dir self.db = None @@ -58,44 +58,29 @@ class LBRYcrdWallet(object): # incremental_amount(float)) self.max_expected_payment_time = datetime.timedelta(minutes=3) self.stopped = True - self.started_lbrycrdd = False - self.wallet_dir = wallet_dir - self.wallet_conf = wallet_conf - self.lbrycrdd = None - self.manage_running = False - self.lbrycrdd_path = lbrycrdd_path - settings = self.get_rpc_conf() - rpc_user = settings["username"] - rpc_pass = settings["password"] - rpc_port = settings["rpc_port"] - rpc_url = "127.0.0.1" - self.rpc_conn_string = "http://%s:%s@%s:%s" % (rpc_user, rpc_pass, rpc_url, str(rpc_port)) + self.manage_running = False def start(self): - def make_connection(): - alert.info("Connecting to lbrycrdd...") - if self.lbrycrdd_path is not None: - self._start_daemon() - self._get_info() - log.info("Connected!") - alert.info("Connected to lbrycrdd.") - def start_manage(): self.stopped = False self.manage() return True d = self._open_db() - d.addCallback(lambda _: threads.deferToThread(make_connection)) + d.addCallback(lambda _: self._start()) d.addCallback(lambda _: start_manage()) return d - def stop(self): + def _start(self): + pass - def log_stop_error(err): - log.error("An error occurred stopping the wallet. %s", err.getTraceback()) + @staticmethod + def log_stop_error(err): + log.error("An error occurred stopping the wallet: %s", err.getTraceback()) + + def stop(self): self.stopped = True # If self.next_manage_call is None, then manage is currently running or else @@ -105,12 +90,14 @@ class LBRYcrdWallet(object): self.next_manage_call = None d = self.manage() - d.addErrback(log_stop_error) - if self.lbrycrdd_path is not None: - d.addCallback(lambda _: self._stop_daemon()) - d.addErrback(log_stop_error) + d.addErrback(self.log_stop_error) + d.addCallback(lambda _: self._stop()) + d.addErrback(self.log_stop_error) return d + def _stop(self): + pass + def manage(self): log.info("Doing manage") self.next_manage_call = None @@ -137,7 +124,7 @@ class LBRYcrdWallet(object): d.addCallback(lambda _: self._send_payments()) - d.addCallback(lambda _: threads.deferToThread(self._get_wallet_balance)) + d.addCallback(lambda _: self.get_balance()) def set_wallet_balance(balance): self.wallet_balance = balance @@ -164,16 +151,6 @@ class LBRYcrdWallet(object): d.addBoth(set_manage_not_running) return d - def get_info_exchanger(self): - return LBRYcrdAddressRequester(self) - - def get_wallet_info_query_handler_factory(self): - return LBRYcrdAddressQueryHandlerFactory(self) - - def get_balance(self): - d = threads.deferToThread(self._get_wallet_balance) - return d - def reserve_points(self, identifier, amount): """ Ensure a certain amount of points are available to be sent as payment, before the service is rendered @@ -263,37 +240,66 @@ class LBRYcrdWallet(object): def set_address_for_peer(address): self.current_address_given_to_peer[peer] = address return address - d = threads.deferToThread(self._get_new_address) + d = self.get_new_address() d.addCallback(set_address_for_peer) return d - def get_stream_info_for_name(self, name, txid=None): + def _send_payments(self): + log.info("Trying to send payments, if there are any to be sent") - def get_stream_info_from_value(result): - r_dict = {} - if 'value' in result: - value = result['value'] - try: - value_dict = json.loads(value) - except ValueError: - return Failure(InvalidStreamInfoError(name)) - known_fields = ['stream_hash', 'name', 'description', 'key_fee', 'key_fee_address', 'thumbnail', - 'content_license'] - for field in known_fields: - if field in value_dict: - r_dict[field] = value_dict[field] - if 'stream_hash' in r_dict and 'txid' in result: - d = self._save_name_metadata(name, r_dict['stream_hash'], str(result['txid'])) - else: - d = defer.succeed(True) - d.addCallback(lambda _: r_dict) - return d + payments_to_send = {} + for address, points in self.queued_payments.items(): + log.info("Should be sending %s points to %s", str(points), str(address)) + payments_to_send[address] = float(points) + self.total_reserved_points -= points + self.wallet_balance -= points + del self.queued_payments[address] + if payments_to_send: + log.info("Creating a transaction with outputs %s", str(payments_to_send)) + return self._do_send_many(payments_to_send) + log.info("There were no payments to send") + return defer.succeed(True) + + def get_stream_info_for_name(self, name): + d = self._get_value_for_name(name) + d.addCallback(self._get_stream_info_from_value, name) + return d + + def get_stream_info_from_txid(self, name, txid): + d = self._get_claims_from_tx(txid) + + def get_claim_for_name(claims): + for claim in claims: + if claim['name'] == name: + claim['txid'] = txid + return claim return Failure(UnknownNameError(name)) - d = threads.deferToThread(self._get_value_for_name, name, txid) - d.addCallback(get_stream_info_from_value) + d.addCallback(get_claim_for_name) + d.addCallback(self._get_stream_info_from_value, name) return d + def _get_stream_info_from_value(self, result, name): + r_dict = {} + if 'value' in result: + value = result['value'] + try: + value_dict = json.loads(value) + except ValueError: + return Failure(InvalidStreamInfoError(name)) + known_fields = ['stream_hash', 'name', 'description', 'key_fee', 'key_fee_address', 'thumbnail', + 'content_license'] + for field in known_fields: + if field in value_dict: + r_dict[field] = value_dict[field] + if 'stream_hash' in r_dict and 'txid' in result: + d = self._save_name_metadata(name, r_dict['stream_hash'], str(result['txid'])) + else: + d = defer.succeed(True) + d.addCallback(lambda _: r_dict) + return d + return Failure(UnknownNameError(name)) + def claim_name(self, name, sd_hash, amount, description=None, key_fee=None, key_fee_address=None, thumbnail=None, content_license=None): value = {"stream_hash": sd_hash} @@ -308,7 +314,7 @@ class LBRYcrdWallet(object): if content_license is not None: value['content_license'] = content_license - d = threads.deferToThread(self._claim_name, name, json.dumps(value), amount) + d = self._send_name_claim(name, json.dumps(value), amount) def _save_metadata(txid): d = self._save_name_metadata(name, sd_hash, txid) @@ -319,70 +325,230 @@ class LBRYcrdWallet(object): return d def abandon_name(self, txid): - address = self._get_new_address() - raw = self._get_raw_tx(txid) - transaction = self._get_decoded_tx(raw) - amount = float(transaction['vout'][1]['value']) - return self._abandon_name(txid, address, amount) + d1 = self.get_new_address() + d2 = self._get_claims_from_tx(txid) + + def get_txout_of_claim(claims): + for claim in claims: + if 'name' in claim and 'nOut' in claim: + return claim['nOut'] + return defer.fail(ValueError("No claims in tx")) + + def get_value_of_txout(nOut): + d = self._get_raw_tx(txid) + d.addCallback(self._get_decoded_tx) + d.addCallback(lambda tx: tx['vout'][nOut]['value']) + return d + + d2.addCallback(get_txout_of_claim) + d2.addCallback(get_value_of_txout) + dl = defer.DeferredList([d1, d2], consumeErrors=True) + + def abandon(results): + if results[0][0] and results[1][0]: + address = results[0][1] + amount = results[1][1] + return self._send_abandon(txid, address, amount) + elif results[0][0] is False: + return defer.fail(Failure(ValueError("Couldn't get a new address"))) + else: + return results[1][1] + + dl.addCallback(abandon) + return dl def get_tx(self, txid): - raw = self._get_raw_tx(txid) - return self._get_decoded_tx(raw) - - def get_name_claims(self): - return threads.deferToThread(self._get_name_claims) - - def start_miner(self): - if not self._get_gen_status(): - return self._set_gen_status(True) - else: - return "Miner was already running" - - def stop_miner(self): - if self._get_gen_status(): - return self._set_gen_status(False) - else: - return "Miner wasn't running" - - def get_miner_status(self): - return self._get_gen_status() - - def get_block(self, blockhash): - return self._get_block(blockhash) - - def get_blockchain_info(self): - return self._get_blockchain_info() - - def get_claims_for_tx(self, txid): - return self._get_claims_for_tx(txid) - - def get_nametrie(self): - return self._get_nametrie() + d = self._get_raw_tx(txid) + d.addCallback(self._get_decoded_tx) + return d # def update_name(self, name_value): # return self._update_name(name_value) def get_name_and_validity_for_sd_hash(self, sd_hash): d = self._get_claim_metadata_for_sd_hash(sd_hash) - d.addCallback(lambda name_txid: threads.deferToThread(self._get_status_of_claim, name_txid[1], name_txid[0], sd_hash) if name_txid is not None else None) + d.addCallback(lambda name_txid: self._get_status_of_claim(name_txid[1], name_txid[0], sd_hash) if name_txid is not None else None) return d def get_available_balance(self): return float(self.wallet_balance - self.total_reserved_points) - def get_new_address(self): - return threads.deferToThread(self._get_new_address) + def _get_status_of_claim(self, txid, name, sd_hash): + d = self._get_claims_from_tx(txid) - def check_first_run(self): - d = threads.deferToThread(self._get_wallet_balance) - d.addCallback(lambda bal: threads.deferToThread(self._get_num_addresses) if bal == 0 else 2) - d.addCallback(lambda num_addresses: True if num_addresses <= 1 else False) + def get_status(claims): + if claims is None: + claims = [] + for claim in claims: + if 'in claim trie' in claim: + if 'name' in claim and str(claim['name']) == name and 'value' in claim: + try: + value_dict = json.loads(claim['value']) + except ValueError: + return None + if 'stream_hash' in value_dict and str(value_dict['stream_hash']) == sd_hash: + if 'is controlling' in claim and claim['is controlling']: + return name, "valid" + if claim['in claim trie']: + return name, "invalid" + if 'in queue' in claim and claim['in queue']: + return name, "pending" + return name, "unconfirmed" + return None + + d.addCallback(get_status) return d - def get_most_recent_blocktime(self): - return threads.deferToThread(self._get_best_block_time) + def _check_expected_balances(self): + now = datetime.datetime.now() + balances_to_check = [] + try: + while self.expected_balance_at_time[0][3] < now: + balances_to_check.append(self.expected_balance_at_time.popleft()) + except IndexError: + pass + ds = [] + for balance_to_check in balances_to_check: + log.info("Checking balance of address %s", str(balance_to_check[1])) + d = self._get_balance_for_address(balance_to_check[1]) + d.addCallback(lambda bal: bal >= balance_to_check[2]) + ds.append(d) + dl = defer.DeferredList(ds) - def get_rpc_conf(self): + def handle_checks(results): + from future_builtins import zip + for balance, (success, result) in zip(balances_to_check, results): + peer = balance[0] + if success is True: + if result is False: + if balance[4] <= 1: # first or second strike, give them another chance + new_expected_balance = (balance[0], + balance[1], + balance[2], + datetime.datetime.now() + self.max_expected_payment_time, + balance[4] + 1, + balance[5]) + self.expected_balance_at_time.append(new_expected_balance) + peer.update_score(-5.0) + else: + peer.update_score(-50.0) + else: + if balance[4] == 0: + peer.update_score(balance[5]) + peer.update_stats('points_received', balance[5]) + else: + log.warning("Something went wrong checking a balance. Peer: %s, account: %s," + "expected balance: %s, expected time: %s, count: %s, error: %s", + str(balance[0]), str(balance[1]), str(balance[2]), str(balance[3]), + str(balance[4]), str(result.getErrorMessage())) + + dl.addCallback(handle_checks) + return dl + + def _open_db(self): + self.db = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "blockchainname.db"), + check_same_thread=False) + return self.db.runQuery("create table if not exists name_metadata (" + + " name text, " + + " txid text, " + + " sd_hash text)") + + def _save_name_metadata(self, name, sd_hash, txid): + d = self.db.runQuery("insert into name_metadata values (?, ?, ?)", + (name, txid, sd_hash)) + return d + + def _get_claim_metadata_for_sd_hash(self, sd_hash): + d = self.db.runQuery("select name, txid from name_metadata where sd_hash=?", (sd_hash,)) + d.addCallback(lambda r: r[0] if len(r) else None) + return d + + ######### Must be overridden ######### + + def get_info_exchanger(self): + return defer.fail(NotImplementedError()) + + def get_wallet_info_query_handler_factory(self): + return defer.fail(NotImplementedError()) + + def get_balance(self): + return defer.fail(NotImplementedError()) + + def get_new_address(self): + return defer.fail(NotImplementedError()) + + def get_block(self, blockhash): + return defer.fail(NotImplementedError()) + + def get_most_recent_blocktime(self): + return defer.fail(NotImplementedError()) + + def get_blockchain_info(self): + return defer.fail(NotImplementedError()) + + def get_name_claims(self): + return defer.fail(NotImplementedError()) + + def check_first_run(self): + return defer.fail(NotImplementedError()) + + def _get_raw_tx(self, txid): + return defer.fail(NotImplementedError()) + + def _send_name_claim(self, name, val, amount): + return defer.fail(NotImplementedError()) + + def _get_decoded_tx(self, raw_tx): + return defer.fail(NotImplementedError()) + + def _send_abandon(self, txid, address, amount): + return defer.fail(NotImplementedError()) + + def _do_send_many(self, payments_to_send): + return defer.fail(NotImplementedError()) + + def _get_value_for_name(self, name): + return defer.fail(NotImplementedError()) + + def _get_claims_from_tx(self, txid): + return defer.fail(NotImplementedError()) + + def _get_balance_for_address(self, address): + return defer.fail(NotImplementedError()) + + +class LBRYcrdWallet(LBRYWallet): + def __init__(self, db_dir, wallet_dir=None, wallet_conf=None, lbrycrdd_path=None): + LBRYWallet.__init__(self, db_dir) + self.started_lbrycrdd = False + self.wallet_dir = wallet_dir + self.wallet_conf = wallet_conf + self.lbrycrdd = None + self.lbrycrdd_path = lbrycrdd_path + + settings = self._get_rpc_conf() + rpc_user = settings["username"] + rpc_pass = settings["password"] + rpc_port = settings["rpc_port"] + rpc_url = "127.0.0.1" + self.rpc_conn_string = "http://%s:%s@%s:%s" % (rpc_user, rpc_pass, rpc_url, str(rpc_port)) + + def _start(self): + return threads.deferToThread(self._make_connection) + + def _stop(self): + if self.lbrycrdd_path is not None: + return self._stop_daemon() + + def _make_connection(self): + alert.info("Connecting to lbrycrdd...") + if self.lbrycrdd_path is not None: + self._start_daemon() + self._get_info_rpc() + log.info("Connected!") + alert.info("Connected to lbrycrdd.") + + def _get_rpc_conf(self): settings = {"username": "rpcuser", "password": "rpcpassword", "rpc_port": 8332} @@ -397,6 +563,82 @@ class LBRYcrdWallet(object): settings["rpc_port"] = int(l[8:].rstrip('\n')) return settings + def get_info_exchanger(self): + return LBRYcrdAddressRequester(self) + + def get_wallet_info_query_handler_factory(self): + return LBRYcrdAddressQueryHandlerFactory(self) + + def check_first_run(self): + d = self.get_balance() + d.addCallback(lambda bal: threads.deferToThread(self._get_num_addresses_rpc) if bal == 0 else 2) + d.addCallback(lambda num_addresses: True if num_addresses <= 1 else False) + return d + + def get_new_address(self): + return threads.deferToThread(self._get_new_address_rpc) + + def get_balance(self): + return threads.deferToThread(self._get_wallet_balance_rpc) + + def get_most_recent_blocktime(self): + d = threads.deferToThread(self._get_best_blockhash_rpc) + d.addCallback(lambda blockhash: threads.deferToThread(self._get_block_rpc, blockhash)) + d.addCallback( + lambda block: block['time'] if 'time' in block else Failure(ValueError("Could not get a block time"))) + return d + + def get_name_claims(self): + return threads.deferToThread(self._get_name_claims_rpc) + + def get_block(self, blockhash): + return threads.deferToThread(self._get_block_rpc, blockhash) + + def get_blockchain_info(self): + return threads.deferToThread(self._get_blockchain_info_rpc) + + def get_nametrie(self): + return threads.deferToThread(self._get_nametrie_rpc) + + def start_miner(self): + d = threads.deferToThread(self._get_gen_status_rpc) + d.addCallback(lambda status: threads.deferToThread(self._set_gen_status_rpc, True) if not status + else "Miner was already running") + return d + + def stop_miner(self): + d = threads.deferToThread(self._get_gen_status_rpc) + d.addCallback(lambda status: threads.deferToThread(self._set_gen_status_rpc, False) if status + else "Miner wasn't running") + return d + + def get_miner_status(self): + return threads.deferToThread(self._get_gen_status_rpc) + + def _get_balance_for_address(self, address): + return threads.deferToThread(self._get_balance_for_address_rpc, address) + + def _do_send_many(self, payments_to_send): + return threads.deferToThread(self._do_send_many_rpc, payments_to_send) + + def _send_name_claim(self, name, value, amount): + return threads.deferToThread(self._send_name_claim_rpc, name, value, amount) + + def _get_raw_tx(self, txid): + return threads.deferToThread(self._get_raw_tx_rpc, txid) + + def _get_decoded_tx(self, raw_tx): + return threads.deferToThread(self._get_decoded_tx_rpc, raw_tx) + + def _send_abandon(self, txid, address, amount): + return threads.deferToThread(self._send_abandon_rpc, txid, address, amount) + + def _get_claims_from_tx(self, txid): + return threads.deferToThread(self._get_claims_from_tx_rpc, txid) + + def _get_value_for_name(self, name): + return threads.deferToThread(self._get_value_for_name_rpc, name) + def _get_rpc_conn(self): return AuthServiceProxy(self.rpc_conn_string) @@ -450,101 +692,41 @@ class LBRYcrdWallet(object): def _stop_daemon(self): if self.lbrycrdd is not None and self.started_lbrycrdd is True: alert.info("Stopping lbrycrdd...") - d = threads.deferToThread(self._rpc_stop) + d = threads.deferToThread(self._stop_rpc) d.addCallback(lambda _: alert.info("Stopped lbrycrdd.")) return d return defer.succeed(True) - def _check_expected_balances(self): - now = datetime.datetime.now() - balances_to_check = [] - try: - while self.expected_balance_at_time[0][3] < now: - balances_to_check.append(self.expected_balance_at_time.popleft()) - except IndexError: - pass - ds = [] - for balance_to_check in balances_to_check: - d = threads.deferToThread(self._check_expected_balance, balance_to_check) - ds.append(d) - dl = defer.DeferredList(ds) - - def handle_checks(results): - from future_builtins import zip - for balance, (success, result) in zip(balances_to_check, results): - peer = balance[0] - if success is True: - if result is False: - if balance[4] <= 1: # first or second strike, give them another chance - new_expected_balance = (balance[0], - balance[1], - balance[2], - datetime.datetime.now() + self.max_expected_payment_time, - balance[4] + 1, - balance[5]) - self.expected_balance_at_time.append(new_expected_balance) - peer.update_score(-5.0) - else: - peer.update_score(-50.0) - else: - if balance[4] == 0: - peer.update_score(balance[5]) - peer.update_stats('points_received', balance[5]) - else: - log.warning("Something went wrong checking a balance. Peer: %s, account: %s," - "expected balance: %s, expected time: %s, count: %s, error: %s", - str(balance[0]), str(balance[1]), str(balance[2]), str(balance[3]), - str(balance[4]), str(result.getErrorMessage())) - - dl.addCallback(handle_checks) - return dl - @_catch_connection_error - def _check_expected_balance(self, expected_balance): + def _get_balance_for_address_rpc(self, address): rpc_conn = self._get_rpc_conn() - log.info("Checking balance of address %s", str(expected_balance[1])) - balance = rpc_conn.getreceivedbyaddress(expected_balance[1]) - log.debug("received balance: %s", str(balance)) - log.debug("expected balance: %s", str(expected_balance[2])) - return balance >= expected_balance[2] - - def _send_payments(self): - log.info("Trying to send payments, if there are any to be sent") - - def do_send(payments): - rpc_conn = self._get_rpc_conn() - rpc_conn.sendmany("", payments) - - payments_to_send = {} - for address, points in self.queued_payments.items(): - log.info("Should be sending %s points to %s", str(points), str(address)) - payments_to_send[address] = float(points) - self.total_reserved_points -= points - self.wallet_balance -= points - del self.queued_payments[address] - if payments_to_send: - log.info("Creating a transaction with outputs %s", str(payments_to_send)) - return threads.deferToThread(do_send, payments_to_send) - log.info("There were no payments to send") - return defer.succeed(True) + balance = rpc_conn.getreceivedbyaddress(address) + log.debug("received balance for %s: %s", str(address), str(balance)) + return balance @_catch_connection_error - def _get_info(self): + def _do_send_many_rpc(self, payments): + rpc_conn = self._get_rpc_conn() + rpc_conn.sendmany("", payments) + return True + + @_catch_connection_error + def _get_info_rpc(self): rpc_conn = self._get_rpc_conn() return rpc_conn.getinfo() @_catch_connection_error - def _get_name_claims(self): + def _get_name_claims_rpc(self): rpc_conn = self._get_rpc_conn() return rpc_conn.listnameclaims() @_catch_connection_error - def _get_gen_status(self): + def _get_gen_status_rpc(self): rpc_conn = self._get_rpc_conn() return rpc_conn.getgenerate() @_catch_connection_error - def _set_gen_status(self, b): + def _set_gen_status_rpc(self, b): if b: log.info("Starting miner") else: @@ -553,69 +735,61 @@ class LBRYcrdWallet(object): return rpc_conn.setgenerate(b) @_catch_connection_error - def _get_raw_tx(self, txid): + def _get_raw_tx_rpc(self, txid): rpc_conn = self._get_rpc_conn() return rpc_conn.getrawtransaction(txid) @_catch_connection_error - def _get_decoded_tx(self, raw): + def _get_decoded_tx_rpc(self, raw): rpc_conn = self._get_rpc_conn() return rpc_conn.decoderawtransaction(raw) @_catch_connection_error - def _abandon_name(self, txid, address, amount): + def _send_abandon_rpc(self, txid, address, amount): rpc_conn = self._get_rpc_conn() return rpc_conn.abandonname(txid, address, amount) @_catch_connection_error - def _get_blockchain_info(self): + def _get_blockchain_info_rpc(self): rpc_conn = self._get_rpc_conn() return rpc_conn.getblockchaininfo() @_catch_connection_error - def _get_block(self, blockhash): + def _get_block_rpc(self, blockhash): rpc_conn = self._get_rpc_conn() return rpc_conn.getblock(blockhash) @_catch_connection_error - def _get_claims_for_tx(self, txid): + def _get_claims_from_tx_rpc(self, txid): rpc_conn = self._get_rpc_conn() return rpc_conn.getclaimsfortx(txid) @_catch_connection_error - def _get_nametrie(self): + def _get_nametrie_rpc(self): rpc_conn = self._get_rpc_conn() return rpc_conn.getnametrie() @_catch_connection_error - def _get_wallet_balance(self): + def _get_wallet_balance_rpc(self): rpc_conn = self._get_rpc_conn() return rpc_conn.getbalance("") @_catch_connection_error - def _get_new_address(self): + def _get_new_address_rpc(self): rpc_conn = self._get_rpc_conn() return rpc_conn.getnewaddress() @_catch_connection_error - def _get_value_for_name(self, name, txid=None): + def _get_value_for_name_rpc(self, name): rpc_conn = self._get_rpc_conn() - if not txid: - return rpc_conn.getvalueforname(name) - else: - claim = rpc_conn.getclaimsfortx(txid)[0] - if claim['name'] == name: - claim['txid'] = txid - return claim - else: - raise ValueError + return rpc_conn.getvalueforname(name) - # def _update_name(self, name_value): + # def _update_name_rpc(self, name_value): # rpc_conn = self._get_rpc_conn() # return rpc_conn.updatename(name_value) @_catch_connection_error - def _claim_name(self, name, value, amount): + def _send_name_claim_rpc(self, name, value, amount): rpc_conn = self._get_rpc_conn() try: return str(rpc_conn.claimname(name, value, amount)) @@ -626,45 +800,17 @@ class LBRYcrdWallet(object): raise ValueError(e.error['message']) @_catch_connection_error - def _get_status_of_claim(self, txhash, name, sd_hash): - rpc_conn = self._get_rpc_conn() - claims = rpc_conn.getclaimsfortx(txhash) - if claims is None: - claims = [] - for claim in claims: - if 'in claim trie' in claim: - if 'name' in claim and str(claim['name']) == name and 'value' in claim: - try: - value_dict = json.loads(claim['value']) - except ValueError: - return None - if 'stream_hash' in value_dict and str(value_dict['stream_hash']) == sd_hash: - if 'is controlling' in claim and claim['is controlling']: - return name, "valid" - if claim['in claim trie']: - return name, "invalid" - if 'in queue' in claim and claim['in queue']: - return name, "pending" - return name, "unconfirmed" - return None - - @_catch_connection_error - def _get_num_addresses(self): + def _get_num_addresses_rpc(self): rpc_conn = self._get_rpc_conn() return len(rpc_conn.getaddressesbyaccount("")) @_catch_connection_error - def _get_best_block_time(self): + def _get_best_blockhash_rpc(self): rpc_conn = self._get_rpc_conn() - best_block_hash = rpc_conn.getbestblockhash() - block = rpc_conn.getblock(best_block_hash) - if 'time' in block: - return block['time'] - raise ValueError("Could not get a block time") - + return rpc_conn.getbestblockhash() @_catch_connection_error - def _rpc_stop(self): + def _stop_rpc(self): # check if our lbrycrdd is actually running, or if we connected to one that was already # running and ours failed to start if self.lbrycrdd.poll() is None: @@ -672,24 +818,6 @@ class LBRYcrdWallet(object): rpc_conn.stop() self.lbrycrdd.wait() - def _open_db(self): - self.db = adbapi.ConnectionPool('sqlite3', os.path.join(self.db_dir, "blockchainname.db"), - check_same_thread=False) - return self.db.runQuery("create table if not exists name_metadata (" + - " name text, " + - " txid text, " + - " sd_hash text)") - - def _save_name_metadata(self, name, sd_hash, txid): - d = self.db.runQuery("insert into name_metadata values (?, ?, ?)", - (name, txid, sd_hash)) - return d - - def _get_claim_metadata_for_sd_hash(self, sd_hash): - d = self.db.runQuery("select name, txid from name_metadata where sd_hash=?", (sd_hash,)) - d.addCallback(lambda r: r[0] if len(r) else None) - return d - class LBRYcrdAddressRequester(object): implements([IRequestCreator]) diff --git a/lbrynet/lbrynet_console/ControlHandlers.py b/lbrynet/lbrynet_console/ControlHandlers.py index 926b96f39..6cac5da64 100644 --- a/lbrynet/lbrynet_console/ControlHandlers.py +++ b/lbrynet/lbrynet_console/ControlHandlers.py @@ -766,15 +766,15 @@ class AddStream(CommandHandler): def do_download(stream_downloader): d = stream_downloader.start() - d.addCallback(lambda _: self._download_succeeded(stream_downloader)) + d.addCallback(lambda result: self._download_succeeded(stream_downloader, result)) return d d.addCallback(do_download) d.addErrback(self._handle_download_error) return d - def _download_succeeded(self, stream_downloader): - self.console.sendLine("%s has successfully downloaded." % str(stream_downloader)) + def _download_succeeded(self, stream_downloader, result): + self.console.sendLine("%s: %s." % (str(stream_downloader), str(result))) def _handle_download_error(self, err): if err.check(InsufficientFundsError): diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py index 0ba94e73f..0df0e99bc 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemon.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py @@ -799,12 +799,12 @@ class LBRYDaemon(xmlrpc.XMLRPC): @return: """ - def _return_d(x): - d = defer.Deferred() - d.addCallback(lambda _: x) - d.callback(None) + #def _return_d(x): + # d = defer.Deferred() + # d.addCallback(lambda _: x) + # d.callback(None) - return d + # return d def _clean(n): t = [] @@ -835,21 +835,36 @@ class LBRYDaemon(xmlrpc.XMLRPC): return f + def resolve_claims(claims): + ds = [] + for claim in claims: + d1 = defer.succeed(claim) + d2 = self._resolve_name_wc(claim['name']) + d3 = self._get_est_cost(claim['name']) + dl = defer.DeferredList([d1, d2, d3], consumeErrors=True) + ds.append(dl) + return defer.DeferredList(ds) + def _disp(results): print '[' + str(datetime.now()) + '] Found ' + str(len(results)) + ' results' return results print '[' + str(datetime.now()) + '] Search nametrie: ' + search - filtered_results = [n for n in self.session.wallet.get_nametrie() if n['name'].startswith(search)] - if len(filtered_results) > self.max_search_results: - filtered_results = filtered_results[:self.max_search_results] - filtered_results = [n for n in filtered_results if 'txid' in n.keys()] - resolved_results = [defer.DeferredList([_return_d(n), self._resolve_name_wc(n['name']), - self._get_est_cost(n['name'])], consumeErrors=True) - for n in filtered_results] + d = self.session.wallet.get_nametrie() + d.addCallback(lambda trie: [claim for claim in trie if claim['name'].startswith(search) and 'txid' in claim]) + d.addCallback(lambda claims: claims[:self.max_search_results]) + d.addCallback(resolve_claims) - d = defer.DeferredList(resolved_results) + #filtered_results = [n for n in self.session.wallet.get_nametrie() if n['name'].startswith(search)] + #if len(filtered_results) > self.max_search_results: + # filtered_results = filtered_results[:self.max_search_results] + #filtered_results = [n for n in filtered_results if 'txid' in n.keys()] + #resolved_results = [defer.DeferredList([_return_d(n), self._resolve_name_wc(n['name']), + # self._get_est_cost(n['name'])], consumeErrors=True) + # for n in filtered_results] + + #d = defer.DeferredList(resolved_results) d.addCallback(_clean) d.addCallback(_parse) d.addCallback(_disp) diff --git a/lbrynet/lbrynet_daemon/LBRYDownloader.py b/lbrynet/lbrynet_daemon/LBRYDownloader.py index c880616bb..f5ba68ca6 100644 --- a/lbrynet/lbrynet_daemon/LBRYDownloader.py +++ b/lbrynet/lbrynet_daemon/LBRYDownloader.py @@ -135,52 +135,66 @@ class FetcherDaemon(object): return msg def _get_names(self): - c = self.wallet.get_blockchain_info() - rtn = [] - if self.lastbestblock != c: - block = self.wallet.get_block(c['bestblockhash']) - txids = block['tx'] - transactions = [self.wallet.get_tx(t) for t in txids] - for t in transactions: - claims = self.wallet.get_claims_for_tx(t['txid']) + d = self.wallet.get_blockchain_info() + d.addCallback(lambda c: get_new_streams if c != self.lastbestblock else []) + + def get_new_streams(c): + self.lastbestblock = c + d = self.wallet.get_block(c['bestblockhash']) + d.addCallback(lambda block: get_new_streams_in_txes(block['tx'], c)) + return d + + def get_new_streams_in_txes(txids, c): + ds = [] + for t in txids: + d = self.wallet.get_claims_from_tx(t) + d.addCallback(get_new_streams_in_tx, t, c) + ds.append(d) + d = defer.DeferredList(ds, consumeErrors=True) + d.addCallback(lambda result: [r[1] for r in result if r[0]]) + d.addCallback(lambda stream_lists: [stream for streams in stream_lists for stream in streams]) + return d + + def get_new_streams_in_tx(claims, t, c): + #claims = self.wallet.get_claims_for_tx(t['txid']) # if self.first_run: # # claims = self.rpc_conn.getclaimsfortx("96aca2c60efded5806b7336430c5987b9092ffbea9c6ed444e3bf8e008993e11") # # claims = self.rpc_conn.getclaimsfortx("cc9c7f5225ecb38877e6ca7574d110b23214ac3556b9d65784065ad3a85b4f74") # self.first_run = False - if claims: - for claim in claims: - if claim not in self.seen: - msg = "[" + str(datetime.now()) + "] New claim | lbry://" + str(claim['name']) + \ - " | stream hash: " + str(json.loads(claim['value'])['stream_hash']) - print msg - log.debug(msg) - rtn.append([claim['name'], t['txid']]) - self.seen.append(claim) - else: - if self.verbose: - print "[" + str(datetime.now()) + "] No claims in block", c['bestblockhash'] + rtn = [] + if claims: + for claim in claims: + if claim not in self.seen: + msg = "[" + str(datetime.now()) + "] New claim | lbry://" + str(claim['name']) + \ + " | stream hash: " + str(json.loads(claim['value'])['stream_hash']) + print msg + log.debug(msg) + rtn.append((claim['name'], t)) + self.seen.append(claim) + else: + if self.verbose: + print "[" + str(datetime.now()) + "] No claims in block", c['bestblockhash'] + return rtn - self.lastbestblock = c - - if len(rtn): - return defer.DeferredList([self.wallet.get_stream_info_for_name(name, txid=t) for name, t in rtn]) + d.addCallback(lambda streams: defer.DeferredList( + [self.wallet.get_stream_info_from_txid(name, t) for name, t in streams])) + # if len(rtn): + # return defer.DeferredList([self.wallet.get_stream_info_for_name(name, txid=t) for name, t in rtn]) + return d def _download_claims(self, claims): if claims: for claim in claims: - download = defer.Deferred() stream = GetStream(self.sd_identifier, self.session, self.wallet, self.lbry_file_manager, self.max_key_fee, pay_key=False) - download.addCallback(lambda _: stream.start(claim[1])) - download.callback(None) + stream.start(claim[1]) return defer.succeed(None) def _looped_search(self): - d = defer.Deferred() - d.addCallback(lambda _: self._get_names()) + d = self._get_names() d.addCallback(self._download_claims) - d.callback(None) + return d def _get_autofetcher_conf(self): settings = {"maxkey": "0.0"}