diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index d7333e967..6068d2878 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -3333,17 +3333,16 @@ class Daemon(metaclass=JSONRPCServerType): stream_hash = None if not preview: - old_stream_hash = await self.storage.get_stream_hash_for_sd_hash(old_txo.claim.stream.source.sd_hash) + old_stream = self.stream_manager.streams.get(old_txo.claim.stream.source.sd_hash, None) if file_path is not None: - if old_stream_hash: - stream_to_delete = self.stream_manager.get_stream_by_stream_hash(old_stream_hash) - await self.stream_manager.delete_stream(stream_to_delete, delete_file=False) + if old_stream: + await self.stream_manager.delete_stream(old_stream, delete_file=False) file_stream = await self.stream_manager.create_stream(file_path) new_txo.claim.stream.source.sd_hash = file_stream.sd_hash new_txo.script.generate() stream_hash = file_stream.stream_hash - else: - stream_hash = old_stream_hash + elif old_stream: + stream_hash = old_stream.stream_hash if channel: new_txo.sign(channel) @@ -4390,7 +4389,7 @@ class Daemon(metaclass=JSONRPCServerType): else: server, port = random.choice(self.conf.reflector_servers) reflected = await asyncio.gather(*[ - stream.upload_to_reflector(server, port) + self.stream_manager.reflect_stream(stream, server, port) for stream in self.stream_manager.get_filtered_streams(**kwargs) ]) total = [] diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index 40c42b0d9..119f7f01e 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -77,7 +77,7 @@ class StreamManager: self.resume_saving_task: Optional[asyncio.Task] = None self.re_reflect_task: Optional[asyncio.Task] = None self.update_stream_finished_futs: typing.List[asyncio.Future] = [] - self.running_reflector_uploads: typing.List[asyncio.Task] = [] + self.running_reflector_uploads: typing.Dict[str, asyncio.Task] = {} self.started = asyncio.Event(loop=self.loop) async def _update_content_claim(self, stream: ManagedStream): @@ -185,10 +185,10 @@ class StreamManager: batch = [] while sd_hashes: stream = self.streams[sd_hashes.pop()] - if self.blob_manager.is_blob_verified(stream.sd_hash) and stream.blobs_completed: - if not stream.fully_reflected.is_set(): - host, port = random.choice(self.config.reflector_servers) - batch.append(stream.upload_to_reflector(host, port)) + if self.blob_manager.is_blob_verified(stream.sd_hash) and stream.blobs_completed and \ + stream.sd_hash not in self.running_reflector_uploads and not \ + stream.fully_reflected.is_set(): + batch.append(self.reflect_stream(stream)) if len(batch) >= self.config.concurrent_reflector_uploads: await asyncio.gather(*batch, loop=self.loop) batch = [] @@ -212,26 +212,37 @@ class StreamManager: while self.update_stream_finished_futs: self.update_stream_finished_futs.pop().cancel() while self.running_reflector_uploads: - self.running_reflector_uploads.pop().cancel() + _, t = self.running_reflector_uploads.popitem() + t.cancel() self.started.clear() log.info("finished stopping the stream manager") + def reflect_stream(self, stream: ManagedStream, server: Optional[str] = None, + port: Optional[int] = None) -> asyncio.Task: + if not server or not port: + server, port = random.choice(self.config.reflector_servers) + if stream.sd_hash in self.running_reflector_uploads: + return self.running_reflector_uploads[stream.sd_hash] + task = self.loop.create_task(stream.upload_to_reflector(server, port)) + self.running_reflector_uploads[stream.sd_hash] = task + task.add_done_callback( + lambda _: None if stream.sd_hash not in self.running_reflector_uploads else + self.running_reflector_uploads.pop(stream.sd_hash) + ) + return task + async def create_stream(self, file_path: str, key: Optional[bytes] = None, iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: stream = await ManagedStream.create(self.loop, self.config, self.blob_manager, file_path, key, iv_generator) self.streams[stream.sd_hash] = stream self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) if self.config.reflect_streams and self.config.reflector_servers: - host, port = random.choice(self.config.reflector_servers) - task = self.loop.create_task(stream.upload_to_reflector(host, port)) - self.running_reflector_uploads.append(task) - task.add_done_callback( - lambda _: None - if task not in self.running_reflector_uploads else self.running_reflector_uploads.remove(task) - ) + self.reflect_stream(stream) return stream async def delete_stream(self, stream: ManagedStream, delete_file: Optional[bool] = False): + if stream.sd_hash in self.running_reflector_uploads: + self.running_reflector_uploads[stream.sd_hash].cancel() stream.stop_tasks() if stream.sd_hash in self.streams: del self.streams[stream.sd_hash]