forked from LBRYCommunity/lbry-sdk
Merge pull request #3308 from lbryio/reflect_more
Don't set stream as reflected until reflector says it doesn't need any blob
This commit is contained in:
commit
099f3b6a62
5 changed files with 48 additions and 20 deletions
|
@ -372,9 +372,6 @@ class ManagedStream(ManagedDownloadSource):
|
|||
protocol.transport.close()
|
||||
self.uploading_to_reflector = False
|
||||
|
||||
if not self.fully_reflected.is_set():
|
||||
self.fully_reflected.set()
|
||||
await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}")
|
||||
return sent
|
||||
|
||||
async def update_content_claim(self, claim_info: Optional[typing.Dict] = None):
|
||||
|
|
|
@ -59,7 +59,7 @@ class StreamReflectorClient(asyncio.Protocol):
|
|||
return
|
||||
|
||||
async def send_request(self, request_dict: typing.Dict, timeout: int = 180):
|
||||
msg = json.dumps(request_dict)
|
||||
msg = json.dumps(request_dict, sort_keys=True)
|
||||
try:
|
||||
self.transport.write(msg.encode())
|
||||
self.pending_request = self.loop.create_task(asyncio.wait_for(self.response_queue.get(), timeout))
|
||||
|
|
|
@ -17,7 +17,7 @@ log = logging.getLogger(__name__)
|
|||
class ReflectorServerProtocol(asyncio.Protocol):
|
||||
def __init__(self, blob_manager: 'BlobManager', response_chunk_size: int = 10000,
|
||||
stop_event: asyncio.Event = None, incoming_event: asyncio.Event = None,
|
||||
not_incoming_event: asyncio.Event = None):
|
||||
not_incoming_event: asyncio.Event = None, partial_event: asyncio.Event = None):
|
||||
self.loop = asyncio.get_event_loop()
|
||||
self.blob_manager = blob_manager
|
||||
self.server_task: asyncio.Task = None
|
||||
|
@ -34,6 +34,7 @@ class ReflectorServerProtocol(asyncio.Protocol):
|
|||
self.stop_event = stop_event or asyncio.Event(loop=self.loop)
|
||||
self.chunk_size = response_chunk_size
|
||||
self.wait_for_stop_task: typing.Optional[asyncio.Task] = None
|
||||
self.partial_event = partial_event
|
||||
|
||||
async def wait_for_stop(self):
|
||||
await self.stop_event.wait()
|
||||
|
@ -115,10 +116,14 @@ class ReflectorServerProtocol(asyncio.Protocol):
|
|||
if self.writer:
|
||||
self.writer.close_handle()
|
||||
self.writer = None
|
||||
self.send_response({"send_sd_blob": False, 'needed': [
|
||||
blob.blob_hash for blob in self.descriptor.blobs[:-1]
|
||||
if not self.blob_manager.get_blob(blob.blob_hash).get_is_verified()
|
||||
]})
|
||||
|
||||
needs = [blob.blob_hash
|
||||
for blob in self.descriptor.blobs[:-1]
|
||||
if not self.blob_manager.get_blob(blob.blob_hash).get_is_verified()]
|
||||
if needs and not self.partial_event.is_set():
|
||||
needs = needs[:3]
|
||||
self.partial_event.set()
|
||||
self.send_response({"send_sd_blob": False, 'needed_blobs': needs})
|
||||
return
|
||||
return
|
||||
elif self.descriptor:
|
||||
|
@ -153,7 +158,7 @@ class ReflectorServerProtocol(asyncio.Protocol):
|
|||
class ReflectorServer:
|
||||
def __init__(self, blob_manager: 'BlobManager', response_chunk_size: int = 10000,
|
||||
stop_event: asyncio.Event = None, incoming_event: asyncio.Event = None,
|
||||
not_incoming_event: asyncio.Event = None):
|
||||
not_incoming_event: asyncio.Event = None, partial_needs=False):
|
||||
self.loop = asyncio.get_event_loop()
|
||||
self.blob_manager = blob_manager
|
||||
self.server_task: typing.Optional[asyncio.Task] = None
|
||||
|
@ -163,19 +168,19 @@ class ReflectorServer:
|
|||
self.not_incoming_event = not_incoming_event or asyncio.Event(loop=self.loop)
|
||||
self.response_chunk_size = response_chunk_size
|
||||
self.stop_event = stop_event
|
||||
self.partial_needs = partial_needs # for testing cases where it doesn't know what it wants
|
||||
|
||||
def start_server(self, port: int, interface: typing.Optional[str] = '0.0.0.0'):
|
||||
if self.server_task is not None:
|
||||
raise Exception("already running")
|
||||
|
||||
async def _start_server():
|
||||
server = await self.loop.create_server(
|
||||
lambda: ReflectorServerProtocol(
|
||||
self.blob_manager, self.response_chunk_size, self.stop_event, self.incoming_event,
|
||||
self.not_incoming_event
|
||||
),
|
||||
interface, port
|
||||
)
|
||||
partial_event = asyncio.Event()
|
||||
if not self.partial_needs:
|
||||
partial_event.set()
|
||||
server = await self.loop.create_server(lambda: ReflectorServerProtocol(
|
||||
self.blob_manager, self.response_chunk_size, self.stop_event, self.incoming_event,
|
||||
self.not_incoming_event, partial_event), interface, port)
|
||||
self.started_listening.set()
|
||||
self.stopped_listening.clear()
|
||||
log.info("Reflector server listening on TCP %s:%i", interface, port)
|
||||
|
|
|
@ -215,7 +215,7 @@ class StreamManager(SourceManager):
|
|||
server, port = random.choice(self.config.reflector_servers)
|
||||
if stream.sd_hash in self.running_reflector_uploads:
|
||||
return self.running_reflector_uploads[stream.sd_hash]
|
||||
task = self.loop.create_task(stream.upload_to_reflector(server, port))
|
||||
task = self.loop.create_task(self._retriable_reflect_stream(stream, server, port))
|
||||
self.running_reflector_uploads[stream.sd_hash] = task
|
||||
task.add_done_callback(
|
||||
lambda _: None if stream.sd_hash not in self.running_reflector_uploads else
|
||||
|
@ -223,6 +223,12 @@ class StreamManager(SourceManager):
|
|||
)
|
||||
return task
|
||||
|
||||
async def _retriable_reflect_stream(self, stream, host, port):
|
||||
sent = await stream.upload_to_reflector(host, port)
|
||||
while not stream.is_fully_reflected and stream.reflector_progress > 0 and len(sent) > 0:
|
||||
stream.reflector_progress = 0
|
||||
sent = await stream.upload_to_reflector(host, port)
|
||||
|
||||
async def create(self, file_path: str, key: Optional[bytes] = None,
|
||||
iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
|
||||
descriptor = await StreamDescriptor.create_stream(
|
||||
|
|
|
@ -43,19 +43,36 @@ class TestReflector(AsyncioTestCase):
|
|||
self.stream_manager.config.reflect_streams = False
|
||||
self.stream = await self.stream_manager.create(file_path)
|
||||
|
||||
async def _test_reflect_stream(self, response_chunk_size):
|
||||
reflector = ReflectorServer(self.server_blob_manager, response_chunk_size=response_chunk_size)
|
||||
async def _test_reflect_stream(self, response_chunk_size=50, partial_needs=False):
|
||||
reflector = ReflectorServer(self.server_blob_manager, response_chunk_size=response_chunk_size,
|
||||
partial_needs=partial_needs)
|
||||
reflector.start_server(5566, '127.0.0.1')
|
||||
if partial_needs:
|
||||
server_blob = self.server_blob_manager.get_blob(self.stream.sd_hash)
|
||||
client_blob = self.blob_manager.get_blob(self.stream.sd_hash)
|
||||
with client_blob.reader_context() as handle:
|
||||
server_blob.set_length(client_blob.get_length())
|
||||
writer = server_blob.get_blob_writer('nobody', 0)
|
||||
writer.write(handle.read())
|
||||
self.server_blob_manager.blob_completed(server_blob)
|
||||
await reflector.started_listening.wait()
|
||||
self.addCleanup(reflector.stop_server)
|
||||
self.assertEqual(0, self.stream.reflector_progress)
|
||||
sent = await self.stream.upload_to_reflector('127.0.0.1', 5566)
|
||||
self.assertEqual(100, self.stream.reflector_progress)
|
||||
if partial_needs:
|
||||
self.assertFalse(self.stream.is_fully_reflected)
|
||||
send_more = await self.stream.upload_to_reflector('127.0.0.1', 5566)
|
||||
self.assertGreater(len(send_more), 0)
|
||||
sent.extend(send_more)
|
||||
sent.append(self.stream.sd_hash)
|
||||
self.assertSetEqual(
|
||||
set(sent),
|
||||
set(map(lambda b: b.blob_hash,
|
||||
self.stream.descriptor.blobs[:-1] + [self.blob_manager.get_blob(self.stream.sd_hash)]))
|
||||
)
|
||||
send_more = await self.stream.upload_to_reflector('127.0.0.1', 5566)
|
||||
self.assertEqual(len(send_more), 0)
|
||||
self.assertTrue(self.stream.is_fully_reflected)
|
||||
server_sd_blob = self.server_blob_manager.get_blob(self.stream.sd_hash)
|
||||
self.assertTrue(server_sd_blob.get_is_verified())
|
||||
|
@ -71,6 +88,9 @@ class TestReflector(AsyncioTestCase):
|
|||
async def test_reflect_stream(self):
|
||||
return await asyncio.wait_for(self._test_reflect_stream(response_chunk_size=50), 3, loop=self.loop)
|
||||
|
||||
async def test_reflect_stream_but_reflector_changes_its_mind(self):
|
||||
return await asyncio.wait_for(self._test_reflect_stream(partial_needs=True), 3, loop=self.loop)
|
||||
|
||||
async def test_reflect_stream_small_response_chunks(self):
|
||||
return await asyncio.wait_for(self._test_reflect_stream(response_chunk_size=30), 3, loop=self.loop)
|
||||
|
||||
|
|
Loading…
Reference in a new issue