non blocking blob creation

This commit is contained in:
Jack Robison 2020-04-22 16:10:23 -04:00
parent 7e23d6e2ef
commit fbe0f886b6
No known key found for this signature in database
GPG key ID: DF25C68FE0239BB2
5 changed files with 44 additions and 21 deletions

View file

@ -110,7 +110,7 @@ class AbstractBlob:
if reader in self.readers: if reader in self.readers:
self.readers.remove(reader) self.readers.remove(reader)
def _write_blob(self, blob_bytes: bytes): def _write_blob(self, blob_bytes: bytes) -> asyncio.Task:
raise NotImplementedError() raise NotImplementedError()
def set_length(self, length) -> None: def set_length(self, length) -> None:
@ -198,11 +198,17 @@ class AbstractBlob:
def save_verified_blob(self, verified_bytes: bytes): def save_verified_blob(self, verified_bytes: bytes):
if self.verified.is_set(): if self.verified.is_set():
return return
if self.is_writeable():
self._write_blob(verified_bytes) def update_events(_):
self.verified.set() self.verified.set()
self.writing.clear()
if self.is_writeable():
self.writing.set()
task = self._write_blob(verified_bytes)
task.add_done_callback(update_events)
if self.blob_completed_callback: if self.blob_completed_callback:
self.blob_completed_callback(self) task.add_done_callback(lambda _: self.blob_completed_callback(self))
def get_blob_writer(self, peer_address: typing.Optional[str] = None, def get_blob_writer(self, peer_address: typing.Optional[str] = None,
peer_port: typing.Optional[int] = None) -> HashBlobWriter: peer_port: typing.Optional[int] = None) -> HashBlobWriter:
@ -261,9 +267,11 @@ class BlobBuffer(AbstractBlob):
self.verified.clear() self.verified.clear()
def _write_blob(self, blob_bytes: bytes): def _write_blob(self, blob_bytes: bytes):
if self._verified_bytes: async def write():
raise OSError("already have bytes for blob") if self._verified_bytes:
self._verified_bytes = BytesIO(blob_bytes) raise OSError("already have bytes for blob")
self._verified_bytes = BytesIO(blob_bytes)
return self.loop.create_task(write())
def delete(self): def delete(self):
if self._verified_bytes: if self._verified_bytes:
@ -319,8 +327,14 @@ class BlobFile(AbstractBlob):
handle.close() handle.close()
def _write_blob(self, blob_bytes: bytes): def _write_blob(self, blob_bytes: bytes):
with open(self.file_path, 'wb') as f: def _write_blob():
f.write(blob_bytes) with open(self.file_path, 'wb') as f:
f.write(blob_bytes)
async def write_blob():
await self.loop.run_in_executor(None, _write_blob)
return self.loop.create_task(write_blob())
def delete(self): def delete(self):
if os.path.isfile(self.file_path): if os.path.isfile(self.file_path):

View file

@ -152,6 +152,8 @@ class BlobExchangeClientProtocol(asyncio.Protocol):
log.debug(msg) log.debug(msg)
msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}" msg = f"downloaded {self.blob.blob_hash[:8]} from {self.peer_address}:{self.peer_port}"
await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop) await asyncio.wait_for(self.writer.finished, self.peer_timeout, loop=self.loop)
# wait for the io to finish
await self.blob.verified.wait()
log.info("%s at %fMB/s", msg, log.info("%s at %fMB/s", msg,
round((float(self._blob_bytes_received) / round((float(self._blob_bytes_received) /
float(time.perf_counter() - start_time)) / 1000000.0, 2)) float(time.perf_counter() - start_time)) / 1000000.0, 2))

View file

