diff --git a/lbry/file/__init__.py b/lbry/file/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/lbry/file/source.py b/lbry/file/source.py
new file mode 100644
index 000000000..05d8fb35d
--- /dev/null
+++ b/lbry/file/source.py
@@ -0,0 +1,175 @@
+import os
+import asyncio
+import typing
+import logging
+import binascii
+from typing import Optional
+from lbry.utils import generate_id
+from lbry.extras.daemon.storage import StoredContentClaim
+
+if typing.TYPE_CHECKING:
+    from lbry.conf import Config
+    from lbry.extras.daemon.analytics import AnalyticsManager
+    from lbry.wallet.transaction import Transaction
+    from lbry.extras.daemon.storage import SQLiteStorage
+
+log = logging.getLogger(__name__)
+
+
+# def _get_next_available_file_name(download_directory: str, file_name: str) -> str:
+#     base_name, ext = os.path.splitext(os.path.basename(file_name))
+#     i = 0
+#     while os.path.isfile(os.path.join(download_directory, file_name)):
+#         i += 1
+#         file_name = "%s_%i%s" % (base_name, i, ext)
+#
+#     return file_name
+#
+#
+# async def get_next_available_file_name(loop: asyncio.AbstractEventLoop, download_directory: str,
+#                                        file_name: str) -> str:
+#     return await loop.run_in_executor(None, _get_next_available_file_name, download_directory, file_name)
+
+
+class ManagedDownloadSource:
+    STATUS_RUNNING = "running"
+    STATUS_STOPPED = "stopped"
+    STATUS_FINISHED = "finished"
+
+    SAVING_ID = 1
+    STREAMING_ID = 2
+
+    def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage', identifier: str,
+                 file_name: Optional[str] = None, download_directory: Optional[str] = None,
+                 status: Optional[str] = STATUS_STOPPED, claim: Optional[StoredContentClaim] = None,
+                 download_id: Optional[str] = None, rowid: Optional[int] = None,
+                 content_fee: Optional['Transaction'] = None,
+                 analytics_manager: Optional['AnalyticsManager'] = None,
+                 added_on: Optional[int] = None):
+        self.loop = loop
+        self.storage = storage
+        self.config = config
+        self.identifier = identifier
+        self.download_directory = download_directory
+        self._file_name = file_name
+        self._status = status
+        self.stream_claim_info = claim
+        self.download_id = download_id or binascii.hexlify(generate_id()).decode()
+        self.rowid = rowid
+        self.content_fee = content_fee
+        self.purchase_receipt = None
+        self._added_on = added_on
+        self.analytics_manager = analytics_manager
+
+        self.saving = asyncio.Event(loop=self.loop)
+        self.finished_writing = asyncio.Event(loop=self.loop)
+        self.started_writing = asyncio.Event(loop=self.loop)
+        self.finished_write_attempt = asyncio.Event(loop=self.loop)
+
+    # @classmethod
+    # async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config', file_path: str,
+    #                  key: Optional[bytes] = None,
+    #                  iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedDownloadSource':
+    #     raise NotImplementedError()
+
+    async def start(self, timeout: Optional[float] = None):
+        raise NotImplementedError()
+
+    async def stop(self, finished: bool = False):
+        raise NotImplementedError()
+
+    async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
+        raise NotImplementedError()
+
+    def stop_tasks(self):
+        raise NotImplementedError()
+
+    # def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):
+    #     self.stream_claim_info = StoredContentClaim(
+    #         f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'],
+    #         claim_info['name'], claim_info['amount'], claim_info['height'],
+    #         binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'],
+    #         claim_info['claim_sequence'], claim_info.get('channel_name')
+    #     )
+    #
+    # async def update_content_claim(self, claim_info: Optional[typing.Dict] = None):
+    #     if not claim_info:
+    #         claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash)
+    #     self.set_claim(claim_info, claim_info['value'])
+
+    @property
+    def file_name(self) -> Optional[str]:
+        return self._file_name
+
+    @property
+    def added_on(self) -> Optional[int]:
+        return self._added_on
+
+    @property
+    def status(self) -> str:
+        return self._status
+
+    @property
+    def completed(self):
+        raise NotImplementedError()
+
+    # @property
+    # def stream_url(self):
+    #     return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}"
+
+    @property
+    def finished(self) -> bool:
+        return self.status == self.STATUS_FINISHED
+
+    @property
+    def running(self) -> bool:
+        return self.status == self.STATUS_RUNNING
+
+    @property
+    def claim_id(self) -> Optional[str]:
+        return None if not self.stream_claim_info else self.stream_claim_info.claim_id
+
+    @property
+    def txid(self) -> Optional[str]:
+        return None if not self.stream_claim_info else self.stream_claim_info.txid
+
+    @property
+    def nout(self) -> Optional[int]:
+        return None if not self.stream_claim_info else self.stream_claim_info.nout
+
+    @property
+    def outpoint(self) -> Optional[str]:
+        return None if not self.stream_claim_info else self.stream_claim_info.outpoint
+
+    @property
+    def claim_height(self) -> Optional[int]:
+        return None if not self.stream_claim_info else self.stream_claim_info.height
+
+    @property
+    def channel_claim_id(self) -> Optional[str]:
+        return None if not self.stream_claim_info else self.stream_claim_info.channel_claim_id
+
+    @property
+    def channel_name(self) -> Optional[str]:
+        return None if not self.stream_claim_info else self.stream_claim_info.channel_name
+
+    @property
+    def claim_name(self) -> Optional[str]:
+        return None if not self.stream_claim_info else self.stream_claim_info.claim_name
+
+    @property
+    def metadata(self) -> Optional[typing.Dict]:
+        return None if not self.stream_claim_info else self.stream_claim_info.claim.stream.to_dict()
+
+    @property
+    def metadata_protobuf(self) -> bytes:
+        if self.stream_claim_info:
+            return binascii.hexlify(self.stream_claim_info.claim.to_bytes())
+
+    @property
+    def full_path(self) -> Optional[str]:
+        return os.path.join(self.download_directory, os.path.basename(self.file_name)) \
+            if self.file_name and self.download_directory else None
+
+    @property
+    def output_file_exists(self):
+        return os.path.isfile(self.full_path) if self.full_path else False
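
The new `ManagedDownloadSource` above is an abstract base: concrete sources supply the lifecycle (`start`, `stop`, `save_file`, `stop_tasks`) and the `completed` property, and inherit the claim-metadata accessors and asyncio events. A minimal sketch of a hypothetical subclass, only to illustrate the surface a subclass must cover (the class name and do-nothing behavior are illustrative, not part of this diff):

```python
import asyncio
from typing import Optional

from lbry.file.source import ManagedDownloadSource


class NullDownloadSource(ManagedDownloadSource):
    """Hypothetical do-nothing source illustrating the abstract surface."""

    async def start(self, timeout: Optional[float] = None):
        self._status = self.STATUS_RUNNING  # a real source would begin fetching here

    async def stop(self, finished: bool = False):
        self._status = self.STATUS_FINISHED if finished else self.STATUS_STOPPED

    async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
        self.finished_writing.set()  # a real source writes decrypted bytes to disk first

    def stop_tasks(self):
        pass  # cancel any asyncio tasks the source owns

    @property
    def completed(self) -> bool:
        return self.status == self.STATUS_FINISHED
```
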
diff --git a/lbry/file/source_manager.py b/lbry/file/source_manager.py
new file mode 100644
index 000000000..56ba5fd5f
--- /dev/null
+++ b/lbry/file/source_manager.py
@@ -0,0 +1,125 @@
+import os
+import asyncio
+import binascii
+import logging
+import typing
+from typing import Optional
+from lbry.file.source import ManagedDownloadSource
+
+if typing.TYPE_CHECKING:
+    from lbry.conf import Config
+    from lbry.extras.daemon.analytics import AnalyticsManager
+    from lbry.extras.daemon.storage import SQLiteStorage
+
+log = logging.getLogger(__name__)
+
+comparison_operators = {
+    'eq': lambda a, b: a == b,
+    'ne': lambda a, b: a != b,
+    'g': lambda a, b: a > b,
+    'l': lambda a, b: a < b,
+    'ge': lambda a, b: a >= b,
+    'le': lambda a, b: a <= b,
+}
+
+
+def path_or_none(p) -> Optional[str]:
+    if not p:
+        return
+    return binascii.unhexlify(p).decode()
+
+
+class SourceManager:
+    filter_fields = {
+        'rowid',
+        'status',
+        'file_name',
+        'added_on',
+        'claim_name',
+        'claim_height',
+        'claim_id',
+        'outpoint',
+        'txid',
+        'nout',
+        'channel_claim_id',
+        'channel_name'
+    }
+
+    source_class = ManagedDownloadSource
+
+    def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage',
+                 analytics_manager: Optional['AnalyticsManager'] = None):
+        self.loop = loop
+        self.config = config
+        self.storage = storage
+        self.analytics_manager = analytics_manager
+        self._sources: typing.Dict[str, ManagedDownloadSource] = {}
+        self.started = asyncio.Event(loop=self.loop)
+
+    def add(self, source: ManagedDownloadSource):
+        self._sources[source.identifier] = source
+
+    def remove(self, source: ManagedDownloadSource):
+        if source.identifier not in self._sources:
+            return
+        self._sources.pop(source.identifier)
+        source.stop_tasks()
+
+    async def initialize_from_database(self):
+        raise NotImplementedError()
+
+    async def start(self):
+        await self.initialize_from_database()
+        self.started.set()
+
+    def stop(self):
+        while self._sources:
+            _, source = self._sources.popitem()
+            source.stop_tasks()
+        self.started.clear()
+
+    async def create(self, file_path: str, key: Optional[bytes] = None, **kw) -> ManagedDownloadSource:
+        raise NotImplementedError()
+
+    async def _delete(self, source: ManagedDownloadSource):
+        raise NotImplementedError()
+
+    async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
+        await self._delete(source)
+        self.remove(source)
+        if delete_file and source.output_file_exists:
+            os.remove(source.full_path)
+
+    def get_filtered(self, sort_by: Optional[str] = None, reverse: Optional[bool] = False,
+                     comparison: Optional[str] = None, **search_by) -> typing.List[ManagedDownloadSource]:
+        """
+        Get a list of filtered and sorted ManagedDownloadSource objects
+
+        :param sort_by: field to sort by
+        :param reverse: reverse sorting
+        :param comparison: comparison operator used for filtering
+        :param search_by: fields and values to filter by
+        """
+        if sort_by and sort_by not in self.filter_fields:
+            raise ValueError(f"'{sort_by}' is not a valid field to sort by")
+        if comparison and comparison not in comparison_operators:
+            raise ValueError(f"'{comparison}' is not a valid comparison")
+        if 'full_status' in search_by:
+            del search_by['full_status']
+        for search in search_by.keys():
+            if search not in self.filter_fields:
+                raise ValueError(f"'{search}' is not a valid search operation")
+        if search_by:
+            comparison = comparison or 'eq'
+            sources = []
+            for stream in self._sources.values():
+                for search, val in search_by.items():
+                    if comparison_operators[comparison](getattr(stream, search), val):
+                        sources.append(stream)
+                        break
+        else:
+            sources = list(self._sources.values())
+        if sort_by:
+            sources.sort(key=lambda s: getattr(s, sort_by))
+            if reverse:
+                sources.reverse()
+        return sources
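
For reference, a usage sketch of the `get_filtered` API defined above; `manager` stands in for any started `SourceManager` subclass and the timestamp is made up:

```python
# all finished downloads, newest first
finished = manager.get_filtered(sort_by='added_on', reverse=True, status='finished')

# sources added before a cutoff: 'l' selects the a < b operator
older = manager.get_filtered(comparison='l', added_on=1600000000)

# unknown fields and comparisons are rejected up front
try:
    manager.get_filtered(sort_by='not_a_field')
except ValueError as err:
    print(err)  # "'not_a_field' is not a valid field to sort by"
```
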
diff --git a/lbry/stream/managed_stream.py b/lbry/stream/managed_stream.py
index c449fe232..c530550c6 100644
--- a/lbry/stream/managed_stream.py
+++ b/lbry/stream/managed_stream.py
@@ -4,6 +4,7 @@ import time
 import typing
 import logging
 import binascii
+from typing import Optional
 from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
 from lbry.utils import generate_id
 from lbry.error import DownloadSDTimeoutError
@@ -13,12 +14,14 @@ from lbry.stream.descriptor import StreamDescriptor, sanitize_file_name
 from lbry.stream.reflector.client import StreamReflectorClient
 from lbry.extras.daemon.storage import StoredContentClaim
 from lbry.blob import MAX_BLOB_SIZE
+from lbry.file.source import ManagedDownloadSource
 
 if typing.TYPE_CHECKING:
     from lbry.conf import Config
     from lbry.schema.claim import Claim
     from lbry.blob.blob_manager import BlobManager
     from lbry.blob.blob_info import BlobInfo
+    from lbry.extras.daemon.storage import SQLiteStorage
     from lbry.dht.node import Node
     from lbry.extras.daemon.analytics import AnalyticsManager
     from lbry.wallet.transaction import Transaction
@@ -40,65 +43,20 @@ async def get_next_available_file_name(loop: asyncio.AbstractEventLoop, download
     return await loop.run_in_executor(None, _get_next_available_file_name, download_directory, file_name)
 
 
-class ManagedStream:
-    STATUS_RUNNING = "running"
-    STATUS_STOPPED = "stopped"
-    STATUS_FINISHED = "finished"
-
-    SAVING_ID = 1
-    STREAMING_ID = 2
-
-    __slots__ = [
-        'loop',
-        'config',
-        'blob_manager',
-        'sd_hash',
-        'download_directory',
-        '_file_name',
-        '_added_on',
-        '_status',
-        'stream_claim_info',
-        'download_id',
-        'rowid',
-        'content_fee',
-        'purchase_receipt',
-        'downloader',
-        'analytics_manager',
-        'fully_reflected',
-        'reflector_progress',
-        'file_output_task',
-        'delayed_stop_task',
-        'streaming_responses',
-        'streaming',
-        '_running',
-        'saving',
-        'finished_writing',
-        'started_writing',
-        'finished_write_attempt',
-        'uploading_to_reflector'
-    ]
-
+class ManagedStream(ManagedDownloadSource):
     def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
-                 sd_hash: str, download_directory: typing.Optional[str] = None, file_name: typing.Optional[str] = None,
-                 status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredContentClaim] = None,
-                 download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None,
-                 descriptor: typing.Optional[StreamDescriptor] = None,
-                 content_fee: typing.Optional['Transaction'] = None,
-                 analytics_manager: typing.Optional['AnalyticsManager'] = None,
-                 added_on: typing.Optional[int] = None):
-        self.loop = loop
-        self.config = config
+                 sd_hash: str, download_directory: Optional[str] = None, file_name: Optional[str] = None,
+                 status: Optional[str] = ManagedDownloadSource.STATUS_STOPPED,
+                 claim: Optional[StoredContentClaim] = None,
+                 download_id: Optional[str] = None, rowid: Optional[int] = None,
+                 descriptor: Optional[StreamDescriptor] = None,
+                 content_fee: Optional['Transaction'] = None,
+                 analytics_manager: Optional['AnalyticsManager'] = None,
+                 added_on: Optional[int] = None):
+        super().__init__(loop, config, blob_manager.storage, sd_hash, file_name, download_directory, status, claim,
+                         download_id, rowid, content_fee, analytics_manager, added_on)
         self.blob_manager = blob_manager
-        self.sd_hash = sd_hash
-        self.download_directory = download_directory
-        self._file_name = file_name
-        self._status = status
-        self.stream_claim_info = claim
-        self.download_id = download_id or binascii.hexlify(generate_id()).decode()
-        self.rowid = rowid
-        self.content_fee = content_fee
         self.purchase_receipt = None
-        self._added_on = added_on
         self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor)
         self.analytics_manager = analytics_manager
@@ -108,12 +66,13 @@ class ManagedStream:
         self.file_output_task: typing.Optional[asyncio.Task] = None
         self.delayed_stop_task: typing.Optional[asyncio.Task] = None
         self.streaming_responses: typing.List[typing.Tuple[Request, StreamResponse]] = []
+        self.fully_reflected = asyncio.Event(loop=self.loop)
         self.streaming = asyncio.Event(loop=self.loop)
         self._running = asyncio.Event(loop=self.loop)
-        self.saving = asyncio.Event(loop=self.loop)
-        self.finished_writing = asyncio.Event(loop=self.loop)
-        self.started_writing = asyncio.Event(loop=self.loop)
-        self.finished_write_attempt = asyncio.Event(loop=self.loop)
+
+    @property
+    def sd_hash(self) -> str:
+        return self.identifier
 
     @property
     def is_fully_reflected(self) -> bool:
@@ -128,17 +87,9 @@ class ManagedStream:
         return self.descriptor.stream_hash
 
     @property
-    def file_name(self) -> typing.Optional[str]:
+    def file_name(self) -> Optional[str]:
         return self._file_name or (self.descriptor.suggested_file_name if self.descriptor else None)
 
-    @property
-    def added_on(self) -> typing.Optional[int]:
-        return self._added_on
-
-    @property
-    def status(self) -> str:
-        return self._status
-
     @property
     def written_bytes(self) -> int:
         return 0 if not self.output_file_exists else os.stat(self.full_path).st_size
@@ -156,55 +107,6 @@ class ManagedStream:
         self._status = status
         await self.blob_manager.storage.change_file_status(self.stream_hash, status)
 
-    @property
-    def finished(self) -> bool:
-        return self.status == self.STATUS_FINISHED
-
-    @property
-    def running(self) -> bool:
-        return self.status == self.STATUS_RUNNING
-
-    @property
-    def claim_id(self) -> typing.Optional[str]:
-        return None if not self.stream_claim_info else self.stream_claim_info.claim_id
-
-    @property
-    def txid(self) -> typing.Optional[str]:
-        return None if not self.stream_claim_info else self.stream_claim_info.txid
-
-    @property
-    def nout(self) -> typing.Optional[int]:
-        return None if not self.stream_claim_info else self.stream_claim_info.nout
-
-    @property
-    def outpoint(self) -> typing.Optional[str]:
-        return None if not self.stream_claim_info else self.stream_claim_info.outpoint
-
-    @property
-    def claim_height(self) -> typing.Optional[int]:
-        return None if not self.stream_claim_info else self.stream_claim_info.height
-
-    @property
-    def channel_claim_id(self) -> typing.Optional[str]:
-        return None if not self.stream_claim_info else self.stream_claim_info.channel_claim_id
-
-    @property
-    def channel_name(self) -> typing.Optional[str]:
-        return None if not self.stream_claim_info else self.stream_claim_info.channel_name
-
-    @property
-    def claim_name(self) -> typing.Optional[str]:
-        return None if not self.stream_claim_info else self.stream_claim_info.claim_name
-
-    @property
-    def metadata(self) -> typing.Optional[typing.Dict]:
-        return None if not self.stream_claim_info else self.stream_claim_info.claim.stream.to_dict()
-
-    @property
-    def metadata_protobuf(self) -> bytes:
-        if self.stream_claim_info:
-            return binascii.hexlify(self.stream_claim_info.claim.to_bytes())
-
     @property
     def blobs_completed(self) -> int:
         return sum([1 if b.blob_hash in self.blob_manager.completed_blob_hashes else 0
@@ -218,39 +120,30 @@ class ManagedStream:
     def blobs_remaining(self) -> int:
         return self.blobs_in_stream - self.blobs_completed
 
-    @property
-    def full_path(self) -> typing.Optional[str]:
-        return os.path.join(self.download_directory, os.path.basename(self.file_name)) \
-            if self.file_name and self.download_directory else None
-
-    @property
-    def output_file_exists(self):
-        return os.path.isfile(self.full_path) if self.full_path else False
-
     @property
     def mime_type(self):
         return guess_media_type(os.path.basename(self.descriptor.suggested_file_name))[0]
 
-    @classmethod
-    async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
-                     file_path: str, key: typing.Optional[bytes] = None,
-                     iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedStream':
-        """
-        Generate a stream from a file and save it to the db
-        """
-        descriptor = await StreamDescriptor.create_stream(
-            loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator,
-            blob_completed_callback=blob_manager.blob_completed
-        )
-        await blob_manager.storage.store_stream(
-            blob_manager.get_blob(descriptor.sd_hash), descriptor
-        )
-        row_id = await blob_manager.storage.save_published_file(descriptor.stream_hash, os.path.basename(file_path),
-                                                                os.path.dirname(file_path), 0)
-        return cls(loop, config, blob_manager, descriptor.sd_hash, os.path.dirname(file_path),
-                   os.path.basename(file_path), status=cls.STATUS_FINISHED, rowid=row_id, descriptor=descriptor)
+    # @classmethod
+    # async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config',
+    #                  file_path: str, key: Optional[bytes] = None,
+    #                  iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedDownloadSource':
+    #     """
+    #     Generate a stream from a file and save it to the db
+    #     """
+    #     descriptor = await StreamDescriptor.create_stream(
+    #         loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator,
+    #         blob_completed_callback=blob_manager.blob_completed
+    #     )
+    #     await blob_manager.storage.store_stream(
+    #         blob_manager.get_blob(descriptor.sd_hash), descriptor
+    #     )
+    #     row_id = await blob_manager.storage.save_published_file(descriptor.stream_hash, os.path.basename(file_path),
+    #                                                             os.path.dirname(file_path), 0)
+    #     return cls(loop, config, blob_manager, descriptor.sd_hash, os.path.dirname(file_path),
+    #                os.path.basename(file_path), status=cls.STATUS_FINISHED, rowid=row_id, descriptor=descriptor)
 
-    async def start(self, node: typing.Optional['Node'] = None, timeout: typing.Optional[float] = None,
+    async def start(self, node: Optional['Node'] = None, timeout: Optional[float] = None,
                     save_now: bool = False):
         timeout = timeout or self.config.download_timeout
         if self._running.is_set():
@@ -287,7 +180,7 @@ class ManagedStream:
         if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING:
             await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED)
 
-    async def _aiter_read_stream(self, start_blob_num: typing.Optional[int] = 0, connection_id: int = 0)\
+    async def _aiter_read_stream(self, start_blob_num: Optional[int] = 0, connection_id: int = 0)\
             -> typing.AsyncIterator[typing.Tuple['BlobInfo', bytes]]:
         if start_blob_num >= len(self.descriptor.blobs[:-1]):
             raise IndexError(start_blob_num)
@@ -299,7 +192,7 @@ class ManagedStream:
             decrypted = await self.downloader.read_blob(blob_info, connection_id)
             yield (blob_info, decrypted)
 
-    async def stream_file(self, request: Request, node: typing.Optional['Node'] = None) -> StreamResponse:
+    async def stream_file(self, request: Request, node: Optional['Node'] = None) -> StreamResponse:
         log.info("stream file to browser for lbry://%s#%s (sd hash %s...)", self.claim_name, self.claim_id,
                  self.sd_hash[:6])
         headers, size, skip_blobs, first_blob_start_offset = self._prepare_range_response_headers(
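
The retained `_aiter_read_stream` helper above is what both `stream_file` and the file writer consume: it yields `(BlobInfo, decrypted bytes)` pairs starting from a given blob index. A minimal consumer sketch (illustrative only; it calls the private helper directly and assumes an already-started `ManagedStream`):

```python
async def dump_blob_sizes(stream) -> None:
    # the trailing stream-terminator blob is excluded by _aiter_read_stream
    # itself (it iterates descriptor.blobs[:-1])
    async for blob_info, decrypted in stream._aiter_read_stream(start_blob_num=0):
        print(f"blob #{blob_info.blob_num}: {len(decrypted)} decrypted bytes")
```
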
@@ -391,8 +284,8 @@ class ManagedStream:
             self.saving.clear()
             self.finished_write_attempt.set()
 
-    async def save_file(self, file_name: typing.Optional[str] = None, download_directory: typing.Optional[str] = None,
-                        node: typing.Optional['Node'] = None):
+    async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None,
+                        node: Optional['Node'] = None):
         await self.start(node)
         if self.file_output_task and not self.file_output_task.done():  # cancel an already running save task
             self.file_output_task.cancel()
@@ -476,7 +369,7 @@ class ManagedStream:
             claim_info['claim_sequence'], claim_info.get('channel_name')
         )
 
-    async def update_content_claim(self, claim_info: typing.Optional[typing.Dict] = None):
+    async def update_content_claim(self, claim_info: Optional[typing.Dict] = None):
         if not claim_info:
             claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash)
         self.set_claim(claim_info, claim_info['value'])
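
The net effect of the subclassing is that a stream's identity and bookkeeping now flow through the base class: the constructor forwards `sd_hash` as the `identifier` and `blob_manager.storage` as the `storage` argument, and `sd_hash` becomes a read-only alias. A quick sketch, assuming an event loop, config, blob manager, and sd_hash are already wired up:

```python
stream = ManagedStream(loop, config, blob_manager, sd_hash)

assert stream.identifier == stream.sd_hash             # alias added by this diff
assert stream.status == ManagedStream.STATUS_STOPPED   # constants inherited from ManagedDownloadSource
assert stream.storage is blob_manager.storage          # wired by super().__init__
```
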
diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py
index 4fb37e99a..58035e174 100644
--- a/lbry/stream/stream_manager.py
+++ b/lbry/stream/stream_manager.py
@@ -15,11 +15,14 @@ from lbry.schema.claim import Claim
 from lbry.schema.url import URL
 from lbry.wallet.dewies import dewies_to_lbc
 from lbry.wallet import Output
-
+from lbry.file.source_manager import SourceManager
+from lbry.file.source import ManagedDownloadSource
 if typing.TYPE_CHECKING:
     from lbry.conf import Config
     from lbry.blob.blob_manager import BlobManager
     from lbry.dht.node import Node
+    from lbry.wallet.wallet import WalletManager
+    from lbry.wallet.transaction import Transaction
     from lbry.extras.daemon.analytics import AnalyticsManager
     from lbry.extras.daemon.storage import SQLiteStorage, StoredContentClaim
     from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
@@ -29,32 +32,12 @@ if typing.TYPE_CHECKING:
 
 log = logging.getLogger(__name__)
 
-FILTER_FIELDS = [
-    'rowid',
-    'status',
-    'file_name',
-    'added_on',
-    'sd_hash',
-    'stream_hash',
-    'claim_name',
-    'claim_height',
-    'claim_id',
-    'outpoint',
-    'txid',
-    'nout',
-    'channel_claim_id',
-    'channel_name',
-    'full_status',  # TODO: remove
-    'blobs_remaining',
-    'blobs_in_stream'
-]
 
 SET_FILTER_FIELDS = {
     "claim_ids": "claim_id",
     "channel_claim_ids": "channel_claim_id",
    "outpoints": "outpoint"
 }
-
 COMPARISON_OPERATORS = {
     'eq': lambda a, b: a == b,
     'ne': lambda a, b: a != b,
@@ -64,35 +47,44 @@ COMPARISON_OPERATORS = {
     'le': lambda a, b: a <= b,
     'in': lambda a, b: a in b
 }
-
-
-def path_or_none(path) -> Optional[str]:
-    if not path:
+def path_or_none(p) -> Optional[str]:
+    if not p:
         return
-    return binascii.unhexlify(path).decode()
+    return binascii.unhexlify(p).decode()
 
 
-class StreamManager:
+class StreamManager(SourceManager):
+    _sources: typing.Dict[str, ManagedStream]
+
+    filter_fields = set(SourceManager.filter_fields)
+    filter_fields.update({
+        'sd_hash',
+        'stream_hash',
+        'full_status',  # TODO: remove
+        'blobs_remaining',
+        'blobs_in_stream'
+    })
+
     def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager',
                  wallet_manager: 'WalletManager', storage: 'SQLiteStorage', node: Optional['Node'],
                  analytics_manager: Optional['AnalyticsManager'] = None):
-        self.loop = loop
-        self.config = config
+        super().__init__(loop, config, storage, analytics_manager)
         self.blob_manager = blob_manager
         self.wallet_manager = wallet_manager
-        self.storage = storage
         self.node = node
-        self.analytics_manager = analytics_manager
-        self.streams: typing.Dict[str, ManagedStream] = {}
         self.resume_saving_task: Optional[asyncio.Task] = None
         self.re_reflect_task: Optional[asyncio.Task] = None
         self.update_stream_finished_futs: typing.List[asyncio.Future] = []
         self.running_reflector_uploads: typing.Dict[str, asyncio.Task] = {}
         self.started = asyncio.Event(loop=self.loop)
 
+    def add(self, source: ManagedStream):
+        super().add(source)
+        self.storage.content_claim_callbacks[source.stream_hash] = lambda: self._update_content_claim(source)
+
     async def _update_content_claim(self, stream: ManagedStream):
         claim_info = await self.storage.get_content_claim(stream.stream_hash)
-        self.streams.setdefault(stream.sd_hash, stream).set_claim(claim_info, claim_info['value'])
+        self._sources.setdefault(stream.sd_hash, stream).set_claim(claim_info, claim_info['value'])
 
     async def recover_streams(self, file_infos: typing.List[typing.Dict]):
         to_restore = []
@@ -123,10 +115,10 @@ class StreamManager:
             # if self.blob_manager._save_blobs:
             #     log.info("Recovered %i/%i attempted streams", len(to_restore), len(file_infos))
 
-    async def add_stream(self, rowid: int, sd_hash: str, file_name: Optional[str],
-                         download_directory: Optional[str], status: str,
-                         claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'],
-                         added_on: Optional[int], fully_reflected: bool):
+    async def _load_stream(self, rowid: int, sd_hash: str, file_name: Optional[str],
+                           download_directory: Optional[str], status: str,
+                           claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'],
+                           added_on: Optional[int], fully_reflected: bool):
         try:
             descriptor = await self.blob_manager.get_stream_descriptor(sd_hash)
         except InvalidStreamDescriptorError as err:
@@ -139,10 +131,9 @@ class StreamManager:
         )
         if fully_reflected:
             stream.fully_reflected.set()
-        self.streams[sd_hash] = stream
-        self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
+        self.add(stream)
 
-    async def load_and_resume_streams_from_database(self):
+    async def initialize_from_database(self):
         to_recover = []
         to_start = []
 
@@ -156,7 +147,6 @@ class StreamManager:
                 to_recover.append(file_info)
             to_start.append(file_info)
         if to_recover:
-            log.info("Recover %i files", len(to_recover))
             await self.recover_streams(to_recover)
 
         log.info("Initializing %i files", len(to_start))
@@ -167,7 +157,7 @@ class StreamManager:
             download_directory = path_or_none(file_info['download_directory'])
             if file_name and download_directory and not file_info['saved_file'] and file_info['status'] == 'running':
                 to_resume_saving.append((file_name, download_directory, file_info['sd_hash']))
-            add_stream_tasks.append(self.loop.create_task(self.add_stream(
+            add_stream_tasks.append(self.loop.create_task(self._load_stream(
                 file_info['rowid'], file_info['sd_hash'], file_name,
                 download_directory, file_info['status'],
                 file_info['claim'], file_info['content_fee'],
@@ -175,25 +165,22 @@ class StreamManager:
             )))
         if add_stream_tasks:
             await asyncio.gather(*add_stream_tasks, loop=self.loop)
-        log.info("Started stream manager with %i files", len(self.streams))
+        log.info("Started stream manager with %i files", len(self._sources))
         if not self.node:
             log.info("no DHT node given, resuming downloads trusting that we can contact reflector")
         if to_resume_saving:
-            self.resume_saving_task = self.loop.create_task(self.resume(to_resume_saving))
-
-    async def resume(self, to_resume_saving):
-        log.info("Resuming saving %i files", len(to_resume_saving))
-        await asyncio.gather(
-            *(self.streams[sd_hash].save_file(file_name, download_directory, node=self.node)
-              for (file_name, download_directory, sd_hash) in to_resume_saving),
-            loop=self.loop
-        )
+            log.info("Resuming saving %i files", len(to_resume_saving))
+            self.resume_saving_task = self.loop.create_task(asyncio.gather(
+                *(self._sources[sd_hash].save_file(file_name, download_directory, node=self.node)
+                  for (file_name, download_directory, sd_hash) in to_resume_saving),
+                loop=self.loop
+            ))
 
     async def reflect_streams(self):
         while True:
             if self.config.reflect_streams and self.config.reflector_servers:
                 sd_hashes = await self.storage.get_streams_to_re_reflect()
-                sd_hashes = [sd for sd in sd_hashes if sd in self.streams]
+                sd_hashes = [sd for sd in sd_hashes if sd in self._sources]
                 batch = []
                 while sd_hashes:
-                    stream = self.streams[sd_hashes.pop()]
+                    stream = self._sources[sd_hashes.pop()]
@@ -209,18 +196,15 @@ class StreamManager:
             await asyncio.sleep(300, loop=self.loop)
 
     async def start(self):
-        await self.load_and_resume_streams_from_database()
+        await super().start()
         self.re_reflect_task = self.loop.create_task(self.reflect_streams())
-        self.started.set()
 
     def stop(self):
+        super().stop()
         if self.resume_saving_task and not self.resume_saving_task.done():
             self.resume_saving_task.cancel()
         if self.re_reflect_task and not self.re_reflect_task.done():
             self.re_reflect_task.cancel()
-        while self.streams:
-            _, stream = self.streams.popitem()
-            stream.stop_tasks()
         while self.update_stream_finished_futs:
             self.update_stream_finished_futs.pop().cancel()
         while self.running_reflector_uploads:
@@ -260,14 +243,7 @@ class StreamManager:
-        del self.streams[stream.sd_hash]
+        del self._sources[stream.sd_hash]
         blob_hashes = [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]]
         await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
         await self.storage.delete_stream(stream.descriptor)
-        if delete_file and stream.output_file_exists:
-            os.remove(stream.full_path)
-
-    def get_stream_by_stream_hash(self, stream_hash: str) -> Optional[ManagedStream]:
-        streams = tuple(filter(lambda stream: stream.stream_hash == stream_hash, self.streams.values()))
-        if streams:
-            return streams[0]
 
     def get_filtered_streams(self, sort_by: Optional[str] = None, reverse: Optional[bool] = False,
                              comparison: Optional[str] = None,
@@ -324,199 +300,5 @@ class StreamManager:
             streams.reverse()
         return streams
 
-    async def _check_update_or_replace(self, outpoint: str, claim_id: str, claim: Claim
-                                       ) -> typing.Tuple[Optional[ManagedStream], Optional[ManagedStream]]:
-        existing = self.get_filtered_streams(outpoint=outpoint)
-        if existing:
-            return existing[0], None
-        existing = self.get_filtered_streams(sd_hash=claim.stream.source.sd_hash)
-        if existing and existing[0].claim_id != claim_id:
-            raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {claim_id}")
-        if existing:
-            log.info("claim contains a metadata only update to a stream we have")
-            await self.storage.save_content_claim(
-                existing[0].stream_hash, outpoint
-            )
-            await self._update_content_claim(existing[0])
-            return existing[0], None
-        else:
-            existing_for_claim_id = self.get_filtered_streams(claim_id=claim_id)
-            if existing_for_claim_id:
-                log.info("claim contains an update to a stream we have, downloading it")
-                return None, existing_for_claim_id[0]
-        return None, None
-
-    @staticmethod
-    def _convert_to_old_resolve_output(wallet_manager, resolves):
-        result = {}
-        for url, txo in resolves.items():
-            if isinstance(txo, Output):
-                tx_height = txo.tx_ref.height
-                best_height = wallet_manager.ledger.headers.height
-                result[url] = {
-                    'name': txo.claim_name,
-                    'value': txo.claim,
-                    'protobuf': binascii.hexlify(txo.claim.to_bytes()),
-                    'claim_id': txo.claim_id,
-                    'txid': txo.tx_ref.id,
-                    'nout': txo.position,
-                    'amount': dewies_to_lbc(txo.amount),
-                    'effective_amount': txo.meta.get('effective_amount', 0),
-                    'height': tx_height,
-                    'confirmations': (best_height + 1) - tx_height if tx_height > 0 else tx_height,
-                    'claim_sequence': -1,
-                    'address': txo.get_address(wallet_manager.ledger),
-                    'valid_at_height': txo.meta.get('activation_height', None),
-                    'timestamp': wallet_manager.ledger.headers.estimated_timestamp(tx_height),
-                    'supports': []
-                }
-            else:
-                result[url] = txo
-        return result
-
-    @cache_concurrent
-    async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager',
-                                       timeout: Optional[float] = None,
-                                       file_name: Optional[str] = None,
-                                       download_directory: Optional[str] = None,
-                                       save_file: Optional[bool] = None,
-                                       resolve_timeout: float = 3.0,
-                                       wallet: Optional['Wallet'] = None) -> ManagedStream:
-        manager = self.wallet_manager
-        wallet = wallet or manager.default_wallet
-        timeout = timeout or self.config.download_timeout
-        start_time = self.loop.time()
-        resolved_time = None
-        stream = None
-        txo: Optional[Output] = None
-        error = None
-        outpoint = None
-        if save_file is None:
-            save_file = self.config.save_files
-        if file_name and not save_file:
-            save_file = True
-        if save_file:
-            download_directory = download_directory or self.config.download_dir
-        else:
-            download_directory = None
-
-        payment = None
-        try:
-            # resolve the claim
-            if not URL.parse(uri).has_stream:
-                raise ResolveError("cannot download a channel claim, specify a /path")
-            try:
-                response = await asyncio.wait_for(
-                    manager.ledger.resolve(wallet.accounts, [uri], include_purchase_receipt=True),
-                    resolve_timeout
-                )
-                resolved_result = self._convert_to_old_resolve_output(manager, response)
-            except asyncio.TimeoutError:
-                raise ResolveTimeoutError(uri)
-            except Exception as err:
-                if isinstance(err, asyncio.CancelledError):  # TODO: remove when updated to 3.8
-                    raise
-                log.exception("Unexpected error resolving stream:")
-                raise ResolveError(f"Unexpected error resolving stream: {str(err)}")
-            await self.storage.save_claims_for_resolve([
-                value for value in resolved_result.values() if 'error' not in value
-            ])
-            resolved = resolved_result.get(uri, {})
-            resolved = resolved if 'value' in resolved else resolved.get('claim')
-            if not resolved:
-                raise ResolveError(f"Failed to resolve stream at '{uri}'")
-            if 'error' in resolved:
-                raise ResolveError(f"error resolving stream: {resolved['error']}")
-            txo = response[uri]
-
-            claim = Claim.from_bytes(binascii.unhexlify(resolved['protobuf']))
-            outpoint = f"{resolved['txid']}:{resolved['nout']}"
-            resolved_time = self.loop.time() - start_time
-
-            # resume or update an existing stream, if the stream changed: download it and delete the old one after
-            updated_stream, to_replace = await self._check_update_or_replace(outpoint, resolved['claim_id'], claim)
-            if updated_stream:
-                log.info("already have stream for %s", uri)
-                if save_file and updated_stream.output_file_exists:
-                    save_file = False
-                await updated_stream.start(node=self.node, timeout=timeout, save_now=save_file)
-                if not updated_stream.output_file_exists and (save_file or file_name or download_directory):
-                    await updated_stream.save_file(
-                        file_name=file_name, download_directory=download_directory, node=self.node
-                    )
-                return updated_stream
-
-            if not to_replace and txo.has_price and not txo.purchase_receipt:
-                payment = await manager.create_purchase_transaction(
-                    wallet.accounts, txo, exchange_rate_manager
-                )
-
-            stream = ManagedStream(
-                self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_directory,
-                file_name, ManagedStream.STATUS_RUNNING, content_fee=payment,
-                analytics_manager=self.analytics_manager
-            )
-            log.info("starting download for %s", uri)
-
-            before_download = self.loop.time()
-            await stream.start(self.node, timeout)
-            stream.set_claim(resolved, claim)
-            if to_replace:  # delete old stream now that the replacement has started downloading
-                await self.delete_stream(to_replace)
-
-            if payment is not None:
-                await manager.broadcast_or_release(payment)
-                payment = None  # to avoid releasing in `finally` later
-                log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri)
-                await self.storage.save_content_fee(stream.stream_hash, stream.content_fee)
-
-            self.streams[stream.sd_hash] = stream
-            self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream)
-            await self.storage.save_content_claim(stream.stream_hash, outpoint)
-            if save_file:
-                await asyncio.wait_for(stream.save_file(node=self.node), timeout - (self.loop.time() - before_download),
-                                       loop=self.loop)
-            return stream
-        except asyncio.TimeoutError:
-            error = DownloadDataTimeoutError(stream.sd_hash)
-            raise error
-        except Exception as err:  # forgive data timeout, don't delete stream
-            expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
-                        KeyFeeAboveMaxAllowedError)
-            if isinstance(err, expected):
-                log.warning("Failed to download %s: %s", uri, str(err))
-            elif isinstance(err, asyncio.CancelledError):
-                pass
-            else:
-                log.exception("Unexpected error downloading stream:")
-            error = err
-            raise
-        finally:
-            if payment is not None:
-                # payment is set to None after broadcasting, if we're here an exception probably happened
-                await manager.ledger.release_tx(payment)
-            if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or
-                                                                 stream.downloader.time_to_first_bytes))):
-                server = self.wallet_manager.ledger.network.client.server
-                self.loop.create_task(
-                    self.analytics_manager.send_time_to_first_bytes(
-                        resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id,
-                        uri, outpoint,
-                        None if not stream else len(stream.downloader.blob_downloader.active_connections),
-                        None if not stream else len(stream.downloader.blob_downloader.scores),
-                        None if not stream else len(stream.downloader.blob_downloader.connection_failures),
-                        False if not stream else stream.downloader.added_fixed_peers,
-                        self.config.fixed_peer_delay if not stream else stream.downloader.fixed_peers_delay,
-                        None if not stream else stream.sd_hash,
-                        None if not stream else stream.downloader.time_to_descriptor,
-                        None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash,
-                        None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length,
-                        None if not stream else stream.downloader.time_to_first_bytes,
-                        None if not error else error.__class__.__name__,
-                        None if not error else str(error),
-                        None if not server else f"{server[0]}:{server[1]}"
-                    )
-                )
 
     async def stream_partial_content(self, request: Request, sd_hash: str):
-        return await self.streams[sd_hash].stream_file(request, self.node)
+        return await self._sources[sd_hash].stream_file(request, self.node)
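
Taken together, `StreamManager` now keeps its streams in the inherited `_sources` dict (keyed by sd_hash) and widens the class-level `filter_fields`, so the generic `SourceManager.get_filtered` accepts the stream-only fields as well. A closing sketch against a hypothetical, already-started manager instance:

```python
# stream-specific fields pass validation because StreamManager extends filter_fields
incomplete = stream_manager.get_filtered(comparison='g', blobs_remaining=0, sort_by='added_on')

# lookups that used to go through self.streams now resolve via _sources,
# e.g. the range-request endpoint:
#     response = await stream_manager.stream_partial_content(request, sd_hash)
```
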