refactor DHTHashAnnouncer

-remove hash_announcer from Node and DiskBlobManager
-remove announcement related functions from DiskBlobManager
-update SQLiteStorage to store announcement times and provide blob hashes needing to be announced
-use dataExpireTimeout from lbrynet.dht.constants for re-announce timing
-use DeferredSemaphore for concurrent blob announcement
This commit is contained in:
Jack Robison 2018-03-27 15:12:44 -04:00
parent ea0ea704a2
commit c5bf64cf0a
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
6 changed files with 86 additions and 294 deletions

View file

@ -10,16 +10,14 @@ log = logging.getLogger(__name__)
class DiskBlobManager(object):
def __init__(self, hash_announcer, blob_dir, storage):
def __init__(self, blob_dir, storage):
"""
This class stores blobs on the hard disk
"""
This class stores blobs on the hard disk,
blob_dir - directory where blobs are stored
db_dir - directory where sqlite database of blob information is stored
storage - SQLiteStorage object
"""
self.hash_announcer = hash_announcer
self.storage = storage
self.announce_head_blobs_only = conf.settings['announce_head_blobs_only']
self.blob_dir = blob_dir
self.blob_creator_type = BlobFileCreator
# TODO: consider using an LRU for blobs as there could potentially
@ -28,7 +26,7 @@ class DiskBlobManager(object):
self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)}
self.check_should_announce_lc = None
if conf.settings['run_reflector_server']:
if conf.settings['run_reflector_server']: # TODO: move this looping call to SQLiteStorage
self.check_should_announce_lc = task.LoopingCall(self.storage.verify_will_announce_all_head_and_sd_blobs)
def setup(self):
@ -60,40 +58,20 @@ class DiskBlobManager(object):
self.blobs[blob_hash] = blob
return defer.succeed(blob)
def immediate_announce(self, blob_hashes):
if self.hash_announcer:
return self.hash_announcer.immediate_announce(blob_hashes)
raise Exception("Hash announcer not set")
@defer.inlineCallbacks
def blob_completed(self, blob, next_announce_time=None, should_announce=True):
if next_announce_time is None:
next_announce_time = self.hash_announcer.get_next_announce_time()
yield self.storage.add_completed_blob(
blob.blob_hash, blob.length, next_announce_time, should_announce
)
# we announce all blobs immediately, if announce_head_blob_only is False
# otherwise, announce only if marked as should_announce
if not self.announce_head_blobs_only or should_announce:
self.immediate_announce([blob.blob_hash])
def completed_blobs(self, blobhashes_to_check):
return self._completed_blobs(blobhashes_to_check)
def hashes_to_announce(self):
return self.storage.get_blobs_to_announce(self.hash_announcer)
def count_should_announce_blobs(self):
return self.storage.count_should_announce_blobs()
def set_should_announce(self, blob_hash, should_announce):
if blob_hash in self.blobs:
blob = self.blobs[blob_hash]
if blob.get_is_verified():
return self.storage.set_should_announce(
blob_hash, self.hash_announcer.get_next_announce_time(), should_announce
)
return defer.succeed(False)
return self.storage.set_should_announce(blob_hash, should_announce)
def get_should_announce(self, blob_hash):
return self.storage.should_announce(blob_hash)
@ -108,13 +86,7 @@ class DiskBlobManager(object):
raise Exception("Blob has a length of 0")
new_blob = BlobFile(self.blob_dir, blob_creator.blob_hash, blob_creator.length)
self.blobs[blob_creator.blob_hash] = new_blob
next_announce_time = self.hash_announcer.get_next_announce_time()
return self.blob_completed(new_blob, next_announce_time, should_announce)
def immediate_announce_all_blobs(self):
d = self._get_all_verified_blob_hashes()
d.addCallback(self.immediate_announce)
return d
return self.blob_completed(new_blob, should_announce)
def get_all_verified_blobs(self):
d = self._get_all_verified_blob_hashes()

View file

@ -2,7 +2,7 @@ import logging
import miniupnpc
from twisted.internet import threads, defer
from lbrynet.core.BlobManager import DiskBlobManager
from lbrynet.dht import node
from lbrynet.dht import node, hashannouncer
from lbrynet.database.storage import SQLiteStorage
from lbrynet.core.RateLimiter import RateLimiter
from lbrynet.core.utils import generate_id
@ -136,6 +136,7 @@ class Session(object):
d = self._try_upnp()
else:
d = defer.succeed(True)
d.addCallback(lambda _: self.storage.setup())
d.addCallback(lambda _: self._setup_dht())
d.addCallback(lambda _: self._setup_other_components())
return d
@ -144,6 +145,8 @@ class Session(object):
"""Stop all services"""
log.info('Stopping session.')
ds = []
if self.hash_announcer:
self.hash_announcer.stop()
if self.blob_tracker is not None:
ds.append(defer.maybeDeferred(self.blob_tracker.stop))
if self.dht_node is not None:
@ -220,19 +223,20 @@ class Session(object):
def _setup_dht(self): # does not block startup, the dht will re-attempt if necessary
self.dht_node = self.dht_node_class(
self.hash_announcer,
udpPort=self.dht_node_port,
node_id=self.node_id,
udpPort=self.dht_node_port,
externalIP=self.external_ip,
peerPort=self.peer_port,
peer_manager=self.peer_manager,
peer_finder=self.peer_finder,
)
if not self.hash_announcer:
self.hash_announcer = hashannouncer.DHTHashAnnouncer(self.dht_node, self.storage)
self.peer_manager = self.dht_node.peer_manager
self.peer_finder = self.dht_node.peer_finder
self.hash_announcer = self.dht_node.hash_announcer
self._join_dht_deferred = self.dht_node.joinNetwork(self.known_dht_nodes)
self._join_dht_deferred.addCallback(lambda _: log.info("Joined the dht"))
self._join_dht_deferred.addCallback(lambda _: self.hash_announcer.start())
def _setup_other_components(self):
log.debug("Setting up the rest of the components")
@ -245,9 +249,7 @@ class Session(object):
raise Exception(
"TempBlobManager is no longer supported, specify BlobManager or db_dir")
else:
self.blob_manager = DiskBlobManager(
self.dht_node.hash_announcer, self.blob_dir, self.storage
)
self.blob_manager = DiskBlobManager(self.blob_dir, self.storage)
if self.blob_tracker is None:
self.blob_tracker = self.blob_tracker_class(
@ -259,8 +261,7 @@ class Session(object):
)
self.rate_limiter.start()
d = self.storage.setup()
d.addCallback(lambda _: self.blob_manager.setup())
d = self.blob_manager.setup()
d.addCallback(lambda _: self.wallet.start())
d.addCallback(lambda _: self.blob_tracker.start())
return d

View file

@ -11,7 +11,6 @@ from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager
from lbrynet.core.client.BlobRequester import BlobRequester
from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader
from lbrynet.core.client.ConnectionManager import ConnectionManager
from lbrynet.dht.hashannouncer import DummyHashAnnouncer
from lbrynet.dht.peerfinder import DummyPeerFinder
@ -61,7 +60,6 @@ class SingleBlobDownloadManager(object):
class SinglePeerDownloader(object):
def __init__(self):
self._payment_rate_manager = OnlyFreePaymentsManager()
self._announcer = DummyHashAnnouncer()
self._rate_limiter = DummyRateLimiter()
self._wallet = None
self._blob_manager = None
@ -98,7 +96,7 @@ class SinglePeerDownloader(object):
@defer.inlineCallbacks
def download_temp_blob_from_peer(self, peer, timeout, blob_hash):
tmp_dir = yield threads.deferToThread(tempfile.mkdtemp)
tmp_blob_manager = DiskBlobManager(self._announcer, tmp_dir, tmp_dir)
tmp_blob_manager = DiskBlobManager(tmp_dir, tmp_dir)
try:
result = yield self.download_blob_from_peer(peer, timeout, blob_hash, tmp_blob_manager)
finally:

View file

@ -1,6 +1,5 @@
import logging
import os
import time
import sqlite3
import traceback
from decimal import Decimal
@ -11,6 +10,7 @@ from lbryschema.claim import ClaimDict
from lbryschema.decode import smart_decode
from lbrynet import conf
from lbrynet.cryptstream.CryptBlob import CryptBlobInfo
from lbrynet.dht.constants import dataExpireTimeout
from lbryum.constants import COIN
log = logging.getLogger(__name__)
@ -49,26 +49,6 @@ def open_file_for_writing(download_directory, suggested_file_name):
return threads.deferToThread(_open_file_for_writing, download_directory, suggested_file_name)
def get_next_announce_time(hash_announcer, num_hashes_to_announce=1, min_reannounce_time=60*60,
single_announce_duration=5):
"""
Hash reannounce time is set to current time + MIN_HASH_REANNOUNCE_TIME,
unless we are announcing a lot of hashes at once which could cause the
the announce queue to pile up. To prevent pile up, reannounce
only after a conservative estimate of when it will finish
to announce all the hashes.
Args:
num_hashes_to_announce: number of hashes that will be added to the queue
Returns:
timestamp for next announce time
"""
queue_size = hash_announcer.hash_queue_size() + num_hashes_to_announce
reannounce = max(min_reannounce_time,
queue_size * single_announce_duration)
return time.time() + reannounce
def rerun_if_locked(f):
max_attempts = 3
@ -186,6 +166,7 @@ class SQLiteStorage(object):
log.info("connecting to database: %s", self._db_path)
self.db = SqliteConnection(self._db_path)
self.db.set_reactor(reactor)
self.clock = reactor
# used to refresh the claim attributes on a ManagedEncryptedFileDownloader when a
# change to the associated content claim occurs. these are added by the file manager
@ -270,9 +251,15 @@ class SQLiteStorage(object):
"select blob_hash from blob where should_announce=1 and status='finished'"
)
def get_blobs_to_announce(self, hash_announcer):
def update_last_announced_blob(self, blob_hash, last_announced):
return self.db.runOperation(
"update blob set next_announce_time=?, last_announced_time=? where blob_hash=?",
(int(last_announced + (dataExpireTimeout / 2)), int(last_announced), blob_hash)
)
def get_blobs_to_announce(self):
def get_and_update(transaction):
timestamp = time.time()
timestamp = self.clock.seconds()
if conf.settings['announce_head_blobs_only']:
r = transaction.execute(
"select blob_hash from blob "
@ -284,16 +271,8 @@ class SQLiteStorage(object):
"select blob_hash from blob where blob_hash is not null "
"and next_announce_time<? and status='finished'", (timestamp,)
)
blobs = [b for b, in r.fetchall()]
next_announce_time = get_next_announce_time(hash_announcer, len(blobs))
transaction.execute(
"update blob set next_announce_time=? where next_announce_time<?", (next_announce_time, timestamp)
)
log.debug("Got %s blobs to announce, next announce time is in %s seconds", len(blobs),
next_announce_time-time.time())
blobs = [b[0] for b in r.fetchall()]
return blobs
return self.db.runInteraction(get_and_update)
def delete_blobs_from_db(self, blob_hashes):

View file

@ -1,7 +1,5 @@
import binascii
import collections
import logging
import datetime
from twisted.internet import defer, task
from lbrynet.core import utils
@ -9,217 +7,70 @@ from lbrynet.core import utils
log = logging.getLogger(__name__)
class DummyHashAnnouncer(object):
def __init__(self):
pass
def run_manage_loop(self):
pass
def stop(self):
pass
def hash_queue_size(self):
return 0
def immediate_announce(self, blob_hashes):
pass
def get_next_announce_time(self):
return 0
class DHTHashAnnouncer(DummyHashAnnouncer):
ANNOUNCE_CHECK_INTERVAL = 60
CONCURRENT_ANNOUNCERS = 5
# 1 hour is the min time hash will be reannounced
MIN_HASH_REANNOUNCE_TIME = 60 * 60
# conservative assumption of the time it takes to announce
# a single hash
DEFAULT_SINGLE_HASH_ANNOUNCE_DURATION = 1
"""This class announces to the DHT that this peer has certain blobs"""
STORE_RETRIES = 3
def __init__(self, dht_node):
class DHTHashAnnouncer(object):
def __init__(self, dht_node, storage, concurrent_announcers=25):
self.dht_node = dht_node
self.storage = storage
self.clock = dht_node.clock
self.peer_port = dht_node.peerPort
self.next_manage_call = None
self.hash_queue = collections.deque()
self._concurrent_announcers = 0
self._manage_call_lc = task.LoopingCall(self.manage_lc)
self._manage_call_lc.clock = dht_node.clock
self._lock = utils.DeferredLockContextManager(defer.DeferredLock())
self._last_checked = dht_node.clock.seconds(), self.CONCURRENT_ANNOUNCERS
self._total = None
self.single_hash_announce_duration = self.DEFAULT_SINGLE_HASH_ANNOUNCE_DURATION
self._hashes_to_announce = []
self.hash_queue = []
self.concurrent_announcers = concurrent_announcers
self._manage_lc = task.LoopingCall(self.manage)
self._manage_lc.clock = self.clock
def run_manage_loop(self):
log.info("Starting hash announcer")
if not self._manage_call_lc.running:
self._manage_call_lc.start(self.ANNOUNCE_CHECK_INTERVAL)
def manage_lc(self):
last_time, last_hashes = self._last_checked
hashes = len(self.hash_queue)
if hashes:
t, h = self.dht_node.clock.seconds() - last_time, last_hashes - hashes
blobs_per_second = float(h) / float(t)
if blobs_per_second > 0:
estimated_time_remaining = int(float(hashes) / blobs_per_second)
remaining = str(datetime.timedelta(seconds=estimated_time_remaining))
else:
remaining = "unknown"
log.info("Announcing blobs: %i blobs left to announce, %i%s complete, "
"est time remaining: %s", hashes + self._concurrent_announcers,
100 - int(100.0 * float(hashes + self._concurrent_announcers) /
float(self._total)), "%", remaining)
self._last_checked = t + last_time, hashes
else:
self._total = 0
if self.peer_port is not None:
return self._announce_available_hashes()
def start(self):
self._manage_lc.start(30)
def stop(self):
log.info("Stopping DHT hash announcer.")
if self._manage_call_lc.running:
return self._manage_call_lc.stop()
def immediate_announce(self, blob_hashes):
if self.peer_port is not None:
return self._announce_hashes(blob_hashes, immediate=True)
else:
return defer.succeed(False)
def hash_queue_size(self):
return len(self.hash_queue)
if self._manage_lc.running:
self._manage_lc.stop()
@defer.inlineCallbacks
def _announce_available_hashes(self):
log.debug('Announcing available hashes')
hashes = yield self.hashes_to_announce()
yield self._announce_hashes(hashes)
@defer.inlineCallbacks
def _announce_hashes(self, hashes, immediate=False):
if not hashes:
defer.returnValue(None)
if not self.dht_node.can_store:
log.warning("Client only DHT node cannot store, skipping announce")
defer.returnValue(None)
log.info('Announcing %s hashes', len(hashes))
# TODO: add a timeit decorator
start = self.dht_node.clock.seconds()
ds = []
with self._lock:
for h in hashes:
announce_deferred = defer.Deferred()
if immediate:
self.hash_queue.appendleft((h, announce_deferred))
def do_store(self, blob_hash):
storing_node_ids = yield self.dht_node.announceHaveBlob(binascii.unhexlify(blob_hash))
now = self.clock.seconds()
if storing_node_ids:
result = (now, storing_node_ids)
yield self.storage.update_last_announced_blob(blob_hash, now)
log.debug("Stored %s to %i peers", blob_hash[:16], len(storing_node_ids))
else:
self.hash_queue.append((h, announce_deferred))
if not self._total:
self._total = len(hashes)
log.debug('There are now %s hashes remaining to be announced', self.hash_queue_size())
@defer.inlineCallbacks
def do_store(blob_hash, announce_d, retry_count=0):
if announce_d.called:
defer.returnValue(announce_deferred.result)
try:
store_nodes = yield self.dht_node.announceHaveBlob(binascii.unhexlify(blob_hash))
if not store_nodes:
retry_count += 1
if retry_count <= self.STORE_RETRIES:
log.debug("No nodes stored %s, retrying", blob_hash)
result = yield do_store(blob_hash, announce_d, retry_count)
else:
result = {}
log.warning("No nodes stored %s", blob_hash)
else:
result = store_nodes
if not announce_d.called:
announce_d.callback(result)
result = (None, [])
self.hash_queue.remove(blob_hash)
defer.returnValue(result)
except Exception as err:
if not announce_d.called:
announce_d.errback(err)
raise err
def _show_announce_progress(self, size, start):
queue_size = len(self.hash_queue)
average_blobs_per_second = float(size - queue_size) / (self.clock.seconds() - start)
log.info("Announced %i/%i blobs, %f blobs per second", size - queue_size, size, average_blobs_per_second)
@defer.inlineCallbacks
def announce(progress=None):
progress = progress or {}
if len(self.hash_queue):
with self._lock:
h, announce_deferred = self.hash_queue.popleft()
log.debug('Announcing blob %s to dht', h[:16])
stored_to_nodes = yield do_store(h, announce_deferred)
progress[h] = stored_to_nodes
log.debug("Stored %s to %i peers (hashes announced by this announcer: %i)",
h.encode('hex')[:16],
len(stored_to_nodes), len(progress))
def immediate_announce(self, blob_hashes):
blob_hashes = [b for b in blob_hashes if b not in self.hash_queue]
self.hash_queue.extend(blob_hashes)
yield announce(progress)
log.info("Announcing %i blobs", len(self.hash_queue))
start = self.clock.seconds()
progress_lc = task.LoopingCall(self._show_announce_progress, len(self.hash_queue), start)
progress_lc.start(60, now=False)
s = defer.DeferredSemaphore(self.concurrent_announcers)
results = yield utils.DeferredDict({blob_hash: s.run(self.do_store, blob_hash) for blob_hash in blob_hashes})
now = self.clock.seconds()
progress_lc.stop()
announced_to = [blob_hash for blob_hash in results if results[blob_hash][0]]
if len(announced_to) != len(results):
log.debug("Failed to announce %i blobs", len(results) - len(announced_to))
if announced_to:
log.info('Took %s seconds to announce %i of %i attempted hashes (%f hashes per second)',
now - start, len(blob_hashes), len(announced_to),
int(float(len(blob_hashes)) / float(now - start)))
defer.returnValue(results)
@defer.inlineCallbacks
def manage(self):
need_reannouncement = yield self.storage.get_blobs_to_announce()
if need_reannouncement:
yield self.immediate_announce(need_reannouncement)
else:
with self._lock:
self._concurrent_announcers -= 1
defer.returnValue(progress)
for i in range(self._concurrent_announcers, self.CONCURRENT_ANNOUNCERS):
self._concurrent_announcers += 1
ds.append(announce())
announcer_results = yield defer.DeferredList(ds)
stored_to = {}
for _, announced_to in announcer_results:
stored_to.update(announced_to)
log.info('Took %s seconds to announce %s hashes', self.dht_node.clock.seconds() - start, len(hashes))
seconds_per_blob = (self.dht_node.clock.seconds() - start) / len(hashes)
self.set_single_hash_announce_duration(seconds_per_blob)
defer.returnValue(stored_to)
@defer.inlineCallbacks
def add_hashes_to_announce(self, blob_hashes):
yield self._lock._lock.acquire()
self._hashes_to_announce.extend(blob_hashes)
yield self._lock._lock.release()
@defer.inlineCallbacks
def hashes_to_announce(self):
hashes_to_announce = []
yield self._lock._lock.acquire()
while self._hashes_to_announce:
hashes_to_announce.append(self._hashes_to_announce.pop())
yield self._lock._lock.release()
defer.returnValue(hashes_to_announce)
def set_single_hash_announce_duration(self, seconds):
"""
Set the duration it takes to announce a single hash
in seconds, cannot be less than the default single
hash announce duration
"""
seconds = max(seconds, self.DEFAULT_SINGLE_HASH_ANNOUNCE_DURATION)
self.single_hash_announce_duration = seconds
def get_next_announce_time(self, num_hashes_to_announce=1):
"""
Hash reannounce time is set to current time + MIN_HASH_REANNOUNCE_TIME,
unless we are announcing a lot of hashes at once which could cause the
the announce queue to pile up. To prevent pile up, reannounce
only after a conservative estimate of when it will finish
to announce all the hashes.
Args:
num_hashes_to_announce: number of hashes that will be added to the queue
Returns:
timestamp for next announce time
"""
queue_size = self.hash_queue_size() + num_hashes_to_announce
reannounce = max(self.MIN_HASH_REANNOUNCE_TIME,
queue_size * self.single_hash_announce_duration)
return self.dht_node.clock.seconds() + reannounce
log.debug("Nothing to announce")

View file

@ -54,7 +54,7 @@ class Node(object):
application is performed via this class (or a subclass).
"""
def __init__(self, hash_announcer=None, node_id=None, udpPort=4000, dataStore=None,
def __init__(self, node_id=None, udpPort=4000, dataStore=None,
routingTableClass=None, networkProtocol=None,
externalIP=None, peerPort=None, listenUDP=None,
callLater=None, resolve=None, clock=None, peer_finder=None,
@ -108,6 +108,7 @@ class Node(object):
self.change_token_lc.clock = self.clock
self.refresh_node_lc = task.LoopingCall(self._refreshNode)
self.refresh_node_lc.clock = self.clock
# Create k-buckets (for storing contacts)
if routingTableClass is None:
self._routingTable = routingtable.OptimizedTreeRoutingTable(self.node_id, self.clock.seconds)
@ -138,25 +139,16 @@ class Node(object):
self.peerPort = peerPort
self.hash_watcher = HashWatcher(self.clock)
# will be used later
self._can_store = True
self.peer_manager = peer_manager or PeerManager()
self.peer_finder = peer_finder or DHTPeerFinder(self, self.peer_manager)
self.hash_announcer = hash_announcer or DHTHashAnnouncer(self)
def __del__(self):
log.warning("unclean shutdown of the dht node")
if self._listeningPort is not None:
self._listeningPort.stopListening()
@property
def can_store(self):
return self._can_store is True
@defer.inlineCallbacks
def stop(self):
yield self.hash_announcer.stop()
# stop LoopingCalls:
if self.refresh_node_lc.running:
yield self.refresh_node_lc.stop()
@ -234,7 +226,6 @@ class Node(object):
self.hash_watcher.start()
self.change_token_lc.start(constants.tokenSecretChangeInterval)
self.refresh_node_lc.start(constants.checkRefreshInterval)
self.hash_announcer.run_manage_loop()
@property
def contacts(self):