diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py
index 4cf9ef4b0..97faf1d91 100644
--- a/lbrynet/core/BlobManager.py
+++ b/lbrynet/core/BlobManager.py
@@ -128,7 +128,7 @@ class DiskBlobManager(BlobManager):
 
     def blob_completed(self, blob, next_announce_time=None):
         if next_announce_time is None:
-            next_announce_time = time.time() + self.hash_reannounce_time
+            next_announce_time = self.get_next_announce_time()
         d = self._add_completed_blob(blob.blob_hash, blob.length, next_announce_time)
         d.addCallback(lambda _: self._immediate_announce([blob.blob_hash]))
         return d
@@ -137,8 +137,7 @@ class DiskBlobManager(BlobManager):
         return self._completed_blobs(blobhashes_to_check)
 
     def hashes_to_announce(self):
-        next_announce_time = time.time() + self.hash_reannounce_time
-        return self._get_blobs_to_announce(next_announce_time)
+        return self._get_blobs_to_announce()
 
     def creator_finished(self, blob_creator):
         log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash)
@@ -148,7 +147,7 @@ class DiskBlobManager(BlobManager):
         new_blob = self.blob_type(self.blob_dir, blob_creator.blob_hash, True, blob_creator.length)
         self.blobs[blob_creator.blob_hash] = new_blob
         self._immediate_announce([blob_creator.blob_hash])
-        next_announce_time = time.time() + self.hash_reannounce_time
+        next_announce_time = self.get_next_announce_time()
         d = self.blob_completed(new_blob, next_announce_time)
         return d
 
@@ -283,7 +282,7 @@ class DiskBlobManager(BlobManager):
                                     (blob, timestamp))
 
     @rerun_if_locked
-    def _get_blobs_to_announce(self, next_announce_time):
+    def _get_blobs_to_announce(self):
 
         def get_and_update(transaction):
             timestamp = time.time()
@@ -291,9 +290,13 @@ class DiskBlobManager(BlobManager):
                                     "where next_announce_time < ? and blob_hash is not null",
                                     (timestamp,))
             blobs = [b for b, in r.fetchall()]
+            # a large batch pushes the reannounce time out so the queue can drain
+            next_announce_time = self.get_next_announce_time(len(blobs))
             transaction.execute(
                 "update blobs set next_announce_time = ? where next_announce_time < ?",
                 (next_announce_time, timestamp))
+            log.debug("Got %s blobs to announce, next announce time is in %s seconds",
+                      len(blobs), next_announce_time - time.time())
             return blobs
 
         return self.db_conn.runInteraction(get_and_update)
@@ -398,7 +401,7 @@ class TempBlobManager(BlobManager):
             blob_hash for blob_hash, announce_time in self.blob_next_announces.iteritems()
             if announce_time < now
         ]
-        next_announce_time = now + self.hash_reannounce_time
+        next_announce_time = self.get_next_announce_time(len(blobs))
         for b in blobs:
             self.blob_next_announces[b] = next_announce_time
         return defer.succeed(blobs)
@@ -415,7 +418,7 @@ class TempBlobManager(BlobManager):
         new_blob._verified = True
         self.blobs[blob_creator.blob_hash] = new_blob
         self._immediate_announce([blob_creator.blob_hash])
-        next_announce_time = time.time() + self.hash_reannounce_time
+        next_announce_time = self.get_next_announce_time()
         d = self.blob_completed(new_blob, next_announce_time)
         d.addCallback(lambda _: new_blob)
         return d
diff --git a/lbrynet/core/server/DHTHashAnnouncer.py b/lbrynet/core/server/DHTHashAnnouncer.py
index 6c2fbd951..98e5a24bb 100644
--- a/lbrynet/core/server/DHTHashAnnouncer.py
+++ b/lbrynet/core/server/DHTHashAnnouncer.py
@@ -10,6 +10,13 @@ log = logging.getLogger(__name__)
 
 
 class DHTHashAnnouncer(object):
     """This class announces to the DHT that this peer has certain blobs"""
+    # indirection over reactor.callLater so tests can substitute a fake clock
+    callLater = reactor.callLater
+    # seconds between checks for hashes that are due to be announced
+    ANNOUNCE_CHECK_INTERVAL = 60
+    # maximum number of announce chains running at the same time
+    CONCURRENT_ANNOUNCERS = 5
+
     def __init__(self, dht_node, peer_port):
         self.dht_node = dht_node
@@ -20,12 +27,9 @@ class DHTHashAnnouncer(object):
         self._concurrent_announcers = 0
 
     def run_manage_loop(self):
-
-        from twisted.internet import reactor
-
         if self.peer_port is not None:
             self._announce_available_hashes()
-        self.next_manage_call = reactor.callLater(60, self.run_manage_loop)
+        self.next_manage_call = self.callLater(self.ANNOUNCE_CHECK_INTERVAL, self.run_manage_loop)
 
     def stop(self):
         log.info("Stopping %s", self)
@@ -42,6 +46,10 @@ class DHTHashAnnouncer(object):
         else:
             return defer.succeed(False)
 
+    def hash_queue_size(self):
+        """Return the number of hashes still waiting to be announced"""
+        return len(self.hash_queue)
+
     def _announce_available_hashes(self):
         log.debug('Announcing available hashes')
         ds = []
