diff --git a/CHANGELOG.md b/CHANGELOG.md index 6281c5236..bd95f777a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,11 +42,13 @@ at anytime. * if the `use_authentication` setting is configured, use authentication for all api methods instead of only those with the `auth_required` decorator * regenerate api keys on startup if the using authentication * support both positional and keyword args for api calls + * `peer_list` to return a list of dictionaries instead of a list of lists, added peer node ids to the results ### Added * virtual kademlia network and mock udp transport for dht integration tests * integration tests for bootstrapping the dht * configurable `concurrent_announcers` setting + * `peer_ping` command ### Removed * `announce_all` argument from `blob_announce` diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 44a87200c..4b9b8643a 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -48,6 +48,7 @@ from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.core.Error import InsufficientFundsError, UnknownNameError from lbrynet.core.Error import DownloadDataTimeout, DownloadSDTimeout from lbrynet.core.Error import NullFundsError, NegativeFundsError +from lbrynet.dht.error import TimeoutError from lbrynet.core.Peer import Peer from lbrynet.core.SinglePeerDownloader import SinglePeerDownloader from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader @@ -2849,6 +2850,7 @@ class Daemon(AuthJSONRPCServer): response = yield self._render_response("Deleted %s" % blob_hash) defer.returnValue(response) + @defer.inlineCallbacks def jsonrpc_peer_list(self, blob_hash, timeout=None): """ Get peers for blob hash @@ -2861,15 +2863,32 @@ class Daemon(AuthJSONRPCServer): --timeout= : (int) peer search timeout in seconds Returns: - (list) List of contacts + (list) List of contact dictionaries {'host': , 'port': , 'node_id': } """ - timeout = timeout or conf.settings['peer_search_timeout'] + if not utils.is_valid_blobhash(blob_hash): + raise Exception("invalid blob hash") - d = self.session.peer_finder.find_peers_for_blob(blob_hash, timeout=timeout) - d.addCallback(lambda r: [[c.host, c.port, c.is_available()] for c in r]) - d.addCallback(lambda r: self._render_response(r)) - return d + finished_deferred = self.session.dht_node.getPeersForBlob(binascii.unhexlify(blob_hash), True) + + def _trigger_timeout(): + if not finished_deferred.called: + log.debug("Peer search for %s timed out", blob_hash) + finished_deferred.cancel() + + timeout = timeout or conf.settings['peer_search_timeout'] + self.session.dht_node.reactor_callLater(timeout, _trigger_timeout) + + peers = yield finished_deferred + results = [ + { + "host": host, + "port": port, + "node_id": node_id + } + for host, port, node_id in peers + ] + defer.returnValue(results) @defer.inlineCallbacks def jsonrpc_blob_announce(self, blob_hash=None, stream_hash=None, sd_hash=None): @@ -3040,6 +3059,32 @@ class Daemon(AuthJSONRPCServer): d.addCallback(lambda r: self._render_response(r)) return d + @defer.inlineCallbacks + def jsonrpc_peer_ping(self, node_id): + """ + Find and ping a peer by node id + + Usage: + peer_ping ( | --node_id=) + + Returns: + (str) pong, or {'error': } if an error is encountered + """ + + contact = None + try: + contact = yield self.session.dht_node.findContact(node_id.decode('hex')) + except TimeoutError: + result = {'error': 'timeout finding peer'} + defer.returnValue(result) + if not contact: + defer.returnValue({'error': 'peer not found'}) + try: + result = yield contact.ping() + except TimeoutError: + result = {'error': 'ping timeout'} + defer.returnValue(result) + def jsonrpc_routing_table_get(self): """ Get DHT routing information diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 0e5c980ab..9ad105c1e 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -249,7 +249,7 @@ class Node(object): ) @defer.inlineCallbacks - def getPeersForBlob(self, blob_hash): + def getPeersForBlob(self, blob_hash, include_node_ids=False): result = yield self.iterativeFindValue(blob_hash) expanded_peers = [] if result: @@ -257,8 +257,13 @@ class Node(object): for peer in result[blob_hash]: host = ".".join([str(ord(d)) for d in peer[:4]]) port, = struct.unpack('>H', peer[4:6]) - if (host, port) not in expanded_peers: - expanded_peers.append((host, port)) + if not include_node_ids: + if (host, port) not in expanded_peers: + expanded_peers.append((host, port)) + else: + peer_node_id = peer[6:].encode('hex') + if (host, port, peer_node_id) not in expanded_peers: + expanded_peers.append((host, port, peer_node_id)) defer.returnValue(expanded_peers) def get_most_popular_hashes(self, num_to_return):