Merge remote-tracking branch 'origin/fix-slow-file-manager-setup'

This commit is contained in:
Jack Robison 2018-03-19 14:28:57 -04:00
commit 7096d54e60
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
6 changed files with 77 additions and 84 deletions

View file

@ -719,7 +719,7 @@ class Daemon(AuthJSONRPCServer):
claim_out = yield publisher.create_and_publish_stream(name, bid, claim_dict, file_path, claim_out = yield publisher.create_and_publish_stream(name, bid, claim_dict, file_path,
claim_address, change_address) claim_address, change_address)
if conf.settings['reflect_uploads']: if conf.settings['reflect_uploads']:
d = reupload.reflect_stream(publisher.lbry_file) d = reupload.reflect_file(publisher.lbry_file)
d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name), d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name),
log.exception) log.exception)
self.analytics_manager.send_claim_action('publish') self.analytics_manager.send_claim_action('publish')
@ -3010,7 +3010,7 @@ class Daemon(AuthJSONRPCServer):
raise Exception('No file found') raise Exception('No file found')
lbry_file = lbry_files[0] lbry_file = lbry_files[0]
results = yield reupload.reflect_stream(lbry_file, reflector_server=reflector_server) results = yield reupload.reflect_file(lbry_file, reflector_server=reflector_server)
defer.returnValue(results) defer.returnValue(results)
@defer.inlineCallbacks @defer.inlineCallbacks

View file

@ -423,12 +423,14 @@ class SQLiteStorage(object):
if only_completed: if only_completed:
lengths = transaction.execute( lengths = transaction.execute(
"select b.blob_hash, b.blob_length from blob b " "select b.blob_hash, b.blob_length from blob b "
"inner join stream_blob s ON b.blob_hash=s.blob_hash and b.status='finished'" "inner join stream_blob s ON b.blob_hash=s.blob_hash and b.status='finished' and s.stream_hash=?",
(stream_hash, )
).fetchall() ).fetchall()
else: else:
lengths = transaction.execute( lengths = transaction.execute(
"select b.blob_hash, b.blob_length from blob b " "select b.blob_hash, b.blob_length from blob b "
"inner join stream_blob s ON b.blob_hash=s.blob_hash" "inner join stream_blob s ON b.blob_hash=s.blob_hash and s.stream_hash=?",
(stream_hash, )
).fetchall() ).fetchall()
blob_length_dict = {} blob_length_dict = {}

View file

@ -7,7 +7,7 @@ import logging
from twisted.internet import defer, task, reactor from twisted.internet import defer, task, reactor
from twisted.python.failure import Failure from twisted.python.failure import Failure
from lbrynet.core.Error import InvalidStreamDescriptorError from lbrynet.core.Error import InvalidStreamDescriptorError
from lbrynet.reflector.reupload import reflect_stream from lbrynet.reflector.reupload import reflect_file
from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager from lbrynet.core.PaymentRateManager import NegotiatedPaymentRateManager
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloader
from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory from lbrynet.file_manager.EncryptedFileDownloader import ManagedEncryptedFileDownloaderFactory
@ -96,41 +96,48 @@ class EncryptedFileManager(object):
suggested_file_name=suggested_file_name suggested_file_name=suggested_file_name
) )
@defer.inlineCallbacks
def _start_lbry_file(self, file_info, payment_rate_manager):
    """Restore one lbry file from its stored db row and start tracking it.

    file_info: dict row from storage (keys seen here: 'row_id', 'stream_hash',
    'sd_hash', 'key', 'stream_name', 'file_name', 'download_directory',
    'suggested_file_name', 'status').
    Invalid streams (bad descriptor) are deleted instead of started.
    """
    lbry_file = self._get_lbry_file(
        file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'],
        file_info['key'], file_info['stream_name'], file_info['file_name'],
        file_info['download_directory'], file_info['suggested_file_name']
    )
    yield lbry_file.get_claim_info()
    try:
        # verify the stream is valid (we might have downloaded an invalid stream
        # in the past when the validation check didn't work)
        stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True)
        validate_descriptor(stream_info)
    except InvalidStreamDescriptorError as err:
        log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
                    lbry_file.sd_hash, err.message)
        yield lbry_file.delete_data()
        yield self.session.storage.delete_stream(lbry_file.stream_hash)
    else:
        try:
            # restore will raise an Exception if status is unknown
            lbry_file.restore(file_info['status'])
            self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info
            self.lbry_files.append(lbry_file)
            # periodic progress logging during bulk startup
            if len(self.lbry_files) % 500 == 0:
                log.info("Started %i files", len(self.lbry_files))
        except Exception:
            # fix: key was 'rowid' (no underscore), which always yielded None;
            # the row key used everywhere else in this method is 'row_id'
            log.warning("Failed to start %i", file_info.get('row_id'))
