add re-reflect task to stream manager, add concurrent_reflector_uploads to config

Jack Robison 2019-02-01 22:59:41 -05:00
parent 4e27eaf8e6
commit f2968aab22
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
3 changed files with 26 additions and 12 deletions


@@ -505,6 +505,9 @@ class Config(CLIConfig):
         "Upload completed streams (published and downloaded) reflector in order to re-host them", True,
         previous_names=['reflect_uploads']
     )
+    concurrent_reflector_uploads = Integer(
+        "Maximum number of streams to upload to a reflector server at a time", 10
+    )
 
     # servers
     reflector_servers = Servers("Reflector re-hosting servers", [
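
Note: the new concurrent_reflector_uploads setting is declared like the surrounding descriptor-style settings, with a help string and a default of 10. A small usage sketch, assuming the setting reads back as a plain attribute and that assigning to it overrides the default at runtime (both assumptions about how these Config settings generally behave, not shown in this diff; the import path is assumed too):

from lbrynet.conf import Config

conf = Config()
print(conf.concurrent_reflector_uploads)  # -> 10, the declared default
conf.concurrent_reflector_uploads = 20    # assumed runtime override for a faster host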


@@ -27,7 +27,7 @@ class StreamReflectorClient(asyncio.Protocol):
 
     def connection_made(self, transport):
         self.transport = transport
-        log.info("Connected to reflector")
+        log.debug("Connected to reflector")
         self.connected.set()
 
     def connection_lost(self, exc: typing.Optional[Exception]):


@@ -62,6 +62,7 @@ class StreamManager:
         self.streams: typing.Set[ManagedStream] = set()
         self.starting_streams: typing.Dict[str, asyncio.Future] = {}
         self.resume_downloading_task: asyncio.Task = None
+        self.re_reflect_task: asyncio.Task = None
         self.update_stream_finished_futs: typing.List[asyncio.Future] = []
 
     async def _update_content_claim(self, stream: ManagedStream):
@@ -147,26 +148,36 @@ class StreamManager:
         await asyncio.gather(*t, loop=self.loop)
 
     async def reflect_streams(self):
-        streams = list(self.streams)
-        batch = []
-        while streams:
-            stream = streams.pop()
-            if not stream.fully_reflected.is_set():
-                host, port = random.choice(self.reflector_servers)
-                batch.append(stream.upload_to_reflector(host, port))
-            if len(batch) >= 10:
-                await asyncio.gather(*batch)
-                batch = []
-        if batch:
-            await asyncio.gather(*batch)
+        while True:
+            if self.config.reflector_servers:
+                sd_hashes = await self.storage.get_streams_to_re_reflect()
+                streams = list(filter(lambda s: s.sd_hash in sd_hashes, self.streams))
+                batch = []
+                total = len(streams)
+                while streams:
+                    stream = streams.pop()
+                    if not stream.fully_reflected.is_set():
+                        host, port = random.choice(self.config.reflector_servers)
+                        batch.append(stream.upload_to_reflector(host, port))
+                    if len(batch) >= self.config.concurrent_reflector_uploads:
+                        await asyncio.gather(*batch)
+                        batch = []
+                if batch:
+                    await asyncio.gather(*batch)
+                if total:
+                    log.info("uploaded %i streams to reflector", total)
+            await asyncio.sleep(300, loop=self.loop)
 
     async def start(self):
         await self.load_streams_from_database()
         self.resume_downloading_task = self.loop.create_task(self.resume())
+        self.re_reflect_task = self.loop.create_task(self.reflect_streams())
 
     def stop(self):
         if self.resume_downloading_task and not self.resume_downloading_task.done():
             self.resume_downloading_task.cancel()
+        if self.re_reflect_task and not self.re_reflect_task.done():
+            self.re_reflect_task.cancel()
         while self.streams:
             stream = self.streams.pop()
             stream.stop_download()
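
The rewritten reflect_streams() loops forever: it asks storage for the sd hashes that still need re-reflecting, filters the in-memory streams down to that set, uploads them to a randomly chosen reflector server in batches capped by concurrent_reflector_uploads, then sleeps for five minutes. A self-contained sketch of the same batch-and-gather pattern follows; fake_upload, reflect_in_batches and the constants are illustrative stand-ins for this example, not lbrynet code.

import asyncio
import random

CONCURRENT_UPLOADS = 10  # plays the role of config.concurrent_reflector_uploads


async def fake_upload(sd_hash: str) -> str:
    # stand-in for stream.upload_to_reflector(host, port)
    await asyncio.sleep(random.uniform(0.01, 0.05))
    return sd_hash


async def reflect_in_batches(sd_hashes):
    batch = []
    for sd_hash in sd_hashes:
        batch.append(fake_upload(sd_hash))
        if len(batch) >= CONCURRENT_UPLOADS:
            # wait for the whole batch before queuing more uploads
            await asyncio.gather(*batch)
            batch = []
    if batch:
        # flush whatever is left over
        await asyncio.gather(*batch)


asyncio.get_event_loop().run_until_complete(
    reflect_in_batches(["sd_hash_%i" % i for i in range(25)])
)

Batching with gather keeps at most concurrent_reflector_uploads uploads in flight without needing a semaphore, at the cost of waiting for the slowest upload in each batch before the next batch starts.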