expose reflector task errors, add debug statements for ongoing issue
This commit is contained in:
parent
d4bec79451
commit
f9b41d34ae
1 changed files with 14 additions and 0 deletions
|
@ -159,8 +159,18 @@ class StreamManager(SourceManager):
|
||||||
))
|
))
|
||||||
|
|
||||||
async def reflect_streams(self):
|
async def reflect_streams(self):
|
||||||
|
try:
|
||||||
|
return await self._reflact_streams()
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
except Exception:
|
||||||
|
log.exception("reflector task encountered an unexpected error!")
|
||||||
|
|
||||||
|
async def _reflact_streams(self):
|
||||||
|
# todo: those debug statements are temporary for #2987 - remove them if its closed
|
||||||
while True:
|
while True:
|
||||||
if self.config.reflect_streams and self.config.reflector_servers:
|
if self.config.reflect_streams and self.config.reflector_servers:
|
||||||
|
log.debug("collecting streams to reflect")
|
||||||
sd_hashes = await self.storage.get_streams_to_re_reflect()
|
sd_hashes = await self.storage.get_streams_to_re_reflect()
|
||||||
sd_hashes = [sd for sd in sd_hashes if sd in self._sources]
|
sd_hashes = [sd for sd in sd_hashes if sd in self._sources]
|
||||||
batch = []
|
batch = []
|
||||||
|
@ -171,10 +181,14 @@ class StreamManager(SourceManager):
|
||||||
stream.fully_reflected.is_set():
|
stream.fully_reflected.is_set():
|
||||||
batch.append(self.reflect_stream(stream))
|
batch.append(self.reflect_stream(stream))
|
||||||
if len(batch) >= self.config.concurrent_reflector_uploads:
|
if len(batch) >= self.config.concurrent_reflector_uploads:
|
||||||
|
log.debug("waiting for batch of %s reflecting streams", len(batch))
|
||||||
await asyncio.gather(*batch, loop=self.loop)
|
await asyncio.gather(*batch, loop=self.loop)
|
||||||
|
log.debug("done processing %s streams", len(batch))
|
||||||
batch = []
|
batch = []
|
||||||
if batch:
|
if batch:
|
||||||
|
log.debug("waiting for batch of %s reflecting streams", len(batch))
|
||||||
await asyncio.gather(*batch, loop=self.loop)
|
await asyncio.gather(*batch, loop=self.loop)
|
||||||
|
log.debug("done processing %s streams", len(batch))
|
||||||
await asyncio.sleep(300, loop=self.loop)
|
await asyncio.sleep(300, loop=self.loop)
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
|
|
Loading…
Add table
Reference in a new issue