From 8e0f71ac7467290883d0440a9d2a1c73795a5921 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Tue, 7 Feb 2017 18:17:03 -0500 Subject: [PATCH 1/4] Make hash reannounce time adjusted based on the queue of hashes yet to be announced, add test for DHTHashAnnouncer --- lbrynet/core/BlobManager.py | 16 +++--- lbrynet/core/server/DHTHashAnnouncer.py | 47 +++++++++++++---- .../unit/core/server/test_DHTHashAnnouncer.py | 50 +++++++++++++++++++ 3 files changed, 97 insertions(+), 16 deletions(-) create mode 100644 tests/unit/core/server/test_DHTHashAnnouncer.py diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index 4cf9ef4b0..97faf1d91 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -128,7 +128,7 @@ class DiskBlobManager(BlobManager): def blob_completed(self, blob, next_announce_time=None): if next_announce_time is None: - next_announce_time = time.time() + self.hash_reannounce_time + next_announce_time = self.get_next_announce_time() d = self._add_completed_blob(blob.blob_hash, blob.length, next_announce_time) d.addCallback(lambda _: self._immediate_announce([blob.blob_hash])) return d @@ -137,8 +137,7 @@ class DiskBlobManager(BlobManager): return self._completed_blobs(blobhashes_to_check) def hashes_to_announce(self): - next_announce_time = time.time() + self.hash_reannounce_time - return self._get_blobs_to_announce(next_announce_time) + return self._get_blobs_to_announce() def creator_finished(self, blob_creator): log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash) @@ -148,7 +147,7 @@ class DiskBlobManager(BlobManager): new_blob = self.blob_type(self.blob_dir, blob_creator.blob_hash, True, blob_creator.length) self.blobs[blob_creator.blob_hash] = new_blob self._immediate_announce([blob_creator.blob_hash]) - next_announce_time = time.time() + self.hash_reannounce_time + next_announce_time = self.get_next_announce_time() d = self.blob_completed(new_blob, next_announce_time) return d @@ -283,7 +282,7 @@ class DiskBlobManager(BlobManager): (blob, timestamp)) @rerun_if_locked - def _get_blobs_to_announce(self, next_announce_time): + def _get_blobs_to_announce(self): def get_and_update(transaction): timestamp = time.time() @@ -291,9 +290,12 @@ class DiskBlobManager(BlobManager): "where next_announce_time < ? and blob_hash is not null", (timestamp,)) blobs = [b for b, in r.fetchall()] + next_announce_time = self.get_next_announce_time(len(blobs)) transaction.execute( "update blobs set next_announce_time = ? where next_announce_time < ?", (next_announce_time, timestamp)) + log.debug("Got %s blobs to announce, next announce time is in %s seconds", + len(blobs), next_announce_time-time.time()) return blobs return self.db_conn.runInteraction(get_and_update) @@ -398,7 +400,7 @@ class TempBlobManager(BlobManager): blob_hash for blob_hash, announce_time in self.blob_next_announces.iteritems() if announce_time < now ] - next_announce_time = now + self.hash_reannounce_time + next_announce_time = self.get_next_announce_time(len(blobs)) for b in blobs: self.blob_next_announces[b] = next_announce_time return defer.succeed(blobs) @@ -415,7 +417,7 @@ class TempBlobManager(BlobManager): new_blob._verified = True self.blobs[blob_creator.blob_hash] = new_blob self._immediate_announce([blob_creator.blob_hash]) - next_announce_time = time.time() + self.hash_reannounce_time + next_announce_time = self.get_next_announce_time() d = self.blob_completed(new_blob, next_announce_time) d.addCallback(lambda _: new_blob) return d diff --git a/lbrynet/core/server/DHTHashAnnouncer.py b/lbrynet/core/server/DHTHashAnnouncer.py index 6c2fbd951..98e5a24bb 100644 --- a/lbrynet/core/server/DHTHashAnnouncer.py +++ b/lbrynet/core/server/DHTHashAnnouncer.py @@ -10,6 +10,10 @@ log = logging.getLogger(__name__) class DHTHashAnnouncer(object): + callLater = reactor.callLater + ANNOUNCE_CHECK_INTERVAL = 60 + CONCURRENT_ANNOUNCERS = 5 + """This class announces to the DHT that this peer has certain blobs""" def __init__(self, dht_node, peer_port): self.dht_node = dht_node @@ -20,12 +24,9 @@ class DHTHashAnnouncer(object): self._concurrent_announcers = 0 def run_manage_loop(self): - - from twisted.internet import reactor - if self.peer_port is not None: self._announce_available_hashes() - self.next_manage_call = reactor.callLater(60, self.run_manage_loop) + self.next_manage_call = self.callLater(self.ANNOUNCE_CHECK_INTERVAL, self.run_manage_loop) def stop(self): log.info("Stopping %s", self) @@ -42,6 +43,9 @@ class DHTHashAnnouncer(object): else: return defer.succeed(False) + def hash_queue_size(self): + return len(self.hash_queue) + def _announce_available_hashes(self): log.debug('Announcing available hashes') ds = [] @@ -64,7 +68,7 @@ class DHTHashAnnouncer(object): announce_deferred = defer.Deferred() ds.append(announce_deferred) self.hash_queue.append((h, announce_deferred)) - log.debug('There are now %s hashes remaining to be announced', len(self.hash_queue)) + log.debug('There are now %s hashes remaining to be announced', self.hash_queue_size()) def announce(): if len(self.hash_queue): @@ -72,12 +76,11 @@ class DHTHashAnnouncer(object): log.debug('Announcing blob %s to dht', h) d = self.dht_node.announceHaveBlob(binascii.unhexlify(h), self.peer_port) d.chainDeferred(announce_deferred) - d.addBoth(lambda _: reactor.callLater(0, announce)) + d.addBoth(lambda _: self.callLater(0, announce)) else: self._concurrent_announcers -= 1 - for i in range(self._concurrent_announcers, 5): - # TODO: maybe make the 5 configurable + for i in range(self._concurrent_announcers, self.CONCURRENT_ANNOUNCERS): self._concurrent_announcers += 1 announce() d = defer.DeferredList(ds) @@ -87,12 +90,38 @@ class DHTHashAnnouncer(object): class DHTHashSupplier(object): + # 1 hour is the min time hash will be reannounced + MIN_HASH_REANNOUNCE_TIME = 60*60 + # conservative assumption of the time it takes to announce + # a single hash + SINGLE_HASH_ANNOUNCE_DURATION = 1 + """Classes derived from this class give hashes to a hash announcer""" def __init__(self, announcer): if announcer is not None: announcer.add_supplier(self) self.hash_announcer = announcer - self.hash_reannounce_time = 60 * 60 # 1 hour def hashes_to_announce(self): pass + + + def get_next_announce_time(self, num_hashes_to_announce=1): + """ + Hash reannounce time is set to current time + MIN_HASH_REANNOUNCE_TIME, + unless we are announcing a lot of hashes at once which could cause the + the announce queue to pile up. To prevent pile up, reannounce + only after a conservative estimate of when it will finish + to announce all the hashes. + + Args: + num_hashes_to_announce: number of hashes that will be added to the queue + Returns: + timestamp for next announce time + """ + queue_size = self.hash_announcer.hash_queue_size()+num_hashes_to_announce + reannounce = max(self.MIN_HASH_REANNOUNCE_TIME, + queue_size*self.SINGLE_HASH_ANNOUNCE_DURATION) + return time.time() + reannounce + + diff --git a/tests/unit/core/server/test_DHTHashAnnouncer.py b/tests/unit/core/server/test_DHTHashAnnouncer.py new file mode 100644 index 000000000..726f03749 --- /dev/null +++ b/tests/unit/core/server/test_DHTHashAnnouncer.py @@ -0,0 +1,50 @@ +import os +import binascii +from twisted.trial import unittest +from twisted.internet import defer,task +from lbrynet.core.server.DHTHashAnnouncer import DHTHashAnnouncer,DHTHashSupplier +from lbrynet.core.utils import random_string +from lbrynet.core import log_support + + +class MocDHTNode(object): + def __init__(self): + self.blobs_announced = 0 + + def announceHaveBlob(self,blob,port): + self.blobs_announced += 1 + return defer.succeed(True) + +class MocSupplier(object): + def __init__(self, blobs_to_announce): + self.blobs_to_announce = blobs_to_announce + self.announced = False + def hashes_to_announce(self): + if not self.announced: + self.announced = True + return defer.succeed(self.blobs_to_announce) + else: + return defer.succeed([]) + +class DHTHashAnnouncerTest(unittest.TestCase): + + def setUp(self): + self.num_blobs = 10 + self.blobs_to_announce = [] + for i in range(0, self.num_blobs): + self.blobs_to_announce.append(binascii.b2a_hex(os.urandom(32))) + self.clock = task.Clock() + self.dht_node = MocDHTNode() + self.announcer = DHTHashAnnouncer(self.dht_node, peer_port=3333) + self.announcer.callLater = self.clock.callLater + self.supplier = MocSupplier(self.blobs_to_announce) + self.announcer.add_supplier(self.supplier) + + def test_basic(self): + self.announcer._announce_available_hashes() + self.clock.advance(1) + self.assertEqual(self.dht_node.blobs_announced, self.num_blobs) + self.assertEqual(self.announcer.hash_queue_size(), 0) + + + From aa45b0e121b238a3d31b9a8d1137bb20ef43cbbd Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Mon, 13 Feb 2017 13:53:55 -0500 Subject: [PATCH 2/4] Fixes for other tests now that BlobManager relies on DHTHashAnnouncer.get_next_announce_time() --- tests/mocks.py | 3 +++ tests/unit/lbryfilemanager/test_EncryptedFileCreator.py | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/mocks.py b/tests/mocks.py index 96845ad43..ca321b464 100644 --- a/tests/mocks.py +++ b/tests/mocks.py @@ -97,6 +97,9 @@ class Announcer(object): def __init__(self, *args): pass + def hash_queue_size(self): + return 0 + def add_supplier(self, supplier): pass diff --git a/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py b/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py index 60d1d4f39..704274444 100644 --- a/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py +++ b/tests/unit/lbryfilemanager/test_EncryptedFileCreator.py @@ -34,7 +34,7 @@ class CreateEncryptedFileTest(unittest.TestCase): def create_file(self, filename): session = mock.Mock(spec=Session.Session)(None, None) - hash_announcer = mock.Mock(spec=DHTHashAnnouncer.DHTHashAnnouncer)(None, None) + hash_announcer = DHTHashAnnouncer.DHTHashAnnouncer(None, None) session.blob_manager = BlobManager.TempBlobManager(hash_announcer) session.db_dir = self.tmp_dir manager = mock.Mock(spec=EncryptedFileManager.EncryptedFileManager)() From 34f5bc93ae256c92b6a6ced5bc542a2339df5c44 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Thu, 9 Feb 2017 10:09:58 -0500 Subject: [PATCH 3/4] adding immediate announce option to hash announce --- lbrynet/core/server/DHTHashAnnouncer.py | 9 ++++++--- tests/unit/core/server/test_DHTHashAnnouncer.py | 9 ++++++++- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/lbrynet/core/server/DHTHashAnnouncer.py b/lbrynet/core/server/DHTHashAnnouncer.py index 98e5a24bb..a68a97d0f 100644 --- a/lbrynet/core/server/DHTHashAnnouncer.py +++ b/lbrynet/core/server/DHTHashAnnouncer.py @@ -39,7 +39,7 @@ class DHTHashAnnouncer(object): def immediate_announce(self, blob_hashes): if self.peer_port is not None: - return self._announce_hashes(blob_hashes) + return self._announce_hashes(blob_hashes, immediate=True) else: return defer.succeed(False) @@ -56,7 +56,7 @@ class DHTHashAnnouncer(object): dl = defer.DeferredList(ds) return dl - def _announce_hashes(self, hashes): + def _announce_hashes(self, hashes, immediate=False): if not hashes: return log.debug('Announcing %s hashes', len(hashes)) @@ -67,7 +67,10 @@ class DHTHashAnnouncer(object): for h in hashes: announce_deferred = defer.Deferred() ds.append(announce_deferred) - self.hash_queue.append((h, announce_deferred)) + if immediate: + self.hash_queue.appendleft((h, announce_deferred)) + else: + self.hash_queue.append((h, announce_deferred)) log.debug('There are now %s hashes remaining to be announced', self.hash_queue_size()) def announce(): diff --git a/tests/unit/core/server/test_DHTHashAnnouncer.py b/tests/unit/core/server/test_DHTHashAnnouncer.py index 726f03749..c6bb05967 100644 --- a/tests/unit/core/server/test_DHTHashAnnouncer.py +++ b/tests/unit/core/server/test_DHTHashAnnouncer.py @@ -42,9 +42,16 @@ class DHTHashAnnouncerTest(unittest.TestCase): def test_basic(self): self.announcer._announce_available_hashes() + self.assertEqual(self.announcer.hash_queue_size(),self.announcer.CONCURRENT_ANNOUNCERS) self.clock.advance(1) self.assertEqual(self.dht_node.blobs_announced, self.num_blobs) self.assertEqual(self.announcer.hash_queue_size(), 0) - + def test_immediate_announce(self): + # Test that immediate announce puts a hash at the front of the queue + self.announcer._announce_available_hashes() + blob_hash = binascii.b2a_hex(os.urandom(32)) + self.announcer.immediate_announce([blob_hash]) + self.assertEqual(self.announcer.hash_queue_size(),self.announcer.CONCURRENT_ANNOUNCERS+1) + self.assertEqual(blob_hash, self.announcer.hash_queue[0][0]) From 5831e253cd66aba31cb748fc288301af41b557d1 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Thu, 9 Feb 2017 12:42:14 -0500 Subject: [PATCH 4/4] adding change log --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 89e8eabe2..982d45058 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ at anytime. ### Fixed * Change EWOULDBLOCK error in DHT to warning. #481 * mark peers as down if it fails download protocol + * Made hash reannounce time to be adjustable to fix [#432](https://github.com/lbryio/lbry/issues/432) ## [0.8.3rc0] - 2017-02-10 ### Changed