diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 379a86a96..2bccb4fe3 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -26,7 +26,7 @@ Client Info Request: Server Info Response (sent in response to Client Info Request): { - 'response': ['YES', 'NO'] + 'send_blob': True|False } If response is 'YES', client may send a Client Blob Request or a Client Info Request. @@ -41,7 +41,7 @@ Client Blob Request: Server Blob Response (sent in response to Client Blob Request): { - 'received': True + 'received_blob': True } Client may now send another Client Info Request @@ -49,6 +49,7 @@ Client may now send another Client Info Request """ import json import logging +from twisted.protocols.basic import FileSender from twisted.internet.protocol import Protocol, ClientFactory @@ -60,13 +61,19 @@ class IncompleteResponseError(Exception): class LBRYFileReflectorClient(Protocol): + + # Protocol stuff + def connectionMade(self): - self.peer = self.factory.peer + self.blob_manager = self.factory.blob_manager self.response_buff = '' self.outgoing_buff = '' self.blob_hashes_to_send = [] self.next_blob_to_send = None + self.blob_read_handle = None self.received_handshake_response = False + self.protocol_version = None + self.file_sender = None d = self.get_blobs_to_send(self.factory.stream_info_manager, self.factory.stream_hash) d.addCallback(lambda _: self.send_handshake()) @@ -84,6 +91,17 @@ class LBRYFileReflectorClient(Protocol): def connectionLost(self, reason): pass + # IConsumer stuff + + def registerProducer(self, producer, streaming): + assert streaming is True + + def unregisterProducer(self): + self.transport.loseConnection() + + def write(self, data): + self.transport.write(data) + def get_blobs_to_send(self, stream_info_manager, stream_hash): d = stream_info_manager.get_blobs_for_stream(stream_hash) @@ -109,33 +127,86 @@ class LBRYFileReflectorClient(Protocol): def handle_response(self, response_dict): if self.received_handshake_response is False: - self.handle_handshake_response(response_dict) + return self.handle_handshake_response(response_dict) else: - self.handle_normal_response(response_dict) + return self.handle_normal_response(response_dict) + + def set_not_uploading(self): + if self.currently_uploading is not None: + self.currently_uploading.close_read_handle(self.read_handle) + self.read_handle = None + self.currently_uploading = None + self.file_sender = None + + def start_transfer(self): + self.write(json.dumps("{}")) + log.info("Starting the file upload") + assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer" + d = self.file_sender.beginFileTransfer(self.read_handle, self) + return d def handle_handshake_response(self, response_dict): - pass + if 'version' not in response_dict: + raise ValueError("Need protocol version number!") + self.protocol_version = int(response_dict['version']) + if self.protocol_version != 0: + raise ValueError("I can't handle protocol version {}!".format(self.protocol_version)) + self.received_handshake_response = True def handle_normal_response(self, response_dict): - pass + if self.next_blob_to_send is not None: # Expecting Server Info Response + if 'send_blob' not in response_dict: + raise ValueError("I don't know whether to send the blob or not!") + if response_dict['send_blob'] is True: + self.file_sender = FileSender() + return True + else: + return self.set_not_uploading() + else: # Expecting Server Blob Response + if 'received_blob' not in response_dict: + raise ValueError("I don't know if the blob made it to the intended destination!") + else: + return self.set_not_uploading() + + def open_blob_for_reading(self, blob): + if blob.is_validated(): + read_handle = blob.open_for_reading() + if read_handle is not None: + self.next_blob_to_send = blob + self.read_handle = read_handle + return None + raise ValueError("Couldn't open that blob for some reason") + + def send_blob_info(self): + assert self.next_blob_to_send is not None, "need to have a next blob to send at this point" + self.write(json.dumps({ + 'blob_hash': self.next_blob_to_send.blob_hash, + 'blob_length': self.next_blob_to_send.length + })) def send_next_request(self): - if self.next_blob_to_send is not None: + if self.file_sender is not None: # send the blob - pass - elif self.blobs_to_send: + return self.start_transfer() + elif self.blob_hashes_to_send: + # open the next blob to send + blob_hash = self.blob_hashes_to_send[0] + self.blob_hashes_to_send = self.blob_hashes_to_send[1:] + d = self.blob_manager.get_blob(blob_hash, True) + d.addCallback(self.open_blob_for_reading) # send the server the next blob hash + length - pass + d.addCallback(lambda _: self.send_blob_info()) + return d else: # close connection - pass + self.transport.loseConnection() class LBRYFileReflectorClientFactory(ClientFactory): protocol = LBRYFileReflectorClient - def __init__(self, stream_info_manager, peer, stream_hash): - self.peer = peer + def __init__(self, blob_manager, stream_info_manager, stream_hash): + self.blob_manager = blob_manager self.stream_info_manager = stream_info_manager self.stream_hash = stream_hash self.p = None diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py index ca43e224b..f0ee24f87 100644 --- a/lbrynet/reflector/server/server.py +++ b/lbrynet/reflector/server/server.py @@ -4,26 +4,25 @@ from twisted.internet import error from twisted.internet.protocol import Protocol, ServerFactory import json +from lbrynet.core.utils import is_valid_blobhash + log = logging.getLogger(__name__) -class IncompleteMessageError(Exception): - pass - - class ReflectorServer(Protocol): - """ - """ def connectionMade(self): peer_info = self.transport.getPeer() self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port) + self.blob_manager = self.factory.blob_manager self.received_handshake = False self.peer_version = None self.receiving_blob = False + self.incoming_blob = None self.blob_write = None self.blob_finished_d = None + self.cancel_write = None self.request_buff = "" def connectionLost(self, reason=failure.Failure(error.ConnectionDone())): @@ -32,22 +31,34 @@ class ReflectorServer(Protocol): def dataReceived(self, data): if self.receiving_blob is False: self.request_buff += data - try: - msg = self.parse_request(self.request_buff) - except IncompleteMessageError: - pass - else: + msg, extra_data = self._get_valid_response(self.request_buff) + if msg is not None: self.request_buff = '' d = self.handle_request(msg) d.addCallbacks(self.send_response, self.handle_error) + if self.receiving_blob is True and len(extra_data) != 0: + self.blob_write(extra_data) else: self.blob_write(data) - def parse_request(self, buff): - try: - return json.loads(buff) - except ValueError: - raise IncompleteMessageError() + def _get_valid_response(self, response_msg): + extra_data = None + response = None + curr_pos = 0 + while 1: + next_close_paren = response_msg.find('}', curr_pos) + if next_close_paren != -1: + curr_pos = next_close_paren + 1 + try: + response = json.loads(response_msg[:curr_pos]) + except ValueError: + pass + else: + extra_data = response_msg[curr_pos:] + break + else: + break + return response, extra_data def handle_request(self, request_dict): if self.received_handshake is False: @@ -56,7 +67,26 @@ class ReflectorServer(Protocol): return self.handle_normal_request(request_dict) def handle_handshake(self, request_dict): - pass + if 'version' not in request_dict: + raise ValueError("Client should send version") + self.peer_version = int(request_dict['version']) + if self.peer_version != 0: + raise ValueError("I don't know that version!") + return {'version': 0} + + def determine_blob_needed(self, blob): + if blob.is_validated(): + return {'send_blob': False} + else: + self.incoming_blob = blob + self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer) + return {'send_blob': True} + + def close_blob(self): + self.blob_finished_d = None + self.blob_write = None + self.cancel_write = None + self.incoming_blob = None def handle_normal_request(self, request_dict): if self.blob_write is None: @@ -64,16 +94,30 @@ class ReflectorServer(Protocol): # next message containing a blob hash and a length. this message # should be it. if it's one we want, open the blob for writing, and # return a nice response dict (in a Deferred) saying go ahead - pass + if not 'blob_hash' in request_dict or not 'blob_size' in request_dict: + raise ValueError("Expected a blob hash and a blob size") + if not is_valid_blobhash(request_dict['blob_hash']): + raise ValueError("Got a bad blob hash: {}".format(request_dict['blob_hash'])) + d = self.blob_manager.get_blob( + request_dict['blob_hash'], + True, + int(request_dict['blob_size']) + ) + d.addCallback(self.determine_blob_needed) else: # we have a blob open already, so this message should have nothing # important in it. to the deferred that fires when the blob is done, # add a callback which returns a nice response dict saying to keep # sending, and then return that deferred - pass + self.receiving_blob = True + d = self.blob_finished_d + d.addCallback(lambda _: self.close_blob()) + d.addCallback(lambda _: {'received_blob': True}) + d.addCallback(self.send_response) + return d def send_response(self, response_dict): - pass + self.write(json.dumps(response_dict)) def handle_error(self, err): pass @@ -82,5 +126,6 @@ class ReflectorServer(Protocol): class ReflectorServerFactory(ServerFactory): protocol = ReflectorServer - def __init__(self, peer_manager): - self.peer_manager = peer_manager \ No newline at end of file + def __init__(self, peer_manager, blob_manager): + self.peer_manager = peer_manager + self.blob_manager = blob_manager \ No newline at end of file