@defer.inlineCallbacks @defer.inlineCallbacks
def _start_lbry_files(self): def _start_lbry_files(self):
files = yield self.session.storage.get_all_lbry_files() files = yield self.session.storage.get_all_lbry_files()
b_prm = self.session.base_payment_rate_manager b_prm = self.session.base_payment_rate_manager
payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker) payment_rate_manager = NegotiatedPaymentRateManager(b_prm, self.session.blob_tracker)
log.info("Trying to start %i files", len(files)) log.info("Starting %i files", len(files))
for i, file_info in enumerate(files): dl = []
if len(files) > 500 and i % 500 == 0: for file_info in files:
log.info("Started %i/%i files", i, len(files)) dl.append(self._start_lbry_file(file_info, payment_rate_manager))
yield defer.DeferredList(dl)
lbry_file = self._get_lbry_file(
file_info['row_id'], file_info['stream_hash'], payment_rate_manager, file_info['sd_hash'],
file_info['key'], file_info['stream_name'], file_info['file_name'], file_info['download_directory'],
file_info['suggested_file_name']
)
yield lbry_file.get_claim_info()
try:
# verify the stream is valid (we might have downloaded an invalid stream
# in the past when the validation check didn't work)
stream_info = yield get_sd_info(self.storage, file_info['stream_hash'], include_blobs=True)
validate_descriptor(stream_info)
except InvalidStreamDescriptorError as err:
log.warning("Stream for descriptor %s is invalid (%s), cleaning it up",
lbry_file.sd_hash, err.message)
yield lbry_file.delete_data()
yield self.session.storage.delete_stream(lbry_file.stream_hash)
else:
try:
# restore will raise an Exception if status is unknown
lbry_file.restore(file_info['status'])
self.storage.content_claim_callbacks[lbry_file.stream_hash] = lbry_file.get_claim_info
self.lbry_files.append(lbry_file)
except Exception:
log.warning("Failed to start %i", file_info.get('rowid'))
log.info("Started %i lbry files", len(self.lbry_files)) log.info("Started %i lbry files", len(self.lbry_files))
if self.auto_re_reflect is True: if self.auto_re_reflect is True:
safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval) safe_start_looping_call(self.lbry_file_reflector, self.auto_re_reflect_interval)
@ -247,7 +254,7 @@ class EncryptedFileManager(object):
sem = defer.DeferredSemaphore(self.CONCURRENT_REFLECTS) sem = defer.DeferredSemaphore(self.CONCURRENT_REFLECTS)
ds = [] ds = []
for lbry_file in self.lbry_files: for lbry_file in self.lbry_files:
ds.append(sem.run(reflect_stream, lbry_file)) ds.append(sem.run(reflect_file, lbry_file))
yield defer.DeferredList(ds) yield defer.DeferredList(ds)
@defer.inlineCallbacks @defer.inlineCallbacks

View file

