diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index 8db8b4d05..e2ccbfb04 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -10,16 +10,14 @@ log = logging.getLogger(__name__) class DiskBlobManager(object): - def __init__(self, hash_announcer, blob_dir, storage): + def __init__(self, blob_dir, storage): + """ + This class stores blobs on the hard disk - """ - This class stores blobs on the hard disk, blob_dir - directory where blobs are stored - db_dir - directory where sqlite database of blob information is stored + storage - SQLiteStorage object """ - self.hash_announcer = hash_announcer self.storage = storage - self.announce_head_blobs_only = conf.settings['announce_head_blobs_only'] self.blob_dir = blob_dir self.blob_creator_type = BlobFileCreator # TODO: consider using an LRU for blobs as there could potentially @@ -28,7 +26,7 @@ class DiskBlobManager(object): self.blob_hashes_to_delete = {} # {blob_hash: being_deleted (True/False)} self.check_should_announce_lc = None - if conf.settings['run_reflector_server']: + if conf.settings['run_reflector_server']: # TODO: move this looping call to SQLiteStorage self.check_should_announce_lc = task.LoopingCall(self.storage.verify_will_announce_all_head_and_sd_blobs) def setup(self): @@ -60,40 +58,20 @@ class DiskBlobManager(object): self.blobs[blob_hash] = blob return defer.succeed(blob) - def immediate_announce(self, blob_hashes): - if self.hash_announcer: - return self.hash_announcer.immediate_announce(blob_hashes) - raise Exception("Hash announcer not set") - @defer.inlineCallbacks def blob_completed(self, blob, next_announce_time=None, should_announce=True): - if next_announce_time is None: - next_announce_time = self.hash_announcer.get_next_announce_time() yield self.storage.add_completed_blob( blob.blob_hash, blob.length, next_announce_time, should_announce ) - # we announce all blobs immediately, if announce_head_blob_only is False - # otherwise, announce only if marked as should_announce - if not self.announce_head_blobs_only or should_announce: - self.immediate_announce([blob.blob_hash]) def completed_blobs(self, blobhashes_to_check): return self._completed_blobs(blobhashes_to_check) - def hashes_to_announce(self): - return self.storage.get_blobs_to_announce(self.hash_announcer) - def count_should_announce_blobs(self): return self.storage.count_should_announce_blobs() def set_should_announce(self, blob_hash, should_announce): - if blob_hash in self.blobs: - blob = self.blobs[blob_hash] - if blob.get_is_verified(): - return self.storage.set_should_announce( - blob_hash, self.hash_announcer.get_next_announce_time(), should_announce - ) - return defer.succeed(False) + return self.storage.set_should_announce(blob_hash, should_announce) def get_should_announce(self, blob_hash): return self.storage.should_announce(blob_hash) @@ -108,13 +86,7 @@ class DiskBlobManager(object): raise Exception("Blob has a length of 0") new_blob = BlobFile(self.blob_dir, blob_creator.blob_hash, blob_creator.length) self.blobs[blob_creator.blob_hash] = new_blob - next_announce_time = self.hash_announcer.get_next_announce_time() - return self.blob_completed(new_blob, next_announce_time, should_announce) - - def immediate_announce_all_blobs(self): - d = self._get_all_verified_blob_hashes() - d.addCallback(self.immediate_announce) - return d + return self.blob_completed(new_blob, should_announce) def get_all_verified_blobs(self): d = self._get_all_verified_blob_hashes() diff --git a/lbrynet/core/Session.py b/lbrynet/core/Session.py index 8ac992ec4..f65a331fe 100644 --- a/lbrynet/core/Session.py +++ b/lbrynet/core/Session.py @@ -2,7 +2,7 @@ import logging import miniupnpc from twisted.internet import threads, defer from lbrynet.core.BlobManager import DiskBlobManager -from lbrynet.dht import node +from lbrynet.dht import node, hashannouncer from lbrynet.database.storage import SQLiteStorage from lbrynet.core.RateLimiter import RateLimiter from lbrynet.core.utils import generate_id @@ -136,6 +136,7 @@ class Session(object): d = self._try_upnp() else: d = defer.succeed(True) + d.addCallback(lambda _: self.storage.setup()) d.addCallback(lambda _: self._setup_dht()) d.addCallback(lambda _: self._setup_other_components()) return d @@ -144,6 +145,8 @@ class Session(object): """Stop all services""" log.info('Stopping session.') ds = [] + if self.hash_announcer: + self.hash_announcer.stop() if self.blob_tracker is not None: ds.append(defer.maybeDeferred(self.blob_tracker.stop)) if self.dht_node is not None: @@ -220,19 +223,20 @@ class Session(object): def _setup_dht(self): # does not block startup, the dht will re-attempt if necessary self.dht_node = self.dht_node_class( - self.hash_announcer, - udpPort=self.dht_node_port, node_id=self.node_id, + udpPort=self.dht_node_port, externalIP=self.external_ip, peerPort=self.peer_port, peer_manager=self.peer_manager, peer_finder=self.peer_finder, ) + if not self.hash_announcer: + self.hash_announcer = hashannouncer.DHTHashAnnouncer(self.dht_node, self.storage) self.peer_manager = self.dht_node.peer_manager self.peer_finder = self.dht_node.peer_finder - self.hash_announcer = self.dht_node.hash_announcer self._join_dht_deferred = self.dht_node.joinNetwork(self.known_dht_nodes) self._join_dht_deferred.addCallback(lambda _: log.info("Joined the dht")) + self._join_dht_deferred.addCallback(lambda _: self.hash_announcer.start()) def _setup_other_components(self): log.debug("Setting up the rest of the components") @@ -245,9 +249,7 @@ class Session(object): raise Exception( "TempBlobManager is no longer supported, specify BlobManager or db_dir") else: - self.blob_manager = DiskBlobManager( - self.dht_node.hash_announcer, self.blob_dir, self.storage - ) + self.blob_manager = DiskBlobManager(self.blob_dir, self.storage) if self.blob_tracker is None: self.blob_tracker = self.blob_tracker_class( @@ -259,8 +261,7 @@ class Session(object): ) self.rate_limiter.start() - d = self.storage.setup() - d.addCallback(lambda _: self.blob_manager.setup()) + d = self.blob_manager.setup() d.addCallback(lambda _: self.wallet.start()) d.addCallback(lambda _: self.blob_tracker.start()) return d diff --git a/lbrynet/core/SinglePeerDownloader.py b/lbrynet/core/SinglePeerDownloader.py index 9073e980b..904927080 100644 --- a/lbrynet/core/SinglePeerDownloader.py +++ b/lbrynet/core/SinglePeerDownloader.py @@ -11,7 +11,6 @@ from lbrynet.core.PaymentRateManager import OnlyFreePaymentsManager from lbrynet.core.client.BlobRequester import BlobRequester from lbrynet.core.client.StandaloneBlobDownloader import StandaloneBlobDownloader from lbrynet.core.client.ConnectionManager import ConnectionManager -from lbrynet.dht.hashannouncer import DummyHashAnnouncer from lbrynet.dht.peerfinder import DummyPeerFinder @@ -61,7 +60,6 @@ class SingleBlobDownloadManager(object): class SinglePeerDownloader(object): def __init__(self): self._payment_rate_manager = OnlyFreePaymentsManager() - self._announcer = DummyHashAnnouncer() self._rate_limiter = DummyRateLimiter() self._wallet = None self._blob_manager = None @@ -98,7 +96,7 @@ class SinglePeerDownloader(object): @defer.inlineCallbacks def download_temp_blob_from_peer(self, peer, timeout, blob_hash): tmp_dir = yield threads.deferToThread(tempfile.mkdtemp) - tmp_blob_manager = DiskBlobManager(self._announcer, tmp_dir, tmp_dir) + tmp_blob_manager = DiskBlobManager(tmp_dir, tmp_dir) try: result = yield self.download_blob_from_peer(peer, timeout, blob_hash, tmp_blob_manager) finally: diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py index 88216475d..4e6a9c669 100644 --- a/lbrynet/database/storage.py +++ b/lbrynet/database/storage.py @@ -1,6 +1,5 @@ import logging import os -import time import sqlite3 import traceback from decimal import Decimal @@ -11,6 +10,7 @@ from lbryschema.claim import ClaimDict from lbryschema.decode import smart_decode from lbrynet import conf from lbrynet.cryptstream.CryptBlob import CryptBlobInfo +from lbrynet.dht.constants import dataExpireTimeout from lbryum.constants import COIN log = logging.getLogger(__name__) @@ -49,26 +49,6 @@ def open_file_for_writing(download_directory, suggested_file_name): return threads.deferToThread(_open_file_for_writing, download_directory, suggested_file_name) -def get_next_announce_time(hash_announcer, num_hashes_to_announce=1, min_reannounce_time=60*60, - single_announce_duration=5): - """ - Hash reannounce time is set to current time + MIN_HASH_REANNOUNCE_TIME, - unless we are announcing a lot of hashes at once which could cause the - the announce queue to pile up. To prevent pile up, reannounce - only after a conservative estimate of when it will finish - to announce all the hashes. - - Args: - num_hashes_to_announce: number of hashes that will be added to the queue - Returns: - timestamp for next announce time - """ - queue_size = hash_announcer.hash_queue_size() + num_hashes_to_announce - reannounce = max(min_reannounce_time, - queue_size * single_announce_duration) - return time.time() + reannounce - - def rerun_if_locked(f): max_attempts = 3 @@ -186,6 +166,7 @@ class SQLiteStorage(object): log.info("connecting to database: %s", self._db_path) self.db = SqliteConnection(self._db_path) self.db.set_reactor(reactor) + self.clock = reactor # used to refresh the claim attributes on a ManagedEncryptedFileDownloader when a # change to the associated content claim occurs. these are added by the file manager @@ -270,9 +251,15 @@ class SQLiteStorage(object): "select blob_hash from blob where should_announce=1 and status='finished'" ) - def get_blobs_to_announce(self, hash_announcer): + def update_last_announced_blob(self, blob_hash, last_announced): + return self.db.runOperation( + "update blob set next_announce_time=?, last_announced_time=? where blob_hash=?", + (int(last_announced + (dataExpireTimeout / 2)), int(last_announced), blob_hash) + ) + + def get_blobs_to_announce(self): def get_and_update(transaction): - timestamp = time.time() + timestamp = self.clock.seconds() if conf.settings['announce_head_blobs_only']: r = transaction.execute( "select blob_hash from blob " @@ -284,16 +271,8 @@ class SQLiteStorage(object): "select blob_hash from blob where blob_hash is not null " "and next_announce_time 0: - estimated_time_remaining = int(float(hashes) / blobs_per_second) - remaining = str(datetime.timedelta(seconds=estimated_time_remaining)) - else: - remaining = "unknown" - log.info("Announcing blobs: %i blobs left to announce, %i%s complete, " - "est time remaining: %s", hashes + self._concurrent_announcers, - 100 - int(100.0 * float(hashes + self._concurrent_announcers) / - float(self._total)), "%", remaining) - self._last_checked = t + last_time, hashes - else: - self._total = 0 - if self.peer_port is not None: - return self._announce_available_hashes() + def start(self): + self._manage_lc.start(30) def stop(self): - log.info("Stopping DHT hash announcer.") - if self._manage_call_lc.running: - return self._manage_call_lc.stop() + if self._manage_lc.running: + self._manage_lc.stop() - def immediate_announce(self, blob_hashes): - if self.peer_port is not None: - return self._announce_hashes(blob_hashes, immediate=True) + @defer.inlineCallbacks + def do_store(self, blob_hash): + storing_node_ids = yield self.dht_node.announceHaveBlob(binascii.unhexlify(blob_hash)) + now = self.clock.seconds() + if storing_node_ids: + result = (now, storing_node_ids) + yield self.storage.update_last_announced_blob(blob_hash, now) + log.debug("Stored %s to %i peers", blob_hash[:16], len(storing_node_ids)) else: - return defer.succeed(False) + result = (None, []) + self.hash_queue.remove(blob_hash) + defer.returnValue(result) - def hash_queue_size(self): - return len(self.hash_queue) + def _show_announce_progress(self, size, start): + queue_size = len(self.hash_queue) + average_blobs_per_second = float(size - queue_size) / (self.clock.seconds() - start) + log.info("Announced %i/%i blobs, %f blobs per second", size - queue_size, size, average_blobs_per_second) @defer.inlineCallbacks - def _announce_available_hashes(self): - log.debug('Announcing available hashes') - hashes = yield self.hashes_to_announce() - yield self._announce_hashes(hashes) + def immediate_announce(self, blob_hashes): + blob_hashes = [b for b in blob_hashes if b not in self.hash_queue] + self.hash_queue.extend(blob_hashes) + + log.info("Announcing %i blobs", len(self.hash_queue)) + start = self.clock.seconds() + progress_lc = task.LoopingCall(self._show_announce_progress, len(self.hash_queue), start) + progress_lc.start(60, now=False) + s = defer.DeferredSemaphore(self.concurrent_announcers) + results = yield utils.DeferredDict({blob_hash: s.run(self.do_store, blob_hash) for blob_hash in blob_hashes}) + now = self.clock.seconds() + + progress_lc.stop() + + announced_to = [blob_hash for blob_hash in results if results[blob_hash][0]] + if len(announced_to) != len(results): + log.debug("Failed to announce %i blobs", len(results) - len(announced_to)) + if announced_to: + log.info('Took %s seconds to announce %i of %i attempted hashes (%f hashes per second)', + now - start, len(blob_hashes), len(announced_to), + int(float(len(blob_hashes)) / float(now - start))) + defer.returnValue(results) @defer.inlineCallbacks - def _announce_hashes(self, hashes, immediate=False): - if not hashes: - defer.returnValue(None) - if not self.dht_node.can_store: - log.warning("Client only DHT node cannot store, skipping announce") - defer.returnValue(None) - log.info('Announcing %s hashes', len(hashes)) - # TODO: add a timeit decorator - start = self.dht_node.clock.seconds() - - ds = [] - with self._lock: - for h in hashes: - announce_deferred = defer.Deferred() - if immediate: - self.hash_queue.appendleft((h, announce_deferred)) - else: - self.hash_queue.append((h, announce_deferred)) - if not self._total: - self._total = len(hashes) - - log.debug('There are now %s hashes remaining to be announced', self.hash_queue_size()) - - @defer.inlineCallbacks - def do_store(blob_hash, announce_d, retry_count=0): - if announce_d.called: - defer.returnValue(announce_deferred.result) - try: - store_nodes = yield self.dht_node.announceHaveBlob(binascii.unhexlify(blob_hash)) - if not store_nodes: - retry_count += 1 - if retry_count <= self.STORE_RETRIES: - log.debug("No nodes stored %s, retrying", blob_hash) - result = yield do_store(blob_hash, announce_d, retry_count) - else: - result = {} - log.warning("No nodes stored %s", blob_hash) - else: - result = store_nodes - if not announce_d.called: - announce_d.callback(result) - defer.returnValue(result) - except Exception as err: - if not announce_d.called: - announce_d.errback(err) - raise err - - @defer.inlineCallbacks - def announce(progress=None): - progress = progress or {} - if len(self.hash_queue): - with self._lock: - h, announce_deferred = self.hash_queue.popleft() - log.debug('Announcing blob %s to dht', h[:16]) - stored_to_nodes = yield do_store(h, announce_deferred) - progress[h] = stored_to_nodes - log.debug("Stored %s to %i peers (hashes announced by this announcer: %i)", - h.encode('hex')[:16], - len(stored_to_nodes), len(progress)) - - yield announce(progress) - else: - with self._lock: - self._concurrent_announcers -= 1 - defer.returnValue(progress) - - for i in range(self._concurrent_announcers, self.CONCURRENT_ANNOUNCERS): - self._concurrent_announcers += 1 - ds.append(announce()) - announcer_results = yield defer.DeferredList(ds) - stored_to = {} - for _, announced_to in announcer_results: - stored_to.update(announced_to) - - log.info('Took %s seconds to announce %s hashes', self.dht_node.clock.seconds() - start, len(hashes)) - seconds_per_blob = (self.dht_node.clock.seconds() - start) / len(hashes) - self.set_single_hash_announce_duration(seconds_per_blob) - defer.returnValue(stored_to) - - @defer.inlineCallbacks - def add_hashes_to_announce(self, blob_hashes): - yield self._lock._lock.acquire() - self._hashes_to_announce.extend(blob_hashes) - yield self._lock._lock.release() - - @defer.inlineCallbacks - def hashes_to_announce(self): - hashes_to_announce = [] - yield self._lock._lock.acquire() - while self._hashes_to_announce: - hashes_to_announce.append(self._hashes_to_announce.pop()) - yield self._lock._lock.release() - defer.returnValue(hashes_to_announce) - - def set_single_hash_announce_duration(self, seconds): - """ - Set the duration it takes to announce a single hash - in seconds, cannot be less than the default single - hash announce duration - """ - seconds = max(seconds, self.DEFAULT_SINGLE_HASH_ANNOUNCE_DURATION) - self.single_hash_announce_duration = seconds - - def get_next_announce_time(self, num_hashes_to_announce=1): - """ - Hash reannounce time is set to current time + MIN_HASH_REANNOUNCE_TIME, - unless we are announcing a lot of hashes at once which could cause the - the announce queue to pile up. To prevent pile up, reannounce - only after a conservative estimate of when it will finish - to announce all the hashes. - - Args: - num_hashes_to_announce: number of hashes that will be added to the queue - Returns: - timestamp for next announce time - """ - queue_size = self.hash_queue_size() + num_hashes_to_announce - reannounce = max(self.MIN_HASH_REANNOUNCE_TIME, - queue_size * self.single_hash_announce_duration) - return self.dht_node.clock.seconds() + reannounce + def manage(self): + need_reannouncement = yield self.storage.get_blobs_to_announce() + if need_reannouncement: + yield self.immediate_announce(need_reannouncement) + else: + log.debug("Nothing to announce") diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py index 0adc4f7b6..c77fce861 100644 --- a/lbrynet/dht/node.py +++ b/lbrynet/dht/node.py @@ -54,7 +54,7 @@ class Node(object): application is performed via this class (or a subclass). """ - def __init__(self, hash_announcer=None, node_id=None, udpPort=4000, dataStore=None, + def __init__(self, node_id=None, udpPort=4000, dataStore=None, routingTableClass=None, networkProtocol=None, externalIP=None, peerPort=None, listenUDP=None, callLater=None, resolve=None, clock=None, peer_finder=None, @@ -108,6 +108,7 @@ class Node(object): self.change_token_lc.clock = self.clock self.refresh_node_lc = task.LoopingCall(self._refreshNode) self.refresh_node_lc.clock = self.clock + # Create k-buckets (for storing contacts) if routingTableClass is None: self._routingTable = routingtable.OptimizedTreeRoutingTable(self.node_id, self.clock.seconds) @@ -138,25 +139,16 @@ class Node(object): self.peerPort = peerPort self.hash_watcher = HashWatcher(self.clock) - # will be used later - self._can_store = True - self.peer_manager = peer_manager or PeerManager() self.peer_finder = peer_finder or DHTPeerFinder(self, self.peer_manager) - self.hash_announcer = hash_announcer or DHTHashAnnouncer(self) def __del__(self): log.warning("unclean shutdown of the dht node") if self._listeningPort is not None: self._listeningPort.stopListening() - @property - def can_store(self): - return self._can_store is True - @defer.inlineCallbacks def stop(self): - yield self.hash_announcer.stop() # stop LoopingCalls: if self.refresh_node_lc.running: yield self.refresh_node_lc.stop() @@ -234,7 +226,6 @@ class Node(object): self.hash_watcher.start() self.change_token_lc.start(constants.tokenSecretChangeInterval) self.refresh_node_lc.start(constants.checkRefreshInterval) - self.hash_announcer.run_manage_loop() @property def contacts(self):