diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 2bccb4fe3..ce2102f95 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -51,6 +51,7 @@ import json import logging from twisted.protocols.basic import FileSender from twisted.internet.protocol import Protocol, ClientFactory +from twisted.internet import defer, error log = logging.getLogger(__name__) @@ -74,8 +75,11 @@ class LBRYFileReflectorClient(Protocol): self.received_handshake_response = False self.protocol_version = None self.file_sender = None + self.producer = None + self.streaming = False d = self.get_blobs_to_send(self.factory.stream_info_manager, self.factory.stream_hash) d.addCallback(lambda _: self.send_handshake()) + d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) def dataReceived(self, data): self.response_buff += data @@ -86,45 +90,64 @@ class LBRYFileReflectorClient(Protocol): else: self.response_buff = '' d = self.handle_response(msg) - d.addCallbacks(lambda _: self.send_next_request(), self.response_failure_handler) + d.addCallback(lambda _: self.send_next_request()) + d.addErrback(self.response_failure_handler) def connectionLost(self, reason): - pass + if reason.check(error.ConnectionDone): + self.factory.finished_deferred.callback(True) + else: + self.factory.finished_deferred.callback(reason) # IConsumer stuff def registerProducer(self, producer, streaming): - assert streaming is True + self.producer = producer + self.streaming = streaming + if self.streaming is False: + from twisted.internet import reactor + reactor.callLater(0, self.producer.resumeProducing) def unregisterProducer(self): - self.transport.loseConnection() + self.producer = None def write(self, data): self.transport.write(data) + if self.producer is not None and self.streaming is False: + from twisted.internet import reactor + reactor.callLater(0, self.producer.resumeProducing) def get_blobs_to_send(self, stream_info_manager, stream_hash): d = stream_info_manager.get_blobs_for_stream(stream_hash) def set_blobs(blob_hashes): for blob_hash, position, iv, length in blob_hashes: - self.blob_hashes_to_send.append(blob_hash) + if blob_hash is not None: + self.blob_hashes_to_send.append(blob_hash) d.addCallback(set_blobs) - d.addCallback(lambda _: stream_info_manager.get_sd_blob_hashes_for_stream) + d.addCallback(lambda _: stream_info_manager.get_sd_blob_hashes_for_stream(stream_hash)) def set_sd_blobs(sd_blob_hashes): - for sd_blob_hash, in sd_blob_hashes: + for sd_blob_hash in sd_blob_hashes: self.blob_hashes_to_send.append(sd_blob_hash) + d.addCallback(set_sd_blobs) return d + def send_handshake(self): + self.write(json.dumps({'version': 0})) + def parse_response(self, buff): try: return json.loads(buff) except ValueError: raise IncompleteResponseError() + def response_failure_handler(self, err): + log.warning("An error occurred handling the response: %s", err.getTraceback()) + def handle_response(self, response_dict): if self.received_handshake_response is False: return self.handle_handshake_response(response_dict) @@ -132,15 +155,15 @@ class LBRYFileReflectorClient(Protocol): return self.handle_normal_response(response_dict) def set_not_uploading(self): - if self.currently_uploading is not None: - self.currently_uploading.close_read_handle(self.read_handle) + if self.next_blob_to_send is not None: + self.next_blob_to_send.close_read_handle(self.read_handle) self.read_handle = None - self.currently_uploading = None + self.next_blob_to_send = None self.file_sender = None + return defer.succeed(None) def start_transfer(self): - self.write(json.dumps("{}")) - log.info("Starting the file upload") + self.write(json.dumps({})) assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer" d = self.file_sender.beginFileTransfer(self.read_handle, self) return d @@ -152,14 +175,15 @@ class LBRYFileReflectorClient(Protocol): if self.protocol_version != 0: raise ValueError("I can't handle protocol version {}!".format(self.protocol_version)) self.received_handshake_response = True + return defer.succeed(True) def handle_normal_response(self, response_dict): - if self.next_blob_to_send is not None: # Expecting Server Info Response + if self.file_sender is None: # Expecting Server Info Response if 'send_blob' not in response_dict: raise ValueError("I don't know whether to send the blob or not!") if response_dict['send_blob'] is True: self.file_sender = FileSender() - return True + return defer.succeed(True) else: return self.set_not_uploading() else: # Expecting Server Blob Response @@ -175,13 +199,13 @@ class LBRYFileReflectorClient(Protocol): self.next_blob_to_send = blob self.read_handle = read_handle return None - raise ValueError("Couldn't open that blob for some reason") + raise ValueError("Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash)) def send_blob_info(self): assert self.next_blob_to_send is not None, "need to have a next blob to send at this point" self.write(json.dumps({ 'blob_hash': self.next_blob_to_send.blob_hash, - 'blob_length': self.next_blob_to_send.length + 'blob_size': self.next_blob_to_send.length })) def send_next_request(self): @@ -210,6 +234,7 @@ class LBRYFileReflectorClientFactory(ClientFactory): self.stream_info_manager = stream_info_manager self.stream_hash = stream_hash self.p = None + self.finished_deferred = defer.Deferred() def buildProtocol(self, addr): p = self.protocol() diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py index f0ee24f87..a8f36ae22 100644 --- a/lbrynet/reflector/server/server.py +++ b/lbrynet/reflector/server/server.py @@ -1,6 +1,6 @@ import logging from twisted.python import failure -from twisted.internet import error +from twisted.internet import error, defer from twisted.internet.protocol import Protocol, ServerFactory import json @@ -72,7 +72,8 @@ class ReflectorServer(Protocol): self.peer_version = int(request_dict['version']) if self.peer_version != 0: raise ValueError("I don't know that version!") - return {'version': 0} + self.received_handshake = True + return defer.succeed({'version': 0}) def determine_blob_needed(self, blob): if blob.is_validated(): @@ -87,6 +88,7 @@ class ReflectorServer(Protocol): self.blob_write = None self.cancel_write = None self.incoming_blob = None + self.receiving_blob = False def handle_normal_request(self, request_dict): if self.blob_write is None: @@ -113,11 +115,10 @@ class ReflectorServer(Protocol): d = self.blob_finished_d d.addCallback(lambda _: self.close_blob()) d.addCallback(lambda _: {'received_blob': True}) - d.addCallback(self.send_response) return d def send_response(self, response_dict): - self.write(json.dumps(response_dict)) + self.transport.write(json.dumps(response_dict)) def handle_error(self, err): pass diff --git a/tests/functional_tests.py b/tests/functional_tests.py index 76af2b5ed..c9e062d1c 100644 --- a/tests/functional_tests.py +++ b/tests/functional_tests.py @@ -26,7 +26,7 @@ from lbrynet.core.StreamDescriptor import download_sd_blob from lbrynet.lbryfilemanager.LBRYFileCreator import create_lbry_file from lbrynet.lbryfile.client.LBRYFileOptions import add_lbry_file_to_sd_identifier from lbrynet.lbryfile.StreamDescriptor import get_sd_info -from twisted.internet import defer, threads, task +from twisted.internet import defer, threads, task, error from twisted.trial.unittest import TestCase from twisted.python.failure import Failure import os @@ -38,6 +38,10 @@ from lbrynet.core.server.ServerProtocol import ServerProtocolFactory from lbrynet.lbrylive.server.LiveBlobInfoQueryHandler import CryptBlobInfoQueryHandlerFactory from lbrynet.lbrylive.client.LiveStreamOptions import add_live_stream_to_sd_identifier from lbrynet.lbrylive.client.LiveStreamDownloader import add_full_live_stream_downloader_to_sd_identifier +from lbrynet.core.BlobManager import TempBlobManager +from lbrynet.reflector.client.client import LBRYFileReflectorClientFactory +from lbrynet.reflector.server.server import ReflectorServerFactory +from lbrynet.lbryfile.StreamDescriptor import publish_sd_blob log_format = "%(funcName)s(): %(message)s" @@ -1369,4 +1373,147 @@ class TestStreamify(TestCase): d.addCallback(lambda _: self.lbry_file_manager.setup()) d.addCallback(lambda _: create_stream()) d.addCallback(combine_stream) + return d + + +class TestReflector(TestCase): + + def setUp(self): + self.session = None + self.stream_info_manager = None + self.lbry_file_manager = None + self.server_blob_manager = None + self.reflector_port = None + self.addCleanup(self.take_down_env) + + def take_down_env(self): + + d = defer.succeed(True) + if self.lbry_file_manager is not None: + d.addCallback(lambda _: self.lbry_file_manager.stop()) + if self.session is not None: + d.addCallback(lambda _: self.session.shut_down()) + if self.stream_info_manager is not None: + d.addCallback(lambda _: self.stream_info_manager.stop()) + if self.server_blob_manager is not None: + d.addCallback(lambda _: self.server_blob_manager.stop()) + if self.reflector_port is not None: + d.addCallback(lambda _: self.reflector_port.stopListening()) + + def delete_test_env(): + shutil.rmtree('client') + + d.addCallback(lambda _: threads.deferToThread(delete_test_env)) + return d + + def test_reflector(self): + + wallet = FakeWallet() + peer_manager = PeerManager() + peer_finder = FakePeerFinder(5553, peer_manager, 2) + hash_announcer = FakeAnnouncer() + rate_limiter = DummyRateLimiter() + sd_identifier = StreamDescriptorIdentifier() + + self.expected_blobs = [ + ('dc4708f76a5e7af0f1cae0ee96b824e2ed9250c9346c093b' + '441f0a20d3607c17948b6fcfb4bc62020fe5286693d08586', 2097152), + ('f4067522c1b49432a2a679512e3917144317caa1abba0c04' + '1e0cd2cf9f635d4cf127ce1824fa04189b63916174951f70', 2097152), + ('305486c434260484fcb2968ce0e963b72f81ba56c11b08b1' + 'af0789b55b44d78422600f9a38e3cf4f2e9569897e5646a9', 1015056), + ] + + db_dir = "client" + os.mkdir(db_dir) + + self.session = LBRYSession(MIN_BLOB_DATA_PAYMENT_RATE, db_dir=db_dir, lbryid="abcd", + peer_finder=peer_finder, hash_announcer=hash_announcer, + blob_dir=None, peer_port=5553, + use_upnp=False, rate_limiter=rate_limiter, wallet=wallet) + + self.stream_info_manager = TempLBRYFileMetadataManager() + + self.lbry_file_manager = LBRYFileManager(self.session, self.stream_info_manager, sd_identifier) + + self.server_blob_manager = TempBlobManager(hash_announcer) + + d = self.session.setup() + d.addCallback(lambda _: self.stream_info_manager.setup()) + d.addCallback(lambda _: add_lbry_file_to_sd_identifier(sd_identifier)) + d.addCallback(lambda _: self.lbry_file_manager.setup()) + d.addCallback(lambda _: self.server_blob_manager.setup()) + + def verify_equal(sd_info): + self.assertEqual(sd_info, test_create_stream_sd_file) + + def save_sd_blob_hash(sd_hash): + self.expected_blobs.append((sd_hash, 923)) + + def verify_stream_descriptor_file(stream_hash): + d = get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True) + d.addCallback(verify_equal) + d.addCallback(lambda _: publish_sd_blob(self.lbry_file_manager.stream_info_manager, self.session.blob_manager, stream_hash)) + d.addCallback(save_sd_blob_hash) + d.addCallback(lambda _: stream_hash) + return d + + def iv_generator(): + iv = 0 + while 1: + iv += 1 + yield "%016d" % iv + + def create_stream(): + test_file = GenFile(5209343, b''.join([chr(i + 3) for i in xrange(0, 64, 6)])) + d = create_lbry_file(self.session, self.lbry_file_manager, "test_file", test_file, + key="0123456701234567", iv_generator=iv_generator()) + return d + + def start_server(): + server_factory = ReflectorServerFactory(peer_manager, self.server_blob_manager) + from twisted.internet import reactor + port = 8943 + while self.reflector_port is None: + try: + self.reflector_port = reactor.listenTCP(port, server_factory) + except error.CannotListenError: + port += 1 + return defer.succeed(port) + + def send_to_server(port, stream_hash): + factory = LBRYFileReflectorClientFactory( + self.session.blob_manager, + self.stream_info_manager, + stream_hash + ) + + from twisted.internet import reactor + reactor.connectTCP('localhost', port, factory) + return factory.finished_deferred + + def verify_blob_completed(blob, blob_size): + self.assertTrue(blob.is_validated()) + self.assertEqual(blob_size, blob.length) + + def verify_have_blob(blob_hash, blob_size): + d = self.server_blob_manager.get_blob(blob_hash, True) + d.addCallback(lambda blob: verify_blob_completed(blob, blob_size)) + return d + + def verify_data_on_reflector(): + check_blob_ds = [] + for blob_hash, blob_size in self.expected_blobs: + check_blob_ds.append(verify_have_blob(blob_hash, blob_size)) + return defer.DeferredList(check_blob_ds) + + def upload_to_reflector(stream_hash): + d = start_server() + d.addCallback(lambda port: send_to_server(port, stream_hash)) + d.addCallback(lambda _: verify_data_on_reflector()) + return d + + d.addCallback(lambda _: create_stream()) + d.addCallback(verify_stream_descriptor_file) + d.addCallback(upload_to_reflector) return d \ No newline at end of file