Merge pull request #933 from lbryio/fix_concurrent_reflects

Fix to not reflect all the files at once
This commit is contained in:
Alex Grin 2017-10-02 20:02:08 -04:00 committed by GitHub
commit 010a1019fe
3 changed files with 14 additions and 8 deletions

View file

@ -20,6 +20,7 @@ at anytime.
* Fixed handling cancelled blob and availability requests * Fixed handling cancelled blob and availability requests
* Fixed redundant blob requests to a peer * Fixed redundant blob requests to a peer
* Fixed https://github.com/lbryio/lbry/issues/923 * Fixed https://github.com/lbryio/lbry/issues/923
* Fixed concurrent reflects opening too many files
### Deprecated ### Deprecated
* Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead. * Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead.

View file

@ -25,10 +25,12 @@ log = logging.getLogger(__name__)
class EncryptedFileManager(object): class EncryptedFileManager(object):
"""Keeps track of currently opened LBRY Files, their options, and
their LBRY File specific metadata.
""" """
Keeps track of currently opened LBRY Files, their options, and
their LBRY File specific metadata.
"""
# when reflecting files, reflect up to this many files at a time
CONCURRENT_REFLECTS = 5
def __init__(self, session, stream_info_manager, sd_identifier, download_directory=None): def __init__(self, session, stream_info_manager, sd_identifier, download_directory=None):
@ -235,13 +237,13 @@ class EncryptedFileManager(object):
return l.toggle_running() return l.toggle_running()
return defer.fail(Failure(ValueError("Could not find that LBRY file"))) return defer.fail(Failure(ValueError("Could not find that LBRY file")))
def _reflect_lbry_files(self):
for lbry_file in self.lbry_files:
yield reflect_stream(lbry_file)
@defer.inlineCallbacks @defer.inlineCallbacks
def reflect_lbry_files(self): def reflect_lbry_files(self):
yield defer.DeferredList(list(self._reflect_lbry_files())) sem = defer.DeferredSemaphore(self.CONCURRENT_REFLECTS)
ds = []
for lbry_file in self.lbry_files:
ds.append(sem.run(reflect_stream, lbry_file))
yield defer.DeferredList(ds)
@defer.inlineCallbacks @defer.inlineCallbacks
def stop(self): def stop(self):

View file

@ -71,6 +71,9 @@ class EncryptedFileReflectorClient(Protocol):
d.addErrback(self.response_failure_handler) d.addErrback(self.response_failure_handler)
def connectionLost(self, reason): def connectionLost(self, reason):
# make sure blob file readers get closed
self.set_not_uploading()
if reason.check(error.ConnectionDone): if reason.check(error.ConnectionDone):
if not self.needed_blobs: if not self.needed_blobs:
log.info("Reflector has all blobs for %s (%s)", log.info("Reflector has all blobs for %s (%s)",