fix tests
-fix cleanup of a failed download
This commit is contained in:
parent
5d212a0f82
commit
993cb43e5b
4 changed files with 34 additions and 18 deletions
|
@ -28,7 +28,7 @@ class DownloadTimeoutError(Exception):
|
||||||
|
|
||||||
class DownloadDataTimeout(Exception):
|
class DownloadDataTimeout(Exception):
|
||||||
def __init__(self, download):
|
def __init__(self, download):
|
||||||
super().__init__(f'Failed to download data blobs for sd hash {download} within timeout ')
|
super().__init__(f'Failed to download data blobs for sd hash {download} within timeout')
|
||||||
self.download = download
|
self.download = download
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import os
|
||||||
import asyncio
|
import asyncio
|
||||||
import typing
|
import typing
|
||||||
import logging
|
import logging
|
||||||
|
@ -68,6 +69,9 @@ class StreamDownloader(StreamAssembler):
|
||||||
if not self.stream_handle.closed:
|
if not self.stream_handle.closed:
|
||||||
self.stream_handle.close()
|
self.stream_handle.close()
|
||||||
self.stream_handle = None
|
self.stream_handle = None
|
||||||
|
if not self.stream_finished_event.is_set() and self.wrote_bytes_event.is_set() and \
|
||||||
|
self.output_path and os.path.isfile(self.output_path):
|
||||||
|
os.remove(self.output_path)
|
||||||
|
|
||||||
async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile':
|
async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile':
|
||||||
return await self.blob_downloader.download_blob(blob_hash, length)
|
return await self.blob_downloader.download_blob(blob_hash, length)
|
||||||
|
|
|
@ -465,13 +465,15 @@ class StreamManager:
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
if descriptor_time_fut.done():
|
if descriptor_time_fut.done():
|
||||||
time_to_descriptor = descriptor_time_fut.result()
|
time_to_descriptor = descriptor_time_fut.result()
|
||||||
error = DownloadDataTimeout(downloader.descriptor.blobs[0].blob_hash)
|
error = DownloadDataTimeout(downloader.sd_hash)
|
||||||
|
self.blob_manager.delete_blob(downloader.sd_hash)
|
||||||
|
await self.storage.delete_stream(downloader.descriptor)
|
||||||
else:
|
else:
|
||||||
descriptor_time_fut.cancel()
|
descriptor_time_fut.cancel()
|
||||||
error = DownloadSDTimeout(downloader.sd_hash)
|
error = DownloadSDTimeout(downloader.sd_hash)
|
||||||
if stream:
|
if stream:
|
||||||
await self.stop_stream(stream)
|
await self.stop_stream(stream)
|
||||||
elif downloader:
|
else:
|
||||||
downloader.stop()
|
downloader.stop()
|
||||||
if error:
|
if error:
|
||||||
log.warning(error)
|
log.warning(error)
|
||||||
|
|
|
@ -74,48 +74,58 @@ class FileCommands(CommandTestCase):
|
||||||
sd_hash = claim['output']['value']['stream']['source']['source']
|
sd_hash = claim['output']['value']['stream']['source']['source']
|
||||||
file_info = self.daemon.jsonrpc_file_list()[0]
|
file_info = self.daemon.jsonrpc_file_list()[0]
|
||||||
await self.daemon.jsonrpc_file_delete(claim_name='foo')
|
await self.daemon.jsonrpc_file_delete(claim_name='foo')
|
||||||
all_except_sd = [
|
blobs = await self.server_storage.get_blobs_for_stream(
|
||||||
blob_hash for blob_hash in self.server.blob_manager.completed_blob_hashes if blob_hash != sd_hash
|
await self.server_storage.get_stream_hash_for_sd_hash(sd_hash)
|
||||||
|
)
|
||||||
|
all_except_sd_and_head = [
|
||||||
|
blob.blob_hash for blob in blobs[1:] if blob.blob_hash
|
||||||
]
|
]
|
||||||
await self.server.blob_manager.delete_blobs(all_except_sd)
|
await self.server.blob_manager.delete_blobs(all_except_sd_and_head)
|
||||||
|
self.assertFalse(os.path.isfile(os.path.join(self.daemon.conf.download_dir, file_info['file_name'])))
|
||||||
resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2)
|
resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2)
|
||||||
self.assertIn('error', resp)
|
self.assertNotIn('error', resp)
|
||||||
|
self.assertTrue(os.path.isfile(os.path.join(self.daemon.conf.download_dir, file_info['file_name'])))
|
||||||
|
self.daemon.stream_manager.stop()
|
||||||
self.assertFalse(os.path.isfile(os.path.join(self.daemon.conf.download_dir, file_info['file_name'])))
|
self.assertFalse(os.path.isfile(os.path.join(self.daemon.conf.download_dir, file_info['file_name'])))
|
||||||
|
|
||||||
async def test_incomplete_downloads_retry(self):
|
async def test_incomplete_downloads_retry(self):
|
||||||
claim = await self.make_claim('foo', '0.01')
|
claim = await self.make_claim('foo', '0.01')
|
||||||
sd_hash = claim['output']['value']['stream']['source']['source']
|
sd_hash = claim['output']['value']['stream']['source']['source']
|
||||||
await self.daemon.jsonrpc_file_delete(claim_name='foo')
|
await self.daemon.jsonrpc_file_delete(claim_name='foo')
|
||||||
all_except_sd = [
|
blobs = await self.server_storage.get_blobs_for_stream(
|
||||||
blob_hash for blob_hash in self.server.blob_manager.completed_blob_hashes if blob_hash != sd_hash
|
await self.server_storage.get_stream_hash_for_sd_hash(sd_hash)
|
||||||
|
)
|
||||||
|
all_except_sd_and_head = [
|
||||||
|
blob.blob_hash for blob in blobs[1:] if blob.blob_hash
|
||||||
]
|
]
|
||||||
|
|
||||||
# backup server blobs
|
# backup server blobs
|
||||||
for blob_hash in all_except_sd:
|
for blob_hash in all_except_sd_and_head:
|
||||||
blob = self.server_blob_manager.get_blob(blob_hash)
|
blob = self.server_blob_manager.get_blob(blob_hash)
|
||||||
os.rename(blob.file_path, blob.file_path + '__')
|
os.rename(blob.file_path, blob.file_path + '__')
|
||||||
|
|
||||||
# erase all except sd blob
|
# erase all except sd blob
|
||||||
await self.server.blob_manager.delete_blobs(all_except_sd)
|
await self.server.blob_manager.delete_blobs(all_except_sd_and_head)
|
||||||
|
|
||||||
# fails, as expected
|
# start the download
|
||||||
resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2)
|
resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2)
|
||||||
self.assertIn('error', resp)
|
self.assertNotIn('error', resp)
|
||||||
self.assertEqual(len(self.daemon.jsonrpc_file_list()), 1)
|
self.assertEqual(len(self.daemon.jsonrpc_file_list()), 1)
|
||||||
self.assertEqual('stopped', self.daemon.jsonrpc_file_list()[0]['status'])
|
self.assertEqual('running', self.daemon.jsonrpc_file_list()[0]['status'])
|
||||||
|
await self.daemon.jsonrpc_file_set_status('stop', claim_name='foo')
|
||||||
|
|
||||||
# recover blobs
|
# recover blobs
|
||||||
for blob_hash in all_except_sd:
|
for blob_hash in all_except_sd_and_head:
|
||||||
blob = self.server_blob_manager.get_blob(blob_hash)
|
blob = self.server_blob_manager.get_blob(blob_hash)
|
||||||
os.rename(blob.file_path + '__', blob.file_path)
|
os.rename(blob.file_path + '__', blob.file_path)
|
||||||
self.server_blob_manager.blobs.clear()
|
self.server_blob_manager.blobs.clear()
|
||||||
await self.server_blob_manager.blob_completed(self.server_blob_manager.get_blob(blob_hash))
|
await self.server_blob_manager.blob_completed(self.server_blob_manager.get_blob(blob_hash))
|
||||||
resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2)
|
|
||||||
|
await self.daemon.jsonrpc_file_set_status('start', claim_name='foo')
|
||||||
await asyncio.wait_for(self.wait_files_to_complete(), timeout=5)
|
await asyncio.wait_for(self.wait_files_to_complete(), timeout=5)
|
||||||
self.assertNotIn('error', resp)
|
|
||||||
file_info = self.daemon.jsonrpc_file_list()[0]
|
file_info = self.daemon.jsonrpc_file_list()[0]
|
||||||
self.assertEqual(file_info['blobs_completed'], file_info['blobs_in_stream'])
|
self.assertEqual(file_info['blobs_completed'], file_info['blobs_in_stream'])
|
||||||
|
self.assertEqual('finished', file_info['status'])
|
||||||
|
|
||||||
async def test_unban_recovers_stream(self):
|
async def test_unban_recovers_stream(self):
|
||||||
BlobDownloader.BAN_TIME = .5 # fixme: temporary field, will move to connection manager or a conf
|
BlobDownloader.BAN_TIME = .5 # fixme: temporary field, will move to connection manager or a conf
|
||||||
|
|
Loading…
Reference in a new issue