diff --git a/lbrynet/lbrynet_daemon/Daemon.py b/lbrynet/lbrynet_daemon/Daemon.py index d4a56000b..16501d2e3 100644 --- a/lbrynet/lbrynet_daemon/Daemon.py +++ b/lbrynet/lbrynet_daemon/Daemon.py @@ -29,7 +29,6 @@ from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaverFa from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileOpenerFactory from lbrynet.lbryfile.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier from lbrynet.lbryfile.EncryptedFileMetadataManager import DBEncryptedFileMetadataManager -from lbrynet.lbryfile.EncryptedFileMetadataManager import TempEncryptedFileMetadataManager from lbrynet.lbryfile.StreamDescriptor import EncryptedFileStreamType from lbrynet.lbryfilemanager.EncryptedFileManager import EncryptedFileManager from lbrynet.lbrynet_daemon.UIManager import UIManager @@ -266,9 +265,8 @@ class Daemon(AuthJSONRPCServer): } self.looping_call_manager = LoopingCallManager(calls) self.sd_identifier = StreamDescriptorIdentifier() - self.stream_info_manager = TempEncryptedFileMetadataManager() + self.stream_info_manager = DBEncryptedFileMetadataManager(self.db_dir) self.lbry_ui_manager = UIManager(root) - self.lbry_file_metadata_manager = None self.lbry_file_manager = None @defer.inlineCallbacks @@ -653,12 +651,11 @@ class Daemon(AuthJSONRPCServer): def _setup_lbry_file_manager(self): self.startup_status = STARTUP_STAGES[3] - self.lbry_file_metadata_manager = DBEncryptedFileMetadataManager(self.db_dir) - d = self.lbry_file_metadata_manager.setup() + d = self.stream_info_manager.setup() def set_lbry_file_manager(): self.lbry_file_manager = EncryptedFileManager( - self.session, self.lbry_file_metadata_manager, + self.session, self.stream_info_manager, self.sd_identifier, download_directory=self.download_directory) return self.lbry_file_manager.setup() diff --git a/lbrynet/reflector/__init__.py b/lbrynet/reflector/__init__.py index 801cd377c..01e4da72c 100644 --- a/lbrynet/reflector/__init__.py +++ b/lbrynet/reflector/__init__.py @@ -1,3 +1,69 @@ +""" +Reflector is a protocol to re-host lbry blobs and streams +Client queries and server responses follow, all dicts are encoded as json + +############# Handshake request and response ############# +Upon connecting, the client sends a version handshake: +{ + 'version': int, +} + +The server replies with the same version +{ + 'version': int, +} + +############# Stream descriptor requests and responses ############# +(if sending blobs directly this is skipped) +If the client is reflecting a whole stream, they send a stream descriptor request: +{ + 'sd_blob_hash': str, + 'sd_blob_size': int +} + +The server indicates if it's aware of this stream already by requesting (or not requesting) +the stream descriptor blob. If the server has a validated copy of the sd blob, it will +include the needed_blobs field (a list of blob hashes missing from reflector) in the response. +If the server does not have the sd blob the needed_blobs field will not be included, as the +server does not know what blobs it is missing - so the client should send all of the blobs +in the stream. +{ + 'send_sd_blob': bool + 'needed_blobs': list, conditional +} + +The client may begin the file transfer of the sd blob if send_sd_blob was True. +If the client sends the blob, after receiving it the server indicates if the +transfer was successful: +{ + 'received_sd_blob': bool +} +If the transfer was not successful (False), the blob is added to the needed_blobs queue + +############# Blob requests and responses ############# +A client with blobs to reflect (either populated by the client or by the stream descriptor +response) queries if the server is ready to begin transferring a blob +{ + 'blob_hash': str, + 'blob_size': int +} + +The server replies, send_blob will be False if the server has a validated copy of the blob: +{ + 'send_blob': bool +} + +The client may begin the raw blob file transfer if the server replied True. +If the client sends the blob, the server replies: +{ + 'received_blob': bool +} +If the transfer was not successful (False), the blob is re-added to the needed_blobs queue + +Blob requests continue for each of the blobs the client has queued to send, when completed +the client disconnects. +""" + from lbrynet.reflector.server.server import ReflectorServerFactory as ServerFactory from lbrynet.reflector.client.client import EncryptedFileReflectorClientFactory as ClientFactory from lbrynet.reflector.client.blob import BlobReflectorClientFactory as BlobClientFactory diff --git a/lbrynet/reflector/client/blob.py b/lbrynet/reflector/client/blob.py index 62bb49938..2aba7716f 100644 --- a/lbrynet/reflector/client/blob.py +++ b/lbrynet/reflector/client/blob.py @@ -5,7 +5,7 @@ from twisted.protocols.basic import FileSender from twisted.internet.protocol import Protocol, ClientFactory from twisted.internet import defer, error -from lbrynet.reflector.common import IncompleteResponse +from lbrynet.reflector.common import IncompleteResponse, REFLECTOR_V2 log = logging.getLogger(__name__) @@ -22,7 +22,7 @@ class BlobReflectorClient(Protocol): self.next_blob_to_send = None self.blob_read_handle = None self.received_handshake_response = False - self.protocol_version = None + self.protocol_version = self.factory.protocol_version self.file_sender = None self.producer = None self.streaming = False @@ -32,7 +32,7 @@ class BlobReflectorClient(Protocol): lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) def dataReceived(self, data): - log.debug('Recieved %s', data) + log.debug('Received %s', data) self.response_buff += data try: msg = self.parse_response(self.response_buff) @@ -74,7 +74,7 @@ class BlobReflectorClient(Protocol): def send_handshake(self): log.debug('Sending handshake') - self.write(json.dumps({'version': 0})) + self.write(json.dumps({'version': self.protocol_version})) return defer.succeed(None) def parse_response(self, buff): @@ -102,7 +102,6 @@ class BlobReflectorClient(Protocol): def start_transfer(self): self.sent_blobs = True - self.write(json.dumps({})) assert self.read_handle is not None, \ "self.read_handle was None when trying to start the transfer" d = self.file_sender.beginFileTransfer(self.read_handle, self) @@ -111,8 +110,8 @@ class BlobReflectorClient(Protocol): def handle_handshake_response(self, response_dict): if 'version' not in response_dict: raise ValueError("Need protocol version number!") - self.protocol_version = int(response_dict['version']) - if self.protocol_version != 0: + server_version = int(response_dict['version']) + if self.protocol_version != server_version: raise ValueError("I can't handle protocol version {}!".format(self.protocol_version)) self.received_handshake_response = True return defer.succeed(True) @@ -184,6 +183,7 @@ class BlobReflectorClientFactory(ClientFactory): protocol = BlobReflectorClient def __init__(self, blob_manager, blobs): + self.protocol_version = REFLECTOR_V2 self.blob_manager = blob_manager self.blobs = blobs self.p = None diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index e6c3e328e..31eda8d31 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -1,61 +1,13 @@ -""" -The reflector protocol (all dicts encoded in json): - -Client Handshake (sent once per connection, at the start of the connection): - -{ - 'version': 0, -} - - -Server Handshake (sent once per connection, after receiving the client handshake): - -{ - 'version': 0, -} - - -Client Info Request: - -{ - 'blob_hash': "", - 'blob_size': -} - - -Server Info Response (sent in response to Client Info Request): - -{ - 'send_blob': True|False -} - -If response is 'YES', client may send a Client Blob Request or a Client Info Request. -If response is 'NO', client may only send a Client Info Request - - -Client Blob Request: - -{} # Yes, this is an empty dictionary, in case something needs to go here in the future - # this blob data must match the info sent in the most recent Client Info Request - - -Server Blob Response (sent in response to Client Blob Request): -{ - 'received_blob': True -} - -Client may now send another Client Info Request - -""" import json import logging +from twisted.internet.error import ConnectionRefusedError from twisted.protocols.basic import FileSender from twisted.internet.protocol import Protocol, ClientFactory from twisted.internet import defer, error -from lbrynet.reflector.common import IncompleteResponse - +from lbrynet.reflector.common import IncompleteResponse, ReflectorRequestError +from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2 log = logging.getLogger(__name__) @@ -63,24 +15,31 @@ log = logging.getLogger(__name__) class EncryptedFileReflectorClient(Protocol): # Protocol stuff def connectionMade(self): + log.debug("Connected to reflector") self.blob_manager = self.factory.blob_manager self.response_buff = '' self.outgoing_buff = '' self.blob_hashes_to_send = [] self.next_blob_to_send = None - self.blob_read_handle = None - self.received_handshake_response = False - self.protocol_version = None + self.read_handle = None + self.sent_stream_info = False + self.received_descriptor_response = False + self.protocol_version = self.factory.protocol_version + self.received_server_version = False + self.server_version = None + self.stream_descriptor = None + self.descriptor_needed = None + self.needed_blobs = [] + self.reflected_blobs = [] self.file_sender = None self.producer = None self.streaming = False - d = self.get_blobs_to_send(self.factory.stream_info_manager, self.factory.stream_hash) + d = self.load_descriptor() d.addCallback(lambda _: self.send_handshake()) d.addErrback( lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) def dataReceived(self, data): - log.debug('Recieved %s', data) self.response_buff += data try: msg = self.parse_response(self.response_buff) @@ -94,8 +53,8 @@ class EncryptedFileReflectorClient(Protocol): def connectionLost(self, reason): if reason.check(error.ConnectionDone): - log.info('Finished sending data via reflector') - self.factory.finished_deferred.callback(True) + log.debug('Finished sending data via reflector') + self.factory.finished_deferred.callback(self.reflected_blobs) else: log.debug('Reflector finished: %s', reason) self.factory.finished_deferred.callback(reason) @@ -118,31 +77,54 @@ class EncryptedFileReflectorClient(Protocol): from twisted.internet import reactor reactor.callLater(0, self.producer.resumeProducing) - def get_blobs_to_send(self, stream_info_manager, stream_hash): - log.debug('Getting blobs from stream hash: %s', stream_hash) - d = stream_info_manager.get_blobs_for_stream(stream_hash) + def get_validated_blobs(self, blobs_in_stream): + def get_blobs(blobs): + for (blob, _, _, blob_len) in blobs: + if blob: + yield self.blob_manager.get_blob(blob, True, blob_len) - def set_blobs(blob_hashes): - for blob_hash, position, iv, length in blob_hashes: - if blob_hash is not None: - self.blob_hashes_to_send.append(blob_hash) - log.debug("Preparing to send %i blobs", len(self.blob_hashes_to_send)) + dl = defer.DeferredList(list(get_blobs(blobs_in_stream)), consumeErrors=True) + dl.addCallback(lambda blobs: [blob for r, blob in blobs if r and blob.is_validated()]) + return dl - d.addCallback(set_blobs) + def set_blobs_to_send(self, blobs_to_send): + for blob in blobs_to_send: + if blob not in self.blob_hashes_to_send: + self.blob_hashes_to_send.append(blob) - d.addCallback(lambda _: stream_info_manager.get_sd_blob_hashes_for_stream(stream_hash)) + def get_blobs_to_send(self): + def _show_missing_blobs(filtered): + if filtered: + needs_desc = "" if not self.descriptor_needed else "descriptor and " + log.info("Reflector needs %s%i blobs for %s", + needs_desc, + len(filtered), + str(self.stream_descriptor)[:16]) + return filtered - def set_sd_blobs(sd_blob_hashes): - for sd_blob_hash in sd_blob_hashes: - self.blob_hashes_to_send.append(sd_blob_hash) - log.debug("Preparing to send %i sd blobs", len(sd_blob_hashes)) - - d.addCallback(set_sd_blobs) + d = self.factory.stream_info_manager.get_blobs_for_stream(self.factory.stream_hash) + d.addCallback(self.get_validated_blobs) + if not self.descriptor_needed: + d.addCallback(lambda filtered: + [blob for blob in filtered if blob.blob_hash in self.needed_blobs]) + d.addCallback(_show_missing_blobs) + d.addCallback(self.set_blobs_to_send) return d + def send_request(self, request_dict): + self.write(json.dumps(request_dict)) + def send_handshake(self): - log.debug('Sending handshake') - self.write(json.dumps({'version': 0})) + self.send_request({'version': self.protocol_version}) + + def load_descriptor(self): + def _save_descriptor_blob(sd_blob): + self.stream_descriptor = sd_blob + + d = self.factory.stream_info_manager.get_sd_blob_hashes_for_stream(self.factory.stream_hash) + d.addCallback(lambda sd: self.factory.blob_manager.get_blob(sd[0], True)) + d.addCallback(_save_descriptor_blob) + return d def parse_response(self, buff): try: @@ -154,8 +136,10 @@ class EncryptedFileReflectorClient(Protocol): log.warning("An error occurred handling the response: %s", err.getTraceback()) def handle_response(self, response_dict): - if self.received_handshake_response is False: + if not self.received_server_version: return self.handle_handshake_response(response_dict) + elif not self.received_descriptor_response and self.server_version == REFLECTOR_V2: + return self.handle_descriptor_response(response_dict) else: return self.handle_normal_response(response_dict) @@ -168,7 +152,6 @@ class EncryptedFileReflectorClient(Protocol): return defer.succeed(None) def start_transfer(self): - self.write(json.dumps({})) assert self.read_handle is not None, \ "self.read_handle was None when trying to start the transfer" d = self.file_sender.beginFileTransfer(self.read_handle, self) @@ -177,12 +160,37 @@ class EncryptedFileReflectorClient(Protocol): def handle_handshake_response(self, response_dict): if 'version' not in response_dict: raise ValueError("Need protocol version number!") - self.protocol_version = int(response_dict['version']) - if self.protocol_version != 0: - raise ValueError("I can't handle protocol version {}!".format(self.protocol_version)) - self.received_handshake_response = True + self.server_version = int(response_dict['version']) + if self.server_version not in [REFLECTOR_V1, REFLECTOR_V2]: + raise ValueError("I can't handle protocol version {}!".format(self.server_version)) + self.received_server_version = True return defer.succeed(True) + def handle_descriptor_response(self, response_dict): + if self.file_sender is None: # Expecting Server Info Response + if 'send_sd_blob' not in response_dict: + raise ReflectorRequestError("I don't know whether to send the sd blob or not!") + if response_dict['send_sd_blob'] is True: + self.file_sender = FileSender() + else: + self.received_descriptor_response = True + self.descriptor_needed = response_dict['send_sd_blob'] + self.needed_blobs = response_dict.get('needed_blobs', []) + return self.get_blobs_to_send() + else: # Expecting Server Blob Response + if 'received_sd_blob' not in response_dict: + raise ValueError("I don't know if the sd blob made it to the intended destination!") + else: + self.received_descriptor_response = True + if response_dict['received_sd_blob']: + self.reflected_blobs.append(self.next_blob_to_send.blob_hash) + log.info("Sent reflector descriptor %s", self.next_blob_to_send.blob_hash[:16]) + else: + log.warning("Reflector failed to receive descriptor %s, trying again later", + self.next_blob_to_send.blob_hash[:16]) + self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash) + return self.set_not_uploading() + def handle_normal_response(self, response_dict): if self.file_sender is None: # Expecting Server Info Response if 'send_blob' not in response_dict: @@ -191,13 +199,19 @@ class EncryptedFileReflectorClient(Protocol): self.file_sender = FileSender() return defer.succeed(True) else: - log.debug("Reflector already has %s", str(self.next_blob_to_send.blob_hash)[:16]) + log.warning("Reflector already has %s", self.next_blob_to_send.blob_hash[:16]) return self.set_not_uploading() else: # Expecting Server Blob Response if 'received_blob' not in response_dict: raise ValueError("I don't know if the blob made it to the intended destination!") else: - log.info("Reflector received %s", str(self.next_blob_to_send.blob_hash)[:16]) + if response_dict['received_blob']: + self.reflected_blobs.append(self.next_blob_to_send.blob_hash) + log.info("Sent reflector blob %s", self.next_blob_to_send.blob_hash[:16]) + else: + log.warning("Reflector failed to receive blob %s, trying again later", + self.next_blob_to_send.blob_hash[:16]) + self.blob_hashes_to_send.append(self.next_blob_to_send.blob_hash) return self.set_not_uploading() def open_blob_for_reading(self, blob): @@ -207,50 +221,59 @@ class EncryptedFileReflectorClient(Protocol): log.debug('Getting ready to send %s', blob.blob_hash) self.next_blob_to_send = blob self.read_handle = read_handle - return None - else: - log.warning("Can't reflect blob %s", str(blob.blob_hash)[:16]) - return defer.fail(ValueError( + return defer.succeed(None) + return defer.fail(ValueError( "Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash))) def send_blob_info(self): assert self.next_blob_to_send is not None, "need to have a next blob to send at this point" - log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash) - self.write(json.dumps({ + r = { 'blob_hash': self.next_blob_to_send.blob_hash, 'blob_size': self.next_blob_to_send.length - })) + } + self.send_request(r) + + def send_descriptor_info(self): + assert self.stream_descriptor is not None, "need to have a sd blob to send at this point" + r = { + 'sd_blob_hash': self.stream_descriptor.blob_hash, + 'sd_blob_size': self.stream_descriptor.length + } + self.sent_stream_info = True + self.send_request(r) def skip_missing_blob(self, err, blob_hash): + log.warning("Can't reflect blob %s", str(blob_hash)[:16]) err.trap(ValueError) return self.send_next_request() def send_next_request(self): if self.file_sender is not None: # send the blob - log.debug('Sending %s to reflector', str(self.next_blob_to_send.blob_hash)[:16]) return self.start_transfer() + elif not self.sent_stream_info: + # open the sd blob to send + blob = self.stream_descriptor + d = self.open_blob_for_reading(blob) + d.addCallback(lambda _: self.send_descriptor_info()) + return d elif self.blob_hashes_to_send: # open the next blob to send - blob_hash = self.blob_hashes_to_send[0] - log.debug('No current blob, sending the next one: %s', blob_hash) + blob = self.blob_hashes_to_send[0] self.blob_hashes_to_send = self.blob_hashes_to_send[1:] - d = self.blob_manager.get_blob(blob_hash, True) - d.addCallback(self.open_blob_for_reading) - # send the server the next blob hash + length + d = self.open_blob_for_reading(blob) d.addCallbacks(lambda _: self.send_blob_info(), - lambda err: self.skip_missing_blob(err, blob_hash)) + lambda err: self.skip_missing_blob(err, blob.blob_hash)) return d - else: - # close connection - log.debug('No more blob hashes, closing connection') - self.transport.loseConnection() + # close connection + self.transport.loseConnection() class EncryptedFileReflectorClientFactory(ClientFactory): protocol = EncryptedFileReflectorClient def __init__(self, blob_manager, stream_info_manager, stream_hash): + self.protocol_version = REFLECTOR_V2 self.blob_manager = blob_manager self.stream_info_manager = stream_info_manager self.stream_hash = stream_hash @@ -268,11 +291,13 @@ class EncryptedFileReflectorClientFactory(ClientFactory): ClientFactory.startFactory(self) def startedConnecting(self, connector): - log.debug('Started connecting') + log.debug('Connecting to reflector') def clientConnectionLost(self, connector, reason): """If we get disconnected, reconnect to server.""" - log.debug("connection lost: %s", reason) def clientConnectionFailed(self, connector, reason): - log.debug("connection failed: %s", reason) + if reason.check(ConnectionRefusedError): + log.warning("Could not connect to reflector server") + else: + log.error("Reflector connection failed: %s", reason) diff --git a/lbrynet/reflector/common.py b/lbrynet/reflector/common.py index f505167bf..5e41f2ede 100644 --- a/lbrynet/reflector/common.py +++ b/lbrynet/reflector/common.py @@ -1,2 +1,27 @@ +REFLECTOR_V1 = 0 +REFLECTOR_V2 = 1 + + +class ReflectorClientVersionError(Exception): + """ + Raised by reflector server if client sends an incompatible or unknown version + """ + + +class ReflectorRequestError(Exception): + """ + Raised by reflector server if client sends a message without the required fields + """ + + +class ReflectorRequestDecodeError(Exception): + """ + Raised by reflector server if client sends an invalid json request + """ + + class IncompleteResponse(Exception): - pass + """ + Raised by reflector server when client sends a portion of a json request, + used buffering the incoming request + """ diff --git a/lbrynet/reflector/reupload.py b/lbrynet/reflector/reupload.py index 96a4b3d9f..9008e113f 100644 --- a/lbrynet/reflector/reupload.py +++ b/lbrynet/reflector/reupload.py @@ -1,5 +1,6 @@ import logging -from twisted.internet import reactor, defer +from twisted.internet import reactor +from twisted.internet.error import ConnectionLost, ConnectionDone from lbrynet.reflector import BlobClientFactory, ClientFactory log = logging.getLogger(__name__) @@ -27,19 +28,13 @@ def _reflect_stream(lbry_file, reflector_server): ) d = reactor.resolve(reflector_address) d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) - d.addCallback(lambda _: log.info("Connected to %s", reflector_address)) d.addCallback(lambda _: factory.finished_deferred) + d.addCallback(lambda reflected_blobs: log.info("Reflected %i blobs for lbry://%s", + len(reflected_blobs), + lbry_file.uri)) return d -def _reflect_if_unavailable(reflector_has_stream, lbry_file, reflector_server): - if reflector_has_stream: - log.info("lbry://%s is available", lbry_file.uri) - return defer.succeed(False) - log.info("lbry://%s is unavailable, reflecting it", lbry_file.uri) - return _reflect_stream(lbry_file, reflector_server) - - def _catch_error(err, uri): msg = "An error occurred while checking availability for lbry://%s: %s" log.error(msg, uri, err.getTraceback()) @@ -47,5 +42,6 @@ def _catch_error(err, uri): def check_and_restore_availability(lbry_file, reflector_server): d = _reflect_stream(lbry_file, reflector_server) + d.addErrback(lambda err: err.trap(ConnectionDone, ConnectionLost)) d.addErrback(_catch_error, lbry_file.uri) return d diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py index 03f278867..5340ba248 100644 --- a/lbrynet/reflector/server/server.py +++ b/lbrynet/reflector/server/server.py @@ -1,21 +1,36 @@ import logging +import json from twisted.python import failure from twisted.internet import error, defer from twisted.internet.protocol import Protocol, ServerFactory -import json from lbrynet.core.utils import is_valid_blobhash +from lbrynet.core.Error import DownloadCanceledError, InvalidBlobHashError +from lbrynet.reflector.common import REFLECTOR_V1, REFLECTOR_V2 +from lbrynet.reflector.common import ReflectorRequestError, ReflectorClientVersionError log = logging.getLogger(__name__) +MAXIMUM_QUERY_SIZE = 200 +SEND_SD_BLOB = 'send_sd_blob' +SEND_BLOB = 'send_blob' +RECEIVED_SD_BLOB = 'received_sd_blob' +RECEIVED_BLOB = 'received_blob' +NEEDED_BLOBS = 'needed_blobs' +VERSION = 'version' +BLOB_SIZE = 'blob_size' +BLOB_HASH = 'blob_hash' +SD_BLOB_SIZE = 'sd_blob_size' +SD_BLOB_HASH = 'sd_blob_hash' + class ReflectorServer(Protocol): - def connectionMade(self): peer_info = self.transport.getPeer() log.debug('Connection made to %s', peer_info) self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port) self.blob_manager = self.factory.blob_manager + self.protocol_version = self.factory.protocol_version self.received_handshake = False self.peer_version = None self.receiving_blob = False @@ -28,6 +43,60 @@ class ReflectorServer(Protocol): def connectionLost(self, reason=failure.Failure(error.ConnectionDone())): log.info("Reflector upload from %s finished" % self.peer.host) + def handle_error(self, err): + log.error(err.getTraceback()) + self.transport.loseConnection() + + def send_response(self, response_dict): + self.transport.write(json.dumps(response_dict)) + + ############################ + # Incoming blob file stuff # + ############################ + + def clean_up_failed_upload(self, err, blob): + log.warning("Failed to receive %s", blob) + if err.check(DownloadCanceledError): + self.blob_manager.delete_blobs([blob.blob_hash]) + else: + log.exception(err) + + @defer.inlineCallbacks + def _on_completed_blob(self, blob, response_key): + yield self.blob_manager.blob_completed(blob) + yield self.close_blob() + log.info("Received %s", blob) + yield self.send_response({response_key: True}) + + @defer.inlineCallbacks + def _on_failed_blob(self, err, response_key): + yield self.clean_up_failed_upload(err, self.incoming_blob) + yield self.send_response({response_key: False}) + + def handle_incoming_blob(self, response_key): + """ + Open blob for writing and send a response indicating if the transfer was + successful when finished. + + response_key will either be received_blob or received_sd_blob + """ + + blob = self.incoming_blob + self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer) + self.blob_finished_d.addCallback(self._on_completed_blob, response_key) + self.blob_finished_d.addErrback(self._on_failed_blob, response_key) + + def close_blob(self): + self.blob_finished_d = None + self.blob_write = None + self.cancel_write = None + self.incoming_blob = None + self.receiving_blob = False + + #################### + # Request handling # + #################### + def dataReceived(self, data): if self.receiving_blob: self.blob_write(data) @@ -38,7 +107,7 @@ class ReflectorServer(Protocol): if msg is not None: self.request_buff = '' d = self.handle_request(msg) - d.addCallbacks(self.send_response, self.handle_error) + d.addErrback(self.handle_error) if self.receiving_blob and extra_data: log.debug('Writing extra data to blob') self.blob_write(extra_data) @@ -47,15 +116,15 @@ class ReflectorServer(Protocol): extra_data = None response = None curr_pos = 0 - while True: + while not self.receiving_blob: next_close_paren = response_msg.find('}', curr_pos) if next_close_paren != -1: curr_pos = next_close_paren + 1 try: response = json.loads(response_msg[:curr_pos]) except ValueError: - if curr_pos > 100: - raise Exception("error decoding response") + if curr_pos > MAXIMUM_QUERY_SIZE: + raise ValueError("Error decoding response: %s" % str(response_msg)) else: pass else: @@ -65,73 +134,185 @@ class ReflectorServer(Protocol): break return response, extra_data + def need_handshake(self): + return self.received_handshake is False + + def is_descriptor_request(self, request_dict): + if SD_BLOB_HASH not in request_dict or SD_BLOB_SIZE not in request_dict: + return False + if not is_valid_blobhash(request_dict[SD_BLOB_HASH]): + raise InvalidBlobHashError(request_dict[SD_BLOB_HASH]) + return True + + def is_blob_request(self, request_dict): + if BLOB_HASH not in request_dict or BLOB_SIZE not in request_dict: + return False + if not is_valid_blobhash(request_dict[BLOB_HASH]): + raise InvalidBlobHashError(request_dict[BLOB_HASH]) + return True + def handle_request(self, request_dict): - if self.received_handshake is False: + if self.need_handshake(): return self.handle_handshake(request_dict) - else: - return self.handle_normal_request(request_dict) + if self.is_descriptor_request(request_dict): + return self.handle_descriptor_request(request_dict) + if self.is_blob_request(request_dict): + return self.handle_blob_request(request_dict) + raise ReflectorRequestError("Invalid request") def handle_handshake(self, request_dict): - log.debug('Handling handshake') - if 'version' not in request_dict: - raise ValueError("Client should send version") - self.peer_version = int(request_dict['version']) - if self.peer_version != 0: - raise ValueError("I don't know that version!") + """ + Upon connecting, the client sends a version handshake: + { + 'version': int, + } + + The server replies with the same version if it is supported + { + 'version': int, + } + """ + + if VERSION not in request_dict: + raise ReflectorRequestError("Client should send version") + + if int(request_dict[VERSION]) not in [REFLECTOR_V1, REFLECTOR_V2]: + raise ReflectorClientVersionError("Unknown version: %i" % int(request_dict[VERSION])) + + log.debug('Handling handshake for client version %i', self.peer_version) + + self.peer_version = int(request_dict[VERSION]) self.received_handshake = True - return defer.succeed({'version': 0}) + return self.send_handshake_response() - def determine_blob_needed(self, blob): - if blob.is_validated(): - return {'send_blob': False} - else: - self.incoming_blob = blob - self.blob_finished_d, self.blob_write, self.cancel_write = blob.open_for_writing(self.peer) # pylint: disable=line-too-long - self.blob_finished_d.addCallback(lambda _: self.blob_manager.blob_completed(blob)) - return {'send_blob': True} + def send_handshake_response(self): + d = defer.succeed({VERSION: self.peer_version}) + d.addCallback(self.send_response) + return d - def close_blob(self): - self.blob_finished_d = None - self.blob_write = None - self.cancel_write = None - self.incoming_blob = None - self.receiving_blob = False + def handle_descriptor_request(self, request_dict): + """ + If the client is reflecting a whole stream, they send a stream descriptor request: + { + 'sd_blob_hash': str, + 'sd_blob_size': int + } + + The server indicates if it's aware of this stream already by requesting (or not requesting) + the stream descriptor blob. If the server has a validated copy of the sd blob, it will + include the needed_blobs field (a list of blob hashes missing from reflector) in the + response. If the server does not have the sd blob the needed_blobs field will not be + included, as the server does not know what blobs it is missing - so the client should send + all of the blobs in the stream. + { + 'send_sd_blob': bool + 'needed_blobs': list, conditional + } + + + The client may begin the file transfer of the sd blob if send_sd_blob was True. + If the client sends the blob, after receiving it the server indicates if the + transfer was successful: + { + 'received_sd_blob': bool + } + """ + + sd_blob_hash = request_dict[SD_BLOB_HASH] + sd_blob_size = request_dict[SD_BLOB_SIZE] - def handle_normal_request(self, request_dict): if self.blob_write is None: - # we haven't opened a blob yet, meaning we must be waiting for the - # next message containing a blob hash and a length. this message - # should be it. if it's one we want, open the blob for writing, and - # return a nice response dict (in a Deferred) saying go ahead - if not 'blob_hash' in request_dict or not 'blob_size' in request_dict: - raise ValueError("Expected a blob hash and a blob size") - if not is_valid_blobhash(request_dict['blob_hash']): - raise ValueError("Got a bad blob hash: {}".format(request_dict['blob_hash'])) - log.debug('Recieved info for blob: %s', request_dict['blob_hash']) - d = self.blob_manager.get_blob( - request_dict['blob_hash'], - True, - int(request_dict['blob_size']) - ) - d.addCallback(self.determine_blob_needed) + d = self.blob_manager.get_blob(sd_blob_hash, True, sd_blob_size) + d.addCallback(self.get_descriptor_response) + d.addCallback(self.send_response) + else: + self.receiving_blob = True + d = self.blob_finished_d + return d + + def get_descriptor_response(self, sd_blob): + if sd_blob.is_validated(): + d = defer.succeed({SEND_SD_BLOB: False}) + d.addCallback(self.request_needed_blobs, sd_blob) + else: + self.incoming_blob = sd_blob + self.receiving_blob = True + self.handle_incoming_blob(RECEIVED_SD_BLOB) + d = defer.succeed({SEND_SD_BLOB: True}) + return d + + def request_needed_blobs(self, response, sd_blob): + def _add_needed_blobs_to_response(needed_blobs): + response.update({NEEDED_BLOBS: needed_blobs}) + return response + + d = self.determine_missing_blobs(sd_blob) + d.addCallback(_add_needed_blobs_to_response) + return d + + def determine_missing_blobs(self, sd_blob): + with sd_blob.open_for_reading() as sd_file: + sd_blob_data = sd_file.read() + decoded_sd_blob = json.loads(sd_blob_data) + return self.get_unvalidated_blobs_in_stream(decoded_sd_blob) + + def get_unvalidated_blobs_in_stream(self, sd_blob): + dl = defer.DeferredList(list(self._iter_unvalidated_blobs_in_stream(sd_blob)), + consumeErrors=True) + dl.addCallback(lambda needed: [blob[1] for blob in needed if blob[1]]) + return dl + + def _iter_unvalidated_blobs_in_stream(self, sd_blob): + for blob in sd_blob['blobs']: + if 'blob_hash' in blob and 'length' in blob: + blob_hash, blob_len = blob['blob_hash'], blob['length'] + d = self.blob_manager.get_blob(blob_hash, True, blob_len) + d.addCallback(lambda blob: blob_hash if not blob.is_validated() else None) + yield d + + def handle_blob_request(self, request_dict): + """ + A client queries if the server will accept a blob + { + 'blob_hash': str, + 'blob_size': int + } + + The server replies, send_blob will be False if the server has a validated copy of the blob: + { + 'send_blob': bool + } + + The client may begin the raw blob file transfer if the server replied True. + If the client sends the blob, the server replies: + { + 'received_blob': bool + } + """ + + blob_hash = request_dict[BLOB_HASH] + blob_size = request_dict[BLOB_SIZE] + + if self.blob_write is None: + log.debug('Received info for blob: %s', blob_hash[:16]) + d = self.blob_manager.get_blob(blob_hash, True, blob_size) + d.addCallback(self.get_blob_response) + d.addCallback(self.send_response) else: - # we have a blob open already, so this message should have nothing - # important in it. to the deferred that fires when the blob is done, - # add a callback which returns a nice response dict saying to keep - # sending, and then return that deferred log.debug('blob is already open') self.receiving_blob = True d = self.blob_finished_d - d.addCallback(lambda _: self.close_blob()) - d.addCallback(lambda _: {'received_blob': True}) return d - def send_response(self, response_dict): - self.transport.write(json.dumps(response_dict)) - - def handle_error(self, err): - log.error(err.getTraceback()) - self.transport.loseConnection() + def get_blob_response(self, blob): + if blob.is_validated(): + return defer.succeed({SEND_BLOB: False}) + else: + self.incoming_blob = blob + self.receiving_blob = True + self.handle_incoming_blob(RECEIVED_BLOB) + d = defer.succeed({SEND_BLOB: True}) + return d class ReflectorServerFactory(ServerFactory): @@ -140,6 +321,7 @@ class ReflectorServerFactory(ServerFactory): def __init__(self, peer_manager, blob_manager): self.peer_manager = peer_manager self.blob_manager = blob_manager + self.protocol_version = REFLECTOR_V2 def buildProtocol(self, addr): log.debug('Creating a protocol for %s', addr) diff --git a/tests/functional/test_reflector.py b/tests/functional/test_reflector.py index 52032d12d..9eb21e915 100644 --- a/tests/functional/test_reflector.py +++ b/tests/functional/test_reflector.py @@ -29,32 +29,8 @@ class TestReflector(unittest.TestCase): self.lbry_file_manager = None self.server_blob_manager = None self.reflector_port = None + self.port = None self.addCleanup(self.take_down_env) - - def take_down_env(self): - d = defer.succeed(True) - if self.lbry_file_manager is not None: - d.addCallback(lambda _: self.lbry_file_manager.stop()) - if self.session is not None: - d.addCallback(lambda _: self.session.shut_down()) - if self.stream_info_manager is not None: - d.addCallback(lambda _: self.stream_info_manager.stop()) - if self.server_blob_manager is not None: - d.addCallback(lambda _: self.server_blob_manager.stop()) - if self.reflector_port is not None: - d.addCallback(lambda _: self.reflector_port.stopListening()) - - def delete_test_env(): - try: - shutil.rmtree('client') - except: - raise unittest.SkipTest("TODO: fix this for windows") - - d.addCallback(lambda _: threads.deferToThread(delete_test_env)) - d.addErrback(lambda err: str(err)) - return d - - def test_reflector(self): wallet = mocks.Wallet() peer_manager = PeerManager.PeerManager() peer_finder = mocks.PeerFinder(5553, peer_manager, 2) @@ -98,7 +74,7 @@ class TestReflector(unittest.TestCase): dht_node_class=Node ) - self.stream_info_manager = EncryptedFileMetadataManager.TempEncryptedFileMetadataManager() + self.stream_info_manager = EncryptedFileMetadataManager.DBEncryptedFileMetadataManager(db_dir) self.lbry_file_manager = EncryptedFileManager.EncryptedFileManager( self.session, self.stream_info_manager, sd_identifier) @@ -118,6 +94,7 @@ class TestReflector(unittest.TestCase): self.expected_blobs.append((sd_hash, 923)) def verify_stream_descriptor_file(stream_hash): + self.stream_hash = stream_hash d = lbryfile.get_sd_info(self.lbry_file_manager.stream_info_manager, stream_hash, True) d.addCallback(verify_equal) d.addCallback( @@ -127,7 +104,6 @@ class TestReflector(unittest.TestCase): ) ) d.addCallback(save_sd_blob_hash) - d.addCallback(lambda _: stream_hash) return d def create_stream(): @@ -149,45 +125,129 @@ class TestReflector(unittest.TestCase): while self.reflector_port is None: try: self.reflector_port = reactor.listenTCP(port, server_factory) + self.port = port except error.CannotListenError: port += 1 - return defer.succeed(port) - def send_to_server(port, stream_hash): - factory = reflector.ClientFactory( - self.session.blob_manager, - self.stream_info_manager, - stream_hash - ) + d.addCallback(lambda _: create_stream()) + d.addCallback(verify_stream_descriptor_file) + d.addCallback(lambda _: start_server()) + return d - from twisted.internet import reactor - reactor.connectTCP('localhost', port, factory) - return factory.finished_deferred + def take_down_env(self): + d = defer.succeed(True) + if self.lbry_file_manager is not None: + d.addCallback(lambda _: self.lbry_file_manager.stop()) + if self.session is not None: + d.addCallback(lambda _: self.session.shut_down()) + if self.stream_info_manager is not None: + d.addCallback(lambda _: self.stream_info_manager.stop()) + if self.server_blob_manager is not None: + d.addCallback(lambda _: self.server_blob_manager.stop()) + if self.reflector_port is not None: + d.addCallback(lambda _: self.reflector_port.stopListening()) - def verify_blob_completed(blob, blob_size): - self.assertTrue(blob.is_validated()) - self.assertEqual(blob_size, blob.length) + def delete_test_env(): + try: + shutil.rmtree('client') + except: + raise unittest.SkipTest("TODO: fix this for windows") - def verify_have_blob(blob_hash, blob_size): - d = self.server_blob_manager.get_blob(blob_hash, True) - d.addCallback(lambda blob: verify_blob_completed(blob, blob_size)) - return d + d.addCallback(lambda _: threads.deferToThread(delete_test_env)) + d.addErrback(lambda err: str(err)) + return d + def test_stream_reflector(self): def verify_data_on_reflector(): check_blob_ds = [] for blob_hash, blob_size in self.expected_blobs: check_blob_ds.append(verify_have_blob(blob_hash, blob_size)) return defer.DeferredList(check_blob_ds) - def upload_to_reflector(stream_hash): - d = start_server() - d.addCallback(lambda port: send_to_server(port, stream_hash)) - d.addCallback(lambda _: verify_data_on_reflector()) + def verify_have_blob(blob_hash, blob_size): + d = self.server_blob_manager.get_blob(blob_hash, True) + d.addCallback(lambda blob: verify_blob_completed(blob, blob_size)) return d - d.addCallback(lambda _: create_stream()) - d.addCallback(verify_stream_descriptor_file) - d.addCallback(upload_to_reflector) + def send_to_server(): + factory = reflector.ClientFactory( + self.session.blob_manager, + self.stream_info_manager, + self.stream_hash + ) + + from twisted.internet import reactor + reactor.connectTCP('localhost', self.port, factory) + return factory.finished_deferred + + def verify_blob_completed(blob, blob_size): + self.assertTrue(blob.is_validated()) + self.assertEqual(blob_size, blob.length) + return + + d = send_to_server() + d.addCallback(lambda _: verify_data_on_reflector()) + return d + + def test_blob_reflector(self): + def verify_data_on_reflector(): + check_blob_ds = [] + for blob_hash, blob_size in self.expected_blobs: + check_blob_ds.append(verify_have_blob(blob_hash, blob_size)) + return defer.DeferredList(check_blob_ds) + + def verify_have_blob(blob_hash, blob_size): + d = self.server_blob_manager.get_blob(blob_hash, True) + d.addCallback(lambda blob: verify_blob_completed(blob, blob_size)) + return d + + def send_to_server(blob_hashes_to_send): + factory = reflector.BlobClientFactory( + self.session.blob_manager, + blob_hashes_to_send + ) + + from twisted.internet import reactor + reactor.connectTCP('localhost', self.port, factory) + return factory.finished_deferred + + def verify_blob_completed(blob, blob_size): + self.assertTrue(blob.is_validated()) + self.assertEqual(blob_size, blob.length) + + d = send_to_server([x[0] for x in self.expected_blobs]) + d.addCallback(lambda _: verify_data_on_reflector()) + return d + + def test_blob_reflector_v1(self): + def verify_data_on_reflector(): + check_blob_ds = [] + for blob_hash, blob_size in self.expected_blobs: + check_blob_ds.append(verify_have_blob(blob_hash, blob_size)) + return defer.DeferredList(check_blob_ds) + + def verify_have_blob(blob_hash, blob_size): + d = self.server_blob_manager.get_blob(blob_hash, True) + d.addCallback(lambda blob: verify_blob_completed(blob, blob_size)) + return d + + def send_to_server(blob_hashes_to_send): + factory = reflector.BlobClientFactory( + self.session.blob_manager, + blob_hashes_to_send + ) + factory.protocol_version = 0 + + from twisted.internet import reactor + reactor.connectTCP('localhost', self.port, factory) + return factory.finished_deferred + + def verify_blob_completed(blob, blob_size): + self.assertTrue(blob.is_validated()) + self.assertEqual(blob_size, blob.length) + + d = send_to_server([x[0] for x in self.expected_blobs]) + d.addCallback(lambda _: verify_data_on_reflector()) return d