diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f4f81391..eb821fa83 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,7 @@ at anytime. ### Deprecated * `channel_list_mine`, replaced with `channel_list` - * + * `get_availability`, replaced with `stream_availability` ### Changed * Check claim schema in `publish` before trying to make the claim, return better error messages @@ -57,6 +57,7 @@ at anytime. * Added a table to the lbry file database to store the outpoint of the claim downloaded from * Added `wallet_unlock`, a command available during startup to unlock an encrypted wallet * Added support for wallet encryption via new commands `wallet_decrypt` and `wallet_encrypt` + * Added `blob_availability` and `stream_availability` commands for debugging download issues ### Removed * Removed claim related filter arguments `name`, `claim_id`, and `outpoint` from `file_list`, `file_delete`, `file_set_status`, and `file_reflect` diff --git a/lbrynet/core/SinglePeerDownloader.py b/lbrynet/core/SinglePeerDownloader.py new file mode 100644 index 000000000..0f056d747 --- /dev/null +++ b/lbrynet/core/SinglePeerDownloader.py @@ -0,0 +1,111 @@ +import logging +import shutil +import tempfile + +from twisted.internet import defer, threads, reactor + +from lbrynet.blob import BlobFile +from lbrynet.core.BlobManager import DiskBlobManager +from lbrynet.core.HashAnnouncer import DummyHashAnnouncer +from lbrynet.core.RateLimiter import DummyRateLimiter +from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager +from lbrynet.core.PeerFinder import DummyPeerFinder +from lbrynet.core.client.BlobRequester import BlobRequester +from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader +from lbrynet.core.client.ConnectionManager import ConnectionManager + +log = logging.getLogger(__name__) + + +class TempBlobManager(DiskBlobManager): + def stop(self): + self.db_conn.close() + return defer.succeed(True) + + +class SinglePeerFinder(DummyPeerFinder): + def __init__(self, peer): + DummyPeerFinder.__init__(self) + self.peer = peer + + def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=False): + return defer.succeed([self.peer]) + + +class BlobCallback(BlobFile): + def __init__(self, blob_dir, blob_hash, timeout): + BlobFile.__init__(self, blob_dir, blob_hash) + self.callback = defer.Deferred() + reactor.callLater(timeout, self._cancel) + + def _cancel(self): + if not self.callback.called: + self.callback.callback(False) + + def save_verified_blob(self, writer): + result = BlobFile.save_verified_blob(self, writer) + if not self.callback.called: + self.callback.callback(True) + return result + + +class SingleBlobDownloadManager(object): + def __init__(self, blob): + self.blob = blob + + def needed_blobs(self): + if self.blob.verified: + return [] + else: + return [self.blob] + + def get_head_blob_hash(self): + return self.blob.blob_hash + + +class SinglePeerDownloader(object): + def __init__(self): + self._payment_rate_manager = OnlyFreePaymentsManager() + self._announcer = DummyHashAnnouncer() + self._rate_limiter = DummyRateLimiter() + self._wallet = None + self._blob_manager = None + + def setup(self, wallet, blob_manager=None): + if not self._wallet: + self._wallet = wallet + if not self._blob_manager: + self._blob_manager = blob_manager + + @defer.inlineCallbacks + def download_blob_from_peer(self, peer, timeout, blob_hash, blob_manager): + log.debug("Try to download %s from %s", blob_hash, peer.host) + blob_manager = blob_manager + blob = BlobCallback(blob_manager.blob_dir, blob_hash, timeout) + download_manager = SingleBlobDownloadManager(blob) + peer_finder = SinglePeerFinder(peer) + requester = BlobRequester(blob_manager, peer_finder, self._payment_rate_manager, + self._wallet, download_manager) + downloader = StandaloneBlobDownloader(blob_hash, blob_manager, peer_finder, + self._rate_limiter, self._payment_rate_manager, + self._wallet, timeout=timeout) + info_exchanger = self._wallet.get_info_exchanger() + connection_manager = ConnectionManager(downloader, self._rate_limiter, [requester], + [info_exchanger]) + connection_manager.start() + result = yield blob.callback + if not result: + log.debug("Failed to downloaded %s from %s", blob_hash[:16], peer.host) + yield connection_manager.stop() + defer.returnValue(result) + + @defer.inlineCallbacks + def download_temp_blob_from_peer(self, peer, timeout, blob_hash): + tmp_dir = yield threads.deferToThread(tempfile.mkdtemp) + tmp_blob_manager = TempBlobManager(self._announcer, tmp_dir, tmp_dir) + try: + result = yield self.download_blob_from_peer(peer, timeout, blob_hash, tmp_blob_manager) + finally: + yield tmp_blob_manager.stop() + yield threads.deferToThread(shutil.rmtree, tmp_dir) + defer.returnValue(result) diff --git a/lbrynet/core/Wallet.py b/lbrynet/core/Wallet.py index e7a875a45..6c237c368 100644 --- a/lbrynet/core/Wallet.py +++ b/lbrynet/core/Wallet.py @@ -27,8 +27,10 @@ from lbrynet import conf from lbrynet.core.sqlite_helpers import rerun_if_locked from lbrynet.interfaces import IRequestCreator, IQueryHandlerFactory, IQueryHandler, IWallet from lbrynet.core.client.ClientRequest import ClientRequest -from lbrynet.core.Error import RequestCanceledError, InsufficientFundsError, UnknownNameError +from lbrynet.core.Error import InsufficientFundsError, UnknownNameError from lbrynet.core.Error import UnknownClaimID, UnknownURI, NegativeFundsError, UnknownOutpoint +from lbrynet.core.Error import DownloadCanceledError, RequestCanceledError +from twisted.internet.error import ConnectionAborted log = logging.getLogger(__name__) @@ -1610,7 +1612,7 @@ class LBRYcrdAddressRequester(object): self.wallet.update_peer_address(peer, address) def _request_failed(self, err, peer): - if not err.check(RequestCanceledError): + if not err.check(DownloadCanceledError, RequestCanceledError, ConnectionAborted): log.warning("A peer failed to send a valid public key response. Error: %s, peer: %s", err.getErrorMessage(), str(peer)) return err diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index ac8da9844..8bf607679 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -484,7 +484,7 @@ class DownloadRequest(RequestHelper): blob_details.cancel_func, blob ) - log.debug("Requesting blob %s from %s", blob.blob_hash, self.peer) + log.info("Requesting blob %s from %s", blob.blob_hash, self.peer) return request def _handle_download_request(self, client_blob_request): @@ -508,10 +508,10 @@ class DownloadRequest(RequestHelper): self._download_failed, callbackArgs=(client_blob_request.blob,), ) - client_blob_request.finished_deferred.addBoth( - self._pay_or_cancel_payment, reserved_points, client_blob_request.blob) - client_blob_request.finished_deferred.addErrback( - _handle_download_error, self.peer, client_blob_request.blob) + client_blob_request.finished_deferred.addBoth(self._pay_or_cancel_payment, + reserved_points, client_blob_request.blob) + client_blob_request.finished_deferred.addErrback(_handle_download_error, self.peer, + client_blob_request.blob) def _pay_or_cancel_payment(self, arg, reserved_points, blob): if self._can_pay_peer(blob, arg): diff --git a/lbrynet/core/client/ConnectionManager.py b/lbrynet/core/client/ConnectionManager.py index ae04e8355..af15c509e 100644 --- a/lbrynet/core/client/ConnectionManager.py +++ b/lbrynet/core/client/ConnectionManager.py @@ -39,7 +39,7 @@ class ConnectionManager(object): # a deferred that gets fired when a _manage call is set self._manage_deferred = None self.stopped = True - log.info("%s initialized", self._get_log_name()) + log.debug("%s initialized", self._get_log_name()) # this identifies what the connection manager is for, # used for logging purposes only diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index eafa5b5c9..0993407b4 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -7,7 +7,6 @@ import requests import urllib import json import textwrap -import random import signal from copy import deepcopy from twisted.web import server @@ -20,6 +19,7 @@ from lbryschema.uri import parse_lbry_uri from lbryschema.error import URIParseError, DecodeError from lbryschema.validator import validate_claim_id from lbryschema.address import decode_address +from lbryschema.decode import smart_decode # TODO: importing this when internet is disabled raises a socket.gaierror from lbrynet.core.system_info import get_lbrynet_version @@ -48,6 +48,8 @@ from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.Error import InsufficientFundsError, UnknownNameError, NoSuchSDHash from lbrynet.core.Error import NoSuchStreamHash, DownloadDataTimeout, DownloadSDTimeout from lbrynet.core.Error import NullFundsError, NegativeFundsError +from lbrynet.core.Peer import Peer +from lbrynet.core.SinglePeerDownloader import SinglePeerDownloader log = logging.getLogger(__name__) @@ -190,7 +192,6 @@ class Daemon(AuthJSONRPCServer): self.use_upnp = conf.settings['use_upnp'] self.auto_renew_claim_height_delta = conf.settings['auto_renew_claim_height_delta'] - self.startup_status = STARTUP_STAGES[0] self.connected_to_internet = True self.connection_status_code = None @@ -295,10 +296,10 @@ class Daemon(AuthJSONRPCServer): for outpoint, result in results.iteritems(): if result['success']: log.info("Renewed claim at outpoint:%s claim ID:%s, paid fee:%s", - outpoint, result['claim_id'], result['fee']) + outpoint, result['claim_id'], result['fee']) else: log.info("Failed to renew claim at outpoint:%s, reason:%s", - outpoint, result['reason']) + outpoint, result['reason']) def _start_server(self): if self.peer_port is not None: @@ -667,10 +668,10 @@ class Daemon(AuthJSONRPCServer): self.analytics_manager.send_download_started(download_id, name, claim_dict) self.streams[sd_hash] = GetStream(self.sd_identifier, self.session, - self.exchange_rate_manager, self.max_key_fee, - self.disable_max_key_fee, - conf.settings['data_rate'], timeout, - file_name) + self.exchange_rate_manager, self.max_key_fee, + self.disable_max_key_fee, + conf.settings['data_rate'], timeout, + file_name) try: lbry_file, finished_deferred = yield self.streams[sd_hash].start(claim_dict, name) yield self.stream_info_manager.save_outpoint_to_file(lbry_file.rowid, txid, nout) @@ -951,6 +952,52 @@ class Daemon(AuthJSONRPCServer): d.addCallback(self.get_blobs_for_stream_hash) return d + def _get_single_peer_downloader(self): + downloader = SinglePeerDownloader() + downloader.setup(self.session.wallet) + return downloader + + @defer.inlineCallbacks + def _blob_availability(self, blob_hash, search_timeout, blob_timeout, downloader=None): + if not downloader: + downloader = self._get_single_peer_downloader() + result = {} + search_timeout = search_timeout or conf.settings['peer_search_timeout'] + blob_timeout = blob_timeout or conf.settings['sd_download_timeout'] + is_available = False + reachable_peers = [] + unreachable_peers = [] + try: + peers = yield self.jsonrpc_peer_list(blob_hash, search_timeout) + peer_infos = [{"peer": Peer(x[0], x[1]), + "blob_hash": blob_hash, + "timeout": blob_timeout} for x in peers if x[2]] + dl = [] + dl_peers = [] + dl_results = [] + for peer_info in peer_infos: + d = downloader.download_temp_blob_from_peer(**peer_info) + dl.append(d) + dl_peers.append("%s:%i" % (peer_info['peer'].host, peer_info['peer'].port)) + for dl_peer, (success, download_result) in zip(dl_peers, + (yield defer.DeferredList(dl))): + if success: + if download_result: + reachable_peers.append(dl_peer) + else: + unreachable_peers.append(dl_peer) + dl_results.append(download_result) + is_available = any(dl_results) + except Exception as err: + result['error'] = "Failed to get peers for blob: %s" % err + + response = { + 'is_available': is_available, + 'reachable_peers': reachable_peers, + 'unreachable_peers': unreachable_peers, + } + defer.returnValue(response) + ############################################################################ # # # JSON-RPC API methods start here # @@ -1514,7 +1561,7 @@ class Daemon(AuthJSONRPCServer): for u in uris: try: parse_lbry_uri(u) - valid_uris += (u, ) + valid_uris += (u,) except URIParseError: results[u] = {"error": "%s is not a valid uri" % u} @@ -2099,7 +2146,7 @@ class Daemon(AuthJSONRPCServer): else: raise Exception("invalid outpoint") result = yield self.session.wallet.claim_renew(txid, nout) - result = {outpoint:result} + result = {outpoint: result} else: height = int(height) result = yield self.session.wallet.claim_renew_all_before_expiration(height) @@ -2247,7 +2294,7 @@ class Daemon(AuthJSONRPCServer): uris = tuple(uris) if uri is not None: - uris += (uri, ) + uris += (uri,) results = {} @@ -2260,7 +2307,7 @@ class Daemon(AuthJSONRPCServer): elif parsed.path: results[chan_uri] = {"error": "%s is a claim in a channel" % parsed.path} else: - valid_uris += (chan_uri, ) + valid_uris += (chan_uri,) except URIParseError: results[chan_uri] = {"error": "%s is not a valid uri" % chan_uri} @@ -2271,8 +2318,8 @@ class Daemon(AuthJSONRPCServer): results[u] = resolved[u] else: results[u] = { - 'claims_in_channel': resolved[u]['claims_in_channel'] - } + 'claims_in_channel': resolved[u]['claims_in_channel'] + } if page: results[u]['returned_page'] = page results[u]['claims_in_channel'] = resolved[u].get('claims_in_channel', []) @@ -2919,7 +2966,29 @@ class Daemon(AuthJSONRPCServer): result['node_id'] = self.session.dht_node.node_id.encode('hex') return self._render_response(result) - @defer.inlineCallbacks + def jsonrpc_blob_availability(self, blob_hash, search_timeout=None, blob_timeout=None): + """ + Get blob availability + + Usage: + blob_availability (<blob_hash>) [<search_timeout> | --search_timeout=<search_timeout>] + [<blob_timeout> | --blob_timeout=<blob_timeout>] + + Options: + <search_timeout> : how long to search for peers for the blob in the dht + <blob_timeout> : how long to try downloading from a peer + + Returns: + (dict) { + "is_available": <bool, true if blob is available from a peer from peer list> + "reachable_peers": ["<ip>:<port>"], + "unreachable_peers": ["<ip>:<port>"] + } + """ + + return self._blob_availability(blob_hash, search_timeout, blob_timeout) + + @AuthJSONRPCServer.deprecated("stream_availability") def jsonrpc_get_availability(self, uri, sd_timeout=None, peer_timeout=None): """ Get stream availability for lbry uri @@ -2936,63 +3005,105 @@ class Daemon(AuthJSONRPCServer): (float) Peers per blob / total blobs """ - def _get_mean(blob_availabilities): - peer_counts = [] - for blob_availability in blob_availabilities: - for blob, peers in blob_availability.iteritems(): - peer_counts.append(peers) - if peer_counts: - return round(1.0 * sum(peer_counts) / len(peer_counts), 2) - else: - return 0.0 + return self.jsonrpc_stream_availability(uri, peer_timeout, sd_timeout) - def read_sd_blob(sd_blob): - sd_blob_file = sd_blob.open_for_reading() - decoded_sd_blob = json.loads(sd_blob_file.read()) - sd_blob_file.close() - return decoded_sd_blob + @defer.inlineCallbacks + def jsonrpc_stream_availability(self, uri, search_timeout=None, blob_timeout=None): + """ + Get stream availability for lbry uri - resolved_result = yield self.session.wallet.resolve(uri) - if resolved_result and uri in resolved_result: - resolved = resolved_result[uri] - else: - defer.returnValue(None) + Usage: + stream_availability (<uri>) [<search_timeout> | --search_timeout=<search_timeout>] + [<blob_timeout> | --blob_timeout=<blob_timeout>] - if 'claim' in resolved: - metadata = resolved['claim']['value'] - else: - defer.returnValue(None) + Options: + <search_timeout> : how long to search for peers for the blob in the dht + <blob_timeout> : how long to try downloading from a peer + + Returns: + (dict) { + 'is_available': <bool>, + 'did_decode': <bool>, + 'did_resolve': <bool>, + 'is_stream': <bool>, + 'num_blobs_in_stream': <int>, + 'sd_hash': <str>, + 'sd_blob_availability': <dict> see `blob_availability`, + 'head_blob_hash': <str>, + 'head_blob_availability': <dict> see `blob_availability`, + 'use_upnp': <bool>, + 'upnp_redirect_is_set': <bool>, + 'error': <None> | <str> error message + } + """ + + search_timeout = search_timeout or conf.settings['peer_search_timeout'] + blob_timeout = blob_timeout or conf.settings['sd_download_timeout'] + + response = { + 'is_available': False, + 'did_decode': False, + 'did_resolve': False, + 'is_stream': False, + 'num_blobs_in_stream': None, + 'sd_hash': None, + 'sd_blob_availability': {}, + 'head_blob_hash': None, + 'head_blob_availability': {}, + 'use_upnp': conf.settings['use_upnp'], + 'upnp_redirect_is_set': len(self.session.upnp_redirects) > 0, + 'error': None + } - sd_hash = utils.get_sd_hash(metadata) - sd_timeout = sd_timeout or conf.settings['sd_download_timeout'] - peer_timeout = peer_timeout or conf.settings['peer_search_timeout'] - blobs = [] try: - blobs = yield self.get_blobs_for_sd_hash(sd_hash) - need_sd_blob = False - log.info("Already have sd blob") - except NoSuchSDHash: - need_sd_blob = True - log.info("Need sd blob") + resolved_result = yield self.session.wallet.resolve(uri) + response['did_resolve'] = True + except UnknownNameError: + response['error'] = "Failed to resolve name" + defer.returnValue(response) + except URIParseError: + response['error'] = "Invalid URI" + defer.returnValue(response) - blob_hashes = [blob.blob_hash for blob in blobs] - if need_sd_blob: - # we don't want to use self._download_descriptor here because it would create a stream - try: - sd_blob = yield self._download_blob(sd_hash, timeout=sd_timeout) - except Exception as err: - response = yield self._render_response(0.0) - log.warning(err) - defer.returnValue(response) - decoded = read_sd_blob(sd_blob) - blob_hashes = [blob.get("blob_hash") for blob in decoded['blobs'] - if blob.get("blob_hash")] - sample = random.sample(blob_hashes, min(len(blob_hashes), 5)) - log.info("check peers for %i of %i blobs in stream", len(sample), len(blob_hashes)) - availabilities = yield self.session.blob_tracker.get_availability_for_blobs(sample, - peer_timeout) - mean_availability = _get_mean(availabilities) - response = yield self._render_response(mean_availability) + try: + claim_obj = smart_decode(resolved_result[uri]['claim']['hex']) + response['did_decode'] = True + except DecodeError: + response['error'] = "Failed to decode claim value" + defer.returnValue(response) + + response['is_stream'] = claim_obj.is_stream + if not claim_obj.is_stream: + response['error'] = "Claim for \"%s\" does not contain a stream" % uri + defer.returnValue(response) + + sd_hash = claim_obj.source_hash + response['sd_hash'] = sd_hash + head_blob_hash = None + downloader = self._get_single_peer_downloader() + have_sd_blob = sd_hash in self.session.blob_manager.blobs + try: + sd_blob = yield self.jsonrpc_blob_get(sd_hash, timeout=blob_timeout, + encoding="json") + if not have_sd_blob: + yield self.jsonrpc_blob_delete(sd_hash) + if sd_blob and 'blobs' in sd_blob: + response['num_blobs_in_stream'] = len(sd_blob['blobs']) - 1 + head_blob_hash = sd_blob['blobs'][0]['blob_hash'] + head_blob_availability = yield self._blob_availability(head_blob_hash, + search_timeout, + blob_timeout, + downloader) + response['head_blob_availability'] = head_blob_availability + except Exception as err: + response['error'] = err + response['head_blob_hash'] = head_blob_hash + response['sd_blob_availability'] = yield self._blob_availability(sd_hash, + search_timeout, + blob_timeout, + downloader) + response['is_available'] = response['sd_blob_availability'].get('is_available') and \ + response['head_blob_availability'].get('is_available') defer.returnValue(response) @defer.inlineCallbacks