Periodically check if the routing table is empty and, if so, re-attempt joining the DHT network. This check is performed in the main DHTPeerFinder loop every 60 seconds.

Closes: #1093
Signed-off-by: Antonio Quartulli <antonio@mandelbit.com>
import binascii
import logging

from zope.interface import implements
from twisted.internet import defer, reactor

from lbrynet.interfaces import IPeerFinder
from lbrynet.core.utils import short_hash

log = logging.getLogger(__name__)


class DHTPeerFinder(object):
    """This class finds peers which have announced to the DHT that they have certain blobs"""
    implements(IPeerFinder)

    def __init__(self, dht_node, peer_manager):
        """
        dht_node - an instance of the dht.Node class
        peer_manager - an instance of the PeerManager class
        """
        self.dht_node = dht_node
        self.peer_manager = peer_manager
        self.peers = []
        self.next_manage_call = None

    def run_manage_loop(self):
        # Run one management pass now, then reschedule ourselves every 60 seconds.
        self._manage_peers()
        self.next_manage_call = reactor.callLater(60, self.run_manage_loop)

    def stop(self):
        log.info("Stopping DHT peer finder.")
        if self.next_manage_call is not None and self.next_manage_call.active():
            self.next_manage_call.cancel()
            self.next_manage_call = None

    @defer.inlineCallbacks
    def _manage_peers(self):
        """
        If we don't know of any active peers, re-attempt joining the DHT through
        the preconfigured known nodes
        """
        if not self.dht_node.hasContacts():
            log.info("No active peers. Re-attempting to join the DHT")
            yield self.dht_node.join_dht()

    @defer.inlineCallbacks
    def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=False):
        """
        Find peers for a blob in the DHT.

        blob_hash (str): hex-encoded blob hash to look for
        timeout (int): seconds after which to give up the search
        filter_self (bool): if True, filter this node out of the results

        Returns:
            list of available peers for the blob
        """
        def _trigger_timeout():
            # Only cancel if the lookup is still outstanding; if it has already
            # fired, this delayed call is a harmless no-op.
            if not finished_deferred.called:
                log.debug("Peer search for %s timed out", short_hash(blob_hash))
                finished_deferred.cancel()

        bin_hash = binascii.unhexlify(blob_hash)
        finished_deferred = self.dht_node.getPeersForBlob(bin_hash)

        if timeout is not None:
            reactor.callLater(timeout, _trigger_timeout)

        try:
            peer_list = yield finished_deferred
        except defer.CancelledError:
            # The timeout cancelled the lookup; treat it as "no peers found".
            peer_list = []

        # De-duplicate (host, port) pairs and keep only peers that are available.
        peers = set(peer_list)
        good_peers = []
        for host, port in peers:
            if filter_self and (host, port) == (self.dht_node.externalIP, self.dht_node.peerPort):
                continue
            peer = self.peer_manager.get_peer(host, port)
            if peer.is_available():
                good_peers.append(peer)

        defer.returnValue(good_peers)

    def get_most_popular_hashes(self, num_to_return):
        return self.dht_node.get_most_popular_hashes(num_to_return)
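

# Usage sketch (illustrative, not part of the original module): one plausible way
# to wire DHTPeerFinder into a Twisted reactor. The `dht_node`, `peer_manager`,
# and `blob_hash` names are assumptions for illustration; `blob_hash` is a
# hex-encoded hash as expected by find_peers_for_blob.
#
#     from twisted.internet import reactor
#
#     finder = DHTPeerFinder(dht_node, peer_manager)
#     finder.run_manage_loop()  # starts the 60-second maintenance loop
#     d = finder.find_peers_for_blob(blob_hash, timeout=30, filter_self=True)
#     d.addCallback(lambda peers: log.info("found %d usable peers", len(peers)))
#     reactor.addSystemEventTrigger("before", "shutdown", finder.stop)
#     reactor.run()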