diff --git a/lbrynet/stream/assembler.py b/lbrynet/stream/assembler.py index 8c30c5875..296686c6d 100644 --- a/lbrynet/stream/assembler.py +++ b/lbrynet/stream/assembler.py @@ -59,6 +59,7 @@ class StreamAssembler: self.wrote_bytes_event.set() await self.loop.run_in_executor(None, _decrypt_and_write) + return True async def setup(self): pass @@ -76,6 +77,7 @@ class StreamAssembler: self.sd_blob = await self.get_blob(self.sd_hash) self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_manager.blob_dir, self.sd_blob) + await self.after_got_descriptor() self.output_path = await get_next_available_file_name(self.loop, output_dir, output_file_name or self.descriptor.suggested_file_name) if not self.got_descriptor.is_set(): @@ -84,14 +86,14 @@ class StreamAssembler: self.sd_blob, self.descriptor ) await self.blob_manager.blob_completed(self.sd_blob) - await self.after_got_descriptor() with open(self.output_path, 'wb') as stream_handle: self.stream_handle = stream_handle for blob_info in self.descriptor.blobs[:-1]: while True: try: blob = await self.get_blob(blob_info.blob_hash, blob_info.length) - await self._decrypt_blob(blob, blob_info, self.descriptor.key) + if await self._decrypt_blob(blob, blob_info, self.descriptor.key): + await self.blob_manager.blob_completed(blob) break except FileNotFoundError: log.debug("stream assembler stopped") diff --git a/lbrynet/stream/downloader.py b/lbrynet/stream/downloader.py index bab5776bd..c269275c4 100644 --- a/lbrynet/stream/downloader.py +++ b/lbrynet/stream/downloader.py @@ -47,11 +47,9 @@ class StreamDownloader(StreamAssembler): async def after_got_descriptor(self): self.search_queue.put_nowait(self.descriptor.blobs[0].blob_hash) log.info("added head blob to search") - await self.blob_manager.set_should_announce(self.sd_hash, 1) async def after_finished(self): log.info("downloaded stream %s -> %s", self.sd_hash, self.output_path) - await self.blob_manager.set_should_announce(self.descriptor.blobs[0].blob_hash, 1) await self.blob_manager.storage.change_file_status(self.descriptor.stream_hash, 'finished') def stop(self):