forked from LBRYCommunity/lbry-sdk
test bug 3296, failing
This commit is contained in:
parent
be8ecfa707
commit
9bdf3d23e1
2 changed files with 32 additions and 14 deletions
|
@ -17,7 +17,7 @@ log = logging.getLogger(__name__)
|
||||||
class ReflectorServerProtocol(asyncio.Protocol):
|
class ReflectorServerProtocol(asyncio.Protocol):
|
||||||
def __init__(self, blob_manager: 'BlobManager', response_chunk_size: int = 10000,
|
def __init__(self, blob_manager: 'BlobManager', response_chunk_size: int = 10000,
|
||||||
stop_event: asyncio.Event = None, incoming_event: asyncio.Event = None,
|
stop_event: asyncio.Event = None, incoming_event: asyncio.Event = None,
|
||||||
not_incoming_event: asyncio.Event = None):
|
not_incoming_event: asyncio.Event = None, partial_needs=False):
|
||||||
self.loop = asyncio.get_event_loop()
|
self.loop = asyncio.get_event_loop()
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
self.server_task: asyncio.Task = None
|
self.server_task: asyncio.Task = None
|
||||||
|
@ -34,6 +34,7 @@ class ReflectorServerProtocol(asyncio.Protocol):
|
||||||
self.stop_event = stop_event or asyncio.Event(loop=self.loop)
|
self.stop_event = stop_event or asyncio.Event(loop=self.loop)
|
||||||
self.chunk_size = response_chunk_size
|
self.chunk_size = response_chunk_size
|
||||||
self.wait_for_stop_task: typing.Optional[asyncio.Task] = None
|
self.wait_for_stop_task: typing.Optional[asyncio.Task] = None
|
||||||
|
self.partial_needs = partial_needs
|
||||||
|
|
||||||
async def wait_for_stop(self):
|
async def wait_for_stop(self):
|
||||||
await self.stop_event.wait()
|
await self.stop_event.wait()
|
||||||
|
@ -115,10 +116,16 @@ class ReflectorServerProtocol(asyncio.Protocol):
|
||||||
if self.writer:
|
if self.writer:
|
||||||
self.writer.close_handle()
|
self.writer.close_handle()
|
||||||
self.writer = None
|
self.writer = None
|
||||||
self.send_response({"send_sd_blob": False, 'needed': [
|
|
||||||
blob.blob_hash for blob in self.descriptor.blobs[:-1]
|
needs = [blob.blob_hash
|
||||||
if not self.blob_manager.get_blob(blob.blob_hash).get_is_verified()
|
for blob in self.descriptor.blobs[:-1]
|
||||||
]})
|
if not self.blob_manager.get_blob(blob.blob_hash).get_is_verified()]
|
||||||
|
print(self.partial_needs, needs)
|
||||||
|
if needs and self.partial_needs:
|
||||||
|
needs = needs[:3]
|
||||||
|
self.partial_needs = False
|
||||||
|
print(self.partial_needs, needs)
|
||||||
|
self.send_response({"send_sd_blob": False, 'needed_blobs': needs})
|
||||||
return
|
return
|
||||||
return
|
return
|
||||||
elif self.descriptor:
|
elif self.descriptor:
|
||||||
|
@ -153,7 +160,7 @@ class ReflectorServerProtocol(asyncio.Protocol):
|
||||||
class ReflectorServer:
|
class ReflectorServer:
|
||||||
def __init__(self, blob_manager: 'BlobManager', response_chunk_size: int = 10000,
|
def __init__(self, blob_manager: 'BlobManager', response_chunk_size: int = 10000,
|
||||||
stop_event: asyncio.Event = None, incoming_event: asyncio.Event = None,
|
stop_event: asyncio.Event = None, incoming_event: asyncio.Event = None,
|
||||||
not_incoming_event: asyncio.Event = None):
|
not_incoming_event: asyncio.Event = None, partial_needs=False):
|
||||||
self.loop = asyncio.get_event_loop()
|
self.loop = asyncio.get_event_loop()
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
self.server_task: typing.Optional[asyncio.Task] = None
|
self.server_task: typing.Optional[asyncio.Task] = None
|
||||||
|
@ -163,19 +170,18 @@ class ReflectorServer:
|
||||||
self.not_incoming_event = not_incoming_event or asyncio.Event(loop=self.loop)
|
self.not_incoming_event = not_incoming_event or asyncio.Event(loop=self.loop)
|
||||||
self.response_chunk_size = response_chunk_size
|
self.response_chunk_size = response_chunk_size
|
||||||
self.stop_event = stop_event
|
self.stop_event = stop_event
|
||||||
|
self.partial_needs = partial_needs # for testing cases where it doesn't know what it wants
|
||||||
|
|
||||||
def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'):
|
def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'):
|
||||||
if self.server_task is not None:
|
if self.server_task is not None:
|
||||||
raise Exception("already running")
|
raise Exception("already running")
|
||||||
|
|
||||||
async def _start_server():
|
async def _start_server():
|
||||||
server = await self.loop.create_server(
|
proto = ReflectorServerProtocol(
|
||||||
lambda: ReflectorServerProtocol(
|
|
||||||
self.blob_manager, self.response_chunk_size, self.stop_event, self.incoming_event,
|
self.blob_manager, self.response_chunk_size, self.stop_event, self.incoming_event,
|
||||||
self.not_incoming_event
|
self.not_incoming_event, self.partial_needs
|
||||||
),
|
|
||||||
interface, port
|
|
||||||
)
|
)
|
||||||
|
server = await self.loop.create_server(lambda: proto, interface, port)
|
||||||
self.started_listening.set()
|
self.started_listening.set()
|
||||||
self.stopped_listening.clear()
|
self.stopped_listening.clear()
|
||||||
log.info("Reflector server listening on TCP %s:%i", interface, port)
|
log.info("Reflector server listening on TCP %s:%i", interface, port)
|
||||||
|
|
|
@ -43,9 +43,18 @@ class TestReflector(AsyncioTestCase):
|
||||||
self.stream_manager.config.reflect_streams = False
|
self.stream_manager.config.reflect_streams = False
|
||||||
self.stream = await self.stream_manager.create(file_path)
|
self.stream = await self.stream_manager.create(file_path)
|
||||||
|
|
||||||
async def _test_reflect_stream(self, response_chunk_size):
|
async def _test_reflect_stream(self, response_chunk_size=50, partial_needs=False):
|
||||||
reflector = ReflectorServer(self.server_blob_manager, response_chunk_size=response_chunk_size)
|
reflector = ReflectorServer(self.server_blob_manager, response_chunk_size=response_chunk_size,
|
||||||
|
partial_needs=partial_needs)
|
||||||
reflector.start_server(5566, '127.0.0.1')
|
reflector.start_server(5566, '127.0.0.1')
|
||||||
|
if partial_needs:
|
||||||
|
server_blob = self.server_blob_manager.get_blob(self.stream.sd_hash)
|
||||||
|
client_blob = self.blob_manager.get_blob(self.stream.sd_hash)
|
||||||
|
with client_blob.reader_context() as handle:
|
||||||
|
server_blob.set_length(client_blob.get_length())
|
||||||
|
writer = server_blob.get_blob_writer('nobody', 0)
|
||||||
|
writer.write(handle.read())
|
||||||
|
self.server_blob_manager.blob_completed(server_blob)
|
||||||
await reflector.started_listening.wait()
|
await reflector.started_listening.wait()
|
||||||
self.addCleanup(reflector.stop_server)
|
self.addCleanup(reflector.stop_server)
|
||||||
self.assertEqual(0, self.stream.reflector_progress)
|
self.assertEqual(0, self.stream.reflector_progress)
|
||||||
|
@ -71,6 +80,9 @@ class TestReflector(AsyncioTestCase):
|
||||||
async def test_reflect_stream(self):
|
async def test_reflect_stream(self):
|
||||||
return await asyncio.wait_for(self._test_reflect_stream(response_chunk_size=50), 3, loop=self.loop)
|
return await asyncio.wait_for(self._test_reflect_stream(response_chunk_size=50), 3, loop=self.loop)
|
||||||
|
|
||||||
|
async def test_reflect_stream_but_reflector_changes_its_mind(self):
|
||||||
|
return await asyncio.wait_for(self._test_reflect_stream(partial_needs=True), 3, loop=self.loop)
|
||||||
|
|
||||||
async def test_reflect_stream_small_response_chunks(self):
|
async def test_reflect_stream_small_response_chunks(self):
|
||||||
return await asyncio.wait_for(self._test_reflect_stream(response_chunk_size=30), 3, loop=self.loop)
|
return await asyncio.wait_for(self._test_reflect_stream(response_chunk_size=30), 3, loop=self.loop)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue