diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index 1ac385c60..de43eb926 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -17,6 +17,7 @@ from lbry.dht.blob_announcer import BlobAnnouncer from lbry.blob.blob_manager import BlobManager from lbry.blob_exchange.server import BlobServer from lbry.stream.stream_manager import StreamManager +from lbry.file.file_manager import FileManager from lbry.extras.daemon.component import Component from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager from lbry.extras.daemon.storage import SQLiteStorage @@ -331,17 +332,17 @@ class StreamManagerComponent(Component): def __init__(self, component_manager): super().__init__(component_manager) - self.stream_manager: typing.Optional[StreamManager] = None + self.file_manager: typing.Optional[FileManager] = None @property - def component(self) -> typing.Optional[StreamManager]: - return self.stream_manager + def component(self) -> typing.Optional[FileManager]: + return self.file_manager async def get_status(self): - if not self.stream_manager: + if not self.file_manager: return return { - 'managed_files': len(self.stream_manager._sources), + 'managed_files': len(self.file_manager._sources), } async def start(self): @@ -352,14 +353,17 @@ class StreamManagerComponent(Component): if self.component_manager.has_component(DHT_COMPONENT) else None log.info('Starting the file manager') loop = asyncio.get_event_loop() - self.stream_manager = StreamManager( + self.file_manager = FileManager( + loop, self.conf, wallet, storage, self.component_manager.analytics_manager + ) + self.file_manager.source_managers['stream'] = StreamManager( loop, self.conf, blob_manager, wallet, storage, node, self.component_manager.analytics_manager ) - await self.stream_manager.start() + await self.file_manager.start() log.info('Done setting up file manager') async def stop(self): - self.stream_manager.stop() + self.file_manager.stop() class TorrentComponent(Component): @@ -370,7 +374,7 @@ class TorrentComponent(Component): self.torrent_session = None @property - def component(self) -> typing.Optional[StreamManager]: + def component(self) -> typing.Optional[TorrentSession]: return self.torrent_session async def get_status(self): diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 5c84f8227..276e1693e 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -1103,7 +1103,7 @@ class Daemon(metaclass=JSONRPCServerType): if download_directory and not os.path.isdir(download_directory): return {"error": f"specified download directory \"{download_directory}\" does not exist"} try: - stream = await self.stream_manager.download_stream_from_uri( + stream = await self.stream_manager.download_from_uri( uri, self.exchange_rate_manager, timeout, file_name, download_directory, save_file=save_file, wallet=wallet ) diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 11a61e45e..426985a48 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -9,7 +9,7 @@ from typing import Optional from lbry.wallet import SQLiteMixin from lbry.conf import Config from lbry.wallet.dewies import dewies_to_lbc, lbc_to_dewies -from lbry.wallet.transaction import Transaction +from lbry.wallet.transaction import Transaction, Output from lbry.schema.claim import Claim from lbry.dht.constants import DATA_EXPIRATION from lbry.blob.blob_info import BlobInfo @@ -727,6 +727,19 @@ class SQLiteStorage(SQLiteMixin): if claim_id_to_supports: await self.save_supports(claim_id_to_supports) + def save_claim_from_output(self, ledger, output: Output): + return self.save_claims([{ + "claim_id": output.claim_id, + "name": output.claim_name, + "amount": dewies_to_lbc(output.amount), + "address": output.get_address(ledger), + "txid": output.tx_ref.id, + "nout": output.position, + "value": output.claim, + "height": -1, + "claim_sequence": -1, + }]) + def save_claims_for_resolve(self, claim_infos): to_save = {} for info in claim_infos: diff --git a/lbry/file/file_manager.py b/lbry/file/file_manager.py index dc95829d2..443c94a69 100644 --- a/lbry/file/file_manager.py +++ b/lbry/file/file_manager.py @@ -7,6 +7,7 @@ from typing import Optional from aiohttp.web import Request from lbry.error import ResolveError, InvalidStreamDescriptorError, DownloadSDTimeoutError, InsufficientFundsError from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError +from lbry.stream.managed_stream import ManagedStream from lbry.utils import cache_concurrent from lbry.schema.claim import Claim from lbry.schema.url import URL @@ -93,14 +94,11 @@ class FileManager: if 'error' in resolved_result: raise ResolveError(f"Unexpected error resolving uri for download: {resolved_result['error']}") - await self.storage.save_claims( - resolved_result, self.wallet_manager.ledger - ) - txo = resolved_result[uri] claim = txo.claim outpoint = f"{txo.tx_ref.id}:{txo.position}" resolved_time = self.loop.time() - start_time + await self.storage.save_claim_from_output(self.wallet_manager.ledger, txo) #################### # update or replace @@ -113,6 +111,7 @@ class FileManager: # resume or update an existing stream, if the stream changed: download it and delete the old one after existing = self.get_filtered(sd_hash=claim.stream.source.sd_hash) + to_replace, updated_stream = None, None if existing and existing[0].claim_id != txo.claim_id: raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {txo.claim_id}") if existing: @@ -121,7 +120,7 @@ class FileManager: existing[0].stream_hash, outpoint ) await source_manager._update_content_claim(existing[0]) - return existing[0] + updated_stream = existing[0] else: existing_for_claim_id = self.get_filtered(claim_id=txo.claim_id) if existing_for_claim_id: @@ -133,14 +132,19 @@ class FileManager: await existing_for_claim_id[0].save_file( file_name=file_name, download_directory=download_directory, node=self.node ) - return existing_for_claim_id[0] - - - - - + to_replace = existing_for_claim_id[0] + # resume or update an existing stream, if the stream changed: download it and delete the old one after if updated_stream: + log.info("already have stream for %s", uri) + if save_file and updated_stream.output_file_exists: + save_file = False + await updated_stream.start(node=self.node, timeout=timeout, save_now=save_file) + if not updated_stream.output_file_exists and (save_file or file_name or download_directory): + await updated_stream.save_file( + file_name=file_name, download_directory=download_directory, node=self.node + ) + return updated_stream #################### @@ -156,23 +160,25 @@ class FileManager: # make downloader and wait for start #################### - stream = ManagedStream( - self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_directory, - file_name, ManagedStream.STATUS_RUNNING, content_fee=payment, - analytics_manager=self.analytics_manager - ) + if not claim.stream.source.bt_infohash: + stream = ManagedStream( + self.loop, self.config, source_manager.blob_manager, claim.stream.source.sd_hash, download_directory, + file_name, ManagedStream.STATUS_RUNNING, content_fee=payment, + analytics_manager=self.analytics_manager + ) + else: + stream = None log.info("starting download for %s", uri) before_download = self.loop.time() - await stream.start(self.node, timeout) - stream.set_claim(resolved, claim) + await stream.start(source_manager.node, timeout) #################### # success case: delete to_replace if applicable, broadcast fee payment #################### if to_replace: # delete old stream now that the replacement has started downloading - await self.delete(to_replace) + await source_manager.delete(to_replace) if payment is not None: await self.wallet_manager.broadcast_or_release(payment) @@ -180,12 +186,11 @@ class FileManager: log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri) await self.storage.save_content_fee(stream.stream_hash, stream.content_fee) - self._sources[stream.sd_hash] = stream - self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) + source_manager.add(stream) await self.storage.save_content_claim(stream.stream_hash, outpoint) if save_file: - await asyncio.wait_for(stream.save_file(node=self.node), timeout - (self.loop.time() - before_download), + await asyncio.wait_for(stream.save_file(node=source_manager.node), timeout - (self.loop.time() - before_download), loop=self.loop) return stream except asyncio.TimeoutError: @@ -232,8 +237,7 @@ class FileManager: async def stream_partial_content(self, request: Request, sd_hash: str): return await self._sources[sd_hash].stream_file(request, self.node) - def get_filtered(self, sort_by: Optional[str] = None, reverse: Optional[bool] = False, - comparison: Optional[str] = None, **search_by) -> typing.List[ManagedDownloadSource]: + def get_filtered(self, *args, **kwargs) -> typing.List[ManagedDownloadSource]: """ Get a list of filtered and sorted ManagedStream objects @@ -242,30 +246,30 @@ class FileManager: :param comparison: comparison operator used for filtering :param search_by: fields and values to filter by """ - if sort_by and sort_by not in self.filter_fields: - raise ValueError(f"'{sort_by}' is not a valid field to sort by") - if comparison and comparison not in comparison_operators: - raise ValueError(f"'{comparison}' is not a valid comparison") - if 'full_status' in search_by: - del search_by['full_status'] - for search in search_by.keys(): - if search not in self.filter_fields: - raise ValueError(f"'{search}' is not a valid search operation") - if search_by: - comparison = comparison or 'eq' - sources = [] - for stream in self._sources.values(): - for search, val in search_by.items(): - if comparison_operators[comparison](getattr(stream, search), val): - sources.append(stream) - break + return sum(*(manager.get_filtered(*args, **kwargs) for manager in self.source_managers.values()), []) + + async def _check_update_or_replace( + self, outpoint: str, claim_id: str, claim: Claim + ) -> typing.Tuple[Optional[ManagedDownloadSource], Optional[ManagedDownloadSource]]: + existing = self.get_filtered(outpoint=outpoint) + if existing: + return existing[0], None + existing = self.get_filtered(sd_hash=claim.stream.source.sd_hash) + if existing and existing[0].claim_id != claim_id: + raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {claim_id}") + if existing: + log.info("claim contains a metadata only update to a stream we have") + await self.storage.save_content_claim( + existing[0].stream_hash, outpoint + ) + await self._update_content_claim(existing[0]) + return existing[0], None else: - sources = list(self._sources.values()) - if sort_by: - sources.sort(key=lambda s: getattr(s, sort_by)) - if reverse: - sources.reverse() - return sources + existing_for_claim_id = self.get_filtered(claim_id=claim_id) + if existing_for_claim_id: + log.info("claim contains an update to a stream we have, downloading it") + return None, existing_for_claim_id[0] + return None, None diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index 58035e174..d5fc9202b 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -6,16 +6,10 @@ import random import typing from typing import Optional from aiohttp.web import Request -from lbry.error import ResolveError, InvalidStreamDescriptorError, DownloadSDTimeoutError, InsufficientFundsError -from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError -from lbry.utils import cache_concurrent +from lbry.error import InvalidStreamDescriptorError +from lbry.file.source_manager import SourceManager from lbry.stream.descriptor import StreamDescriptor from lbry.stream.managed_stream import ManagedStream -from lbry.schema.claim import Claim -from lbry.schema.url import URL -from lbry.wallet.dewies import dewies_to_lbc -from lbry.wallet import Output -from lbry.source_manager import SourceManager from lbry.file.source import ManagedDownloadSource if typing.TYPE_CHECKING: from lbry.conf import Config @@ -25,10 +19,6 @@ if typing.TYPE_CHECKING: from lbry.wallet.transaction import Transaction from lbry.extras.daemon.analytics import AnalyticsManager from lbry.extras.daemon.storage import SQLiteStorage, StoredContentClaim - from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager - from lbry.wallet.transaction import Transaction - from lbry.wallet.manager import WalletManager - from lbry.wallet.wallet import Wallet log = logging.getLogger(__name__)