diff --git a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py index 5546787c4..7ce5adb9d 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py +++ b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py @@ -1,16 +1,20 @@ """ Download LBRY Files from LBRYnet and save them to disk. """ +import random +import logging from zope.interface import implements +from twisted.internet import defer + from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager from lbrynet.core.StreamDescriptor import StreamMetadata from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaver, EncryptedFileDownloader from lbrynet.lbryfilemanager.EncryptedFileStatusReport import EncryptedFileStatusReport from lbrynet.interfaces import IStreamDownloaderFactory from lbrynet.lbryfile.StreamDescriptor import save_sd_info -from twisted.internet import defer -import logging +from lbrynet.reflector import reupload +from lbrynet.conf import settings log = logging.getLogger(__name__) @@ -63,8 +67,11 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): d.addCallbacks(_save_claim_id, lambda err: _notify_bad_claim(name, txid)) return d + reflector_server = random.choice(settings.reflector_servers) + d.addCallback(_save_sd_hash) d.addCallback(lambda r: _save_claim(r[0], r[1]) if r else None) + d.addCallback(lambda _: reupload.check_and_restore_availability(self, reflector_server)) d.addCallback(lambda _: self.lbry_file_manager.get_lbry_file_status(self)) def restore_status(status): diff --git a/lbrynet/reflector/__init__.py b/lbrynet/reflector/__init__.py index ccabe2e6f..a2f9c186a 100644 --- a/lbrynet/reflector/__init__.py +++ b/lbrynet/reflector/__init__.py @@ -1,3 +1,4 @@ from lbrynet.reflector.server.server import ReflectorServerFactory as ServerFactory from lbrynet.reflector.client.client import EncryptedFileReflectorClientFactory as ClientFactory from lbrynet.reflector.client.client import BlobReflectorClientFactory as BlobClientFactory +from lbrynet.reflector import reupload \ No newline at end of file diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 23d497456..3f855af90 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -126,7 +126,7 @@ class EncryptedFileReflectorClient(Protocol): def set_blobs(blob_hashes): for blob_hash, position, iv, length in blob_hashes: - log.info("Preparing to send %s", blob_hash) + log.debug("Preparing to send %s", blob_hash) if blob_hash is not None: self.blob_hashes_to_send.append(blob_hash) @@ -209,7 +209,7 @@ class EncryptedFileReflectorClient(Protocol): raise ValueError("Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash)) def send_blob_info(self): - log.info("Send blob info for %s", self.next_blob_to_send.blob_hash) + log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash) assert self.next_blob_to_send is not None, "need to have a next blob to send at this point" log.debug('sending blob info') self.write(json.dumps({ @@ -284,6 +284,7 @@ class BlobReflectorClient(Protocol): self.file_sender = None self.producer = None self.streaming = False + self.sent_blobs = False d = self.send_handshake() d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) @@ -302,10 +303,12 @@ class BlobReflectorClient(Protocol): def connectionLost(self, reason): if reason.check(error.ConnectionDone): - log.debug('Finished sending data via reflector') + self.factory.sent_blobs = self.sent_blobs + if self.factory.sent_blobs: + log.info('Finished sending data via reflector') self.factory.finished_deferred.callback(True) else: - log.debug('reflector finished: %s', reason) + log.info('Reflector finished: %s', reason) self.factory.finished_deferred.callback(reason) # IConsumer stuff @@ -355,6 +358,7 @@ class BlobReflectorClient(Protocol): return defer.succeed(None) def start_transfer(self): + self.sent_blobs = True self.write(json.dumps({})) assert self.read_handle is not None, "self.read_handle was None when trying to start the transfer" d = self.file_sender.beginFileTransfer(self.read_handle, self) @@ -395,7 +399,7 @@ class BlobReflectorClient(Protocol): raise ValueError("Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash)) def send_blob_info(self): - log.info("Send blob info for %s", self.next_blob_to_send.blob_hash) + log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash) assert self.next_blob_to_send is not None, "need to have a next blob to send at this point" log.debug('sending blob info') self.write(json.dumps({ @@ -431,6 +435,7 @@ class BlobReflectorClientFactory(ClientFactory): self.blob_manager = blob_manager self.blobs = blobs self.p = None + self.sent_blobs = False self.finished_deferred = defer.Deferred() def buildProtocol(self, addr): diff --git a/lbrynet/reflector/reupload.py b/lbrynet/reflector/reupload.py new file mode 100644 index 000000000..97e401c6c --- /dev/null +++ b/lbrynet/reflector/reupload.py @@ -0,0 +1,45 @@ +import logging +from twisted.internet import reactor, defer +from lbrynet.reflector import BlobClientFactory, ClientFactory + +log = logging.getLogger(__name__) + + +def _check_if_reflector_has_stream(lbry_file, reflector_server): + reflector_address, reflector_port = reflector_server[0], reflector_server[1] + factory = BlobClientFactory( + lbry_file.blob_manager, + [lbry_file.sd_hash] + ) + d = reactor.resolve(reflector_address) + d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) + d.addCallback(lambda _: factory.finished_deferred) + d.addCallback(lambda _: not factory.sent_blobs) + return d + + +def _reflect_stream(lbry_file, reflector_server): + reflector_address, reflector_port = reflector_server[0], reflector_server[1] + factory = ClientFactory( + lbry_file.blob_manager, + lbry_file.stream_info_manager, + lbry_file.stream_hash + ) + d = reactor.resolve(reflector_address) + d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) + d.addCallback(lambda _: factory.finished_deferred) + return d + + +def _reflect_if_unavailable(reflector_has_stream, lbry_file, reflector_server): + if reflector_has_stream: + log.info("lbry://%s is available", lbry_file.uri) + return defer.succeed(False) + log.info("lbry://%s is unavailable, reflecting it", lbry_file.uri) + return _reflect_stream(lbry_file, reflector_server) + + +def check_and_restore_availability(lbry_file, reflector_server): + d = _check_if_reflector_has_stream(lbry_file, reflector_server) + d.addCallback(lambda send_stream: _reflect_if_unavailable(send_stream, lbry_file, reflector_server)) + return d