@ -33,26 +33,14 @@ class EncryptedFileReflectorClient(Protocol):
self.file_sender = None self.file_sender = None
self.producer = None self.producer = None
self.streaming = False self.streaming = False
self.blob_manager = self.factory.blob_manager
self.protocol_version = self.factory.protocol_version
self.stream_hash = self.factory.stream_hash
d = self.load_descriptor() d = self.load_descriptor()
d.addCallback(lambda _: self.send_handshake()) d.addCallback(lambda _: self.send_handshake())
d.addErrback( d.addErrback(lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
lambda err: log.warning("An error occurred immediately: %s", err.getTraceback()))
@property
def file_name(self):
return self.factory.file_name
@property
def blob_manager(self):
return self.factory.blob_manager
@property
def protocol_version(self):
return self.factory.protocol_version
@property
def stream_hash(self):
return self.factory.stream_hash
def dataReceived(self, data): def dataReceived(self, data):
self.response_buff += data self.response_buff += data
@ -131,7 +119,7 @@ class EncryptedFileReflectorClient(Protocol):
len(filtered)) len(filtered))
return filtered return filtered
d = self.factory.blob_manager.storage.get_blobs_for_stream(self.factory.stream_hash) d = self.factory.blob_manager.storage.get_blobs_for_stream(self.stream_hash)
d.addCallback(self.get_validated_blobs) d.addCallback(self.get_validated_blobs)
if not self.descriptor_needed: if not self.descriptor_needed:
d.addCallback(lambda filtered: d.addCallback(lambda filtered:
@ -151,7 +139,7 @@ class EncryptedFileReflectorClient(Protocol):
def _save_descriptor_blob(sd_blob): def _save_descriptor_blob(sd_blob):
self.stream_descriptor = sd_blob self.stream_descriptor = sd_blob
d = self.factory.blob_manager.storage.get_sd_blob_hash_for_stream(self.factory.stream_hash) d = self.factory.blob_manager.storage.get_sd_blob_hash_for_stream(self.stream_hash)
d.addCallback(self.factory.blob_manager.get_blob) d.addCallback(self.factory.blob_manager.get_blob)
d.addCallback(_save_descriptor_blob) d.addCallback(_save_descriptor_blob)
return d return d
@ -312,28 +300,14 @@ class EncryptedFileReflectorClient(Protocol):
class EncryptedFileReflectorClientFactory(ClientFactory): class EncryptedFileReflectorClientFactory(ClientFactory):
protocol = EncryptedFileReflectorClient protocol = EncryptedFileReflectorClient
protocol_version = REFLECTOR_V2
def __init__(self, lbry_file): def __init__(self, blob_manager, stream_hash):
self._lbry_file = lbry_file self.blob_manager = blob_manager
self.stream_hash = stream_hash
self.p = None self.p = None
self.finished_deferred = defer.Deferred() self.finished_deferred = defer.Deferred()
@property
def blob_manager(self):
return self._lbry_file.blob_manager
@property
def stream_hash(self):
return self._lbry_file.stream_hash
@property
def file_name(self):
return self._lbry_file.file_name
@property
def protocol_version(self):
return REFLECTOR_V2
def buildProtocol(self, addr): def buildProtocol(self, addr):
p = self.protocol() p = self.protocol()
p.factory = self p.factory = self

View file

@ -24,15 +24,19 @@ def resolve(host):
@defer.inlineCallbacks @defer.inlineCallbacks
def _reflect_stream(lbry_file, reflector_server): def _reflect_stream(blob_manager, stream_hash, reflector_server):
reflector_address, reflector_port = reflector_server[0], reflector_server[1] reflector_address, reflector_port = reflector_server[0], reflector_server[1]
factory = ClientFactory(lbry_file) factory = ClientFactory(blob_manager, stream_hash)
ip = yield resolve(reflector_address) ip = yield resolve(reflector_address)
yield reactor.connectTCP(ip, reflector_port, factory) yield reactor.connectTCP(ip, reflector_port, factory)
result = yield factory.finished_deferred result = yield factory.finished_deferred
defer.returnValue(result) defer.returnValue(result)
def _reflect_file(lbry_file, reflector_server):
    """Reflect the stream backing *lbry_file* to *reflector_server*.

    Thin adapter: pulls the blob manager and stream hash off the file
    object and delegates to _reflect_stream.
    """
    blob_manager = lbry_file.blob_manager
    stream_hash = lbry_file.stream_hash
    return _reflect_stream(blob_manager, stream_hash, reflector_server)
@defer.inlineCallbacks @defer.inlineCallbacks
def _reflect_blobs(blob_manager, blob_hashes, reflector_server): def _reflect_blobs(blob_manager, blob_hashes, reflector_server):
reflector_address, reflector_port = reflector_server[0], reflector_server[1] reflector_address, reflector_port = reflector_server[0], reflector_server[1]
@ -43,7 +47,7 @@ def _reflect_blobs(blob_manager, blob_hashes, reflector_server):
defer.returnValue(result) defer.returnValue(result)
def reflect_stream(lbry_file, reflector_server=None): def reflect_file(lbry_file, reflector_server=None):
if reflector_server: if reflector_server:
if len(reflector_server.split(":")) == 2: if len(reflector_server.split(":")) == 2:
host, port = tuple(reflector_server.split(":")) host, port = tuple(reflector_server.split(":"))
@ -52,7 +56,19 @@ def reflect_stream(lbry_file, reflector_server=None):
reflector_server = reflector_server, 5566 reflector_server = reflector_server, 5566
else: else:
reflector_server = random.choice(conf.settings['reflector_servers']) reflector_server = random.choice(conf.settings['reflector_servers'])
return _reflect_stream(lbry_file, reflector_server) return _reflect_file(lbry_file, reflector_server)
def reflect_stream(blob_manager, stream_hash, reflector_server=None):
    """Reflect *stream_hash* to a reflector server.

    reflector_server may be "host:port" or "host" (default port 5566);
    when omitted, a random server from conf.settings['reflector_servers']
    is used.
    """
    if not reflector_server:
        reflector_server = random.choice(conf.settings['reflector_servers'])
    else:
        parts = reflector_server.split(":")
        if len(parts) == 2:
            # explicit "host:port"
            reflector_server = parts[0], int(parts[1])
        else:
            # bare hostname -> default reflector port
            reflector_server = reflector_server, 5566
    return _reflect_stream(blob_manager, stream_hash, reflector_server)
def reflect_blob_hashes(blob_hashes, blob_manager, reflector_server=None): def reflect_blob_hashes(blob_hashes, blob_manager, reflector_server=None):

View file

@ -212,10 +212,7 @@ class TestReflector(unittest.TestCase):
return d return d
def send_to_server(): def send_to_server():
fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager, factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash)
self.server_session.storage,
self.stream_hash)
factory = reflector.ClientFactory(fake_lbry_file)
from twisted.internet import reactor from twisted.internet import reactor
reactor.connectTCP('localhost', self.port, factory) reactor.connectTCP('localhost', self.port, factory)
@ -348,10 +345,7 @@ class TestReflector(unittest.TestCase):
return factory.finished_deferred return factory.finished_deferred
def send_to_server_as_stream(result): def send_to_server_as_stream(result):
fake_lbry_file = mocks.FakeLBRYFile(self.session.blob_manager, factory = reflector.ClientFactory(self.session.blob_manager, self.stream_hash)
self.server_session.storage,
self.stream_hash)
factory = reflector.ClientFactory(fake_lbry_file)
from twisted.internet import reactor from twisted.internet import reactor
reactor.connectTCP('localhost', self.port, factory) reactor.connectTCP('localhost', self.port, factory)