diff --git a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py index c936dd1d1..39646266c 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py +++ b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py @@ -13,10 +13,9 @@ from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaver, from lbrynet.lbryfilemanager.EncryptedFileStatusReport import EncryptedFileStatusReport from lbrynet.interfaces import IStreamDownloaderFactory from lbrynet.lbryfile.StreamDescriptor import save_sd_info -from lbrynet.reflector import BlobClientFactory, ClientFactory +from lbrynet.reflector import BlobClientFactory, ClientFactory, ReflectorAvailabilityHelper from lbrynet.conf import REFLECTOR_SERVERS - log = logging.getLogger(__name__) @@ -68,41 +67,10 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): d.addCallbacks(_save_claim_id, lambda err: _notify_bad_claim(name, txid)) return d - def _check_file_availability(): - reflector_server = random.choice(REFLECTOR_SERVERS) - reflector_address, reflector_port = reflector_server[0], reflector_server[1] - factory = BlobClientFactory( - self.blob_manager, - [self.sd_hash] - ) - d = reactor.resolve(reflector_address) - d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) - d.addCallback(lambda _: factory.finished_deferred) - d.addCallback(lambda _: _reflect_if_unavailable(factory.sent_blobs)) - return d - def _reflect_if_unavailable(sent_blobs): - if not sent_blobs: - log.info("lbry://%s is available", self.uri) - return defer.succeed(True) - if self.stream_hash is None: - return defer.fail(Exception("no stream hash")) - log.info("Reflecting previously unavailable stream: %s" % self.stream_hash) - reflector_server = random.choice(REFLECTOR_SERVERS) - reflector_address, reflector_port = reflector_server[0], reflector_server[1] - factory = ClientFactory( - self.blob_manager, - self.stream_info_manager, - self.stream_hash - ) - d = reactor.resolve(reflector_address) - d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) - d.addCallback(lambda _: factory.finished_deferred) - return d d.addCallback(_save_sd_hash) d.addCallback(lambda r: _save_claim(r[0], r[1]) if r else None) - d.addCallback(lambda _: _check_file_availability()) d.addCallback(lambda _: self.lbry_file_manager.get_lbry_file_status(self)) def restore_status(status): @@ -115,6 +83,10 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): return defer.succeed(True) d.addCallback(restore_status) + + reflector_server = random.choice(REFLECTOR_SERVERS) + + d.addCallback(lambda _: ReflectorAvailabilityHelper.check_and_restore_availability(self, reflector_server)) return d def stop(self, err=None, change_status=True): diff --git a/lbrynet/reflector/__init__.py b/lbrynet/reflector/__init__.py index ccabe2e6f..9d32ccb6d 100644 --- a/lbrynet/reflector/__init__.py +++ b/lbrynet/reflector/__init__.py @@ -1,3 +1,4 @@ from lbrynet.reflector.server.server import ReflectorServerFactory as ServerFactory from lbrynet.reflector.client.client import EncryptedFileReflectorClientFactory as ClientFactory from lbrynet.reflector.client.client import BlobReflectorClientFactory as BlobClientFactory +from lbrynet.reflector.util import ReflectorAvailabilityHelper \ No newline at end of file diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 0a4e4b599..3f855af90 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -126,7 +126,7 @@ class EncryptedFileReflectorClient(Protocol): def set_blobs(blob_hashes): for blob_hash, position, iv, length in blob_hashes: - log.info("Preparing to send %s", blob_hash) + log.debug("Preparing to send %s", blob_hash) if blob_hash is not None: self.blob_hashes_to_send.append(blob_hash) @@ -209,7 +209,7 @@ class EncryptedFileReflectorClient(Protocol): raise ValueError("Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash)) def send_blob_info(self): - log.info("Send blob info for %s", self.next_blob_to_send.blob_hash) + log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash) assert self.next_blob_to_send is not None, "need to have a next blob to send at this point" log.debug('sending blob info') self.write(json.dumps({ @@ -284,7 +284,7 @@ class BlobReflectorClient(Protocol): self.file_sender = None self.producer = None self.streaming = False - self.blobs_where_sent = False + self.sent_blobs = False d = self.send_handshake() d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) @@ -303,7 +303,7 @@ class BlobReflectorClient(Protocol): def connectionLost(self, reason): if reason.check(error.ConnectionDone): - self.factory.sent_blobs = self.blobs_where_sent + self.factory.sent_blobs = self.sent_blobs if self.factory.sent_blobs: log.info('Finished sending data via reflector') self.factory.finished_deferred.callback(True) @@ -358,6 +358,7 @@ class BlobReflectorClient(Protocol): return defer.succeed(None) def start_transfer(self): + self.sent_blobs = True self.write(json.dumps({})) assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer" d = self.file_sender.beginFileTransfer(self.read_handle, self) @@ -377,7 +378,6 @@ class BlobReflectorClient(Protocol): if 'send_blob' not in response_dict: raise ValueError("I don't know whether to send the blob or not!") if response_dict['send_blob'] is True: - self.blobs_where_sent = True self.file_sender = FileSender() return defer.succeed(True) else: diff --git a/lbrynet/reflector/util.py b/lbrynet/reflector/util.py new file mode 100644 index 000000000..a3fe519f1 --- /dev/null +++ b/lbrynet/reflector/util.py @@ -0,0 +1,47 @@ +import logging +from twisted.internet import reactor, defer +from lbrynet.reflector import BlobClientFactory, ClientFactory + +log = logging.getLogger(__name__) + + +class ReflectorAvailabilityHelper(object): + @staticmethod + def _check_if_reflector_has_stream(lbry_file, reflector_server): + reflector_address, reflector_port = reflector_server[0], reflector_server[1] + factory = BlobClientFactory( + lbry_file.blob_manager, + [lbry_file.sd_hash] + ) + d = reactor.resolve(reflector_address) + d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) + d.addCallback(lambda _: factory.finished_deferred) + d.addCallback(lambda _: not factory.sent_blobs) + return d + + @staticmethod + def _reflect_stream(lbry_file, reflector_server): + reflector_address, reflector_port = reflector_server[0], reflector_server[1] + factory = ClientFactory( + lbry_file.blob_manager, + lbry_file.stream_info_manager, + lbry_file.stream_hash + ) + d = reactor.resolve(reflector_address) + d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) + d.addCallback(lambda _: factory.finished_deferred) + return d + + @classmethod + def _reflect_if_unavailable(cls, reflector_has_stream, lbry_file, reflector_server): + if reflector_has_stream: + log.info("lbry://%s is available", lbry_file.uri) + return defer.succeed(False) + log.info("lbry://%s is unavailable, reflecting it", lbry_file.uri) + return cls._reflect_stream(lbry_file, reflector_server) + + @classmethod + def check_and_restore_availability(cls, lbry_file, reflector_server): + d = cls._check_if_reflector_has_stream(lbry_file, reflector_server) + d.addCallback(lambda send_stream: cls._reflect_if_unavailable(send_stream, lbry_file, reflector_server)) + return d