@ -343,9 +343,10 @@ class ManagedStream:
self.streaming.clear() self.streaming.clear()
@staticmethod @staticmethod
def _write_decrypted_blob(handle: typing.IO, data: bytes): def _write_decrypted_blob(output_path: str, data: bytes):
handle.write(data) with open(output_path, 'ab') as handle:
handle.flush() handle.write(data)
handle.flush()
async def _save_file(self, output_path: str): async def _save_file(self, output_path: str):
log.info("save file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id, self.sd_hash[:6], log.info("save file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id, self.sd_hash[:6],
@ -355,12 +356,12 @@ class ManagedStream:
self.finished_writing.clear() self.finished_writing.clear()
self.started_writing.clear() self.started_writing.clear()
try: try:
with open(output_path, 'wb') as file_write_handle: open(output_path, 'wb').close()
async for blob_info, decrypted in self._aiter_read_stream(connection_id=self.SAVING_ID): async for blob_info, decrypted in self._aiter_read_stream(connection_id=self.SAVING_ID):
log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1) log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
await self.loop.run_in_executor(None, self._write_decrypted_blob, file_write_handle, decrypted) await self.loop.run_in_executor(None, self._write_decrypted_blob, output_path, decrypted)
if not self.started_writing.is_set(): if not self.started_writing.is_set():
self.started_writing.set() self.started_writing.set()
await self.update_status(ManagedStream.STATUS_FINISHED) await self.update_status(ManagedStream.STATUS_FINISHED)
if self.analytics_manager: if self.analytics_manager:
self.loop.create_task(self.analytics_manager.send_download_finished( self.loop.create_task(self.analytics_manager.send_download_finished(

View file

@ -16,7 +16,7 @@ class TestBlobManager(AsyncioTestCase):
self.blob_manager = BlobManager(self.loop, tmp_dir, self.storage, self.config) self.blob_manager = BlobManager(self.loop, tmp_dir, self.storage, self.config)
await self.storage.open() await self.storage.open()
async def test_memory_blobs_arent_verifie_but_real_ones_are(self): async def test_memory_blobs_arent_verified_but_real_ones_are(self):
for save_blobs in (False, True): for save_blobs in (False, True):
await self.setup_blob_manager(save_blobs=save_blobs) await self.setup_blob_manager(save_blobs=save_blobs)
# add a blob file # add a blob file
@ -24,6 +24,7 @@ class TestBlobManager(AsyncioTestCase):
blob_bytes = b'1' * ((2 * 2 ** 20) - 1) blob_bytes = b'1' * ((2 * 2 ** 20) - 1)
blob = self.blob_manager.get_blob(blob_hash, len(blob_bytes)) blob = self.blob_manager.get_blob(blob_hash, len(blob_bytes))
blob.save_verified_blob(blob_bytes) blob.save_verified_blob(blob_bytes)
await blob.verified.wait()
self.assertTrue(blob.get_is_verified()) self.assertTrue(blob.get_is_verified())
self.blob_manager.blob_completed(blob) self.blob_manager.blob_completed(blob)
self.assertEqual(self.blob_manager.is_blob_verified(blob_hash), save_blobs) self.assertEqual(self.blob_manager.is_blob_verified(blob_hash), save_blobs)

View file

@ -130,10 +130,14 @@ class TestBlobExchange(BlobExchangeTestBase):
write_blob = blob._write_blob write_blob = blob._write_blob
write_called_count = 0 write_called_count = 0
def wrap_write_blob(blob_bytes): async def _wrap_write_blob(blob_bytes):
nonlocal write_called_count nonlocal write_called_count
write_called_count += 1 write_called_count += 1
write_blob(blob_bytes) await write_blob(blob_bytes)
def wrap_write_blob(blob_bytes):
return asyncio.create_task(_wrap_write_blob(blob_bytes))
blob._write_blob = wrap_write_blob blob._write_blob = wrap_write_blob
writer1 = blob.get_blob_writer(peer_port=1) writer1 = blob.get_blob_writer(peer_port=1)
@ -166,6 +170,7 @@ class TestBlobExchange(BlobExchangeTestBase):
self.assertDictEqual({1: mock_blob_bytes, 2: mock_blob_bytes}, results) self.assertDictEqual({1: mock_blob_bytes, 2: mock_blob_bytes}, results)
self.assertEqual(1, write_called_count) self.assertEqual(1, write_called_count)
await blob.verified.wait()
self.assertTrue(blob.get_is_verified()) self.assertTrue(blob.get_is_verified())
self.assertDictEqual({}, blob.writers) self.assertDictEqual({}, blob.writers)