From f9b41d34ae5f80a6bf3770fb635d919573e2b721 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 7 Jul 2020 00:39:59 -0300 Subject: [PATCH] expose reflector task errors, add debug statements for ongoing issue --- lbry/stream/stream_manager.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index 8e66aff96..541f5cdf0 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -159,8 +159,18 @@ class StreamManager(SourceManager): )) async def reflect_streams(self): + try: + return await self._reflact_streams() + except asyncio.CancelledError: + raise + except Exception: + log.exception("reflector task encountered an unexpected error!") + + async def _reflact_streams(self): + # todo: those debug statements are temporary for #2987 - remove them if its closed while True: if self.config.reflect_streams and self.config.reflector_servers: + log.debug("collecting streams to reflect") sd_hashes = await self.storage.get_streams_to_re_reflect() sd_hashes = [sd for sd in sd_hashes if sd in self._sources] batch = [] @@ -171,10 +181,14 @@ class StreamManager(SourceManager): stream.fully_reflected.is_set(): batch.append(self.reflect_stream(stream)) if len(batch) >= self.config.concurrent_reflector_uploads: + log.debug("waiting for batch of %s reflecting streams", len(batch)) await asyncio.gather(*batch, loop=self.loop) + log.debug("done processing %s streams", len(batch)) batch = [] if batch: + log.debug("waiting for batch of %s reflecting streams", len(batch)) await asyncio.gather(*batch, loop=self.loop) + log.debug("done processing %s streams", len(batch)) await asyncio.sleep(300, loop=self.loop) async def start(self):