fill in much of the skeleton for the reflector protocol

This commit is contained in:
Jimmy Kiselak 2016-08-07 22:33:40 -04:00
parent 1dea20a358
commit b7e2e87ac1
2 changed files with 152 additions and 36 deletions

View file

@ -26,7 +26,7 @@ Client Info Request:
Server Info Response (sent in response to Client Info Request): Server Info Response (sent in response to Client Info Request):
{ {
'response': ['YES', 'NO'] 'send_blob': True|False
} }
If response is 'YES', client may send a Client Blob Request or a Client Info Request. If response is 'YES', client may send a Client Blob Request or a Client Info Request.
@ -41,7 +41,7 @@ Client Blob Request:
Server Blob Response (sent in response to Client Blob Request): Server Blob Response (sent in response to Client Blob Request):
{ {
'received': True 'received_blob': True
} }
Client may now send another Client Info Request Client may now send another Client Info Request
@ -49,6 +49,7 @@ Client may now send another Client Info Request
""" """
import json import json
import logging import logging
from twisted.protocols.basic import FileSender
from twisted.internet.protocol import Protocol, ClientFactory from twisted.internet.protocol import Protocol, ClientFactory
@ -60,13 +61,19 @@ class IncompleteResponseError(Exception):
class LBRYFileReflectorClient(Protocol): class LBRYFileReflectorClient(Protocol):
# Protocol stuff
def connectionMade(self): def connectionMade(self):
self.peer = self.factory.peer self.blob_manager = self.factory.blob_manager
self.response_buff = '' self.response_buff = ''
self.outgoing_buff = '' self.outgoing_buff = ''
self.blob_hashes_to_send = [] self.blob_hashes_to_send = []
self.next_blob_to_send = None self.next_blob_to_send = None
self.blob_read_handle = None
self.received_handshake_response = False self.received_handshake_response = False
self.protocol_version = None
self.file_sender = None
d = self.get_blobs_to_send(self.factory.stream_info_manager, self.factory.stream_hash) d = self.get_blobs_to_send(self.factory.stream_info_manager, self.factory.stream_hash)
d.addCallback(lambda _: self.send_handshake()) d.addCallback(lambda _: self.send_handshake())
@ -84,6 +91,17 @@ class LBRYFileReflectorClient(Protocol):
def connectionLost(self, reason): def connectionLost(self, reason):
pass pass
# IConsumer stuff
def registerProducer(self, producer, streaming):
assert streaming is True
def unregisterProducer(self):
self.transport.loseConnection()
def write(self, data):
self.transport.write(data)
def get_blobs_to_send(self, stream_info_manager, stream_hash): def get_blobs_to_send(self, stream_info_manager, stream_hash):
d = stream_info_manager.get_blobs_for_stream(stream_hash) d = stream_info_manager.get_blobs_for_stream(stream_hash)
@ -109,33 +127,86 @@ class LBRYFileReflectorClient(Protocol):
def handle_response(self, response_dict): def handle_response(self, response_dict):
if self.received_handshake_response is False: if self.received_handshake_response is False:
self.handle_handshake_response(response_dict) return self.handle_handshake_response(response_dict)
else: else:
self.handle_normal_response(response_dict) return self.handle_normal_response(response_dict)
def set_not_uploading(self):
if self.currently_uploading is not None:
self.currently_uploading.close_read_handle(self.read_handle)
self.read_handle = None
self.currently_uploading = None
self.file_sender = None
def start_transfer(self):
self.write(json.dumps("{}"))
log.info("Starting the file upload")
assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer"
d = self.file_sender.beginFileTransfer(self.read_handle, self)
return d
def handle_handshake_response(self, response_dict): def handle_handshake_response(self, response_dict):
pass if 'version' not in response_dict:
raise ValueError("Need protocol version number!")
self.protocol_version = int(response_dict['version'])
if self.protocol_version != 0:
raise ValueError("I can't handle protocol version {}!".format(self.protocol_version))
self.received_handshake_response = True
def handle_normal_response(self, response_dict): def handle_normal_response(self, response_dict):
pass if self.next_blob_to_send is not None: # Expecting Server Info Response
if 'send_blob' not in response_dict:
raise ValueError("I don't know whether to send the blob or not!")
if response_dict['send_blob'] is True:
self.file_sender = FileSender()
return True
else:
return self.set_not_uploading()
else: # Expecting Server Blob Response
if 'received_blob' not in response_dict:
raise ValueError("I don't know if the blob made it to the intended destination!")
else:
return self.set_not_uploading()
def open_blob_for_reading(self, blob):
if blob.is_validated():
read_handle = blob.open_for_reading()
if read_handle is not None:
self.next_blob_to_send = blob
self.read_handle = read_handle
return None
raise ValueError("Couldn't open that blob for some reason")
def send_blob_info(self):
assert self.next_blob_to_send is not None, "need to have a next blob to send at this point"
self.write(json.dumps({
'blob_hash': self.next_blob_to_send.blob_hash,
'blob_length': self.next_blob_to_send.length
}))
def send_next_request(self): def send_next_request(self):
if self.next_blob_to_send is not None: if self.file_sender is not None:
# send the blob # send the blob
pass return self.start_transfer()
elif self.blobs_to_send: elif self.blob_hashes_to_send:
# open the next blob to send
blob_hash = self.blob_hashes_to_send[0]
self.blob_hashes_to_send = self.blob_hashes_to_send[1:]
d = self.blob_manager.get_blob(blob_hash, True)
d.addCallback(self.open_blob_for_reading)
# send the server the next blob hash + length # send the server the next blob hash + length
pass d.addCallback(lambda _: self.send_blob_info())
return d
else: else:
# close connection # close connection
pass self.transport.loseConnection()
class LBRYFileReflectorClientFactory(ClientFactory): class LBRYFileReflectorClientFactory(ClientFactory):
protocol = LBRYFileReflectorClient protocol = LBRYFileReflectorClient
def __init__(self, stream_info_manager, peer, stream_hash): def __init__(self, blob_manager, stream_info_manager, stream_hash):
self.peer = peer self.blob_manager = blob_manager
self.stream_info_manager = stream_info_manager self.stream_info_manager = stream_info_manager
self.stream_hash = stream_hash self.stream_hash = stream_hash
self.p = None self.p = None

View file

@ -4,26 +4,25 @@ from twisted.internet import error
from twisted.internet.protocol import Protocol, ServerFactory from twisted.internet.protocol import Protocol, ServerFactory
import json import json
from lbrynet.core.utils import is_valid_blobhash
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class IncompleteMessageError(Exception):
pass
class ReflectorServer(Protocol): class ReflectorServer(Protocol):
"""
"""
def connectionMade(self): def connectionMade(self):
peer_info = self.transport.getPeer() peer_info = self.transport.getPeer()
self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port) self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port)
self.blob_manager = self.factory.blob_manager
self.received_handshake = False self.received_handshake = False
self.peer_version = None self.peer_version = None
self.receiving_blob = False self.receiving_blob = False
self.incoming_blob = None
self.blob_write = None self.blob_write = None
self.blob_finished_d = None self.blob_finished_d = None
self.cancel_write = None
self.request_buff = "" self.request_buff = ""
def connectionLost(self, reason=failure.Failure(error.ConnectionDone())): def connectionLost(self, reason=failure.Failure(error.ConnectionDone())):
@ -32,22 +31,34 @@ class ReflectorServer(Protocol):
def dataReceived(self, data): def dataReceived(self, data):
if self.receiving_blob is False: if self.receiving_blob is False:
self.request_buff += data self.request_buff += data
try: msg, extra_data = self._get_valid_response(self.request_buff)
msg = self.parse_request(self.request_buff) if msg is not None:
except IncompleteMessageError:
pass
else:
self.request_buff = '' self.request_buff = ''
d = self.handle_request(msg) d = self.handle_request(msg)
d.addCallbacks(self.send_response, self.handle_error) d.addCallbacks(self.send_response, self.handle_error)
if self.receiving_blob is True and len(extra_data) != 0:
self.blob_write(extra_data)
else: else:
self.blob_write(data) self.blob_write(data)
def parse_request(self, buff): def _get_valid_response(self, response_msg):
try: extra_data = None
return json.loads(buff) response = None
except ValueError: curr_pos = 0
raise IncompleteMessageError() while 1:
next_close_paren = response_msg.find('}', curr_pos)
if next_close_paren != -1:
curr_pos = next_close_paren + 1
try:
response = json.loads(response_msg[:curr_pos])
except ValueError:
pass
else:
extra_data = response_msg[curr_pos:]
break
else:
break
return response, extra_data
def handle_request(self, request_dict): def handle_request(self, request_dict):
if self.received_handshake is False: if self.received_handshake is False:
@ -56,7 +67,26 @@ class ReflectorServer(Protocol):
return self.handle_normal_request(request_dict) return self.handle_normal_request(request_dict)
def handle_handshake(self, request_dict): def handle_handshake(self, request_dict):
pass if 'version' not in request_dict:
raise ValueError("Client should send version")
self.peer_version = int(request_dict['version'])
if self.peer_version != 0:
raise ValueError("I don't know that version!")
return {'version': 0}
def determine_blob_needed(self, blob):
if blob.is_validated():
return {'send_blob': False}
else:
self.incoming_blob = blob
self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer)
return {'send_blob': True}
def close_blob(self):
self.blob_finished_d = None
self.blob_write = None
self.cancel_write = None
self.incoming_blob = None
def handle_normal_request(self, request_dict): def handle_normal_request(self, request_dict):
if self.blob_write is None: if self.blob_write is None:
@ -64,16 +94,30 @@ class ReflectorServer(Protocol):
# next message containing a blob hash and a length. this message # next message containing a blob hash and a length. this message
# should be it. if it's one we want, open the blob for writing, and # should be it. if it's one we want, open the blob for writing, and
# return a nice response dict (in a Deferred) saying go ahead # return a nice response dict (in a Deferred) saying go ahead
pass if not 'blob_hash' in request_dict or not 'blob_size' in request_dict:
raise ValueError("Expected a blob hash and a blob size")
if not is_valid_blobhash(request_dict['blob_hash']):
raise ValueError("Got a bad blob hash: {}".format(request_dict['blob_hash']))
d = self.blob_manager.get_blob(
request_dict['blob_hash'],
True,
int(request_dict['blob_size'])
)
d.addCallback(self.determine_blob_needed)
else: else:
# we have a blob open already, so this message should have nothing # we have a blob open already, so this message should have nothing
# important in it. to the deferred that fires when the blob is done, # important in it. to the deferred that fires when the blob is done,
# add a callback which returns a nice response dict saying to keep # add a callback which returns a nice response dict saying to keep
# sending, and then return that deferred # sending, and then return that deferred
pass self.receiving_blob = True
d = self.blob_finished_d
d.addCallback(lambda _: self.close_blob())
d.addCallback(lambda _: {'received_blob': True})
d.addCallback(self.send_response)
return d
def send_response(self, response_dict): def send_response(self, response_dict):
pass self.write(json.dumps(response_dict))
def handle_error(self, err): def handle_error(self, err):
pass pass
@ -82,5 +126,6 @@ class ReflectorServer(Protocol):
class ReflectorServerFactory(ServerFactory): class ReflectorServerFactory(ServerFactory):
protocol = ReflectorServer protocol = ReflectorServer
def __init__(self, peer_manager): def __init__(self, peer_manager, blob_manager):
self.peer_manager = peer_manager self.peer_manager = peer_manager
self.blob_manager = blob_manager