From 3dd99fdc92ee755bdfd939ccfa52dc3e09297921 Mon Sep 17 00:00:00 2001 From: Jack Date: Thu, 20 Oct 2016 15:40:35 -0400 Subject: [PATCH] upload unavailable streams to reflector on startup --- lbrynet/core/log_support.py | 8 +--- .../EncryptedFileDownloader.py | 42 ++++++++++++++++++- lbrynet/reflector/client/client.py | 11 +++-- 3 files changed, 49 insertions(+), 12 deletions(-) diff --git a/lbrynet/core/log_support.py b/lbrynet/core/log_support.py index 2f2a5c75b..becb698c8 100644 --- a/lbrynet/core/log_support.py +++ b/lbrynet/core/log_support.py @@ -77,13 +77,7 @@ def disable_third_party_loggers(): logging.getLogger('BitcoinRPC').setLevel(logging.INFO) def disable_noisy_loggers(): - logging.getLogger('lbrynet.analytics.api').setLevel(logging.INFO) - logging.getLogger('lbrynet.core').setLevel(logging.INFO) - logging.getLogger('lbrynet.dht').setLevel(logging.INFO) - logging.getLogger('lbrynet.lbrynet_daemon').setLevel(logging.INFO) - logging.getLogger('lbrynet.core.Wallet').setLevel(logging.INFO) - logging.getLogger('lbrynet.lbryfile').setLevel(logging.INFO) - logging.getLogger('lbrynet.lbryfilemanager').setLevel(logging.INFO) + logging.getLogger('lbrynet').setLevel(logging.INFO) @_log_decorator diff --git a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py index 5546787c4..c936dd1d1 100644 --- a/lbrynet/lbryfilemanager/EncryptedFileDownloader.py +++ b/lbrynet/lbryfilemanager/EncryptedFileDownloader.py @@ -1,16 +1,21 @@ """ Download LBRY Files from LBRYnet and save them to disk. """ +import random +import logging from zope.interface import implements +from twisted.internet import defer, reactor + from lbrynet.core.client.StreamProgressManager import FullStreamProgressManager from lbrynet.core.StreamDescriptor import StreamMetadata from lbrynet.lbryfile.client.EncryptedFileDownloader import EncryptedFileSaver, EncryptedFileDownloader from lbrynet.lbryfilemanager.EncryptedFileStatusReport import EncryptedFileStatusReport from lbrynet.interfaces import IStreamDownloaderFactory from lbrynet.lbryfile.StreamDescriptor import save_sd_info -from twisted.internet import defer -import logging +from lbrynet.reflector import BlobClientFactory, ClientFactory +from lbrynet.conf import REFLECTOR_SERVERS + log = logging.getLogger(__name__) @@ -63,8 +68,41 @@ class ManagedEncryptedFileDownloader(EncryptedFileSaver): d.addCallbacks(_save_claim_id, lambda err: _notify_bad_claim(name, txid)) return d + def _check_file_availability(): + reflector_server = random.choice(REFLECTOR_SERVERS) + reflector_address, reflector_port = reflector_server[0], reflector_server[1] + factory = BlobClientFactory( + self.blob_manager, + [self.sd_hash] + ) + d = reactor.resolve(reflector_address) + d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) + d.addCallback(lambda _: factory.finished_deferred) + d.addCallback(lambda _: _reflect_if_unavailable(factory.sent_blobs)) + return d + + def _reflect_if_unavailable(sent_blobs): + if not sent_blobs: + log.info("lbry://%s is available", self.uri) + return defer.succeed(True) + if self.stream_hash is None: + return defer.fail(Exception("no stream hash")) + log.info("Reflecting previously unavailable stream: %s" % self.stream_hash) + reflector_server = random.choice(REFLECTOR_SERVERS) + reflector_address, reflector_port = reflector_server[0], reflector_server[1] + factory = ClientFactory( + self.blob_manager, + self.stream_info_manager, + self.stream_hash + ) + d = reactor.resolve(reflector_address) + d.addCallback(lambda ip: reactor.connectTCP(ip, reflector_port, factory)) + d.addCallback(lambda _: factory.finished_deferred) + return d + d.addCallback(_save_sd_hash) d.addCallback(lambda r: _save_claim(r[0], r[1]) if r else None) + d.addCallback(lambda _: _check_file_availability()) d.addCallback(lambda _: self.lbry_file_manager.get_lbry_file_status(self)) def restore_status(status): diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 23d497456..0a4e4b599 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -284,6 +284,7 @@ class BlobReflectorClient(Protocol): self.file_sender = None self.producer = None self.streaming = False + self.blobs_where_sent = False d = self.send_handshake() d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) @@ -302,10 +303,12 @@ class BlobReflectorClient(Protocol): def connectionLost(self, reason): if reason.check(error.ConnectionDone): - log.debug('Finished sending data via reflector') + self.factory.sent_blobs = self.blobs_where_sent + if self.factory.sent_blobs: + log.info('Finished sending data via reflector') self.factory.finished_deferred.callback(True) else: - log.debug('reflector finished: %s', reason) + log.info('Reflector finished: %s', reason) self.factory.finished_deferred.callback(reason) # IConsumer stuff @@ -374,6 +377,7 @@ class BlobReflectorClient(Protocol): if 'send_blob' not in response_dict: raise ValueError("I don't know whether to send the blob or not!") if response_dict['send_blob'] is True: + self.blobs_where_sent = True self.file_sender = FileSender() return defer.succeed(True) else: @@ -395,7 +399,7 @@ class BlobReflectorClient(Protocol): raise ValueError("Couldn't open that blob for some reason. blob_hash: {}".format(blob.blob_hash)) def send_blob_info(self): - log.info("Send blob info for %s", self.next_blob_to_send.blob_hash) + log.debug("Send blob info for %s", self.next_blob_to_send.blob_hash) assert self.next_blob_to_send is not None, "need to have a next blob to send at this point" log.debug('sending blob info') self.write(json.dumps({ @@ -431,6 +435,7 @@ class BlobReflectorClientFactory(ClientFactory): self.blob_manager = blob_manager self.blobs = blobs self.p = None + self.sent_blobs = False self.finished_deferred = defer.Deferred() def buildProtocol(self, addr):