From fbe0f886b621b6396efd42f99438c6cdb1c2fb67 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Wed, 22 Apr 2020 16:10:23 -0400
Subject: [PATCH 1/2] non blocking blob creation

---
 lbry/blob/blob_file.py                        | 32 +++++++++++++------
 lbry/blob_exchange/client.py                  |  2 ++
 lbry/stream/managed_stream.py                 | 19 +++++------
 tests/unit/blob/test_blob_manager.py          |  3 +-
 .../unit/blob_exchange/test_transfer_blob.py  |  9 ++++--
 5 files changed, 44 insertions(+), 21 deletions(-)

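Note: the change below moves the blocking file write off the event loop. _write_blob now
returns a task that performs the write in the default executor, and the verified/writing
events are flipped from a done callback, so callers wait on blob.verified instead of
assuming the write completed synchronously. A minimal, self-contained sketch of that
pattern follows; the names (NonBlockingWriter, save, finished) are illustrative only and
are not the lbry API.

import asyncio
import os
import tempfile


class NonBlockingWriter:
    def __init__(self, loop: asyncio.AbstractEventLoop, path: str):
        self.loop = loop
        self.path = path
        self.finished = asyncio.Event()

    def _write(self, data: bytes):
        # blocking write; runs in a worker thread so the event loop stays free
        with open(self.path, 'wb') as f:
            f.write(data)

    def save(self, data: bytes) -> asyncio.Task:
        async def write():
            await self.loop.run_in_executor(None, self._write, data)
        task = self.loop.create_task(write())
        # only signal completion once the io has actually finished
        task.add_done_callback(lambda _: self.finished.set())
        return task


async def main():
    path = os.path.join(tempfile.mkdtemp(), 'blob')
    writer = NonBlockingWriter(asyncio.get_running_loop(), path)
    writer.save(b'1' * 1024)
    # callers wait on the event rather than assuming a synchronous write
    await writer.finished.wait()
    print(os.path.getsize(path))


asyncio.run(main())
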
diff --git a/lbry/blob/blob_file.py b/lbry/blob/blob_file.py
index 65e0d4a43..b8c7c461c 100644
--- a/lbry/blob/blob_file.py
+++ b/lbry/blob/blob_file.py
@@ -110,7 +110,7 @@ class AbstractBlob:
         if reader in self.readers:
             self.readers.remove(reader)
 
-    def _write_blob(self, blob_bytes: bytes):
+    def _write_blob(self, blob_bytes: bytes) -> asyncio.Task:
         raise NotImplementedError()
 
     def set_length(self, length) -> None:
@@ -198,11 +198,17 @@ class AbstractBlob:
     def save_verified_blob(self, verified_bytes: bytes):
         if self.verified.is_set():
             return
-        if self.is_writeable():
-            self._write_blob(verified_bytes)
+
+        def update_events(_):
             self.verified.set()
+            self.writing.clear()
+
+        if self.is_writeable():
+            self.writing.set()
+            task = self._write_blob(verified_bytes)
+            task.add_done_callback(update_events)
             if self.blob_completed_callback:
-                self.blob_completed_callback(self)
+                task.add_done_callback(lambda _: self.blob_completed_callback(self))
 
     def get_blob_writer(self, peer_address: typing.Optional[str] = None,
                         peer_port: typing.Optional[int] = None) -> HashBlobWriter:
@@ -261,9 +267,11 @@ class BlobBuffer(AbstractBlob):
            self.verified.clear()
 
     def _write_blob(self, blob_bytes: bytes):
-        if self._verified_bytes:
-            raise OSError("already have bytes for blob")
-        self._verified_bytes = BytesIO(blob_bytes)
+        async def write():
+            if self._verified_bytes:
+                raise OSError("already have bytes for blob")
+            self._verified_bytes = BytesIO(blob_bytes)
+        return self.loop.create_task(write())
 
     def delete(self):
         if self._verified_bytes:
@@ -319,8 +327,14 @@ class BlobFile(AbstractBlob):
            handle.close()
 
     def _write_blob(self, blob_bytes: bytes):
-        with open(self.file_path, 'wb') as f:
-            f.write(blob_bytes)
+        def _write_blob():
+            with open(self.file_path, 'wb') as f:
+                f.write(blob_bytes)
+
+        async def write_blob():
+            await self.loop.run_in_executor(None, _write_blob)
+
+        return self.loop.create_task(write_blob())
 
     def delete(self):
         if os.path.isfile(self.file_path):
diff --git a/lbry/blob_exchange/client.py b/lbry/blob_exchange/client.py
index 408c0d323..61920c5b7 100644
--- a/lbry/blob_exchange/client.py
+++ b/lbry/blob_exchange/client.py
@@ -152,6 +152,8 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
                log.debug(msg)
            msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}"
            await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop)
+            # wait for the io to finish
+            await self.blob.verified.wait()
            log.info("%s at %fMB/s", msg,
                     round((float(self._blob_bytes_received) / float(time.perf_counter() - start_time)) / 1000000.0, 2))
diff --git a/lbry/stream/managed_stream.py b/lbry/stream/managed_stream.py
index 8339f6a67..c449fe232 100644
--- a/lbry/stream/managed_stream.py
+++ b/lbry/stream/managed_stream.py
@@ -343,9 +343,10 @@ class ManagedStream:
         self.streaming.clear()
 
     @staticmethod
-    def _write_decrypted_blob(handle: typing.IO, data: bytes):
-        handle.write(data)
-        handle.flush()
+    def _write_decrypted_blob(output_path: str, data: bytes):
+        with open(output_path, 'ab') as handle:
+            handle.write(data)
+            handle.flush()
 
     async def _save_file(self, output_path: str):
         log.info("save file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id, self.sd_hash[:6],
@@ -355,12 +356,12 @@ class ManagedStream:
         self.finished_writing.clear()
         self.started_writing.clear()
         try:
-            with open(output_path, 'wb') as file_write_handle:
-                async for blob_info, decrypted in self._aiter_read_stream(connection_id=self.SAVING_ID):
-                    log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
-                    await self.loop.run_in_executor(None, self._write_decrypted_blob, file_write_handle, decrypted)
-                    if not self.started_writing.is_set():
-                        self.started_writing.set()
+            open(output_path, 'wb').close()
+            async for blob_info, decrypted in self._aiter_read_stream(connection_id=self.SAVING_ID):
+                log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
+                await self.loop.run_in_executor(None, self._write_decrypted_blob, output_path, decrypted)
+                if not self.started_writing.is_set():
+                    self.started_writing.set()
             await self.update_status(ManagedStream.STATUS_FINISHED)
             if self.analytics_manager:
                 self.loop.create_task(self.analytics_manager.send_download_finished(
diff --git a/tests/unit/blob/test_blob_manager.py b/tests/unit/blob/test_blob_manager.py
index c868890f1..788ab0953 100644
--- a/tests/unit/blob/test_blob_manager.py
+++ b/tests/unit/blob/test_blob_manager.py
@@ -16,7 +16,7 @@ class TestBlobManager(AsyncioTestCase):
         self.blob_manager = BlobManager(self.loop, tmp_dir, self.storage, self.config)
         await self.storage.open()
 
-    async def test_memory_blobs_arent_verifie_but_real_ones_are(self):
+    async def test_memory_blobs_arent_verified_but_real_ones_are(self):
         for save_blobs in (False, True):
             await self.setup_blob_manager(save_blobs=save_blobs)
             # add a blob file
@@ -24,6 +24,7 @@ class TestBlobManager(AsyncioTestCase):
             blob_bytes = b'1' * ((2 * 2 ** 20) - 1)
             blob = self.blob_manager.get_blob(blob_hash, len(blob_bytes))
             blob.save_verified_blob(blob_bytes)
+            await blob.verified.wait()
             self.assertTrue(blob.get_is_verified())
             self.blob_manager.blob_completed(blob)
             self.assertEqual(self.blob_manager.is_blob_verified(blob_hash), save_blobs)
diff --git a/tests/unit/blob_exchange/test_transfer_blob.py b/tests/unit/blob_exchange/test_transfer_blob.py
index f7c011e3b..fab7a4db0 100644
--- a/tests/unit/blob_exchange/test_transfer_blob.py
+++ b/tests/unit/blob_exchange/test_transfer_blob.py
@@ -130,10 +130,14 @@ class TestBlobExchange(BlobExchangeTestBase):
         write_blob = blob._write_blob
         write_called_count = 0
 
-        def wrap_write_blob(blob_bytes):
+        async def _wrap_write_blob(blob_bytes):
             nonlocal write_called_count
             write_called_count += 1
-            write_blob(blob_bytes)
+            await write_blob(blob_bytes)
+
+        def wrap_write_blob(blob_bytes):
+            return asyncio.create_task(_wrap_write_blob(blob_bytes))
+
         blob._write_blob = wrap_write_blob
 
         writer1 = blob.get_blob_writer(peer_port=1)
@@ -166,6 +170,7 @@ class TestBlobExchange(BlobExchangeTestBase):
 
         self.assertDictEqual({1: mock_blob_bytes, 2: mock_blob_bytes}, results)
         self.assertEqual(1, write_called_count)
+        await blob.verified.wait()
         self.assertTrue(blob.get_is_verified())
         self.assertDictEqual({}, blob.writers)
 

From decc5c74ef02e3e54790a08a3b9c377e70fe9db2 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Wed, 22 Apr 2020 18:17:01 -0400
Subject: [PATCH 2/2] don't block when reading a file when creating a stream

---
 lbry/stream/descriptor.py | 27 +++++++++++++++++----------
 1 file changed, 17 insertions(+), 10 deletions(-)

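Note: the change below turns file_reader into an async generator so that reading the source
file during stream creation no longer blocks the event loop; each chunk is read in the
default executor and consumed with `async for`. A minimal sketch of that chunked-read
pattern follows; the names (CHUNK_SIZE, read_chunk, chunk_reader) are illustrative only
and are not the lbry API.

import asyncio
import os
import tempfile

CHUNK_SIZE = (2 * 2 ** 20) - 1  # chunk size chosen in the same spirit as MAX_BLOB_SIZE - 1


def read_chunk(path: str, offset: int, size: int) -> bytes:
    # plain blocking read; executed in a worker thread
    with open(path, 'rb') as f:
        f.seek(offset)
        return f.read(size)


async def chunk_reader(path: str):
    loop = asyncio.get_running_loop()
    length = os.stat(path).st_size
    offset = 0
    while offset < length:
        to_read = min(length - offset, CHUNK_SIZE)
        chunk = await loop.run_in_executor(None, read_chunk, path, offset, to_read)
        yield chunk
        offset += to_read


async def main():
    path = os.path.join(tempfile.mkdtemp(), 'example.bin')
    with open(path, 'wb') as f:
        f.write(os.urandom(3 * 2 ** 20))
    total = 0
    async for chunk in chunk_reader(path):  # consumed with `async for`, as in the patch
        total += len(chunk)
    print(total)  # 3145728


asyncio.run(main())
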
diff --git a/lbry/stream/descriptor.py b/lbry/stream/descriptor.py
index 1c0305ddc..2e2626b9b 100644
--- a/lbry/stream/descriptor.py
+++ b/lbry/stream/descriptor.py
@@ -44,18 +44,25 @@ def random_iv_generator() -> typing.Generator[bytes, None, None]:
         yield os.urandom(AES.block_size // 8)
 
 
-def file_reader(file_path: str):
+def read_bytes(file_path: str, offset: int, to_read: int):
+    with open(file_path, 'rb') as f:
+        f.seek(offset)
+        return f.read(to_read)
+
+
+async def file_reader(file_path: str):
     length = int(os.stat(file_path).st_size)
     offset = 0
 
-    with open(file_path, 'rb') as stream_file:
-        while offset < length:
-            bytes_to_read = min((length - offset), MAX_BLOB_SIZE - 1)
-            if not bytes_to_read:
-                break
-            blob_bytes = stream_file.read(bytes_to_read)
-            yield blob_bytes
-            offset += bytes_to_read
+    while offset < length:
+        bytes_to_read = min((length - offset), MAX_BLOB_SIZE - 1)
+        if not bytes_to_read:
+            break
+        blob_bytes = await asyncio.get_event_loop().run_in_executor(
+            None, read_bytes, file_path, offset, bytes_to_read
+        )
+        yield blob_bytes
+        offset += bytes_to_read
 
 
 def sanitize_file_name(dirty_name: str, default_file_name: str = 'lbry_download'):
@@ -245,7 +252,7 @@ class StreamDescriptor:
         iv_generator = iv_generator or random_iv_generator()
         key = key or os.urandom(AES.block_size // 8)
         blob_num = -1
-        for blob_bytes in file_reader(file_path):
+        async for blob_bytes in file_reader(file_path):
             blob_num += 1
             blob_info = await BlobFile.create_from_unencrypted(
                 loop, blob_dir, key, next(iv_generator), blob_bytes, blob_num, blob_completed_callback