Merge pull request #1939 from lbryio/missing_await
refactor file_name and full_path as it can be None, fix missing await, stop using blob_manager.storage on stream_manager and use your own
This commit is contained in:
commit
f175e8826c
2 changed files with 28 additions and 26 deletions
|
@ -22,7 +22,7 @@ class ManagedStream:
|
||||||
STATUS_FINISHED = "finished"
|
STATUS_FINISHED = "finished"
|
||||||
|
|
||||||
def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', rowid: int,
|
def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', rowid: int,
|
||||||
descriptor: 'StreamDescriptor', download_directory: str, file_name: str,
|
descriptor: 'StreamDescriptor', download_directory: str, file_name: typing.Optional[str],
|
||||||
downloader: typing.Optional[StreamDownloader] = None,
|
downloader: typing.Optional[StreamDownloader] = None,
|
||||||
status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None):
|
status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredStreamClaim] = None):
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
|
@ -39,7 +39,7 @@ class ManagedStream:
|
||||||
self.tx = None
|
self.tx = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def file_name(self):
|
def file_name(self) -> typing.Optional[str]:
|
||||||
return self.downloader.output_file_name if self.downloader else self._file_name
|
return self.downloader.output_file_name if self.downloader else self._file_name
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -112,14 +112,16 @@ class ManagedStream:
|
||||||
return self.blobs_in_stream - self.blobs_completed
|
return self.blobs_in_stream - self.blobs_completed
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def full_path(self) -> str:
|
def full_path(self) -> typing.Optional[str]:
|
||||||
return os.path.join(self.download_directory, os.path.basename(self.file_name))
|
return os.path.join(self.download_directory, os.path.basename(self.file_name)) if self.file_name else None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def output_file_exists(self):
|
||||||
|
return os.path.isfile(self.full_path) if self.full_path else False
|
||||||
|
|
||||||
def as_dict(self) -> typing.Dict:
|
def as_dict(self) -> typing.Dict:
|
||||||
full_path = self.full_path
|
full_path = self.full_path if self.output_file_exists else None
|
||||||
if not os.path.isfile(full_path):
|
mime_type = guess_media_type(os.path.basename(self.descriptor.suggested_file_name))
|
||||||
full_path = None
|
|
||||||
mime_type = guess_media_type(os.path.basename(self.file_name))
|
|
||||||
|
|
||||||
if self.downloader and self.downloader.written_bytes:
|
if self.downloader and self.downloader.written_bytes:
|
||||||
written_bytes = self.downloader.written_bytes
|
written_bytes = self.downloader.written_bytes
|
||||||
|
|
|
@ -76,10 +76,7 @@ class StreamManager:
|
||||||
"""
|
"""
|
||||||
Resume or rebuild a partial or completed stream
|
Resume or rebuild a partial or completed stream
|
||||||
"""
|
"""
|
||||||
|
if not stream.running and not stream.output_file_exists:
|
||||||
path = os.path.join(stream.download_directory, stream.file_name)
|
|
||||||
|
|
||||||
if not stream.running and not os.path.isfile(path):
|
|
||||||
if stream.downloader:
|
if stream.downloader:
|
||||||
stream.downloader.stop()
|
stream.downloader.stop()
|
||||||
stream.downloader = None
|
stream.downloader = None
|
||||||
|
@ -117,7 +114,7 @@ class StreamManager:
|
||||||
|
|
||||||
async def stop_stream(self, stream: ManagedStream):
|
async def stop_stream(self, stream: ManagedStream):
|
||||||
stream.stop_download()
|
stream.stop_download()
|
||||||
if not stream.finished and os.path.isfile(stream.full_path):
|
if not stream.finished and stream.output_file_exists:
|
||||||
try:
|
try:
|
||||||
os.remove(stream.full_path)
|
os.remove(stream.full_path)
|
||||||
except OSError as err:
|
except OSError as err:
|
||||||
|
@ -272,7 +269,7 @@ class StreamManager:
|
||||||
blob_hashes = [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]
|
blob_hashes = [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]
|
||||||
await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
|
await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
|
||||||
await self.storage.delete_stream(stream.descriptor)
|
await self.storage.delete_stream(stream.descriptor)
|
||||||
if delete_file and os.path.isfile(stream.full_path):
|
if delete_file and stream.output_file_exists:
|
||||||
os.remove(stream.full_path)
|
os.remove(stream.full_path)
|
||||||
|
|
||||||
def wait_for_stream_finished(self, stream: ManagedStream):
|
def wait_for_stream_finished(self, stream: ManagedStream):
|
||||||
|
@ -305,18 +302,8 @@ class StreamManager:
|
||||||
downloader.stop()
|
downloader.stop()
|
||||||
log.info("stopped stream")
|
log.info("stopped stream")
|
||||||
raise DownloadSDTimeout(downloader.sd_hash)
|
raise DownloadSDTimeout(downloader.sd_hash)
|
||||||
file_name = os.path.basename(downloader.output_path)
|
rowid = await self._store_stream(downloader)
|
||||||
download_directory = os.path.dirname(downloader.output_path)
|
await self.storage.save_content_claim(
|
||||||
if not await self.blob_manager.storage.stream_exists(downloader.sd_hash):
|
|
||||||
await self.blob_manager.storage.store_stream(downloader.sd_blob, downloader.descriptor)
|
|
||||||
if not await self.blob_manager.storage.file_exists(downloader.sd_hash):
|
|
||||||
rowid = await self.blob_manager.storage.save_downloaded_file(
|
|
||||||
downloader.descriptor.stream_hash, file_name, download_directory,
|
|
||||||
0.0
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
rowid = self.blob_manager.storage.rowid_for_stream(downloader.descriptor.stream_hash)
|
|
||||||
await self.blob_manager.storage.save_content_claim(
|
|
||||||
downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}"
|
downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}"
|
||||||
)
|
)
|
||||||
stream = ManagedStream(self.loop, self.blob_manager, rowid, downloader.descriptor, download_directory,
|
stream = ManagedStream(self.loop, self.blob_manager, rowid, downloader.descriptor, download_directory,
|
||||||
|
@ -333,6 +320,19 @@ class StreamManager:
|
||||||
await self.stop_stream(stream)
|
await self.stop_stream(stream)
|
||||||
raise DownloadDataTimeout(downloader.sd_hash)
|
raise DownloadDataTimeout(downloader.sd_hash)
|
||||||
|
|
||||||
|
async def _store_stream(self, downloader: StreamDownloader) -> int:
|
||||||
|
file_name = os.path.basename(downloader.output_path)
|
||||||
|
download_directory = os.path.dirname(downloader.output_path)
|
||||||
|
if not await self.storage.stream_exists(downloader.sd_hash):
|
||||||
|
await self.storage.store_stream(downloader.sd_blob, downloader.descriptor)
|
||||||
|
if not await self.storage.file_exists(downloader.sd_hash):
|
||||||
|
return await self.storage.save_downloaded_file(
|
||||||
|
downloader.descriptor.stream_hash, file_name, download_directory,
|
||||||
|
0.0
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
return await self.storage.rowid_for_stream(downloader.descriptor.stream_hash)
|
||||||
|
|
||||||
async def download_stream_from_claim(self, node: 'Node', claim_info: typing.Dict,
|
async def download_stream_from_claim(self, node: 'Node', claim_info: typing.Dict,
|
||||||
file_name: typing.Optional[str] = None,
|
file_name: typing.Optional[str] = None,
|
||||||
timeout: typing.Optional[float] = 60,
|
timeout: typing.Optional[float] = 60,
|
||||||
|
|
Loading…
Add table
Reference in a new issue