diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index ea24541b9..46aa3a845 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -41,7 +41,7 @@ test:datanetwork-integration: stage: test script: - pip install tox-travis - - tox -e datanetwork + - tox -e datanetwork --recreate test:blockchain-integration: stage: test @@ -94,6 +94,7 @@ build:linux: - apt-get update - apt-get install -y --no-install-recommends python3.7-dev - python3.7 <(curl -q https://bootstrap.pypa.io/get-pip.py) # make sure we get pip with python3.7 + - pip install lbry-libtorrent build:mac: extends: .build diff --git a/Makefile b/Makefile index 23b3f84e8..a6221fa03 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,7 @@ .PHONY: install tools lint test idea install: + pip install https://s3.amazonaws.com/files.lbry.io/python_libtorrent-1.2.4-py3-none-any.whl CFLAGS="-DSQLITE_MAX_VARIABLE_NUMBER=2500000" pip install -U https://github.com/rogerbinns/apsw/releases/download/3.30.1-r1/apsw-3.30.1-r1.zip \ --global-option=fetch \ --global-option=--version --global-option=3.30.1 --global-option=--all \ diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index 5271c1558..38c4d4650 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -17,11 +17,17 @@ from lbry.dht.blob_announcer import BlobAnnouncer from lbry.blob.blob_manager import BlobManager from lbry.blob_exchange.server import BlobServer from lbry.stream.stream_manager import StreamManager +from lbry.file.file_manager import FileManager from lbry.extras.daemon.component import Component from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager from lbry.extras.daemon.storage import SQLiteStorage +from lbry.torrent.torrent_manager import TorrentManager from lbry.wallet import WalletManager from lbry.wallet.usage_payment import WalletServerPayer +try: + from lbry.torrent.session import TorrentSession +except ImportError: + TorrentSession = None log = logging.getLogger(__name__) @@ -33,10 +39,11 @@ WALLET_COMPONENT = "wallet" WALLET_SERVER_PAYMENTS_COMPONENT = "wallet_server_payments" DHT_COMPONENT = "dht" HASH_ANNOUNCER_COMPONENT = "hash_announcer" -STREAM_MANAGER_COMPONENT = "stream_manager" +FILE_MANAGER_COMPONENT = "file_manager" PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server" UPNP_COMPONENT = "upnp" EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager" +LIBTORRENT_COMPONENT = "libtorrent_component" class DatabaseComponent(Component): @@ -319,23 +326,23 @@ class HashAnnouncerComponent(Component): } -class StreamManagerComponent(Component): - component_name = STREAM_MANAGER_COMPONENT - depends_on = [BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT] +class FileManagerComponent(Component): + component_name = FILE_MANAGER_COMPONENT + depends_on = [BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT, LIBTORRENT_COMPONENT] def __init__(self, component_manager): super().__init__(component_manager) - self.stream_manager: typing.Optional[StreamManager] = None + self.file_manager: typing.Optional[FileManager] = None @property - def component(self) -> typing.Optional[StreamManager]: - return self.stream_manager + def component(self) -> typing.Optional[FileManager]: + return self.file_manager async def get_status(self): - if not self.stream_manager: + if not self.file_manager: return return { - 'managed_files': len(self.stream_manager.streams), + 'managed_files': len(self.file_manager.get_filtered()), } async def start(self): @@ -344,16 +351,52 @@ class StreamManagerComponent(Component): wallet = self.component_manager.get_component(WALLET_COMPONENT) node = self.component_manager.get_component(DHT_COMPONENT) \ if self.component_manager.has_component(DHT_COMPONENT) else None + torrent = self.component_manager.get_component(LIBTORRENT_COMPONENT) if TorrentSession else None log.info('Starting the file manager') loop = asyncio.get_event_loop() - self.stream_manager = StreamManager( - loop, self.conf, blob_manager, wallet, storage, node, self.component_manager.analytics_manager + self.file_manager = FileManager( + loop, self.conf, wallet, storage, self.component_manager.analytics_manager ) - await self.stream_manager.start() + self.file_manager.source_managers['stream'] = StreamManager( + loop, self.conf, blob_manager, wallet, storage, node, + ) + if TorrentSession: + self.file_manager.source_managers['torrent'] = TorrentManager( + loop, self.conf, torrent, storage, self.component_manager.analytics_manager + ) + await self.file_manager.start() log.info('Done setting up file manager') async def stop(self): - self.stream_manager.stop() + self.file_manager.stop() + + +class TorrentComponent(Component): + component_name = LIBTORRENT_COMPONENT + + def __init__(self, component_manager): + super().__init__(component_manager) + self.torrent_session = None + + @property + def component(self) -> typing.Optional[TorrentSession]: + return self.torrent_session + + async def get_status(self): + if not self.torrent_session: + return + return { + 'running': True, # TODO: what to return here? + } + + async def start(self): + if TorrentSession: + self.torrent_session = TorrentSession(asyncio.get_event_loop(), None) + await self.torrent_session.bind() # TODO: specify host/port + + async def stop(self): + if self.torrent_session: + await self.torrent_session.pause() class PeerProtocolServerComponent(Component): diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 5c84f8227..9a8c5c92c 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -40,7 +40,7 @@ from lbry.error import ( from lbry.extras import system_info from lbry.extras.daemon import analytics from lbry.extras.daemon.components import WALLET_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT -from lbry.extras.daemon.components import STREAM_MANAGER_COMPONENT +from lbry.extras.daemon.components import FILE_MANAGER_COMPONENT from lbry.extras.daemon.components import EXCHANGE_RATE_MANAGER_COMPONENT, UPNP_COMPONENT from lbry.extras.daemon.componentmanager import RequiredCondition from lbry.extras.daemon.componentmanager import ComponentManager @@ -57,8 +57,8 @@ if typing.TYPE_CHECKING: from lbry.extras.daemon.components import UPnPComponent from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager from lbry.extras.daemon.storage import SQLiteStorage - from lbry.stream.stream_manager import StreamManager from lbry.wallet import WalletManager, Ledger + from lbry.file.file_manager import FileManager log = logging.getLogger(__name__) @@ -372,8 +372,8 @@ class Daemon(metaclass=JSONRPCServerType): return self.component_manager.get_component(DATABASE_COMPONENT) @property - def stream_manager(self) -> typing.Optional['StreamManager']: - return self.component_manager.get_component(STREAM_MANAGER_COMPONENT) + def file_manager(self) -> typing.Optional['FileManager']: + return self.component_manager.get_component(FILE_MANAGER_COMPONENT) @property def exchange_rate_manager(self) -> typing.Optional['ExchangeRateManager']: @@ -609,8 +609,8 @@ class Daemon(metaclass=JSONRPCServerType): else: name, claim_id = name_and_claim_id.split("/") uri = f"lbry://{name}#{claim_id}" - if not self.stream_manager.started.is_set(): - await self.stream_manager.started.wait() + if not self.file_manager.started.is_set(): + await self.file_manager.started.wait() stream = await self.jsonrpc_get(uri) if isinstance(stream, dict): raise web.HTTPServerError(text=stream['error']) @@ -634,11 +634,11 @@ class Daemon(metaclass=JSONRPCServerType): async def _handle_stream_range_request(self, request: web.Request): sd_hash = request.path.split("/stream/")[1] - if not self.stream_manager.started.is_set(): - await self.stream_manager.started.wait() - if sd_hash not in self.stream_manager.streams: + if not self.file_manager.started.is_set(): + await self.file_manager.started.wait() + if sd_hash not in self.file_manager.streams: return web.HTTPNotFound() - return await self.stream_manager.stream_partial_content(request, sd_hash) + return await self.file_manager.stream_partial_content(request, sd_hash) async def _process_rpc_call(self, data): args = data.get('params', {}) @@ -858,7 +858,8 @@ class Daemon(metaclass=JSONRPCServerType): 'exchange_rate_manager': (bool), 'hash_announcer': (bool), 'peer_protocol_server': (bool), - 'stream_manager': (bool), + 'file_manager': (bool), + 'libtorrent_component': (bool), 'upnp': (bool), 'wallet': (bool), }, @@ -885,6 +886,9 @@ class Daemon(metaclass=JSONRPCServerType): } ], }, + 'libtorrent_component': { + 'running': (bool) libtorrent was detected and started successfully, + }, 'dht': { 'node_id': (str) lbry dht node id - hex encoded, 'peers_in_routing_table': (int) the number of peers in the routing table, @@ -906,7 +910,7 @@ class Daemon(metaclass=JSONRPCServerType): 'hash_announcer': { 'announce_queue_size': (int) number of blobs currently queued to be announced }, - 'stream_manager': { + 'file_manager': { 'managed_files': (int) count of files in the stream manager, }, 'upnp': { @@ -1077,7 +1081,7 @@ class Daemon(metaclass=JSONRPCServerType): return results @requires(WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT, - STREAM_MANAGER_COMPONENT) + FILE_MANAGER_COMPONENT) async def jsonrpc_get( self, uri, file_name=None, download_directory=None, timeout=None, save_file=None, wallet_id=None): """ @@ -1103,7 +1107,7 @@ class Daemon(metaclass=JSONRPCServerType): if download_directory and not os.path.isdir(download_directory): return {"error": f"specified download directory \"{download_directory}\" does not exist"} try: - stream = await self.stream_manager.download_stream_from_uri( + stream = await self.file_manager.download_from_uri( uri, self.exchange_rate_manager, timeout, file_name, download_directory, save_file=save_file, wallet=wallet ) @@ -1949,7 +1953,7 @@ class Daemon(metaclass=JSONRPCServerType): File management. """ - @requires(STREAM_MANAGER_COMPONENT) + @requires(FILE_MANAGER_COMPONENT) async def jsonrpc_file_list(self, sort=None, reverse=False, comparison=None, wallet_id=None, page=None, page_size=None, **kwargs): """ @@ -1994,7 +1998,7 @@ class Daemon(metaclass=JSONRPCServerType): comparison = comparison or 'eq' paginated = paginate_list( - self.stream_manager.get_filtered_streams(sort, reverse, comparison, **kwargs), page, page_size + self.file_manager.get_filtered(sort, reverse, comparison, **kwargs), page, page_size ) if paginated['items']: receipts = { @@ -2008,7 +2012,7 @@ class Daemon(metaclass=JSONRPCServerType): stream.purchase_receipt = receipts.get(stream.claim_id) return paginated - @requires(STREAM_MANAGER_COMPONENT) + @requires(FILE_MANAGER_COMPONENT) async def jsonrpc_file_set_status(self, status, **kwargs): """ Start or stop downloading a file @@ -2032,12 +2036,12 @@ class Daemon(metaclass=JSONRPCServerType): if status not in ['start', 'stop']: raise Exception('Status must be "start" or "stop".') - streams = self.stream_manager.get_filtered_streams(**kwargs) + streams = self.file_manager.get_filtered(**kwargs) if not streams: raise Exception(f'Unable to find a file for {kwargs}') stream = streams[0] if status == 'start' and not stream.running: - await stream.save_file(node=self.stream_manager.node) + await stream.save_file() msg = "Resumed download" elif status == 'stop' and stream.running: await stream.stop() @@ -2049,7 +2053,7 @@ class Daemon(metaclass=JSONRPCServerType): ) return msg - @requires(STREAM_MANAGER_COMPONENT) + @requires(FILE_MANAGER_COMPONENT) async def jsonrpc_file_delete(self, delete_from_download_dir=False, delete_all=False, **kwargs): """ Delete a LBRY file @@ -2081,7 +2085,7 @@ class Daemon(metaclass=JSONRPCServerType): (bool) true if deletion was successful """ - streams = self.stream_manager.get_filtered_streams(**kwargs) + streams = self.file_manager.get_filtered(**kwargs) if len(streams) > 1: if not delete_all: @@ -2098,12 +2102,12 @@ class Daemon(metaclass=JSONRPCServerType): else: for stream in streams: message = f"Deleted file {stream.file_name}" - await self.stream_manager.delete_stream(stream, delete_file=delete_from_download_dir) + await self.file_manager.delete(stream, delete_file=delete_from_download_dir) log.info(message) result = True return result - @requires(STREAM_MANAGER_COMPONENT) + @requires(FILE_MANAGER_COMPONENT) async def jsonrpc_file_save(self, file_name=None, download_directory=None, **kwargs): """ Start saving a file to disk. @@ -2130,7 +2134,7 @@ class Daemon(metaclass=JSONRPCServerType): Returns: {File} """ - streams = self.stream_manager.get_filtered_streams(**kwargs) + streams = self.file_manager.get_filtered(**kwargs) if len(streams) > 1: log.warning("There are %i matching files, use narrower filters to select one", len(streams)) @@ -2905,7 +2909,7 @@ class Daemon(metaclass=JSONRPCServerType): Create, update, abandon, list and inspect your stream claims. """ - @requires(WALLET_COMPONENT, STREAM_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT) + @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT) async def jsonrpc_publish(self, name, **kwargs): """ Create or replace a stream claim at a given name (use 'stream create/update' for more control). @@ -3027,7 +3031,7 @@ class Daemon(metaclass=JSONRPCServerType): f"to update a specific stream claim." ) - @requires(WALLET_COMPONENT, STREAM_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT) + @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT) async def jsonrpc_stream_repost(self, name, bid, claim_id, allow_duplicate_name=False, channel_id=None, channel_name=None, channel_account_id=None, account_id=None, wallet_id=None, claim_address=None, funding_account_ids=None, preview=False, blocking=False): @@ -3099,7 +3103,7 @@ class Daemon(metaclass=JSONRPCServerType): return tx - @requires(WALLET_COMPONENT, STREAM_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT) + @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT) async def jsonrpc_stream_create( self, name, bid, file_path, allow_duplicate_name=False, channel_id=None, channel_name=None, channel_account_id=None, @@ -3237,7 +3241,7 @@ class Daemon(metaclass=JSONRPCServerType): file_stream = None if not preview: - file_stream = await self.stream_manager.create_stream(file_path) + file_stream = await self.file_manager.create_stream(file_path) claim.stream.source.sd_hash = file_stream.sd_hash new_txo.script.generate() @@ -3257,7 +3261,7 @@ class Daemon(metaclass=JSONRPCServerType): return tx - @requires(WALLET_COMPONENT, STREAM_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT) + @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT) async def jsonrpc_stream_update( self, claim_id, bid=None, file_path=None, channel_id=None, channel_name=None, channel_account_id=None, clear_channel=False, @@ -3447,11 +3451,12 @@ class Daemon(metaclass=JSONRPCServerType): stream_hash = None if not preview: - old_stream = self.stream_manager.streams.get(old_txo.claim.stream.source.sd_hash, None) + old_stream = self.file_manager.get_filtered(sd_hash=old_txo.claim.stream.source.sd_hash) + old_stream = old_stream[0] if old_stream else None if file_path is not None: if old_stream: - await self.stream_manager.delete_stream(old_stream, delete_file=False) - file_stream = await self.stream_manager.create_stream(file_path) + await self.file_manager.delete(old_stream, delete_file=False) + file_stream = await self.file_manager.create_stream(file_path) new_txo.claim.stream.source.sd_hash = file_stream.sd_hash new_txo.script.generate() stream_hash = file_stream.stream_hash @@ -4583,9 +4588,9 @@ class Daemon(metaclass=JSONRPCServerType): """ if not blob_hash or not is_valid_blobhash(blob_hash): return f"Invalid blob hash to delete '{blob_hash}'" - streams = self.stream_manager.get_filtered_streams(sd_hash=blob_hash) + streams = self.file_manager.get_filtered(sd_hash=blob_hash) if streams: - await self.stream_manager.delete_stream(streams[0]) + await self.file_manager.delete(streams[0]) else: await self.blob_manager.delete_blobs([blob_hash]) return "Deleted %s" % blob_hash @@ -4758,7 +4763,7 @@ class Daemon(metaclass=JSONRPCServerType): raise NotImplementedError() - @requires(STREAM_MANAGER_COMPONENT) + @requires(FILE_MANAGER_COMPONENT) async def jsonrpc_file_reflect(self, **kwargs): """ Reflect all the blobs in a file matching the filter criteria @@ -4787,8 +4792,8 @@ class Daemon(metaclass=JSONRPCServerType): else: server, port = random.choice(self.conf.reflector_servers) reflected = await asyncio.gather(*[ - self.stream_manager.reflect_stream(stream, server, port) - for stream in self.stream_manager.get_filtered_streams(**kwargs) + self.file_manager['stream'].reflect_stream(stream, server, port) + for stream in self.file_manager.get_filtered_streams(**kwargs) ]) total = [] for reflected_for_stream in reflected: @@ -5334,10 +5339,10 @@ class Daemon(metaclass=JSONRPCServerType): results = await self.ledger.resolve(accounts, urls, **kwargs) if self.conf.save_resolved_claims and results: try: - claims = self.stream_manager._convert_to_old_resolve_output(self.wallet_manager, results) - await self.storage.save_claims_for_resolve([ - value for value in claims.values() if 'error' not in value - ]) + await self.storage.save_claim_from_output( + self.ledger, + *(result for result in results.values() if isinstance(result, Output)) + ) except DecodeError: pass return results diff --git a/lbry/extras/daemon/json_response_encoder.py b/lbry/extras/daemon/json_response_encoder.py index b7702f541..99d487cd2 100644 --- a/lbry/extras/daemon/json_response_encoder.py +++ b/lbry/extras/daemon/json_response_encoder.py @@ -7,6 +7,7 @@ from json import JSONEncoder from google.protobuf.message import DecodeError from lbry.schema.claim import Claim +from lbry.torrent.torrent_manager import TorrentSource from lbry.wallet import Wallet, Ledger, Account, Transaction, Output from lbry.wallet.bip32 import PubKey from lbry.wallet.dewies import dewies_to_lbc @@ -126,7 +127,7 @@ class JSONResponseEncoder(JSONEncoder): return self.encode_account(obj) if isinstance(obj, Wallet): return self.encode_wallet(obj) - if isinstance(obj, ManagedStream): + if isinstance(obj, (ManagedStream, TorrentSource)): return self.encode_file(obj) if isinstance(obj, Transaction): return self.encode_transaction(obj) @@ -273,26 +274,32 @@ class JSONResponseEncoder(JSONEncoder): output_exists = managed_stream.output_file_exists tx_height = managed_stream.stream_claim_info.height best_height = self.ledger.headers.height - return { - 'streaming_url': managed_stream.stream_url, + is_stream = hasattr(managed_stream, 'stream_hash') + if is_stream: + total_bytes_lower_bound = managed_stream.descriptor.lower_bound_decrypted_length() + total_bytes = managed_stream.descriptor.upper_bound_decrypted_length() + else: + total_bytes_lower_bound = total_bytes = managed_stream.torrent_length + result = { + 'streaming_url': None, 'completed': managed_stream.completed, - 'file_name': managed_stream.file_name if output_exists else None, - 'download_directory': managed_stream.download_directory if output_exists else None, - 'download_path': managed_stream.full_path if output_exists else None, + 'file_name': None, + 'download_directory': None, + 'download_path': None, 'points_paid': 0.0, 'stopped': not managed_stream.running, - 'stream_hash': managed_stream.stream_hash, - 'stream_name': managed_stream.descriptor.stream_name, - 'suggested_file_name': managed_stream.descriptor.suggested_file_name, - 'sd_hash': managed_stream.descriptor.sd_hash, - 'mime_type': managed_stream.mime_type, - 'key': managed_stream.descriptor.key, - 'total_bytes_lower_bound': managed_stream.descriptor.lower_bound_decrypted_length(), - 'total_bytes': managed_stream.descriptor.upper_bound_decrypted_length(), + 'stream_hash': None, + 'stream_name': None, + 'suggested_file_name': None, + 'sd_hash': None, + 'mime_type': None, + 'key': None, + 'total_bytes_lower_bound': total_bytes_lower_bound, + 'total_bytes': total_bytes, 'written_bytes': managed_stream.written_bytes, - 'blobs_completed': managed_stream.blobs_completed, - 'blobs_in_stream': managed_stream.blobs_in_stream, - 'blobs_remaining': managed_stream.blobs_remaining, + 'blobs_completed': None, + 'blobs_in_stream': None, + 'blobs_remaining': None, 'status': managed_stream.status, 'claim_id': managed_stream.claim_id, 'txid': managed_stream.txid, @@ -309,10 +316,37 @@ class JSONResponseEncoder(JSONEncoder): 'height': tx_height, 'confirmations': (best_height + 1) - tx_height if tx_height > 0 else tx_height, 'timestamp': self.ledger.headers.estimated_timestamp(tx_height), - 'is_fully_reflected': managed_stream.is_fully_reflected, - 'reflector_progress': managed_stream.reflector_progress, - 'uploading_to_reflector': managed_stream.uploading_to_reflector + 'is_fully_reflected': False, + 'reflector_progress': False, + 'uploading_to_reflector': False } + if is_stream: + result.update({ + 'streaming_url': managed_stream.stream_url, + 'stream_hash': managed_stream.stream_hash, + 'stream_name': managed_stream.descriptor.stream_name, + 'suggested_file_name': managed_stream.descriptor.suggested_file_name, + 'sd_hash': managed_stream.descriptor.sd_hash, + 'mime_type': managed_stream.mime_type, + 'key': managed_stream.descriptor.key, + 'blobs_completed': managed_stream.blobs_completed, + 'blobs_in_stream': managed_stream.blobs_in_stream, + 'blobs_remaining': managed_stream.blobs_remaining, + 'is_fully_reflected': managed_stream.is_fully_reflected, + 'reflector_progress': managed_stream.reflector_progress, + 'uploading_to_reflector': managed_stream.uploading_to_reflector + }) + else: + result.update({ + 'streaming_url': f'file://{managed_stream.full_path}', + }) + if output_exists: + result.update({ + 'file_name': managed_stream.file_name, + 'download_directory': managed_stream.download_directory, + 'download_path': managed_stream.full_path, + }) + return result def encode_claim(self, claim): encoded = getattr(claim, claim.claim_type).to_dict() diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 11a61e45e..1387f94a7 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -9,7 +9,7 @@ from typing import Optional from lbry.wallet import SQLiteMixin from lbry.conf import Config from lbry.wallet.dewies import dewies_to_lbc, lbc_to_dewies -from lbry.wallet.transaction import Transaction +from lbry.wallet.transaction import Transaction, Output from lbry.schema.claim import Claim from lbry.dht.constants import DATA_EXPIRATION from lbry.blob.blob_info import BlobInfo @@ -727,6 +727,19 @@ class SQLiteStorage(SQLiteMixin): if claim_id_to_supports: await self.save_supports(claim_id_to_supports) + def save_claim_from_output(self, ledger, *outputs: Output): + return self.save_claims([{ + "claim_id": output.claim_id, + "name": output.claim_name, + "amount": dewies_to_lbc(output.amount), + "address": output.get_address(ledger), + "txid": output.tx_ref.id, + "nout": output.position, + "value": output.claim, + "height": output.tx_ref.height, + "claim_sequence": -1, + } for output in outputs]) + def save_claims_for_resolve(self, claim_infos): to_save = {} for info in claim_infos: @@ -740,7 +753,8 @@ class SQLiteStorage(SQLiteMixin): return self.save_claims(to_save.values()) @staticmethod - def _save_content_claim(transaction, claim_outpoint, stream_hash): + def _save_content_claim(transaction, claim_outpoint, stream_hash=None, bt_infohash=None): + assert stream_hash or bt_infohash # get the claim id and serialized metadata claim_info = transaction.execute( "select claim_id, serialized_metadata from claim where claim_outpoint=?", (claim_outpoint,) @@ -788,6 +802,19 @@ class SQLiteStorage(SQLiteMixin): if stream_hash in self.content_claim_callbacks: await self.content_claim_callbacks[stream_hash]() + async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name): + def _save_torrent(transaction): + transaction.execute( + "insert or replace into torrent values (?, NULL, ?, ?)", (bt_infohash, length, name) + ).fetchall() + transaction.execute( + "insert or replace into content_claim values (NULL, ?, ?)", (bt_infohash, claim_outpoint) + ).fetchall() + await self.db.run(_save_torrent) + # update corresponding ManagedEncryptedFileDownloader object + if bt_infohash in self.content_claim_callbacks: + await self.content_claim_callbacks[bt_infohash]() + async def get_content_claim(self, stream_hash: str, include_supports: typing.Optional[bool] = True) -> typing.Dict: claims = await self.db.run(get_claims_from_stream_hashes, [stream_hash]) claim = None @@ -799,6 +826,10 @@ class SQLiteStorage(SQLiteMixin): claim['effective_amount'] = calculate_effective_amount(claim['amount'], supports) return claim + async def get_content_claim_for_torrent(self, bt_infohash): + claims = await self.db.run(get_claims_from_torrent_info_hashes, [bt_infohash]) + return claims[bt_infohash].as_dict() if claims else None + # # # # # # # # # reflector functions # # # # # # # # # def update_reflected_stream(self, sd_hash, reflector_address, success=True): diff --git a/lbry/file/__init__.py b/lbry/file/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lbry/file/file_manager.py b/lbry/file/file_manager.py new file mode 100644 index 000000000..0c1eb7069 --- /dev/null +++ b/lbry/file/file_manager.py @@ -0,0 +1,286 @@ +import asyncio +import logging +import typing +from typing import Optional +from aiohttp.web import Request +from lbry.error import ResolveError, DownloadSDTimeoutError, InsufficientFundsError +from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError +from lbry.stream.managed_stream import ManagedStream +from lbry.torrent.torrent_manager import TorrentSource +from lbry.utils import cache_concurrent +from lbry.schema.url import URL +from lbry.wallet.dewies import dewies_to_lbc +from lbry.file.source_manager import SourceManager +from lbry.file.source import ManagedDownloadSource +if typing.TYPE_CHECKING: + from lbry.conf import Config + from lbry.extras.daemon.analytics import AnalyticsManager + from lbry.extras.daemon.storage import SQLiteStorage + from lbry.wallet import WalletManager, Output + from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager + +log = logging.getLogger(__name__) + + +class FileManager: + def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', wallet_manager: 'WalletManager', + storage: 'SQLiteStorage', analytics_manager: Optional['AnalyticsManager'] = None): + self.loop = loop + self.config = config + self.wallet_manager = wallet_manager + self.storage = storage + self.analytics_manager = analytics_manager + self.source_managers: typing.Dict[str, SourceManager] = {} + self.started = asyncio.Event() + + @property + def streams(self): + return self.source_managers['stream']._sources + + async def create_stream(self, file_path: str, key: Optional[bytes] = None, **kwargs) -> ManagedDownloadSource: + if 'stream' in self.source_managers: + return await self.source_managers['stream'].create(file_path, key, **kwargs) + raise NotImplementedError + + async def start(self): + await asyncio.gather(*(source_manager.start() for source_manager in self.source_managers.values())) + for manager in self.source_managers.values(): + await manager.started.wait() + self.started.set() + + def stop(self): + for manager in self.source_managers.values(): + # fixme: pop or not? + manager.stop() + self.started.clear() + + @cache_concurrent + async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager', + timeout: Optional[float] = None, file_name: Optional[str] = None, + download_directory: Optional[str] = None, + save_file: Optional[bool] = None, resolve_timeout: float = 3.0, + wallet: Optional['Wallet'] = None) -> ManagedDownloadSource: + + wallet = wallet or self.wallet_manager.default_wallet + timeout = timeout or self.config.download_timeout + start_time = self.loop.time() + resolved_time = None + stream = None + claim = None + error = None + outpoint = None + if save_file is None: + save_file = self.config.save_files + if file_name and not save_file: + save_file = True + if save_file: + download_directory = download_directory or self.config.download_dir + else: + download_directory = None + + payment = None + try: + # resolve the claim + if not URL.parse(uri).has_stream: + raise ResolveError("cannot download a channel claim, specify a /path") + try: + resolved_result = await asyncio.wait_for( + self.wallet_manager.ledger.resolve(wallet.accounts, [uri], include_purchase_receipt=True), + resolve_timeout + ) + except asyncio.TimeoutError: + raise ResolveTimeoutError(uri) + except Exception as err: + if isinstance(err, asyncio.CancelledError): + raise + log.exception("Unexpected error resolving stream:") + raise ResolveError(f"Unexpected error resolving stream: {str(err)}") + if 'error' in resolved_result: + raise ResolveError(f"Unexpected error resolving uri for download: {resolved_result['error']}") + if not resolved_result or uri not in resolved_result: + raise ResolveError(f"Failed to resolve stream at '{uri}'") + txo = resolved_result[uri] + if isinstance(txo, dict): + raise ResolveError(f"Failed to resolve stream at '{uri}': {txo}") + claim = txo.claim + outpoint = f"{txo.tx_ref.id}:{txo.position}" + resolved_time = self.loop.time() - start_time + await self.storage.save_claim_from_output(self.wallet_manager.ledger, txo) + + #################### + # update or replace + #################### + + if claim.stream.source.bt_infohash: + source_manager = self.source_managers['torrent'] + existing = source_manager.get_filtered(bt_infohash=claim.stream.source.bt_infohash) + else: + source_manager = self.source_managers['stream'] + existing = source_manager.get_filtered(sd_hash=claim.stream.source.sd_hash) + + # resume or update an existing stream, if the stream changed: download it and delete the old one after + to_replace, updated_stream = None, None + if existing and existing[0].claim_id != txo.claim_id: + raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {txo.claim_id}") + if existing: + log.info("claim contains a metadata only update to a stream we have") + if claim.stream.source.bt_infohash: + await self.storage.save_torrent_content_claim( + existing[0].identifier, outpoint, existing[0].torrent_length, existing[0].torrent_name + ) + claim_info = await self.storage.get_content_claim_for_torrent(existing[0].identifier) + existing[0].set_claim(claim_info, claim) + else: + await self.storage.save_content_claim( + existing[0].stream_hash, outpoint + ) + await source_manager._update_content_claim(existing[0]) + updated_stream = existing[0] + else: + existing_for_claim_id = self.get_filtered(claim_id=txo.claim_id) + if existing_for_claim_id: + log.info("claim contains an update to a stream we have, downloading it") + if save_file and existing_for_claim_id[0].output_file_exists: + save_file = False + await existing_for_claim_id[0].start(timeout=timeout, save_now=save_file) + if not existing_for_claim_id[0].output_file_exists and ( + save_file or file_name or download_directory): + await existing_for_claim_id[0].save_file( + file_name=file_name, download_directory=download_directory + ) + to_replace = existing_for_claim_id[0] + + # resume or update an existing stream, if the stream changed: download it and delete the old one after + if updated_stream: + log.info("already have stream for %s", uri) + if save_file and updated_stream.output_file_exists: + save_file = False + await updated_stream.start(timeout=timeout, save_now=save_file) + if not updated_stream.output_file_exists and (save_file or file_name or download_directory): + await updated_stream.save_file( + file_name=file_name, download_directory=download_directory + ) + return updated_stream + + #################### + # pay fee + #################### + + if not to_replace and txo.has_price and not txo.purchase_receipt: + payment = await self.wallet_manager.create_purchase_transaction( + wallet.accounts, txo, exchange_rate_manager + ) + + #################### + # make downloader and wait for start + #################### + + if not claim.stream.source.bt_infohash: + # fixme: this shouldnt be here + stream = ManagedStream( + self.loop, self.config, source_manager.blob_manager, claim.stream.source.sd_hash, + download_directory, file_name, ManagedStream.STATUS_RUNNING, content_fee=payment, + analytics_manager=self.analytics_manager + ) + stream.downloader.node = source_manager.node + else: + stream = TorrentSource( + self.loop, self.config, self.storage, identifier=claim.stream.source.bt_infohash, + file_name=file_name, download_directory=download_directory or self.config.download_dir, + status=ManagedStream.STATUS_RUNNING, + analytics_manager=self.analytics_manager, + torrent_session=source_manager.torrent_session + ) + log.info("starting download for %s", uri) + + before_download = self.loop.time() + await stream.start(timeout, save_file) + + #################### + # success case: delete to_replace if applicable, broadcast fee payment + #################### + + if to_replace: # delete old stream now that the replacement has started downloading + await source_manager.delete(to_replace) + + if payment is not None: + await self.wallet_manager.broadcast_or_release(payment) + payment = None # to avoid releasing in `finally` later + log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri) + await self.storage.save_content_fee(stream.stream_hash, stream.content_fee) + + source_manager.add(stream) + + if not claim.stream.source.bt_infohash: + await self.storage.save_content_claim(stream.stream_hash, outpoint) + else: + await self.storage.save_torrent_content_claim( + stream.identifier, outpoint, stream.torrent_length, stream.torrent_name + ) + claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier) + stream.set_claim(claim_info, claim) + if save_file: + await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download), + loop=self.loop) + return stream + except asyncio.TimeoutError: + error = DownloadDataTimeoutError(stream.sd_hash) + raise error + except Exception as err: # forgive data timeout, don't delete stream + expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError, + KeyFeeAboveMaxAllowedError) + if isinstance(err, expected): + log.warning("Failed to download %s: %s", uri, str(err)) + elif isinstance(err, asyncio.CancelledError): + pass + else: + log.exception("Unexpected error downloading stream:") + error = err + raise + finally: + if payment is not None: + # payment is set to None after broadcasting, if we're here an exception probably happened + await self.wallet_manager.ledger.release_tx(payment) + if self.analytics_manager and claim and claim.stream.source.bt_infohash: + # TODO: analytics for torrents + pass + elif self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or + stream.downloader.time_to_first_bytes))): + server = self.wallet_manager.ledger.network.client.server + self.loop.create_task( + self.analytics_manager.send_time_to_first_bytes( + resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id, + uri, outpoint, + None if not stream else len(stream.downloader.blob_downloader.active_connections), + None if not stream else len(stream.downloader.blob_downloader.scores), + None if not stream else len(stream.downloader.blob_downloader.connection_failures), + False if not stream else stream.downloader.added_fixed_peers, + self.config.fixed_peer_delay if not stream else stream.downloader.fixed_peers_delay, + None if not stream else stream.sd_hash, + None if not stream else stream.downloader.time_to_descriptor, + None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash, + None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length, + None if not stream else stream.downloader.time_to_first_bytes, + None if not error else error.__class__.__name__, + None if not error else str(error), + None if not server else f"{server[0]}:{server[1]}" + ) + ) + + async def stream_partial_content(self, request: Request, sd_hash: str): + return await self.source_managers['stream'].stream_partial_content(request, sd_hash) + + def get_filtered(self, *args, **kwargs) -> typing.List[ManagedDownloadSource]: + """ + Get a list of filtered and sorted ManagedStream objects + + :param sort_by: field to sort by + :param reverse: reverse sorting + :param comparison: comparison operator used for filtering + :param search_by: fields and values to filter by + """ + return sum((manager.get_filtered(*args, **kwargs) for manager in self.source_managers.values()), []) + + async def delete(self, source: ManagedDownloadSource, delete_file=False): + for manager in self.source_managers.values(): + await manager.delete(source, delete_file) diff --git a/lbry/file/source.py b/lbry/file/source.py new file mode 100644 index 000000000..b661eb594 --- /dev/null +++ b/lbry/file/source.py @@ -0,0 +1,161 @@ +import os +import asyncio +import typing +import logging +import binascii +from typing import Optional +from lbry.utils import generate_id +from lbry.extras.daemon.storage import StoredContentClaim + +if typing.TYPE_CHECKING: + from lbry.conf import Config + from lbry.extras.daemon.analytics import AnalyticsManager + from lbry.wallet.transaction import Transaction + from lbry.extras.daemon.storage import SQLiteStorage + +log = logging.getLogger(__name__) + + +class ManagedDownloadSource: + STATUS_RUNNING = "running" + STATUS_STOPPED = "stopped" + STATUS_FINISHED = "finished" + + SAVING_ID = 1 + STREAMING_ID = 2 + + def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage', identifier: str, + file_name: Optional[str] = None, download_directory: Optional[str] = None, + status: Optional[str] = STATUS_STOPPED, claim: Optional[StoredContentClaim] = None, + download_id: Optional[str] = None, rowid: Optional[int] = None, + content_fee: Optional['Transaction'] = None, + analytics_manager: Optional['AnalyticsManager'] = None, + added_on: Optional[int] = None): + self.loop = loop + self.storage = storage + self.config = config + self.identifier = identifier + self.download_directory = download_directory + self._file_name = file_name + self._status = status + self.stream_claim_info = claim + self.download_id = download_id or binascii.hexlify(generate_id()).decode() + self.rowid = rowid + self.content_fee = content_fee + self.purchase_receipt = None + self._added_on = added_on + self.analytics_manager = analytics_manager + + self.saving = asyncio.Event(loop=self.loop) + self.finished_writing = asyncio.Event(loop=self.loop) + self.started_writing = asyncio.Event(loop=self.loop) + self.finished_write_attempt = asyncio.Event(loop=self.loop) + + # @classmethod + # async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config', file_path: str, + # key: Optional[bytes] = None, + # iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedDownloadSource': + # raise NotImplementedError() + + async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False): + raise NotImplementedError() + + async def stop(self, finished: bool = False): + raise NotImplementedError() + + async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None): + raise NotImplementedError() + + def stop_tasks(self): + raise NotImplementedError() + + def set_claim(self, claim_info: typing.Dict, claim: 'Claim'): + self.stream_claim_info = StoredContentClaim( + f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'], + claim_info['name'], claim_info['amount'], claim_info['height'], + binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'], + claim_info['claim_sequence'], claim_info.get('channel_name') + ) + + # async def update_content_claim(self, claim_info: Optional[typing.Dict] = None): + # if not claim_info: + # claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash) + # self.set_claim(claim_info, claim_info['value']) + + @property + def file_name(self) -> Optional[str]: + return self._file_name + + @property + def added_on(self) -> Optional[int]: + return self._added_on + + @property + def status(self) -> str: + return self._status + + @property + def completed(self): + raise NotImplementedError() + + # @property + # def stream_url(self): + # return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash} + + @property + def finished(self) -> bool: + return self.status == self.STATUS_FINISHED + + @property + def running(self) -> bool: + return self.status == self.STATUS_RUNNING + + @property + def claim_id(self) -> Optional[str]: + return None if not self.stream_claim_info else self.stream_claim_info.claim_id + + @property + def txid(self) -> Optional[str]: + return None if not self.stream_claim_info else self.stream_claim_info.txid + + @property + def nout(self) -> Optional[int]: + return None if not self.stream_claim_info else self.stream_claim_info.nout + + @property + def outpoint(self) -> Optional[str]: + return None if not self.stream_claim_info else self.stream_claim_info.outpoint + + @property + def claim_height(self) -> Optional[int]: + return None if not self.stream_claim_info else self.stream_claim_info.height + + @property + def channel_claim_id(self) -> Optional[str]: + return None if not self.stream_claim_info else self.stream_claim_info.channel_claim_id + + @property + def channel_name(self) -> Optional[str]: + return None if not self.stream_claim_info else self.stream_claim_info.channel_name + + @property + def claim_name(self) -> Optional[str]: + return None if not self.stream_claim_info else self.stream_claim_info.claim_name + + @property + def metadata(self) -> Optional[typing.Dict]: + return None if not self.stream_claim_info else self.stream_claim_info.claim.stream.to_dict() + + @property + def metadata_protobuf(self) -> bytes: + if self.stream_claim_info: + return binascii.hexlify(self.stream_claim_info.claim.to_bytes()) + + @property + def full_path(self) -> Optional[str]: + return os.path.join(self.download_directory, os.path.basename(self.file_name)) \ + if self.file_name and self.download_directory else None + + @property + def output_file_exists(self): + return os.path.isfile(self.full_path) if self.full_path else False diff --git a/lbry/file/source_manager.py b/lbry/file/source_manager.py new file mode 100644 index 000000000..87f0a17f1 --- /dev/null +++ b/lbry/file/source_manager.py @@ -0,0 +1,134 @@ +import os +import asyncio +import logging +import typing +from typing import Optional +from lbry.file.source import ManagedDownloadSource +if typing.TYPE_CHECKING: + from lbry.conf import Config + from lbry.extras.daemon.analytics import AnalyticsManager + from lbry.extras.daemon.storage import SQLiteStorage + +log = logging.getLogger(__name__) + +COMPARISON_OPERATORS = { + 'eq': lambda a, b: a == b, + 'ne': lambda a, b: a != b, + 'g': lambda a, b: a > b, + 'l': lambda a, b: a < b, + 'ge': lambda a, b: a >= b, + 'le': lambda a, b: a <= b, +} + + +class SourceManager: + filter_fields = { + 'rowid', + 'status', + 'file_name', + 'added_on', + 'claim_name', + 'claim_height', + 'claim_id', + 'outpoint', + 'txid', + 'nout', + 'channel_claim_id', + 'channel_name' + } + + source_class = ManagedDownloadSource + + def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage', + analytics_manager: Optional['AnalyticsManager'] = None): + self.loop = loop + self.config = config + self.storage = storage + self.analytics_manager = analytics_manager + self._sources: typing.Dict[str, ManagedDownloadSource] = {} + self.started = asyncio.Event(loop=self.loop) + + def add(self, source: ManagedDownloadSource): + self._sources[source.identifier] = source + + def remove(self, source: ManagedDownloadSource): + if source.identifier not in self._sources: + return + self._sources.pop(source.identifier) + source.stop_tasks() + + async def initialize_from_database(self): + raise NotImplementedError() + + async def start(self): + await self.initialize_from_database() + self.started.set() + + def stop(self): + while self._sources: + _, source = self._sources.popitem() + source.stop_tasks() + self.started.clear() + + async def create(self, file_path: str, key: Optional[bytes] = None, + iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedDownloadSource: + raise NotImplementedError() + + async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False): + self.remove(source) + if delete_file and source.output_file_exists: + os.remove(source.full_path) + + def get_filtered(self, sort_by: Optional[str] = None, reverse: Optional[bool] = False, + comparison: Optional[str] = None, **search_by) -> typing.List[ManagedDownloadSource]: + """ + Get a list of filtered and sorted ManagedStream objects + + :param sort_by: field to sort by + :param reverse: reverse sorting + :param comparison: comparison operator used for filtering + :param search_by: fields and values to filter by + """ + if sort_by and sort_by not in self.filter_fields: + raise ValueError(f"'{sort_by}' is not a valid field to sort by") + if comparison and comparison not in COMPARISON_OPERATORS: + raise ValueError(f"'{comparison}' is not a valid comparison") + if 'full_status' in search_by: + del search_by['full_status'] + + for search in search_by: + if search not in self.filter_fields: + raise ValueError(f"'{search}' is not a valid search operation") + + compare_sets = {} + if isinstance(search_by.get('claim_id'), list): + compare_sets['claim_ids'] = search_by.pop('claim_id') + if isinstance(search_by.get('outpoint'), list): + compare_sets['outpoints'] = search_by.pop('outpoint') + if isinstance(search_by.get('channel_claim_id'), list): + compare_sets['channel_claim_ids'] = search_by.pop('channel_claim_id') + + if search_by: + comparison = comparison or 'eq' + streams = [] + for stream in self._sources.values(): + matched = False + for set_search, val in compare_sets.items(): + if COMPARISON_OPERATORS[comparison](getattr(stream, self.filter_fields[set_search]), val): + streams.append(stream) + matched = True + break + if matched: + continue + for search, val in search_by.items(): + this_stream = getattr(stream, search) + if COMPARISON_OPERATORS[comparison](this_stream, val): + streams.append(stream) + break + else: + streams = list(self._sources.values()) + if sort_by: + streams.sort(key=lambda s: getattr(s, sort_by)) + if reverse: + streams.reverse() + return streams diff --git a/lbry/stream/downloader.py b/lbry/stream/downloader.py index 9fe98ac54..588263b0e 100644 --- a/lbry/stream/downloader.py +++ b/lbry/stream/downloader.py @@ -92,8 +92,8 @@ class StreamDownloader: async def start(self, node: typing.Optional['Node'] = None, connection_id: int = 0): # set up peer accumulation - if node: - self.node = node + self.node = node or self.node # fixme: this shouldnt be set here! + if self.node: if self.accumulate_task and not self.accumulate_task.done(): self.accumulate_task.cancel() _, self.accumulate_task = self.node.accumulate_peers(self.search_queue, self.peer_queue) diff --git a/lbry/stream/managed_stream.py b/lbry/stream/managed_stream.py index c449fe232..7d87577a8 100644 --- a/lbry/stream/managed_stream.py +++ b/lbry/stream/managed_stream.py @@ -3,9 +3,8 @@ import asyncio import time import typing import logging -import binascii +from typing import Optional from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable -from lbry.utils import generate_id from lbry.error import DownloadSDTimeoutError from lbry.schema.mime_types import guess_media_type from lbry.stream.downloader import StreamDownloader @@ -13,6 +12,7 @@ from lbry.stream.descriptor import StreamDescriptor, sanitize_file_name from lbry.stream.reflector.client import StreamReflectorClient from lbry.extras.daemon.storage import StoredContentClaim from lbry.blob import MAX_BLOB_SIZE +from lbry.file.source import ManagedDownloadSource if typing.TYPE_CHECKING: from lbry.conf import Config @@ -40,65 +40,20 @@ async def get_next_available_file_name(loop: asyncio.AbstractEventLoop, download return await loop.run_in_executor(None, _get_next_available_file_name, download_directory, file_name) -class ManagedStream: - STATUS_RUNNING = "running" - STATUS_STOPPED = "stopped" - STATUS_FINISHED = "finished" - - SAVING_ID = 1 - STREAMING_ID = 2 - - __slots__ = [ - 'loop', - 'config', - 'blob_manager', - 'sd_hash', - 'download_directory', - '_file_name', - '_added_on', - '_status', - 'stream_claim_info', - 'download_id', - 'rowid', - 'content_fee', - 'purchase_receipt', - 'downloader', - 'analytics_manager', - 'fully_reflected', - 'reflector_progress', - 'file_output_task', - 'delayed_stop_task', - 'streaming_responses', - 'streaming', - '_running', - 'saving', - 'finished_writing', - 'started_writing', - 'finished_write_attempt', - 'uploading_to_reflector' - ] - +class ManagedStream(ManagedDownloadSource): def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager', - sd_hash: str, download_directory: typing.Optional[str] = None, file_name: typing.Optional[str] = None, - status: typing.Optional[str] = STATUS_STOPPED, claim: typing.Optional[StoredContentClaim] = None, - download_id: typing.Optional[str] = None, rowid: typing.Optional[int] = None, - descriptor: typing.Optional[StreamDescriptor] = None, - content_fee: typing.Optional['Transaction'] = None, - analytics_manager: typing.Optional['AnalyticsManager'] = None, - added_on: typing.Optional[int] = None): - self.loop = loop - self.config = config + sd_hash: str, download_directory: Optional[str] = None, file_name: Optional[str] = None, + status: Optional[str] = ManagedDownloadSource.STATUS_STOPPED, + claim: Optional[StoredContentClaim] = None, + download_id: Optional[str] = None, rowid: Optional[int] = None, + descriptor: Optional[StreamDescriptor] = None, + content_fee: Optional['Transaction'] = None, + analytics_manager: Optional['AnalyticsManager'] = None, + added_on: Optional[int] = None): + super().__init__(loop, config, blob_manager.storage, sd_hash, file_name, download_directory, status, claim, + download_id, rowid, content_fee, analytics_manager, added_on) self.blob_manager = blob_manager - self.sd_hash = sd_hash - self.download_directory = download_directory - self._file_name = file_name - self._status = status - self.stream_claim_info = claim - self.download_id = download_id or binascii.hexlify(generate_id()).decode() - self.rowid = rowid - self.content_fee = content_fee self.purchase_receipt = None - self._added_on = added_on self.downloader = StreamDownloader(self.loop, self.config, self.blob_manager, sd_hash, descriptor) self.analytics_manager = analytics_manager @@ -108,12 +63,13 @@ class ManagedStream: self.file_output_task: typing.Optional[asyncio.Task] = None self.delayed_stop_task: typing.Optional[asyncio.Task] = None self.streaming_responses: typing.List[typing.Tuple[Request, StreamResponse]] = [] + self.fully_reflected = asyncio.Event(loop=self.loop) self.streaming = asyncio.Event(loop=self.loop) self._running = asyncio.Event(loop=self.loop) - self.saving = asyncio.Event(loop=self.loop) - self.finished_writing = asyncio.Event(loop=self.loop) - self.started_writing = asyncio.Event(loop=self.loop) - self.finished_write_attempt = asyncio.Event(loop=self.loop) + + @property + def sd_hash(self) -> str: + return self.identifier @property def is_fully_reflected(self) -> bool: @@ -128,17 +84,9 @@ class ManagedStream: return self.descriptor.stream_hash @property - def file_name(self) -> typing.Optional[str]: + def file_name(self) -> Optional[str]: return self._file_name or (self.descriptor.suggested_file_name if self.descriptor else None) - @property - def added_on(self) -> typing.Optional[int]: - return self._added_on - - @property - def status(self) -> str: - return self._status - @property def written_bytes(self) -> int: return 0 if not self.output_file_exists else os.stat(self.full_path).st_size @@ -156,55 +104,6 @@ class ManagedStream: self._status = status await self.blob_manager.storage.change_file_status(self.stream_hash, status) - @property - def finished(self) -> bool: - return self.status == self.STATUS_FINISHED - - @property - def running(self) -> bool: - return self.status == self.STATUS_RUNNING - - @property - def claim_id(self) -> typing.Optional[str]: - return None if not self.stream_claim_info else self.stream_claim_info.claim_id - - @property - def txid(self) -> typing.Optional[str]: - return None if not self.stream_claim_info else self.stream_claim_info.txid - - @property - def nout(self) -> typing.Optional[int]: - return None if not self.stream_claim_info else self.stream_claim_info.nout - - @property - def outpoint(self) -> typing.Optional[str]: - return None if not self.stream_claim_info else self.stream_claim_info.outpoint - - @property - def claim_height(self) -> typing.Optional[int]: - return None if not self.stream_claim_info else self.stream_claim_info.height - - @property - def channel_claim_id(self) -> typing.Optional[str]: - return None if not self.stream_claim_info else self.stream_claim_info.channel_claim_id - - @property - def channel_name(self) -> typing.Optional[str]: - return None if not self.stream_claim_info else self.stream_claim_info.channel_name - - @property - def claim_name(self) -> typing.Optional[str]: - return None if not self.stream_claim_info else self.stream_claim_info.claim_name - - @property - def metadata(self) -> typing.Optional[typing.Dict]: - return None if not self.stream_claim_info else self.stream_claim_info.claim.stream.to_dict() - - @property - def metadata_protobuf(self) -> bytes: - if self.stream_claim_info: - return binascii.hexlify(self.stream_claim_info.claim.to_bytes()) - @property def blobs_completed(self) -> int: return sum([1 if b.blob_hash in self.blob_manager.completed_blob_hashes else 0 @@ -218,39 +117,30 @@ class ManagedStream: def blobs_remaining(self) -> int: return self.blobs_in_stream - self.blobs_completed - @property - def full_path(self) -> typing.Optional[str]: - return os.path.join(self.download_directory, os.path.basename(self.file_name)) \ - if self.file_name and self.download_directory else None - - @property - def output_file_exists(self): - return os.path.isfile(self.full_path) if self.full_path else False - @property def mime_type(self): return guess_media_type(os.path.basename(self.descriptor.suggested_file_name))[0] - @classmethod - async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager', - file_path: str, key: typing.Optional[bytes] = None, - iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedStream': - """ - Generate a stream from a file and save it to the db - """ - descriptor = await StreamDescriptor.create_stream( - loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator, - blob_completed_callback=blob_manager.blob_completed - ) - await blob_manager.storage.store_stream( - blob_manager.get_blob(descriptor.sd_hash), descriptor - ) - row_id = await blob_manager.storage.save_published_file(descriptor.stream_hash, os.path.basename(file_path), - os.path.dirname(file_path), 0) - return cls(loop, config, blob_manager, descriptor.sd_hash, os.path.dirname(file_path), - os.path.basename(file_path), status=cls.STATUS_FINISHED, rowid=row_id, descriptor=descriptor) + # @classmethod + # async def create(cls, loop: asyncio.AbstractEventLoop, config: 'Config', + # file_path: str, key: Optional[bytes] = None, + # iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> 'ManagedDownloadSource': + # """ + # Generate a stream from a file and save it to the db + # """ + # descriptor = await StreamDescriptor.create_stream( + # loop, blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator, + # blob_completed_callback=blob_manager.blob_completed + # ) + # await blob_manager.storage.store_stream( + # blob_manager.get_blob(descriptor.sd_hash), descriptor + # ) + # row_id = await blob_manager.storage.save_published_file(descriptor.stream_hash, os.path.basename(file_path), + # os.path.dirname(file_path), 0) + # return cls(loop, config, blob_manager, descriptor.sd_hash, os.path.dirname(file_path), + # os.path.basename(file_path), status=cls.STATUS_FINISHED, rowid=row_id, descriptor=descriptor) - async def start(self, node: typing.Optional['Node'] = None, timeout: typing.Optional[float] = None, + async def start(self, timeout: Optional[float] = None, save_now: bool = False): timeout = timeout or self.config.download_timeout if self._running.is_set(): @@ -258,7 +148,7 @@ class ManagedStream: log.info("start downloader for stream (sd hash: %s)", self.sd_hash) self._running.set() try: - await asyncio.wait_for(self.downloader.start(node), timeout, loop=self.loop) + await asyncio.wait_for(self.downloader.start(), timeout, loop=self.loop) except asyncio.TimeoutError: self._running.clear() raise DownloadSDTimeoutError(self.sd_hash) @@ -268,6 +158,11 @@ class ManagedStream: self.delayed_stop_task = self.loop.create_task(self._delayed_stop()) if not await self.blob_manager.storage.file_exists(self.sd_hash): if save_now: + if not self._file_name: + self._file_name = await get_next_available_file_name( + self.loop, self.download_directory, + self._file_name or sanitize_file_name(self.descriptor.suggested_file_name) + ) file_name, download_dir = self._file_name, self.download_directory else: file_name, download_dir = None, None @@ -287,7 +182,7 @@ class ManagedStream: if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING: await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED) - async def _aiter_read_stream(self, start_blob_num: typing.Optional[int] = 0, connection_id: int = 0)\ + async def _aiter_read_stream(self, start_blob_num: Optional[int] = 0, connection_id: int = 0)\ -> typing.AsyncIterator[typing.Tuple['BlobInfo', bytes]]: if start_blob_num >= len(self.descriptor.blobs[:-1]): raise IndexError(start_blob_num) @@ -299,7 +194,7 @@ class ManagedStream: decrypted = await self.downloader.read_blob(blob_info, connection_id) yield (blob_info, decrypted) - async def stream_file(self, request: Request, node: typing.Optional['Node'] = None) -> StreamResponse: + async def stream_file(self, request: Request, node: Optional['Node'] = None) -> StreamResponse: log.info("stream file to browser for lbry://%s#%s (sd hash %s...)", self.claim_name, self.claim_id, self.sd_hash[:6]) headers, size, skip_blobs, first_blob_start_offset = self._prepare_range_response_headers( @@ -391,9 +286,8 @@ class ManagedStream: self.saving.clear() self.finished_write_attempt.set() - async def save_file(self, file_name: typing.Optional[str] = None, download_directory: typing.Optional[str] = None, - node: typing.Optional['Node'] = None): - await self.start(node) + async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None): + await self.start() if self.file_output_task and not self.file_output_task.done(): # cancel an already running save task self.file_output_task.cancel() self.download_directory = download_directory or self.download_directory or self.config.download_dir @@ -468,15 +362,7 @@ class ManagedStream: await self.blob_manager.storage.update_reflected_stream(self.sd_hash, f"{host}:{port}") return sent - def set_claim(self, claim_info: typing.Dict, claim: 'Claim'): - self.stream_claim_info = StoredContentClaim( - f"{claim_info['txid']}:{claim_info['nout']}", claim_info['claim_id'], - claim_info['name'], claim_info['amount'], claim_info['height'], - binascii.hexlify(claim.to_bytes()).decode(), claim.signing_channel_id, claim_info['address'], - claim_info['claim_sequence'], claim_info.get('channel_name') - ) - - async def update_content_claim(self, claim_info: typing.Optional[typing.Dict] = None): + async def update_content_claim(self, claim_info: Optional[typing.Dict] = None): if not claim_info: claim_info = await self.blob_manager.storage.get_content_claim(self.stream_hash) self.set_claim(claim_info, claim_info['value']) diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index 4fb37e99a..8df388452 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -6,93 +6,65 @@ import random import typing from typing import Optional from aiohttp.web import Request -from lbry.error import ResolveError, InvalidStreamDescriptorError, DownloadSDTimeoutError, InsufficientFundsError -from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError -from lbry.utils import cache_concurrent +from lbry.error import InvalidStreamDescriptorError +from lbry.file.source_manager import SourceManager from lbry.stream.descriptor import StreamDescriptor from lbry.stream.managed_stream import ManagedStream -from lbry.schema.claim import Claim -from lbry.schema.url import URL -from lbry.wallet.dewies import dewies_to_lbc -from lbry.wallet import Output - +from lbry.file.source import ManagedDownloadSource if typing.TYPE_CHECKING: from lbry.conf import Config from lbry.blob.blob_manager import BlobManager from lbry.dht.node import Node + from lbry.wallet.wallet import WalletManager + from lbry.wallet.transaction import Transaction from lbry.extras.daemon.analytics import AnalyticsManager from lbry.extras.daemon.storage import SQLiteStorage, StoredContentClaim - from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager - from lbry.wallet.transaction import Transaction - from lbry.wallet.manager import WalletManager - from lbry.wallet.wallet import Wallet log = logging.getLogger(__name__) -FILTER_FIELDS = [ - 'rowid', - 'status', - 'file_name', - 'added_on', - 'sd_hash', - 'stream_hash', - 'claim_name', - 'claim_height', - 'claim_id', - 'outpoint', - 'txid', - 'nout', - 'channel_claim_id', - 'channel_name', - 'full_status', # TODO: remove - 'blobs_remaining', - 'blobs_in_stream' -] -SET_FILTER_FIELDS = { - "claim_ids": "claim_id", - "channel_claim_ids": "channel_claim_id", - "outpoints": "outpoint" -} - -COMPARISON_OPERATORS = { - 'eq': lambda a, b: a == b, - 'ne': lambda a, b: a != b, - 'g': lambda a, b: a > b, - 'l': lambda a, b: a < b, - 'ge': lambda a, b: a >= b, - 'le': lambda a, b: a <= b, - 'in': lambda a, b: a in b -} - - -def path_or_none(path) -> Optional[str]: - if not path: +def path_or_none(encoded_path) -> Optional[str]: + if not encoded_path: return - return binascii.unhexlify(path).decode() + return binascii.unhexlify(encoded_path).decode() -class StreamManager: +class StreamManager(SourceManager): + _sources: typing.Dict[str, ManagedStream] + + filter_fields = SourceManager.filter_fields + filter_fields.update({ + 'sd_hash', + 'stream_hash', + 'full_status', # TODO: remove + 'blobs_remaining', + 'blobs_in_stream' + }) + def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', blob_manager: 'BlobManager', wallet_manager: 'WalletManager', storage: 'SQLiteStorage', node: Optional['Node'], analytics_manager: Optional['AnalyticsManager'] = None): - self.loop = loop - self.config = config + super().__init__(loop, config, storage, analytics_manager) self.blob_manager = blob_manager self.wallet_manager = wallet_manager - self.storage = storage self.node = node - self.analytics_manager = analytics_manager - self.streams: typing.Dict[str, ManagedStream] = {} self.resume_saving_task: Optional[asyncio.Task] = None self.re_reflect_task: Optional[asyncio.Task] = None self.update_stream_finished_futs: typing.List[asyncio.Future] = [] self.running_reflector_uploads: typing.Dict[str, asyncio.Task] = {} self.started = asyncio.Event(loop=self.loop) + @property + def streams(self): + return self._sources + + def add(self, source: ManagedStream): + super().add(source) + self.storage.content_claim_callbacks[source.stream_hash] = lambda: self._update_content_claim(source) + async def _update_content_claim(self, stream: ManagedStream): claim_info = await self.storage.get_content_claim(stream.stream_hash) - self.streams.setdefault(stream.sd_hash, stream).set_claim(claim_info, claim_info['value']) + self._sources.setdefault(stream.sd_hash, stream).set_claim(claim_info, claim_info['value']) async def recover_streams(self, file_infos: typing.List[typing.Dict]): to_restore = [] @@ -123,10 +95,10 @@ class StreamManager: # if self.blob_manager._save_blobs: # log.info("Recovered %i/%i attempted streams", len(to_restore), len(file_infos)) - async def add_stream(self, rowid: int, sd_hash: str, file_name: Optional[str], - download_directory: Optional[str], status: str, - claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'], - added_on: Optional[int], fully_reflected: bool): + async def _load_stream(self, rowid: int, sd_hash: str, file_name: Optional[str], + download_directory: Optional[str], status: str, + claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'], + added_on: Optional[int], fully_reflected: Optional[bool]): try: descriptor = await self.blob_manager.get_stream_descriptor(sd_hash) except InvalidStreamDescriptorError as err: @@ -139,10 +111,9 @@ class StreamManager: ) if fully_reflected: stream.fully_reflected.set() - self.streams[sd_hash] = stream - self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) + self.add(stream) - async def load_and_resume_streams_from_database(self): + async def initialize_from_database(self): to_recover = [] to_start = [] @@ -156,7 +127,6 @@ class StreamManager: to_recover.append(file_info) to_start.append(file_info) if to_recover: - log.info("Recover %i files", len(to_recover)) await self.recover_streams(to_recover) log.info("Initializing %i files", len(to_start)) @@ -167,7 +137,7 @@ class StreamManager: download_directory = path_or_none(file_info['download_directory']) if file_name and download_directory and not file_info['saved_file'] and file_info['status'] == 'running': to_resume_saving.append((file_name, download_directory, file_info['sd_hash'])) - add_stream_tasks.append(self.loop.create_task(self.add_stream( + add_stream_tasks.append(self.loop.create_task(self._load_stream( file_info['rowid'], file_info['sd_hash'], file_name, download_directory, file_info['status'], file_info['claim'], file_info['content_fee'], @@ -175,25 +145,22 @@ class StreamManager: ))) if add_stream_tasks: await asyncio.gather(*add_stream_tasks, loop=self.loop) - log.info("Started stream manager with %i files", len(self.streams)) + log.info("Started stream manager with %i files", len(self._sources)) if not self.node: log.info("no DHT node given, resuming downloads trusting that we can contact reflector") if to_resume_saving: - self.resume_saving_task = self.loop.create_task(self.resume(to_resume_saving)) - - async def resume(self, to_resume_saving): - log.info("Resuming saving %i files", len(to_resume_saving)) - await asyncio.gather( - *(self.streams[sd_hash].save_file(file_name, download_directory, node=self.node) - for (file_name, download_directory, sd_hash) in to_resume_saving), - loop=self.loop - ) + log.info("Resuming saving %i files", len(to_resume_saving)) + self.resume_saving_task = asyncio.ensure_future(asyncio.gather( + *(self._sources[sd_hash].save_file(file_name, download_directory) + for (file_name, download_directory, sd_hash) in to_resume_saving), + loop=self.loop + )) async def reflect_streams(self): while True: if self.config.reflect_streams and self.config.reflector_servers: sd_hashes = await self.storage.get_streams_to_re_reflect() - sd_hashes = [sd for sd in sd_hashes if sd in self.streams] + sd_hashes = [sd for sd in sd_hashes if sd in self._sources] batch = [] while sd_hashes: stream = self.streams[sd_hashes.pop()] @@ -209,18 +176,15 @@ class StreamManager: await asyncio.sleep(300, loop=self.loop) async def start(self): - await self.load_and_resume_streams_from_database() + await super().start() self.re_reflect_task = self.loop.create_task(self.reflect_streams()) - self.started.set() def stop(self): + super().stop() if self.resume_saving_task and not self.resume_saving_task.done(): self.resume_saving_task.cancel() if self.re_reflect_task and not self.re_reflect_task.done(): self.re_reflect_task.cancel() - while self.streams: - _, stream = self.streams.popitem() - stream.stop_tasks() while self.update_stream_finished_futs: self.update_stream_finished_futs.pop().cancel() while self.running_reflector_uploads: @@ -243,280 +207,42 @@ class StreamManager: ) return task - async def create_stream(self, file_path: str, key: Optional[bytes] = None, - iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: - stream = await ManagedStream.create(self.loop, self.config, self.blob_manager, file_path, key, iv_generator) + async def create(self, file_path: str, key: Optional[bytes] = None, + iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: + descriptor = await StreamDescriptor.create_stream( + self.loop, self.blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator, + blob_completed_callback=self.blob_manager.blob_completed + ) + await self.storage.store_stream( + self.blob_manager.get_blob(descriptor.sd_hash), descriptor + ) + row_id = await self.storage.save_published_file( + descriptor.stream_hash, os.path.basename(file_path), os.path.dirname(file_path), 0 + ) + stream = ManagedStream( + self.loop, self.config, self.blob_manager, descriptor.sd_hash, os.path.dirname(file_path), + os.path.basename(file_path), status=ManagedDownloadSource.STATUS_FINISHED, + rowid=row_id, descriptor=descriptor + ) self.streams[stream.sd_hash] = stream self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) if self.config.reflect_streams and self.config.reflector_servers: self.reflect_stream(stream) return stream - async def delete_stream(self, stream: ManagedStream, delete_file: Optional[bool] = False): - if stream.sd_hash in self.running_reflector_uploads: - self.running_reflector_uploads[stream.sd_hash].cancel() - stream.stop_tasks() - if stream.sd_hash in self.streams: - del self.streams[stream.sd_hash] - blob_hashes = [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]] + async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False): + if not isinstance(source, ManagedStream): + return + if source.identifier in self.running_reflector_uploads: + self.running_reflector_uploads[source.identifier].cancel() + source.stop_tasks() + if source.identifier in self.streams: + del self.streams[source.identifier] + blob_hashes = [source.identifier] + [b.blob_hash for b in source.descriptor.blobs[:-1]] await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False) - await self.storage.delete_stream(stream.descriptor) - if delete_file and stream.output_file_exists: - os.remove(stream.full_path) - - def get_stream_by_stream_hash(self, stream_hash: str) -> Optional[ManagedStream]: - streams = tuple(filter(lambda stream: stream.stream_hash == stream_hash, self.streams.values())) - if streams: - return streams[0] - - def get_filtered_streams(self, sort_by: Optional[str] = None, reverse: Optional[bool] = False, - comparison: Optional[str] = None, - **search_by) -> typing.List[ManagedStream]: - """ - Get a list of filtered and sorted ManagedStream objects - - :param sort_by: field to sort by - :param reverse: reverse sorting - :param comparison: comparison operator used for filtering - :param search_by: fields and values to filter by - """ - if sort_by and sort_by not in FILTER_FIELDS: - raise ValueError(f"'{sort_by}' is not a valid field to sort by") - if comparison and comparison not in COMPARISON_OPERATORS: - raise ValueError(f"'{comparison}' is not a valid comparison") - if 'full_status' in search_by: - del search_by['full_status'] - - for search in search_by: - if search not in FILTER_FIELDS: - raise ValueError(f"'{search}' is not a valid search operation") - - compare_sets = {} - if isinstance(search_by.get('claim_id'), list): - compare_sets['claim_ids'] = search_by.pop('claim_id') - if isinstance(search_by.get('outpoint'), list): - compare_sets['outpoints'] = search_by.pop('outpoint') - if isinstance(search_by.get('channel_claim_id'), list): - compare_sets['channel_claim_ids'] = search_by.pop('channel_claim_id') - - if search_by: - comparison = comparison or 'eq' - streams = [] - for stream in self.streams.values(): - matched = False - for set_search, val in compare_sets.items(): - if COMPARISON_OPERATORS[comparison](getattr(stream, SET_FILTER_FIELDS[set_search]), val): - streams.append(stream) - matched = True - break - if matched: - continue - for search, val in search_by.items(): - this_stream = getattr(stream, search) - if COMPARISON_OPERATORS[comparison](this_stream, val): - streams.append(stream) - break - else: - streams = list(self.streams.values()) - if sort_by: - streams.sort(key=lambda s: getattr(s, sort_by)) - if reverse: - streams.reverse() - return streams - - async def _check_update_or_replace(self, outpoint: str, claim_id: str, claim: Claim - ) -> typing.Tuple[Optional[ManagedStream], Optional[ManagedStream]]: - existing = self.get_filtered_streams(outpoint=outpoint) - if existing: - return existing[0], None - existing = self.get_filtered_streams(sd_hash=claim.stream.source.sd_hash) - if existing and existing[0].claim_id != claim_id: - raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {claim_id}") - if existing: - log.info("claim contains a metadata only update to a stream we have") - await self.storage.save_content_claim( - existing[0].stream_hash, outpoint - ) - await self._update_content_claim(existing[0]) - return existing[0], None - else: - existing_for_claim_id = self.get_filtered_streams(claim_id=claim_id) - if existing_for_claim_id: - log.info("claim contains an update to a stream we have, downloading it") - return None, existing_for_claim_id[0] - return None, None - - @staticmethod - def _convert_to_old_resolve_output(wallet_manager, resolves): - result = {} - for url, txo in resolves.items(): - if isinstance(txo, Output): - tx_height = txo.tx_ref.height - best_height = wallet_manager.ledger.headers.height - result[url] = { - 'name': txo.claim_name, - 'value': txo.claim, - 'protobuf': binascii.hexlify(txo.claim.to_bytes()), - 'claim_id': txo.claim_id, - 'txid': txo.tx_ref.id, - 'nout': txo.position, - 'amount': dewies_to_lbc(txo.amount), - 'effective_amount': txo.meta.get('effective_amount', 0), - 'height': tx_height, - 'confirmations': (best_height+1) - tx_height if tx_height > 0 else tx_height, - 'claim_sequence': -1, - 'address': txo.get_address(wallet_manager.ledger), - 'valid_at_height': txo.meta.get('activation_height', None), - 'timestamp': wallet_manager.ledger.headers.estimated_timestamp(tx_height), - 'supports': [] - } - else: - result[url] = txo - return result - - @cache_concurrent - async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager', - timeout: Optional[float] = None, - file_name: Optional[str] = None, - download_directory: Optional[str] = None, - save_file: Optional[bool] = None, - resolve_timeout: float = 3.0, - wallet: Optional['Wallet'] = None) -> ManagedStream: - manager = self.wallet_manager - wallet = wallet or manager.default_wallet - timeout = timeout or self.config.download_timeout - start_time = self.loop.time() - resolved_time = None - stream = None - txo: Optional[Output] = None - error = None - outpoint = None - if save_file is None: - save_file = self.config.save_files - if file_name and not save_file: - save_file = True - if save_file: - download_directory = download_directory or self.config.download_dir - else: - download_directory = None - - payment = None - try: - # resolve the claim - if not URL.parse(uri).has_stream: - raise ResolveError("cannot download a channel claim, specify a /path") - try: - response = await asyncio.wait_for( - manager.ledger.resolve(wallet.accounts, [uri], include_purchase_receipt=True), - resolve_timeout - ) - resolved_result = self._convert_to_old_resolve_output(manager, response) - except asyncio.TimeoutError: - raise ResolveTimeoutError(uri) - except Exception as err: - if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8 - raise - log.exception("Unexpected error resolving stream:") - raise ResolveError(f"Unexpected error resolving stream: {str(err)}") - await self.storage.save_claims_for_resolve([ - value for value in resolved_result.values() if 'error' not in value - ]) - resolved = resolved_result.get(uri, {}) - resolved = resolved if 'value' in resolved else resolved.get('claim') - if not resolved: - raise ResolveError(f"Failed to resolve stream at '{uri}'") - if 'error' in resolved: - raise ResolveError(f"error resolving stream: {resolved['error']}") - txo = response[uri] - - claim = Claim.from_bytes(binascii.unhexlify(resolved['protobuf'])) - outpoint = f"{resolved['txid']}:{resolved['nout']}" - resolved_time = self.loop.time() - start_time - - # resume or update an existing stream, if the stream changed: download it and delete the old one after - updated_stream, to_replace = await self._check_update_or_replace(outpoint, resolved['claim_id'], claim) - if updated_stream: - log.info("already have stream for %s", uri) - if save_file and updated_stream.output_file_exists: - save_file = False - await updated_stream.start(node=self.node, timeout=timeout, save_now=save_file) - if not updated_stream.output_file_exists and (save_file or file_name or download_directory): - await updated_stream.save_file( - file_name=file_name, download_directory=download_directory, node=self.node - ) - return updated_stream - - if not to_replace and txo.has_price and not txo.purchase_receipt: - payment = await manager.create_purchase_transaction( - wallet.accounts, txo, exchange_rate_manager - ) - - stream = ManagedStream( - self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_directory, - file_name, ManagedStream.STATUS_RUNNING, content_fee=payment, - analytics_manager=self.analytics_manager - ) - log.info("starting download for %s", uri) - - before_download = self.loop.time() - await stream.start(self.node, timeout) - stream.set_claim(resolved, claim) - if to_replace: # delete old stream now that the replacement has started downloading - await self.delete_stream(to_replace) - - if payment is not None: - await manager.broadcast_or_release(payment) - payment = None # to avoid releasing in `finally` later - log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri) - await self.storage.save_content_fee(stream.stream_hash, stream.content_fee) - - self.streams[stream.sd_hash] = stream - self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) - await self.storage.save_content_claim(stream.stream_hash, outpoint) - if save_file: - await asyncio.wait_for(stream.save_file(node=self.node), timeout - (self.loop.time() - before_download), - loop=self.loop) - return stream - except asyncio.TimeoutError: - error = DownloadDataTimeoutError(stream.sd_hash) - raise error - except Exception as err: # forgive data timeout, don't delete stream - expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError, - KeyFeeAboveMaxAllowedError) - if isinstance(err, expected): - log.warning("Failed to download %s: %s", uri, str(err)) - elif isinstance(err, asyncio.CancelledError): - pass - else: - log.exception("Unexpected error downloading stream:") - error = err - raise - finally: - if payment is not None: - # payment is set to None after broadcasting, if we're here an exception probably happened - await manager.ledger.release_tx(payment) - if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or - stream.downloader.time_to_first_bytes))): - server = self.wallet_manager.ledger.network.client.server - self.loop.create_task( - self.analytics_manager.send_time_to_first_bytes( - resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id, - uri, outpoint, - None if not stream else len(stream.downloader.blob_downloader.active_connections), - None if not stream else len(stream.downloader.blob_downloader.scores), - None if not stream else len(stream.downloader.blob_downloader.connection_failures), - False if not stream else stream.downloader.added_fixed_peers, - self.config.fixed_peer_delay if not stream else stream.downloader.fixed_peers_delay, - None if not stream else stream.sd_hash, - None if not stream else stream.downloader.time_to_descriptor, - None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash, - None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length, - None if not stream else stream.downloader.time_to_first_bytes, - None if not error else error.__class__.__name__, - None if not error else str(error), - None if not server else f"{server[0]}:{server[1]}" - ) - ) + await self.storage.delete_stream(source.descriptor) + if delete_file and source.output_file_exists: + os.remove(source.full_path) async def stream_partial_content(self, request: Request, sd_hash: str): - return await self.streams[sd_hash].stream_file(request, self.node) + return await self._sources[sd_hash].stream_file(request, self.node) diff --git a/lbry/torrent/__init__.py b/lbry/torrent/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py new file mode 100644 index 000000000..feff53f75 --- /dev/null +++ b/lbry/torrent/session.py @@ -0,0 +1,290 @@ +import asyncio +import binascii +import os +import logging +import random +from hashlib import sha1 +from tempfile import mkdtemp +from typing import Optional + +import libtorrent + + +NOTIFICATION_MASKS = [ + "error", + "peer", + "port_mapping", + "storage", + "tracker", + "debug", + "status", + "progress", + "ip_block", + "dht", + "stats", + "session_log", + "torrent_log", + "peer_log", + "incoming_request", + "dht_log", + "dht_operation", + "port_mapping_log", + "picker_log", + "file_progress", + "piece_progress", + "upload", + "block_progress" +] +log = logging.getLogger(__name__) + + +DEFAULT_FLAGS = ( # fixme: somehow the logic here is inverted? + libtorrent.add_torrent_params_flags_t.flag_auto_managed + | libtorrent.add_torrent_params_flags_t.flag_update_subscribe +) + + +def get_notification_type(notification) -> str: + for i, notification_type in enumerate(NOTIFICATION_MASKS): + if (1 << i) & notification: + return notification_type + raise ValueError("unrecognized notification type") + + +class TorrentHandle: + def __init__(self, loop, executor, handle): + self._loop = loop + self._executor = executor + self._handle: libtorrent.torrent_handle = handle + self.started = asyncio.Event(loop=loop) + self.finished = asyncio.Event(loop=loop) + self.metadata_completed = asyncio.Event(loop=loop) + self.size = 0 + self.total_wanted_done = 0 + self.name = '' + self.tasks = [] + self.torrent_file: Optional[libtorrent.file_storage] = None + self._base_path = None + self._handle.set_sequential_download(1) + + @property + def largest_file(self) -> Optional[str]: + if not self.torrent_file: + return None + index = self.largest_file_index + return os.path.join(self._base_path, self.torrent_file.at(index).path) + + @property + def largest_file_index(self): + largest_size, index = 0, 0 + for file_num in range(self.torrent_file.num_files()): + if self.torrent_file.file_size(file_num) > largest_size: + largest_size = self.torrent_file.file_size(file_num) + index = file_num + return index + + def stop_tasks(self): + while self.tasks: + self.tasks.pop().cancel() + + def _show_status(self): + # fixme: cleanup + if not self._handle.is_valid(): + return + status = self._handle.status() + if status.has_metadata: + self.size = status.total_wanted + self.total_wanted_done = status.total_wanted_done + self.name = status.name + if not self.metadata_completed.is_set(): + self.metadata_completed.set() + log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name) + self.torrent_file = self._handle.get_torrent_info().files() + self._base_path = status.save_path + first_piece = self.torrent_file.at(self.largest_file_index).offset + if not self.started.is_set(): + if self._handle.have_piece(first_piece): + self.started.set() + else: + # prioritize it + self._handle.set_piece_deadline(first_piece, 100) + if not status.is_seeding: + log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s', + status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000, + status.num_peers, status.num_seeds, status.state, status.save_path) + elif not self.finished.is_set(): + self.finished.set() + log.info("Torrent finished: %s", self.name) + + async def status_loop(self): + while True: + self._show_status() + if self.finished.is_set(): + break + await asyncio.sleep(0.1, loop=self._loop) + + async def pause(self): + await self._loop.run_in_executor( + self._executor, self._handle.pause + ) + + async def resume(self): + await self._loop.run_in_executor( + self._executor, lambda: self._handle.resume() # pylint: disable=unnecessary-lambda + ) + + +class TorrentSession: + def __init__(self, loop, executor): + self._loop = loop + self._executor = executor + self._session: Optional[libtorrent.session] = None + self._handles = {} + self.tasks = [] + self.wait_start = True + + async def add_fake_torrent(self): + tmpdir = mkdtemp() + info, btih = _create_fake_torrent(tmpdir) + flags = libtorrent.add_torrent_params_flags_t.flag_seed_mode + handle = self._session.add_torrent({ + 'ti': info, 'save_path': tmpdir, 'flags': flags + }) + self._handles[btih] = TorrentHandle(self._loop, self._executor, handle) + return btih + + async def bind(self, interface: str = '0.0.0.0', port: int = 10889): + settings = { + 'listen_interfaces': f"{interface}:{port}", + 'enable_outgoing_utp': True, + 'enable_incoming_utp': True, + 'enable_outgoing_tcp': False, + 'enable_incoming_tcp': False + } + self._session = await self._loop.run_in_executor( + self._executor, libtorrent.session, settings # pylint: disable=c-extension-no-member + ) + self.tasks.append(self._loop.create_task(self.process_alerts())) + + def stop(self): + while self.tasks: + self.tasks.pop().cancel() + self._session.save_state() + self._session.pause() + self._session.stop_dht() + self._session.stop_lsd() + self._session.stop_natpmp() + self._session.stop_upnp() + self._session = None + + def _pop_alerts(self): + for alert in self._session.pop_alerts(): + log.info("torrent alert: %s", alert) + + async def process_alerts(self): + while True: + await self._loop.run_in_executor( + self._executor, self._pop_alerts + ) + await asyncio.sleep(1, loop=self._loop) + + async def pause(self): + await self._loop.run_in_executor( + self._executor, lambda: self._session.save_state() # pylint: disable=unnecessary-lambda + ) + await self._loop.run_in_executor( + self._executor, lambda: self._session.pause() # pylint: disable=unnecessary-lambda + ) + + async def resume(self): + await self._loop.run_in_executor( + self._executor, self._session.resume + ) + + def _add_torrent(self, btih: str, download_directory: Optional[str]): + params = {'info_hash': binascii.unhexlify(btih.encode()), 'flags': DEFAULT_FLAGS} + if download_directory: + params['save_path'] = download_directory + handle = self._session.add_torrent(params) + handle.force_dht_announce() + self._handles[btih] = TorrentHandle(self._loop, self._executor, handle) + + def full_path(self, btih): + return self._handles[btih].largest_file + + async def add_torrent(self, btih, download_path): + await self._loop.run_in_executor( + self._executor, self._add_torrent, btih, download_path + ) + self._handles[btih].tasks.append(self._loop.create_task(self._handles[btih].status_loop())) + await self._handles[btih].metadata_completed.wait() + if self.wait_start: + # fixme: temporary until we add streaming support, otherwise playback fails! + await self._handles[btih].started.wait() + + def remove_torrent(self, btih, remove_files=False): + if btih in self._handles: + handle = self._handles[btih] + handle.stop_tasks() + self._session.remove_torrent(handle._handle, 1 if remove_files else 0) + self._handles.pop(btih) + + async def save_file(self, btih, download_directory): + handle = self._handles[btih] + await handle.resume() + + def get_size(self, btih): + return self._handles[btih].size + + def get_name(self, btih): + return self._handles[btih].name + + def get_downloaded(self, btih): + return self._handles[btih].total_wanted_done + + def is_completed(self, btih): + return self._handles[btih].finished.is_set() + + +def get_magnet_uri(btih): + return f"magnet:?xt=urn:btih:{btih}" + + +def _create_fake_torrent(tmpdir): + # beware, that's just for testing + path = os.path.join(tmpdir, 'tmp') + with open(path, 'wb') as myfile: + size = myfile.write(bytes([random.randint(0, 255) for _ in range(40)]) * 1024) + file_storage = libtorrent.file_storage() + file_storage.add_file('tmp', size) + t = libtorrent.create_torrent(file_storage, 0, 4 * 1024 * 1024) + libtorrent.set_piece_hashes(t, tmpdir) + info = libtorrent.torrent_info(t.generate()) + btih = sha1(info.metadata()).hexdigest() + return info, btih + + +async def main(): + if os.path.exists("~/Downloads/ubuntu-18.04.3-live-server-amd64.torrent"): + os.remove("~/Downloads/ubuntu-18.04.3-live-server-amd64.torrent") + if os.path.exists("~/Downloads/ubuntu-18.04.3-live-server-amd64.iso"): + os.remove("~/Downloads/ubuntu-18.04.3-live-server-amd64.iso") + + btih = "dd8255ecdc7ca55fb0bbf81323d87062db1f6d1c" + + executor = None + session = TorrentSession(asyncio.get_event_loop(), executor) + session2 = TorrentSession(asyncio.get_event_loop(), executor) + await session.bind('localhost', port=4040) + await session2.bind('localhost', port=4041) + btih = await session.add_fake_torrent() + session2._session.add_dht_node(('localhost', 4040)) + await session2.add_torrent(btih, "/tmp/down") + while True: + await asyncio.sleep(100) + await session.pause() + executor.shutdown() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/lbry/torrent/torrent.py b/lbry/torrent/torrent.py new file mode 100644 index 000000000..04a8544c7 --- /dev/null +++ b/lbry/torrent/torrent.py @@ -0,0 +1,72 @@ +import asyncio +import logging +import typing + + +log = logging.getLogger(__name__) + + +class TorrentInfo: + __slots__ = ('dht_seeds', 'http_seeds', 'trackers', 'total_size') + + def __init__(self, dht_seeds: typing.Tuple[typing.Tuple[str, int]], + http_seeds: typing.Tuple[typing.Dict[str, typing.Any]], + trackers: typing.Tuple[typing.Tuple[str, int]], total_size: int): + self.dht_seeds = dht_seeds + self.http_seeds = http_seeds + self.trackers = trackers + self.total_size = total_size + + @classmethod + def from_libtorrent_info(cls, torrent_info): + return cls( + torrent_info.nodes(), tuple( + { + 'url': web_seed['url'], + 'type': web_seed['type'], + 'auth': web_seed['auth'] + } for web_seed in torrent_info.web_seeds() + ), tuple( + (tracker.url, tracker.tier) for tracker in torrent_info.trackers() + ), torrent_info.total_size() + ) + + +class Torrent: + def __init__(self, loop, handle): + self._loop = loop + self._handle = handle + self.finished = asyncio.Event(loop=loop) + + def _threaded_update_status(self): + status = self._handle.status() + if not status.is_seeding: + log.info( + '%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d) %s', + status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000, + status.num_peers, status.state + ) + elif not self.finished.is_set(): + self.finished.set() + + async def wait_for_finished(self): + while True: + await self._loop.run_in_executor( + None, self._threaded_update_status + ) + if self.finished.is_set(): + log.info("finished downloading torrent!") + await self.pause() + break + await asyncio.sleep(1, loop=self._loop) + + async def pause(self): + log.info("pause torrent") + await self._loop.run_in_executor( + None, self._handle.pause + ) + + async def resume(self): + await self._loop.run_in_executor( + None, self._handle.resume + ) diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py new file mode 100644 index 000000000..cf9106731 --- /dev/null +++ b/lbry/torrent/torrent_manager.py @@ -0,0 +1,140 @@ +import asyncio +import binascii +import logging +import os +import typing +from typing import Optional +from aiohttp.web import Request +from lbry.file.source_manager import SourceManager +from lbry.file.source import ManagedDownloadSource + +if typing.TYPE_CHECKING: + from lbry.torrent.session import TorrentSession + from lbry.conf import Config + from lbry.wallet.transaction import Transaction + from lbry.extras.daemon.analytics import AnalyticsManager + from lbry.extras.daemon.storage import SQLiteStorage, StoredContentClaim + from lbry.extras.daemon.storage import StoredContentClaim + +log = logging.getLogger(__name__) + + +def path_or_none(encoded_path) -> Optional[str]: + if not encoded_path: + return + return binascii.unhexlify(encoded_path).decode() + + +class TorrentSource(ManagedDownloadSource): + STATUS_STOPPED = "stopped" + filter_fields = SourceManager.filter_fields + filter_fields.update({ + 'bt_infohash' + }) + + def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage', identifier: str, + file_name: Optional[str] = None, download_directory: Optional[str] = None, + status: Optional[str] = STATUS_STOPPED, claim: Optional['StoredContentClaim'] = None, + download_id: Optional[str] = None, rowid: Optional[int] = None, + content_fee: Optional['Transaction'] = None, + analytics_manager: Optional['AnalyticsManager'] = None, + added_on: Optional[int] = None, torrent_session: Optional['TorrentSession'] = None): + super().__init__(loop, config, storage, identifier, file_name, download_directory, status, claim, download_id, + rowid, content_fee, analytics_manager, added_on) + self.torrent_session = torrent_session + + @property + def full_path(self) -> Optional[str]: + full_path = self.torrent_session.full_path(self.identifier) + self.download_directory = os.path.dirname(full_path) + return full_path + + async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False): + await self.torrent_session.add_torrent(self.identifier, self.download_directory) + + async def stop(self, finished: bool = False): + await self.torrent_session.remove_torrent(self.identifier) + + async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None): + await self.torrent_session.save_file(self.identifier, download_directory) + + @property + def torrent_length(self): + return self.torrent_session.get_size(self.identifier) + + @property + def written_bytes(self): + return self.torrent_session.get_downloaded(self.identifier) + + @property + def torrent_name(self): + return self.torrent_session.get_name(self.identifier) + + @property + def bt_infohash(self): + return self.identifier + + def stop_tasks(self): + pass + + @property + def completed(self): + return self.torrent_session.is_completed(self.identifier) + + +class TorrentManager(SourceManager): + _sources: typing.Dict[str, ManagedDownloadSource] + + filter_fields = set(SourceManager.filter_fields) + filter_fields.update({ + 'bt_infohash', + 'blobs_remaining', # TODO: here they call them "parts", but its pretty much the same concept + 'blobs_in_stream' + }) + + def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', torrent_session: 'TorrentSession', + storage: 'SQLiteStorage', analytics_manager: Optional['AnalyticsManager'] = None): + super().__init__(loop, config, storage, analytics_manager) + self.torrent_session: 'TorrentSession' = torrent_session + + async def recover_streams(self, file_infos: typing.List[typing.Dict]): + raise NotImplementedError + + async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[str], + download_directory: Optional[str], status: str, + claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'], + added_on: Optional[int]): + stream = TorrentSource( + self.loop, self.config, self.storage, identifier=bt_infohash, file_name=file_name, + download_directory=download_directory, status=status, claim=claim, rowid=rowid, + content_fee=content_fee, analytics_manager=self.analytics_manager, added_on=added_on, + torrent_session=self.torrent_session + ) + self.add(stream) + + async def initialize_from_database(self): + pass + + async def start(self): + await super().start() + + def stop(self): + super().stop() + log.info("finished stopping the torrent manager") + + async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False): + await super().delete(source, delete_file) + self.torrent_session.remove_torrent(source.identifier, delete_file) + + async def create(self, file_path: str, key: Optional[bytes] = None, + iv_generator: Optional[typing.Generator[bytes, None, None]] = None): + raise NotImplementedError + + async def _delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False): + raise NotImplementedError + # blob_hashes = [source.sd_hash] + [b.blob_hash for b in source.descriptor.blobs[:-1]] + # await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False) + # await self.storage.delete_stream(source.descriptor) + + async def stream_partial_content(self, request: Request, sd_hash: str): + raise NotImplementedError diff --git a/setup.cfg b/setup.cfg index c5d268dbb..e8bc1920b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -6,7 +6,7 @@ source = lbry .tox/*/lib/python*/site-packages/lbry -[cryptography.*,coincurve.*,pbkdf2] +[cryptography.*,coincurve.*,pbkdf2, libtorrent] ignore_missing_imports = True [pylint] @@ -18,6 +18,7 @@ max-line-length=120 good-names=T,t,n,i,j,k,x,y,s,f,d,h,c,e,op,db,tx,io,cachedproperty,log,id,r,iv,ts,l valid-metaclass-classmethod-first-arg=mcs disable= + c-extension-no-member, fixme, broad-except, no-else-return, diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 1ab06d088..df46c6fab 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -2,10 +2,60 @@ import asyncio import os from binascii import hexlify +from lbry.schema import Claim from lbry.testcase import CommandTestCase +from lbry.torrent.session import TorrentSession +from lbry.wallet import Transaction class FileCommands(CommandTestCase): + async def initialize_torrent(self, tx_to_update=None): + if not hasattr(self, 'seeder_session'): + self.seeder_session = TorrentSession(self.loop, None) + self.addCleanup(self.seeder_session.stop) + await self.seeder_session.bind(port=4040) + btih = await self.seeder_session.add_fake_torrent() + address = await self.account.receiving.get_or_create_usable_address() + if not tx_to_update: + claim = Claim() + claim.stream.update(bt_infohash=btih) + tx = await Transaction.claim_create( + 'torrent', claim, 1, address, [self.account], self.account + ) + else: + claim = tx_to_update.outputs[0].claim + claim.stream.update(bt_infohash=btih) + tx = await Transaction.claim_update( + tx_to_update.outputs[0], claim, 1, address, [self.account], self.account + ) + await tx.sign([self.account]) + await self.broadcast(tx) + await self.confirm_tx(tx.id) + self.client_session = self.daemon.file_manager.source_managers['torrent'].torrent_session + self.client_session._session.add_dht_node(('localhost', 4040)) + self.client_session.wait_start = False # fixme: this is super slow on tests + return tx, btih + + async def test_download_torrent(self): + tx, btih = await self.initialize_torrent() + self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent'))) + self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) + # second call, see its there and move on + self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent'))) + self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) + self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, btih) + self.assertIn(btih, self.client_session._handles) + tx, new_btih = await self.initialize_torrent(tx) + self.assertNotEqual(btih, new_btih) + # claim now points to another torrent, update to it + self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent'))) + self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih) + self.assertIn(new_btih, self.client_session._handles) + self.assertNotIn(btih, self.client_session._handles) + self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) + await self.daemon.jsonrpc_file_delete(delete_all=True) + self.assertItemCount(await self.daemon.jsonrpc_file_list(), 0) + self.assertNotIn(new_btih, self.client_session._handles) async def create_streams_in_range(self, *args, **kwargs): self.stream_claim_ids = [] @@ -228,11 +278,11 @@ class FileCommands(CommandTestCase): await self.daemon.jsonrpc_get('lbry://foo') with open(original_path, 'wb') as handle: handle.write(b'some other stuff was there instead') - self.daemon.stream_manager.stop() - await self.daemon.stream_manager.start() + self.daemon.file_manager.stop() + await self.daemon.file_manager.start() await asyncio.wait_for(self.wait_files_to_complete(), timeout=5) # if this hangs, file didn't get set completed # check that internal state got through up to the file list API - stream = self.daemon.stream_manager.get_stream_by_stream_hash(file_info['stream_hash']) + stream = self.daemon.file_manager.get_filtered(stream_hash=file_info['stream_hash'])[0] file_info = (await self.file_list())[0] self.assertEqual(stream.file_name, file_info['file_name']) # checks if what the API shows is what he have at the very internal level. @@ -255,7 +305,7 @@ class FileCommands(CommandTestCase): resp = await self.out(self.daemon.jsonrpc_get('lbry://foo', timeout=2)) self.assertNotIn('error', resp) self.assertTrue(os.path.isfile(path)) - self.daemon.stream_manager.stop() + self.daemon.file_manager.stop() await asyncio.sleep(0.01, loop=self.loop) # FIXME: this sleep should not be needed self.assertFalse(os.path.isfile(path)) @@ -348,8 +398,8 @@ class FileCommands(CommandTestCase): # restart the daemon and make sure the fee is still there - self.daemon.stream_manager.stop() - await self.daemon.stream_manager.start() + self.daemon.file_manager.stop() + await self.daemon.file_manager.start() self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].content_fee.raw, raw_content_fee) await self.daemon.jsonrpc_file_delete(claim_name='icanpay') diff --git a/tests/integration/datanetwork/test_streaming.py b/tests/integration/datanetwork/test_streaming.py index e6d572e94..856a3c090 100644 --- a/tests/integration/datanetwork/test_streaming.py +++ b/tests/integration/datanetwork/test_streaming.py @@ -21,8 +21,8 @@ def get_random_bytes(n: int) -> bytes: class RangeRequests(CommandTestCase): async def _restart_stream_manager(self): - self.daemon.stream_manager.stop() - await self.daemon.stream_manager.start() + self.daemon.file_manager.stop() + await self.daemon.file_manager.start() return async def _setup_stream(self, data: bytes, save_blobs: bool = True, save_files: bool = False, file_size=0): diff --git a/tests/integration/other/test_cli.py b/tests/integration/other/test_cli.py index 59b629747..459d2171a 100644 --- a/tests/integration/other/test_cli.py +++ b/tests/integration/other/test_cli.py @@ -6,7 +6,7 @@ from lbry.conf import Config from lbry.extras import cli from lbry.extras.daemon.components import ( DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, - HASH_ANNOUNCER_COMPONENT, STREAM_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, + HASH_ANNOUNCER_COMPONENT, FILE_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT ) from lbry.extras.daemon.daemon import Daemon @@ -21,7 +21,7 @@ class CLIIntegrationTest(AsyncioTestCase): conf.api = 'localhost:5299' conf.components_to_skip = ( DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, - HASH_ANNOUNCER_COMPONENT, STREAM_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, + HASH_ANNOUNCER_COMPONENT, FILE_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT ) Daemon.component_attributes = {} @@ -34,4 +34,4 @@ class CLIIntegrationTest(AsyncioTestCase): with contextlib.redirect_stdout(actual_output): cli.main(["--api", "localhost:5299", "status"]) actual_output = actual_output.getvalue() - self.assertIn("connection_status", actual_output) \ No newline at end of file + self.assertIn("connection_status", actual_output) diff --git a/tests/unit/components/test_component_manager.py b/tests/unit/components/test_component_manager.py index d8d2ed5a9..b4e81fed7 100644 --- a/tests/unit/components/test_component_manager.py +++ b/tests/unit/components/test_component_manager.py @@ -16,6 +16,7 @@ class TestComponentManager(AsyncioTestCase): [ components.DatabaseComponent, components.ExchangeRateManagerComponent, + components.TorrentComponent, components.UPnPComponent ], [ @@ -24,9 +25,9 @@ class TestComponentManager(AsyncioTestCase): components.WalletComponent ], [ + components.FileManagerComponent, components.HashAnnouncerComponent, components.PeerProtocolServerComponent, - components.StreamManagerComponent, components.WalletServerPaymentsComponent ] ] @@ -135,8 +136,8 @@ class FakeDelayedBlobManager(FakeComponent): await asyncio.sleep(1) -class FakeDelayedStreamManager(FakeComponent): - component_name = "stream_manager" +class FakeDelayedFileManager(FakeComponent): + component_name = "file_manager" depends_on = [FakeDelayedBlobManager.component_name] async def start(self): @@ -153,7 +154,7 @@ class TestComponentManagerProperStart(AdvanceTimeTestCase): PEER_PROTOCOL_SERVER_COMPONENT, UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT], wallet=FakeDelayedWallet, - stream_manager=FakeDelayedStreamManager, + file_manager=FakeDelayedFileManager, blob_manager=FakeDelayedBlobManager ) @@ -163,17 +164,17 @@ class TestComponentManagerProperStart(AdvanceTimeTestCase): await self.advance(0) self.assertTrue(self.component_manager.get_component('wallet').running) self.assertFalse(self.component_manager.get_component('blob_manager').running) - self.assertFalse(self.component_manager.get_component('stream_manager').running) + self.assertFalse(self.component_manager.get_component('file_manager').running) await self.advance(1) self.assertTrue(self.component_manager.get_component('wallet').running) self.assertTrue(self.component_manager.get_component('blob_manager').running) - self.assertFalse(self.component_manager.get_component('stream_manager').running) + self.assertFalse(self.component_manager.get_component('file_manager').running) await self.advance(1) self.assertTrue(self.component_manager.get_component('wallet').running) self.assertTrue(self.component_manager.get_component('blob_manager').running) - self.assertTrue(self.component_manager.get_component('stream_manager').running) + self.assertTrue(self.component_manager.get_component('file_manager').running) async def test_proper_stopping_of_components(self): asyncio.create_task(self.component_manager.start()) @@ -182,18 +183,18 @@ class TestComponentManagerProperStart(AdvanceTimeTestCase): await self.advance(1) self.assertTrue(self.component_manager.get_component('wallet').running) self.assertTrue(self.component_manager.get_component('blob_manager').running) - self.assertTrue(self.component_manager.get_component('stream_manager').running) + self.assertTrue(self.component_manager.get_component('file_manager').running) asyncio.create_task(self.component_manager.stop()) await self.advance(0) - self.assertFalse(self.component_manager.get_component('stream_manager').running) + self.assertFalse(self.component_manager.get_component('file_manager').running) self.assertTrue(self.component_manager.get_component('blob_manager').running) self.assertTrue(self.component_manager.get_component('wallet').running) await self.advance(1) - self.assertFalse(self.component_manager.get_component('stream_manager').running) + self.assertFalse(self.component_manager.get_component('file_manager').running) self.assertFalse(self.component_manager.get_component('blob_manager').running) self.assertTrue(self.component_manager.get_component('wallet').running) await self.advance(1) - self.assertFalse(self.component_manager.get_component('stream_manager').running) + self.assertFalse(self.component_manager.get_component('file_manager').running) self.assertFalse(self.component_manager.get_component('blob_manager').running) self.assertFalse(self.component_manager.get_component('wallet').running) diff --git a/tests/unit/stream/test_managed_stream.py b/tests/unit/stream/test_managed_stream.py index dbdfa5157..3542c60e4 100644 --- a/tests/unit/stream/test_managed_stream.py +++ b/tests/unit/stream/test_managed_stream.py @@ -76,7 +76,8 @@ class TestManagedStream(BlobExchangeTestBase): return q2, self.loop.create_task(_task()) mock_node.accumulate_peers = mock_accumulate_peers or _mock_accumulate_peers - await self.stream.save_file(node=mock_node) + self.stream.downloader.node = mock_node + await self.stream.save_file() await self.stream.finished_write_attempt.wait() self.assertTrue(os.path.isfile(self.stream.full_path)) if stop_when_done: @@ -109,7 +110,6 @@ class TestManagedStream(BlobExchangeTestBase): await self.setup_stream(2) mock_node = mock.Mock(spec=Node) - q = asyncio.Queue() bad_peer = make_kademlia_peer(b'2' * 48, "127.0.0.1", tcp_port=3334, allow_localhost=True) @@ -123,7 +123,8 @@ class TestManagedStream(BlobExchangeTestBase): mock_node.accumulate_peers = _mock_accumulate_peers - await self.stream.save_file(node=mock_node) + self.stream.downloader.node = mock_node + await self.stream.save_file() await self.stream.finished_writing.wait() self.assertTrue(os.path.isfile(self.stream.full_path)) with open(self.stream.full_path, 'rb') as f: diff --git a/tests/unit/stream/test_reflector.py b/tests/unit/stream/test_reflector.py index 4845948d1..b47cf31d0 100644 --- a/tests/unit/stream/test_reflector.py +++ b/tests/unit/stream/test_reflector.py @@ -39,7 +39,7 @@ class TestStreamAssembler(AsyncioTestCase): with open(file_path, 'wb') as f: f.write(self.cleartext) - self.stream = await self.stream_manager.create_stream(file_path) + self.stream = await self.stream_manager.create(file_path) async def _test_reflect_stream(self, response_chunk_size): reflector = ReflectorServer(self.server_blob_manager, response_chunk_size=response_chunk_size) diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py index e33064503..3299bcb4d 100644 --- a/tests/unit/stream/test_stream_manager.py +++ b/tests/unit/stream/test_stream_manager.py @@ -5,6 +5,8 @@ from unittest import mock import asyncio import json from decimal import Decimal + +from lbry.file.file_manager import FileManager from tests.unit.blob_exchange.test_transfer_blob import BlobExchangeTestBase from lbry.testcase import get_fake_exchange_rate_manager from lbry.utils import generate_id @@ -110,10 +112,7 @@ async def get_mock_wallet(sd_hash, storage, balance=10.0, fee=None): async def mock_resolve(*args, **kwargs): result = {txo.meta['permanent_url']: txo} - claims = [ - StreamManager._convert_to_old_resolve_output(manager, result)[txo.meta['permanent_url']] - ] - await storage.save_claims(claims) + await storage.save_claim_from_output(ledger, txo) return result manager.ledger.resolve = mock_resolve @@ -138,11 +137,20 @@ class TestStreamManager(BlobExchangeTestBase): ) self.sd_hash = descriptor.sd_hash self.mock_wallet, self.uri = await get_mock_wallet(self.sd_hash, self.client_storage, balance, fee) - self.stream_manager = StreamManager(self.loop, self.client_config, self.client_blob_manager, self.mock_wallet, - self.client_storage, get_mock_node(self.server_from_client), - AnalyticsManager(self.client_config, - binascii.hexlify(generate_id()).decode(), - binascii.hexlify(generate_id()).decode())) + analytics_manager = AnalyticsManager( + self.client_config, + binascii.hexlify(generate_id()).decode(), + binascii.hexlify(generate_id()).decode() + ) + self.stream_manager = StreamManager( + self.loop, self.client_config, self.client_blob_manager, self.mock_wallet, + self.client_storage, get_mock_node(self.server_from_client), + analytics_manager + ) + self.file_manager = FileManager( + self.loop, self.client_config, self.mock_wallet, self.client_storage, analytics_manager + ) + self.file_manager.source_managers['stream'] = self.stream_manager self.exchange_rate_manager = get_fake_exchange_rate_manager() async def _test_time_to_first_bytes(self, check_post, error=None, after_setup=None): @@ -159,9 +167,9 @@ class TestStreamManager(BlobExchangeTestBase): self.stream_manager.analytics_manager._post = _check_post if error: with self.assertRaises(error): - await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager) else: - await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager) await asyncio.sleep(0, loop=self.loop) self.assertTrue(checked_analytics_event) @@ -281,7 +289,7 @@ class TestStreamManager(BlobExchangeTestBase): self.stream_manager.analytics_manager._post = check_post self.assertDictEqual(self.stream_manager.streams, {}) - stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager) stream_hash = stream.stream_hash self.assertDictEqual(self.stream_manager.streams, {stream.sd_hash: stream}) self.assertTrue(stream.running) @@ -302,7 +310,8 @@ class TestStreamManager(BlobExchangeTestBase): ) self.assertEqual(stored_status, "stopped") - await stream.save_file(node=self.stream_manager.node) + stream.downloader.node = self.stream_manager.node + await stream.save_file() await stream.finished_writing.wait() await asyncio.sleep(0, loop=self.loop) self.assertTrue(stream.finished) @@ -313,7 +322,7 @@ class TestStreamManager(BlobExchangeTestBase): ) self.assertEqual(stored_status, "finished") - await self.stream_manager.delete_stream(stream, True) + await self.stream_manager.delete(stream, True) self.assertDictEqual(self.stream_manager.streams, {}) self.assertFalse(os.path.isfile(os.path.join(self.client_dir, "test_file"))) stored_status = await self.client_storage.run_and_return_one_or_none( @@ -325,7 +334,7 @@ class TestStreamManager(BlobExchangeTestBase): async def _test_download_error_on_start(self, expected_error, timeout=None): error = None try: - await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager, timeout) + await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager, timeout) except Exception as err: if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8 raise @@ -401,7 +410,7 @@ class TestStreamManager(BlobExchangeTestBase): last_blob_hash = json.loads(sdf.read())['blobs'][-2]['blob_hash'] self.server_blob_manager.delete_blob(last_blob_hash) self.client_config.blob_download_timeout = 0.1 - stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager) await stream.started_writing.wait() self.assertEqual('running', stream.status) self.assertIsNotNone(stream.full_path) @@ -433,7 +442,7 @@ class TestStreamManager(BlobExchangeTestBase): self.stream_manager.analytics_manager._post = check_post self.assertDictEqual(self.stream_manager.streams, {}) - stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager) + stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager) await stream.finished_writing.wait() await asyncio.sleep(0, loop=self.loop) self.stream_manager.stop() diff --git a/tox.ini b/tox.ini index 3b446a241..73d57410b 100644 --- a/tox.ini +++ b/tox.ini @@ -11,6 +11,7 @@ commands = --global-option=fetch \ --global-option=--version --global-option=3.30.1 --global-option=--all \ --global-option=build --global-option=--enable --global-option=fts5 + pip install lbry-libtorrent orchstr8 download blockchain: coverage run -p --source={envsitepackagesdir}/lbry -m unittest discover -vv integration.blockchain {posargs} datanetwork: coverage run -p --source={envsitepackagesdir}/lbry -m unittest discover -vv integration.datanetwork {posargs}