From 0cd92a96c003a8dfe3fde49fa05db05d30765f63 Mon Sep 17 00:00:00 2001 From: Jack Date: Tue, 9 Aug 2016 17:46:25 -0400 Subject: [PATCH] add reflector files to tests directory --- tests/lbrynet/lbrynet/reflector/__init__.py | 0 .../lbrynet/reflector/client/__init__.py | 0 .../lbrynet/reflector/client/client.py | 243 ++++++++++++++++++ .../lbrynet/reflector/server/__init__.py | 0 .../lbrynet/reflector/server/server.py | 132 ++++++++++ 5 files changed, 375 insertions(+) create mode 100644 tests/lbrynet/lbrynet/reflector/__init__.py create mode 100644 tests/lbrynet/lbrynet/reflector/client/__init__.py create mode 100644 tests/lbrynet/lbrynet/reflector/client/client.py create mode 100644 tests/lbrynet/lbrynet/reflector/server/__init__.py create mode 100644 tests/lbrynet/lbrynet/reflector/server/server.py diff --git a/tests/lbrynet/lbrynet/reflector/__init__.py b/tests/lbrynet/lbrynet/reflector/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lbrynet/lbrynet/reflector/client/__init__.py b/tests/lbrynet/lbrynet/reflector/client/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lbrynet/lbrynet/reflector/client/client.py b/tests/lbrynet/lbrynet/reflector/client/client.py new file mode 100644 index 000000000..ce2102f95 --- /dev/null +++ b/tests/lbrynet/lbrynet/reflector/client/client.py @@ -0,0 +1,243 @@ +""" +The reflector protocol (all dicts encoded in json): + +Client Handshake (sent once per connection, at the start of the connection): + +{ + 'version': 0, +} + + +Server Handshake (sent once per connection, after receiving the client handshake): + +{ + 'version': 0, +} + + +Client Info Request: + +{ + 'blob_hash': "", + 'blob_size': +} + + +Server Info Response (sent in response to Client Info Request): + +{ + 'send_blob': True|False +} + +If response is 'YES', client may send a Client Blob Request or a Client Info Request. 
+If response is 'NO', client may only send a Client Info Request + + +Client Blob Request: + +{} # Yes, this is an empty dictionary, in case something needs to go here in the future + # this blob data must match the info sent in the most recent Client Info Request + + +Server Blob Response (sent in response to Client Blob Request): +{ + 'received_blob': True +} + +Client may now send another Client Info Request + +""" +import json +import logging +from twisted.protocols.basic import FileSender +from twisted.internet.protocol import Protocol, ClientFactory +from twisted.internet import defer, error + + +log = logging.getLogger(__name__) + + +class IncompleteResponseError(Exception): + pass + + +class LBRYFileReflectorClient(Protocol): + + # Protocol stuff + + def connectionMade(self): + self.blob_manager = self.factory.blob_manager + self.response_buff = '' + self.outgoing_buff = '' + self.blob_hashes_to_send = [] + self.next_blob_to_send = None + self.blob_read_handle = None + self.received_handshake_response = False + self.protocol_version = None + self.file_sender = None + self.producer = None + self.streaming = False + d = self.get_blobs_to_send(self.factory.stream_info_manager, self.factory.stream_hash) + d.addCallback(lambda _: self.send_handshake()) + d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) + + def dataReceived(self, data): + self.response_buff += data + try: + msg = self.parse_response(self.response_buff) + except IncompleteResponseError: + pass + else: + self.response_buff = '' + d = self.handle_response(msg) + d.addCallback(lambda _: self.send_next_request()) + d.addErrback(self.response_failure_handler) + + def connectionLost(self, reason): + if reason.check(error.ConnectionDone): + self.factory.finished_deferred.callback(True) + else: + self.factory.finished_deferred.callback(reason) + + # IConsumer stuff + + def registerProducer(self, producer, streaming): + self.producer = producer + self.streaming = streaming + if self.streaming is False: + from twisted.internet import reactor + reactor.callLater(0, self.producer.resumeProducing) + + def unregisterProducer(self): + self.producer = None + + def write(self, data): + self.transport.write(data) + if self.producer is not None and self.streaming is False: + from twisted.internet import reactor + reactor.callLater(0, self.producer.resumeProducing) + + def get_blobs_to_send(self, stream_info_manager, stream_hash): + d = stream_info_manager.get_blobs_for_stream(stream_hash) + + def set_blobs(blob_hashes): + for blob_hash, position, iv, length in blob_hashes: + if blob_hash is not None: + self.blob_hashes_to_send.append(blob_hash) + + d.addCallback(set_blobs) + + d.addCallback(lambda _: stream_info_manager.get_sd_blob_hashes_for_stream(stream_hash)) + + def set_sd_blobs(sd_blob_hashes): + for sd_blob_hash in sd_blob_hashes: + self.blob_hashes_to_send.append(sd_blob_hash) + + d.addCallback(set_sd_blobs) + return d + + def send_handshake(self): + self.write(json.dumps({'version': 0})) + + def parse_response(self, buff): + try: + return json.loads(buff) + except ValueError: + raise IncompleteResponseError() + + def response_failure_handler(self, err): + log.warning("An error occurred handling the response: %s", err.getTraceback()) + + def handle_response(self, response_dict): + if self.received_handshake_response is False: + return self.handle_handshake_response(response_dict) + else: + return self.handle_normal_response(response_dict) + + def set_not_uploading(self): + if 
self.next_blob_to_send is not None: + self.next_blob_to_send.close_read_handle(self.read_handle) + self.read_handle = None + self.next_blob_to_send = None + self.file_sender = None + return defer.succeed(None) + + def start_transfer(self): + self.write(json.dumps({})) + assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer" + d = self.file_sender.beginFileTransfer(self.read_handle, self) + return d + + def handle_handshake_response(self, response_dict): + if 'version' not in response_dict: + raise ValueError("Need protocol version number!") + self.protocol_version = int(response_dict['version']) + if self.protocol_version != 0: + raise ValueError("I can't handle protocol version {}!".format(self.protocol_version)) + self.received_handshake_response = True + return defer.succeed(True) + + def handle_normal_response(self, response_dict): + if self.file_sender is None: # Expecting Server Info Response + if 'send_blob' not in response_dict: + raise ValueError("I don't know whether to send the blob or not!") + if response_dict['send_blob'] is True: + self.file_sender = FileSender() + return defer.succeed(True) + else: + return self.set_not_uploading() + else: # Expecting Server Blob Response + if 'received_blob' not in response_dict: + raise ValueError("I don't know if the blob made it to the intended destination!") + else: + return self.set_not_uploading() + + def open_blob_for_reading(self, blob): + if blob.is_validated(): + read_handle = blob.open_for_reading() + if read_handle is not None: + self.next_blob_to_send = blob + self.read_handle = read_handle + return None + raise ValueError("Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash)) + + def send_blob_info(self): + assert self.next_blob_to_send is not None, "need to have a next blob to send at this point" + self.write(json.dumps({ + 'blob_hash': self.next_blob_to_send.blob_hash, + 'blob_size': self.next_blob_to_send.length + })) + + def send_next_request(self): + if self.file_sender is not None: + # send the blob + return self.start_transfer() + elif self.blob_hashes_to_send: + # open the next blob to send + blob_hash = self.blob_hashes_to_send[0] + self.blob_hashes_to_send = self.blob_hashes_to_send[1:] + d = self.blob_manager.get_blob(blob_hash, True) + d.addCallback(self.open_blob_for_reading) + # send the server the next blob hash + length + d.addCallback(lambda _: self.send_blob_info()) + return d + else: + # close connection + self.transport.loseConnection() + + +class LBRYFileReflectorClientFactory(ClientFactory): + protocol = LBRYFileReflectorClient + + def __init__(self, blob_manager, stream_info_manager, stream_hash): + self.blob_manager = blob_manager + self.stream_info_manager = stream_info_manager + self.stream_hash = stream_hash + self.p = None + self.finished_deferred = defer.Deferred() + + def buildProtocol(self, addr): + p = self.protocol() + p.factory = self + self.p = p + return p \ No newline at end of file diff --git a/tests/lbrynet/lbrynet/reflector/server/__init__.py b/tests/lbrynet/lbrynet/reflector/server/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lbrynet/lbrynet/reflector/server/server.py b/tests/lbrynet/lbrynet/reflector/server/server.py new file mode 100644 index 000000000..a8f36ae22 --- /dev/null +++ b/tests/lbrynet/lbrynet/reflector/server/server.py @@ -0,0 +1,132 @@ +import logging +from twisted.python import failure +from twisted.internet import error, defer +from twisted.internet.protocol import 
Protocol, ServerFactory +import json + +from lbrynet.core.utils import is_valid_blobhash + + +log = logging.getLogger(__name__) + + +class ReflectorServer(Protocol): + + def connectionMade(self): + peer_info = self.transport.getPeer() + self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port) + self.blob_manager = self.factory.blob_manager + self.received_handshake = False + self.peer_version = None + self.receiving_blob = False + self.incoming_blob = None + self.blob_write = None + self.blob_finished_d = None + self.cancel_write = None + self.request_buff = "" + + def connectionLost(self, reason=failure.Failure(error.ConnectionDone())): + pass + + def dataReceived(self, data): + if self.receiving_blob is False: + self.request_buff += data + msg, extra_data = self._get_valid_response(self.request_buff) + if msg is not None: + self.request_buff = '' + d = self.handle_request(msg) + d.addCallbacks(self.send_response, self.handle_error) + if self.receiving_blob is True and len(extra_data) != 0: + self.blob_write(extra_data) + else: + self.blob_write(data) + + def _get_valid_response(self, response_msg): + extra_data = None + response = None + curr_pos = 0 + while 1: + next_close_paren = response_msg.find('}', curr_pos) + if next_close_paren != -1: + curr_pos = next_close_paren + 1 + try: + response = json.loads(response_msg[:curr_pos]) + except ValueError: + pass + else: + extra_data = response_msg[curr_pos:] + break + else: + break + return response, extra_data + + def handle_request(self, request_dict): + if self.received_handshake is False: + return self.handle_handshake(request_dict) + else: + return self.handle_normal_request(request_dict) + + def handle_handshake(self, request_dict): + if 'version' not in request_dict: + raise ValueError("Client should send version") + self.peer_version = int(request_dict['version']) + if self.peer_version != 0: + raise ValueError("I don't know that version!") + self.received_handshake = True + return defer.succeed({'version': 0}) + + def determine_blob_needed(self, blob): + if blob.is_validated(): + return {'send_blob': False} + else: + self.incoming_blob = blob + self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer) + return {'send_blob': True} + + def close_blob(self): + self.blob_finished_d = None + self.blob_write = None + self.cancel_write = None + self.incoming_blob = None + self.receiving_blob = False + + def handle_normal_request(self, request_dict): + if self.blob_write is None: + # we haven't opened a blob yet, meaning we must be waiting for the + # next message containing a blob hash and a length. this message + # should be it. if it's one we want, open the blob for writing, and + # return a nice response dict (in a Deferred) saying go ahead + if not 'blob_hash' in request_dict or not 'blob_size' in request_dict: + raise ValueError("Expected a blob hash and a blob size") + if not is_valid_blobhash(request_dict['blob_hash']): + raise ValueError("Got a bad blob hash: {}".format(request_dict['blob_hash'])) + d = self.blob_manager.get_blob( + request_dict['blob_hash'], + True, + int(request_dict['blob_size']) + ) + d.addCallback(self.determine_blob_needed) + else: + # we have a blob open already, so this message should have nothing + # important in it. 
to the deferred that fires when the blob is done,
+            # add a callback which returns a nice response dict saying to keep
+            # sending, and then return that deferred
+            self.receiving_blob = True
+            d = self.blob_finished_d
+            d.addCallback(lambda _: self.close_blob())
+            d.addCallback(lambda _: {'received_blob': True})
+        return d
+
+    def send_response(self, response_dict):
+        self.transport.write(json.dumps(response_dict))
+
+    def handle_error(self, err):
+        log.warning("An error occurred handling a request: %s", err.getTraceback())
+
+
+class ReflectorServerFactory(ServerFactory):
+    protocol = ReflectorServer
+
+    def __init__(self, peer_manager, blob_manager):
+        self.peer_manager = peer_manager
+        self.blob_manager = blob_manager
\ No newline at end of file
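
For readers unfamiliar with the protocol described in the client.py docstring, the following standalone Python snippet walks through the same JSON message framing. It is an illustrative sketch only, not part of the patch: no sockets are opened, and the blob hash and size values are made-up placeholders.

import json


def parse_response(buff):
    # Mirrors LBRYFileReflectorClient.parse_response: a message is complete
    # once the buffered bytes form valid JSON; otherwise keep buffering.
    try:
        return json.loads(buff)
    except ValueError:
        return None


# 1. Client handshake, then the server handshake (which may arrive split
#    across several reads, hence the buffering).
client_handshake = json.dumps({'version': 0})
buff = ''
for chunk in ('{"vers', 'ion": 0}'):
    buff += chunk
    server_handshake = parse_response(buff)
assert server_handshake == {'version': 0}

# 2. Client Info Request for the next blob in the stream (placeholder values).
info_request = json.dumps({'blob_hash': '<blob hash placeholder>', 'blob_size': 2097152})

# 3. If the server answers {"send_blob": true}, the client sends the empty
#    Client Blob Request, streams the raw blob bytes, and waits for
#    {"received_blob": true}. If it answers {"send_blob": false}, the client
#    moves straight on to the next Client Info Request.
blob_request = json.dumps({})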
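
Likewise, a minimal sketch of how the two factories might be wired to a Twisted reactor. The import paths, host, port, and the blob_manager / stream_info_manager / stream_hash / peer_manager objects are assumptions based on the directory layout in this patch; they are not provided by it.

from twisted.internet import reactor

from lbrynet.reflector.client.client import LBRYFileReflectorClientFactory  # assumed path
from lbrynet.reflector.server.server import ReflectorServerFactory          # assumed path


def reflect_stream(blob_manager, stream_info_manager, stream_hash,
                   host="127.0.0.1", port=5566):
    # Upload every blob in the stream (plus its sd blob) to a reflector server.
    factory = LBRYFileReflectorClientFactory(blob_manager, stream_info_manager, stream_hash)
    reactor.connectTCP(host, port, factory)
    # Fires with True on a clean disconnect, or with the failure reason otherwise.
    return factory.finished_deferred


def listen_as_reflector(peer_manager, blob_manager, port=5566):
    # Accept incoming reflector connections and write received blobs
    # through blob_manager.
    return reactor.listenTCP(port, ReflectorServerFactory(peer_manager, blob_manager))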