From 1a8e11ead3e97868921890270b66a85346f4fea9 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Tue, 1 Aug 2017 14:53:25 -0400 Subject: [PATCH 1/9] work on reflector head blob announce only --- lbrynet/reflector/server/server.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py index 7ca4b3cde..f03284b48 100644 --- a/lbrynet/reflector/server/server.py +++ b/lbrynet/reflector/server/server.py @@ -5,6 +5,8 @@ from twisted.internet import error, defer from twisted.internet.protocol import Protocol, ServerFactory from lbrynet.core.utils import is_valid_blobhash from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError +from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader +from lbrynet.lbry_file.StreamDescriptor import save_sd_info from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2 from lbrynet.reflector.common import ReflectorRequestError, ReflectorClientVersionError @@ -30,6 +32,7 @@ class ReflectorServer(Protocol): log.debug('Connection made to %s', peer_info) self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port) self.blob_manager = self.factory.blob_manager + self.stream_info_manager = self.factory.stream_info_manager self.protocol_version = self.factory.protocol_version self.received_handshake = False self.peer_version = None @@ -63,7 +66,23 @@ class ReflectorServer(Protocol): @defer.inlineCallbacks def _on_completed_blob(self, blob, response_key): - yield self.blob_manager.blob_completed(blob) + should_announce = False + if response_key == RECEIVED_SD_BLOB: + sd_info = yield BlobStreamDescriptorReader(blob).get_info() + yield save_sd_info(self.stream_info_manager, sd_info) + yield self.stream_info_manager.save_sd_blob_hash_to_stream(sd_info['stream_hash'], + blob.blob_hash) + should_announce = True + + else: + stream_hash = yield 
self.stream_info_manager.get_stream_of_blob(blob.blob_hash) + if stream_hash is not None: + blob_num = yield self.stream_info_manager._get_blob_num_by_hash(stream_hash, + blob.blob_hash) + if blob_num == 0: + should_announce = True + + yield self.blob_manager.blob_completed(blob, should_announce=should_announce) yield self.close_blob() log.info("Received %s", blob) yield self.send_response({response_key: True}) @@ -318,9 +337,10 @@ class ReflectorServer(Protocol): class ReflectorServerFactory(ServerFactory): protocol = ReflectorServer - def __init__(self, peer_manager, blob_manager): + def __init__(self, peer_manager, blob_manager, stream_info_manager): self.peer_manager = peer_manager self.blob_manager = blob_manager + self.stream_info_manager = stream_info_manager self.protocol_version = REFLECTOR_V2 def buildProtocol(self, addr): From 32feb628c3abbf3c5c53e8e70c23404ed5c0b929 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Tue, 22 Aug 2017 11:13:00 -0700 Subject: [PATCH 2/9] add stream_info_manager to reflector server initialization --- lbrynet/daemon/Daemon.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 5a45b78dd..f77f183c0 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -316,7 +316,8 @@ class Daemon(AuthJSONRPCServer): if self.reflector_port is not None: reflector_factory = reflector_server_factory( self.session.peer_manager, - self.session.blob_manager + self.session.blob_manager, + self.stream_info_manager ) try: self.reflector_server_port = reactor.listenTCP(self.reflector_port, From 4d5ba94a6ef632b6126c565584fdf4873fca9caa Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Tue, 22 Aug 2017 09:46:12 -0700 Subject: [PATCH 3/9] add tests for checking stream info on reflector server --- lbrynet/core/BlobManager.py | 8 ++++++++ tests/functional/test_reflector.py | 33 +++++++++++++++++++++++++++--- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git 
a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index e3b41d3cf..f73b6909f 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -230,6 +230,14 @@ class DiskBlobManager(DHTHashSupplier): d = self.db_conn.runQuery("select blob_hash from blobs") return d + @rerun_if_locked + @defer.inlineCallbacks + def _get_all_should_announce_blob_hashes(self): + # return a list of blob hashes where should_announce is True + blob_hashes = yield self.db_conn.runQuery( + "select blob_hash from blobs where should_announce = 1") + defer.returnValue([d[0] for d in blob_hashes]) + @rerun_if_locked def _get_all_verified_blob_hashes(self): d = self._get_all_blob_hashes() diff --git a/tests/functional/test_reflector.py b/tests/functional/test_reflector.py index 10e598b9f..5cdd038c7 100644 --- a/tests/functional/test_reflector.py +++ b/tests/functional/test_reflector.py @@ -81,17 +81,21 @@ class TestReflector(unittest.TestCase): self.server_db_dir, self.server_blob_dir = mk_db_and_blob_dir() self.server_blob_manager = BlobManager.DiskBlobManager( hash_announcer, self.server_blob_dir, self.server_db_dir) + self.server_stream_info_manager = EncryptedFileMetadataManager.DBEncryptedFileMetadataManager(self.server_db_dir) + d = self.session.setup() d.addCallback(lambda _: self.stream_info_manager.setup()) d.addCallback(lambda _: EncryptedFileOptions.add_lbry_file_to_sd_identifier(sd_identifier)) d.addCallback(lambda _: self.lbry_file_manager.setup()) d.addCallback(lambda _: self.server_blob_manager.setup()) + d.addCallback(lambda _: self.server_stream_info_manager.setup()) def verify_equal(sd_info): self.assertEqual(mocks.create_stream_sd_file, sd_info) def save_sd_blob_hash(sd_hash): + self.sd_hash = sd_hash self.expected_blobs.append((sd_hash, 923)) def verify_stream_descriptor_file(stream_hash): @@ -120,7 +124,7 @@ class TestReflector(unittest.TestCase): return d def start_server(): - server_factory = reflector.ServerFactory(peer_manager, 
self.server_blob_manager) + server_factory = reflector.ServerFactory(peer_manager, self.server_blob_manager, self.server_stream_info_manager) from twisted.internet import reactor port = 8943 while self.reflector_port is None: @@ -160,12 +164,34 @@ class TestReflector(unittest.TestCase): return d def test_stream_reflector(self): - def verify_data_on_reflector(): + def verify_blob_on_reflector(): check_blob_ds = [] for blob_hash, blob_size in self.expected_blobs: check_blob_ds.append(verify_have_blob(blob_hash, blob_size)) return defer.DeferredList(check_blob_ds) + @defer.inlineCallbacks + def verify_stream_on_reflector(): + # check stream_info_manager has all the right information + streams = yield self.server_stream_info_manager.get_all_streams() + self.assertEqual(1, len(streams)) + self.assertEqual(self.stream_hash, streams[0]) + + blobs = yield self.server_stream_info_manager.get_blobs_for_stream(self.stream_hash) + blob_hashes = [b[0] for b in blobs if b[0] is not None] + expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None] + self.assertEqual(expected_blob_hashes, blob_hashes) + sd_hashes = yield self.server_stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash) + self.assertEqual(1, len(sd_hashes)) + expected_sd_hash = self.expected_blobs[-1][0] + self.assertEqual(self.sd_hash, sd_hashes[0]) + + # check should_announce blobs on blob_manager + blob_hashes = yield self.server_blob_manager._get_all_should_announce_blob_hashes() + self.assertEqual(2, len(blob_hashes)) + self.assertTrue(self.sd_hash in blob_hashes) + self.assertTrue(expected_blob_hashes[0] in blob_hashes) + def verify_have_blob(blob_hash, blob_size): d = self.server_blob_manager.get_blob(blob_hash) d.addCallback(lambda blob: verify_blob_completed(blob, blob_size)) @@ -187,7 +213,8 @@ class TestReflector(unittest.TestCase): return d = send_to_server() - d.addCallback(lambda _: verify_data_on_reflector()) + d.addCallback(lambda _: 
verify_blob_on_reflector()) + d.addCallback(lambda _: verify_stream_on_reflector()) return d def test_blob_reflector(self): From 9bf159433d67524fd85065ad622fc9eec9324c8f Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Tue, 5 Sep 2017 12:13:53 -0400 Subject: [PATCH 4/9] add changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e645f9b0..cbf7e29f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,10 @@ at anytime. * * +### Added + * Added ability for reflector to store stream information for head blob announce + * + ### Fixed * Fixed handling cancelled blob and availability requests * Fixed redundant blob requests to a peer From 474ce21a51ff8bb6557352955e88322fbb7f3382 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 20 Sep 2017 15:50:54 -0400 Subject: [PATCH 5/9] add more tests in test_reflector, make sure stream info is not affected for blob client v1 --- tests/functional/test_reflector.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/functional/test_reflector.py b/tests/functional/test_reflector.py index 5cdd038c7..a86ed2520 100644 --- a/tests/functional/test_reflector.py +++ b/tests/functional/test_reflector.py @@ -248,6 +248,15 @@ class TestReflector(unittest.TestCase): return d def test_blob_reflector_v1(self): + @defer.inlineCallbacks + def verify_stream_on_reflector(): + # this protocol should not have any impact on stream info manager + streams = yield self.server_stream_info_manager.get_all_streams() + self.assertEqual(0, len(streams)) + # there should be no should announce blobs here + blob_hashes = yield self.server_blob_manager._get_all_should_announce_blob_hashes() + self.assertEqual(0, len(blob_hashes)) + def verify_data_on_reflector(): check_blob_ds = [] for blob_hash, blob_size in self.expected_blobs: From 4b8700268ffa7dc79fd390240252f5590398a6f1 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Wed, 20 Sep 2017 22:04:23 -0400 Subject: [PATCH 6/9] 
discover stream info for uploads via reflector v1 --- lbrynet/core/BlobManager.py | 38 +++++++++++++--- lbrynet/reflector/server/server.py | 71 +++++++++++++++++++++++++++--- 2 files changed, 98 insertions(+), 11 deletions(-) diff --git a/lbrynet/core/BlobManager.py b/lbrynet/core/BlobManager.py index f73b6909f..f6c329dc1 100644 --- a/lbrynet/core/BlobManager.py +++ b/lbrynet/core/BlobManager.py @@ -88,6 +88,18 @@ class DiskBlobManager(DHTHashSupplier): def hashes_to_announce(self): return self._get_blobs_to_announce() + def set_should_announce(self, blob_hash, should_announce): + if blob_hash in self.blobs: + blob = self.blobs[blob_hash] + if blob.get_is_verified(): + return self._set_should_announce(blob_hash, + self.get_next_announce_time(), + should_announce) + return defer.succeed(False) + + def get_should_announce(self, blob_hash): + return self._should_announce(blob_hash) + def creator_finished(self, blob_creator, should_announce): log.debug("blob_creator.blob_hash: %s", blob_creator.blob_hash) if blob_creator.blob_hash is None: @@ -170,14 +182,29 @@ class DiskBlobManager(DHTHashSupplier): def _add_completed_blob(self, blob_hash, length, next_announce_time, should_announce): log.debug("Adding a completed blob. blob_hash=%s, length=%s", blob_hash, str(length)) should_announce = 1 if should_announce else 0 - d = self.db_conn.runQuery( - "insert into blobs (blob_hash, blob_length, next_announce_time, should_announce) "+ - "values (?, ?, ?, ?)", - (blob_hash, length, next_announce_time, should_announce) - ) + d = self.db_conn.runQuery("insert into blobs (blob_hash, blob_length, next_announce_time, " + "should_announce) values (?, ?, ?, ?)", (blob_hash, length, + next_announce_time, + should_announce)) + # TODO: why is this here? 
d.addErrback(lambda err: err.trap(sqlite3.IntegrityError)) return d + @rerun_if_locked + @defer.inlineCallbacks + def _set_should_announce(self, blob_hash, next_announce_time, should_announce): + yield self.db_conn.runOperation("update blobs set next_announce_time=?, should_announce=? " + "where blob_hash=?", (next_announce_time, should_announce, + blob_hash)) + defer.returnValue(True) + + @rerun_if_locked + @defer.inlineCallbacks + def _should_announce(self, blob_hash): + result = yield self.db_conn.runQuery("select should_announce from blobs where blob_hash=?", + (blob_hash,)) + defer.returnValue(result[0][0]) + @defer.inlineCallbacks def _completed_blobs(self, blobhashes_to_check): """Returns of the blobhashes_to_check, which are valid""" @@ -192,7 +219,6 @@ class DiskBlobManager(DHTHashSupplier): @rerun_if_locked def _get_blobs_to_announce(self): - def get_and_update(transaction): timestamp = time.time() if self.announce_head_blobs_only is True: diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py index f03284b48..a22b93add 100644 --- a/lbrynet/reflector/server/server.py +++ b/lbrynet/reflector/server/server.py @@ -4,7 +4,7 @@ from twisted.python import failure from twisted.internet import error, defer from twisted.internet.protocol import Protocol, ServerFactory from lbrynet.core.utils import is_valid_blobhash -from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError +from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError, NoSuchSDHash from lbrynet.core.StreamDescriptor import BlobStreamDescriptorReader from lbrynet.lbry_file.StreamDescriptor import save_sd_info from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2 @@ -64,6 +64,41 @@ class ReflectorServer(Protocol): else: log.exception(err) + @defer.inlineCallbacks + def check_head_blob_announce(self, stream_hash): + blob_infos = yield self.stream_info_manager.get_blobs_for_stream(stream_hash) + blob_hash, blob_num, blob_iv, 
blob_length = blob_infos[0] + if blob_hash in self.blob_manager.blobs: + head_blob = self.blob_manager.blobs[blob_hash] + if head_blob.get_is_verified(): + should_announce = yield self.blob_manager.get_should_announce(blob_hash) + if should_announce == 0: + yield self.blob_manager.set_should_announce(blob_hash, 1) + log.info("Discovered previously completed head blob (%s), " + "setting it to be announced", blob_hash[:8]) + defer.returnValue(None) + + @defer.inlineCallbacks + def check_sd_blob_announce(self, sd_hash): + if sd_hash in self.blob_manager.blobs: + sd_blob = self.blob_manager.blobs[sd_hash] + if sd_blob.get_is_verified(): + should_announce = yield self.blob_manager.get_should_announce(sd_hash) + if should_announce == 0: + yield self.blob_manager.set_should_announce(sd_hash, 1) + log.info("Discovered previously completed sd blob (%s), " + "setting it to be announced", sd_hash[:8]) + try: + yield self.stream_info_manager.get_stream_hash_for_sd_hash(sd_hash) + except NoSuchSDHash: + log.info("Adding blobs to stream") + sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info() + yield save_sd_info(self.stream_info_manager, sd_info) + yield self.stream_info_manager.save_sd_blob_hash_to_stream( + sd_info['stream_hash'], + sd_hash) + defer.returnValue(None) + @defer.inlineCallbacks def _on_completed_blob(self, blob, response_key): should_announce = False @@ -74,16 +109,29 @@ class ReflectorServer(Protocol): blob.blob_hash) should_announce = True + # if we already have the head blob, set it to be announced now that we know it's + # a head blob + d = self.check_head_blob_announce(sd_info['stream_hash']) + else: + d = defer.succeed(None) stream_hash = yield self.stream_info_manager.get_stream_of_blob(blob.blob_hash) if stream_hash is not None: blob_num = yield self.stream_info_manager._get_blob_num_by_hash(stream_hash, blob.blob_hash) if blob_num == 0: should_announce = True + sd_hashes = yield self.stream_info_manager.get_sd_blob_hashes_for_stream( + 
stream_hash) + + # if we already have the sd blob, set it to be announced now that we know it's + # a sd blob + for sd_hash in sd_hashes: + d.addCallback(lambda _: self.check_sd_blob_announce(sd_hash)) yield self.blob_manager.blob_completed(blob, should_announce=should_announce) yield self.close_blob() + yield d log.info("Received %s", blob) yield self.send_response({response_key: True}) @@ -249,16 +297,29 @@ class ReflectorServer(Protocol): d = self.blob_finished_d return d + @defer.inlineCallbacks def get_descriptor_response(self, sd_blob): if sd_blob.get_is_verified(): - d = defer.succeed({SEND_SD_BLOB: False}) - d.addCallback(self.request_needed_blobs, sd_blob) + # if we already have the sd blob being offered, make sure we have it and the head blob + # marked as such for announcement now that we know it's an sd blob that we have. + yield self.check_sd_blob_announce(sd_blob.blob_hash) + try: + stream_hash = yield self.stream_info_manager.get_stream_hash_for_sd_hash( + sd_blob.blob_hash) + except NoSuchSDHash: + sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info() + stream_hash = sd_info['stream_hash'] + yield save_sd_info(self.stream_info_manager, sd_info) + yield self.stream_info_manager.save_sd_blob_hash_to_stream(stream_hash, + sd_blob.blob_hash) + yield self.check_head_blob_announce(stream_hash) + response = yield self.request_needed_blobs({SEND_SD_BLOB: False}, sd_blob) else: self.incoming_blob = sd_blob self.receiving_blob = True self.handle_incoming_blob(RECEIVED_SD_BLOB) - d = defer.succeed({SEND_SD_BLOB: True}) - return d + response = {SEND_SD_BLOB: True} + defer.returnValue(response) def request_needed_blobs(self, response, sd_blob): def _add_needed_blobs_to_response(needed_blobs): From 4941d1d31dd35fd08f6e545cbe7d2ae036724ce0 Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 20 Sep 2017 22:57:47 -0400 Subject: [PATCH 7/9] test when you have a v1 blob reflect before a stream reflect over the same blobs --- 
tests/functional/test_reflector.py | 72 ++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/tests/functional/test_reflector.py b/tests/functional/test_reflector.py index a86ed2520..f0f8a76eb 100644 --- a/tests/functional/test_reflector.py +++ b/tests/functional/test_reflector.py @@ -287,6 +287,78 @@ class TestReflector(unittest.TestCase): d.addCallback(lambda _: verify_data_on_reflector()) return d + # test case when we reflect blob, and then that same blob + # is reflected as stream + def test_blob_reflect_and_stream(self): + + def verify_blob_on_reflector(): + check_blob_ds = [] + for blob_hash, blob_size in self.expected_blobs: + check_blob_ds.append(verify_have_blob(blob_hash, blob_size)) + return defer.DeferredList(check_blob_ds) + + @defer.inlineCallbacks + def verify_stream_on_reflector(): + # check stream_info_manager has all the right information + + streams = yield self.server_stream_info_manager.get_all_streams() + self.assertEqual(1, len(streams)) + self.assertEqual(self.stream_hash, streams[0]) + + blobs = yield self.server_stream_info_manager.get_blobs_for_stream(self.stream_hash) + blob_hashes = [b[0] for b in blobs if b[0] is not None] + expected_blob_hashes = [b[0] for b in self.expected_blobs[:-1] if b[0] is not None] + self.assertEqual(expected_blob_hashes, blob_hashes) + sd_hashes = yield self.server_stream_info_manager.get_sd_blob_hashes_for_stream(self.stream_hash) + self.assertEqual(1, len(sd_hashes)) + expected_sd_hash = self.expected_blobs[-1][0] + self.assertEqual(self.sd_hash, sd_hashes[0]) + + # check should_announce blobs on blob_manager + blob_hashes = yield self.server_blob_manager._get_all_should_announce_blob_hashes() + self.assertEqual(2, len(blob_hashes)) + self.assertTrue(self.sd_hash in blob_hashes) + self.assertTrue(expected_blob_hashes[0] in blob_hashes) + + def verify_have_blob(blob_hash, blob_size): + d = self.server_blob_manager.get_blob(blob_hash) + d.addCallback(lambda blob: 
verify_blob_completed(blob, blob_size)) + return d + + def send_to_server_as_blobs(blob_hashes_to_send): + factory = reflector.BlobClientFactory( + self.session.blob_manager, + blob_hashes_to_send + ) + factory.protocol_version = 0 + + from twisted.internet import reactor + reactor.connectTCP('localhost', self.port, factory) + return factory.finished_deferred + + def send_to_server_as_stream(result): + fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager, + self.stream_info_manager, + self.stream_hash) + factory = reflector.ClientFactory(fake_lbry_file) + + from twisted.internet import reactor + reactor.connectTCP('localhost', self.port, factory) + return factory.finished_deferred + + + def verify_blob_completed(blob, blob_size): + self.assertTrue(blob.get_is_verified()) + self.assertEqual(blob_size, blob.length) + + # Modify this to change which blobs to send + blobs_to_send = self.expected_blobs + + d = send_to_server_as_blobs([x[0] for x in self.expected_blobs]) + d.addCallback(send_to_server_as_stream) + d.addCallback(lambda _: verify_blob_on_reflector()) + d.addCallback(lambda _: verify_stream_on_reflector()) + return d def iv_generator(): iv = 0 From cc98cdf933ac4d639992d3e250f875504c7357dd Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 20 Sep 2017 11:29:09 -0400 Subject: [PATCH 8/9] announce_head_blobs_only is True by default --- lbrynet/conf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 8cacc41af..ddd9036cb 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -251,7 +251,7 @@ ADJUSTABLE_SETTINGS = { 'download_directory': (str, default_download_dir), 'download_timeout': (int, 180), 'is_generous_host': (bool, True), - 'announce_head_blobs_only': (bool, False), + 'announce_head_blobs_only': (bool, True), 'known_dht_nodes': (list, DEFAULT_DHT_NODES, server_port), 'lbryum_wallet_dir': (str, default_lbryum_dir), 'max_connections_per_stream': (int, 5), From 
f835ef1be4ea61519893267a232f4c399f3f50fb Mon Sep 17 00:00:00 2001 From: Kay Kurokawa Date: Wed, 20 Sep 2017 11:41:16 -0400 Subject: [PATCH 9/9] add changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cbf7e29f7..6cfd5ea21 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,7 @@ at anytime. * ### Changed - * + * Announcing by head blob is turned on by default * ### Added