diff --git a/tests/functional/test_reflector.py b/tests/functional/test_reflector.py index 52032d12d..9eb21e915 100644 --- a/tests/functional/test_reflector.py +++ b/tests/functional/test_reflector.py @@ -29,32 +29,8 @@ class TestReflector(unittest.TestCase): self.lbry_file_manager = None self.server_blob_manager = None self.reflector_port = None + self.port = None self.addCleanup(self.take_down_env) - - def take_down_env(self): - d = defer.succeed(True) - if self.lbry_file_manager is not None: - d.addCallback(lambda _: self.lbry_file_manager.stop()) - if self.session is not None: - d.addCallback(lambda _: self.session.shut_down()) - if self.stream_info_manager is not None: - d.addCallback(lambda _: self.stream_info_manager.stop()) - if self.server_blob_manager is not None: - d.addCallback(lambda _: self.server_blob_manager.stop()) - if self.reflector_port is not None: - d.addCallback(lambda _: self.reflector_port.stopListening()) - - def delete_test_env(): - try: - shutil.rmtree('client') - except: - raise unittest.SkipTest("TODO: fix this for windows") - - d.addCallback(lambda _: threads.deferToThread(delete_test_env)) - d.addErrback(lambda err: str(err)) - return d - - def test_reflector(self): wallet = mocks.Wallet() peer_manager = PeerManager.PeerManager() peer_finder = mocks.PeerFinder(5553, peer_manager, 2) @@ -98,7 +74,7 @@ class TestReflector(unittest.TestCase): dht_node_class=Node ) - self.stream_info_manager = EncryptedFileMetadataManager.TempEncryptedFileMetadataManager() + self.stream_info_manager = EncryptedFileMetadataManager.DBEncryptedFileMetadataManager(db_dir) self.lbry_file_manager = EncryptedFileManager.EncryptedFileManager( self.session, self.stream_info_manager, sd_identifier) @@ -118,6 +94,7 @@ class TestReflector(unittest.TestCase): self.expected_blobs.append((sd_hash, 923)) def verify_stream_descriptor_file(stream_hash): + self.stream_hash = stream_hash d = lbryfile.get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True) d.addCallback(verify_equal) d.addCallback( @@ -127,7 +104,6 @@ class TestReflector(unittest.TestCase): ) ) d.addCallback(save_sd_blob_hash) - d.addCallback(lambda _: stream_hash) return d def create_stream(): @@ -149,45 +125,129 @@ class TestReflector(unittest.TestCase): while self.reflector_port is None: try: self.reflector_port = reactor.listenTCP(port, server_factory) + self.port = port except error.CannotListenError: port += 1 - return defer.succeed(port) - def send_to_server(port, stream_hash): - factory = reflector.ClientFactory( - self.session.blob_manager, - self.stream_info_manager, - stream_hash - ) + d.addCallback(lambda _: create_stream()) + d.addCallback(verify_stream_descriptor_file) + d.addCallback(lambda _: start_server()) + return d - from twisted.internet import reactor - reactor.connectTCP('localhost', port, factory) - return factory.finished_deferred + def take_down_env(self): + d = defer.succeed(True) + if self.lbry_file_manager is not None: + d.addCallback(lambda _: self.lbry_file_manager.stop()) + if self.session is not None: + d.addCallback(lambda _: self.session.shut_down()) + if self.stream_info_manager is not None: + d.addCallback(lambda _: self.stream_info_manager.stop()) + if self.server_blob_manager is not None: + d.addCallback(lambda _: self.server_blob_manager.stop()) + if self.reflector_port is not None: + d.addCallback(lambda _: self.reflector_port.stopListening()) - def verify_blob_completed(blob, blob_size): - self.assertTrue(blob.is_validated()) - self.assertEqual(blob_size, blob.length) + def delete_test_env(): + try: + shutil.rmtree('client') + except: + raise unittest.SkipTest("TODO: fix this for windows") - def verify_have_blob(blob_hash, blob_size): - d = self.server_blob_manager.get_blob(blob_hash, True) - d.addCallback(lambda blob: verify_blob_completed(blob, blob_size)) - return d + d.addCallback(lambda _: threads.deferToThread(delete_test_env)) + d.addErrback(lambda err: str(err)) + return d + def test_stream_reflector(self): def verify_data_on_reflector(): check_blob_ds = [] for blob_hash, blob_size in self.expected_blobs: check_blob_ds.append(verify_have_blob(blob_hash, blob_size)) return defer.DeferredList(check_blob_ds) - def upload_to_reflector(stream_hash): - d = start_server() - d.addCallback(lambda port: send_to_server(port, stream_hash)) - d.addCallback(lambda _: verify_data_on_reflector()) + def verify_have_blob(blob_hash, blob_size): + d = self.server_blob_manager.get_blob(blob_hash, True) + d.addCallback(lambda blob: verify_blob_completed(blob, blob_size)) return d - d.addCallback(lambda _: create_stream()) - d.addCallback(verify_stream_descriptor_file) - d.addCallback(upload_to_reflector) + def send_to_server(): + factory = reflector.ClientFactory( + self.session.blob_manager, + self.stream_info_manager, + self.stream_hash + ) + + from twisted.internet import reactor + reactor.connectTCP('localhost', self.port, factory) + return factory.finished_deferred + + def verify_blob_completed(blob, blob_size): + self.assertTrue(blob.is_validated()) + self.assertEqual(blob_size, blob.length) + return + + d = send_to_server() + d.addCallback(lambda _: verify_data_on_reflector()) + return d + + def test_blob_reflector(self): + def verify_data_on_reflector(): + check_blob_ds = [] + for blob_hash, blob_size in self.expected_blobs: + check_blob_ds.append(verify_have_blob(blob_hash, blob_size)) + return defer.DeferredList(check_blob_ds) + + def verify_have_blob(blob_hash, blob_size): + d = self.server_blob_manager.get_blob(blob_hash, True) + d.addCallback(lambda blob: verify_blob_completed(blob, blob_size)) + return d + + def send_to_server(blob_hashes_to_send): + factory = reflector.BlobClientFactory( + self.session.blob_manager, + blob_hashes_to_send + ) + + from twisted.internet import reactor + reactor.connectTCP('localhost', self.port, factory) + return factory.finished_deferred + + def verify_blob_completed(blob, blob_size): + self.assertTrue(blob.is_validated()) + self.assertEqual(blob_size, blob.length) + + d = send_to_server([x[0] for x in self.expected_blobs]) + d.addCallback(lambda _: verify_data_on_reflector()) + return d + + def test_blob_reflector_v1(self): + def verify_data_on_reflector(): + check_blob_ds = [] + for blob_hash, blob_size in self.expected_blobs: + check_blob_ds.append(verify_have_blob(blob_hash, blob_size)) + return defer.DeferredList(check_blob_ds) + + def verify_have_blob(blob_hash, blob_size): + d = self.server_blob_manager.get_blob(blob_hash, True) + d.addCallback(lambda blob: verify_blob_completed(blob, blob_size)) + return d + + def send_to_server(blob_hashes_to_send): + factory = reflector.BlobClientFactory( + self.session.blob_manager, + blob_hashes_to_send + ) + factory.protocol_version = 0 + + from twisted.internet import reactor + reactor.connectTCP('localhost', self.port, factory) + return factory.finished_deferred + + def verify_blob_completed(blob, blob_size): + self.assertTrue(blob.is_validated()) + self.assertEqual(blob_size, blob.length) + + d = send_to_server([x[0] for x in self.expected_blobs]) + d.addCallback(lambda _: verify_data_on_reflector()) return d