From 11bb1cc7eea536cca898021a5efb6740fc41eb7f Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 3 Feb 2019 20:38:36 -0300 Subject: [PATCH 1/2] use with clause to avoid leaking fds --- lbrynet/stream/assembler.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/lbrynet/stream/assembler.py b/lbrynet/stream/assembler.py index f00f9c56e..39761fa1f 100644 --- a/lbrynet/stream/assembler.py +++ b/lbrynet/stream/assembler.py @@ -82,11 +82,11 @@ class StreamAssembler: if not self.got_descriptor.is_set(): self.got_descriptor.set() await self.after_got_descriptor() - self.stream_handle = open(self.output_path, 'wb') await self.blob_manager.storage.store_stream( self.sd_blob, self.descriptor ) - try: + with open(self.output_path, 'wb') as stream_handle: + self.stream_handle = stream_handle for blob_info in self.descriptor.blobs[:-1]: while True: try: @@ -101,10 +101,8 @@ class StreamAssembler: self.descriptor.sd_hash) continue - self.stream_finished_event.set() - await self.after_finished() - finally: - self.stream_handle.close() + self.stream_finished_event.set() + await self.after_finished() async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile': return self.blob_manager.get_blob(blob_hash, length) From 736a949cb50f4f68f41491ebe19f65781670291e Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sun, 3 Feb 2019 20:39:01 -0300 Subject: [PATCH 2/2] resume streams on startup --- lbrynet/stream/stream_manager.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index 150af63ca..89b14fee3 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -142,11 +142,12 @@ class StreamManager: log.warning("no DHT node given, cannot resume downloads") return await self.node.joined.wait() - resumed = 0 - t = [self.start_stream(stream) for stream in self.streams if stream.status == ManagedStream.STATUS_RUNNING] - if resumed: - log.info("resuming %i downloads", t) - await asyncio.gather(*t, loop=self.loop) + t = [ + stream.start_download(self.node) + for stream in self.streams if stream.status == ManagedStream.STATUS_RUNNING + ] + if t: + log.info("resuming %i downloads", len(t)) async def reflect_streams(self): while True: