forked from LBRYCommunity/lbry-sdk
Merge pull request #2217 from lbryio/chunked-reflector-response
fix handling chunked reflector server responses
This commit is contained in:
commit
588568752b
3 changed files with 38 additions and 10 deletions
|
@ -10,13 +10,15 @@ if typing.TYPE_CHECKING:
|
||||||
REFLECTOR_V1 = 0
|
REFLECTOR_V1 = 0
|
||||||
REFLECTOR_V2 = 1
|
REFLECTOR_V2 = 1
|
||||||
|
|
||||||
|
MAX_RESPONSE_SIZE = 2000000
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class StreamReflectorClient(asyncio.Protocol):
|
class StreamReflectorClient(asyncio.Protocol):
|
||||||
def __init__(self, blob_manager: 'BlobManager', descriptor: 'StreamDescriptor'):
|
def __init__(self, blob_manager: 'BlobManager', descriptor: 'StreamDescriptor'):
|
||||||
self.loop = asyncio.get_event_loop()
|
self.loop = asyncio.get_event_loop()
|
||||||
self.transport: asyncio.StreamWriter = None
|
self.transport: typing.Optional[asyncio.WriteTransport] = None
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
self.descriptor = descriptor
|
self.descriptor = descriptor
|
||||||
self.response_buff = b''
|
self.response_buff = b''
|
||||||
|
@ -37,11 +39,21 @@ class StreamReflectorClient(asyncio.Protocol):
|
||||||
log.info("Finished sending reflector %i blobs", len(self.reflected_blobs))
|
log.info("Finished sending reflector %i blobs", len(self.reflected_blobs))
|
||||||
|
|
||||||
def data_received(self, data):
    """Accumulate bytes from the reflector server and decode complete JSON responses.

    The server may deliver one JSON response split across several TCP
    segments, so incoming bytes are buffered in ``self.response_buff``
    until the buffer parses as JSON. A decoded response is pushed onto
    ``self.response_queue``; an oversized buffer closes the connection.

    :param data: bytes received from the transport (may be empty/None)
    """
    incoming = data or b''
    # Guard against a buggy or malicious server streaming an unbounded
    # response. Compare lengths instead of concatenating the buffers
    # twice as the original did — measuring must not cost two copies.
    if len(self.response_buff) + len(incoming) > MAX_RESPONSE_SIZE:
        log.warning("response message too large from reflector server: %i bytes",
                    len(self.response_buff) + len(incoming))
        self.response_buff = b''
        self.transport.close()
        return
    self.response_buff += incoming
    try:
        # Only the decode/parse belongs in the try — ValueError from
        # anything else would be silently misclassified as partial JSON.
        response = json.loads(self.response_buff.decode())
    except ValueError:
        # Incomplete JSON is expected mid-stream and is not an error;
        # only warn when no new bytes arrived yet the buffer still
        # fails to decode.
        if not data:
            log.warning("got undecodable response from reflector server")
            self.response_buff = b''
        return
    self.response_buff = b''
    self.response_queue.put_nowait(response)
||||||
async def send_request(self, request_dict: typing.Dict):
|
async def send_request(self, request_dict: typing.Dict):
|
||||||
|
|
|
@ -15,7 +15,7 @@ log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ReflectorServerProtocol(asyncio.Protocol):
|
class ReflectorServerProtocol(asyncio.Protocol):
|
||||||
def __init__(self, blob_manager: 'BlobManager'):
|
def __init__(self, blob_manager: 'BlobManager', response_chunk_size: int = 10000):
|
||||||
self.loop = asyncio.get_event_loop()
|
self.loop = asyncio.get_event_loop()
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
self.server_task: asyncio.Task = None
|
self.server_task: asyncio.Task = None
|
||||||
|
@ -28,6 +28,7 @@ class ReflectorServerProtocol(asyncio.Protocol):
|
||||||
self.sd_blob: typing.Optional['BlobFile'] = None
|
self.sd_blob: typing.Optional['BlobFile'] = None
|
||||||
self.received = []
|
self.received = []
|
||||||
self.incoming = asyncio.Event(loop=self.loop)
|
self.incoming = asyncio.Event(loop=self.loop)
|
||||||
|
self.chunk_size = response_chunk_size
|
||||||
|
|
||||||
def connection_made(self, transport):
|
def connection_made(self, transport):
|
||||||
self.transport = transport
|
self.transport = transport
|
||||||
|
@ -47,7 +48,15 @@ class ReflectorServerProtocol(asyncio.Protocol):
|
||||||
self.loop.create_task(self.handle_request(request))
|
self.loop.create_task(self.handle_request(request))
|
||||||
|
|
||||||
def send_response(self, response: typing.Dict):
    """Serialize *response* to JSON and write it to the transport in
    fixed-size chunks, flushing one chunk per event-loop iteration.

    Chunking lets clients exercise their partial-response buffering;
    each write is deferred through a future so the loop gets control
    between chunks.
    """

    def _write_next(pending: bytes):
        # A future whose done-callbacks perform the write: resolving it
        # via call_soon pushes the actual transport.write onto the next
        # loop iteration rather than running it synchronously here.
        fut = self.loop.create_future()
        fut.add_done_callback(lambda _f: self.transport.write(pending[:self.chunk_size]))
        if len(pending) > self.chunk_size:
            # More bytes remain — queue another round after this write.
            fut.add_done_callback(
                lambda _f: self.loop.call_soon(_write_next, pending[self.chunk_size:])
            )
        self.loop.call_soon(fut.set_result, None)

    encoded = json.dumps(response).encode()
    _write_next(encoded)
