diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index aaba65fb7..cf1e126f5 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -28,6 +28,7 @@ try: from lbry.torrent.session import TorrentSession except ImportError: libtorrent = None + TorrentSession = None log = logging.getLogger(__name__) @@ -39,7 +40,7 @@ WALLET_COMPONENT = "wallet" WALLET_SERVER_PAYMENTS_COMPONENT = "wallet_server_payments" DHT_COMPONENT = "dht" HASH_ANNOUNCER_COMPONENT = "hash_announcer" -STREAM_MANAGER_COMPONENT = "stream_manager" +FILE_MANAGER_COMPONENT = "file_manager" PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server" UPNP_COMPONENT = "upnp" EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager" @@ -326,8 +327,8 @@ class HashAnnouncerComponent(Component): } -class StreamManagerComponent(Component): - component_name = STREAM_MANAGER_COMPONENT +class FileManagerComponent(Component): + component_name = FILE_MANAGER_COMPONENT depends_on = [BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT] def __init__(self, component_manager): diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 223bcb2d4..83299af91 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -38,7 +38,7 @@ from lbry.error import ( from lbry.extras import system_info from lbry.extras.daemon import analytics from lbry.extras.daemon.components import WALLET_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT -from lbry.extras.daemon.components import STREAM_MANAGER_COMPONENT +from lbry.extras.daemon.components import FILE_MANAGER_COMPONENT from lbry.extras.daemon.components import EXCHANGE_RATE_MANAGER_COMPONENT, UPNP_COMPONENT from lbry.extras.daemon.componentmanager import RequiredCondition from lbry.extras.daemon.componentmanager import ComponentManager @@ -341,8 +341,8 @@ class Daemon(metaclass=JSONRPCServerType): return self.component_manager.get_component(DATABASE_COMPONENT) @property - def stream_manager(self) -> typing.Optional['StreamManager']: - return self.component_manager.get_component(STREAM_MANAGER_COMPONENT) + def file_manager(self) -> typing.Optional['StreamManager']: + return self.component_manager.get_component(FILE_MANAGER_COMPONENT) @property def exchange_rate_manager(self) -> typing.Optional['ExchangeRateManager']: @@ -568,8 +568,8 @@ class Daemon(metaclass=JSONRPCServerType): else: name, claim_id = name_and_claim_id.split("/") uri = f"lbry://{name}#{claim_id}" - if not self.stream_manager.started.is_set(): - await self.stream_manager.started.wait() + if not self.file_manager.started.is_set(): + await self.file_manager.started.wait() stream = await self.jsonrpc_get(uri) if isinstance(stream, dict): raise web.HTTPServerError(text=stream['error']) @@ -593,11 +593,11 @@ class Daemon(metaclass=JSONRPCServerType): async def _handle_stream_range_request(self, request: web.Request): sd_hash = request.path.split("/stream/")[1] - if not self.stream_manager.started.is_set(): - await self.stream_manager.started.wait() - if sd_hash not in self.stream_manager.streams: + if not self.file_manager.started.is_set(): + await self.file_manager.started.wait() + if sd_hash not in self.file_manager.streams: return web.HTTPNotFound() - return await self.stream_manager.stream_partial_content(request, sd_hash) + return await self.file_manager.stream_partial_content(request, sd_hash) async def _process_rpc_call(self, data): args = data.get('params', {}) @@ -1010,7 +1010,7 @@ class Daemon(metaclass=JSONRPCServerType): return results @requires(WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT, - STREAM_MANAGER_COMPONENT) + FILE_MANAGER_COMPONENT) async def jsonrpc_get( self, uri, file_name=None, download_directory=None, timeout=None, save_file=None, wallet_id=None): """ @@ -1036,7 +1036,7 @@ class Daemon(metaclass=JSONRPCServerType): if download_directory and not os.path.isdir(download_directory): return {"error": f"specified download directory \"{download_directory}\" does not exist"} try: - stream = await self.stream_manager.download_from_uri( + stream = await self.file_manager.download_from_uri( uri, self.exchange_rate_manager, timeout, file_name, download_directory, save_file=save_file, wallet=wallet ) @@ -1876,7 +1876,7 @@ class Daemon(metaclass=JSONRPCServerType): File management. """ - @requires(STREAM_MANAGER_COMPONENT) + @requires(FILE_MANAGER_COMPONENT) async def jsonrpc_file_list( self, sort=None, reverse=False, comparison=None, wallet_id=None, page=None, page_size=None, **kwargs): @@ -1921,7 +1921,7 @@ class Daemon(metaclass=JSONRPCServerType): sort = sort or 'rowid' comparison = comparison or 'eq' paginated = paginate_list( - self.stream_manager.get_filtered_streams(sort, reverse, comparison, **kwargs), page, page_size + self.file_manager.get_filtered_streams(sort, reverse, comparison, **kwargs), page, page_size ) if paginated['items']: receipts = { @@ -1935,7 +1935,7 @@ class Daemon(metaclass=JSONRPCServerType): stream.purchase_receipt = receipts.get(stream.claim_id) return paginated - @requires(STREAM_MANAGER_COMPONENT) + @requires(FILE_MANAGER_COMPONENT) async def jsonrpc_file_set_status(self, status, **kwargs): """ Start or stop downloading a file @@ -1959,12 +1959,12 @@ class Daemon(metaclass=JSONRPCServerType): if status not in ['start', 'stop']: raise Exception('Status must be "start" or "stop".') - streams = self.stream_manager.get_filtered_streams(**kwargs) + streams = self.file_manager.get_filtered_streams(**kwargs) if not streams: raise Exception(f'Unable to find a file for {kwargs}') stream = streams[0] if status == 'start' and not stream.running: - await stream.save_file(node=self.stream_manager.node) + await stream.save_file(node=self.file_manager.node) msg = "Resumed download" elif status == 'stop' and stream.running: await stream.stop() @@ -1976,7 +1976,7 @@ class Daemon(metaclass=JSONRPCServerType): ) return msg - @requires(STREAM_MANAGER_COMPONENT) + @requires(FILE_MANAGER_COMPONENT) async def jsonrpc_file_delete(self, delete_from_download_dir=False, delete_all=False, **kwargs): """ Delete a LBRY file @@ -2008,7 +2008,7 @@ class Daemon(metaclass=JSONRPCServerType): (bool) true if deletion was successful """ - streams = self.stream_manager.get_filtered_streams(**kwargs) + streams = self.file_manager.get_filtered_streams(**kwargs) if len(streams) > 1: if not delete_all: @@ -2025,12 +2025,12 @@ class Daemon(metaclass=JSONRPCServerType): else: for stream in streams: message = f"Deleted file {stream.file_name}" - await self.stream_manager.delete_stream(stream, delete_file=delete_from_download_dir) + await self.file_manager.delete_stream(stream, delete_file=delete_from_download_dir) log.info(message) result = True return result - @requires(STREAM_MANAGER_COMPONENT) + @requires(FILE_MANAGER_COMPONENT) async def jsonrpc_file_save(self, file_name=None, download_directory=None, **kwargs): """ Start saving a file to disk. @@ -2057,7 +2057,7 @@ class Daemon(metaclass=JSONRPCServerType): Returns: {File} """ - streams = self.stream_manager.get_filtered_streams(**kwargs) + streams = self.file_manager.get_filtered_streams(**kwargs) if len(streams) > 1: log.warning("There are %i matching files, use narrower filters to select one", len(streams)) @@ -2817,7 +2817,7 @@ class Daemon(metaclass=JSONRPCServerType): Create, update, abandon, list and inspect your stream claims. """ - @requires(WALLET_COMPONENT, STREAM_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT) + @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT) async def jsonrpc_publish(self, name, **kwargs): """ Create or replace a stream claim at a given name (use 'stream create/update' for more control). @@ -2939,7 +2939,7 @@ class Daemon(metaclass=JSONRPCServerType): f"to update a specific stream claim." ) - @requires(WALLET_COMPONENT, STREAM_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT) + @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT) async def jsonrpc_stream_repost(self, name, bid, claim_id, allow_duplicate_name=False, channel_id=None, channel_name=None, channel_account_id=None, account_id=None, wallet_id=None, claim_address=None, funding_account_ids=None, preview=False, blocking=False): @@ -3011,7 +3011,7 @@ class Daemon(metaclass=JSONRPCServerType): return tx - @requires(WALLET_COMPONENT, STREAM_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT) + @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT) async def jsonrpc_stream_create( self, name, bid, file_path, allow_duplicate_name=False, channel_id=None, channel_name=None, channel_account_id=None, @@ -3147,7 +3147,7 @@ class Daemon(metaclass=JSONRPCServerType): file_stream = None if not preview: - file_stream = await self.stream_manager.create_stream(file_path) + file_stream = await self.file_manager.create_stream(file_path) claim.stream.source.sd_hash = file_stream.sd_hash new_txo.script.generate() @@ -3167,7 +3167,7 @@ class Daemon(metaclass=JSONRPCServerType): return tx - @requires(WALLET_COMPONENT, STREAM_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT) + @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT) async def jsonrpc_stream_update( self, claim_id, bid=None, file_path=None, channel_id=None, channel_name=None, channel_account_id=None, clear_channel=False, @@ -4249,9 +4249,9 @@ class Daemon(metaclass=JSONRPCServerType): """ if not blob_hash or not is_valid_blobhash(blob_hash): return f"Invalid blob hash to delete '{blob_hash}'" - streams = self.stream_manager.get_filtered_streams(sd_hash=blob_hash) + streams = self.file_manager.get_filtered_streams(sd_hash=blob_hash) if streams: - await self.stream_manager.delete_stream(streams[0]) + await self.file_manager.delete_stream(streams[0]) else: await self.blob_manager.delete_blobs([blob_hash]) return "Deleted %s" % blob_hash @@ -4424,7 +4424,7 @@ class Daemon(metaclass=JSONRPCServerType): raise NotImplementedError() - @requires(STREAM_MANAGER_COMPONENT) + @requires(FILE_MANAGER_COMPONENT) async def jsonrpc_file_reflect(self, **kwargs): """ Reflect all the blobs in a file matching the filter criteria @@ -4919,7 +4919,7 @@ class Daemon(metaclass=JSONRPCServerType): results = await self.ledger.resolve(accounts, urls) if self.conf.save_resolved_claims and results: try: - claims = self.stream_manager._convert_to_old_resolve_output(self.wallet_manager, results) + claims = self.file_manager._convert_to_old_resolve_output(self.wallet_manager, results) await self.storage.save_claims_for_resolve([ value for value in claims.values() if 'error' not in value ]) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index cfd14ef0a..128415df4 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -223,11 +223,11 @@ class FileCommands(CommandTestCase): await self.daemon.jsonrpc_get('lbry://foo') with open(original_path, 'wb') as handle: handle.write(b'some other stuff was there instead') - self.daemon.stream_manager.stop() - await self.daemon.stream_manager.start() + self.daemon.file_manager.stop() + await self.daemon.file_manager.start() await asyncio.wait_for(self.wait_files_to_complete(), timeout=5) # if this hangs, file didn't get set completed # check that internal state got through up to the file list API - stream = self.daemon.stream_manager.get_stream_by_stream_hash(file_info['stream_hash']) + stream = self.daemon.file_manager.get_stream_by_stream_hash(file_info['stream_hash']) file_info = (await self.file_list())[0] self.assertEqual(stream.file_name, file_info['file_name']) # checks if what the API shows is what he have at the very internal level. @@ -250,7 +250,7 @@ class FileCommands(CommandTestCase): resp = await self.out(self.daemon.jsonrpc_get('lbry://foo', timeout=2)) self.assertNotIn('error', resp) self.assertTrue(os.path.isfile(path)) - self.daemon.stream_manager.stop() + self.daemon.file_manager.stop() await asyncio.sleep(0.01, loop=self.loop) # FIXME: this sleep should not be needed self.assertFalse(os.path.isfile(path)) @@ -343,8 +343,8 @@ class FileCommands(CommandTestCase): # restart the daemon and make sure the fee is still there - self.daemon.stream_manager.stop() - await self.daemon.stream_manager.start() + self.daemon.file_manager.stop() + await self.daemon.file_manager.start() self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].content_fee.raw, raw_content_fee) await self.daemon.jsonrpc_file_delete(claim_name='icanpay') diff --git a/tests/integration/datanetwork/test_streaming.py b/tests/integration/datanetwork/test_streaming.py index e6d572e94..856a3c090 100644 --- a/tests/integration/datanetwork/test_streaming.py +++ b/tests/integration/datanetwork/test_streaming.py @@ -21,8 +21,8 @@ def get_random_bytes(n: int) -> bytes: class RangeRequests(CommandTestCase): async def _restart_stream_manager(self): - self.daemon.stream_manager.stop() - await self.daemon.stream_manager.start() + self.daemon.file_manager.stop() + await self.daemon.file_manager.start() return async def _setup_stream(self, data: bytes, save_blobs: bool = True, save_files: bool = False, file_size=0): diff --git a/tests/integration/other/test_cli.py b/tests/integration/other/test_cli.py index 59b629747..459d2171a 100644 --- a/tests/integration/other/test_cli.py +++ b/tests/integration/other/test_cli.py @@ -6,7 +6,7 @@ from lbry.conf import Config from lbry.extras import cli from lbry.extras.daemon.components import ( DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, - HASH_ANNOUNCER_COMPONENT, STREAM_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, + HASH_ANNOUNCER_COMPONENT, FILE_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT ) from lbry.extras.daemon.daemon import Daemon @@ -21,7 +21,7 @@ class CLIIntegrationTest(AsyncioTestCase): conf.api = 'localhost:5299' conf.components_to_skip = ( DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, - HASH_ANNOUNCER_COMPONENT, STREAM_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, + HASH_ANNOUNCER_COMPONENT, FILE_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT ) Daemon.component_attributes = {} @@ -34,4 +34,4 @@ class CLIIntegrationTest(AsyncioTestCase): with contextlib.redirect_stdout(actual_output): cli.main(["--api", "localhost:5299", "status"]) actual_output = actual_output.getvalue() - self.assertIn("connection_status", actual_output) \ No newline at end of file + self.assertIn("connection_status", actual_output) diff --git a/tests/unit/components/test_component_manager.py b/tests/unit/components/test_component_manager.py index d8d2ed5a9..6738c14e4 100644 --- a/tests/unit/components/test_component_manager.py +++ b/tests/unit/components/test_component_manager.py @@ -26,7 +26,7 @@ class TestComponentManager(AsyncioTestCase): [ components.HashAnnouncerComponent, components.PeerProtocolServerComponent, - components.StreamManagerComponent, + components.FileManagerComponent, components.WalletServerPaymentsComponent ] ]