re-assemble file / resume downloads

Jack Robison 2019-02-01 15:46:31 -05:00
parent c1b4a012ec
commit 744375b2c0
4 changed files with 43 additions and 12 deletions

lbrynet/extras/daemon/Daemon.py

@@ -1581,6 +1581,9 @@ class Daemon(metaclass=JSONRPCServerType):
         if existing:
             log.info("already have matching stream for %s", uri)
             stream = existing[0]
+            if not stream.running:
+                log.info("resuming download")
+                await self.stream_manager.start_stream(stream)
         else:
             stream = await self.stream_manager.download_stream_from_claim(
                 self.dht_node, resolved, file_name, timeout, fee_amount, fee_address
@@ -1618,7 +1621,7 @@ class Daemon(metaclass=JSONRPCServerType):
             raise Exception(f'Unable to find a file for {kwargs}')
         stream = streams[0]
         if status == 'start' and not stream.running and not stream.finished:
-            stream.downloader.download(self.dht_node)
+            await self.stream_manager.start_stream(stream)
             msg = "Resumed download"
         elif status == 'stop' and stream.running:
             stream.stop_download()
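
Both daemon paths now funnel through StreamManager.start_stream(): a repeated "get" on a claim whose file is stopped resumes it, and "file_set_status" with status=start does the same instead of poking the downloader directly. As a rough illustration of exercising the resume path over the daemon's JSON-RPC API (assuming the default listening address http://localhost:5279 and a placeholder sd_hash):

    import requests  # any JSON-RPC-over-HTTP client works; requests keeps the sketch short

    DAEMON_URL = "http://localhost:5279"  # assumed default lbrynet API address

    def rpc(method, **params):
        # the daemon accepts {"method": ..., "params": {...}} POST bodies
        return requests.post(DAEMON_URL, json={"method": method, "params": params}).json()

    rpc("file_set_status", status="stop", sd_hash="<sd hash of a downloaded file>")
    # with this commit, "start" re-enters start_stream() and re-assembles the
    # file if it was deleted, rather than calling the downloader directly
    rpc("file_set_status", status="start", sd_hash="<sd hash of a downloaded file>")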

lbrynet/extras/daemon/storage.py

@@ -447,6 +447,12 @@ class SQLiteStorage(SQLiteMixin):
         log.info("update file status %s -> %s", stream_hash, new_status)
         return self.db.execute("update file set status=? where stream_hash=?", (new_status, stream_hash))
 
+    def change_file_download_dir(self, stream_hash: str, download_dir: str):
+        log.info("update file download directory %s -> %s", stream_hash, download_dir)
+        return self.db.execute("update file set download_directory=? where stream_hash=?", (
+            binascii.hexlify(download_dir.encode()).decode(), stream_hash
+        ))
+
     def get_all_stream_hashes(self):
         return self.run_and_return_list("select stream_hash from stream")
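
The new change_file_download_dir() mirrors change_file_status() one row up, and stores the directory hex-encoded, presumably to keep arbitrary path bytes ASCII-safe in the column. A quick round-trip sketch of that encoding (example path only):

    import binascii

    download_dir = "/home/user/Downloads"  # example path
    stored = binascii.hexlify(download_dir.encode()).decode()  # value written to SQLite
    assert binascii.unhexlify(stored.encode()).decode() == download_dir
    print(stored)  # 2f686f6d652f757365722f446f776e6c6f616473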

lbrynet/stream/managed_stream.py

@@ -11,6 +11,7 @@ from lbrynet.extras.daemon.storage import StoredStreamClaim
 if typing.TYPE_CHECKING:
     from lbrynet.schema.claim import ClaimDict
     from lbrynet.blob.blob_manager import BlobFileManager
+    from lbrynet.dht.node import Node
 
 log = logging.getLogger(__name__)
@@ -155,6 +156,10 @@ class ManagedStream:
         return cls(loop, blob_manager, descriptor, os.path.dirname(file_path), os.path.basename(file_path),
                    status=cls.STATUS_FINISHED)
 
+    def start_download(self, node: typing.Optional['Node']):
+        self.downloader.download(node)
+        self.update_status(self.STATUS_RUNNING)
+
     def stop_download(self):
         if self.downloader:
             self.downloader.stop()
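
The Node import sits under typing.TYPE_CHECKING so the new start_download() signature can be annotated without importing the DHT package at runtime (avoiding a potential import cycle); the annotation itself is a string forward reference. A generic sketch of the pattern, with hypothetical module names:

    import typing

    if typing.TYPE_CHECKING:
        # seen by type checkers only; never executed at runtime
        from mypackage.dht.node import Node  # hypothetical module

    def start_download(node: typing.Optional['Node']) -> None:
        # 'Node' as a string defers resolution until type-checking time
        ...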

lbrynet/stream/stream_manager.py

@@ -63,15 +63,35 @@ class StreamManager:
         claim_info = await self.storage.get_content_claim(stream.stream_hash)
         stream.set_claim(claim_info, smart_decode(claim_info['value']))
 
+    async def start_stream(self, stream: ManagedStream):
+        path = os.path.join(stream.download_directory, stream.file_name)
+        if not stream.running or not os.path.isfile(path):
+            if stream.downloader:
+                stream.downloader.stop()
+                stream.downloader = None
+            if not os.path.isfile(path) and not os.path.isfile(
+                    os.path.join(self.config.download_dir, stream.file_name)):
+                await self.storage.change_file_download_dir(stream.stream_hash, self.config.download_dir)
+                stream.download_directory = self.config.download_dir
+            stream.downloader = self.make_downloader(
+                stream.sd_hash, stream.download_directory, stream.file_name
+            )
+            stream.start_download(self.node)
+            await self.storage.change_file_status(stream.stream_hash, 'running')
+            stream.update_status('running')
+            self.wait_for_stream_finished(stream)
+
+    def make_downloader(self, sd_hash: str, download_directory: str, file_name: str):
+        return StreamDownloader(
+            self.loop, self.config, self.blob_manager, sd_hash, download_directory, file_name
+        )
+
     async def add_stream(self, sd_hash: str, file_name: str, download_directory: str, status: str, claim):
         sd_blob = self.blob_manager.get_blob(sd_hash)
         if sd_blob.get_is_verified():
             descriptor = await self.blob_manager.get_stream_descriptor(sd_blob.blob_hash)
-            downloader = StreamDownloader(
-                self.loop, self.config, self.blob_manager, descriptor.sd_hash,
-                download_directory,
-                file_name
-            )
+            downloader = self.make_downloader(descriptor.sd_hash, download_directory, file_name)
             stream = ManagedStream(
                 self.loop, self.blob_manager, descriptor,
                 download_directory,
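
start_stream() tears down any stale downloader and rebuilds it via the new make_downloader() factory, which add_stream() now reuses in place of its inline StreamDownloader construction. The re-assembly rule: the stream keeps its recorded directory unless no file with its name exists either there or in the currently configured download_dir, in which case the row is repointed at the configured directory. A standalone sketch of that decision, with hypothetical helper and parameter names:

    import os

    def pick_download_directory(recorded_dir: str, configured_dir: str, file_name: str) -> str:
        # mirrors the check in start_stream(): only repoint the stream at the
        # configured download_dir when no copy of the file exists anywhere
        if not os.path.isfile(os.path.join(recorded_dir, file_name)) and \
                not os.path.isfile(os.path.join(configured_dir, file_name)):
            return configured_dir  # nothing on disk: re-assemble fresh here
        return recorded_dir  # a copy survives: keep the stored directory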
@@ -96,13 +116,10 @@ class StreamManager:
             return
         await self.node.joined.wait()
-        resumed = 0
-        for stream in self.streams:
-            if stream.status == ManagedStream.STATUS_RUNNING:
-                resumed += 1
-                stream.downloader.download(self.node)
-                self.wait_for_stream_finished(stream)
-        if resumed:
-            log.info("resuming %i downloads", resumed)
+        t = [self.start_stream(stream) for stream in self.streams
+             if stream.status == ManagedStream.STATUS_RUNNING]
+        if t:
+            log.info("resuming %i downloads", len(t))
+        await asyncio.gather(*t, loop=self.loop)
 
     async def reflect_streams(self):
         streams = list(self.streams)
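
Resumption is now concurrent: one start_stream() coroutine per previously running stream, gathered on the manager's loop instead of kicking each downloader sequentially. A minimal sketch of the gather pattern (the loop= keyword that asyncio.gather accepted in the Python 3.7 era of this code was later removed, so the sketch omits it):

    import asyncio

    async def fake_start_stream(name: str):
        # stands in for StreamManager.start_stream()
        await asyncio.sleep(0.1)
        print("resumed", name)

    async def resume(pending):
        coros = [fake_start_stream(name) for name in pending]
        if coros:
            print(f"resuming {len(coros)} downloads")
        await asyncio.gather(*coros)  # all resumptions run concurrently

    asyncio.run(resume(["stream-a", "stream-b"]))  # hypothetical stream names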