From 0639bb98652b38a29cb4f22ff04e1e166d54fd65 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 2 Aug 2017 12:11:41 -0400 Subject: [PATCH] add support for should_announce feature --- lbrynet/conf.py | 1 + lbrynet/core/BlobManager.py | 44 +++++++++++++++++------ lbrynet/core/StreamDescriptor.py | 2 +- lbrynet/core/client/BlobRequester.py | 10 ++++-- lbrynet/cryptstream/CryptStreamCreator.py | 4 ++- tests/unit/core/test_BlobManager.py | 2 ++ 6 files changed, 47 insertions(+), 16 deletions(-) diff --git a/lbrynet/conf.py b/lbrynet/conf.py index c8c58ae8d..caa9ff134 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -251,6 +251,7 @@ ADJUSTABLE_SETTINGS = { 'download_directory': (str, default_download_dir), 'download_timeout': (int, 180), 'is_generous_host': (bool, True), + 'announce_head_blobs_only': (bool, False), 'known_dht_nodes': (list, DEFAULT_DHT_NODES, server_port), 'lbryum_wallet_dir': (str, default_lbryum_dir), 'max_connections_per_stream': (int, 5), diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index 54aa2cd13..3e55d78c0 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -5,6 +5,7 @@ import sqlite3 from twisted.internet import threads, defer, reactor from twisted.enterprise import adbapi +from lbrynet import conf from lbrynet.core.HashBlob import BlobFile, BlobFileCreator from lbrynet.core.server.DHTHashAnnouncer import DHTHashSupplier from lbrynet.core.sqlite_helpers import rerun_if_locked @@ -13,9 +14,18 @@ log = logging.getLogger(__name__) class DiskBlobManager(DHTHashSupplier): - """This class stores blobs on the hard disk""" def __init__(self, hash_announcer, blob_dir, db_dir): + + """ + This class stores blobs on the hard disk, + blob_dir - directory where blobs are stored + db_dir - directory where sqlite database of blob information is stored + """ + DHTHashSupplier.__init__(self, hash_announcer) + + self.announce_head_blobs_only = conf.settings['announce_head_blobs_only'] + self.blob_dir = blob_dir self.db_file = os.path.join(db_dir, "blobs.db") self.db_conn = adbapi.ConnectionPool('sqlite3', self.db_file, check_same_thread=False) @@ -61,11 +71,15 @@ class DiskBlobManager(DHTHashSupplier): raise Exception("Hash announcer not set") @defer.inlineCallbacks - def blob_completed(self, blob, next_announce_time=None): + def blob_completed(self, blob, next_announce_time=None, should_announce=False): if next_announce_time is None: next_announce_time = self.get_next_announce_time() - yield self._add_completed_blob(blob.blob_hash, blob.length, next_announce_time) - reactor.callLater(0, self._immediate_announce, [blob.blob_hash]) + yield self._add_completed_blob(blob.blob_hash, blob.length, + next_announce_time, should_announce) + # we announce all blobs immediately, if announce_head_blob_only is False + # otherwise, announce only if marked as should_announce + if not self.announce_head_blobs_only or should_announce: + reactor.callLater(0, self._immediate_announce, [blob.blob_hash]) def completed_blobs(self, blobhashes_to_check): return self._completed_blobs(blobhashes_to_check) @@ -73,16 +87,15 @@ class DiskBlobManager(DHTHashSupplier): def hashes_to_announce(self): return self._get_blobs_to_announce() - def creator_finished(self, blob_creator): + def creator_finished(self, blob_creator, should_announce): log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash) assert blob_creator.blob_hash is not None assert blob_creator.blob_hash not in self.blobs assert blob_creator.length is not None new_blob = self.blob_type(self.blob_dir, blob_creator.blob_hash, blob_creator.length) self.blobs[blob_creator.blob_hash] = new_blob - self._immediate_announce([blob_creator.blob_hash]) next_announce_time = self.get_next_announce_time() - d = self.blob_completed(new_blob, next_announce_time) + d = self.blob_completed(new_blob, next_announce_time, should_announce) return d def immediate_announce_all_blobs(self): @@ -150,11 +163,13 @@ class DiskBlobManager(DHTHashSupplier): return self.db_conn.runInteraction(create_tables) @rerun_if_locked - def _add_completed_blob(self, blob_hash, length, next_announce_time): + def _add_completed_blob(self, blob_hash, length, next_announce_time, should_announce): log.debug("Adding a completed blob. blob_hash=%s, length=%s", blob_hash, str(length)) + should_announce = 1 if should_announce else 0 d = self.db_conn.runQuery( - "insert into blobs (blob_hash, blob_length, next_announce_time) values (?, ?, ?)", - (blob_hash, length, next_announce_time) + "insert into blobs (blob_hash, blob_length, next_announce_time, should_announce) "+ + "values (?, ?, ?, ?)", + (blob_hash, length, next_announce_time, should_announce) ) d.addErrback(lambda err: err.trap(sqlite3.IntegrityError)) return d @@ -176,9 +191,16 @@ class DiskBlobManager(DHTHashSupplier): def get_and_update(transaction): timestamp = time.time() - r = transaction.execute("select blob_hash from blobs " + + if self.announce_head_blobs_only is True: + r = transaction.execute("select blob_hash from blobs " + + "where next_announce_time < ? and blob_hash is not null "+ + "and should_announce = 1", + (timestamp,)) + else: + r = transaction.execute("select blob_hash from blobs " + "where next_announce_time < ? and blob_hash is not null", (timestamp,)) + blobs = [b for b, in r.fetchall()] next_announce_time = self.get_next_announce_time(len(blobs)) transaction.execute( diff --git a/lbrynet/core/StreamDescriptor.py b/lbrynet/core/StreamDescriptor.py index 4f1d95d26..b3fb714cb 100644 --- a/lbrynet/core/StreamDescriptor.py +++ b/lbrynet/core/StreamDescriptor.py @@ -108,7 +108,7 @@ class BlobStreamDescriptorWriter(StreamDescriptorWriter): blob_creator.write(raw_data) log.debug("Wrote the data to the new blob") sd_hash = yield blob_creator.close() - yield self.blob_manager.creator_finished(blob_creator) + yield self.blob_manager.creator_finished(blob_creator, should_announce=True) defer.returnValue(sd_hash) diff --git a/lbrynet/core/client/BlobRequester.py b/lbrynet/core/client/BlobRequester.py index ee9fa5e1b..d7492d041 100644 --- a/lbrynet/core/client/BlobRequester.py +++ b/lbrynet/core/client/BlobRequester.py @@ -79,7 +79,9 @@ class BlobRequester(object): def _send_next_request(self, peer, protocol): log.debug('Sending a blob request for %s and %s', peer, protocol) availability = AvailabilityRequest(self, peer, protocol, self.payment_rate_manager) - download = DownloadRequest(self, peer, protocol, self.payment_rate_manager, self.wallet) + head_blob_hash = self._download_manager.get_head_blob_hash() + download = DownloadRequest(self, peer, protocol, self.payment_rate_manager, + self.wallet, head_blob_hash) price = PriceRequest(self, peer, protocol, self.payment_rate_manager) sent_request = False @@ -406,9 +408,10 @@ class PriceRequest(RequestHelper): class DownloadRequest(RequestHelper): """Choose a blob and download it from a peer and also pay the peer for the data.""" - def __init__(self, requester, peer, protocol, payment_rate_manager, wallet): + def __init__(self, requester, peer, protocol, payment_rate_manager, wallet, head_blob_hash): RequestHelper.__init__(self, requester, peer, protocol, payment_rate_manager) self.wallet = wallet + self.head_blob_hash = head_blob_hash def can_make_request(self): if self.protocol in self.protocol_prices: @@ -546,7 +549,8 @@ class DownloadRequest(RequestHelper): self.update_local_score(5.0) self.peer.update_stats('blobs_downloaded', 1) self.peer.update_score(5.0) - self.requestor.blob_manager.blob_completed(blob) + should_announce = blob.blob_hash == self.head_blob_hash + self.requestor.blob_manager.blob_completed(blob, should_announce=should_announce) return arg def _download_failed(self, reason): diff --git a/lbrynet/cryptstream/CryptStreamCreator.py b/lbrynet/cryptstream/CryptStreamCreator.py index c256c0c75..e5b3c8bf3 100644 --- a/lbrynet/cryptstream/CryptStreamCreator.py +++ b/lbrynet/cryptstream/CryptStreamCreator.py @@ -91,9 +91,11 @@ class CryptStreamCreator(StreamCreator): done, num_bytes_written = self.current_blob.write(data) data = data[num_bytes_written:] if done is True: + should_announce = self.blob_count == 0 d = self.current_blob.close() d.addCallback(self._blob_finished) - d.addCallback(lambda _: self.blob_manager.creator_finished(self.next_blob_creator)) + d.addCallback(lambda _: self.blob_manager.creator_finished( + self.next_blob_creator, should_announce)) self.finished_deferreds.append(d) self.current_blob = None diff --git a/tests/unit/core/test_BlobManager.py b/tests/unit/core/test_BlobManager.py index 2ddc812e2..1b7271dc2 100644 --- a/tests/unit/core/test_BlobManager.py +++ b/tests/unit/core/test_BlobManager.py @@ -9,6 +9,7 @@ from tests.util import random_lbry_hash from lbrynet.core.BlobManager import DiskBlobManager from lbrynet.core.HashAnnouncer import DummyHashAnnouncer from lbrynet.core.Peer import Peer +from lbrynet import conf from lbrynet.core.cryptoutils import get_lbry_hash_obj from twisted.trial import unittest @@ -16,6 +17,7 @@ from twisted.internet import defer class BlobManagerTest(unittest.TestCase): def setUp(self): + conf.initialize_settings() self.blob_dir = tempfile.mkdtemp() self.db_dir = tempfile.mkdtemp() hash_announcer = DummyHashAnnouncer()