file status completed set only if all bytes were written

This commit is contained in:
Victor Shyba 2019-07-15 04:45:22 -03:00
parent eae4ed7b27
commit 8b1307e1ca
2 changed files with 17 additions and 16 deletions

View file

@ -56,7 +56,6 @@ class ManagedStream:
'stream_claim_info',
'download_id',
'rowid',
'written_bytes',
'content_fee',
'downloader',
'analytics_manager',
@ -89,7 +88,6 @@ class ManagedStream:
self.stream_claim_info = claim
self.download_id = download_id or binascii.hexlify(generate_id()).decode()
self.rowid = rowid
self.written_bytes = 0
self.content_fee = content_fee
self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor)
self.analytics_manager = analytics_manager
@ -121,6 +119,10 @@ class ManagedStream:
def status(self) -> str:
return self._status
@property
def written_bytes(self) -> int:
return 0 if not self.output_file_exists else os.stat(self.full_path).st_size
async def update_status(self, status: str):
assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED]
self._status = status
@ -205,21 +207,14 @@ class ManagedStream:
full_path = self.full_path
file_name = self.file_name
download_directory = self.download_directory
if self.full_path and self.output_file_exists:
if self.written_bytes:
written_bytes = self.written_bytes
else:
written_bytes = os.stat(self.full_path).st_size
else:
if not self.output_file_exists:
full_path = None
file_name = None
download_directory = None
written_bytes = None
return {
'streaming_url': f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}",
'completed': (self.output_file_exists and (self.status in ('stopped', 'finished'))
or not self.saving.is_set()) or all(
self.blob_manager.is_blob_verified(b.blob_hash) for b in self.descriptor.blobs[:-1]),
'completed': self.written_bytes >= self.descriptor.lower_bound_decrypted_length(),
'file_name': file_name,
'download_directory': download_directory,
'points_paid': 0.0,
@ -233,7 +228,7 @@ class ManagedStream:
'key': self.descriptor.key,
'total_bytes_lower_bound': self.descriptor.lower_bound_decrypted_length(),
'total_bytes': self.descriptor.upper_bound_decrypted_length(),
'written_bytes': written_bytes,
'written_bytes': self.written_bytes,
'blobs_completed': self.blobs_completed,
'blobs_in_stream': self.blobs_in_stream,
'blobs_remaining': self.blobs_remaining,
@ -371,7 +366,6 @@ class ManagedStream:
async for blob_info, decrypted in self._aiter_read_stream(connection_id=self.SAVING_ID):
log.info("write blob %i/%i", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
await self.loop.run_in_executor(None, self._write_decrypted_blob, file_write_handle, decrypted)
self.written_bytes += len(decrypted)
if not self.started_writing.is_set():
self.started_writing.set()
await self.update_status(ManagedStream.STATUS_FINISHED)
@ -387,7 +381,6 @@ class ManagedStream:
if os.path.isfile(output_path):
log.warning("removing incomplete download %s for %s", output_path, self.sd_hash)
os.remove(output_path)
self.written_bytes = 0
if isinstance(err, asyncio.TimeoutError):
self.downloader.stop()
await self.blob_manager.storage.change_file_download_dir_and_file_name(
@ -425,7 +418,6 @@ class ManagedStream:
self.stream_hash, self.download_directory, self.file_name
)
await self.update_status(ManagedStream.STATUS_RUNNING)
self.written_bytes = 0
self.file_output_task = self.loop.create_task(self._save_file(self.full_path))
await self.started_writing.wait()

View file

@ -40,6 +40,15 @@ class TestManagedStream(BlobExchangeTestBase):
self.loop, self.client_config, self.client_blob_manager, self.sd_hash, self.client_dir
)
async def test_status_file_completed(self):
await self._test_transfer_stream(10)
self.assertTrue(self.stream.output_file_exists)
self.assertTrue(self.stream.as_dict()['completed'])
with open(self.stream.full_path, 'w+b') as outfile:
outfile.truncate(1)
self.assertTrue(self.stream.output_file_exists)
self.assertFalse(self.stream.as_dict()['completed'])
async def _test_transfer_stream(self, blob_count: int, mock_accumulate_peers=None, stop_when_done=True):
await self.setup_stream(blob_count)
mock_node = mock.Mock(spec=Node)
@ -52,7 +61,7 @@ class TestManagedStream(BlobExchangeTestBase):
mock_node.accumulate_peers = mock_accumulate_peers or _mock_accumulate_peers
await self.stream.save_file(node=mock_node)
await self.stream.finished_writing.wait()
await self.stream.finished_write_attempt.wait()
self.assertTrue(os.path.isfile(self.stream.full_path))
if stop_when_done:
await self.stream.stop()