lbry-sdk/lbry/file/file_manager.py

287 lines
14 KiB
Python
Raw Normal View History

2020-01-15 10:19:29 -05:00
import asyncio
import logging
import typing
from typing import Optional
from aiohttp.web import Request
2020-01-28 22:37:52 -03:00
from lbry.error import ResolveError, DownloadSDTimeoutError, InsufficientFundsError
2020-01-15 10:19:29 -05:00
from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError
2020-01-27 02:10:55 -03:00
from lbry.stream.managed_stream import ManagedStream
2020-02-05 12:29:26 -03:00
from lbry.torrent.torrent_manager import TorrentSource
2020-01-15 10:19:29 -05:00
from lbry.utils import cache_concurrent
from lbry.schema.url import URL
from lbry.wallet.dewies import dewies_to_lbc
from lbry.file.source_manager import SourceManager
from lbry.file.source import ManagedDownloadSource
if typing.TYPE_CHECKING:
from lbry.conf import Config
from lbry.extras.daemon.analytics import AnalyticsManager
from lbry.extras.daemon.storage import SQLiteStorage
2020-02-10 23:15:18 -03:00
from lbry.wallet import WalletManager, Output
2020-01-15 10:19:29 -05:00
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
log = logging.getLogger(__name__)
class FileManager:
2020-01-28 22:37:52 -03:00
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', wallet_manager: 'WalletManager',
2020-01-15 10:19:29 -05:00
storage: 'SQLiteStorage', analytics_manager: Optional['AnalyticsManager'] = None):
self.loop = loop
self.config = config
self.wallet_manager = wallet_manager
self.storage = storage
self.analytics_manager = analytics_manager
self.source_managers: typing.Dict[str, SourceManager] = {}
2020-01-28 21:24:05 -03:00
self.started = asyncio.Event()
@property
def streams(self):
return self.source_managers['stream']._sources
async def create_stream(self, file_path: str, key: Optional[bytes] = None, **kwargs) -> ManagedDownloadSource:
if 'stream' in self.source_managers:
return await self.source_managers['stream'].create(file_path, key, **kwargs)
raise NotImplementedError
2020-01-15 10:19:29 -05:00
async def start(self):
await asyncio.gather(*(source_manager.start() for source_manager in self.source_managers.values()))
2020-01-28 21:24:05 -03:00
for manager in self.source_managers.values():
await manager.started.wait()
self.started.set()
2020-01-15 10:19:29 -05:00
def stop(self):
2020-01-28 21:24:05 -03:00
for manager in self.source_managers.values():
# fixme: pop or not?
manager.stop()
self.started.clear()
2020-01-15 10:19:29 -05:00
@cache_concurrent
async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
timeout: Optional[float] = None, file_name: Optional[str] = None,
download_directory: Optional[str] = None,
save_file: Optional[bool] = None, resolve_timeout: float = 3.0,
wallet: Optional['Wallet'] = None) -> ManagedDownloadSource:
wallet = wallet or self.wallet_manager.default_wallet
timeout = timeout or self.config.download_timeout
start_time = self.loop.time()
resolved_time = None
stream = None
2020-02-10 21:50:16 -03:00
claim = None
2020-01-15 10:19:29 -05:00
error = None
outpoint = None
if save_file is None:
save_file = self.config.save_files
if file_name and not save_file:
save_file = True
if save_file:
download_directory = download_directory or self.config.download_dir
else:
download_directory = None
payment = None
try:
# resolve the claim
if not URL.parse(uri).has_stream:
raise ResolveError("cannot download a channel claim, specify a /path")
try:
resolved_result = await asyncio.wait_for(
self.wallet_manager.ledger.resolve(wallet.accounts, [uri]),
resolve_timeout
)
except asyncio.TimeoutError:
raise ResolveTimeoutError(uri)
except Exception as err:
if isinstance(err, asyncio.CancelledError):
raise
log.exception("Unexpected error resolving stream:")
raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
if 'error' in resolved_result:
raise ResolveError(f"Unexpected error resolving uri for download: {resolved_result['error']}")
2020-01-29 13:49:14 -03:00
if not resolved_result or uri not in resolved_result:
raise ResolveError(f"Failed to resolve stream at '{uri}'")
2020-01-15 10:19:29 -05:00
txo = resolved_result[uri]
2020-02-07 12:32:39 -03:00
if isinstance(txo, dict):
raise ResolveError(f"Failed to resolve stream at '{uri}': {txo}")
2020-01-15 10:19:29 -05:00
claim = txo.claim
outpoint = f"{txo.tx_ref.id}:{txo.position}"
resolved_time = self.loop.time() - start_time
2020-01-27 02:10:55 -03:00
await self.storage.save_claim_from_output(self.wallet_manager.ledger, txo)
2020-01-15 10:19:29 -05:00
####################
# update or replace
####################
if claim.stream.source.bt_infohash:
source_manager = self.source_managers['torrent']
2020-02-05 12:29:26 -03:00
existing = source_manager.get_filtered(bt_infohash=claim.stream.source.bt_infohash)
2020-01-15 10:19:29 -05:00
else:
source_manager = self.source_managers['stream']
2020-02-05 12:29:26 -03:00
existing = source_manager.get_filtered(sd_hash=claim.stream.source.sd_hash)
2020-01-15 10:19:29 -05:00
# resume or update an existing stream, if the stream changed: download it and delete the old one after
2020-01-27 02:10:55 -03:00
to_replace, updated_stream = None, None
2020-01-15 10:19:29 -05:00
if existing and existing[0].claim_id != txo.claim_id:
raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {txo.claim_id}")
if existing:
log.info("claim contains a metadata only update to a stream we have")
2020-02-25 18:18:48 -03:00
if claim.stream.source.bt_infohash:
await self.storage.save_torrent_content_claim(
existing[0].identifier, outpoint, existing[0].torrent_length, existing[0].torrent_name
)
claim_info = await self.storage.get_content_claim_for_torrent(existing[0].identifier)
existing[0].set_claim(claim_info, claim)
else:
await self.storage.save_content_claim(
existing[0].stream_hash, outpoint
)
await source_manager._update_content_claim(existing[0])
2020-01-27 02:10:55 -03:00
updated_stream = existing[0]
2020-01-15 10:19:29 -05:00
else:
existing_for_claim_id = self.get_filtered(claim_id=txo.claim_id)
if existing_for_claim_id:
log.info("claim contains an update to a stream we have, downloading it")
if save_file and existing_for_claim_id[0].output_file_exists:
save_file = False
2020-01-28 22:37:52 -03:00
await existing_for_claim_id[0].start(timeout=timeout, save_now=save_file)
if not existing_for_claim_id[0].output_file_exists and (
save_file or file_name or download_directory):
2020-01-15 10:19:29 -05:00
await existing_for_claim_id[0].save_file(
2020-01-28 21:24:05 -03:00
file_name=file_name, download_directory=download_directory
2020-01-15 10:19:29 -05:00
)
2020-01-27 02:10:55 -03:00
to_replace = existing_for_claim_id[0]
2020-01-15 10:19:29 -05:00
2020-01-27 02:10:55 -03:00
# resume or update an existing stream, if the stream changed: download it and delete the old one after
2020-01-15 10:19:29 -05:00
if updated_stream:
2020-01-27 02:10:55 -03:00
log.info("already have stream for %s", uri)
if save_file and updated_stream.output_file_exists:
save_file = False
2020-01-28 21:24:05 -03:00
await updated_stream.start(timeout=timeout, save_now=save_file)
2020-01-27 02:10:55 -03:00
if not updated_stream.output_file_exists and (save_file or file_name or download_directory):
await updated_stream.save_file(
2020-01-28 21:24:05 -03:00
file_name=file_name, download_directory=download_directory
2020-01-27 02:10:55 -03:00
)
return updated_stream
2020-01-15 10:19:29 -05:00
####################
# pay fee
####################
if not to_replace and txo.has_price and not txo.purchase_receipt:
2020-01-28 21:24:05 -03:00
payment = await self.wallet_manager.create_purchase_transaction(
2020-01-15 10:19:29 -05:00
wallet.accounts, txo, exchange_rate_manager
)
####################
# make downloader and wait for start
####################
2020-01-27 02:10:55 -03:00
if not claim.stream.source.bt_infohash:
2020-01-29 13:49:14 -03:00
# fixme: this shouldnt be here
2020-01-27 02:10:55 -03:00
stream = ManagedStream(
2020-01-28 22:37:52 -03:00
self.loop, self.config, source_manager.blob_manager, claim.stream.source.sd_hash,
download_directory, file_name, ManagedStream.STATUS_RUNNING, content_fee=payment,
2020-01-27 02:10:55 -03:00
analytics_manager=self.analytics_manager
)
2020-01-29 13:49:14 -03:00
stream.downloader.node = source_manager.node
2020-01-27 02:10:55 -03:00
else:
2020-02-05 12:29:26 -03:00
stream = TorrentSource(
self.loop, self.config, self.storage, identifier=claim.stream.source.bt_infohash,
file_name=file_name, download_directory=download_directory or self.config.download_dir,
status=ManagedStream.STATUS_RUNNING,
2020-02-07 12:32:39 -03:00
analytics_manager=self.analytics_manager,
2020-02-05 12:29:26 -03:00
torrent_session=source_manager.torrent_session
)
2020-01-15 10:19:29 -05:00
log.info("starting download for %s", uri)
before_download = self.loop.time()
2020-01-28 21:24:05 -03:00
await stream.start(timeout, save_file)
2020-01-15 10:19:29 -05:00
####################
# success case: delete to_replace if applicable, broadcast fee payment
####################
if to_replace: # delete old stream now that the replacement has started downloading
2020-01-27 02:10:55 -03:00
await source_manager.delete(to_replace)
2020-01-15 10:19:29 -05:00
if payment is not None:
await self.wallet_manager.broadcast_or_release(payment)
payment = None # to avoid releasing in `finally` later
log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri)
await self.storage.save_content_fee(stream.stream_hash, stream.content_fee)
2020-01-27 02:10:55 -03:00
source_manager.add(stream)
2020-01-15 10:19:29 -05:00
2020-02-10 21:50:16 -03:00
if not claim.stream.source.bt_infohash:
await self.storage.save_content_claim(stream.stream_hash, outpoint)
2020-02-10 23:15:18 -03:00
else:
await self.storage.save_torrent_content_claim(
stream.identifier, outpoint, stream.torrent_length, stream.torrent_name
)
claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier)
stream.set_claim(claim_info, claim)
2020-01-15 10:19:29 -05:00
if save_file:
2020-01-28 21:24:05 -03:00
await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download),
2020-01-15 10:19:29 -05:00
loop=self.loop)
return stream
except asyncio.TimeoutError:
error = DownloadDataTimeoutError(stream.sd_hash)
raise error
except Exception as err: # forgive data timeout, don't delete stream
expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
KeyFeeAboveMaxAllowedError)
if isinstance(err, expected):
log.warning("Failed to download %s: %s", uri, str(err))
elif isinstance(err, asyncio.CancelledError):
pass
else:
log.exception("Unexpected error downloading stream:")
error = err
raise
finally:
if payment is not None:
# payment is set to None after broadcasting, if we're here an exception probably happened
await self.wallet_manager.ledger.release_tx(payment)
2020-02-10 21:50:16 -03:00
if self.analytics_manager and claim and claim.stream.source.bt_infohash:
# TODO: analytics for torrents
pass
elif self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
2020-02-14 13:23:33 -03:00
stream.downloader.time_to_first_bytes))):
2020-01-15 10:19:29 -05:00
server = self.wallet_manager.ledger.network.client.server
self.loop.create_task(
self.analytics_manager.send_time_to_first_bytes(
resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id,
uri, outpoint,
None if not stream else len(stream.downloader.blob_downloader.active_connections),
None if not stream else len(stream.downloader.blob_downloader.scores),
None if not stream else len(stream.downloader.blob_downloader.connection_failures),
False if not stream else stream.downloader.added_fixed_peers,
self.config.fixed_peer_delay if not stream else stream.downloader.fixed_peers_delay,
None if not stream else stream.sd_hash,
None if not stream else stream.downloader.time_to_descriptor,
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
None if not stream else stream.downloader.time_to_first_bytes,
None if not error else error.__class__.__name__,
None if not error else str(error),
None if not server else f"{server[0]}:{server[1]}"
)
)
async def stream_partial_content(self, request: Request, sd_hash: str):
2020-01-28 21:24:05 -03:00
return await self.source_managers['stream'].stream_partial_content(request, sd_hash)
2020-01-15 10:19:29 -05:00
2020-01-27 02:10:55 -03:00
def get_filtered(self, *args, **kwargs) -> typing.List[ManagedDownloadSource]:
2020-01-15 10:19:29 -05:00
"""
Get a list of filtered and sorted ManagedStream objects
:param sort_by: field to sort by
:param reverse: reverse sorting
:param comparison: comparison operator used for filtering
:param search_by: fields and values to filter by
"""
2020-01-28 21:24:05 -03:00
return sum((manager.get_filtered(*args, **kwargs) for manager in self.source_managers.values()), [])
2020-01-27 02:10:55 -03:00
2020-01-28 21:24:05 -03:00
async def delete(self, source: ManagedDownloadSource, delete_file=False):
for manager in self.source_managers.values():
return await manager.delete(source, delete_file)