Merge pull request #2926 from lbryio/blocking-file-io
Make file I/O non-blocking during stream and blob creation and during reads
This commit is contained in:
commit
b7cb2a7aa5
6 changed files with 61 additions and 31 deletions
|
@ -110,7 +110,7 @@ class AbstractBlob:
|
||||||
if reader in self.readers:
|
if reader in self.readers:
|
||||||
self.readers.remove(reader)
|
self.readers.remove(reader)
|
||||||
|
|
||||||
def _write_blob(self, blob_bytes: bytes):
|
def _write_blob(self, blob_bytes: bytes) -> asyncio.Task:
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
def set_length(self, length) -> None:
|
def set_length(self, length) -> None:
|
||||||
|
@ -198,11 +198,17 @@ class AbstractBlob:
|
||||||
def save_verified_blob(self, verified_bytes: bytes):
|
def save_verified_blob(self, verified_bytes: bytes):
|
||||||
if self.verified.is_set():
|
if self.verified.is_set():
|
||||||
return
|
return
|
||||||
if self.is_writeable():
|
|
||||||
self._write_blob(verified_bytes)
|
def update_events(_):
|
||||||
self.verified.set()
|
self.verified.set()
|
||||||
|
self.writing.clear()
|
||||||
|
|
||||||
|
if self.is_writeable():
|
||||||
|
self.writing.set()
|
||||||
|
task = self._write_blob(verified_bytes)
|
||||||
|
task.add_done_callback(update_events)
|
||||||
if self.blob_completed_callback:
|
if self.blob_completed_callback:
|
||||||
self.blob_completed_callback(self)
|
task.add_done_callback(lambda _: self.blob_completed_callback(self))
|
||||||
|
|
||||||
def get_blob_writer(self, peer_address: typing.Optional[str] = None,
|
def get_blob_writer(self, peer_address: typing.Optional[str] = None,
|
||||||
peer_port: typing.Optional[int] = None) -> HashBlobWriter:
|
peer_port: typing.Optional[int] = None) -> HashBlobWriter:
|
||||||
|
@ -261,9 +267,11 @@ class BlobBuffer(AbstractBlob):
|
||||||
self.verified.clear()
|
self.verified.clear()
|
||||||
|
|
||||||
def _write_blob(self, blob_bytes: bytes):
|
def _write_blob(self, blob_bytes: bytes):
|
||||||
if self._verified_bytes:
|
async def write():
|
||||||
raise OSError("already have bytes for blob")
|
if self._verified_bytes:
|
||||||
self._verified_bytes = BytesIO(blob_bytes)
|
raise OSError("already have bytes for blob")
|
||||||
|
self._verified_bytes = BytesIO(blob_bytes)
|
||||||
|
return self.loop.create_task(write())
|
||||||
|
|
||||||
def delete(self):
|
def delete(self):
|
||||||
if self._verified_bytes:
|
if self._verified_bytes:
|
||||||
|
@ -319,8 +327,14 @@ class BlobFile(AbstractBlob):
|
||||||
handle.close()
|
handle.close()
|
||||||
|
|
||||||
def _write_blob(self, blob_bytes: bytes):
|
def _write_blob(self, blob_bytes: bytes):
|
||||||
with open(self.file_path, 'wb') as f:
|
def _write_blob():
|
||||||
f.write(blob_bytes)
|
with open(self.file_path, 'wb') as f:
|
||||||
|
f.write(blob_bytes)
|
||||||
|
|
||||||
|
async def write_blob():
|
||||||
|
await self.loop.run_in_executor(None, _write_blob)
|
||||||
|
|
||||||
|
return self.loop.create_task(write_blob())
|
||||||
|
|
||||||
def delete(self):
|
def delete(self):
|
||||||
if os.path.isfile(self.file_path):
|
if os.path.isfile(self.file_path):
|
||||||
|
|
|
@ -152,6 +152,8 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
|
||||||
log.debug(msg)
|
log.debug(msg)
|
||||||
msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}"
|
msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}"
|
||||||
await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop)
|
await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop)
|
||||||
|
# wait for the io to finish
|
||||||
|
await self.blob.verified.wait()
|
||||||
log.info("%s at %fMB/s", msg,
|
log.info("%s at %fMB/s", msg,
|
||||||
round((float(self._blob_bytes_received) /
|
round((float(self._blob_bytes_received) /
|
||||||
float(time.perf_counter() - start_time)) / 1000000.0, 2))
|
float(time.perf_counter() - start_time)) / 1000000.0, 2))
|
||||||
|
|
|
@ -44,18 +44,25 @@ def random_iv_generator() -> typing.Generator[bytes, None, None]:
|
||||||
yield os.urandom(AES.block_size // 8)
|
yield os.urandom(AES.block_size // 8)
|
||||||
|
|
||||||
|
|
||||||
def file_reader(file_path: str):
|
def read_bytes(file_path: str, offset: int, to_read: int):
|
||||||
|
with open(file_path, 'rb') as f:
|
||||||
|
f.seek(offset)
|
||||||
|
return f.read(to_read)
|
||||||
|
|
||||||
|
|
||||||
|
async def file_reader(file_path: str):
|
||||||
length = int(os.stat(file_path).st_size)
|
length = int(os.stat(file_path).st_size)
|
||||||
offset = 0
|
offset = 0
|
||||||
|
|
||||||
with open(file_path, 'rb') as stream_file:
|
while offset < length:
|
||||||
while offset < length:
|
bytes_to_read = min((length - offset), MAX_BLOB_SIZE - 1)
|
||||||
bytes_to_read = min((length - offset), MAX_BLOB_SIZE - 1)
|
if not bytes_to_read:
|
||||||
if not bytes_to_read:
|
break
|
||||||
break
|
blob_bytes = await asyncio.get_event_loop().run_in_executor(
|
||||||
blob_bytes = stream_file.read(bytes_to_read)
|
None, read_bytes, file_path, offset, bytes_to_read
|
||||||
yield blob_bytes
|
)
|
||||||
offset += bytes_to_read
|
yield blob_bytes
|
||||||
|
offset += bytes_to_read
|
||||||
|
|
||||||
|
|
||||||
def sanitize_file_name(dirty_name: str, default_file_name: str = 'lbry_download'):
|
def sanitize_file_name(dirty_name: str, default_file_name: str = 'lbry_download'):
|
||||||
|
@ -245,7 +252,7 @@ class StreamDescriptor:
|
||||||
iv_generator = iv_generator or random_iv_generator()
|
iv_generator = iv_generator or random_iv_generator()
|
||||||
key = key or os.urandom(AES.block_size // 8)
|
key = key or os.urandom(AES.block_size // 8)
|
||||||
blob_num = -1
|
blob_num = -1
|
||||||
for blob_bytes in file_reader(file_path):
|
async for blob_bytes in file_reader(file_path):
|
||||||
blob_num += 1
|
blob_num += 1
|
||||||
blob_info = await BlobFile.create_from_unencrypted(
|
blob_info = await BlobFile.create_from_unencrypted(
|
||||||
loop, blob_dir, key, next(iv_generator), blob_bytes, blob_num, blob_completed_callback
|
loop, blob_dir, key, next(iv_generator), blob_bytes, blob_num, blob_completed_callback
|
||||||
|
|
|
@ -343,9 +343,10 @@ class ManagedStream:
|
||||||
self.streaming.clear()
|
self.streaming.clear()
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _write_decrypted_blob(handle: typing.IO, data: bytes):
|
def _write_decrypted_blob(output_path: str, data: bytes):
|
||||||
handle.write(data)
|
with open(output_path, 'ab') as handle:
|
||||||
handle.flush()
|
handle.write(data)
|
||||||
|
handle.flush()
|
||||||
|
|
||||||
async def _save_file(self, output_path: str):
|
async def _save_file(self, output_path: str):
|
||||||
log.info("save file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id, self.sd_hash[:6],
|
log.info("save file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id, self.sd_hash[:6],
|
||||||
|
@ -355,12 +356,12 @@ class ManagedStream:
|
||||||
self.finished_writing.clear()
|
self.finished_writing.clear()
|
||||||
self.started_writing.clear()
|
self.started_writing.clear()
|
||||||
try:
|
try:
|
||||||
with open(output_path, 'wb') as file_write_handle:
|
open(output_path, 'wb').close()
|
||||||
async for blob_info, decrypted in self._aiter_read_stream(connection_id=self.SAVING_ID):
|
async for blob_info, decrypted in self._aiter_read_stream(connection_id=self.SAVING_ID):
|
||||||
log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
|
log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
|
||||||
await self.loop.run_in_executor(None, self._write_decrypted_blob, file_write_handle, decrypted)
|
await self.loop.run_in_executor(None, self._write_decrypted_blob, output_path, decrypted)
|
||||||
if not self.started_writing.is_set():
|
if not self.started_writing.is_set():
|
||||||
self.started_writing.set()
|
self.started_writing.set()
|
||||||
await self.update_status(ManagedStream.STATUS_FINISHED)
|
await self.update_status(ManagedStream.STATUS_FINISHED)
|
||||||
if self.analytics_manager:
|
if self.analytics_manager:
|
||||||
self.loop.create_task(self.analytics_manager.send_download_finished(
|
self.loop.create_task(self.analytics_manager.send_download_finished(
|
||||||
|
|
|
@ -16,7 +16,7 @@ class TestBlobManager(AsyncioTestCase):
|
||||||
self.blob_manager = BlobManager(self.loop, tmp_dir, self.storage, self.config)
|
self.blob_manager = BlobManager(self.loop, tmp_dir, self.storage, self.config)
|
||||||
await self.storage.open()
|
await self.storage.open()
|
||||||
|
|
||||||
async def test_memory_blobs_arent_verifie_but_real_ones_are(self):
|
async def test_memory_blobs_arent_verified_but_real_ones_are(self):
|
||||||
for save_blobs in (False, True):
|
for save_blobs in (False, True):
|
||||||
await self.setup_blob_manager(save_blobs=save_blobs)
|
await self.setup_blob_manager(save_blobs=save_blobs)
|
||||||
# add a blob file
|
# add a blob file
|
||||||
|
@ -24,6 +24,7 @@ class TestBlobManager(AsyncioTestCase):
|
||||||
blob_bytes = b'1' * ((2 * 2 ** 20) - 1)
|
blob_bytes = b'1' * ((2 * 2 ** 20) - 1)
|
||||||
blob = self.blob_manager.get_blob(blob_hash, len(blob_bytes))
|
blob = self.blob_manager.get_blob(blob_hash, len(blob_bytes))
|
||||||
blob.save_verified_blob(blob_bytes)
|
blob.save_verified_blob(blob_bytes)
|
||||||
|
await blob.verified.wait()
|
||||||
self.assertTrue(blob.get_is_verified())
|
self.assertTrue(blob.get_is_verified())
|
||||||
self.blob_manager.blob_completed(blob)
|
self.blob_manager.blob_completed(blob)
|
||||||
self.assertEqual(self.blob_manager.is_blob_verified(blob_hash), save_blobs)
|
self.assertEqual(self.blob_manager.is_blob_verified(blob_hash), save_blobs)
|
||||||
|
|
|
@ -130,10 +130,14 @@ class TestBlobExchange(BlobExchangeTestBase):
|
||||||
write_blob = blob._write_blob
|
write_blob = blob._write_blob
|
||||||
write_called_count = 0
|
write_called_count = 0
|
||||||
|
|
||||||
def wrap_write_blob(blob_bytes):
|
async def _wrap_write_blob(blob_bytes):
|
||||||
nonlocal write_called_count
|
nonlocal write_called_count
|
||||||
write_called_count += 1
|
write_called_count += 1
|
||||||
write_blob(blob_bytes)
|
await write_blob(blob_bytes)
|
||||||
|
|
||||||
|
def wrap_write_blob(blob_bytes):
|
||||||
|
return asyncio.create_task(_wrap_write_blob(blob_bytes))
|
||||||
|
|
||||||
blob._write_blob = wrap_write_blob
|
blob._write_blob = wrap_write_blob
|
||||||
|
|
||||||
writer1 = blob.get_blob_writer(peer_port=1)
|
writer1 = blob.get_blob_writer(peer_port=1)
|
||||||
|
@ -166,6 +170,7 @@ class TestBlobExchange(BlobExchangeTestBase):
|
||||||
|
|
||||||
self.assertDictEqual({1: mock_blob_bytes, 2: mock_blob_bytes}, results)
|
self.assertDictEqual({1: mock_blob_bytes, 2: mock_blob_bytes}, results)
|
||||||
self.assertEqual(1, write_called_count)
|
self.assertEqual(1, write_called_count)
|
||||||
|
await blob.verified.wait()
|
||||||
self.assertTrue(blob.get_is_verified())
|
self.assertTrue(blob.get_is_verified())
|
||||||
self.assertDictEqual({}, blob.writers)
|
self.assertDictEqual({}, blob.writers)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue