forked from LBRYCommunity/lbry-sdk
Merge branch 'daemon-dht-commands'
This commit is contained in:
commit
ad6a2ccc1e
3 changed files with 61 additions and 9 deletions
|
@ -42,11 +42,13 @@ at anytime.
|
|||
* if the `use_authentication` setting is configured, use authentication for all api methods instead of only those with the `auth_required` decorator
|
||||
* regenerate api keys on startup if the using authentication
|
||||
* support both positional and keyword args for api calls
|
||||
* `peer_list` to return a list of dictionaries instead of a list of lists, added peer node ids to the results
|
||||
|
||||
### Added
|
||||
* virtual kademlia network and mock udp transport for dht integration tests
|
||||
* integration tests for bootstrapping the dht
|
||||
* configurable `concurrent_announcers` setting
|
||||
* `peer_ping` command
|
||||
|
||||
### Removed
|
||||
* `announce_all` argument from `blob_announce`
|
||||
|
|
|
@ -48,6 +48,7 @@ from lbrynet.core.server.ServerProtocol import ServerProtocolFactory
|
|||
from lbrynet.core.Error import InsufficientFundsError, UnknownNameError
|
||||
from lbrynet.core.Error import DownloadDataTimeout, DownloadSDTimeout
|
||||
from lbrynet.core.Error import NullFundsError, NegativeFundsError
|
||||
from lbrynet.dht.error import TimeoutError
|
||||
from lbrynet.core.Peer import Peer
|
||||
from lbrynet.core.SinglePeerDownloader import SinglePeerDownloader
|
||||
from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader
|
||||
|
@ -2849,6 +2850,7 @@ class Daemon(AuthJSONRPCServer):
|
|||
response = yield self._render_response("Deleted %s" % blob_hash)
|
||||
defer.returnValue(response)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def jsonrpc_peer_list(self, blob_hash, timeout=None):
|
||||
"""
|
||||
Get peers for blob hash
|
||||
|
@ -2861,15 +2863,32 @@ class Daemon(AuthJSONRPCServer):
|
|||
--timeout=<timeout> : (int) peer search timeout in seconds
|
||||
|
||||
Returns:
|
||||
(list) List of contacts
|
||||
(list) List of contact dictionaries {'host': <peer ip>, 'port': <peer port>, 'node_id': <peer node id>}
|
||||
"""
|
||||
|
||||
timeout = timeout or conf.settings['peer_search_timeout']
|
||||
if not utils.is_valid_blobhash(blob_hash):
|
||||
raise Exception("invalid blob hash")
|
||||
|
||||
d = self.session.peer_finder.find_peers_for_blob(blob_hash, timeout=timeout)
|
||||
d.addCallback(lambda r: [[c.host, c.port, c.is_available()] for c in r])
|
||||
d.addCallback(lambda r: self._render_response(r))
|
||||
return d
|
||||
finished_deferred = self.session.dht_node.getPeersForBlob(binascii.unhexlify(blob_hash), True)
|
||||
|
||||
def _trigger_timeout():
|
||||
if not finished_deferred.called:
|
||||
log.debug("Peer search for %s timed out", blob_hash)
|
||||
finished_deferred.cancel()
|
||||
|
||||
timeout = timeout or conf.settings['peer_search_timeout']
|
||||
self.session.dht_node.reactor_callLater(timeout, _trigger_timeout)
|
||||
|
||||
peers = yield finished_deferred
|
||||
results = [
|
||||
{
|
||||
"host": host,
|
||||
"port": port,
|
||||
"node_id": node_id
|
||||
}
|
||||
for host, port, node_id in peers
|
||||
]
|
||||
defer.returnValue(results)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def jsonrpc_blob_announce(self, blob_hash=None, stream_hash=None, sd_hash=None):
|
||||
|
@ -3040,6 +3059,32 @@ class Daemon(AuthJSONRPCServer):
|
|||
d.addCallback(lambda r: self._render_response(r))
|
||||
return d
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def jsonrpc_peer_ping(self, node_id):
|
||||
"""
|
||||
Find and ping a peer by node id
|
||||
|
||||
Usage:
|
||||
peer_ping (<node_id> | --node_id=<node_id>)
|
||||
|
||||
Returns:
|
||||
(str) pong, or {'error': <error message>} if an error is encountered
|
||||
"""
|
||||
|
||||
contact = None
|
||||
try:
|
||||
contact = yield self.session.dht_node.findContact(node_id.decode('hex'))
|
||||
except TimeoutError:
|
||||
result = {'error': 'timeout finding peer'}
|
||||
defer.returnValue(result)
|
||||
if not contact:
|
||||
defer.returnValue({'error': 'peer not found'})
|
||||
try:
|
||||
result = yield contact.ping()
|
||||
except TimeoutError:
|
||||
result = {'error': 'ping timeout'}
|
||||
defer.returnValue(result)
|
||||
|
||||
def jsonrpc_routing_table_get(self):
|
||||
"""
|
||||
Get DHT routing information
|
||||
|
|
|
@ -249,7 +249,7 @@ class Node(object):
|
|||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def getPeersForBlob(self, blob_hash):
|
||||
def getPeersForBlob(self, blob_hash, include_node_ids=False):
|
||||
result = yield self.iterativeFindValue(blob_hash)
|
||||
expanded_peers = []
|
||||
if result:
|
||||
|
@ -257,8 +257,13 @@ class Node(object):
|
|||
for peer in result[blob_hash]:
|
||||
host = ".".join([str(ord(d)) for d in peer[:4]])
|
||||
port, = struct.unpack('>H', peer[4:6])
|
||||
if (host, port) not in expanded_peers:
|
||||
expanded_peers.append((host, port))
|
||||
if not include_node_ids:
|
||||
if (host, port) not in expanded_peers:
|
||||
expanded_peers.append((host, port))
|
||||
else:
|
||||
peer_node_id = peer[6:].encode('hex')
|
||||
if (host, port, peer_node_id) not in expanded_peers:
|
||||
expanded_peers.append((host, port, peer_node_id))
|
||||
defer.returnValue(expanded_peers)
|
||||
|
||||
def get_most_popular_hashes(self, num_to_return):
|
||||
|
|
Loading…
Reference in a new issue