diff --git a/CHANGELOG.md b/CHANGELOG.md index c31c7d028..b31eb178d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ at anytime. * Fixed handling cancelled blob and availability requests * Fixed redundant blob requests to a peer * Fixed https://github.com/lbryio/lbry/issues/923 + * Fixed concurrent reflects opening too many files ### Deprecated * Deprecated `blob_announce_all` JSONRPC command. Use `blob_announce` instead. diff --git a/lbrynet/file_manager/EncryptedFileManager.py b/lbrynet/file_manager/EncryptedFileManager.py index 9d38548dd..b0f9966a1 100644 --- a/lbrynet/file_manager/EncryptedFileManager.py +++ b/lbrynet/file_manager/EncryptedFileManager.py @@ -25,10 +25,12 @@ log = logging.getLogger(__name__) class EncryptedFileManager(object): - """Keeps track of currently opened LBRY Files, their options, and - their LBRY File specific metadata. - """ + Keeps track of currently opened LBRY Files, their options, and + their LBRY File specific metadata. + """ + # when reflecting files, reflect up to this many files at a time + CONCURRENT_REFLECTS = 5 def __init__(self, session, stream_info_manager, sd_identifier, download_directory=None): @@ -235,13 +237,13 @@ class EncryptedFileManager(object): return l.toggle_running() return defer.fail(Failure(ValueError("Could not find that LBRY file"))) - def _reflect_lbry_files(self): - for lbry_file in self.lbry_files: - yield reflect_stream(lbry_file) - @defer.inlineCallbacks def reflect_lbry_files(self): - yield defer.DeferredList(list(self._reflect_lbry_files())) + sem = defer.DeferredSemaphore(self.CONCURRENT_REFLECTS) + ds = [] + for lbry_file in self.lbry_files: + ds.append(sem.run(reflect_stream, lbry_file)) + yield defer.DeferredList(ds) @defer.inlineCallbacks def stop(self): diff --git a/lbrynet/reflector/client/client.py b/lbrynet/reflector/client/client.py index 7e0060f93..763aa6735 100644 --- a/lbrynet/reflector/client/client.py +++ b/lbrynet/reflector/client/client.py @@ -71,6 +71,9 @@ class EncryptedFileReflectorClient(Protocol): d.addErrback(self.response_failure_handler) def connectionLost(self, reason): + # make sure blob file readers get closed + self.set_not_uploading() + if reason.check(error.ConnectionDone): if not self.needed_blobs: log.info("Reflector has all blobs for %s (%s)",