From 352bf694098345c366bf9628a2ef6ad41b13db56 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 20 May 2021 18:49:20 -0300 Subject: [PATCH] improve test --- lbry/stream/reflector/server.py | 19 +++++++++---------- tests/unit/stream/test_reflector.py | 5 +++++ 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/lbry/stream/reflector/server.py b/lbry/stream/reflector/server.py index 2f95cfb75..aa41f7bc7 100644 --- a/lbry/stream/reflector/server.py +++ b/lbry/stream/reflector/server.py @@ -17,7 +17,7 @@ log = logging.getLogger(__name__) class ReflectorServerProtocol(asyncio.Protocol): def __init__(self, blob_manager: 'BlobManager', response_chunk_size: int = 10000, stop_event: asyncio.Event = None, incoming_event: asyncio.Event = None, - not_incoming_event: asyncio.Event = None, partial_needs=False): + not_incoming_event: asyncio.Event = None, partial_event: asyncio.Event = None): self.loop = asyncio.get_event_loop() self.blob_manager = blob_manager self.server_task: asyncio.Task = None @@ -34,7 +34,7 @@ class ReflectorServerProtocol(asyncio.Protocol): self.stop_event = stop_event or asyncio.Event(loop=self.loop) self.chunk_size = response_chunk_size self.wait_for_stop_task: typing.Optional[asyncio.Task] = None - self.partial_needs = partial_needs + self.partial_event = partial_event async def wait_for_stop(self): await self.stop_event.wait() @@ -120,11 +120,9 @@ class ReflectorServerProtocol(asyncio.Protocol): needs = [blob.blob_hash for blob in self.descriptor.blobs[:-1] if not self.blob_manager.get_blob(blob.blob_hash).get_is_verified()] - print(self.partial_needs, needs) - if needs and self.partial_needs: + if needs and not self.partial_event.is_set(): needs = needs[:3] - self.partial_needs = False - print(self.partial_needs, needs) + self.partial_event.set() self.send_response({"send_sd_blob": False, 'needed_blobs': needs}) return return @@ -177,11 +175,12 @@ class ReflectorServer: raise Exception("already running") async def _start_server(): - proto = ReflectorServerProtocol( + partial_event = asyncio.Event() + if not self.partial_needs: + partial_event.set() + server = await self.loop.create_server(lambda: ReflectorServerProtocol( self.blob_manager, self.response_chunk_size, self.stop_event, self.incoming_event, - self.not_incoming_event, self.partial_needs - ) - server = await self.loop.create_server(lambda: proto, interface, port) + self.not_incoming_event, partial_event), interface, port) self.started_listening.set() self.stopped_listening.clear() log.info("Reflector server listening on TCP %s:%i", interface, port) diff --git a/tests/unit/stream/test_reflector.py b/tests/unit/stream/test_reflector.py index 0ca653523..89354b3cc 100644 --- a/tests/unit/stream/test_reflector.py +++ b/tests/unit/stream/test_reflector.py @@ -60,6 +60,11 @@ class TestReflector(AsyncioTestCase): self.assertEqual(0, self.stream.reflector_progress) sent = await self.stream.upload_to_reflector('127.0.0.1', 5566) self.assertEqual(100, self.stream.reflector_progress) + if partial_needs: + self.assertFalse(self.stream.is_fully_reflected) + send_more = await self.stream.upload_to_reflector('127.0.0.1', 5566) + self.assertGreater(0, len(send_more)) + sent.extend(send_more) self.assertSetEqual( set(sent), set(map(lambda b: b.blob_hash,