remove DHTHashSupplier class, move former functions into DHTHashAnnouncer

This commit is contained in:
Jack Robison 2018-02-20 13:33:59 -05:00
parent 6666468640
commit e6caedac91
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 40 additions and 43 deletions

View file

@ -1,16 +1,15 @@
import logging import logging
import os import os
from sqlite3 import IntegrityError from sqlite3 import IntegrityError
from twisted.internet import threads, defer, reactor, task from twisted.internet import threads, defer, task
from lbrynet import conf from lbrynet import conf
from lbrynet.blob.blob_file import BlobFile from lbrynet.blob.blob_file import BlobFile
from lbrynet.blob.creator import BlobFileCreator from lbrynet.blob.creator import BlobFileCreator
from lbrynet.dht.hashannouncer import DHTHashSupplier
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class DiskBlobManager(DHTHashSupplier): class DiskBlobManager(object):
def __init__(self, hash_announcer, blob_dir, storage): def __init__(self, hash_announcer, blob_dir, storage):
""" """
@ -18,8 +17,7 @@ class DiskBlobManager(DHTHashSupplier):
blob_dir - directory where blobs are stored blob_dir - directory where blobs are stored
db_dir - directory where sqlite database of blob information is stored db_dir - directory where sqlite database of blob information is stored
""" """
self.hash_announcer = hash_announcer
DHTHashSupplier.__init__(self, hash_announcer)
self.storage = storage self.storage = storage
self.announce_head_blobs_only = conf.settings['announce_head_blobs_only'] self.announce_head_blobs_only = conf.settings['announce_head_blobs_only']
self.blob_dir = blob_dir self.blob_dir = blob_dir
@ -70,14 +68,14 @@ class DiskBlobManager(DHTHashSupplier):
@defer.inlineCallbacks @defer.inlineCallbacks
def blob_completed(self, blob, next_announce_time=None, should_announce=True): def blob_completed(self, blob, next_announce_time=None, should_announce=True):
if next_announce_time is None: if next_announce_time is None:
next_announce_time = self.get_next_announce_time() next_announce_time = self.hash_announcer.get_next_announce_time()
yield self.storage.add_completed_blob( yield self.storage.add_completed_blob(
blob.blob_hash, blob.length, next_announce_time, should_announce blob.blob_hash, blob.length, next_announce_time, should_announce
) )
# we announce all blobs immediately, if announce_head_blob_only is False # we announce all blobs immediately, if announce_head_blob_only is False
# otherwise, announce only if marked as should_announce # otherwise, announce only if marked as should_announce
if not self.announce_head_blobs_only or should_announce: if not self.announce_head_blobs_only or should_announce:
reactor.callLater(0, self.immediate_announce, [blob.blob_hash]) self.immediate_announce([blob.blob_hash])
def completed_blobs(self, blobhashes_to_check): def completed_blobs(self, blobhashes_to_check):
return self._completed_blobs(blobhashes_to_check) return self._completed_blobs(blobhashes_to_check)
@ -93,7 +91,7 @@ class DiskBlobManager(DHTHashSupplier):
blob = self.blobs[blob_hash] blob = self.blobs[blob_hash]
if blob.get_is_verified(): if blob.get_is_verified():
return self.storage.set_should_announce( return self.storage.set_should_announce(
blob_hash, self.get_next_announce_time(), should_announce blob_hash, self.hash_announcer.get_next_announce_time(), should_announce
) )
return defer.succeed(False) return defer.succeed(False)
@ -110,7 +108,7 @@ class DiskBlobManager(DHTHashSupplier):
raise Exception("Blob has a length of 0") raise Exception("Blob has a length of 0")
new_blob = BlobFile(self.blob_dir, blob_creator.blob_hash, blob_creator.length) new_blob = BlobFile(self.blob_dir, blob_creator.blob_hash, blob_creator.length)
self.blobs[blob_creator.blob_hash] = new_blob self.blobs[blob_creator.blob_hash] = new_blob
next_announce_time = self.get_next_announce_time() next_announce_time = self.hash_announcer.get_next_announce_time()
return self.blob_completed(new_blob, next_announce_time, should_announce) return self.blob_completed(new_blob, next_announce_time, should_announce)
def immediate_announce_all_blobs(self): def immediate_announce_all_blobs(self):

View file

