Merge pull request #465 from lbryio/dht_hash_announcer_tests

Adjust hash reannounce based on queue size , test for DHTHashAnnouncer, immediately announce new blobs
This commit is contained in:
Umpei Kay Kurokawa 2017-02-13 21:08:48 -05:00 committed by GitHub
commit 37cc257555
6 changed files with 115 additions and 20 deletions

View file

@ -13,6 +13,7 @@ at anytime.
### Fixed ### Fixed
* Change EWOULDBLOCK error in DHT to warning. #481 * Change EWOULDBLOCK error in DHT to warning. #481
* mark peers as down if it fails download protocol * mark peers as down if it fails download protocol
* Made hash reannounce time to be adjustable to fix [#432](https://github.com/lbryio/lbry/issues/432)
## [0.8.3rc0] - 2017-02-10 ## [0.8.3rc0] - 2017-02-10
### Changed ### Changed

View file

@ -128,7 +128,7 @@ class DiskBlobManager(BlobManager):
def blob_completed(self, blob, next_announce_time=None): def blob_completed(self, blob, next_announce_time=None):
if next_announce_time is None: if next_announce_time is None:
next_announce_time = time.time() + self.hash_reannounce_time next_announce_time = self.get_next_announce_time()
d = self._add_completed_blob(blob.blob_hash, blob.length, next_announce_time) d = self._add_completed_blob(blob.blob_hash, blob.length, next_announce_time)
d.addCallback(lambda _: self._immediate_announce([blob.blob_hash])) d.addCallback(lambda _: self._immediate_announce([blob.blob_hash]))
return d return d
@ -137,8 +137,7 @@ class DiskBlobManager(BlobManager):
return self._completed_blobs(blobhashes_to_check) return self._completed_blobs(blobhashes_to_check)
def hashes_to_announce(self): def hashes_to_announce(self):
next_announce_time = time.time() + self.hash_reannounce_time return self._get_blobs_to_announce()
return self._get_blobs_to_announce(next_announce_time)
def creator_finished(self, blob_creator): def creator_finished(self, blob_creator):
log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash) log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash)
@ -148,7 +147,7 @@ class DiskBlobManager(BlobManager):
new_blob = self.blob_type(self.blob_dir, blob_creator.blob_hash, True, blob_creator.length) new_blob = self.blob_type(self.blob_dir, blob_creator.blob_hash, True, blob_creator.length)
self.blobs[blob_creator.blob_hash] = new_blob self.blobs[blob_creator.blob_hash] = new_blob
self._immediate_announce([blob_creator.blob_hash]) self._immediate_announce([blob_creator.blob_hash])
next_announce_time = time.time() + self.hash_reannounce_time next_announce_time = self.get_next_announce_time()
d = self.blob_completed(new_blob, next_announce_time) d = self.blob_completed(new_blob, next_announce_time)
return d return d
@ -283,7 +282,7 @@ class DiskBlobManager(BlobManager):
(blob, timestamp)) (blob, timestamp))
@rerun_if_locked @rerun_if_locked
def _get_blobs_to_announce(self, next_announce_time): def _get_blobs_to_announce(self):
def get_and_update(transaction): def get_and_update(transaction):
timestamp = time.time() timestamp = time.time()
@ -291,9 +290,12 @@ class DiskBlobManager(BlobManager):
"where next_announce_time < ? and blob_hash is not null", "where next_announce_time < ? and blob_hash is not null",
(timestamp,)) (timestamp,))
blobs = [b for b, in r.fetchall()] blobs = [b for b, in r.fetchall()]
next_announce_time = self.get_next_announce_time(len(blobs))
transaction.execute( transaction.execute(
"update blobs set next_announce_time = ? where next_announce_time < ?", "update blobs set next_announce_time = ? where next_announce_time < ?",
(next_announce_time, timestamp)) (next_announce_time, timestamp))
log.debug("Got %s blobs to announce, next announce time is in %s seconds",
len(blobs), next_announce_time-time.time())
return blobs return blobs
return self.db_conn.runInteraction(get_and_update) return self.db_conn.runInteraction(get_and_update)
@ -398,7 +400,7 @@ class TempBlobManager(BlobManager):
blob_hash for blob_hash, announce_time in self.blob_next_announces.iteritems() blob_hash for blob_hash, announce_time in self.blob_next_announces.iteritems()
if announce_time < now if announce_time < now
] ]
next_announce_time = now + self.hash_reannounce_time next_announce_time = self.get_next_announce_time(len(blobs))
for b in blobs: for b in blobs:
self.blob_next_announces[b] = next_announce_time self.blob_next_announces[b] = next_announce_time
return defer.succeed(blobs) return defer.succeed(blobs)
@ -415,7 +417,7 @@ class TempBlobManager(BlobManager):
new_blob._verified = True new_blob._verified = True
self.blobs[blob_creator.blob_hash] = new_blob self.blobs[blob_creator.blob_hash] = new_blob
self._immediate_announce([blob_creator.blob_hash]) self._immediate_announce([blob_creator.blob_hash])
next_announce_time = time.time() + self.hash_reannounce_time next_announce_time = self.get_next_announce_time()
d = self.blob_completed(new_blob, next_announce_time) d = self.blob_completed(new_blob, next_announce_time)
d.addCallback(lambda _: new_blob) d.addCallback(lambda _: new_blob)
return d return d

