file manager refactor

This commit is contained in:
Jack Robison 2020-01-15 10:19:29 -05:00 committed by Victor Shyba
parent c9b868a238
commit 404f0dcfe0

435
lbry/file/file_manager.py Normal file
View file

@ -0,0 +1,435 @@
import time
import asyncio
import binascii
import logging
import typing
from typing import Optional
from aiohttp.web import Request
from lbry.error import ResolveError, InvalidStreamDescriptorError, DownloadSDTimeoutError, InsufficientFundsError
from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError
from lbry.utils import cache_concurrent
from lbry.schema.claim import Claim
from lbry.schema.url import URL
from lbry.wallet.dewies import dewies_to_lbc
from lbry.wallet.transaction import Output
from lbry.file.source_manager import SourceManager
from lbry.file.source import ManagedDownloadSource
if typing.TYPE_CHECKING:
from lbry.conf import Config
from lbry.extras.daemon.analytics import AnalyticsManager
from lbry.extras.daemon.storage import SQLiteStorage
from lbry.wallet import LbryWalletManager
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
log = logging.getLogger(__name__)
def path_or_none(p) -> Optional[str]:
if not p:
return
return binascii.unhexlify(p).decode()
class FileManager:
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', wallet_manager: 'LbryWalletManager',
storage: 'SQLiteStorage', analytics_manager: Optional['AnalyticsManager'] = None):
self.loop = loop
self.config = config
self.wallet_manager = wallet_manager
self.storage = storage
self.analytics_manager = analytics_manager
self.source_managers: typing.Dict[str, SourceManager] = {}
async def start(self):
await asyncio.gather(*(source_manager.start() for source_manager in self.source_managers.values()))
def stop(self):
while self.source_managers:
_, source_manager = self.source_managers.popitem()
source_manager.stop()
@cache_concurrent
async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
timeout: Optional[float] = None, file_name: Optional[str] = None,
download_directory: Optional[str] = None,
save_file: Optional[bool] = None, resolve_timeout: float = 3.0,
wallet: Optional['Wallet'] = None) -> ManagedDownloadSource:
wallet = wallet or self.wallet_manager.default_wallet
timeout = timeout or self.config.download_timeout
start_time = self.loop.time()
resolved_time = None
stream = None
error = None
outpoint = None
if save_file is None:
save_file = self.config.save_files
if file_name and not save_file:
save_file = True
if save_file:
download_directory = download_directory or self.config.download_dir
else:
download_directory = None
payment = None
try:
# resolve the claim
if not URL.parse(uri).has_stream:
raise ResolveError("cannot download a channel claim, specify a /path")
try:
resolved_result = await asyncio.wait_for(
self.wallet_manager.ledger.resolve(wallet.accounts, [uri]),
resolve_timeout
)
except asyncio.TimeoutError:
raise ResolveTimeoutError(uri)
except Exception as err:
if isinstance(err, asyncio.CancelledError):
raise
log.exception("Unexpected error resolving stream:")
raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
if not resolved_result:
raise ResolveError(f"Failed to resolve stream at '{uri}'")
if 'error' in resolved_result:
raise ResolveError(f"Unexpected error resolving uri for download: {resolved_result['error']}")
await self.storage.save_claims(
resolved_result, self.wallet_manager.ledger
)
txo = resolved_result[uri]
claim = txo.claim
outpoint = f"{txo.tx_ref.id}:{txo.position}"
resolved_time = self.loop.time() - start_time
####################
# update or replace
####################
if claim.stream.source.bt_infohash:
source_manager = self.source_managers['torrent']
else:
source_manager = self.source_managers['stream']
# resume or update an existing stream, if the stream changed: download it and delete the old one after
existing = self.get_filtered(sd_hash=claim.stream.source.sd_hash)
if existing and existing[0].claim_id != txo.claim_id:
raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {txo.claim_id}")
if existing:
log.info("claim contains a metadata only update to a stream we have")
await self.storage.save_content_claim(
existing[0].stream_hash, outpoint
)
await source_manager._update_content_claim(existing[0])
return existing[0]
else:
existing_for_claim_id = self.get_filtered(claim_id=txo.claim_id)
if existing_for_claim_id:
log.info("claim contains an update to a stream we have, downloading it")
if save_file and existing_for_claim_id[0].output_file_exists:
save_file = False
await existing_for_claim_id[0].start(node=self.node, timeout=timeout, save_now=save_file)
if not existing_for_claim_id[0].output_file_exists and (save_file or file_name or download_directory):
await existing_for_claim_id[0].save_file(
file_name=file_name, download_directory=download_directory, node=self.node
)
return existing_for_claim_id[0]
if updated_stream:
####################
# pay fee
####################
if not to_replace and txo.has_price and not txo.purchase_receipt:
payment = await manager.create_purchase_transaction(
wallet.accounts, txo, exchange_rate_manager
)
####################
# make downloader and wait for start
####################
stream = ManagedStream(
self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_directory,
file_name, ManagedStream.STATUS_RUNNING, content_fee=payment,
analytics_manager=self.analytics_manager
)
log.info("starting download for %s", uri)
before_download = self.loop.time()
await stream.start(self.node, timeout)
stream.set_claim(resolved, claim)
####################
# success case: delete to_replace if applicable, broadcast fee payment
####################
if to_replace: # delete old stream now that the replacement has started downloading
await self.delete(to_replace)
if payment is not None:
await self.wallet_manager.broadcast_or_release(payment)
payment = None # to avoid releasing in `finally` later
log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri)
await self.storage.save_content_fee(stream.stream_hash, stream.content_fee)
self._sources[stream.sd_hash] = stream
self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
await self.storage.save_content_claim(stream.stream_hash, outpoint)
if save_file:
await asyncio.wait_for(stream.save_file(node=self.node), timeout - (self.loop.time() - before_download),
loop=self.loop)
return stream
except asyncio.TimeoutError:
error = DownloadDataTimeoutError(stream.sd_hash)
raise error
except Exception as err: # forgive data timeout, don't delete stream
expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
KeyFeeAboveMaxAllowedError)
if isinstance(err, expected):
log.warning("Failed to download %s: %s", uri, str(err))
elif isinstance(err, asyncio.CancelledError):
pass
else:
log.exception("Unexpected error downloading stream:")
error = err
raise
finally:
if payment is not None:
# payment is set to None after broadcasting, if we're here an exception probably happened
await self.wallet_manager.ledger.release_tx(payment)
if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
stream.downloader.time_to_first_bytes))):
server = self.wallet_manager.ledger.network.client.server
self.loop.create_task(
self.analytics_manager.send_time_to_first_bytes(
resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id,
uri, outpoint,
None if not stream else len(stream.downloader.blob_downloader.active_connections),
None if not stream else len(stream.downloader.blob_downloader.scores),
None if not stream else len(stream.downloader.blob_downloader.connection_failures),
False if not stream else stream.downloader.added_fixed_peers,
self.config.fixed_peer_delay if not stream else stream.downloader.fixed_peers_delay,
None if not stream else stream.sd_hash,
None if not stream else stream.downloader.time_to_descriptor,
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
None if not stream else stream.downloader.time_to_first_bytes,
None if not error else error.__class__.__name__,
None if not error else str(error),
None if not server else f"{server[0]}:{server[1]}"
)
)
async def stream_partial_content(self, request: Request, sd_hash: str):
return await self._sources[sd_hash].stream_file(request, self.node)
def get_filtered(self, sort_by: Optional[str] = None, reverse: Optional[bool] = False,
comparison: Optional[str] = None, **search_by) -> typing.List[ManagedDownloadSource]:
"""
Get a list of filtered and sorted ManagedStream objects
:param sort_by: field to sort by
:param reverse: reverse sorting
:param comparison: comparison operator used for filtering
:param search_by: fields and values to filter by
"""
if sort_by and sort_by not in self.filter_fields:
raise ValueError(f"'{sort_by}' is not a valid field to sort by")
if comparison and comparison not in comparison_operators:
raise ValueError(f"'{comparison}' is not a valid comparison")
if 'full_status' in search_by:
del search_by['full_status']
for search in search_by.keys():
if search not in self.filter_fields:
raise ValueError(f"'{search}' is not a valid search operation")
if search_by:
comparison = comparison or 'eq'
sources = []
for stream in self._sources.values():
for search, val in search_by.items():
if comparison_operators[comparison](getattr(stream, search), val):
sources.append(stream)
break
else:
sources = list(self._sources.values())
if sort_by:
sources.sort(key=lambda s: getattr(s, sort_by))
if reverse:
sources.reverse()
return sources
# @cache_concurrent
# async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
# timeout: Optional[float] = None, file_name: Optional[str] = None,
# download_directory: Optional[str] = None,
# save_file: Optional[bool] = None, resolve_timeout: float = 3.0,
# wallet: Optional['Wallet'] = None) -> ManagedDownloadSource:
# wallet = wallet or self.wallet_manager.default_wallet
# timeout = timeout or self.config.download_timeout
# start_time = self.loop.time()
# resolved_time = None
# stream = None
# txo: Optional[Output] = None
# error = None
# outpoint = None
# if save_file is None:
# save_file = self.config.save_files
# if file_name and not save_file:
# save_file = True
# if save_file:
# download_directory = download_directory or self.config.download_dir
# else:
# download_directory = None
#
# payment = None
# try:
# # resolve the claim
# if not URL.parse(uri).has_stream:
# raise ResolveError("cannot download a channel claim, specify a /path")
# try:
# response = await asyncio.wait_for(
# self.wallet_manager.ledger.resolve(wallet.accounts, [uri]),
# resolve_timeout
# )
# resolved_result = {}
# for url, txo in response.items():
# if isinstance(txo, Output):
# tx_height = txo.tx_ref.height
# best_height = self.wallet_manager.ledger.headers.height
# resolved_result[url] = {
# 'name': txo.claim_name,
# 'value': txo.claim,
# 'protobuf': binascii.hexlify(txo.claim.to_bytes()),
# 'claim_id': txo.claim_id,
# 'txid': txo.tx_ref.id,
# 'nout': txo.position,
# 'amount': dewies_to_lbc(txo.amount),
# 'effective_amount': txo.meta.get('effective_amount', 0),
# 'height': tx_height,
# 'confirmations': (best_height + 1) - tx_height if tx_height > 0 else tx_height,
# 'claim_sequence': -1,
# 'address': txo.get_address(self.wallet_manager.ledger),
# 'valid_at_height': txo.meta.get('activation_height', None),
# 'timestamp': self.wallet_manager.ledger.headers[tx_height]['timestamp'],
# 'supports': []
# }
# else:
# resolved_result[url] = txo
# except asyncio.TimeoutError:
# raise ResolveTimeoutError(uri)
# except Exception as err:
# if isinstance(err, asyncio.CancelledError):
# raise
# log.exception("Unexpected error resolving stream:")
# raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
# await self.storage.save_claims_for_resolve([
# value for value in resolved_result.values() if 'error' not in value
# ])
#
# resolved = resolved_result.get(uri, {})
# resolved = resolved if 'value' in resolved else resolved.get('claim')
# if not resolved:
# raise ResolveError(f"Failed to resolve stream at '{uri}'")
# if 'error' in resolved:
# raise ResolveError(f"error resolving stream: {resolved['error']}")
# txo = response[uri]
#
# claim = Claim.from_bytes(binascii.unhexlify(resolved['protobuf']))
# outpoint = f"{resolved['txid']}:{resolved['nout']}"
# resolved_time = self.loop.time() - start_time
#
# # resume or update an existing stream, if the stream changed: download it and delete the old one after
# updated_stream, to_replace = await self._check_update_or_replace(outpoint, resolved['claim_id'], claim)
# if updated_stream:
# log.info("already have stream for %s", uri)
# if save_file and updated_stream.output_file_exists:
# save_file = False
# await updated_stream.start(node=self.node, timeout=timeout, save_now=save_file)
# if not updated_stream.output_file_exists and (save_file or file_name or download_directory):
# await updated_stream.save_file(
# file_name=file_name, download_directory=download_directory, node=self.node
# )
# return updated_stream
#
# if not to_replace and txo.has_price and not txo.purchase_receipt:
# payment = await manager.create_purchase_transaction(
# wallet.accounts, txo, exchange_rate_manager
# )
#
# stream = ManagedStream(
# self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_directory,
# file_name, ManagedStream.STATUS_RUNNING, content_fee=payment,
# analytics_manager=self.analytics_manager
# )
# log.info("starting download for %s", uri)
#
# before_download = self.loop.time()
# await stream.start(self.node, timeout)
# stream.set_claim(resolved, claim)
# if to_replace: # delete old stream now that the replacement has started downloading
# await self.delete(to_replace)
#
# if payment is not None:
# await manager.broadcast_or_release(payment)
# payment = None # to avoid releasing in `finally` later
# log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri)
# await self.storage.save_content_fee(stream.stream_hash, stream.content_fee)
#
# self._sources[stream.sd_hash] = stream
# self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
# await self.storage.save_content_claim(stream.stream_hash, outpoint)
# if save_file:
# await asyncio.wait_for(stream.save_file(node=self.node), timeout - (self.loop.time() - before_download),
# loop=self.loop)
# return stream
# except asyncio.TimeoutError:
# error = DownloadDataTimeoutError(stream.sd_hash)
# raise error
# except Exception as err: # forgive data timeout, don't delete stream
# expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
# KeyFeeAboveMaxAllowedError)
# if isinstance(err, expected):
# log.warning("Failed to download %s: %s", uri, str(err))
# elif isinstance(err, asyncio.CancelledError):
# pass
# else:
# log.exception("Unexpected error downloading stream:")
# error = err
# raise
# finally:
# if payment is not None:
# # payment is set to None after broadcasting, if we're here an exception probably happened
# await manager.ledger.release_tx(payment)
# if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
# stream.downloader.time_to_first_bytes))):
# server = self.wallet_manager.ledger.network.client.server
# self.loop.create_task(
# self.analytics_manager.send_time_to_first_bytes(
# resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id,
# uri, outpoint,
# None if not stream else len(stream.downloader.blob_downloader.active_connections),
# None if not stream else len(stream.downloader.blob_downloader.scores),
# None if not stream else len(stream.downloader.blob_downloader.connection_failures),
# False if not stream else stream.downloader.added_fixed_peers,
# self.config.fixed_peer_delay if not stream else stream.downloader.fixed_peers_delay,
# None if not stream else stream.sd_hash,
# None if not stream else stream.downloader.time_to_descriptor,
# None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
# None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
# None if not stream else stream.downloader.time_to_first_bytes,
# None if not error else error.__class__.__name__,
# None if not error else str(error),
# None if not server else f"{server[0]}:{server[1]}"
# )
# )