if progress was made, retry without a delay

This commit is contained in:
Victor Shyba 2021-05-20 20:11:18 -03:00
parent 1437871d88
commit 142d182bc1

View file

@ -215,7 +215,7 @@ class StreamManager(SourceManager):
server, port = random.choice(self.config.reflector_servers) server, port = random.choice(self.config.reflector_servers)
if stream.sd_hash in self.running_reflector_uploads: if stream.sd_hash in self.running_reflector_uploads:
return self.running_reflector_uploads[stream.sd_hash] return self.running_reflector_uploads[stream.sd_hash]
task = self.loop.create_task(stream.upload_to_reflector(server, port)) task = self.loop.create_task(self._retriable_reflect_stream(stream, server, port))
self.running_reflector_uploads[stream.sd_hash] = task self.running_reflector_uploads[stream.sd_hash] = task
task.add_done_callback( task.add_done_callback(
lambda _: None if stream.sd_hash not in self.running_reflector_uploads else lambda _: None if stream.sd_hash not in self.running_reflector_uploads else
@ -223,6 +223,12 @@ class StreamManager(SourceManager):
) )
return task return task
async def _retriable_reflect_stream(self, stream, host, port):
sent = await stream.upload_to_reflector(host, port)
while not stream.is_fully_reflected and stream.reflector_progress > 0 and len(sent) > 0:
stream.reflector_progress = 0
sent = await stream.upload_to_reflector(host, port)
async def create(self, file_path: str, key: Optional[bytes] = None, async def create(self, file_path: str, key: Optional[bytes] = None,
iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
descriptor = await StreamDescriptor.create_stream( descriptor = await StreamDescriptor.create_stream(