@ -20,27 +20,32 @@ class DummyHashAnnouncer(object):
def stop(self): def stop(self):
pass pass
def add_supplier(self, supplier):
pass
def hash_queue_size(self): def hash_queue_size(self):
return 0 return 0
def immediate_announce(self, blob_hashes): def immediate_announce(self, blob_hashes):
pass pass
def get_next_announce_time(self):
return 0
class DHTHashAnnouncer(DummyHashAnnouncer): class DHTHashAnnouncer(DummyHashAnnouncer):
ANNOUNCE_CHECK_INTERVAL = 60 ANNOUNCE_CHECK_INTERVAL = 60
CONCURRENT_ANNOUNCERS = 5 CONCURRENT_ANNOUNCERS = 5
# 1 hour is the min time hash will be reannounced
MIN_HASH_REANNOUNCE_TIME = 60 * 60
# conservative assumption of the time it takes to announce
# a single hash
DEFAULT_SINGLE_HASH_ANNOUNCE_DURATION = 1
"""This class announces to the DHT that this peer has certain blobs""" """This class announces to the DHT that this peer has certain blobs"""
STORE_RETRIES = 3 STORE_RETRIES = 3
def __init__(self, dht_node, peer_port): def __init__(self, dht_node):
self.dht_node = dht_node self.dht_node = dht_node
self.peer_port = peer_port self.peer_port = dht_node.peerPort
self.supplier = None
self.next_manage_call = None self.next_manage_call = None
self.hash_queue = collections.deque() self.hash_queue = collections.deque()
self._concurrent_announcers = 0 self._concurrent_announcers = 0
@ -49,6 +54,8 @@ class DHTHashAnnouncer(DummyHashAnnouncer):
self._lock = utils.DeferredLockContextManager(defer.DeferredLock()) self._lock = utils.DeferredLockContextManager(defer.DeferredLock())
self._last_checked = dht_node.clock.seconds(), self.CONCURRENT_ANNOUNCERS self._last_checked = dht_node.clock.seconds(), self.CONCURRENT_ANNOUNCERS
self._total = None self._total = None
self.single_hash_announce_duration = self.DEFAULT_SINGLE_HASH_ANNOUNCE_DURATION
self._hashes_to_announce = []
def run_manage_loop(self): def run_manage_loop(self):
log.info("Starting hash announcer") log.info("Starting hash announcer")
@ -79,10 +86,7 @@ class DHTHashAnnouncer(DummyHashAnnouncer):
def stop(self): def stop(self):
log.info("Stopping DHT hash announcer.") log.info("Stopping DHT hash announcer.")
if self._manage_call_lc.running: if self._manage_call_lc.running:
self._manage_call_lc.stop() return self._manage_call_lc.stop()
def add_supplier(self, supplier):
self.supplier = supplier
def immediate_announce(self, blob_hashes): def immediate_announce(self, blob_hashes):
if self.peer_port is not None: if self.peer_port is not None:
@ -96,9 +100,8 @@ class DHTHashAnnouncer(DummyHashAnnouncer):
@defer.inlineCallbacks @defer.inlineCallbacks
def _announce_available_hashes(self): def _announce_available_hashes(self):
log.debug('Announcing available hashes') log.debug('Announcing available hashes')
if self.supplier: hashes = yield self.hashes_to_announce()
hashes = yield self.supplier.hashes_to_announce() yield self._announce_hashes(hashes)
yield self._announce_hashes(hashes)
@defer.inlineCallbacks @defer.inlineCallbacks
def _announce_hashes(self, hashes, immediate=False): def _announce_hashes(self, hashes, immediate=False):
@ -180,24 +183,20 @@ class DHTHashAnnouncer(DummyHashAnnouncer):
self.set_single_hash_announce_duration(seconds_per_blob) self.set_single_hash_announce_duration(seconds_per_blob)
defer.returnValue(stored_to) defer.returnValue(stored_to)
@defer.inlineCallbacks
def add_hashes_to_announce(self, blob_hashes):
yield self._lock._lock.acquire()
self._hashes_to_announce.extend(blob_hashes)
yield self._lock._lock.release()
class DHTHashSupplier(object): @defer.inlineCallbacks
# 1 hour is the min time hash will be reannounced
MIN_HASH_REANNOUNCE_TIME = 60 * 60
# conservative assumption of the time it takes to announce
# a single hash
DEFAULT_SINGLE_HASH_ANNOUNCE_DURATION = 1
"""Classes derived from this class give hashes to a hash announcer"""
def __init__(self, announcer):
if announcer is not None:
announcer.add_supplier(self)
self.hash_announcer = announcer
self.single_hash_announce_duration = self.DEFAULT_SINGLE_HASH_ANNOUNCE_DURATION
def hashes_to_announce(self): def hashes_to_announce(self):
pass hashes_to_announce = []
yield self._lock._lock.acquire()
while self._hashes_to_announce:
hashes_to_announce.append(self._hashes_to_announce.pop())
yield self._lock._lock.release()
defer.returnValue(hashes_to_announce)
def set_single_hash_announce_duration(self, seconds): def set_single_hash_announce_duration(self, seconds):
""" """
@ -221,7 +220,7 @@ class DHTHashSupplier(object):
Returns: Returns:
timestamp for next announce time timestamp for next announce time
""" """
queue_size = self.hash_announcer.hash_queue_size() + num_hashes_to_announce queue_size = self.hash_queue_size() + num_hashes_to_announce
reannounce = max(self.MIN_HASH_REANNOUNCE_TIME, reannounce = max(self.MIN_HASH_REANNOUNCE_TIME,
queue_size * self.single_hash_announce_duration) queue_size * self.single_hash_announce_duration)
return time.time() + reannounce return self.dht_node.clock.seconds() + reannounce

View file

@ -138,9 +138,9 @@ class Node(object):
# will be used later # will be used later
self._can_store = True self._can_store = True
self.peer_manager = PeerManager() self.peer_manager = peer_manager or PeerManager()
self.peer_finder = DHTPeerFinder(self, self.peer_manager) self.peer_finder = peer_finder or DHTPeerFinder(self, self.peer_manager)
self.hash_announcer = DHTHashAnnouncer(self, self.port) self.hash_announcer = hash_announcer or DHTHashAnnouncer(self)
def __del__(self): def __del__(self):
log.warning("unclean shutdown of the dht node") log.warning("unclean shutdown of the dht node")