From b235f6fc6b49db2208c7d4115750e03202b6500b Mon Sep 17 00:00:00 2001 From: Jack Date: Fri, 26 Aug 2016 19:58:53 -0400 Subject: [PATCH] send all blobs to reflector --- lbrynet/lbrynet_daemon/LBRYDaemon.py | 37 ++++- lbrynet/lbrynet_daemon/LBRYPublisher.py | 2 +- lbrynet/reflector/__init__.py | 2 +- lbrynet/reflector/client/client.py | 184 ++++++++++++++++++++++++ lbrynet/reflector/server/server.py | 1 - 5 files changed, 221 insertions(+), 5 deletions(-) diff --git a/lbrynet/lbrynet_daemon/LBRYDaemon.py b/lbrynet/lbrynet_daemon/LBRYDaemon.py index a2db34560..56e62bafc 100644 --- a/lbrynet/lbrynet_daemon/LBRYDaemon.py +++ b/lbrynet/lbrynet_daemon/LBRYDaemon.py @@ -1359,7 +1359,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): reflector_server = random.choice(REFLECTOR_SERVERS) reflector_address, reflector_port = reflector_server[0], reflector_server[1] log.info("Start reflector client") - factory = reflector.ClientFactory( + factory = reflector.LBRYFileReflectorClientFactory( self.session.blob_manager, self.lbry_file_manager.stream_info_manager, stream_hash @@ -1369,6 +1369,24 @@ class LBRYDaemon(jsonrpc.JSONRPC): d.addCallback(lambda _: factory.finished_deferred) return d + def _reflect_blobs(self, blob_hashes): + if not blob_hashes: + return defer.fail(Exception("no lbry file given to reflect")) + + log.info("Reflecting %i blobs" % len(blob_hashes)) + + reflector_server = random.choice(REFLECTOR_SERVERS) + reflector_address, reflector_port = reflector_server[0], reflector_server[1] + log.info("Start reflector client") + factory = reflector.LBRYBlobReflectorClient( + self.session.blob_manager, + blob_hashes + ) + d = reactor.resolve(reflector_address) + d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) + d.addCallback(lambda _: factory.finished_deferred) + return d + def _log_to_slack(self, msg): URL = "https://hooks.slack.com/services/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" msg = platform.platform() + ": " + base58.b58encode(self.lbryid)[:20] + ", " + msg @@ -2444,7 +2462,7 @@ class LBRYDaemon(jsonrpc.JSONRPC): Reflect a stream Args: - sd_hash + sd_hash: sd_hash of lbry file Returns: True or traceback """ @@ -2469,6 +2487,21 @@ class LBRYDaemon(jsonrpc.JSONRPC): d.addCallback(lambda r: self._render_response(r, OK_CODE)) return d + def jsonrpc_reflect_all_blobs(self): + """ + Reflects all saved blobs + + Args: + None + Returns: + True + """ + + d = self.session.blob_manager.get_all_verified_blobs() + d.addCallback(self._reflect_blobs) + d.addCallback(lambda r: self._render_response(r, OK_CODE)) + return d + def get_lbrynet_version_from_github(): """Return the latest released version from github.""" diff --git a/lbrynet/lbrynet_daemon/LBRYPublisher.py b/lbrynet/lbrynet_daemon/LBRYPublisher.py index ec7b908dc..b69df8176 100644 --- a/lbrynet/lbrynet_daemon/LBRYPublisher.py +++ b/lbrynet/lbrynet_daemon/LBRYPublisher.py @@ -78,7 +78,7 @@ class Publisher(object): reflector_server = random.choice(REFLECTOR_SERVERS) reflector_address, reflector_port = reflector_server[0], reflector_server[1] log.info("Reflecting new publication") - factory = reflector.ClientFactory( + factory = reflector.LBRYFileReflectorClientFactory( self.session.blob_manager, self.lbry_file_manager.stream_info_manager, self.stream_hash diff --git a/lbrynet/reflector/__init__.py b/lbrynet/reflector/__init__.py index 10e8292e7..b2bfb4bdc 100644 --- a/lbrynet/reflector/__init__.py +++ b/lbrynet/reflector/__init__.py @@ -1,2 +1,2 @@ from lbrynet.reflector.server.server import ReflectorServerFactory as ServerFactory -from lbrynet.reflector.client.client import LBRYFileReflectorClientFactory as ClientFactory +from lbrynet.reflector.client.client import LBRYFileReflectorClientFactory, LBRYBlobReflectorClient diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 0fe2c8ce1..16114b0a4 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -267,3 +267,187 @@ class LBRYFileReflectorClientFactory(ClientFactory): def clientConnectionFailed(self, connector, reason): log.debug("connection failed: %s", reason) + + +class LBRYBlobReflectorClient(Protocol): + # Protocol stuff + + def connectionMade(self): + self.blob_manager = self.factory.blob_manager + self.response_buff = '' + self.outgoing_buff = '' + self.blob_hashes_to_send = self.factory.blobs + self.next_blob_to_send = None + self.blob_read_handle = None + self.received_handshake_response = False + self.protocol_version = None + self.file_sender = None + self.producer = None + self.streaming = False + d = self.send_handshake() + d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) + + def dataReceived(self, data): + log.debug('Recieved %s', data) + self.response_buff += data + try: + msg = self.parse_response(self.response_buff) + except IncompleteResponseError: + pass + else: + self.response_buff = '' + d = self.handle_response(msg) + d.addCallback(lambda _: self.send_next_request()) + d.addErrback(self.response_failure_handler) + + def connectionLost(self, reason): + if reason.check(error.ConnectionDone): + log.debug('Finished sending data via reflector') + self.factory.finished_deferred.callback(True) + else: + log.debug('reflector finished: %s', reason) + self.factory.finished_deferred.callback(reason) + + # IConsumer stuff + + def registerProducer(self, producer, streaming): + self.producer = producer + self.streaming = streaming + if self.streaming is False: + from twisted.internet import reactor + reactor.callLater(0, self.producer.resumeProducing) + + def unregisterProducer(self): + self.producer = None + + def write(self, data): + self.transport.write(data) + if self.producer is not None and self.streaming is False: + from twisted.internet import reactor + reactor.callLater(0, self.producer.resumeProducing) + + def send_handshake(self): + log.debug('Sending handshake') + self.write(json.dumps({'version': 0})) + + def parse_response(self, buff): + try: + return json.loads(buff) + except ValueError: + raise IncompleteResponseError() + + def response_failure_handler(self, err): + log.warning("An error occurred handling the response: %s", err.getTraceback()) + + def handle_response(self, response_dict): + if self.received_handshake_response is False: + return self.handle_handshake_response(response_dict) + else: + return self.handle_normal_response(response_dict) + + def set_not_uploading(self): + if self.next_blob_to_send is not None: + self.next_blob_to_send.close_read_handle(self.read_handle) + self.read_handle = None + self.next_blob_to_send = None + self.file_sender = None + return defer.succeed(None) + + def start_transfer(self): + self.write(json.dumps({})) + assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer" + d = self.file_sender.beginFileTransfer(self.read_handle, self) + return d + + def handle_handshake_response(self, response_dict): + if 'version' not in response_dict: + raise ValueError("Need protocol version number!") + self.protocol_version = int(response_dict['version']) + if self.protocol_version != 0: + raise ValueError("I can't handle protocol version {}!".format(self.protocol_version)) + self.received_handshake_response = True + return defer.succeed(True) + + def handle_normal_response(self, response_dict): + if self.file_sender is None: # Expecting Server Info Response + if 'send_blob' not in response_dict: + raise ValueError("I don't know whether to send the blob or not!") + if response_dict['send_blob'] is True: + self.file_sender = FileSender() + return defer.succeed(True) + else: + return self.set_not_uploading() + else: # Expecting Server Blob Response + if 'received_blob' not in response_dict: + raise ValueError("I don't know if the blob made it to the intended destination!") + else: + return self.set_not_uploading() + + def open_blob_for_reading(self, blob): + if blob.is_validated(): + read_handle = blob.open_for_reading() + if read_handle is not None: + log.debug('Getting ready to send %s', blob.blob_hash) + self.next_blob_to_send = blob + self.read_handle = read_handle + return None + raise ValueError("Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash)) + + def send_blob_info(self): + log.info("Send blob info for %s", self.next_blob_to_send.blob_hash) + assert self.next_blob_to_send is not None, "need to have a next blob to send at this point" + log.debug('sending blob info') + self.write(json.dumps({ + 'blob_hash': self.next_blob_to_send.blob_hash, + 'blob_size': self.next_blob_to_send.length + })) + + def send_next_request(self): + if self.file_sender is not None: + # send the blob + log.debug('Sending the blob') + return self.start_transfer() + elif self.blob_hashes_to_send: + # open the next blob to send + blob_hash = self.blob_hashes_to_send[0] + log.debug('No current blob, sending the next one: %s', blob_hash) + self.blob_hashes_to_send = self.blob_hashes_to_send[1:] + d = self.blob_manager.get_blob(blob_hash, True) + d.addCallback(self.open_blob_for_reading) + # send the server the next blob hash + length + d.addCallback(lambda _: self.send_blob_info()) + return d + else: + # close connection + log.debug('No more blob hashes, closing connection') + self.transport.loseConnection() + + +class LBRYBlobReflectorClientFactory(ClientFactory): + protocol = LBRYBlobReflectorClient + + def __init__(self, blob_manager, blobs): + self.blob_manager = blob_manager + self.blobs = blobs + self.p = None + self.finished_deferred = defer.Deferred() + + def buildProtocol(self, addr): + p = self.protocol() + p.factory = self + self.p = p + return p + + def startFactory(self): + log.debug('Starting reflector factory') + ClientFactory.startFactory(self) + + def startedConnecting(self, connector): + log.debug('Started connecting') + + def clientConnectionLost(self, connector, reason): + """If we get disconnected, reconnect to server.""" + log.debug("connection lost: %s", reason) + + def clientConnectionFailed(self, connector, reason): + log.debug("connection failed: %s", reason) diff --git a/lbrynet/reflector/server/server.py b/lbrynet/reflector/server/server.py index 38127dce2..d1b04407e 100644 --- a/lbrynet/reflector/server/server.py +++ b/lbrynet/reflector/server/server.py @@ -3,7 +3,6 @@ from twisted.python import failure from twisted.internet import error, defer from twisted.internet.protocol import Protocol, ServerFactory import json - from lbrynet.core.utils import is_valid_blobhash