lbry-sdk/lbry/file/file_manager.py
Victor Shyba 7e1795b7b1 wip
2020-03-10 20:25:15 -03:00

439 lines
22 KiB
Python

import time
import asyncio
import binascii
import logging
import typing
from typing import Optional
from aiohttp.web import Request
from lbry.error import ResolveError, InvalidStreamDescriptorError, DownloadSDTimeoutError, InsufficientFundsError
from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError
from lbry.stream.managed_stream import ManagedStream
from lbry.utils import cache_concurrent
from lbry.schema.claim import Claim
from lbry.schema.url import URL
from lbry.wallet.dewies import dewies_to_lbc
from lbry.wallet.transaction import Output
from lbry.file.source_manager import SourceManager
from lbry.file.source import ManagedDownloadSource
if typing.TYPE_CHECKING:
from lbry.conf import Config
from lbry.extras.daemon.analytics import AnalyticsManager
from lbry.extras.daemon.storage import SQLiteStorage
from lbry.wallet import LbryWalletManager
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
log = logging.getLogger(__name__)
def path_or_none(p) -> Optional[str]:
if not p:
return
return binascii.unhexlify(p).decode()
class FileManager:
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', wallet_manager: 'LbryWalletManager',
storage: 'SQLiteStorage', analytics_manager: Optional['AnalyticsManager'] = None):
self.loop = loop
self.config = config
self.wallet_manager = wallet_manager
self.storage = storage
self.analytics_manager = analytics_manager
self.source_managers: typing.Dict[str, SourceManager] = {}
async def start(self):
await asyncio.gather(*(source_manager.start() for source_manager in self.source_managers.values()))
def stop(self):
while self.source_managers:
_, source_manager = self.source_managers.popitem()
source_manager.stop()
@cache_concurrent
async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
timeout: Optional[float] = None, file_name: Optional[str] = None,
download_directory: Optional[str] = None,
save_file: Optional[bool] = None, resolve_timeout: float = 3.0,
wallet: Optional['Wallet'] = None) -> ManagedDownloadSource:
wallet = wallet or self.wallet_manager.default_wallet
timeout = timeout or self.config.download_timeout
start_time = self.loop.time()
resolved_time = None
stream = None
error = None
outpoint = None
if save_file is None:
save_file = self.config.save_files
if file_name and not save_file:
save_file = True
if save_file:
download_directory = download_directory or self.config.download_dir
else:
download_directory = None
payment = None
try:
# resolve the claim
if not URL.parse(uri).has_stream:
raise ResolveError("cannot download a channel claim, specify a /path")
try:
resolved_result = await asyncio.wait_for(
self.wallet_manager.ledger.resolve(wallet.accounts, [uri]),
resolve_timeout
)
except asyncio.TimeoutError:
raise ResolveTimeoutError(uri)
except Exception as err:
if isinstance(err, asyncio.CancelledError):
raise
log.exception("Unexpected error resolving stream:")
raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
if not resolved_result:
raise ResolveError(f"Failed to resolve stream at '{uri}'")
if 'error' in resolved_result:
raise ResolveError(f"Unexpected error resolving uri for download: {resolved_result['error']}")
txo = resolved_result[uri]
claim = txo.claim
outpoint = f"{txo.tx_ref.id}:{txo.position}"
resolved_time = self.loop.time() - start_time
await self.storage.save_claim_from_output(self.wallet_manager.ledger, txo)
####################
# update or replace
####################
if claim.stream.source.bt_infohash:
source_manager = self.source_managers['torrent']
else:
source_manager = self.source_managers['stream']
# resume or update an existing stream, if the stream changed: download it and delete the old one after
existing = self.get_filtered(sd_hash=claim.stream.source.sd_hash)
to_replace, updated_stream = None, None
if existing and existing[0].claim_id != txo.claim_id:
raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {txo.claim_id}")
if existing:
log.info("claim contains a metadata only update to a stream we have")
await self.storage.save_content_claim(
existing[0].stream_hash, outpoint
)
await source_manager._update_content_claim(existing[0])
updated_stream = existing[0]
else:
existing_for_claim_id = self.get_filtered(claim_id=txo.claim_id)
if existing_for_claim_id:
log.info("claim contains an update to a stream we have, downloading it")
if save_file and existing_for_claim_id[0].output_file_exists:
save_file = False
await existing_for_claim_id[0].start(node=self.node, timeout=timeout, save_now=save_file)
if not existing_for_claim_id[0].output_file_exists and (save_file or file_name or download_directory):
await existing_for_claim_id[0].save_file(
file_name=file_name, download_directory=download_directory, node=self.node
)
to_replace = existing_for_claim_id[0]
# resume or update an existing stream, if the stream changed: download it and delete the old one after
if updated_stream:
log.info("already have stream for %s", uri)
if save_file and updated_stream.output_file_exists:
save_file = False
await updated_stream.start(node=self.node, timeout=timeout, save_now=save_file)
if not updated_stream.output_file_exists and (save_file or file_name or download_directory):
await updated_stream.save_file(
file_name=file_name, download_directory=download_directory, node=self.node
)
return updated_stream
####################
# pay fee
####################
if not to_replace and txo.has_price and not txo.purchase_receipt:
payment = await manager.create_purchase_transaction(
wallet.accounts, txo, exchange_rate_manager
)
####################
# make downloader and wait for start
####################
if not claim.stream.source.bt_infohash:
stream = ManagedStream(
self.loop, self.config, source_manager.blob_manager, claim.stream.source.sd_hash, download_directory,
file_name, ManagedStream.STATUS_RUNNING, content_fee=payment,
analytics_manager=self.analytics_manager
)
else:
stream = None
log.info("starting download for %s", uri)
before_download = self.loop.time()
await stream.start(source_manager.node, timeout)
####################
# success case: delete to_replace if applicable, broadcast fee payment
####################
if to_replace: # delete old stream now that the replacement has started downloading
await source_manager.delete(to_replace)
if payment is not None:
await self.wallet_manager.broadcast_or_release(payment)
payment = None # to avoid releasing in `finally` later
log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri)
await self.storage.save_content_fee(stream.stream_hash, stream.content_fee)
source_manager.add(stream)
await self.storage.save_content_claim(stream.stream_hash, outpoint)
if save_file:
await asyncio.wait_for(stream.save_file(node=source_manager.node), timeout - (self.loop.time() - before_download),
loop=self.loop)
return stream
except asyncio.TimeoutError:
error = DownloadDataTimeoutError(stream.sd_hash)
raise error
except Exception as err: # forgive data timeout, don't delete stream
expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
KeyFeeAboveMaxAllowedError)
if isinstance(err, expected):
log.warning("Failed to download %s: %s", uri, str(err))
elif isinstance(err, asyncio.CancelledError):
pass
else:
log.exception("Unexpected error downloading stream:")
error = err
raise
finally:
if payment is not None:
# payment is set to None after broadcasting, if we're here an exception probably happened
await self.wallet_manager.ledger.release_tx(payment)
if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
stream.downloader.time_to_first_bytes))):
server = self.wallet_manager.ledger.network.client.server
self.loop.create_task(
self.analytics_manager.send_time_to_first_bytes(
resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id,
uri, outpoint,
None if not stream else len(stream.downloader.blob_downloader.active_connections),
None if not stream else len(stream.downloader.blob_downloader.scores),
None if not stream else len(stream.downloader.blob_downloader.connection_failures),
False if not stream else stream.downloader.added_fixed_peers,
self.config.fixed_peer_delay if not stream else stream.downloader.fixed_peers_delay,
None if not stream else stream.sd_hash,
None if not stream else stream.downloader.time_to_descriptor,
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
None if not stream else stream.downloader.time_to_first_bytes,
None if not error else error.__class__.__name__,
None if not error else str(error),
None if not server else f"{server[0]}:{server[1]}"
)
)
async def stream_partial_content(self, request: Request, sd_hash: str):
return await self._sources[sd_hash].stream_file(request, self.node)
def get_filtered(self, *args, **kwargs) -> typing.List[ManagedDownloadSource]:
"""
Get a list of filtered and sorted ManagedStream objects
:param sort_by: field to sort by
:param reverse: reverse sorting
:param comparison: comparison operator used for filtering
:param search_by: fields and values to filter by
"""
return sum(*(manager.get_filtered(*args, **kwargs) for manager in self.source_managers.values()), [])
async def _check_update_or_replace(
self, outpoint: str, claim_id: str, claim: Claim
) -> typing.Tuple[Optional[ManagedDownloadSource], Optional[ManagedDownloadSource]]:
existing = self.get_filtered(outpoint=outpoint)
if existing:
return existing[0], None
existing = self.get_filtered(sd_hash=claim.stream.source.sd_hash)
if existing and existing[0].claim_id != claim_id:
raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {claim_id}")
if existing:
log.info("claim contains a metadata only update to a stream we have")
await self.storage.save_content_claim(
existing[0].stream_hash, outpoint
)
await self._update_content_claim(existing[0])
return existing[0], None
else:
existing_for_claim_id = self.get_filtered(claim_id=claim_id)
if existing_for_claim_id:
log.info("claim contains an update to a stream we have, downloading it")
return None, existing_for_claim_id[0]
return None, None
# @cache_concurrent
# async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
# timeout: Optional[float] = None, file_name: Optional[str] = None,
# download_directory: Optional[str] = None,
# save_file: Optional[bool] = None, resolve_timeout: float = 3.0,
# wallet: Optional['Wallet'] = None) -> ManagedDownloadSource:
# wallet = wallet or self.wallet_manager.default_wallet
# timeout = timeout or self.config.download_timeout
# start_time = self.loop.time()
# resolved_time = None
# stream = None
# txo: Optional[Output] = None
# error = None
# outpoint = None
# if save_file is None:
# save_file = self.config.save_files
# if file_name and not save_file:
# save_file = True
# if save_file:
# download_directory = download_directory or self.config.download_dir
# else:
# download_directory = None
#
# payment = None
# try:
# # resolve the claim
# if not URL.parse(uri).has_stream:
# raise ResolveError("cannot download a channel claim, specify a /path")
# try:
# response = await asyncio.wait_for(
# self.wallet_manager.ledger.resolve(wallet.accounts, [uri]),
# resolve_timeout
# )
# resolved_result = {}
# for url, txo in response.items():
# if isinstance(txo, Output):
# tx_height = txo.tx_ref.height
# best_height = self.wallet_manager.ledger.headers.height
# resolved_result[url] = {
# 'name': txo.claim_name,
# 'value': txo.claim,
# 'protobuf': binascii.hexlify(txo.claim.to_bytes()),
# 'claim_id': txo.claim_id,
# 'txid': txo.tx_ref.id,
# 'nout': txo.position,
# 'amount': dewies_to_lbc(txo.amount),
# 'effective_amount': txo.meta.get('effective_amount', 0),
# 'height': tx_height,
# 'confirmations': (best_height + 1) - tx_height if tx_height > 0 else tx_height,
# 'claim_sequence': -1,
# 'address': txo.get_address(self.wallet_manager.ledger),
# 'valid_at_height': txo.meta.get('activation_height', None),
# 'timestamp': self.wallet_manager.ledger.headers[tx_height]['timestamp'],
# 'supports': []
# }
# else:
# resolved_result[url] = txo
# except asyncio.TimeoutError:
# raise ResolveTimeoutError(uri)
# except Exception as err:
# if isinstance(err, asyncio.CancelledError):
# raise
# log.exception("Unexpected error resolving stream:")
# raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
# await self.storage.save_claims_for_resolve([
# value for value in resolved_result.values() if 'error' not in value
# ])
#
# resolved = resolved_result.get(uri, {})
# resolved = resolved if 'value' in resolved else resolved.get('claim')
# if not resolved:
# raise ResolveError(f"Failed to resolve stream at '{uri}'")
# if 'error' in resolved:
# raise ResolveError(f"error resolving stream: {resolved['error']}")
# txo = response[uri]
#
# claim = Claim.from_bytes(binascii.unhexlify(resolved['protobuf']))
# outpoint = f"{resolved['txid']}:{resolved['nout']}"
# resolved_time = self.loop.time() - start_time
#
# # resume or update an existing stream, if the stream changed: download it and delete the old one after
# updated_stream, to_replace = await self._check_update_or_replace(outpoint, resolved['claim_id'], claim)
# if updated_stream:
# log.info("already have stream for %s", uri)
# if save_file and updated_stream.output_file_exists:
# save_file = False
# await updated_stream.start(node=self.node, timeout=timeout, save_now=save_file)
# if not updated_stream.output_file_exists and (save_file or file_name or download_directory):
# await updated_stream.save_file(
# file_name=file_name, download_directory=download_directory, node=self.node
# )
# return updated_stream
#
# if not to_replace and txo.has_price and not txo.purchase_receipt:
# payment = await manager.create_purchase_transaction(
# wallet.accounts, txo, exchange_rate_manager
# )
#
# stream = ManagedStream(
# self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_directory,
# file_name, ManagedStream.STATUS_RUNNING, content_fee=payment,
# analytics_manager=self.analytics_manager
# )
# log.info("starting download for %s", uri)
#
# before_download = self.loop.time()
# await stream.start(self.node, timeout)
# stream.set_claim(resolved, claim)
# if to_replace: # delete old stream now that the replacement has started downloading
# await self.delete(to_replace)
#
# if payment is not None:
# await manager.broadcast_or_release(payment)
# payment = None # to avoid releasing in `finally` later
# log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri)
# await self.storage.save_content_fee(stream.stream_hash, stream.content_fee)
#
# self._sources[stream.sd_hash] = stream
# self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
# await self.storage.save_content_claim(stream.stream_hash, outpoint)
# if save_file:
# await asyncio.wait_for(stream.save_file(node=self.node), timeout - (self.loop.time() - before_download),
# loop=self.loop)
# return stream
# except asyncio.TimeoutError:
# error = DownloadDataTimeoutError(stream.sd_hash)
# raise error
# except Exception as err: # forgive data timeout, don't delete stream
# expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
# KeyFeeAboveMaxAllowedError)
# if isinstance(err, expected):
# log.warning("Failed to download %s: %s", uri, str(err))
# elif isinstance(err, asyncio.CancelledError):
# pass
# else:
# log.exception("Unexpected error downloading stream:")
# error = err
# raise
# finally:
# if payment is not None:
# # payment is set to None after broadcasting, if we're here an exception probably happened
# await manager.ledger.release_tx(payment)
# if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
# stream.downloader.time_to_first_bytes))):
# server = self.wallet_manager.ledger.network.client.server
# self.loop.create_task(
# self.analytics_manager.send_time_to_first_bytes(
# resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id,
# uri, outpoint,
# None if not stream else len(stream.downloader.blob_downloader.active_connections),
# None if not stream else len(stream.downloader.blob_downloader.scores),
# None if not stream else len(stream.downloader.blob_downloader.connection_failures),
# False if not stream else stream.downloader.added_fixed_peers,
# self.config.fixed_peer_delay if not stream else stream.downloader.fixed_peers_delay,
# None if not stream else stream.sd_hash,
# None if not stream else stream.downloader.time_to_descriptor,
# None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
# None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
# None if not stream else stream.downloader.time_to_first_bytes,
# None if not error else error.__class__.__name__,
# None if not error else str(error),
# None if not server else f"{server[0]}:{server[1]}"
# )
# )