View file

@ -10,6 +10,10 @@ log = logging.getLogger(__name__)
class DHTHashAnnouncer(object): class DHTHashAnnouncer(object):
callLater = reactor.callLater
ANNOUNCE_CHECK_INTERVAL = 60
CONCURRENT_ANNOUNCERS = 5
"""This class announces to the DHT that this peer has certain blobs""" """This class announces to the DHT that this peer has certain blobs"""
def __init__(self, dht_node, peer_port): def __init__(self, dht_node, peer_port):
self.dht_node = dht_node self.dht_node = dht_node
@ -20,12 +24,9 @@ class DHTHashAnnouncer(object):
self._concurrent_announcers = 0 self._concurrent_announcers = 0
def run_manage_loop(self): def run_manage_loop(self):
from twisted.internet import reactor
if self.peer_port is not None: if self.peer_port is not None:
self._announce_available_hashes() self._announce_available_hashes()
self.next_manage_call = reactor.callLater(60, self.run_manage_loop) self.next_manage_call = self.callLater(self.ANNOUNCE_CHECK_INTERVAL, self.run_manage_loop)
def stop(self): def stop(self):
log.info("Stopping %s", self) log.info("Stopping %s", self)
@ -38,10 +39,13 @@ class DHTHashAnnouncer(object):
def immediate_announce(self, blob_hashes): def immediate_announce(self, blob_hashes):
if self.peer_port is not None: if self.peer_port is not None:
return self._announce_hashes(blob_hashes) return self._announce_hashes(blob_hashes, immediate=True)
else: else:
return defer.succeed(False) return defer.succeed(False)
def hash_queue_size(self):
return len(self.hash_queue)
def _announce_available_hashes(self): def _announce_available_hashes(self):
log.debug('Announcing available hashes') log.debug('Announcing available hashes')
ds = [] ds = []
@ -52,7 +56,7 @@ class DHTHashAnnouncer(object):
dl = defer.DeferredList(ds) dl = defer.DeferredList(ds)
return dl return dl
def _announce_hashes(self, hashes): def _announce_hashes(self, hashes, immediate=False):
if not hashes: if not hashes:
return return
log.debug('Announcing %s hashes', len(hashes)) log.debug('Announcing %s hashes', len(hashes))
@ -63,8 +67,11 @@ class DHTHashAnnouncer(object):
for h in hashes: for h in hashes:
announce_deferred = defer.Deferred() announce_deferred = defer.Deferred()
ds.append(announce_deferred) ds.append(announce_deferred)
if immediate:
self.hash_queue.appendleft((h, announce_deferred))
else:
self.hash_queue.append((h, announce_deferred)) self.hash_queue.append((h, announce_deferred))
log.debug('There are now %s hashes remaining to be announced', len(self.hash_queue)) log.debug('There are now %s hashes remaining to be announced', self.hash_queue_size())
def announce(): def announce():
if len(self.hash_queue): if len(self.hash_queue):
@ -72,12 +79,11 @@ class DHTHashAnnouncer(object):
log.debug('Announcing blob %s to dht', h) log.debug('Announcing blob %s to dht', h)
d = self.dht_node.announceHaveBlob(binascii.unhexlify(h), self.peer_port) d = self.dht_node.announceHaveBlob(binascii.unhexlify(h), self.peer_port)
d.chainDeferred(announce_deferred) d.chainDeferred(announce_deferred)
d.addBoth(lambda _: reactor.callLater(0, announce)) d.addBoth(lambda _: self.callLater(0, announce))
else: else:
self._concurrent_announcers -= 1 self._concurrent_announcers -= 1
for i in range(self._concurrent_announcers, 5): for i in range(self._concurrent_announcers, self.CONCURRENT_ANNOUNCERS):
# TODO: maybe make the 5 configurable
self._concurrent_announcers += 1 self._concurrent_announcers += 1
announce() announce()
d = defer.DeferredList(ds) d = defer.DeferredList(ds)
@ -87,12 +93,38 @@ class DHTHashAnnouncer(object):
class DHTHashSupplier(object): class DHTHashSupplier(object):
# 1 hour is the min time hash will be reannounced
MIN_HASH_REANNOUNCE_TIME = 60*60
# conservative assumption of the time it takes to announce
# a single hash
SINGLE_HASH_ANNOUNCE_DURATION = 1
"""Classes derived from this class give hashes to a hash announcer""" """Classes derived from this class give hashes to a hash announcer"""
def __init__(self, announcer): def __init__(self, announcer):
if announcer is not None: if announcer is not None:
announcer.add_supplier(self) announcer.add_supplier(self)
self.hash_announcer = announcer self.hash_announcer = announcer
self.hash_reannounce_time = 60 * 60 # 1 hour
def hashes_to_announce(self): def hashes_to_announce(self):
pass pass
def get_next_announce_time(self, num_hashes_to_announce=1):
"""
Hash reannounce time is set to current time + MIN_HASH_REANNOUNCE_TIME,
unless we are announcing a lot of hashes at once which could cause the
the announce queue to pile up. To prevent pile up, reannounce
only after a conservative estimate of when it will finish
to announce all the hashes.
Args:
num_hashes_to_announce: number of hashes that will be added to the queue
Returns:
timestamp for next announce time
"""
queue_size = self.hash_announcer.hash_queue_size()+num_hashes_to_announce
reannounce = max(self.MIN_HASH_REANNOUNCE_TIME,
queue_size*self.SINGLE_HASH_ANNOUNCE_DURATION)
return time.time() + reannounce

