diff --git a/lbrynet/core/server/DHTHashAnnouncer.py b/lbrynet/core/server/DHTHashAnnouncer.py
index 8bef7b177..877b0538c 100644
--- a/lbrynet/core/server/DHTHashAnnouncer.py
+++ b/lbrynet/core/server/DHTHashAnnouncer.py
@@ -2,8 +2,9 @@ import binascii
 import collections
 import logging
 import time
+import datetime
 
-from twisted.internet import defer
+from twisted.internet import defer, task
 from lbrynet.core import utils
 
 log = logging.getLogger(__name__)
@@ -14,6 +15,8 @@ class DHTHashAnnouncer(object):
     CONCURRENT_ANNOUNCERS = 5
 
     """This class announces to the DHT that this peer has certain blobs"""
+    STORE_RETRIES = 3
+
     def __init__(self, dht_node, peer_port):
         self.dht_node = dht_node
         self.peer_port = peer_port
@@ -21,17 +24,42 @@ class DHTHashAnnouncer(object):
         self.next_manage_call = None
         self.hash_queue = collections.deque()
         self._concurrent_announcers = 0
+        self._manage_call_lc = task.LoopingCall(self.manage_lc)
+        self._lock = utils.DeferredLockContextManager(defer.DeferredLock())
+        self._last_checked = time.time(), self.CONCURRENT_ANNOUNCERS
+        self._retries = {}
+        self._total = None
 
     def run_manage_loop(self):
+        log.info("Starting hash announcer")
+        if not self._manage_call_lc.running:
+            self._manage_call_lc.start(self.ANNOUNCE_CHECK_INTERVAL)
+
+    def manage_lc(self):
+        last_time, last_hashes = self._last_checked
+        hashes = len(self.hash_queue)
+        if hashes:
+            t, h = time.time() - last_time, last_hashes - hashes
+            blobs_per_second = float(h) / float(t)
+            if blobs_per_second > 0:
+                estimated_time_remaining = int(float(hashes) / blobs_per_second)
+                remaining = str(datetime.timedelta(seconds=estimated_time_remaining))
+            else:
+                remaining = "unknown"
+            log.info("Announcing blobs: %i blobs left to announce, %i%% complete, "
+                     "est time remaining: %s", hashes + self._concurrent_announcers,
+                     100 - int(100.0 * float(hashes + self._concurrent_announcers) /
+                               float(self._total)), remaining)
+            self._last_checked = t + last_time, hashes
+        else:
+            self._total = 0
         if self.peer_port is not None:
-            self._announce_available_hashes()
-            self.next_manage_call = utils.call_later(self.ANNOUNCE_CHECK_INTERVAL, self.run_manage_loop)
+            return self._announce_available_hashes()
 
     def stop(self):
         log.info("Stopping DHT hash announcer.")
-        if self.next_manage_call is not None:
-            self.next_manage_call.cancel()
-            self.next_manage_call = None
+        if self._manage_call_lc.running:
+            self._manage_call_lc.stop()
 
     def add_supplier(self, supplier):
         self.suppliers.append(supplier)
@@ -45,60 +73,101 @@ class DHTHashAnnouncer(object):
     def hash_queue_size(self):
         return len(self.hash_queue)
 
+    @defer.inlineCallbacks
     def _announce_available_hashes(self):
         log.debug('Announcing available hashes')
-        ds = []
         for supplier in self.suppliers:
-            d = supplier.hashes_to_announce()
-            d.addCallback(self._announce_hashes)
-            ds.append(d)
-        dl = defer.DeferredList(ds)
-        return dl
+            hashes = yield supplier.hashes_to_announce()
+            yield self._announce_hashes(hashes)
 
+    @defer.inlineCallbacks
     def _announce_hashes(self, hashes, immediate=False):
         if not hashes:
-            return
-        log.debug('Announcing %s hashes', len(hashes))
+            defer.returnValue(None)
+        if not self.dht_node.can_store:
+            log.warning("Client only DHT node cannot store, skipping announce")
+            defer.returnValue(None)
+        log.info('Announcing %s hashes', len(hashes))
         # TODO: add a timeit decorator
         start = time.time()
-        ds = []
-        for h in hashes:
-            announce_deferred = defer.Deferred()
-            ds.append(announce_deferred)
-            if immediate:
-                self.hash_queue.appendleft((h, announce_deferred))
-            else:
-                self.hash_queue.append((h, announce_deferred))
+        ds = []
+        with self._lock:
+            for h in hashes:
+                announce_deferred = defer.Deferred()
+                if immediate:
+                    self.hash_queue.appendleft((h, announce_deferred))
+                else:
+                    self.hash_queue.append((h, announce_deferred))
+            if not self._total:
+                self._total = len(hashes)
         log.debug('There are now %s hashes remaining to be announced', self.hash_queue_size())
 
-        def announce():
+        @defer.inlineCallbacks
+        def do_store(blob_hash, announce_d):
+            if announce_d.called:
+                defer.returnValue(announce_d.result)
+            try:
+                store_nodes = yield self.dht_node.announceHaveBlob(binascii.unhexlify(blob_hash))
+                if not store_nodes:
+                    retries = self._retries.get(blob_hash, 0) + 1
+                    self._retries[blob_hash] = retries
+                    if retries <= self.STORE_RETRIES:
+                        log.debug("No nodes stored %s, retrying", blob_hash)
+                        result = yield do_store(blob_hash, announce_d)
+                    else:
+                        result = []
+                        log.warning("No nodes stored %s", blob_hash)
+                else:
+                    result = store_nodes
+                if not announce_d.called:
+                    announce_d.callback(result)
+                defer.returnValue(result)
+            except Exception as err:
+                if not announce_d.called:
+                    announce_d.errback(err)
+                raise err
+
+        @defer.inlineCallbacks
+        def announce(progress=None):
+            progress = progress or {}
             if len(self.hash_queue):
-                h, announce_deferred = self.hash_queue.popleft()
-                log.debug('Announcing blob %s to dht', h)
-                d = self.dht_node.announceHaveBlob(binascii.unhexlify(h))
-                d.chainDeferred(announce_deferred)
-                d.addBoth(lambda _: utils.call_later(0, announce))
+                with self._lock:
+                    h, announce_deferred = self.hash_queue.popleft()
+                log.debug('Announcing blob %s to dht', h[:16])
+                stored_to_nodes = yield do_store(h, announce_deferred)
+                progress[h] = stored_to_nodes
+                log.debug("Stored %s to %i peers (hashes announced by this announcer: %i)",
+                          h[:16],
+                          len(stored_to_nodes), len(progress))
+
+                yield announce(progress)
             else:
-                self._concurrent_announcers -= 1
+                with self._lock:
+                    self._concurrent_announcers -= 1
+                defer.returnValue(progress)
 
         for i in range(self._concurrent_announcers, self.CONCURRENT_ANNOUNCERS):
             self._concurrent_announcers += 1
-            announce()
-        d = defer.DeferredList(ds)
-        d.addCallback(lambda _: log.debug('Took %s seconds to announce %s hashes',
-                                          time.time() - start, len(hashes)))
-        return d
+            ds.append(announce())
+        announcer_results = yield defer.DeferredList(ds)
+        stored_to = {}
+        for _, announced_to in announcer_results:
+            stored_to.update(announced_to)
+        log.info('Took %s seconds to announce %s hashes', time.time() - start, len(hashes))
+        defer.returnValue(stored_to)
 
 
 class DHTHashSupplier(object):
     # 1 hour is the min time hash will be reannounced
-    MIN_HASH_REANNOUNCE_TIME = 60*60
+    MIN_HASH_REANNOUNCE_TIME = 60 * 60
     # conservative assumption of the time it takes to announce
     # a single hash
     SINGLE_HASH_ANNOUNCE_DURATION = 5
 
     """Classes derived from this class give hashes to a hash announcer"""
+
     def __init__(self, announcer):
         if announcer is not None:
             announcer.add_supplier(self)
@@ -107,7 +176,6 @@ class DHTHashSupplier(object):
     def hashes_to_announce(self):
         pass
 
-
     def get_next_announce_time(self, num_hashes_to_announce=1):
         """
         Hash reannounce time is set to current time + MIN_HASH_REANNOUNCE_TIME,
@@ -121,9 +189,7 @@ class DHTHashSupplier(object):
         Returns:
             timestamp for next announce time
         """
-        queue_size = self.hash_announcer.hash_queue_size()+num_hashes_to_announce
+        queue_size = self.hash_announcer.hash_queue_size() + num_hashes_to_announce
         reannounce = max(self.MIN_HASH_REANNOUNCE_TIME,
-                         queue_size*self.SINGLE_HASH_ANNOUNCE_DURATION)
+                         queue_size * self.SINGLE_HASH_ANNOUNCE_DURATION)
         return time.time() + reannounce
-
-
diff --git a/lbrynet/core/utils.py b/lbrynet/core/utils.py
index ae67c9885..ce0d433f2 100644
--- a/lbrynet/core/utils.py
+++ b/lbrynet/core/utils.py
@@ -148,6 +148,17 @@ def json_dumps_pretty(obj, **kwargs):
     return json.dumps(obj, sort_keys=True, indent=2, separators=(',', ': '), **kwargs)
 
 
+class DeferredLockContextManager(object):
+    def __init__(self, lock):
+        self._lock = lock
+
+    def __enter__(self):
+        yield self._lock.acquire()
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        yield self._lock.release()
+
+
 @defer.inlineCallbacks
 def DeferredDict(d, consumeErrors=False):
     keys = []
diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py
index a4f2acf3c..bbf75471c 100644
--- a/lbrynet/dht/node.py
+++ b/lbrynet/dht/node.py
@@ -117,10 +117,17 @@ class Node(object):
         self.peerPort = peerPort
         self.hash_watcher = HashWatcher()
 
+        # will be used later
+        self._can_store = True
+
     def __del__(self):
         if self._listeningPort is not None:
             self._listeningPort.stopListening()
 
+    @property
+    def can_store(self):
+        return self._can_store is True
+
     def stop(self):
         # stop LoopingCalls:
         if self.refresh_node_lc.running:
@@ -252,20 +259,7 @@ class Node(object):
     def iterativeAnnounceHaveBlob(self, blob_hash, value):
         known_nodes = {}
 
-        def log_error(err, n):
-            if err.check(protocol.TimeoutError):
-                log.debug(
-                    "Timeout while storing blob_hash %s at %s",
-                    binascii.hexlify(blob_hash), n)
-            else:
-                log.error(
-                    "Unexpected error while storing blob_hash %s at %s: %s",
-                    binascii.hexlify(blob_hash), n, err.getErrorMessage())
-
-        def log_success(res):
-            log.debug("Response to store request: %s", str(res))
-            return res
-
+        @defer.inlineCallbacks
         def announce_to_peer(responseTuple):
             """ @type responseMsg: kademlia.msgtypes.ResponseMessage """
             # The "raw response" tuple contains the response message,
@@ -274,40 +268,65 @@ class Node(object):
             originAddress = responseTuple[1]  # tuple: (ip adress, udp port)
             # Make sure the responding node is valid, and abort the operation if it isn't
             if not responseMsg.nodeID in known_nodes:
-                return responseMsg.nodeID
-
+                log.warning("Responding node was not expected")
+                defer.returnValue(responseMsg.nodeID)
             n = known_nodes[responseMsg.nodeID]
 
             result = responseMsg.response
+            announced = False
             if 'token' in result:
                 value['token'] = result['token']
-                d = n.store(blob_hash, value, self.node_id, 0)
-                d.addCallback(log_success)
-                d.addErrback(log_error, n)
+                try:
+                    res = yield n.store(blob_hash, value, self.node_id)
+                    log.debug("Response to store request: %s", str(res))
+                    announced = True
+                except protocol.TimeoutError:
+                    log.debug("Timeout while storing blob_hash %s at %s",
+                              blob_hash.encode('hex')[:16], n.id.encode('hex'))
+                except Exception as err:
+                    log.error("Unexpected error while storing blob_hash %s at %s: %s",
+                              blob_hash.encode('hex')[:16], n.id.encode('hex'), err)
             else:
-                d = defer.succeed(False)
-            return d
+                log.warning("missing token")
+            defer.returnValue(announced)
 
+        @defer.inlineCallbacks
         def requestPeers(contacts):
             if self.externalIP is not None and len(contacts) >= constants.k:
                 is_closer = Distance(blob_hash).is_closer(self.node_id, contacts[-1].id)
                 if is_closer:
                     contacts.pop()
-                    self.store(blob_hash, value, self_store=True, originalPublisherID=self.node_id)
+                    yield self.store(blob_hash, value, originalPublisherID=self.node_id,
+                                     self_store=True)
             elif self.externalIP is not None:
-                self.store(blob_hash, value, self_store=True,
-                           originalPublisherID=self.node_id)
-            ds = []
+                yield self.store(blob_hash, value, originalPublisherID=self.node_id,
+                                 self_store=True)
+            else:
+                raise Exception("Cannot determine external IP: %s" % self.externalIP)
+
+            contacted = []
             for contact in contacts:
                 known_nodes[contact.id] = contact
                 rpcMethod = getattr(contact, "findValue")
-                df = rpcMethod(blob_hash, rawResponse=True)
-                df.addCallback(announce_to_peer)
-                df.addErrback(log_error, contact)
-                ds.append(df)
-            return defer.DeferredList(ds)
+                try:
+                    response = yield rpcMethod(blob_hash, rawResponse=True)
+                    stored = yield announce_to_peer(response)
+                    if stored:
+                        contacted.append(contact)
+                except protocol.TimeoutError:
+                    log.debug("Timeout while storing blob_hash %s at %s",
+                              binascii.hexlify(blob_hash), contact)
+                except Exception as err:
+                    log.error("Unexpected error while storing blob_hash %s at %s: %s",
+                              binascii.hexlify(blob_hash), contact, err)
+            log.debug("Stored %s to %i of %i attempted peers", blob_hash.encode('hex')[:16],
+                      len(contacted), len(contacts))
+
+            contacted_node_ids = [c.id.encode('hex') for c in contacted]
+            defer.returnValue(contacted_node_ids)
 
         d = self.iterativeFindNode(blob_hash)
-        d.addCallbacks(requestPeers)
+        d.addCallback(requestPeers)
         return d
 
     def change_token(self):
@@ -638,28 +657,13 @@ class Node(object):
             self._dataStore.removeExpiredPeers()
         defer.returnValue(None)
 
+    @defer.inlineCallbacks
     def _refreshRoutingTable(self):
         nodeIDs = self._routingTable.getRefreshList(0, False)
-        outerDf = defer.Deferred()
-
-        def searchForNextNodeID(dfResult=None):
-            if len(nodeIDs) > 0:
-                searchID = nodeIDs.pop()
-                df = self.iterativeFindNode(searchID)
-                df.addCallback(searchForNextNodeID)
-            else:
-                # If this is reached, we have finished refreshing the routing table
-                outerDf.callback(None)
-
-        # Start the refreshing cycle
-        searchForNextNodeID()
-        return outerDf
-
-
-    # args put here because _refreshRoutingTable does outerDF.callback(None)
-    def _removeExpiredPeers(self, *args):
-        df = threads.deferToThread(self._dataStore.removeExpiredPeers)
-        return df
+        while nodeIDs:
+            searchID = nodeIDs.pop()
+            yield self.iterativeFindNode(searchID)
+        defer.returnValue(None)
 
 
 # This was originally a set of nested methods in _iterativeFind
diff --git a/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py b/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py
index 60021ffc9..0802cb731 100644
--- a/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py
+++ b/lbrynet/tests/unit/core/server/test_DHTHashAnnouncer.py
@@ -1,21 +1,30 @@
 from twisted.trial import unittest
-from twisted.internet import defer, task
+from twisted.internet import defer, task, reactor
 
 from lbrynet.core import utils
 from lbrynet.tests.util import random_lbry_hash
+from lbrynet.core.server.DHTHashAnnouncer import DHTHashAnnouncer
+
 
 class MocDHTNode(object):
     def __init__(self):
+        self.can_store = True
        self.blobs_announced = 0
 
+    @defer.inlineCallbacks
     def announceHaveBlob(self, blob):
         self.blobs_announced += 1
-        return defer.succeed(True)
+        d = defer.Deferred()
+        reactor.callLater(1, d.callback, {blob: ["ab" * 48]})
+        result = yield d
+        defer.returnValue(result)
+
 
 class MocSupplier(object):
     def __init__(self, blobs_to_announce):
         self.blobs_to_announce = blobs_to_announce
         self.announced = False
+
     def hashes_to_announce(self):
         if not self.announced:
             self.announced = True
@@ -23,8 +32,8 @@ class MocSupplier(object):
         else:
             return defer.succeed([])
 
-class DHTHashAnnouncerTest(unittest.TestCase):
 
+class DHTHashAnnouncerTest(unittest.TestCase):
     def setUp(self):
        self.num_blobs = 10
         self.blobs_to_announce = []
@@ -33,23 +42,26 @@ class DHTHashAnnouncerTest(unittest.TestCase):
         self.clock = task.Clock()
         self.dht_node = MocDHTNode()
         utils.call_later = self.clock.callLater
-        from lbrynet.core.server.DHTHashAnnouncer import DHTHashAnnouncer
         self.announcer = DHTHashAnnouncer(self.dht_node, peer_port=3333)
         self.supplier = MocSupplier(self.blobs_to_announce)
         self.announcer.add_supplier(self.supplier)
 
+    @defer.inlineCallbacks
     def test_basic(self):
-        self.announcer._announce_available_hashes()
+        d = self.announcer._announce_available_hashes()
         self.assertEqual(self.announcer.hash_queue_size(), self.announcer.CONCURRENT_ANNOUNCERS)
         self.clock.advance(1)
+        yield d
         self.assertEqual(self.dht_node.blobs_announced, self.num_blobs)
         self.assertEqual(self.announcer.hash_queue_size(), 0)
 
+    @defer.inlineCallbacks
     def test_immediate_announce(self):
         # Test that immediate announce puts a hash at the front of the queue
-        self.announcer._announce_available_hashes()
+        d = self.announcer._announce_available_hashes()
+        self.assertEqual(self.announcer.hash_queue_size(), self.announcer.CONCURRENT_ANNOUNCERS)
         blob_hash = random_lbry_hash()
         self.announcer.immediate_announce([blob_hash])
         self.assertEqual(self.announcer.hash_queue_size(), self.announcer.CONCURRENT_ANNOUNCERS+1)
         self.assertEqual(blob_hash, self.announcer.hash_queue[0][0])
-
+        yield d
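A note on the DeferredLockContextManager introduced in lbrynet/core/utils.py: DeferredLock.acquire() returns a Deferred rather than blocking, and the yield statements make __enter__ and __exit__ generator functions, so a synchronous `with self._lock:` block never actually waits for the lock (and the truthy generator returned by __exit__ would suppress exceptions raised inside the block). For comparison, here is a minimal sketch of how a DeferredLock does serialize access, using Twisted's DeferredLock.run(); the names lock, work_queue, and pop_next are illustrative only and not part of this patch:

    from twisted.internet import defer

    lock = defer.DeferredLock()
    work_queue = []

    def _pop_one():
        # Runs only while the lock is held.
        return work_queue.pop(0) if work_queue else None

    @defer.inlineCallbacks
    def pop_next():
        # run() acquires the lock, invokes the callable, and releases
        # the lock when the resulting Deferred fires.
        item = yield lock.run(_pop_one)
        defer.returnValue(item)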