forked from LBRYCommunity/lbry-sdk
remove popular hash tracking, simplify DHTPeerFinder
This commit is contained in:
parent
e6ffd7caf8
commit
4bd9f3bd68
3 changed files with 10 additions and 54 deletions
|
@ -23,18 +23,14 @@ class BlobAvailabilityTracker(object):
|
||||||
self._blob_manager = blob_manager
|
self._blob_manager = blob_manager
|
||||||
self._peer_finder = peer_finder
|
self._peer_finder = peer_finder
|
||||||
self._dht_node = dht_node
|
self._dht_node = dht_node
|
||||||
self._check_popular = LoopingCall(self._update_most_popular)
|
|
||||||
self._check_mine = LoopingCall(self._update_mine)
|
self._check_mine = LoopingCall(self._update_mine)
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
log.info("Starting blob availability tracker.")
|
log.info("Starting blob availability tracker.")
|
||||||
self._check_popular.start(600)
|
|
||||||
self._check_mine.start(600)
|
self._check_mine.start(600)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
log.info("Stopping blob availability tracker.")
|
log.info("Stopping blob availability tracker.")
|
||||||
if self._check_popular.running:
|
|
||||||
self._check_popular.stop()
|
|
||||||
if self._check_mine.running:
|
if self._check_mine.running:
|
||||||
self._check_mine.stop()
|
self._check_mine.stop()
|
||||||
|
|
||||||
|
@ -68,17 +64,6 @@ class BlobAvailabilityTracker(object):
|
||||||
d.addCallback(lambda peers: _save_peer_info(blob, peers))
|
d.addCallback(lambda peers: _save_peer_info(blob, peers))
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def _get_most_popular(self):
|
|
||||||
dl = []
|
|
||||||
for (hash, _) in self._dht_node.get_most_popular_hashes(10):
|
|
||||||
encoded = hash.encode('hex')
|
|
||||||
dl.append(self._update_peers_for_blob(encoded))
|
|
||||||
return defer.DeferredList(dl)
|
|
||||||
|
|
||||||
def _update_most_popular(self):
|
|
||||||
d = self._get_most_popular()
|
|
||||||
d.addCallback(lambda _: self._set_mean_peers())
|
|
||||||
|
|
||||||
def _update_mine(self):
|
def _update_mine(self):
|
||||||
def _get_peers(blobs):
|
def _get_peers(blobs):
|
||||||
dl = []
|
dl = []
|
||||||
|
|
|
@ -261,9 +261,6 @@ class Node(object):
|
||||||
expanded_peers.append((host, port, peer_node_id))
|
expanded_peers.append((host, port, peer_node_id))
|
||||||
defer.returnValue(expanded_peers)
|
defer.returnValue(expanded_peers)
|
||||||
|
|
||||||
def get_most_popular_hashes(self, num_to_return):
|
|
||||||
return self.hash_watcher.most_popular_hashes(num_to_return)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def iterativeAnnounceHaveBlob(self, blob_hash, value):
|
def iterativeAnnounceHaveBlob(self, blob_hash, value):
|
||||||
known_nodes = {}
|
known_nodes = {}
|
||||||
|
|
|
@ -4,7 +4,6 @@ import logging
|
||||||
from zope.interface import implements
|
from zope.interface import implements
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
from lbrynet.interfaces import IPeerFinder
|
from lbrynet.interfaces import IPeerFinder
|
||||||
from lbrynet.core.utils import short_hash
|
|
||||||
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
@ -13,18 +12,9 @@ log = logging.getLogger(__name__)
|
||||||
class DummyPeerFinder(object):
|
class DummyPeerFinder(object):
|
||||||
"""This class finds peers which have announced to the DHT that they have certain blobs"""
|
"""This class finds peers which have announced to the DHT that they have certain blobs"""
|
||||||
|
|
||||||
def run_manage_loop(self):
|
def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=True):
|
||||||
pass
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def find_peers_for_blob(self, blob_hash):
|
|
||||||
return defer.succeed([])
|
return defer.succeed([])
|
||||||
|
|
||||||
def get_most_popular_hashes(self, num_to_return):
|
|
||||||
return []
|
|
||||||
|
|
||||||
|
|
||||||
class DHTPeerFinder(DummyPeerFinder):
|
class DHTPeerFinder(DummyPeerFinder):
|
||||||
"""This class finds peers which have announced to the DHT that they have certain blobs"""
|
"""This class finds peers which have announced to the DHT that they have certain blobs"""
|
||||||
|
@ -39,11 +29,8 @@ class DHTPeerFinder(DummyPeerFinder):
|
||||||
self.peer_manager = peer_manager
|
self.peer_manager = peer_manager
|
||||||
self.peers = []
|
self.peers = []
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=False):
|
def find_peers_for_blob(self, blob_hash, timeout=None, filter_self=True):
|
||||||
"""
|
"""
|
||||||
Find peers for blob in the DHT
|
Find peers for blob in the DHT
|
||||||
blob_hash (str): blob hash to look for
|
blob_hash (str): blob hash to look for
|
||||||
|
@ -54,32 +41,19 @@ class DHTPeerFinder(DummyPeerFinder):
|
||||||
Returns:
|
Returns:
|
||||||
list of peers for the blob
|
list of peers for the blob
|
||||||
"""
|
"""
|
||||||
def _trigger_timeout():
|
|
||||||
if not finished_deferred.called:
|
|
||||||
log.debug("Peer search for %s timed out", short_hash(blob_hash))
|
|
||||||
finished_deferred.cancel()
|
|
||||||
|
|
||||||
bin_hash = binascii.unhexlify(blob_hash)
|
bin_hash = binascii.unhexlify(blob_hash)
|
||||||
finished_deferred = self.dht_node.getPeersForBlob(bin_hash)
|
finished_deferred = self.dht_node.iterativeFindValue(bin_hash)
|
||||||
|
if timeout:
|
||||||
if timeout is not None:
|
finished_deferred.addTimeout(timeout, self.dht_node.clock)
|
||||||
self.dht_node.reactor_callLater(timeout, _trigger_timeout)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
peer_list = yield finished_deferred
|
peer_list = yield finished_deferred
|
||||||
except defer.CancelledError:
|
except defer.TimeoutError:
|
||||||
peer_list = []
|
peer_list = []
|
||||||
|
|
||||||
peers = set(peer_list)
|
peers = set(peer_list)
|
||||||
good_peers = []
|
results = []
|
||||||
for host, port in peers:
|
for node_id, host, port in peers:
|
||||||
if filter_self and (host, port) == (self.dht_node.externalIP, self.dht_node.peerPort):
|
if filter_self and (host, port) == (self.dht_node.externalIP, self.dht_node.peerPort):
|
||||||
continue
|
continue
|
||||||
peer = self.peer_manager.get_peer(host, port)
|
results.append(self.peer_manager.get_peer(host, port))
|
||||||
if peer.is_available() is True:
|
defer.returnValue(results)
|
||||||
good_peers.append(peer)
|
|
||||||
|
|
||||||
defer.returnValue(good_peers)
|
|
||||||
|
|
||||||
def get_most_popular_hashes(self, num_to_return):
|
|
||||||
return self.dht_node.get_most_popular_hashes(num_to_return)
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue