rename existing reupload.reflect_stream --> reupload.reflect_file, add a reupload.reflect_stream function

This commit is contained in:
Jack Robison 2018-03-19 13:36:53 -04:00
parent f94a9e8729
commit b28bdbd752
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
5 changed files with 38 additions and 54 deletions

View file

@ -719,7 +719,7 @@ class Daemon(AuthJSONRPCServer):
claim_out = yield publisher.create_and_publish_stream(name, bid, claim_dict, file_path, claim_out = yield publisher.create_and_publish_stream(name, bid, claim_dict, file_path,
claim_address, change_address) claim_address, change_address)
if conf.settings['reflect_uploads']: if conf.settings['reflect_uploads']:
d = reupload.reflect_stream(publisher.lbry_file) d = reupload.reflect_file(publisher.lbry_file)
d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name), d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name),
log.exception) log.exception)
self.analytics_manager.send_claim_action('publish') self.analytics_manager.send_claim_action('publish')
@ -3010,7 +3010,7 @@ class Daemon(AuthJSONRPCServer):
raise Exception('No file found') raise Exception('No file found')
lbry_file = lbry_files[0] lbry_file = lbry_files[0]
results = yield reupload.reflect_stream(lbry_file, reflector_server=reflector_server) results = yield reupload.reflect_file(lbry_file, reflector_server=reflector_server)
defer.returnValue(results) defer.returnValue(results)
@defer.inlineCallbacks @defer.inlineCallbacks

View file

@ -7,7 +7,7 @@ import logging
from twisted.internet import defer, task, reactor from twisted.internet import defer, task, reactor
from twisted.python.failure import Failure from twisted.python.failure import Failure
from lbrynet.core.Error import InvalidStreamDescriptorError from lbrynet.core.Error import InvalidStreamDescriptorError
from lbrynet.reflector.reupload import reflect_stream from lbrynet.reflector.reupload import reflect_file
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
@ -254,7 +254,7 @@ class EncryptedFileManager(object):
sem = defer.DeferredSemaphore(self.CONCURRENT_REFLECTS) sem = defer.DeferredSemaphore(self.CONCURRENT_REFLECTS)
ds = [] ds = []
for lbry_file in self.lbry_files: for lbry_file in self.lbry_files:
ds.append(sem.run(reflect_stream, lbry_file)) ds.append(sem.run(reflect_file, lbry_file))
yield defer.DeferredList(ds) yield defer.DeferredList(ds)
@defer.inlineCallbacks @defer.inlineCallbacks

View file

@ -33,26 +33,14 @@ class EncryptedFileReflectorClient(Protocol):
self.file_sender = None self.file_sender = None
self.producer = None self.producer = None
self.streaming = False self.streaming = False
self.blob_manager = self.factory.blob_manager
self.protocol_version = self.factory.protocol_version
self.stream_hash = self.factory.stream_hash
d = self.load_descriptor() d = self.load_descriptor()
d.addCallback(lambda _: self.send_handshake()) d.addCallback(lambda _: self.send_handshake())
d.addErrback( d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
@property
def file_name(self):
return self.factory.file_name
@property
def blob_manager(self):
return self.factory.blob_manager
@property
def protocol_version(self):
return self.factory.protocol_version
@property
def stream_hash(self):
return self.factory.stream_hash
def dataReceived(self, data): def dataReceived(self, data):
self.response_buff += data self.response_buff += data
@ -131,7 +119,7 @@ class EncryptedFileReflectorClient(Protocol):
len(filtered)) len(filtered))
return filtered return filtered
d = self.factory.blob_manager.storage.get_blobs_for_stream(self.factory.stream_hash) d = self.factory.blob_manager.storage.get_blobs_for_stream(self.stream_hash)
d.addCallback(self.get_validated_blobs) d.addCallback(self.get_validated_blobs)
if not self.descriptor_needed: if not self.descriptor_needed:
d.addCallback(lambda filtered: d.addCallback(lambda filtered:
@ -151,7 +139,7 @@ class EncryptedFileReflectorClient(Protocol):
def _save_descriptor_blob(sd_blob): def _save_descriptor_blob(sd_blob):
self.stream_descriptor = sd_blob self.stream_descriptor = sd_blob
d = self.factory.blob_manager.storage.get_sd_blob_hash_for_stream(self.factory.stream_hash) d = self.factory.blob_manager.storage.get_sd_blob_hash_for_stream(self.stream_hash)
d.addCallback(self.factory.blob_manager.get_blob) d.addCallback(self.factory.blob_manager.get_blob)
d.addCallback(_save_descriptor_blob) d.addCallback(_save_descriptor_blob)
return d return d
@ -312,28 +300,14 @@ class EncryptedFileReflectorClient(Protocol):
class EncryptedFileReflectorClientFactory(ClientFactory): class EncryptedFileReflectorClientFactory(ClientFactory):
protocol = EncryptedFileReflectorClient protocol = EncryptedFileReflectorClient
protocol_version = REFLECTOR_V2
def __init__(self, lbry_file): def __init__(self, blob_manager, stream_hash):
self._lbry_file = lbry_file self.blob_manager = blob_manager
self.stream_hash = stream_hash
self.p = None self.p = None
self.finished_deferred = defer.Deferred() self.finished_deferred = defer.Deferred()
@property
def blob_manager(self):
return self._lbry_file.blob_manager
@property
def stream_hash(self):
return self._lbry_file.stream_hash
@property
def file_name(self):
return self._lbry_file.file_name
@property
def protocol_version(self):
return REFLECTOR_V2
def buildProtocol(self, addr): def buildProtocol(self, addr):
p = self.protocol() p = self.protocol()
p.factory = self p.factory = self

View file

@ -24,15 +24,19 @@ def resolve(host):
@defer.inlineCallbacks @defer.inlineCallbacks
def _reflect_stream(lbry_file, reflector_server): def _reflect_stream(blob_manager, stream_hash, reflector_server):
reflector_address, reflector_port = reflector_server[0], reflector_server[1] reflector_address, reflector_port = reflector_server[0], reflector_server[1]
factory = ClientFactory(lbry_file) factory = ClientFactory(blob_manager, stream_hash)
ip = yield resolve(reflector_address) ip = yield resolve(reflector_address)
yield reactor.connectTCP(ip, reflector_port, factory) yield reactor.connectTCP(ip, reflector_port, factory)
result = yield factory.finished_deferred result = yield factory.finished_deferred
defer.returnValue(result) defer.returnValue(result)
def _reflect_file(lbry_file, reflector_server):
return _reflect_stream(lbry_file.blob_manager, lbry_file.stream_hash, reflector_server)
@defer.inlineCallbacks @defer.inlineCallbacks
def _reflect_blobs(blob_manager, blob_hashes, reflector_server): def _reflect_blobs(blob_manager, blob_hashes, reflector_server):
reflector_address, reflector_port = reflector_server[0], reflector_server[1] reflector_address, reflector_port = reflector_server[0], reflector_server[1]
@ -43,7 +47,7 @@ def _reflect_blobs(blob_manager, blob_hashes, reflector_server):
defer.returnValue(result) defer.returnValue(result)
def reflect_stream(lbry_file, reflector_server=None): def reflect_file(lbry_file, reflector_server=None):
if reflector_server: if reflector_server:
if len(reflector_server.split(":")) == 2: if len(reflector_server.split(":")) == 2:
host, port = tuple(reflector_server.split(":")) host, port = tuple(reflector_server.split(":"))
@ -52,7 +56,19 @@ def reflect_stream(lbry_file, reflector_server=None):
reflector_server = reflector_server, 5566 reflector_server = reflector_server, 5566
else: else:
reflector_server = random.choice(conf.settings['reflector_servers']) reflector_server = random.choice(conf.settings['reflector_servers'])
return _reflect_stream(lbry_file, reflector_server) return _reflect_file(lbry_file, reflector_server)
def reflect_stream(blob_manager, stream_hash, reflector_server=None):
if reflector_server:
if len(reflector_server.split(":")) == 2:
host, port = tuple(reflector_server.split(":"))
reflector_server = host, int(port)
else:
reflector_server = reflector_server, 5566
else:
reflector_server = random.choice(conf.settings['reflector_servers'])
return _reflect_stream(blob_manager, stream_hash, reflector_server)
def reflect_blob_hashes(blob_hashes, blob_manager, reflector_server=None): def reflect_blob_hashes(blob_hashes, blob_manager, reflector_server=None):

View file

@ -212,10 +212,7 @@ class TestReflector(unittest.TestCase):
return d return d
def send_to_server(): def send_to_server():
fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager, factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash)
self.server_session.storage,
self.stream_hash)
factory = reflector.ClientFactory(fake_lbry_file)
from twisted.internet import reactor from twisted.internet import reactor
reactor.connectTCP('localhost', self.port, factory) reactor.connectTCP('localhost', self.port, factory)
@ -348,10 +345,7 @@ class TestReflector(unittest.TestCase):
return factory.finished_deferred return factory.finished_deferred
def send_to_server_as_stream(result): def send_to_server_as_stream(result):
fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager, factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash)
self.server_session.storage,
self.stream_hash)
factory = reflector.ClientFactory(fake_lbry_file)
from twisted.internet import reactor from twisted.internet import reactor
reactor.connectTCP('localhost', self.port, factory) reactor.connectTCP('localhost', self.port, factory)