@@ -64,7 +72,7 @@ class DHTHashAnnouncer(object):
             announce_deferred = defer.Deferred()
             ds.append(announce_deferred)
             self.hash_queue.append((h, announce_deferred))
-        log.debug('There are now %s hashes remaining to be announced', len(self.hash_queue))
+        log.debug('There are now %s hashes remaining to be announced', self.hash_queue_size())
 
         def announce():
             if len(self.hash_queue):
@@ -72,12 +80,11 @@ class DHTHashAnnouncer(object):
                 log.debug('Announcing blob %s to dht', h)
                 d = self.dht_node.announceHaveBlob(binascii.unhexlify(h), self.peer_port)
                 d.chainDeferred(announce_deferred)
-                d.addBoth(lambda _: reactor.callLater(0, announce))
+                d.addBoth(lambda _: self.callLater(0, announce))
             else:
                 self._concurrent_announcers -= 1
 
-        for i in range(self._concurrent_announcers, 5):
-            # TODO: maybe make the 5 configurable
+        for i in range(self._concurrent_announcers, self.CONCURRENT_ANNOUNCERS):
             self._concurrent_announcers += 1
             announce()
         d = defer.DeferredList(ds)
@@ -87,12 +94,38 @@ class DHTHashAnnouncer(object):
 
 
 class DHTHashSupplier(object):
     """Classes derived from this class give hashes to a hash announcer"""
+    # 1 hour is the min time hash will be reannounced
+    MIN_HASH_REANNOUNCE_TIME = 60*60
+    # conservative assumption of the time it takes to announce
+    # a single hash
+    SINGLE_HASH_ANNOUNCE_DURATION = 1
+
     def __init__(self, announcer):
         if announcer is not None:
             announcer.add_supplier(self)
         self.hash_announcer = announcer
-        self.hash_reannounce_time = 60 * 60  # 1 hour
 
     def hashes_to_announce(self):
         pass
+
+    def get_next_announce_time(self, num_hashes_to_announce=1):
+        """
+        Hash reannounce time is set to current time + MIN_HASH_REANNOUNCE_TIME,
+        unless we are announcing a lot of hashes at once which could cause
+        the announce queue to pile up. To prevent pile up, reannounce
+        only after a conservative estimate of when it will finish
+        to announce all the hashes.
+
+        Args:
+            num_hashes_to_announce: number of hashes that will be added to the queue
+        Returns:
+            timestamp for next announce time
+        """
+        queue_size = num_hashes_to_announce
+        # __init__ accepts announcer=None; without an announcer there is no queue
+        if self.hash_announcer is not None:
+            queue_size += self.hash_announcer.hash_queue_size()
+        reannounce = max(self.MIN_HASH_REANNOUNCE_TIME,
+                         queue_size*self.SINGLE_HASH_ANNOUNCE_DURATION)
+        return time.time() + reannounce
diff --git a/tests/unit/core/server/test_DHTHashAnnouncer.py b/tests/unit/core/server/test_DHTHashAnnouncer.py
new file mode 100644
index 000000000..726f03749
--- /dev/null
+++ b/tests/unit/core/server/test_DHTHashAnnouncer.py
@@ -0,0 +1,52 @@
+import os
+import binascii
+from twisted.trial import unittest
+from twisted.internet import defer, task
+from lbrynet.core.server.DHTHashAnnouncer import DHTHashAnnouncer, DHTHashSupplier
+from lbrynet.core.utils import random_string
+from lbrynet.core import log_support
+
+
+class MocDHTNode(object):
+    """Stand-in DHT node that only counts announceHaveBlob calls"""
+    def __init__(self):
+        self.blobs_announced = 0
+
+    def announceHaveBlob(self, blob, port):
+        self.blobs_announced += 1
+        return defer.succeed(True)
+
+
+class MocSupplier(object):
+    """Supplier that hands out its hashes on the first request only"""
+    def __init__(self, blobs_to_announce):
+        self.blobs_to_announce = blobs_to_announce
+        self.announced = False
+
+    def hashes_to_announce(self):
+        if not self.announced:
+            self.announced = True
+            return defer.succeed(self.blobs_to_announce)
+        else:
+            return defer.succeed([])
+
+
+class DHTHashAnnouncerTest(unittest.TestCase):
+
+    def setUp(self):
+        self.num_blobs = 10
+        self.blobs_to_announce = [binascii.b2a_hex(os.urandom(32))
+                                  for _ in range(self.num_blobs)]
+        self.clock = task.Clock()
+        self.dht_node = MocDHTNode()
+        self.announcer = DHTHashAnnouncer(self.dht_node, peer_port=3333)
+        # drive the announcer from a fake clock instead of the real reactor
+        self.announcer.callLater = self.clock.callLater
+        self.supplier = MocSupplier(self.blobs_to_announce)
+        self.announcer.add_supplier(self.supplier)
+
+    def test_basic(self):
+        self.announcer._announce_available_hashes()
+        self.clock.advance(1)
+        self.assertEqual(self.dht_node.blobs_announced, self.num_blobs)
+        self.assertEqual(self.announcer.hash_queue_size(), 0)