diff --git a/lbrynet/extras/reflector/__init__.py b/lbrynet/extras/reflector/__init__.py deleted file mode 100644 index 56f9bfdf6..000000000 --- a/lbrynet/extras/reflector/__init__.py +++ /dev/null @@ -1,65 +0,0 @@ -__doc__ = """ -Reflector is a protocol to re-host lbry blobs and streams -Client queries and server responses follow, all dicts are encoded as json - -############# Handshake request and response ############# -Upon connecting, the client sends a version handshake: -{ - 'version': int, -} - -The server replies with the same version -{ - 'version': int, -} - -############# Stream descriptor requests and responses ############# -(if sending blobs directly this is skipped) -If the client is reflecting a whole stream, they send a stream descriptor request: -{ - 'sd_blob_hash': str, - 'sd_blob_size': int -} - -The server indicates if it's aware of this stream already by requesting (or not requesting) -the stream descriptor blob. If the server has a validated copy of the sd blob, it will -include the needed_blobs field (a list of blob hashes missing from reflector) in the response. -If the server does not have the sd blob the needed_blobs field will not be included, as the -server does not know what blobs it is missing - so the client should send all of the blobs -in the stream. -{ - 'send_sd_blob': bool - 'needed_blobs': list, conditional -} - -The client may begin the file transfer of the sd blob if send_sd_blob was True. -If the client sends the blob, after receiving it the server indicates if the -transfer was successful: -{ - 'received_sd_blob': bool -} -If the transfer was not successful (False), the blob is added to the needed_blobs queue - -############# Blob requests and responses ############# -A client with blobs to reflect (either populated by the client or by the stream descriptor -response) queries if the server is ready to begin transferring a blob -{ - 'blob_hash': str, - 'blob_size': int -} - -The server replies, send_blob will be False if the server has a validated copy of the blob: -{ - 'send_blob': bool -} - -The client may begin the raw blob file transfer if the server replied True. -If the client sends the blob, the server replies: -{ - 'received_blob': bool -} -If the transfer was not successful (False), the blob is re-added to the needed_blobs queue - -Blob requests continue for each of the blobs the client has queued to send, when completed -the client disconnects. -""" diff --git a/lbrynet/extras/reflector/client/__init__.py b/lbrynet/extras/reflector/client/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/lbrynet/extras/reflector/client/blob.py b/lbrynet/extras/reflector/client/blob.py deleted file mode 100644 index 5355e3896..000000000 --- a/lbrynet/extras/reflector/client/blob.py +++ /dev/null @@ -1,206 +0,0 @@ -import json -import logging - -from twisted.protocols.basic import FileSender -from twisted.internet.protocol import Protocol, ClientFactory -from twisted.internet import defer, error - -from lbrynet.extras.reflector.common import IncompleteResponse, REFLECTOR_V2 - - -log = logging.getLogger(__name__) - - -class BlobReflectorClient(Protocol): - # Protocol stuff - - def connectionMade(self): - self.blob_manager = self.factory.blob_manager - self.response_buff = b'' - self.outgoing_buff = '' - self.blob_hashes_to_send = self.factory.blobs - self.next_blob_to_send = None - self.blob_read_handle = None - self.received_handshake_response = False - self.protocol_version = self.factory.protocol_version - self.file_sender = None - self.producer = None - self.streaming = False - self.reflected_blobs = [] - d = self.send_handshake() - d.addErrback( - lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) - - def dataReceived(self, data): - log.debug('Received %s', data) - self.response_buff += data - try: - msg = self.parse_response(self.response_buff) - except IncompleteResponse: - pass - else: - self.response_buff = b'' - d = self.handle_response(msg) - d.addCallback(lambda _: self.send_next_request()) - d.addErrback(self.response_failure_handler) - - def connectionLost(self, reason): - if reason.check(error.ConnectionDone): - if self.reflected_blobs: - log.info('Finished sending data via reflector') - self.factory.finished_deferred.callback(self.reflected_blobs) - else: - log.info('Reflector finished: %s', reason) - self.factory.finished_deferred.callback(reason) - - # IConsumer stuff - - def registerProducer(self, producer, streaming): - self.producer = producer - self.streaming = streaming - if self.streaming is False: - from twisted.internet import reactor - reactor.callLater(0, self.producer.resumeProducing) - - def unregisterProducer(self): - self.producer = None - - def write(self, data): - self.transport.write(data) - if self.producer is not None and self.streaming is False: - from twisted.internet import reactor - reactor.callLater(0, self.producer.resumeProducing) - - def send_handshake(self): - log.debug('Sending handshake') - self.write(json.dumps({'version': self.protocol_version}).encode()) - return defer.succeed(None) - - def parse_response(self, buff): - try: - return json.loads(buff) - except ValueError: - raise IncompleteResponse() - - def response_failure_handler(self, err): - log.warning("An error occurred handling the response: %s", err.getTraceback()) - - def handle_response(self, response_dict): - if self.received_handshake_response is False: - return self.handle_handshake_response(response_dict) - else: - return self.handle_normal_response(response_dict) - - def set_not_uploading(self): - if self.next_blob_to_send is not None: - self.read_handle.close() - self.read_handle = None - self.next_blob_to_send = None - self.file_sender = None - return defer.succeed(None) - - def start_transfer(self): - assert self.read_handle is not None, \ - "self.read_handle was None when trying to start the transfer" - d = self.file_sender.beginFileTransfer(self.read_handle, self) - d.addCallback(lambda _: self.read_handle.close()) - return d - - def handle_handshake_response(self, response_dict): - if 'version' not in response_dict: - raise ValueError("Need protocol version number!") - server_version = int(response_dict['version']) - if self.protocol_version != server_version: - raise ValueError(f"I can't handle protocol version {self.protocol_version}!") - self.received_handshake_response = True - return defer.succeed(True) - - def handle_normal_response(self, response_dict): - if self.file_sender is None: # Expecting Server Info Response - if 'send_blob' not in response_dict: - raise ValueError("I don't know whether to send the blob or not!") - if response_dict['send_blob'] is True: - self.file_sender = FileSender() - return defer.succeed(True) - else: - return self.set_not_uploading() - else: # Expecting Server Blob Response - if 'received_blob' not in response_dict: - raise ValueError("I don't know if the blob made it to the intended destination!") - else: - if response_dict['received_blob']: - self.reflected_blobs.append(self.next_blob_to_send.blob_hash) - return self.set_not_uploading() - - def open_blob_for_reading(self, blob): - if blob.get_is_verified(): - read_handle = blob.open_for_reading() - if read_handle is not None: - log.debug('Getting ready to send %s', blob.blob_hash) - self.next_blob_to_send = blob - self.read_handle = read_handle - return None - raise ValueError( - f"Couldn't open that blob for some reason. blob_hash: {blob.blob_hash}") - - def send_blob_info(self): - log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash) - assert self.next_blob_to_send is not None, "need to have a next blob to send at this point" - log.debug('sending blob info') - self.write(json.dumps({ - 'blob_hash': self.next_blob_to_send.blob_hash, - 'blob_size': self.next_blob_to_send.length - }).encode()) - - def disconnect(self, err): - self.transport.loseConnection() - - def send_next_request(self): - if self.file_sender is not None: - # send the blob - log.debug('Sending the blob') - return self.start_transfer() - elif self.blob_hashes_to_send: - # open the next blob to send - blob_hash = self.blob_hashes_to_send[0] - log.debug('No current blob, sending the next one: %s', blob_hash) - self.blob_hashes_to_send = self.blob_hashes_to_send[1:] - blob = self.blob_manager.get_blob(blob_hash) - self.open_blob_for_reading(blob) - # send the server the next blob hash + length - return self.send_blob_info() - else: - # close connection - log.debug('No more blob hashes, closing connection') - self.transport.loseConnection() - - -class BlobReflectorClientFactory(ClientFactory): - protocol = BlobReflectorClient - - def __init__(self, blob_manager, blobs): - self.protocol_version = REFLECTOR_V2 - self.blob_manager = blob_manager - self.blobs = blobs - self.p = None - self.finished_deferred = defer.Deferred() - - def buildProtocol(self, addr): - p = self.protocol() - p.factory = self - self.p = p - return p - - def startFactory(self): - log.debug('Starting reflector factory') - ClientFactory.startFactory(self) - - def startedConnecting(self, connector): - log.debug('Started connecting') - - def clientConnectionLost(self, connector, reason): - """If we get disconnected, reconnect to server.""" - log.debug("connection lost: %s", reason.getErrorMessage()) - - def clientConnectionFailed(self, connector, reason): - log.debug("connection failed: %s", reason.getErrorMessage()) diff --git a/lbrynet/extras/reflector/client/client.py b/lbrynet/extras/reflector/client/client.py deleted file mode 100644 index 49be58b9a..000000000 --- a/lbrynet/extras/reflector/client/client.py +++ /dev/null @@ -1,346 +0,0 @@ -import json -import logging - -from twisted.internet.error import ConnectionRefusedError -from twisted.protocols.basic import FileSender -from twisted.internet.protocol import Protocol, ClientFactory -from twisted.internet import defer, error - -from lbrynet.extras.compat import f2d -from lbrynet.extras.reflector.common import IncompleteResponse, ReflectorRequestError -from lbrynet.extras.reflector.common import REFLECTOR_V1, REFLECTOR_V2 - -log = logging.getLogger(__name__) - - -class EncryptedFileReflectorClient(Protocol): - # Protocol stuff - def connectionMade(self): - log.debug("Connected to reflector") - self.response_buff = b'' - self.outgoing_buff = b'' - self.blob_hashes_to_send = [] - self.failed_blob_hashes = [] - self.next_blob_to_send = None - self.read_handle = None - self.sent_stream_info = False - self.received_descriptor_response = False - self.received_server_version = False - self.server_version = None - self.stream_descriptor = None - self.descriptor_needed = None - self.needed_blobs = [] - self.reflected_blobs = [] - self.file_sender = None - self.producer = None - self.streaming = False - - self.blob_manager = self.factory.blob_manager - self.protocol_version = self.factory.protocol_version - self.stream_hash = self.factory.stream_hash - self.sd_hash = self.factory.sd_hash - - d = self.load_descriptor() - d.addCallback(lambda _: self.send_handshake()) - d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) - - def dataReceived(self, data): - self.response_buff += data - try: - msg = self.parse_response(self.response_buff) - except IncompleteResponse: - pass - else: - self.response_buff = b'' - d = self.handle_response(msg) - d.addCallback(lambda _: self.send_next_request()) - d.addErrback(self.response_failure_handler) - - def store_result(self, result): - if not self.needed_blobs or len(self.reflected_blobs) == len(self.needed_blobs): - reflected = True - else: - reflected = False - - d = f2d(self.blob_manager.storage.update_reflected_stream( - self.sd_hash, self.transport.getPeer().host, reflected - )) - d.addCallback(lambda _: result) - return d - - def connectionLost(self, reason): - # make sure blob file readers get closed - self.set_not_uploading() - - if reason.check(error.ConnectionDone): - if not self.needed_blobs: - log.info("Reflector has all blobs for %s", self.stream_descriptor) - elif not self.reflected_blobs: - log.info("No more completed blobs for %s to reflect, %i are still needed", - self.stream_descriptor, len(self.needed_blobs)) - else: - log.info('Finished sending reflector %i blobs for %s', - len(self.reflected_blobs), self.stream_descriptor) - result = self.reflected_blobs - elif reason.check(error.ConnectionLost): - log.warning("Stopped reflecting %s after sending %i blobs", - self.stream_descriptor, len(self.reflected_blobs)) - result = self.reflected_blobs - else: - log.info('Reflector finished for %s: %s', self.stream_descriptor, - reason) - result = reason - self.factory.finished_deferred.addCallback(self.store_result) - self.factory.finished_deferred.callback(result) - - # IConsumer stuff - - def registerProducer(self, producer, streaming): - self.producer = producer - self.streaming = streaming - if self.streaming is False: - from twisted.internet import reactor - reactor.callLater(0, self.producer.resumeProducing) - - def unregisterProducer(self): - self.producer = None - - def write(self, data): - self.transport.write(data) - if self.producer is not None and self.streaming is False: - from twisted.internet import reactor - reactor.callLater(0, self.producer.resumeProducing) - - def get_validated_blobs(self, blobs_in_stream): - def get_blobs(blobs): - for crypt_blob in blobs: - if crypt_blob.blob_hash and crypt_blob.length: - yield self.blob_manager.get_blob(crypt_blob.blob_hash, crypt_blob.length) - return [blob for blob in get_blobs(blobs_in_stream) if blob.get_is_verified()] - - def set_blobs_to_send(self, blobs_to_send): - for blob in blobs_to_send: - if blob.blob_hash not in self.blob_hashes_to_send: - self.blob_hashes_to_send.append(blob.blob_hash) - - def get_blobs_to_send(self): - def _show_missing_blobs(filtered): - if filtered: - needs_desc = "" if not self.descriptor_needed else "descriptor and " - log.info("Reflector needs %s%i blobs for stream", - needs_desc, - len(filtered)) - return filtered - - d = f2d(self.factory.blob_manager.storage.get_blobs_for_stream(self.stream_hash)) - d.addCallback(self.get_validated_blobs) - if not self.descriptor_needed: - d.addCallback(lambda filtered: - [blob for blob in filtered if blob.blob_hash in self.needed_blobs]) - d.addCallback(_show_missing_blobs) - d.addCallback(self.set_blobs_to_send) - d.addCallback(lambda _: None if self.descriptor_needed else self.set_not_uploading()) - return d - - def send_request(self, request_dict): - self.write(json.dumps(request_dict).encode()) - - def send_handshake(self): - self.send_request({'version': self.protocol_version}) - - @defer.inlineCallbacks - def load_descriptor(self): - if self.sd_hash: - self.stream_descriptor = yield self.factory.blob_manager.get_blob(self.sd_hash) - else: - raise ValueError("no sd hash for stream %s" % self.stream_hash) - - def parse_response(self, buff): - try: - return json.loads(buff) - except ValueError: - raise IncompleteResponse() - - def response_failure_handler(self, err): - log.warning("An error occurred handling the response: %s", err.getTraceback()) - - def handle_response(self, response_dict): - if not self.received_server_version: - return self.handle_handshake_response(response_dict) - elif not self.received_descriptor_response and self.server_version == REFLECTOR_V2: - return self.handle_descriptor_response(response_dict) - else: - return self.handle_normal_response(response_dict) - - def set_not_uploading(self): - if self.next_blob_to_send is not None: - log.debug("Close %s", self.next_blob_to_send) - self.read_handle.close() - self.read_handle = None - self.next_blob_to_send = None - if self.file_sender is not None: - self.file_sender.stopProducing() - self.file_sender = None - return defer.succeed(None) - - def start_transfer(self): - assert self.read_handle is not None, \ - "self.read_handle was None when trying to start the transfer" - d = self.file_sender.beginFileTransfer(self.read_handle, self) - d.addCallback(lambda _: self.read_handle.close()) - return d - - def handle_handshake_response(self, response_dict): - if 'version' not in response_dict: - raise ValueError("Need protocol version number!") - self.server_version = int(response_dict['version']) - if self.server_version not in [REFLECTOR_V1, REFLECTOR_V2]: - raise ValueError(f"I can't handle protocol version {self.server_version}!") - self.received_server_version = True - return defer.succeed(True) - - def handle_descriptor_response(self, response_dict): - if self.file_sender is None: # Expecting Server Info Response - if 'send_sd_blob' not in response_dict: - raise ReflectorRequestError("I don't know whether to send the sd blob or not!") - if response_dict['send_sd_blob'] is True: - self.file_sender = FileSender() - else: - self.received_descriptor_response = True - self.descriptor_needed = response_dict['send_sd_blob'] - self.needed_blobs = response_dict.get('needed_blobs', []) - return self.get_blobs_to_send() - else: # Expecting Server Blob Response - if 'received_sd_blob' not in response_dict: - raise ValueError("I don't know if the sd blob made it to the intended destination!") - else: - self.received_descriptor_response = True - disconnect = False - if response_dict['received_sd_blob']: - self.reflected_blobs.append(self.next_blob_to_send.blob_hash) - log.info("Sent reflector descriptor %s", self.next_blob_to_send) - else: - log.warning("Reflector failed to receive descriptor %s", - self.next_blob_to_send) - disconnect = True - d = self.set_not_uploading() - if disconnect: - d.addCallback(lambda _: self.transport.loseConnection()) - return d - - def handle_normal_response(self, response_dict): - if self.file_sender is None: # Expecting Server Info Response - if 'send_blob' not in response_dict: - raise ValueError("I don't know whether to send the blob or not!") - if response_dict['send_blob'] is True: - self.file_sender = FileSender() - return defer.succeed(True) - else: - log.info("Reflector already has %s", self.next_blob_to_send) - return self.set_not_uploading() - else: # Expecting Server Blob Response - if 'received_blob' not in response_dict: - raise ValueError("I don't know if the blob made it to the intended destination!") - else: - if response_dict['received_blob']: - self.reflected_blobs.append(self.next_blob_to_send.blob_hash) - log.debug("Sent reflector blob %s", self.next_blob_to_send) - else: - log.warning("Reflector failed to receive blob %s", self.next_blob_to_send) - return self.set_not_uploading() - - def open_blob_for_reading(self, blob): - if blob.get_is_verified(): - read_handle = blob.open_for_reading() - if read_handle is not None: - log.debug('Getting ready to send %s', blob.blob_hash) - self.next_blob_to_send = blob - self.read_handle = read_handle - return defer.succeed(None) - return defer.fail(ValueError( - f"Couldn't open that blob for some reason. blob_hash: {blob.blob_hash}")) - - def send_blob_info(self): - assert self.next_blob_to_send is not None, "need to have a next blob to send at this point" - r = { - 'blob_hash': self.next_blob_to_send.blob_hash, - 'blob_size': self.next_blob_to_send.length - } - self.send_request(r) - - def send_descriptor_info(self): - assert self.stream_descriptor is not None, "need to have a sd blob to send at this point" - r = { - 'sd_blob_hash': self.stream_descriptor.blob_hash, - 'sd_blob_size': self.stream_descriptor.length - } - self.sent_stream_info = True - self.send_request(r) - - def skip_missing_blob(self, err, blob_hash): - err.trap(ValueError) - if blob_hash not in self.failed_blob_hashes: - log.warning("Failed to reflect blob %s, reason: %s", - str(blob_hash)[:16], err.getTraceback()) - self.blob_hashes_to_send.append(blob_hash) - self.failed_blob_hashes.append(blob_hash) - else: - log.warning("Failed second try reflecting blob %s, giving up, reason: %s", - str(blob_hash)[:16], err.getTraceback()) - - def send_next_request(self): - if self.file_sender is not None: - # send the blob - return self.start_transfer() - elif not self.sent_stream_info: - # open the sd blob to send - blob = self.stream_descriptor - d = self.open_blob_for_reading(blob) - d.addCallbacks(lambda _: self.send_descriptor_info(), - lambda err: self.skip_missing_blob(err, blob.blob_hash)) - return d - elif self.blob_hashes_to_send: - # open the next blob to send - blob_hash = self.blob_hashes_to_send[0] - self.blob_hashes_to_send = self.blob_hashes_to_send[1:] - d = defer.succeed(self.blob_manager.get_blob(blob_hash)) - d.addCallback(self.open_blob_for_reading) - d.addCallbacks(lambda _: self.send_blob_info(), - lambda err: self.skip_missing_blob(err, blob.blob_hash)) - return d - # close connection - self.transport.loseConnection() - - -class EncryptedFileReflectorClientFactory(ClientFactory): - protocol = EncryptedFileReflectorClient - protocol_version = REFLECTOR_V2 - - def __init__(self, blob_manager, stream_hash, sd_hash): - self.blob_manager = blob_manager - self.stream_hash = stream_hash - self.sd_hash = sd_hash - self.p = None - self.finished_deferred = defer.Deferred() - - def buildProtocol(self, addr): - p = self.protocol() - p.factory = self - self.p = p - return p - - def startFactory(self): - log.debug('Starting reflector factory') - ClientFactory.startFactory(self) - - def startedConnecting(self, connector): - log.debug('Connecting to reflector') - - def clientConnectionLost(self, connector, reason): - """If we get disconnected, reconnect to server.""" - - def clientConnectionFailed(self, connector, reason): - if reason.check(ConnectionRefusedError): - log.warning("Could not connect to reflector server") - else: - log.error("Reflector connection failed: %s", reason) diff --git a/lbrynet/extras/reflector/common.py b/lbrynet/extras/reflector/common.py deleted file mode 100644 index 5e41f2ede..000000000 --- a/lbrynet/extras/reflector/common.py +++ /dev/null @@ -1,27 +0,0 @@ -REFLECTOR_V1 = 0 -REFLECTOR_V2 = 1 - - -class ReflectorClientVersionError(Exception): - """ - Raised by reflector server if client sends an incompatible or unknown version - """ - - -class ReflectorRequestError(Exception): - """ - Raised by reflector server if client sends a message without the required fields - """ - - -class ReflectorRequestDecodeError(Exception): - """ - Raised by reflector server if client sends an invalid json request - """ - - -class IncompleteResponse(Exception): - """ - Raised by reflector server when client sends a portion of a json request, - used buffering the incoming request - """ diff --git a/lbrynet/extras/reflector/reupload.py b/lbrynet/extras/reflector/reupload.py deleted file mode 100644 index 249232747..000000000 --- a/lbrynet/extras/reflector/reupload.py +++ /dev/null @@ -1,63 +0,0 @@ -from twisted.internet import reactor, defer -from lbrynet.extras.reflector.client.client import EncryptedFileReflectorClientFactory -from lbrynet.extras.reflector.client.blob import BlobReflectorClientFactory - - -def _is_ip(host): - try: - if len(host.split(".")) == 4 and all([0 <= int(x) <= 255 for x in host.split(".")]): - return True - return False - except ValueError: - return False - - -@defer.inlineCallbacks -def resolve(host): - if _is_ip(host): - ip = host - else: - ip = yield reactor.resolve(host) - defer.returnValue(ip) - - -@defer.inlineCallbacks -def _reflect_stream(blob_manager, stream_hash, sd_hash, reflector_server): - reflector_address, reflector_port = reflector_server[0], reflector_server[1] - factory = EncryptedFileReflectorClientFactory(blob_manager, stream_hash, sd_hash) - ip = yield resolve(reflector_address) - yield reactor.connectTCP(ip, reflector_port, factory) - result = yield factory.finished_deferred - defer.returnValue(result) - - -def _reflect_file(lbry_file, reflector_server): - return _reflect_stream(lbry_file.blob_manager, lbry_file.stream_hash, lbry_file.sd_hash, reflector_server) - - -@defer.inlineCallbacks -def _reflect_blobs(blob_manager, blob_hashes, reflector_server): - reflector_address, reflector_port = reflector_server[0], reflector_server[1] - factory = BlobReflectorClientFactory(blob_manager, blob_hashes) - ip = yield resolve(reflector_address) - yield reactor.connectTCP(ip, reflector_port, factory) - result = yield factory.finished_deferred - return result - - -def reflect_file(lbry_file, reflector_server): - if len(reflector_server.split(":")) == 2: - host, port = tuple(reflector_server.split(":")) - reflector_server = host, int(port) - else: - reflector_server = reflector_server, 5566 - return _reflect_file(lbry_file, reflector_server) - - -def reflect_blob_hashes(blob_hashes, blob_manager, reflector_server): - if len(reflector_server.split(":")) == 2: - host, port = tuple(reflector_server.split(":")) - reflector_server = host, int(port) - else: - reflector_server = reflector_server, 5566 - return _reflect_blobs(blob_manager, blob_hashes, reflector_server) diff --git a/lbrynet/extras/reflector/server/__init__.py b/lbrynet/extras/reflector/server/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/lbrynet/extras/reflector/server/server.py b/lbrynet/extras/reflector/server/server.py deleted file mode 100644 index f51869e38..000000000 --- a/lbrynet/extras/reflector/server/server.py +++ /dev/null @@ -1,325 +0,0 @@ -import logging -import json -from twisted.python import failure -from twisted.internet import error, defer -from twisted.internet.protocol import Protocol, ServerFactory -from lbrynet.blob.blob_file import is_valid_blobhash -from lbrynet.p2p.Error import DownloadCanceledError, InvalidBlobHashError -from lbrynet.p2p.StreamDescriptor import BlobStreamDescriptorReader -from lbrynet.p2p.StreamDescriptor import save_sd_info -from lbrynet.extras.reflector.common import REFLECTOR_V1, REFLECTOR_V2 -from lbrynet.extras.reflector.common import ReflectorRequestError, ReflectorClientVersionError - - -log = logging.getLogger(__name__) - -MAXIMUM_QUERY_SIZE = 200 -SEND_SD_BLOB = 'send_sd_blob' -SEND_BLOB = 'send_blob' -RECEIVED_SD_BLOB = 'received_sd_blob' -RECEIVED_BLOB = 'received_blob' -NEEDED_BLOBS = 'needed_blobs' -VERSION = 'version' -BLOB_SIZE = 'blob_size' -BLOB_HASH = 'blob_hash' -SD_BLOB_SIZE = 'sd_blob_size' -SD_BLOB_HASH = 'sd_blob_hash' - - -class ReflectorServer(Protocol): - def connectionMade(self): - peer_info = self.transport.getPeer() - log.debug('Connection made to %s', peer_info) - self.peer = self.factory.peer_manager.get_peer(peer_info.host, peer_info.port) - self.blob_manager = self.factory.blob_manager - self.storage = self.factory.blob_manager.storage - self.lbry_file_manager = self.factory.lbry_file_manager - self.protocol_version = self.factory.protocol_version - self.received_handshake = False - self.peer_version = None - self.receiving_blob = False - self.incoming_blob = None - self.blob_finished_d = None - self.request_buff = b"" - - self.blob_writer = None - - def connectionLost(self, reason=failure.Failure(error.ConnectionDone())): - log.info("Reflector upload from %s finished" % self.peer.host) - - def handle_error(self, err): - log.error(err.getTraceback()) - self.transport.loseConnection() - - def send_response(self, response_dict): - self.transport.write(json.dumps(response_dict).encode()) - - ############################ - # Incoming blob file stuff # - ############################ - - def clean_up_failed_upload(self, err, blob): - log.warning("Failed to receive %s", blob) - if err.check(DownloadCanceledError): - self.blob_manager.delete_blobs([blob.blob_hash]) - else: - log.exception(err) - - @defer.inlineCallbacks - def _on_completed_blob(self, blob, response_key): - yield self.blob_manager.blob_completed(blob, should_announce=False) - - if response_key == RECEIVED_SD_BLOB: - sd_info = yield BlobStreamDescriptorReader(blob).get_info() - yield save_sd_info(self.blob_manager, blob.blob_hash, sd_info) - yield self.blob_manager.set_should_announce(blob.blob_hash, True) - else: - stream_hash = yield self.storage.get_stream_of_blob(blob.blob_hash) - if stream_hash is not None: - blob_num = yield self.storage.get_blob_num_by_hash(stream_hash, - blob.blob_hash) - if blob_num == 0: - yield self.blob_manager.set_should_announce(blob.blob_hash, True) - - yield self.close_blob() - log.info("Received %s", blob) - yield self.send_response({response_key: True}) - - @defer.inlineCallbacks - def _on_failed_blob(self, err, response_key): - yield self.clean_up_failed_upload(err, self.incoming_blob) - yield self.send_response({response_key: False}) - - def handle_incoming_blob(self, response_key): - """ - Open blob for writing and send a response indicating if the transfer was - successful when finished. - - response_key will either be received_blob or received_sd_blob - """ - - blob = self.incoming_blob - self.blob_writer, self.blob_finished_d = blob.open_for_writing(self.peer) - self.blob_finished_d.addCallback(self._on_completed_blob, response_key) - self.blob_finished_d.addErrback(self._on_failed_blob, response_key) - - def close_blob(self): - self.blob_writer.close() - self.blob_writer = None - self.blob_finished_d = None - self.incoming_blob = None - self.receiving_blob = False - - #################### - # Request handling # - #################### - - def dataReceived(self, data): - if self.receiving_blob: - self.blob_writer.write(data) - else: - log.debug('Not yet receiving blob, data needs further processing') - self.request_buff += data - msg, extra_data = self._get_valid_response(self.request_buff) - if msg is not None: - self.request_buff = b'' - d = self.handle_request(msg) - d.addErrback(self.handle_error) - if self.receiving_blob and extra_data: - log.debug('Writing extra data to blob') - self.blob_writer.write(extra_data) - - def _get_valid_response(self, response_msg): - extra_data = None - response = None - curr_pos = 0 - while not self.receiving_blob: - next_close_paren = response_msg.find(b'}', curr_pos) - if next_close_paren != -1: - curr_pos = next_close_paren + 1 - try: - response = json.loads(response_msg[:curr_pos]) - except ValueError: - if curr_pos > MAXIMUM_QUERY_SIZE: - raise ValueError("Error decoding response: %s" % str(response_msg)) - else: - pass - else: - extra_data = response_msg[curr_pos:] - break - else: - break - return response, extra_data - - def need_handshake(self): - return self.received_handshake is False - - def is_descriptor_request(self, request_dict): - if SD_BLOB_HASH not in request_dict or SD_BLOB_SIZE not in request_dict: - return False - if not is_valid_blobhash(request_dict[SD_BLOB_HASH]): - raise InvalidBlobHashError(request_dict[SD_BLOB_HASH]) - return True - - def is_blob_request(self, request_dict): - if BLOB_HASH not in request_dict or BLOB_SIZE not in request_dict: - return False - if not is_valid_blobhash(request_dict[BLOB_HASH]): - raise InvalidBlobHashError(request_dict[BLOB_HASH]) - return True - - def handle_request(self, request_dict): - if self.need_handshake(): - return self.handle_handshake(request_dict) - if self.is_descriptor_request(request_dict): - return self.handle_descriptor_request(request_dict) - if self.is_blob_request(request_dict): - return self.handle_blob_request(request_dict) - raise ReflectorRequestError("Invalid request") - - def handle_handshake(self, request_dict): - """ - Upon connecting, the client sends a version handshake: - { - 'version': int, - } - - The server replies with the same version if it is supported - { - 'version': int, - } - """ - - if VERSION not in request_dict: - raise ReflectorRequestError("Client should send version") - - if int(request_dict[VERSION]) not in [REFLECTOR_V1, REFLECTOR_V2]: - raise ReflectorClientVersionError("Unknown version: %i" % int(request_dict[VERSION])) - - self.peer_version = int(request_dict[VERSION]) - log.debug('Handling handshake for client version %i', self.peer_version) - self.received_handshake = True - return self.send_handshake_response() - - def send_handshake_response(self): - d = defer.succeed({VERSION: self.peer_version}) - d.addCallback(self.send_response) - return d - - def handle_descriptor_request(self, request_dict): - """ - If the client is reflecting a whole stream, they send a stream descriptor request: - { - 'sd_blob_hash': str, - 'sd_blob_size': int - } - - The server indicates if it's aware of this stream already by requesting (or not requesting) - the stream descriptor blob. If the server has a validated copy of the sd blob, it will - include the needed_blobs field (a list of blob hashes missing from reflector) in the - response. If the server does not have the sd blob the needed_blobs field will not be - included, as the server does not know what blobs it is missing - so the client should send - all of the blobs in the stream. - { - 'send_sd_blob': bool - 'needed_blobs': list, conditional - } - - - The client may begin the file transfer of the sd blob if send_sd_blob was True. - If the client sends the blob, after receiving it the server indicates if the - transfer was successful: - { - 'received_sd_blob': bool - } - """ - - sd_blob_hash = request_dict[SD_BLOB_HASH] - sd_blob_size = request_dict[SD_BLOB_SIZE] - - if self.blob_writer is None: - d = self.blob_manager.get_blob(sd_blob_hash, sd_blob_size) - d.addCallback(self.get_descriptor_response) - d.addCallback(self.send_response) - else: - self.receiving_blob = True - d = self.blob_finished_d - return d - - @defer.inlineCallbacks - def get_descriptor_response(self, sd_blob): - if sd_blob.get_is_verified(): - sd_info = yield BlobStreamDescriptorReader(sd_blob).get_info() - yield save_sd_info(self.blob_manager, sd_blob.blob_hash, sd_info) - yield self.storage.verify_will_announce_head_and_sd_blobs(sd_info['stream_hash']) - response = yield self.request_needed_blobs({SEND_SD_BLOB: False}, sd_info['stream_hash']) - else: - self.incoming_blob = sd_blob - self.receiving_blob = True - self.handle_incoming_blob(RECEIVED_SD_BLOB) - response = {SEND_SD_BLOB: True} - defer.returnValue(response) - - @defer.inlineCallbacks - def request_needed_blobs(self, response, stream_hash): - needed_blobs = yield self.storage.get_pending_blobs_for_stream(stream_hash) - response.update({NEEDED_BLOBS: needed_blobs}) - defer.returnValue(response) - - def handle_blob_request(self, request_dict): - """ - A client queries if the server will accept a blob - { - 'blob_hash': str, - 'blob_size': int - } - - The server replies, send_blob will be False if the server has a validated copy of the blob: - { - 'send_blob': bool - } - - The client may begin the raw blob file transfer if the server replied True. - If the client sends the blob, the server replies: - { - 'received_blob': bool - } - """ - - blob_hash = request_dict[BLOB_HASH] - blob_size = request_dict[BLOB_SIZE] - - if self.blob_writer is None: - log.debug('Received info for blob: %s', blob_hash[:16]) - d = self.blob_manager.get_blob(blob_hash, blob_size) - d.addCallback(self.get_blob_response) - d.addCallback(self.send_response) - else: - log.debug('blob is already open') - self.receiving_blob = True - d = self.blob_finished_d - return d - - def get_blob_response(self, blob): - if blob.get_is_verified(): - return defer.succeed({SEND_BLOB: False}) - else: - self.incoming_blob = blob - self.receiving_blob = True - self.handle_incoming_blob(RECEIVED_BLOB) - d = defer.succeed({SEND_BLOB: True}) - return d - - -class ReflectorServerFactory(ServerFactory): - protocol = ReflectorServer - - def __init__(self, peer_manager, blob_manager, lbry_file_manager): - self.peer_manager = peer_manager - self.blob_manager = blob_manager - self.lbry_file_manager = lbry_file_manager - self.protocol_version = REFLECTOR_V2 - - def buildProtocol(self, addr): - log.debug('Creating a protocol for %s', addr) - return ServerFactory.buildProtocol(self, addr)