Merge pull request #2310 from lbryio/2293

set file status to completed only when all bytes were written
This commit is contained in:
Jack Robison 2019-07-18 13:21:41 -04:00 committed by GitHub
commit 2a9e6473eb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 31 additions and 19 deletions

View file

@@ -69,10 +69,10 @@ class BlobManager:
     def is_blob_verified(self, blob_hash: str, length: typing.Optional[int] = None) -> bool:
         if not is_valid_blobhash(blob_hash):
             raise ValueError(blob_hash)
-        if blob_hash in self.blobs:
-            return self.blobs[blob_hash].get_is_verified()
         if not os.path.isfile(os.path.join(self.blob_dir, blob_hash)):
             return False
+        if blob_hash in self.blobs:
+            return self.blobs[blob_hash].get_is_verified()
         return self._get_blob(blob_hash, length).get_is_verified()

     async def setup(self) -> bool:

View file

@@ -56,7 +56,6 @@ class ManagedStream:
         'stream_claim_info',
         'download_id',
         'rowid',
-        'written_bytes',
         'content_fee',
         'downloader',
         'analytics_manager',
@@ -89,7 +88,6 @@ class ManagedStream:
         self.stream_claim_info = claim
         self.download_id = download_id or binascii.hexlify(generate_id()).decode()
         self.rowid = rowid
-        self.written_bytes = 0
         self.content_fee = content_fee
         self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor)
         self.analytics_manager = analytics_manager
@@ -121,6 +119,10 @@ class ManagedStream:
     def status(self) -> str:
         return self._status

+    @property
+    def written_bytes(self) -> int:
+        return 0 if not self.output_file_exists else os.stat(self.full_path).st_size
+
     async def update_status(self, status: str):
         assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED]
         self._status = status
@@ -205,21 +207,13 @@ class ManagedStream:
         full_path = self.full_path
         file_name = self.file_name
         download_directory = self.download_directory
-        if self.full_path and self.output_file_exists:
-            if self.written_bytes:
-                written_bytes = self.written_bytes
-            else:
-                written_bytes = os.stat(self.full_path).st_size
-        else:
+        if not self.output_file_exists:
             full_path = None
             file_name = None
             download_directory = None
-            written_bytes = None
         return {
             'streaming_url': f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}",
-            'completed': (self.output_file_exists and (self.status in ('stopped', 'finished'))
-                          or not self.saving.is_set()) or all(
-                self.blob_manager.is_blob_verified(b.blob_hash) for b in self.descriptor.blobs[:-1]),
+            'completed': self.written_bytes >= self.descriptor.lower_bound_decrypted_length(),
             'file_name': file_name,
             'download_directory': download_directory,
             'points_paid': 0.0,
@@ -233,7 +227,7 @@ class ManagedStream:
             'key': self.descriptor.key,
             'total_bytes_lower_bound': self.descriptor.lower_bound_decrypted_length(),
             'total_bytes': self.descriptor.upper_bound_decrypted_length(),
-            'written_bytes': written_bytes,
+            'written_bytes': self.written_bytes,
             'blobs_completed': self.blobs_completed,
             'blobs_in_stream': self.blobs_in_stream,
             'blobs_remaining': self.blobs_remaining,
@@ -371,7 +365,6 @@ class ManagedStream:
             async for blob_info, decrypted in self._aiter_read_stream(connection_id=self.SAVING_ID):
                 log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
                 await self.loop.run_in_executor(None, self._write_decrypted_blob, file_write_handle, decrypted)
-                self.written_bytes += len(decrypted)
                 if not self.started_writing.is_set():
                     self.started_writing.set()
             await self.update_status(ManagedStream.STATUS_FINISHED)
@@ -387,7 +380,6 @@ class ManagedStream:
             if os.path.isfile(output_path):
                 log.warning("removing incomplete download %s for %s", output_path, self.sd_hash)
                 os.remove(output_path)
-            self.written_bytes = 0
             if isinstance(err, asyncio.TimeoutError):
                 self.downloader.stop()
         await self.blob_manager.storage.change_file_download_dir_and_file_name(
@@ -425,7 +417,6 @@ class ManagedStream:
             self.stream_hash, self.download_directory, self.file_name
         )
         await self.update_status(ManagedStream.STATUS_RUNNING)
-        self.written_bytes = 0
         self.file_output_task = self.loop.create_task(self._save_file(self.full_path))
         await self.started_writing.wait()

View file

@@ -17,6 +17,18 @@ class TestBlobManager(AsyncioTestCase):
         self.blob_manager = BlobManager(self.loop, tmp_dir, self.storage, self.config)
         await self.storage.open()

+    async def test_memory_blobs_arent_verifie_but_real_ones_are(self):
+        for save_blobs in (False, True):
+            await self.setup_blob_manager(save_blobs=save_blobs)
+            # add a blob file
+            blob_hash = "7f5ab2def99f0ddd008da71db3a3772135f4002b19b7605840ed1034c8955431bd7079549e65e6b2a3b9c17c773073ed"
+            blob_bytes = b'1' * ((2 * 2 ** 20) - 1)
+            blob = self.blob_manager.get_blob(blob_hash, len(blob_bytes))
+            blob.save_verified_blob(blob_bytes)
+            self.assertTrue(blob.get_is_verified())
+            self.blob_manager.blob_completed(blob)
+            self.assertEqual(self.blob_manager.is_blob_verified(blob_hash), save_blobs)
+
     async def test_sync_blob_file_manager_on_startup(self):
         await self.setup_blob_manager(save_blobs=True)

View file

@@ -40,6 +40,15 @@ class TestManagedStream(BlobExchangeTestBase):
             self.loop, self.client_config, self.client_blob_manager, self.sd_hash, self.client_dir
         )

+    async def test_status_file_completed(self):
+        await self._test_transfer_stream(10)
+        self.assertTrue(self.stream.output_file_exists)
+        self.assertTrue(self.stream.as_dict()['completed'])
+        with open(self.stream.full_path, 'w+b') as outfile:
+            outfile.truncate(1)
+        self.assertTrue(self.stream.output_file_exists)
+        self.assertFalse(self.stream.as_dict()['completed'])
+
     async def _test_transfer_stream(self, blob_count: int, mock_accumulate_peers=None, stop_when_done=True):
         await self.setup_stream(blob_count)
         mock_node = mock.Mock(spec=Node)
@@ -52,7 +61,7 @@ class TestManagedStream(BlobExchangeTestBase):
         mock_node.accumulate_peers = mock_accumulate_peers or _mock_accumulate_peers

         await self.stream.save_file(node=mock_node)
-        await self.stream.finished_writing.wait()
+        await self.stream.finished_write_attempt.wait()
         self.assertTrue(os.path.isfile(self.stream.full_path))
         if stop_when_done:
             await self.stream.stop()