From 744375b2c0373395f7d295d7a8bf73f10b5a02ca Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Fri, 1 Feb 2019 15:46:31 -0500
Subject: [PATCH 1/5] re-assemble file / resume downloads

---
 lbrynet/extras/daemon/Daemon.py  |  5 +++-
 lbrynet/extras/daemon/storage.py |  6 +++++
 lbrynet/stream/managed_stream.py |  5 ++++
 lbrynet/stream/stream_manager.py | 39 +++++++++++++++++++++++---------
 4 files changed, 43 insertions(+), 12 deletions(-)

diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py
index a7d4b167b..c0566765f 100644
--- a/lbrynet/extras/daemon/Daemon.py
+++ b/lbrynet/extras/daemon/Daemon.py
@@ -1581,6 +1581,9 @@ class Daemon(metaclass=JSONRPCServerType):
         if existing:
             log.info("already have matching stream for %s", uri)
             stream = existing[0]
+            if not stream.running:
+                log.info("resuming download")
+                await self.stream_manager.start_stream(stream)
         else:
             stream = await self.stream_manager.download_stream_from_claim(
                 self.dht_node, resolved, file_name, timeout, fee_amount, fee_address
@@ -1618,7 +1621,7 @@ class Daemon(metaclass=JSONRPCServerType):
             raise Exception(f'Unable to find a file for {kwargs}')
         stream = streams[0]
         if status == 'start' and not stream.running and not stream.finished:
-            stream.downloader.download(self.dht_node)
+            await self.stream_manager.start_stream(stream)
             msg = "Resumed download"
         elif status == 'stop' and stream.running:
             stream.stop_download()
diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py
index 084b0c11d..650054e4a 100644
--- a/lbrynet/extras/daemon/storage.py
+++ b/lbrynet/extras/daemon/storage.py
@@ -447,6 +447,12 @@ class SQLiteStorage(SQLiteMixin):
         log.info("update file status %s -> %s", stream_hash, new_status)
         return self.db.execute("update file set status=? where stream_hash=?", (new_status, stream_hash))
 
+    def change_file_download_dir(self, stream_hash: str, download_dir: str):
+        log.info("update file download directory %s -> %s", stream_hash, download_dir)
+        return self.db.execute("update file set download_directory=? where stream_hash=?", (
+            binascii.hexlify(download_dir.encode()).decode(), stream_hash
+        ))
+
     def get_all_stream_hashes(self):
         return self.run_and_return_list("select stream_hash from stream")
diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py
index f5c4f957b..6ca488031 100644
--- a/lbrynet/stream/managed_stream.py
+++ b/lbrynet/stream/managed_stream.py
@@ -11,6 +11,7 @@ from lbrynet.extras.daemon.storage import StoredStreamClaim
 if typing.TYPE_CHECKING:
     from lbrynet.schema.claim import ClaimDict
     from lbrynet.blob.blob_manager import BlobFileManager
+    from lbrynet.dht.node import Node
 
 log = logging.getLogger(__name__)
 
@@ -155,6 +156,10 @@ class ManagedStream:
         return cls(loop, blob_manager, descriptor, os.path.dirname(file_path), os.path.basename(file_path),
                    status=cls.STATUS_FINISHED)
 
+    def start_download(self, node: typing.Optional['Node']):
+        self.downloader.download(node)
+        self.update_status(self.STATUS_RUNNING)
+
     def stop_download(self):
         if self.downloader:
             self.downloader.stop()
diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py
index 164b1cda6..b38a0dfc5 100644
--- a/lbrynet/stream/stream_manager.py
+++ b/lbrynet/stream/stream_manager.py
@@ -63,15 +63,35 @@ class StreamManager:
         claim_info = await self.storage.get_content_claim(stream.stream_hash)
         stream.set_claim(claim_info, smart_decode(claim_info['value']))
 
+    async def start_stream(self, stream: ManagedStream):
+        path = os.path.join(stream.download_directory, stream.file_name)
+
+        if not stream.running or not os.path.isfile(path):
+            if stream.downloader:
+                stream.downloader.stop()
+                stream.downloader = None
+            if not os.path.isfile(path) and not os.path.isfile(
+                    os.path.join(self.config.download_dir, stream.file_name)):
+                await self.storage.change_file_download_dir(stream.stream_hash, self.config.download_dir)
+                stream.download_directory = self.config.download_dir
+            stream.downloader = self.make_downloader(
+                stream.sd_hash, stream.download_directory, stream.file_name
+            )
+            stream.start_download(self.node)
+            await self.storage.change_file_status(stream.stream_hash, 'running')
+            stream.update_status('running')
+            self.wait_for_stream_finished(stream)
+
+    def make_downloader(self, sd_hash: str, download_directory: str, file_name: str):
+        return StreamDownloader(
+            self.loop, self.config, self.blob_manager, sd_hash, download_directory, file_name
+        )
+
     async def add_stream(self, sd_hash: str, file_name: str, download_directory: str, status: str, claim):
         sd_blob = self.blob_manager.get_blob(sd_hash)
         if sd_blob.get_is_verified():
             descriptor = await self.blob_manager.get_stream_descriptor(sd_blob.blob_hash)
-            downloader = StreamDownloader(
-                self.loop, self.config, self.blob_manager, descriptor.sd_hash,
-                download_directory,
-                file_name
-            )
+            downloader = self.make_downloader(descriptor.sd_hash, download_directory, file_name)
             stream = ManagedStream(
                 self.loop, self.blob_manager, descriptor, download_directory,
@@ -96,13 +116,10 @@ class StreamManager:
             return
         await self.node.joined.wait()
 
-        resumed = 0
-        for stream in self.streams:
-            if stream.status == ManagedStream.STATUS_RUNNING:
-                resumed += 1
-                stream.downloader.download(self.node)
-                self.wait_for_stream_finished(stream)
-        if resumed:
-            log.info("resuming %i downloads", resumed)
+        t = [self.start_stream(stream) for stream in self.streams
+             if stream.status == ManagedStream.STATUS_RUNNING]
+        if t:
+            log.info("resuming %i downloads", len(t))
+            await asyncio.gather(*t, loop=self.loop)
 
     async def reflect_streams(self):
         streams = list(self.streams)
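Patch 1 routes both resume paths (a `get` on an already-known stream and `file_set_status start`) through the new StreamManager.start_stream() coroutine. A quick end-to-end check of the resume path is to drive it over the daemon's JSON-RPC interface. The sketch below is not part of the patch: it assumes a daemon on the default localhost:5279 address, uses the third-party requests package, and the exact payload shape should be treated as an assumption:

    import requests

    DAEMON_URL = "http://localhost:5279"  # default lbrynet API address (assumption)

    def file_set_status(status: str, **filters):
        # 'status' is 'start' or 'stop'; filters are any file_list argument
        # (sd_hash=..., file_name=..., stream_hash=..., ...) used to pick the stream
        payload = {"method": "file_set_status", "params": dict(status=status, **filters)}
        return requests.post(DAEMON_URL, json=payload).json()

    # stop a running download, then resume it through the new start_stream() path
    file_set_status("stop", file_name="example-download.mp4")   # hypothetical file name
    file_set_status("start", file_name="example-download.mp4")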
From c75665d3f0c4ce6d81059fcbed4d054d77ce6369 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Fri, 1 Feb 2019 16:17:10 -0500
Subject: [PATCH 2/5] update file_list docs, add blobs_remaining to file list
 args and to file dict result

---
 lbrynet/extras/daemon/Daemon.py  | 29 ++++++++++++++++++-----------
 lbrynet/stream/managed_stream.py |  5 +++++
 lbrynet/stream/stream_manager.py |  6 ++++--
 3 files changed, 27 insertions(+), 13 deletions(-)

diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py
index c0566765f..72aae3ca6 100644
--- a/lbrynet/extras/daemon/Daemon.py
+++ b/lbrynet/extras/daemon/Daemon.py
@@ -1308,8 +1308,9 @@ class Daemon(metaclass=JSONRPCServerType):
             file_list [--sd_hash=<sd_hash>] [--file_name=<file_name>] [--stream_hash=<stream_hash>]
                       [--rowid=<rowid>] [--claim_id=<claim_id>] [--outpoint=<outpoint>] [--txid=<txid>]
                       [--nout=<nout>] [--channel_claim_id=<channel_claim_id>] [--channel_name=<channel_name>]
-                      [--claim_name=<claim_name>] [--sort=<sort_by>] [--reverse] [--comparison=<comparison>]
-                      [--full_status=<full_status>]
+                      [--claim_name=<claim_name>] [--blobs_in_stream=<blobs_in_stream>]
+                      [--blobs_remaining=<blobs_remaining>] [--sort=<sort_by>]
+                      [--comparison=<comparison>] [--full_status=<full_status>] [--reverse]
 
         Options:
             --sd_hash=<sd_hash>                    : (str) get file with matching sd hash
             --file_name=<file_name>                : (str) get file with matching file name in the
                                                      downloads folder
             --stream_hash=<stream_hash>            : (str) get file with matching stream hash
             --rowid=<rowid>                        : (int) get file with matching row id
             --claim_id=<claim_id>                  : (str) get file with matching claim id
             --outpoint=<outpoint>                  : (str) get file with matching claim outpoint
             --txid=<txid>                          : (str) get file with matching claim txid
             --nout=<nout>                          : (int) get file with matching claim nout
             --channel_claim_id=<channel_claim_id>  : (str) get file with matching channel claim id
-            --channel_name=<channel_name>  : (str) get file with matching channel name
+            --channel_name=<channel_name>          : (str) get file with matching channel name
             --claim_name=<claim_name>              : (str) get file with matching claim name
+            --blobs_in_stream=<blobs_in_stream>    : (int) get file with matching blobs in stream
+            --blobs_remaining=<blobs_remaining>    : (int) number of remaining blobs to download
-            --sort=<sort_by>                       : (str) sort by any property, like 'file_name'
-                                                     or 'metadata.author'; to specify direction
-                                                     append ',asc' or ',desc'
+            --sort=<sort_by>                       : (str) field to sort by (one of the above filter fields)
+            --comparison=<comparison>              : (str) logical comparison, (eq | ne | g | ge | l | le)
 
         Returns:
             (list) List of files
 
             [
                 {
                     'completed': (bool) true if download is completed,
                     'file_name': (str) name of file,
                     'download_directory': (str) download directory,
                     'points_paid': (float) credit paid to download file,
                     'stopped': (bool) true if download is stopped,
                     'stream_hash': (str) stream hash of file,
                     'stream_name': (str) stream name,
                     'suggested_file_name': (str) suggested file name,
                     'sd_hash': (str) sd hash of file,
                     'download_path': (str) download path of file,
                     'mime_type': (str) mime type of file,
                     'key': (str) key attached to file,
-                    'total_bytes': (int) file size in bytes,
+                    'total_bytes_lower_bound': (int) lower bound file size in bytes,
+                    'total_bytes': (int) upper bound file size in bytes,
                     'written_bytes': (int) written size in bytes,
                     'blobs_completed': (int) number of fully downloaded blobs,
                     'blobs_in_stream': (int) total blobs on stream,
+                    'blobs_remaining': (int) total blobs remaining to download,
                     'status': (str) downloader status,
                     'claim_id': (str) None if claim is not found else the claim id,
-                    'outpoint': (str) None if claim is not found else the tx and output,
                     'txid': (str) None if claim is not found else the transaction id,
                     'nout': (int) None if claim is not found else the transaction output index,
+                    'outpoint': (str) None if claim is not found else the tx and output,
                     'metadata': (dict) None if claim is not found else the claim metadata,
                     'channel_claim_id': (str) None if claim is not found or not signed,
                     'channel_name': (str) None if claim is not found or not signed,
                     'claim_name': (str) None if claim is not found else the claim name
                 },
             ]
+            }
         """
         sort = sort or 'status'
         comparison = comparison or 'eq'
@@ -1582,8 +1587,10 @@ class Daemon(metaclass=JSONRPCServerType):
             log.info("already have matching stream for %s", uri)
             stream = existing[0]
             if not stream.running:
-                log.info("resuming download")
-                await self.stream_manager.start_stream(stream)
+                full_path = os.path.join(stream.download_directory, stream.file_name)
+                if not os.path.isfile(full_path):
+                    log.info("resuming download")
+                    await self.stream_manager.start_stream(stream)
         else:
             stream = await self.stream_manager.download_stream_from_claim(
                 self.dht_node, resolved, file_name, timeout, fee_amount, fee_address
@@ -1620,7 +1627,7 @@ class Daemon(metaclass=JSONRPCServerType):
         if not streams:
             raise Exception(f'Unable to find a file for {kwargs}')
         stream = streams[0]
-        if status == 'start' and not stream.running and not stream.finished:
+        if status == 'start' and not stream.running:
             await self.stream_manager.start_stream(stream)
             msg = "Resumed download"
         elif status == 'stop' and stream.running:
diff --git a/lbrynet/stream/managed_stream.py b/lbrynet/stream/managed_stream.py
index 6ca488031..237e8f42d 100644
--- a/lbrynet/stream/managed_stream.py
+++ b/lbrynet/stream/managed_stream.py
@@ -100,6 +100,10 @@ class ManagedStream:
     def sd_hash(self):
         return self.descriptor.sd_hash
 
+    @property
+    def blobs_remaining(self) -> int:
+        return self.blobs_in_stream - self.blobs_completed
+
     def as_dict(self) -> typing.Dict:
         full_path = os.path.join(self.download_directory, self.file_name)
         if not os.path.isfile(full_path):
@@ -130,6 +134,7 @@ class ManagedStream:
             'written_bytes': written_bytes,
             'blobs_completed': self.blobs_completed,
             'blobs_in_stream': self.blobs_in_stream,
+            'blobs_remaining': self.blobs_remaining,
             'status': self.status,
             'claim_id': self.claim_id,
             'txid': self.txid,
diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py
index b38a0dfc5..7756992cd 100644
--- a/lbrynet/stream/stream_manager.py
+++ b/lbrynet/stream/stream_manager.py
@@ -32,7 +32,9 @@ filter_fields = [
     'nout',
     'channel_claim_id',
     'channel_name',
-    'full_status'
+    'full_status',  # TODO: remove
+    'blobs_remaining',
+    'blobs_in_stream'
 ]
 
 comparison_operators = {
@@ -66,7 +68,7 @@ class StreamManager:
     async def start_stream(self, stream: ManagedStream):
         path = os.path.join(stream.download_directory, stream.file_name)
 
-        if not stream.running or not os.path.isfile(path):
+        if not stream.running and not os.path.isfile(path):
             if stream.downloader:
                 stream.downloader.stop()
                 stream.downloader = None

From c7cb6822cced98f2397686881385d1748e22302d Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Fri, 1 Feb 2019 18:13:45 -0500
Subject: [PATCH 3/5] logging

---
 lbrynet/blob_exchange/server.py        | 2 +-
 lbrynet/dht/node.py                    | 3 +--
 lbrynet/dht/protocol/iterative_find.py | 4 ++--
 lbrynet/dht/protocol/protocol.py       | 2 +-
 4 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/lbrynet/blob_exchange/server.py b/lbrynet/blob_exchange/server.py
index 32cdf51fe..05614e1ce 100644
--- a/lbrynet/blob_exchange/server.py
+++ b/lbrynet/blob_exchange/server.py
@@ -57,7 +57,7 @@ class BlobServerProtocol(asyncio.Protocol):
                 incoming_blob = {'blob_hash': blob.blob_hash, 'length': blob.length}
                 responses.append(BlobDownloadResponse(incoming_blob=incoming_blob))
                 self.send_response(responses)
-                log.info("send %s to %s:%i", blob.blob_hash[:8], peer_address, peer_port)
+                log.debug("send %s to %s:%i", blob.blob_hash[:8], peer_address, peer_port)
                 try:
                     sent = await blob.sendfile(self)
                 except (ConnectionResetError, BrokenPipeError, RuntimeError, OSError):
diff --git a/lbrynet/dht/node.py b/lbrynet/dht/node.py
index 89053af03..004492296 100644
--- a/lbrynet/dht/node.py
+++ b/lbrynet/dht/node.py
@@ -79,8 +79,7 @@ class Node:
         if not self.protocol.external_ip:
             raise Exception("Cannot determine external IP")
-        log.info("Store to %i peers", len(peers))
-        log.info(peers)
+        log.debug("Store to %i peers", len(peers))
         for peer in peers:
             log.debug("store to %s %s %s", peer.address, peer.udp_port, peer.tcp_port)
             stored_to_tup = await asyncio.gather(
diff --git a/lbrynet/dht/protocol/iterative_find.py b/lbrynet/dht/protocol/iterative_find.py
index f0b905295..8fadcc91a 100644
--- a/lbrynet/dht/protocol/iterative_find.py
+++ b/lbrynet/dht/protocol/iterative_find.py
@@ -288,7 +288,7 @@ class IterativeNodeFinder(IterativeFinder):
         found = response.found and self.key != self.protocol.node_id
 
         if found:
-            log.info("found")
+            log.debug("found")
             return self.put_result(self.shortlist, finish=True)
         if self.prev_closest_peer and self.closest_peer and not self._is_closer(self.prev_closest_peer):
             # log.info("improving, %i %i %i %i %i", len(self.shortlist), len(self.active), len(self.contacted),
@@ -302,7 +302,7 @@ class IterativeNodeFinder(IterativeFinder):
             log.info("limit hit")
             self.put_result(self.active, True)
         elif self.max_results and len(self.active) - len(self.yielded_peers) >= self.max_results:
-            log.info("max results")
+            log.debug("max results")
             self.put_result(self.active, True)
diff --git a/lbrynet/dht/protocol/protocol.py b/lbrynet/dht/protocol/protocol.py
index 4f65f2d6a..1c7d2cc33 100644
--- a/lbrynet/dht/protocol/protocol.py
+++ b/lbrynet/dht/protocol/protocol.py
@@ -601,7 +601,7 @@ class KademliaProtocol(DatagramProtocol):
             res = await self.get_rpc_peer(peer).store(hash_value)
             if res != b"OK":
                 raise ValueError(res)
-            log.info("Stored %s to %s", binascii.hexlify(hash_value).decode()[:8], peer)
+            log.debug("Stored %s to %s", binascii.hexlify(hash_value).decode()[:8], peer)
             return peer.node_id, True
         except asyncio.TimeoutError:
             log.debug("Timeout while storing blob_hash %s at %s", binascii.hexlify(hash_value).decode()[:8], peer)

From efe4afd09e1a31bf7c2a00bcd0e8377a6f8872d5 Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Fri, 1 Feb 2019 18:13:55 -0500
Subject: [PATCH 4/5] fix blob announce time

---
 lbrynet/dht/blob_announcer.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/lbrynet/dht/blob_announcer.py b/lbrynet/dht/blob_announcer.py
index fa4d5cc06..136499752 100644
--- a/lbrynet/dht/blob_announcer.py
+++ b/lbrynet/dht/blob_announcer.py
@@ -1,6 +1,7 @@
 import asyncio
 import typing
 import logging
+import time
 if typing.TYPE_CHECKING:
     from lbrynet.dht.node import Node
     from lbrynet.extras.daemon.storage import SQLiteStorage
@@ -9,7 +10,8 @@ log = logging.getLogger(__name__)
 
 
 class BlobAnnouncer:
-    def __init__(self, loop: asyncio.BaseEventLoop, node: 'Node', storage: 'SQLiteStorage'):
+    def __init__(self, loop: asyncio.BaseEventLoop, node: 'Node', storage: 'SQLiteStorage',
+                 time_getter: typing.Callable[[], float] = time.time):
         self.loop = loop
         self.node = node
         self.storage = storage
@@ -17,6 +19,7 @@ class BlobAnnouncer:
         self.announce_task: asyncio.Task = None
         self.running = False
         self.announce_queue: typing.List[str] = []
+        self.time_getter = time_getter
 
     async def _announce(self, batch_size: typing.Optional[int] = 10):
         if not batch_size:
@@ -41,7 +44,7 @@ class BlobAnnouncer:
                 to_await.append(batch.pop())
         if to_await:
             await asyncio.gather(*tuple(to_await), loop=self.loop)
-            await self.storage.update_last_announced_blobs(announced, self.loop.time())
+            await self.storage.update_last_announced_blobs(announced, self.time_getter())
             log.info("announced %i blobs", len(announced))
         if self.running:
             self.pending_call = self.loop.call_later(60, self.announce, batch_size)

From e96b75a0d0f7a34a58a55502fe45a025c22a606d Mon Sep 17 00:00:00 2001
From: Jack Robison
Date: Fri, 1 Feb 2019 20:46:09 -0500
Subject: [PATCH 5/5] fix file/download paths

move download-from-uri logic into stream manager

---
 lbrynet/extras/daemon/Daemon.py  |  45 ++-----------
 lbrynet/extras/daemon/storage.py |   8 +--
 lbrynet/stream/assembler.py      |   5 +-
 lbrynet/stream/stream_manager.py | 110 +++++++++++++++++++++++++++----
 4 files changed, 107 insertions(+), 61 deletions(-)

diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py
index 72aae3ca6..9a55b7d8d 100644
--- a/lbrynet/extras/daemon/Daemon.py
+++ b/lbrynet/extras/daemon/Daemon.py
@@ -20,7 +20,7 @@ from lbrynet.conf import Config, Setting, SLACK_WEBHOOK
 from lbrynet.blob.blob_file import is_valid_blobhash
 from lbrynet.blob_exchange.downloader import download_blob
 from lbrynet.error import InsufficientFundsError, DownloadSDTimeout, ComponentsNotStarted
-from lbrynet.error import NullFundsError, NegativeFundsError, ResolveError, ComponentStartConditionNotMet
+from lbrynet.error import NullFundsError, NegativeFundsError, ComponentStartConditionNotMet
 from lbrynet.extras import system_info
 from lbrynet.extras.daemon import analytics
 from lbrynet.extras.daemon.Components import WALLET_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT
@@ -1557,47 +1557,12 @@ class Daemon(metaclass=JSONRPCServerType):
         }
         """
-        parsed_uri = parse_lbry_uri(uri)
-        if parsed_uri.is_channel:
-            raise Exception("cannot download a channel claim, specify a /path")
-
-        resolved = (await self.wallet_manager.resolve(uri)).get(uri, {})
-        resolved = resolved if 'value' in resolved else resolved.get('claim')
-
-        if not resolved:
-            raise ResolveError(
-                "Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", ""))
-            )
-        if 'error' in resolved:
-            raise ResolveError(f"error resolving stream: {resolved['error']}")
-
-        claim = ClaimDict.load_dict(resolved['value'])
-        fee_amount, fee_address = None, None
-        if claim.has_fee:
-            fee_amount = round(self.exchange_rate_manager.convert_currency(
-                claim.source_fee.currency, "LBC", claim.source_fee.amount
-            ), 5)
-            fee_address = claim.source_fee.address
-        outpoint = f"{resolved['txid']}:{resolved['nout']}"
-        existing = self.stream_manager.get_filtered_streams(outpoint=outpoint)
-        if not existing:
-            existing.extend(self.stream_manager.get_filtered_streams(claim_id=resolved['claim_id'],
-                                                                     sd_hash=claim.source_hash))
-        if existing:
-            log.info("already have matching stream for %s", uri)
-            stream = existing[0]
-            if not stream.running:
-                full_path = os.path.join(stream.download_directory, stream.file_name)
-                if not os.path.isfile(full_path):
-                    log.info("resuming download")
-                    await self.stream_manager.start_stream(stream)
-        else:
-            stream = await self.stream_manager.download_stream_from_claim(
-                self.dht_node, resolved, file_name, timeout, fee_amount, fee_address
-            )
+        stream = await self.stream_manager.download_stream_from_uri(
+            uri, self.exchange_rate_manager, file_name, timeout
+        )
         if stream:
             return stream.as_dict()
-        raise DownloadSDTimeout(resolved['value']['stream']['source']['source'])
+        raise DownloadSDTimeout(uri)
 
     @requires(STREAM_MANAGER_COMPONENT)
     async def jsonrpc_file_set_status(self, status, **kwargs):
diff --git a/lbrynet/extras/daemon/storage.py b/lbrynet/extras/daemon/storage.py
index 650054e4a..fbac8f09e 100644
--- a/lbrynet/extras/daemon/storage.py
+++ b/lbrynet/extras/daemon/storage.py
@@ -447,10 +447,10 @@ class SQLiteStorage(SQLiteMixin):
         log.info("update file status %s -> %s", stream_hash, new_status)
         return self.db.execute("update file set status=? where stream_hash=?", (new_status, stream_hash))
 
-    def change_file_download_dir(self, stream_hash: str, download_dir: str):
-        log.info("update file download directory %s -> %s", stream_hash, download_dir)
-        return self.db.execute("update file set download_directory=? where stream_hash=?", (
-            binascii.hexlify(download_dir.encode()).decode(), stream_hash
-        ))
+    def change_file_download_dir_and_file_name(self, stream_hash: str, download_dir: str, file_name: str):
+        return self.db.execute("update file set download_directory=?, file_name=? where stream_hash=?", (
+            binascii.hexlify(download_dir.encode()).decode(), binascii.hexlify(file_name.encode()).decode(),
+            stream_hash
+        ))
 
     def get_all_stream_hashes(self):
         return self.run_and_return_list("select stream_hash from stream")
diff --git a/lbrynet/stream/assembler.py b/lbrynet/stream/assembler.py
index b47ba5ce1..f00f9c56e 100644
--- a/lbrynet/stream/assembler.py
+++ b/lbrynet/stream/assembler.py
@@ -77,12 +77,11 @@ class StreamAssembler:
             await self.blob_manager.blob_completed(self.sd_blob)
         self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_manager.blob_dir,
                                                                              self.sd_blob)
+        self.output_path = await get_next_available_file_name(self.loop, output_dir,
+                                                              output_file_name or self.descriptor.suggested_file_name)
         if not self.got_descriptor.is_set():
             self.got_descriptor.set()
         await self.after_got_descriptor()
-        self.output_path = await get_next_available_file_name(self.loop, output_dir,
-                                                              output_file_name or self.descriptor.suggested_file_name)
-
         self.stream_handle = open(self.output_path, 'wb')
         await self.blob_manager.storage.store_stream(
             self.sd_blob, self.descriptor
diff --git a/lbrynet/stream/stream_manager.py b/lbrynet/stream/stream_manager.py
index 7756992cd..379ee5f07 100644
--- a/lbrynet/stream/stream_manager.py
+++ b/lbrynet/stream/stream_manager.py
@@ -4,9 +4,11 @@ import typing
 import binascii
 import logging
 import random
+from lbrynet.error import ResolveError
 from lbrynet.stream.downloader import StreamDownloader
 from lbrynet.stream.managed_stream import ManagedStream
 from lbrynet.schema.claim import ClaimDict
+from lbrynet.schema.uri import parse_lbry_uri
 from lbrynet.schema.decode import smart_decode
 from lbrynet.extras.daemon.storage import lbc_to_dewies
 if typing.TYPE_CHECKING:
@@ -15,6 +17,7 @@ if typing.TYPE_CHECKING:
     from lbrynet.dht.node import Node
     from lbrynet.extras.daemon.storage import SQLiteStorage
     from lbrynet.extras.wallet import LbryWalletManager
+    from lbrynet.extras.daemon.exchange_rate_manager import ExchangeRateManager
 
 log = logging.getLogger(__name__)
 
@@ -65,24 +68,44 @@ class StreamManager:
         claim_info = await self.storage.get_content_claim(stream.stream_hash)
         stream.set_claim(claim_info, smart_decode(claim_info['value']))
 
-    async def start_stream(self, stream: ManagedStream):
+    async def start_stream(self, stream: ManagedStream) -> bool:
+        """
+        Resume or rebuild a partial or completed stream
+        """
+
         path = os.path.join(stream.download_directory, stream.file_name)
 
         if not stream.running and not os.path.isfile(path):
             if stream.downloader:
                 stream.downloader.stop()
                 stream.downloader = None
-            if not os.path.isfile(path) and not os.path.isfile(
-                    os.path.join(self.config.download_dir, stream.file_name)):
-                await self.storage.change_file_download_dir(stream.stream_hash, self.config.download_dir)
+
+            # the directory is gone, can happen when the folder that contains a published file is deleted
+            # reset the download directory to the default and update the file name
+            if not os.path.isdir(stream.download_directory):
                 stream.download_directory = self.config.download_dir
+
             stream.downloader = self.make_downloader(
-                stream.sd_hash, stream.download_directory, stream.file_name
+                stream.sd_hash, stream.download_directory, stream.descriptor.suggested_file_name
             )
+            if stream.status != ManagedStream.STATUS_FINISHED:
+                await self.storage.change_file_status(stream.stream_hash, 'running')
+                stream.update_status('running')
             stream.start_download(self.node)
-            await self.storage.change_file_status(stream.stream_hash, 'running')
-            stream.update_status('running')
+            try:
+                await asyncio.wait_for(self.loop.create_task(stream.downloader.got_descriptor.wait()),
+                                       self.config.download_timeout)
+            except asyncio.TimeoutError:
+                stream.stop_download()
+                stream.downloader = None
+                return False
+            file_name = os.path.basename(stream.downloader.output_path)
+            await self.storage.change_file_download_dir_and_file_name(
+                stream.stream_hash, self.config.download_dir, file_name
+            )
             self.wait_for_stream_finished(stream)
+            return True
+        return True
 
     def make_downloader(self, sd_hash: str, download_directory: str, file_name: str):
         return StreamDownloader(
             self.loop, self.config, self.blob_manager, sd_hash, download_directory, file_name
         )
@@ -204,18 +227,20 @@ class StreamManager:
                 downloader.stop()
                 log.info("stopped stream")
                 return
+            file_name = os.path.basename(downloader.output_path)
+            download_directory = os.path.dirname(downloader.output_path)
             if not await self.blob_manager.storage.stream_exists(downloader.sd_hash):
                 await self.blob_manager.storage.store_stream(downloader.sd_blob, downloader.descriptor)
             if not await self.blob_manager.storage.file_exists(downloader.sd_hash):
                 await self.blob_manager.storage.save_downloaded_file(
-                    downloader.descriptor.stream_hash, os.path.basename(downloader.output_path), download_directory,
+                    downloader.descriptor.stream_hash, file_name, download_directory,
                     0.0
                 )
             await self.blob_manager.storage.save_content_claim(
                 downloader.descriptor.stream_hash, f"{claim_info['txid']}:{claim_info['nout']}"
             )
             stream = ManagedStream(self.loop, self.blob_manager, downloader.descriptor, download_directory,
-                                   os.path.basename(downloader.output_path), downloader, ManagedStream.STATUS_RUNNING)
+                                   file_name, downloader, ManagedStream.STATUS_RUNNING)
             stream.set_claim(claim_info, claim)
             self.streams.add(stream)
             try:
@@ -230,18 +255,18 @@ class StreamManager:
                                         file_name: typing.Optional[str] = None,
                                         timeout: typing.Optional[float] = 60,
                                         fee_amount: typing.Optional[float] = 0.0,
-                                        fee_address: typing.Optional[str] = None) -> typing.Optional[ManagedStream]:
+                                        fee_address: typing.Optional[str] = None,
+                                        should_pay: typing.Optional[bool] = True) -> typing.Optional[ManagedStream]:
         log.info("get lbry://%s#%s", claim_info['name'], claim_info['claim_id'])
         claim = ClaimDict.load_dict(claim_info['value'])
-        if fee_address and fee_amount:
-            if fee_amount > await self.wallet.default_account.get_balance():
-                raise Exception("not enough funds")
         sd_hash = claim.source_hash.decode()
         if sd_hash in self.starting_streams:
             return await self.starting_streams[sd_hash]
         already_started = tuple(filter(lambda s: s.descriptor.sd_hash == sd_hash, self.streams))
         if already_started:
             return already_started[0]
+        if should_pay and fee_address and fee_amount and fee_amount > await self.wallet.default_account.get_balance():
+            raise Exception("not enough funds")
 
         self.starting_streams[sd_hash] = asyncio.Future(loop=self.loop)
         stream_task = self.loop.create_task(
@@ -251,7 +276,7 @@ class StreamManager:
             await asyncio.wait_for(stream_task, timeout or self.config.download_timeout)
             stream = await stream_task
             self.starting_streams[sd_hash].set_result(stream)
-            if fee_address and fee_amount:
+            if should_pay and fee_address and fee_amount:
                 await self.wallet.send_amount_to_address(lbc_to_dewies(str(fee_amount)), fee_address.encode('latin1'))
             return stream
         except (asyncio.TimeoutError, asyncio.CancelledError):
@@ -301,3 +326,60 @@ class StreamManager:
         if reverse:
             streams.reverse()
         return streams
+
+    async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
+                                       file_name: typing.Optional[str] = None,
+                                       timeout: typing.Optional[float] = None) -> typing.Optional[ManagedStream]:
+        timeout = timeout or self.config.download_timeout
+        parsed_uri = parse_lbry_uri(uri)
+        if parsed_uri.is_channel:
+            raise Exception("cannot download a channel claim, specify a /path")
+
+        resolved = (await self.wallet.resolve(uri)).get(uri, {})
+        resolved = resolved if 'value' in resolved else resolved.get('claim')
+
+        if not resolved:
+            raise ResolveError(
+                "Failed to resolve stream at lbry://{}".format(uri.replace("lbry://", ""))
+            )
+        if 'error' in resolved:
+            raise ResolveError(f"error resolving stream: {resolved['error']}")
+
+        claim = ClaimDict.load_dict(resolved['value'])
+        fee_amount, fee_address = None, None
+        if claim.has_fee:
+            fee_amount = round(exchange_rate_manager.convert_currency(
+                claim.source_fee.currency, "LBC", claim.source_fee.amount
+            ), 5)
+            fee_address = claim.source_fee.address
+        outpoint = f"{resolved['txid']}:{resolved['nout']}"
+        existing = self.get_filtered_streams(outpoint=outpoint)
+
+        if not existing:
+            existing.extend(self.get_filtered_streams(sd_hash=claim.source_hash.decode()))
+            if existing and existing[0].claim_id != resolved['claim_id']:
+                raise Exception(f"stream for {existing[0].claim_id} collides with existing "
+                                f"download {resolved['claim_id']}")
+        if not existing:
+            existing.extend(self.get_filtered_streams(claim_id=resolved['claim_id']))
+            if existing and existing[0].sd_hash != claim.source_hash.decode():
+                log.info("claim contains an update to a stream we have, downloading it")
+                stream = await self.download_stream_from_claim(
+                    self.node, resolved, file_name, timeout, fee_amount, fee_address, False
+                )
+                log.info("started new stream, deleting old one")
+                await self.delete_stream(existing[0])
+                return stream
+        if existing:
+            log.info("already have matching stream for %s", uri)
+            stream = existing[0]
+            await self.start_stream(stream)
+            return stream
+        log.info("download stream from %s", uri)
+        return await self.download_stream_from_claim(
+            self.node, resolved, file_name, timeout, fee_amount, fee_address
+        )
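After patch 5 the resolve/fee/dedup logic that used to live in jsonrpc_get is reachable from a single entry point. A caller only needs something like the following sketch (assuming an already-initialized StreamManager and ExchangeRateManager inside an async context; the variable names are illustrative):

    # stream_manager: StreamManager, erm: ExchangeRateManager (assumed initialized)
    stream = await stream_manager.download_stream_from_uri(
        "lbry://what", erm,   # resolves the uri, checks the fee, reuses existing downloads
        file_name=None,       # None keeps the descriptor's suggested file name
        timeout=60.0          # None falls back to config.download_timeout
    )
    if stream is not None:
        print(stream.as_dict()['download_path'])
    else:
        ...  # jsonrpc_get maps this case to DownloadSDTimeout(uri)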