diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 0494badf9..c5e7d0e22 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -719,7 +719,7 @@ class Daemon(AuthJSONRPCServer): claim_out = yield publisher.create_and_publish_stream(name, bid, claim_dict, file_path, claim_address, change_address) if conf.settings['reflect_uploads']: - d = reupload.reflect_stream(publisher.lbry_file) + d = reupload.reflect_file(publisher.lbry_file) d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name), log.exception) self.analytics_manager.send_claim_action('publish') @@ -3010,7 +3010,7 @@ class Daemon(AuthJSONRPCServer): raise Exception('No file found') lbry_file = lbry_files[0] - results = yield reupload.reflect_stream(lbry_file, reflector_server=reflector_server) + results = yield reupload.reflect_file(lbry_file, reflector_server=reflector_server) defer.returnValue(results) @defer.inlineCallbacks diff --git a/lbrynet/database/storage.py b/lbrynet/database/storage.py index 4dce4b477..a77a8dae8 100644 --- a/lbrynet/database/storage.py +++ b/lbrynet/database/storage.py @@ -423,12 +423,14 @@ class SQLiteStorage(object): if only_completed: lengths = transaction.execute( "select b.blob_hash, b.blob_length from blob b " - "inner join stream_blob s ON b.blob_hash=s.blob_hash and b.status='finished'" + "inner join stream_blob s ON b.blob_hash=s.blob_hash and b.status='finished' and s.stream_hash=?", + (stream_hash, ) ).fetchall() else: lengths = transaction.execute( "select b.blob_hash, b.blob_length from blob b " - "inner join stream_blob s ON b.blob_hash=s.blob_hash" + "inner join stream_blob s ON b.blob_hash=s.blob_hash and s.stream_hash=?", + (stream_hash, ) ).fetchall() blob_length_dict = {} diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index 96b56c0ab..5f91eae01 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -7,7 +7,7 @@ import logging from twisted.internet import defer, task, reactor from twisted.python.failure import Failure from lbrynet.core.Error import InvalidStreamDescriptorError -from lbrynet.reflector.reupload import reflect_stream +from lbrynet.reflector.reupload import reflect_file from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory @@ -96,41 +96,48 @@ class EncryptedFileManager(object): suggested_file_name=suggested_file_name ) + @defer.inlineCallbacks + def _start_lbry_file(self, file_info, payment_rate_manager): + lbry_file = self._get_lbry_file( + file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'], + file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'], + file_info['suggested_file_name'] + ) + yield lbry_file.get_claim_info() + try: + # verify the stream is valid (we might have downloaded an invalid stream + # in the past when the validation check didn't work) + stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True) + validate_descriptor(stream_info) + except InvalidStreamDescriptorError as err: + log.warning("Stream for descriptor %s is invalid (%s), cleaning it up", + lbry_file.sd_hash, err.message) + yield lbry_file.delete_data() + yield self.session.storage.delete_stream(lbry_file.stream_hash) + else: + try: + # restore will raise an Exception if status is unknown + lbry_file.restore(file_info['status']) + self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info + self.lbry_files.append(lbry_file) + if len(self.lbry_files) % 500 == 0: + log.info("Started %i files", len(self.lbry_files)) + except Exception: + log.warning("Failed to start %i", file_info.get('rowid')) + @defer.inlineCallbacks def _start_lbry_files(self): files = yield self.session.storage.get_all_lbry_files() b_prm = self.session.base_payment_rate_manager payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker) - log.info("Trying to start %i files", len(files)) - for i, file_info in enumerate(files): - if len(files) > 500 and i % 500 == 0: - log.info("Started %i/%i files", i, len(files)) + log.info("Starting %i files", len(files)) + dl = [] + for file_info in files: + dl.append(self._start_lbry_file(file_info, payment_rate_manager)) + + yield defer.DeferredList(dl) - lbry_file = self._get_lbry_file( - file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'], - file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'], - file_info['suggested_file_name'] - ) - yield lbry_file.get_claim_info() - try: - # verify the stream is valid (we might have downloaded an invalid stream - # in the past when the validation check didn't work) - stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True) - validate_descriptor(stream_info) - except InvalidStreamDescriptorError as err: - log.warning("Stream for descriptor %s is invalid (%s), cleaning it up", - lbry_file.sd_hash, err.message) - yield lbry_file.delete_data() - yield self.session.storage.delete_stream(lbry_file.stream_hash) - else: - try: - # restore will raise an Exception if status is unknown - lbry_file.restore(file_info['status']) - self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info - self.lbry_files.append(lbry_file) - except Exception: - log.warning("Failed to start %i", file_info.get('rowid')) log.info("Started %i lbry files", len(self.lbry_files)) if self.auto_re_reflect is True: safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval) @@ -247,7 +254,7 @@ class EncryptedFileManager(object): sem = defer.DeferredSemaphore(self.CONCURRENT_REFLECTS) ds = [] for lbry_file in self.lbry_files: - ds.append(sem.run(reflect_stream, lbry_file)) + ds.append(sem.run(reflect_file, lbry_file)) yield defer.DeferredList(ds) @defer.inlineCallbacks diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index d70e531a2..cd52adc14 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -33,26 +33,14 @@ class EncryptedFileReflectorClient(Protocol): self.file_sender = None self.producer = None self.streaming = False + + self.blob_manager = self.factory.blob_manager + self.protocol_version = self.factory.protocol_version + self.stream_hash = self.factory.stream_hash + d = self.load_descriptor() d.addCallback(lambda _: self.send_handshake()) - d.addErrback( - lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) - - @property - def file_name(self): - return self.factory.file_name - - @property - def blob_manager(self): - return self.factory.blob_manager - - @property - def protocol_version(self): - return self.factory.protocol_version - - @property - def stream_hash(self): - return self.factory.stream_hash + d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) def dataReceived(self, data): self.response_buff += data @@ -131,7 +119,7 @@ class EncryptedFileReflectorClient(Protocol): len(filtered)) return filtered - d = self.factory.blob_manager.storage.get_blobs_for_stream(self.factory.stream_hash) + d = self.factory.blob_manager.storage.get_blobs_for_stream(self.stream_hash) d.addCallback(self.get_validated_blobs) if not self.descriptor_needed: d.addCallback(lambda filtered: @@ -151,7 +139,7 @@ class EncryptedFileReflectorClient(Protocol): def _save_descriptor_blob(sd_blob): self.stream_descriptor = sd_blob - d = self.factory.blob_manager.storage.get_sd_blob_hash_for_stream(self.factory.stream_hash) + d = self.factory.blob_manager.storage.get_sd_blob_hash_for_stream(self.stream_hash) d.addCallback(self.factory.blob_manager.get_blob) d.addCallback(_save_descriptor_blob) return d @@ -312,28 +300,14 @@ class EncryptedFileReflectorClient(Protocol): class EncryptedFileReflectorClientFactory(ClientFactory): protocol = EncryptedFileReflectorClient + protocol_version = REFLECTOR_V2 - def __init__(self, lbry_file): - self._lbry_file = lbry_file + def __init__(self, blob_manager, stream_hash): + self.blob_manager = blob_manager + self.stream_hash = stream_hash self.p = None self.finished_deferred = defer.Deferred() - @property - def blob_manager(self): - return self._lbry_file.blob_manager - - @property - def stream_hash(self): - return self._lbry_file.stream_hash - - @property - def file_name(self): - return self._lbry_file.file_name - - @property - def protocol_version(self): - return REFLECTOR_V2 - def buildProtocol(self, addr): p = self.protocol() p.factory = self diff --git a/lbrynet/reflector/reupload.py b/lbrynet/reflector/reupload.py index 39c6c1ba1..5f31b5c32 100644 --- a/lbrynet/reflector/reupload.py +++ b/lbrynet/reflector/reupload.py @@ -24,15 +24,19 @@ def resolve(host): @defer.inlineCallbacks -def _reflect_stream(lbry_file, reflector_server): +def _reflect_stream(blob_manager, stream_hash, reflector_server): reflector_address, reflector_port = reflector_server[0], reflector_server[1] - factory = ClientFactory(lbry_file) + factory = ClientFactory(blob_manager, stream_hash) ip = yield resolve(reflector_address) yield reactor.connectTCP(ip, reflector_port, factory) result = yield factory.finished_deferred defer.returnValue(result) +def _reflect_file(lbry_file, reflector_server): + return _reflect_stream(lbry_file.blob_manager, lbry_file.stream_hash, reflector_server) + + @defer.inlineCallbacks def _reflect_blobs(blob_manager, blob_hashes, reflector_server): reflector_address, reflector_port = reflector_server[0], reflector_server[1] @@ -43,7 +47,7 @@ def _reflect_blobs(blob_manager, blob_hashes, reflector_server): defer.returnValue(result) -def reflect_stream(lbry_file, reflector_server=None): +def reflect_file(lbry_file, reflector_server=None): if reflector_server: if len(reflector_server.split(":")) == 2: host, port = tuple(reflector_server.split(":")) @@ -52,7 +56,19 @@ def reflect_stream(lbry_file, reflector_server=None): reflector_server = reflector_server, 5566 else: reflector_server = random.choice(conf.settings['reflector_servers']) - return _reflect_stream(lbry_file, reflector_server) + return _reflect_file(lbry_file, reflector_server) + + +def reflect_stream(blob_manager, stream_hash, reflector_server=None): + if reflector_server: + if len(reflector_server.split(":")) == 2: + host, port = tuple(reflector_server.split(":")) + reflector_server = host, int(port) + else: + reflector_server = reflector_server, 5566 + else: + reflector_server = random.choice(conf.settings['reflector_servers']) + return _reflect_stream(blob_manager, stream_hash, reflector_server) def reflect_blob_hashes(blob_hashes, blob_manager, reflector_server=None): diff --git a/lbrynet/tests/functional/test_reflector.py b/lbrynet/tests/functional/test_reflector.py index a73dbee96..41d24902a 100644 --- a/lbrynet/tests/functional/test_reflector.py +++ b/lbrynet/tests/functional/test_reflector.py @@ -212,10 +212,7 @@ class TestReflector(unittest.TestCase): return d def send_to_server(): - fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager, - self.server_session.storage, - self.stream_hash) - factory = reflector.ClientFactory(fake_lbry_file) + factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash) from twisted.internet import reactor reactor.connectTCP('localhost', self.port, factory) @@ -348,10 +345,7 @@ class TestReflector(unittest.TestCase): return factory.finished_deferred def send_to_server_as_stream(result): - fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager, - self.server_session.storage, - self.stream_hash) - factory = reflector.ClientFactory(fake_lbry_file) + factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash) from twisted.internet import reactor reactor.connectTCP('localhost', self.port, factory)