forked from LBRYCommunity/lbry-sdk
fix deleting partial downloads when stopped and previous streams when updating a publish
This commit is contained in:
parent
0e972ec2ae
commit
c1c6d5bc99
6 changed files with 90 additions and 68 deletions
|
@ -63,23 +63,23 @@ class BlobFileManager:
|
|||
blob_hashes = await self.storage.get_all_blob_hashes()
|
||||
return self.check_completed_blobs(blob_hashes)
|
||||
|
||||
async def delete_blobs(self, blob_hashes: typing.List[str]):
|
||||
bh_to_delete_from_db = []
|
||||
for blob_hash in blob_hashes:
|
||||
if not blob_hash:
|
||||
continue
|
||||
try:
|
||||
blob = self.get_blob(blob_hash)
|
||||
await blob.delete()
|
||||
bh_to_delete_from_db.append(blob_hash)
|
||||
except Exception as e:
|
||||
log.warning("Failed to delete blob file. Reason: %s", e)
|
||||
if blob_hash in self.completed_blob_hashes:
|
||||
self.completed_blob_hashes.remove(blob_hash)
|
||||
if blob_hash in self.blobs:
|
||||
del self.blobs[blob_hash]
|
||||
async def delete_blob(self, blob_hash: str):
|
||||
try:
|
||||
await self.storage.delete_blobs_from_db(bh_to_delete_from_db)
|
||||
except IntegrityError as err:
|
||||
if str(err) != "FOREIGN KEY constraint failed":
|
||||
raise err
|
||||
blob = self.get_blob(blob_hash)
|
||||
await blob.delete()
|
||||
except Exception as e:
|
||||
log.warning("Failed to delete blob file. Reason: %s", e)
|
||||
if blob_hash in self.completed_blob_hashes:
|
||||
self.completed_blob_hashes.remove(blob_hash)
|
||||
if blob_hash in self.blobs:
|
||||
del self.blobs[blob_hash]
|
||||
|
||||
async def delete_blobs(self, blob_hashes: typing.List[str], delete_from_db: typing.Optional[bool] = True):
|
||||
bh_to_delete_from_db = []
|
||||
await asyncio.gather(*map(self.delete_blob, blob_hashes), loop=self.loop)
|
||||
if delete_from_db:
|
||||
try:
|
||||
await self.storage.delete_blobs_from_db(bh_to_delete_from_db)
|
||||
except IntegrityError as err:
|
||||
if str(err) != "FOREIGN KEY constraint failed":
|
||||
raise err
|
||||
|
|
|
@ -1614,7 +1614,7 @@ class Daemon(metaclass=JSONRPCServerType):
|
|||
await self.stream_manager.start_stream(stream)
|
||||
msg = "Resumed download"
|
||||
elif status == 'stop' and stream.running:
|
||||
stream.stop_download()
|
||||
await self.stream_manager.stop_stream(stream)
|
||||
msg = "Stopped download"
|
||||
else:
|
||||
msg = (
|
||||
|
|
|
@ -43,7 +43,7 @@ class StreamAssembler:
|
|||
self.written_bytes: int = 0
|
||||
|
||||
async def _decrypt_blob(self, blob: 'BlobFile', blob_info: 'BlobInfo', key: str):
|
||||
if not blob or self.stream_handle.closed:
|
||||
if not blob or not self.stream_handle or self.stream_handle.closed:
|
||||
return False
|
||||
|
||||
def _decrypt_and_write():
|
||||
|
@ -86,28 +86,35 @@ class StreamAssembler:
|
|||
self.sd_blob, self.descriptor
|
||||
)
|
||||
await self.blob_manager.blob_completed(self.sd_blob)
|
||||
with open(self.output_path, 'wb') as stream_handle:
|
||||
self.stream_handle = stream_handle
|
||||
for i, blob_info in enumerate(self.descriptor.blobs[:-1]):
|
||||
if blob_info.blob_num != i:
|
||||
log.error("sd blob %s is invalid, cannot assemble stream", self.descriptor.sd_hash)
|
||||
return
|
||||
while not stream_handle.closed:
|
||||
try:
|
||||
blob = await self.get_blob(blob_info.blob_hash, blob_info.length)
|
||||
if await self._decrypt_blob(blob, blob_info, self.descriptor.key):
|
||||
await self.blob_manager.blob_completed(blob)
|
||||
break
|
||||
except FileNotFoundError:
|
||||
log.debug("stream assembler stopped")
|
||||
written_blobs = None
|
||||
try:
|
||||
with open(self.output_path, 'wb') as stream_handle:
|
||||
self.stream_handle = stream_handle
|
||||
for i, blob_info in enumerate(self.descriptor.blobs[:-1]):
|
||||
if blob_info.blob_num != i:
|
||||
log.error("sd blob %s is invalid, cannot assemble stream", self.descriptor.sd_hash)
|
||||
return
|
||||
except (ValueError, IOError, OSError):
|
||||
log.warning("failed to decrypt blob %s for stream %s", blob_info.blob_hash,
|
||||
self.descriptor.sd_hash)
|
||||
continue
|
||||
|
||||
self.stream_finished_event.set()
|
||||
await self.after_finished()
|
||||
while self.stream_handle and not self.stream_handle.closed:
|
||||
try:
|
||||
blob = await self.get_blob(blob_info.blob_hash, blob_info.length)
|
||||
if await self._decrypt_blob(blob, blob_info, self.descriptor.key):
|
||||
await self.blob_manager.blob_completed(blob)
|
||||
written_blobs = i
|
||||
break
|
||||
except FileNotFoundError:
|
||||
log.debug("stream assembler stopped")
|
||||
return
|
||||
except (ValueError, IOError, OSError):
|
||||
log.warning("failed to decrypt blob %s for stream %s", blob_info.blob_hash,
|
||||
self.descriptor.sd_hash)
|
||||
continue
|
||||
finally:
|
||||
if written_blobs == len(self.descriptor.blobs) - 1:
|
||||
log.debug("finished decrypting and assembling stream")
|
||||
self.stream_finished_event.set()
|
||||
await self.after_finished()
|
||||
else:
|
||||
log.debug("stream decryption and assembly did not finish")
|
||||
|
||||
async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile':
|
||||
return self.blob_manager.get_blob(blob_hash, length)
|
||||
|
|
|
@ -63,6 +63,10 @@ class StreamDownloader(StreamAssembler):
|
|||
self.fixed_peers_handle.cancel()
|
||||
self.fixed_peers_handle = None
|
||||
self.blob_downloader = None
|
||||
if self.stream_handle:
|
||||
if not self.stream_handle.closed:
|
||||
self.stream_handle.close()
|
||||
self.stream_handle = None
|
||||
|
||||
async def get_blob(self, blob_hash: str, length: typing.Optional[int] = None) -> 'BlobFile':
|
||||
return await self.blob_downloader.download_blob(blob_hash, length)
|
||||
|
|
|
@ -104,8 +104,12 @@ class ManagedStream:
|
|||
def blobs_remaining(self) -> int:
|
||||
return self.blobs_in_stream - self.blobs_completed
|
||||
|
||||
@property
|
||||
def full_path(self) -> str:
|
||||
return os.path.join(self.download_directory, os.path.basename(self.file_name))
|
||||
|
||||
def as_dict(self) -> typing.Dict:
|
||||
full_path = os.path.join(self.download_directory, self.file_name)
|
||||
full_path = self.full_path
|
||||
if not os.path.isfile(full_path):
|
||||
full_path = None
|
||||
mime_type = guess_media_type(os.path.basename(self.file_name))
|
||||
|
@ -170,12 +174,7 @@ class ManagedStream:
|
|||
def stop_download(self):
|
||||
if self.downloader:
|
||||
self.downloader.stop()
|
||||
if not self.downloader.stream_finished_event.is_set() and self.downloader.wrote_bytes_event.is_set():
|
||||
path = os.path.join(self.download_directory, self.file_name)
|
||||
if os.path.isfile(path):
|
||||
os.remove(path)
|
||||
if not self.finished:
|
||||
self.update_status(self.STATUS_STOPPED)
|
||||
self.downloader = None
|
||||
|
||||
async def upload_to_reflector(self, host: str, port: int) -> typing.List[str]:
|
||||
sent = []
|
||||
|
|
|
@ -4,7 +4,7 @@ import typing
|
|||
import binascii
|
||||
import logging
|
||||
import random
|
||||
from lbrynet.error import ResolveError
|
||||
from lbrynet.error import ResolveError, InvalidStreamDescriptorError
|
||||
from lbrynet.stream.downloader import StreamDownloader
|
||||
from lbrynet.stream.managed_stream import ManagedStream
|
||||
from lbrynet.schema.claim import ClaimDict
|
||||
|
@ -97,8 +97,9 @@ class StreamManager:
|
|||
await asyncio.wait_for(self.loop.create_task(stream.downloader.got_descriptor.wait()),
|
||||
self.config.download_timeout)
|
||||
except asyncio.TimeoutError:
|
||||
stream.stop_download()
|
||||
stream.downloader = None
|
||||
await self.stop_stream(stream)
|
||||
if stream in self.streams:
|
||||
self.streams.remove(stream)
|
||||
return False
|
||||
file_name = os.path.basename(stream.downloader.output_path)
|
||||
await self.storage.change_file_download_dir_and_file_name(
|
||||
|
@ -108,6 +109,18 @@ class StreamManager:
|
|||
return True
|
||||
return True
|
||||
|
||||
async def stop_stream(self, stream: ManagedStream):
|
||||
stream.stop_download()
|
||||
if not stream.finished and os.path.isfile(stream.full_path):
|
||||
try:
|
||||
os.remove(stream.full_path)
|
||||
except OSError as err:
|
||||
log.warning("Failed to delete partial download %s from downloads directory: %s", stream.full_path,
|
||||
str(err))
|
||||
if stream.running:
|
||||
stream.update_status(ManagedStream.STATUS_STOPPED)
|
||||
await self.storage.change_file_status(stream.stream_hash, ManagedStream.STATUS_STOPPED)
|
||||
|
||||
def make_downloader(self, sd_hash: str, download_directory: str, file_name: str):
|
||||
return StreamDownloader(
|
||||
self.loop, self.config, self.blob_manager, sd_hash, download_directory, file_name
|
||||
|
@ -116,13 +129,15 @@ class StreamManager:
|
|||
async def add_stream(self, sd_hash: str, file_name: str, download_directory: str, status: str, claim):
|
||||
sd_blob = self.blob_manager.get_blob(sd_hash)
|
||||
if sd_blob.get_is_verified():
|
||||
descriptor = await self.blob_manager.get_stream_descriptor(sd_blob.blob_hash)
|
||||
try:
|
||||
descriptor = await self.blob_manager.get_stream_descriptor(sd_blob.blob_hash)
|
||||
except InvalidStreamDescriptorError as err:
|
||||
log.warning("Failed to start stream for sd %s - %s", sd_hash, str(err))
|
||||
return
|
||||
|
||||
downloader = self.make_downloader(descriptor.sd_hash, download_directory, file_name)
|
||||
stream = ManagedStream(
|
||||
self.loop, self.blob_manager, descriptor,
|
||||
download_directory,
|
||||
file_name,
|
||||
downloader, status, claim
|
||||
self.loop, self.blob_manager, descriptor, download_directory, file_name, downloader, status, claim
|
||||
)
|
||||
self.streams.add(stream)
|
||||
self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
|
||||
|
@ -194,18 +209,14 @@ class StreamManager:
|
|||
return stream
|
||||
|
||||
async def delete_stream(self, stream: ManagedStream, delete_file: typing.Optional[bool] = False):
|
||||
stream.stop_download()
|
||||
self.streams.remove(stream)
|
||||
await self.stop_stream(stream)
|
||||
if stream in self.streams:
|
||||
self.streams.remove(stream)
|
||||
blob_hashes = [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]
|
||||
await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
|
||||
await self.storage.delete_stream(stream.descriptor)
|
||||
|
||||
blob_hashes = [stream.sd_hash]
|
||||
for blob_info in stream.descriptor.blobs[:-1]:
|
||||
blob_hashes.append(blob_info.blob_hash)
|
||||
await self.blob_manager.delete_blobs(blob_hashes)
|
||||
if delete_file:
|
||||
path = os.path.join(stream.download_directory, stream.file_name)
|
||||
if os.path.isfile(path):
|
||||
os.remove(path)
|
||||
if delete_file and os.path.isfile(stream.full_path):
|
||||
os.remove(stream.full_path)
|
||||
|
||||
def wait_for_stream_finished(self, stream: ManagedStream):
|
||||
async def _wait_for_stream_finished():
|
||||
|
@ -213,6 +224,7 @@ class StreamManager:
|
|||
try:
|
||||
await stream.downloader.stream_finished_event.wait()
|
||||
stream.update_status(ManagedStream.STATUS_FINISHED)
|
||||
await self.storage.change_file_status(stream.stream_hash, ManagedStream.STATUS_FINISHED)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
task = self.loop.create_task(_wait_for_stream_finished())
|
||||
|
|
Loading…
Add table
Reference in a new issue