View file

@ -106,6 +106,9 @@ class Announcer(object):
def __init__(self, *args): def __init__(self, *args):
pass pass
def hash_queue_size(self):
return 0
def add_supplier(self, supplier): def add_supplier(self, supplier):
pass pass

View file

@ -0,0 +1,57 @@
import os
import binascii
from twisted.trial import unittest
from twisted.internet import defer,task
from lbrynet.core.server.DHTHashAnnouncer import DHTHashAnnouncer,DHTHashSupplier
from lbrynet.core.utils import random_string
from lbrynet.core import log_support
class MocDHTNode(object):
def __init__(self):
self.blobs_announced = 0
def announceHaveBlob(self,blob,port):
self.blobs_announced += 1
return defer.succeed(True)
class MocSupplier(object):
def __init__(self, blobs_to_announce):
self.blobs_to_announce = blobs_to_announce
self.announced = False
def hashes_to_announce(self):
if not self.announced:
self.announced = True
return defer.succeed(self.blobs_to_announce)
else:
return defer.succeed([])
class DHTHashAnnouncerTest(unittest.TestCase):
def setUp(self):
self.num_blobs = 10
self.blobs_to_announce = []
for i in range(0, self.num_blobs):
self.blobs_to_announce.append(binascii.b2a_hex(os.urandom(32)))
self.clock = task.Clock()
self.dht_node = MocDHTNode()
self.announcer = DHTHashAnnouncer(self.dht_node, peer_port=3333)
self.announcer.callLater = self.clock.callLater
self.supplier = MocSupplier(self.blobs_to_announce)
self.announcer.add_supplier(self.supplier)
def test_basic(self):
self.announcer._announce_available_hashes()
self.assertEqual(self.announcer.hash_queue_size(),self.announcer.CONCURRENT_ANNOUNCERS)
self.clock.advance(1)
self.assertEqual(self.dht_node.blobs_announced, self.num_blobs)
self.assertEqual(self.announcer.hash_queue_size(), 0)
def test_immediate_announce(self):
# Test that immediate announce puts a hash at the front of the queue
self.announcer._announce_available_hashes()
blob_hash = binascii.b2a_hex(os.urandom(32))
self.announcer.immediate_announce([blob_hash])
self.assertEqual(self.announcer.hash_queue_size(),self.announcer.CONCURRENT_ANNOUNCERS+1)
self.assertEqual(blob_hash, self.announcer.hash_queue[0][0])

View file

@ -34,7 +34,7 @@ class CreateEncryptedFileTest(unittest.TestCase):
def create_file(self, filename): def create_file(self, filename):
session = mock.Mock(spec=Session.Session)(None, None) session = mock.Mock(spec=Session.Session)(None, None)
hash_announcer = mock.Mock(spec=DHTHashAnnouncer.DHTHashAnnouncer)(None, None) hash_announcer = DHTHashAnnouncer.DHTHashAnnouncer(None, None)
session.blob_manager = BlobManager.TempBlobManager(hash_announcer) session.blob_manager = BlobManager.TempBlobManager(hash_announcer)
session.db_dir = self.tmp_dir session.db_dir = self.tmp_dir
manager = mock.Mock(spec=EncryptedFileManager.EncryptedFileManager)() manager = mock.Mock(spec=EncryptedFileManager.EncryptedFileManager)()