|
||||||
|
|
||||||
async def handle_request(self, request: typing.Dict):
|
async def handle_request(self, request: typing.Dict):
|
||||||
if self.client_version is None:
|
if self.client_version is None:
|
||||||
|
@ -121,11 +130,12 @@ class ReflectorServerProtocol(asyncio.Protocol):
|
||||||
|
|
||||||
|
|
||||||
class ReflectorServer:
|
class ReflectorServer:
|
||||||
def __init__(self, blob_manager: 'BlobManager'):
|
def __init__(self, blob_manager: 'BlobManager', response_chunk_size: int = 10000):
|
||||||
self.loop = asyncio.get_event_loop()
|
self.loop = asyncio.get_event_loop()
|
||||||
self.blob_manager = blob_manager
|
self.blob_manager = blob_manager
|
||||||
self.server_task: asyncio.Task = None
|
self.server_task: typing.Optional[asyncio.Task] = None
|
||||||
self.started_listening = asyncio.Event(loop=self.loop)
|
self.started_listening = asyncio.Event(loop=self.loop)
|
||||||
|
self.response_chunk_size = response_chunk_size
|
||||||
|
|
||||||
def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'):
|
def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'):
|
||||||
if self.server_task is not None:
|
if self.server_task is not None:
|
||||||
|
@ -133,7 +143,7 @@ class ReflectorServer:
|
||||||
|
|
||||||
async def _start_server():
|
async def _start_server():
|
||||||
server = await self.loop.create_server(
|
server = await self.loop.create_server(
|
||||||
lambda: ReflectorServerProtocol(self.blob_manager),
|
lambda: ReflectorServerProtocol(self.blob_manager, self.response_chunk_size),
|
||||||
interface, port
|
interface, port
|
||||||
)
|
)
|
||||||
self.started_listening.set()
|
self.started_listening.set()
|
||||||
|
|
|
@ -41,8 +41,8 @@ class TestStreamAssembler(AsyncioTestCase):
|
||||||
|
|
||||||
self.stream = await self.stream_manager.create_stream(file_path)
|
self.stream = await self.stream_manager.create_stream(file_path)
|
||||||
|
|
||||||
async def test_reflect_stream(self):
|
async def _test_reflect_stream(self, response_chunk_size):
|
||||||
reflector = ReflectorServer(self.server_blob_manager)
|
reflector = ReflectorServer(self.server_blob_manager, response_chunk_size=response_chunk_size)
|
||||||
reflector.start_server(5566, '127.0.0.1')
|
reflector.start_server(5566, '127.0.0.1')
|
||||||
await reflector.started_listening.wait()
|
await reflector.started_listening.wait()
|
||||||
self.addCleanup(reflector.stop_server)
|
self.addCleanup(reflector.stop_server)
|
||||||
|
@ -63,6 +63,12 @@ class TestStreamAssembler(AsyncioTestCase):
|
||||||
sent = await self.stream.upload_to_reflector('127.0.0.1', 5566)
|
sent = await self.stream.upload_to_reflector('127.0.0.1', 5566)
|
||||||
self.assertListEqual(sent, [])
|
self.assertListEqual(sent, [])
|
||||||
|
|
||||||
|
async def test_reflect_stream(self):
|
||||||
|
return await asyncio.wait_for(self._test_reflect_stream(response_chunk_size=50), 3, loop=self.loop)
|
||||||
|
|
||||||
|
async def test_reflect_stream_small_response_chunks(self):
|
||||||
|
return await asyncio.wait_for(self._test_reflect_stream(response_chunk_size=30), 3, loop=self.loop)
|
||||||
|
|
||||||
async def test_announces(self):
|
async def test_announces(self):
|
||||||
to_announce = await self.storage.get_blobs_to_announce()
|
to_announce = await self.storage.get_blobs_to_announce()
|
||||||
self.assertIn(self.stream.sd_hash, to_announce, "sd blob not set to announce")
|
self.assertIn(self.stream.sd_hash, to_announce, "sd blob not set to announce")
|
||||||
|
|
Loading…
Reference in a new issue