diff --git a/lbrynet/error.py b/lbrynet/error.py index fe7e627ed..234bb9229 100644 --- a/lbrynet/error.py +++ b/lbrynet/error.py @@ -28,7 +28,7 @@ class DownloadTimeoutError(Exception): class DownloadDataTimeout(Exception): def __init__(self, download): - super().__init__(f'Failed to download data blobs for sd hash {download} within timeout ') + super().__init__(f'Failed to download data blobs for sd hash {download} within timeout') self.download = download diff --git a/lbrynet/stream/downloader.py b/lbrynet/stream/downloader.py index fda968234..70360e878 100644 --- a/lbrynet/stream/downloader.py +++ b/lbrynet/stream/downloader.py @@ -1,3 +1,4 @@ +import os import asyncio import typing import logging @@ -68,6 +69,9 @@ class StreamDownloader(StreamAssembler): if not self.stream_handle.closed: self.stream_handle.close() self.stream_handle = None + if not self.stream_finished_event.is_set() and self.wrote_bytes_event.is_set() and \ + self.output_path and os.path.isfile(self.output_path): + os.remove(self.output_path) async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile': return await self.blob_downloader.download_blob(blob_hash, length) diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py index fa9328886..137fbad11 100644 --- a/lbrynet/stream/stream_manager.py +++ b/lbrynet/stream/stream_manager.py @@ -465,13 +465,15 @@ class StreamManager: except asyncio.TimeoutError: if descriptor_time_fut.done(): time_to_descriptor = descriptor_time_fut.result() - error = DownloadDataTimeout(downloader.descriptor.blobs[0].blob_hash) + error = DownloadDataTimeout(downloader.sd_hash) + self.blob_manager.delete_blob(downloader.sd_hash) + await self.storage.delete_stream(downloader.descriptor) else: descriptor_time_fut.cancel() error = DownloadSDTimeout(downloader.sd_hash) if stream: await self.stop_stream(stream) - elif downloader: + else: downloader.stop() if error: log.warning(error) diff --git a/tests/integration/test_file_commands.py b/tests/integration/test_file_commands.py index 1d09a92af..569cace77 100644 --- a/tests/integration/test_file_commands.py +++ b/tests/integration/test_file_commands.py @@ -74,48 +74,58 @@ class FileCommands(CommandTestCase): sd_hash = claim['output']['value']['stream']['source']['source'] file_info = self.daemon.jsonrpc_file_list()[0] await self.daemon.jsonrpc_file_delete(claim_name='foo') - all_except_sd = [ - blob_hash for blob_hash in self.server.blob_manager.completed_blob_hashes if blob_hash != sd_hash + blobs = await self.server_storage.get_blobs_for_stream( + await self.server_storage.get_stream_hash_for_sd_hash(sd_hash) + ) + all_except_sd_and_head = [ + blob.blob_hash for blob in blobs[1:] if blob.blob_hash ] - await self.server.blob_manager.delete_blobs(all_except_sd) - + await self.server.blob_manager.delete_blobs(all_except_sd_and_head) + self.assertFalse(os.path.isfile(os.path.join(self.daemon.conf.download_dir, file_info['file_name']))) resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2) - self.assertIn('error', resp) + self.assertNotIn('error', resp) + self.assertTrue(os.path.isfile(os.path.join(self.daemon.conf.download_dir, file_info['file_name']))) + self.daemon.stream_manager.stop() self.assertFalse(os.path.isfile(os.path.join(self.daemon.conf.download_dir, file_info['file_name']))) async def test_incomplete_downloads_retry(self): claim = await self.make_claim('foo', '0.01') sd_hash = claim['output']['value']['stream']['source']['source'] await self.daemon.jsonrpc_file_delete(claim_name='foo') - all_except_sd = [ - blob_hash for blob_hash in self.server.blob_manager.completed_blob_hashes if blob_hash != sd_hash + blobs = await self.server_storage.get_blobs_for_stream( + await self.server_storage.get_stream_hash_for_sd_hash(sd_hash) + ) + all_except_sd_and_head = [ + blob.blob_hash for blob in blobs[1:] if blob.blob_hash ] # backup server blobs - for blob_hash in all_except_sd: + for blob_hash in all_except_sd_and_head: blob = self.server_blob_manager.get_blob(blob_hash) os.rename(blob.file_path, blob.file_path + '__') # erase all except sd blob - await self.server.blob_manager.delete_blobs(all_except_sd) + await self.server.blob_manager.delete_blobs(all_except_sd_and_head) - # fails, as expected + # start the download resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2) - self.assertIn('error', resp) + self.assertNotIn('error', resp) self.assertEqual(len(self.daemon.jsonrpc_file_list()), 1) - self.assertEqual('stopped', self.daemon.jsonrpc_file_list()[0]['status']) + self.assertEqual('running', self.daemon.jsonrpc_file_list()[0]['status']) + await self.daemon.jsonrpc_file_set_status('stop', claim_name='foo') # recover blobs - for blob_hash in all_except_sd: + for blob_hash in all_except_sd_and_head: blob = self.server_blob_manager.get_blob(blob_hash) os.rename(blob.file_path + '__', blob.file_path) self.server_blob_manager.blobs.clear() await self.server_blob_manager.blob_completed(self.server_blob_manager.get_blob(blob_hash)) - resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2) + + await self.daemon.jsonrpc_file_set_status('start', claim_name='foo') await asyncio.wait_for(self.wait_files_to_complete(), timeout=5) - self.assertNotIn('error', resp) file_info = self.daemon.jsonrpc_file_list()[0] self.assertEqual(file_info['blobs_completed'], file_info['blobs_in_stream']) + self.assertEqual('finished', file_info['status']) async def test_unban_recovers_stream(self): BlobDownloader.BAN_TIME = .5 # fixme: temporary field, will move to connection manager or a conf