From b28bdbd752fd36de95c6c6555c01482886421b99 Mon Sep 17 00:00:00 2001 From: Jack Robison Date: Mon, 19 Mar 2018 13:36:53 -0400 Subject: [PATCH] rename existing reupload.reflect_stream --> reupload.reflect_file, add a reupload.reflect_stream function --- lbrynet/daemon/Daemon.py | 4 +- lbrynet/file_manager/EncryptedFileManager.py | 4 +- lbrynet/reflector/client/client.py | 50 +++++--------------- lbrynet/reflector/reupload.py | 24 ++++++++-- lbrynet/tests/functional/test_reflector.py | 10 +--- 5 files changed, 38 insertions(+), 54 deletions(-) diff --git a/lbrynet/daemon/Daemon.py b/lbrynet/daemon/Daemon.py index 0494badf9..c5e7d0e22 100644 --- a/lbrynet/daemon/Daemon.py +++ b/lbrynet/daemon/Daemon.py @@ -719,7 +719,7 @@ class Daemon(AuthJSONRPCServer): claim_out = yield publisher.create_and_publish_stream(name, bid, claim_dict, file_path, claim_address, change_address) if conf.settings['reflect_uploads']: - d = reupload.reflect_stream(publisher.lbry_file) + d = reupload.reflect_file(publisher.lbry_file) d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name), log.exception) self.analytics_manager.send_claim_action('publish') @@ -3010,7 +3010,7 @@ class Daemon(AuthJSONRPCServer): raise Exception('No file found') lbry_file = lbry_files[0] - results = yield reupload.reflect_stream(lbry_file, reflector_server=reflector_server) + results = yield reupload.reflect_file(lbry_file, reflector_server=reflector_server) defer.returnValue(results) @defer.inlineCallbacks diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index 581dfe2a2..5f91eae01 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -7,7 +7,7 @@ import logging from twisted.internet import defer, task, reactor from twisted.python.failure import Failure from lbrynet.core.Error import InvalidStreamDescriptorError -from lbrynet.reflector.reupload import reflect_stream +from lbrynet.reflector.reupload import reflect_file from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory @@ -254,7 +254,7 @@ class EncryptedFileManager(object): sem = defer.DeferredSemaphore(self.CONCURRENT_REFLECTS) ds = [] for lbry_file in self.lbry_files: - ds.append(sem.run(reflect_stream, lbry_file)) + ds.append(sem.run(reflect_file, lbry_file)) yield defer.DeferredList(ds) @defer.inlineCallbacks diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index d70e531a2..cd52adc14 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -33,26 +33,14 @@ class EncryptedFileReflectorClient(Protocol): self.file_sender = None self.producer = None self.streaming = False + + self.blob_manager = self.factory.blob_manager + self.protocol_version = self.factory.protocol_version + self.stream_hash = self.factory.stream_hash + d = self.load_descriptor() d.addCallback(lambda _: self.send_handshake()) - d.addErrback( - lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) - - @property - def file_name(self): - return self.factory.file_name - - @property - def blob_manager(self): - return self.factory.blob_manager - - @property - def protocol_version(self): - return self.factory.protocol_version - - @property - def stream_hash(self): - return self.factory.stream_hash + d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback())) def dataReceived(self, data): self.response_buff += data @@ -131,7 +119,7 @@ class EncryptedFileReflectorClient(Protocol): len(filtered)) return filtered - d = self.factory.blob_manager.storage.get_blobs_for_stream(self.factory.stream_hash) + d = self.factory.blob_manager.storage.get_blobs_for_stream(self.stream_hash) d.addCallback(self.get_validated_blobs) if not self.descriptor_needed: d.addCallback(lambda filtered: @@ -151,7 +139,7 @@ class EncryptedFileReflectorClient(Protocol): def _save_descriptor_blob(sd_blob): self.stream_descriptor = sd_blob - d = self.factory.blob_manager.storage.get_sd_blob_hash_for_stream(self.factory.stream_hash) + d = self.factory.blob_manager.storage.get_sd_blob_hash_for_stream(self.stream_hash) d.addCallback(self.factory.blob_manager.get_blob) d.addCallback(_save_descriptor_blob) return d @@ -312,28 +300,14 @@ class EncryptedFileReflectorClient(Protocol): class EncryptedFileReflectorClientFactory(ClientFactory): protocol = EncryptedFileReflectorClient + protocol_version = REFLECTOR_V2 - def __init__(self, lbry_file): - self._lbry_file = lbry_file + def __init__(self, blob_manager, stream_hash): + self.blob_manager = blob_manager + self.stream_hash = stream_hash self.p = None self.finished_deferred = defer.Deferred() - @property - def blob_manager(self): - return self._lbry_file.blob_manager - - @property - def stream_hash(self): - return self._lbry_file.stream_hash - - @property - def file_name(self): - return self._lbry_file.file_name - - @property - def protocol_version(self): - return REFLECTOR_V2 - def buildProtocol(self, addr): p = self.protocol() p.factory = self diff --git a/lbrynet/reflector/reupload.py b/lbrynet/reflector/reupload.py index 39c6c1ba1..5f31b5c32 100644 --- a/lbrynet/reflector/reupload.py +++ b/lbrynet/reflector/reupload.py @@ -24,15 +24,19 @@ def resolve(host): @defer.inlineCallbacks -def _reflect_stream(lbry_file, reflector_server): +def _reflect_stream(blob_manager, stream_hash, reflector_server): reflector_address, reflector_port = reflector_server[0], reflector_server[1] - factory = ClientFactory(lbry_file) + factory = ClientFactory(blob_manager, stream_hash) ip = yield resolve(reflector_address) yield reactor.connectTCP(ip, reflector_port, factory) result = yield factory.finished_deferred defer.returnValue(result) +def _reflect_file(lbry_file, reflector_server): + return _reflect_stream(lbry_file.blob_manager, lbry_file.stream_hash, reflector_server) + + @defer.inlineCallbacks def _reflect_blobs(blob_manager, blob_hashes, reflector_server): reflector_address, reflector_port = reflector_server[0], reflector_server[1] @@ -43,7 +47,7 @@ def _reflect_blobs(blob_manager, blob_hashes, reflector_server): defer.returnValue(result) -def reflect_stream(lbry_file, reflector_server=None): +def reflect_file(lbry_file, reflector_server=None): if reflector_server: if len(reflector_server.split(":")) == 2: host, port = tuple(reflector_server.split(":")) @@ -52,7 +56,19 @@ def reflect_stream(lbry_file, reflector_server=None): reflector_server = reflector_server, 5566 else: reflector_server = random.choice(conf.settings['reflector_servers']) - return _reflect_stream(lbry_file, reflector_server) + return _reflect_file(lbry_file, reflector_server) + + +def reflect_stream(blob_manager, stream_hash, reflector_server=None): + if reflector_server: + if len(reflector_server.split(":")) == 2: + host, port = tuple(reflector_server.split(":")) + reflector_server = host, int(port) + else: + reflector_server = reflector_server, 5566 + else: + reflector_server = random.choice(conf.settings['reflector_servers']) + return _reflect_stream(blob_manager, stream_hash, reflector_server) def reflect_blob_hashes(blob_hashes, blob_manager, reflector_server=None): diff --git a/lbrynet/tests/functional/test_reflector.py b/lbrynet/tests/functional/test_reflector.py index a73dbee96..41d24902a 100644 --- a/lbrynet/tests/functional/test_reflector.py +++ b/lbrynet/tests/functional/test_reflector.py @@ -212,10 +212,7 @@ class TestReflector(unittest.TestCase): return d def send_to_server(): - fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager, - self.server_session.storage, - self.stream_hash) - factory = reflector.ClientFactory(fake_lbry_file) + factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash) from twisted.internet import reactor reactor.connectTCP('localhost', self.port, factory) @@ -348,10 +345,7 @@ class TestReflector(unittest.TestCase): return factory.finished_deferred def send_to_server_as_stream(result): - fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager, - self.server_session.storage, - self.stream_hash) - factory = reflector.ClientFactory(fake_lbry_file) + factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash) from twisted.internet import reactor reactor.connectTCP('localhost', self.port, factory)