Merge pull request #2992 from lbryio/2987_debug

log task errors and add debug information for reflector task
This commit is contained in:
Lex Berezhny 2020-07-14 19:20:15 -04:00 committed by GitHub
commit 640267fc2b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -159,8 +159,18 @@ class StreamManager(SourceManager):
)) ))
async def reflect_streams(self): async def reflect_streams(self):
try:
return await self._reflact_streams()
except asyncio.CancelledError:
raise
except Exception:
log.exception("reflector task encountered an unexpected error!")
async def _reflact_streams(self):
# todo: those debug statements are temporary for #2987 - remove them if its closed
while True: while True:
if self.config.reflect_streams and self.config.reflector_servers: if self.config.reflect_streams and self.config.reflector_servers:
log.debug("collecting streams to reflect")
sd_hashes = await self.storage.get_streams_to_re_reflect() sd_hashes = await self.storage.get_streams_to_re_reflect()
sd_hashes = [sd for sd in sd_hashes if sd in self._sources] sd_hashes = [sd for sd in sd_hashes if sd in self._sources]
batch = [] batch = []
@ -171,10 +181,14 @@ class StreamManager(SourceManager):
stream.fully_reflected.is_set(): stream.fully_reflected.is_set():
batch.append(self.reflect_stream(stream)) batch.append(self.reflect_stream(stream))
if len(batch) >= self.config.concurrent_reflector_uploads: if len(batch) >= self.config.concurrent_reflector_uploads:
log.debug("waiting for batch of %s reflecting streams", len(batch))
await asyncio.gather(*batch, loop=self.loop) await asyncio.gather(*batch, loop=self.loop)
log.debug("done processing %s streams", len(batch))
batch = [] batch = []
if batch: if batch:
log.debug("waiting for batch of %s reflecting streams", len(batch))
await asyncio.gather(*batch, loop=self.loop) await asyncio.gather(*batch, loop=self.loop)
log.debug("done processing %s streams", len(batch))
await asyncio.sleep(300, loop=self.loop) await asyncio.sleep(300, loop=self.loop)
async def start(self): async def start(self):