diff --git a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py index 4103cc911..db55ac3ad 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py +++ b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py @@ -13,7 +13,7 @@ from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaver, from lbrynet.lbryfilemanager.EncryptedFileStatusReport import EncryptedFileStatusReport from lbrynet.interfaces import IStreamDownloaderFactory from lbrynet.lbryfile.StreamDescriptor import save_sd_info -from lbrynet.reflector import ReflectorAvailabilityHelper +from lbrynet.reflector import reupload from lbrynet.conf import REFLECTOR_SERVERS log = logging.getLogger(__name__) @@ -86,7 +86,7 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): reflector_server = random.choice(REFLECTOR_SERVERS) - d.addCallback(lambda _: ReflectorAvailabilityHelper.check_and_restore_availability(self, reflector_server)) + d.addCallback(lambda _: reupload.check_and_restore_availability(self, reflector_server)) return d def stop(self, err=None, change_status=True): diff --git a/lbrynet/reflector/__init__.py b/lbrynet/reflector/__init__.py index 9d32ccb6d..a2f9c186a 100644 --- a/lbrynet/reflector/__init__.py +++ b/lbrynet/reflector/__init__.py @@ -1,4 +1,4 @@ from lbrynet.reflector.server.server import ReflectorServerFactory as ServerFactory from lbrynet.reflector.client.client import EncryptedFileReflectorClientFactory as ClientFactory from lbrynet.reflector.client.client import BlobReflectorClientFactory as BlobClientFactory -from lbrynet.reflector.util import ReflectorAvailabilityHelper \ No newline at end of file +from lbrynet.reflector import reupload \ No newline at end of file diff --git a/lbrynet/reflector/reupload.py b/lbrynet/reflector/reupload.py new file mode 100644 index 000000000..8a95995c2 --- /dev/null +++ b/lbrynet/reflector/reupload.py @@ -0,0 +1,45 @@ +import logging +from twisted.internet import reactor, defer +from lbrynet.reflector import BlobClientFactory, ClientFactory + +log = logging.getLogger(__name__) + + +def _check_if_reflector_has_stream(lbry_file, reflector_server): + reflector_address, reflector_port = reflector_server[0], reflector_server[1] + factory = BlobClientFactory( + lbry_file.blob_manager, + [lbry_file.sd_hash] + ) + d = reactor.resolve(reflector_address) + d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) + d.addCallback(lambda _: factory.finished_deferred) + d.addCallback(lambda _: not factory.sent_blobs) + return d + + +def _reflect_stream(lbry_file, reflector_server): + reflector_address, reflector_port = reflector_server[0], reflector_server[1] + factory = ClientFactory( + lbry_file.blob_manager, + lbry_file.stream_info_manager, + lbry_file.stream_hash + ) + d = reactor.resolve(reflector_address) + d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) + d.addCallback(lambda _: factory.finished_deferred) + return d + + +def _reflect_if_unavailable(reflector_has_stream, lbry_file, reflector_server): + if reflector_has_stream: + log.info("lbry://%s is available", lbry_file.uri) + return defer.succeed(False) + log.info("lbry://%s is unavailable, reflecting it", lbry_file.uri) + return _reflect_stream(lbry_file, reflector_server) + + +def check_and_restore_availability(cls, lbry_file, reflector_server): + d = cls._check_if_reflector_has_stream(lbry_file, reflector_server) + d.addCallback(lambda send_stream: _reflect_if_unavailable(send_stream, lbry_file, reflector_server)) + return d diff --git a/lbrynet/reflector/util.py b/lbrynet/reflector/util.py deleted file mode 100644 index a3fe519f1..000000000 --- a/lbrynet/reflector/util.py +++ /dev/null @@ -1,47 +0,0 @@ -import logging -from twisted.internet import reactor, defer -from lbrynet.reflector import BlobClientFactory, ClientFactory - -log = logging.getLogger(__name__) - - -class ReflectorAvailabilityHelper(object): - @staticmethod - def _check_if_reflector_has_stream(lbry_file, reflector_server): - reflector_address, reflector_port = reflector_server[0], reflector_server[1] - factory = BlobClientFactory( - lbry_file.blob_manager, - [lbry_file.sd_hash] - ) - d = reactor.resolve(reflector_address) - d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) - d.addCallback(lambda _: factory.finished_deferred) - d.addCallback(lambda _: not factory.sent_blobs) - return d - - @staticmethod - def _reflect_stream(lbry_file, reflector_server): - reflector_address, reflector_port = reflector_server[0], reflector_server[1] - factory = ClientFactory( - lbry_file.blob_manager, - lbry_file.stream_info_manager, - lbry_file.stream_hash - ) - d = reactor.resolve(reflector_address) - d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) - d.addCallback(lambda _: factory.finished_deferred) - return d - - @classmethod - def _reflect_if_unavailable(cls, reflector_has_stream, lbry_file, reflector_server): - if reflector_has_stream: - log.info("lbry://%s is available", lbry_file.uri) - return defer.succeed(False) - log.info("lbry://%s is unavailable, reflecting it", lbry_file.uri) - return cls._reflect_stream(lbry_file, reflector_server) - - @classmethod - def check_and_restore_availability(cls, lbry_file, reflector_server): - d = cls._check_if_reflector_has_stream(lbry_file, reflector_server) - d.addCallback(lambda send_stream: cls._reflect_if_unavailable(send_stream, lbry_file, reflector_server)) - return d