From 191f661b357f4993115149cbe51e9da8232036a4 Mon Sep 17 00:00:00 2001 From: Jimmy Kiselak Date: Wed, 3 Aug 2016 23:03:14 -0400 Subject: [PATCH] skeleton for reflector protocol --- lbrynet/reflector/__init__.py | 0 lbrynet/reflector/client/__init__.py | 0 lbrynet/reflector/client/client.py | 140 +++++++++++++++++++++++++++ lbrynet/reflector/server/__init__.py | 0 lbrynet/reflector/server/server.py | 86 ++++++++++++++++ 5 files changed, 226 insertions(+) create mode 100644 lbrynet/reflector/__init__.py create mode 100644 lbrynet/reflector/client/__init__.py create mode 100644 lbrynet/reflector/client/client.py create mode 100644 lbrynet/reflector/server/__init__.py create mode 100644 lbrynet/reflector/server/server.py diff --git a/lbrynet/reflector/__init__.py b/lbrynet/reflector/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lbrynet/reflector/client/__init__.py b/lbrynet/reflector/client/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py new file mode 100644 index 000000000..940dc7a27 --- /dev/null +++ b/lbrynet/reflector/client/client.py @@ -0,0 +1,140 @@ +""" +The reflector protocol (all dicts encoded in json): + +Client Handshake (sent once per connection, at the start of the connection): + +{ + 'version': 0, +} + + +Server Handshake (sent once per connection, after receiving the client handshake): + +{ + 'version': 0, +} + + +Client Info Request: + +{ + 'blob_hash': "", + 'blob_size': +} + + +Server Info Response (sent in response to Client Info Request): + +{ + 'response': ['YES', 'NO'] +} + +If response is 'YES', client may send a Client Blob Request or a Client Info Request. +If response is 'NO', client may only send a Client Info Request + + +Client Blob Request: + +{} # Yes, this is an empty dictionary, in case something needs to go here in the future + # this blob data must match the info sent in the most recent Client Info Request + + +Server Blob Response (sent in response to Client Blob Request): +{ + 'received': True +} + +Client may now send another Client Info Request + +""" +import json +import logging +from twisted.internet.protocol import Protocol, ClientFactory + + +log = logging.getLogger(__name__) + + +class IncompleteResponseError(Exception): + pass + + +class LBRYFileReflectorClient(Protocol): + def connectionMade(self): + self.peer = self.factory.peer + self.response_buff = '' + self.outgoing_buff = '' + self.blob_hashes_to_send = [] + self.next_blob_to_send = None + self.received_handshake_response = False + d = self.get_blobs_to_send(self.factory.stream_info_manager, self.factory.stream_hash) + d.addCallback(lambda _: self.send_handshake()) + + def dataReceived(self, data): + self.response_buff += data + try: + msg = self.parse_response(self.response_buff) + except IncompleteResponseError: + pass + else: + self.response_buff = '' + d = self.handle_response(msg) + d.addCallbacks(lambda _: self.send_next_request(), self.response_failure_handler) + + def connectionLost(self, reason): + pass + + def get_blobs_to_send(self, stream_info_manager, stream_hash): + d = stream_info_manager.get_blobs_for_stream(stream_hash) + + def set_blobs(blob_hashes): + for blob_hash, position, iv, length in blob_hashes: + self.blob_hashes_to_send.append(blob_hash) + + d.addCallback(set_blobs) + return d + + def parse_response(self, buff): + try: + return json.loads(buff) + except ValueError: + raise IncompleteResponseError() + + def handle_response(self, response_dict): + if self.received_handshake_response is False: + self.handle_handshake_response(response_dict) + else: + self.handle_normal_response(response_dict) + + def handle_handshake_response(self, response_dict): + pass + + def handle_normal_response(self, response_dict): + pass + + def send_next_request(self): + if self.next_blob_to_send is not None: + # send the blob + pass + elif self.blobs_to_send: + # send the server the next blob hash + length + pass + else: + # close connection + pass + + +class LBRYFileReflectorClientFactory(ClientFactory): + protocol = LBRYFileReflectorClient + + def __init__(self, stream_info_manager, peer, stream_hash): + self.peer = peer + self.stream_info_manager = stream_info_manager + self.stream_hash = stream_hash + self.p = None + + def buildProtocol(self, addr): + p = self.protocol() + p.factory = self + self.p = p + return p \ No newline at end of file diff --git a/lbrynet/reflector/server/__init__.py b/lbrynet/reflector/server/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py new file mode 100644 index 000000000..ca43e224b --- /dev/null +++ b/lbrynet/reflector/server/server.py @@ -0,0 +1,86 @@ +import logging +from twisted.python import failure +from twisted.internet import error +from twisted.internet.protocol import Protocol, ServerFactory +import json + + +log = logging.getLogger(__name__) + + +class IncompleteMessageError(Exception): + pass + + +class ReflectorServer(Protocol): + """ + """ + + def connectionMade(self): + peer_info = self.transport.getPeer() + self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port) + self.received_handshake = False + self.peer_version = None + self.receiving_blob = False + self.blob_write = None + self.blob_finished_d = None + self.request_buff = "" + + def connectionLost(self, reason=failure.Failure(error.ConnectionDone())): + pass + + def dataReceived(self, data): + if self.receiving_blob is False: + self.request_buff += data + try: + msg = self.parse_request(self.request_buff) + except IncompleteMessageError: + pass + else: + self.request_buff = '' + d = self.handle_request(msg) + d.addCallbacks(self.send_response, self.handle_error) + else: + self.blob_write(data) + + def parse_request(self, buff): + try: + return json.loads(buff) + except ValueError: + raise IncompleteMessageError() + + def handle_request(self, request_dict): + if self.received_handshake is False: + return self.handle_handshake(request_dict) + else: + return self.handle_normal_request(request_dict) + + def handle_handshake(self, request_dict): + pass + + def handle_normal_request(self, request_dict): + if self.blob_write is None: + # we haven't opened a blob yet, meaning we must be waiting for the + # next message containing a blob hash and a length. this message + # should be it. if it's one we want, open the blob for writing, and + # return a nice response dict (in a Deferred) saying go ahead + pass + else: + # we have a blob open already, so this message should have nothing + # important in it. to the deferred that fires when the blob is done, + # add a callback which returns a nice response dict saying to keep + # sending, and then return that deferred + pass + + def send_response(self, response_dict): + pass + + def handle_error(self, err): + pass + + +class ReflectorServerFactory(ServerFactory): + protocol = ReflectorServer + + def __init__(self, peer_manager): + self.peer_manager = peer_manager \ No newline at end of file