From 698ee271d6673e8d53289bde5eceecf3e2b184d6 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor1984@riseup.net>
Date: Mon, 27 Jan 2020 19:02:31 -0300
Subject: [PATCH] stream manager component becomes file manager component

---
 lbry/extras/daemon/components.py              |  7 ++-
 lbry/extras/daemon/daemon.py                  | 60 +++++++++----------
 .../datanetwork/test_file_commands.py         | 12 ++--
 .../integration/datanetwork/test_streaming.py |  4 +-
 tests/integration/other/test_cli.py           |  6 +-
 .../unit/components/test_component_manager.py |  2 +-
 6 files changed, 46 insertions(+), 45 deletions(-)

diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py
index de43eb926..ff6de3c61 100644
--- a/lbry/extras/daemon/components.py
+++ b/lbry/extras/daemon/components.py
@@ -28,6 +28,7 @@ try:
     from lbry.torrent.session import TorrentSession
 except ImportError:
     libtorrent = None
+    TorrentSession = None
 
 log = logging.getLogger(__name__)
 
@@ -39,7 +40,7 @@ WALLET_COMPONENT = "wallet"
 WALLET_SERVER_PAYMENTS_COMPONENT = "wallet_server_payments"
 DHT_COMPONENT = "dht"
 HASH_ANNOUNCER_COMPONENT = "hash_announcer"
-STREAM_MANAGER_COMPONENT = "stream_manager"
+FILE_MANAGER_COMPONENT = "file_manager"
 PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server"
 UPNP_COMPONENT = "upnp"
 EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager"
@@ -326,8 +327,8 @@ class HashAnnouncerComponent(Component):
         }
 
 
-class StreamManagerComponent(Component):
-    component_name = STREAM_MANAGER_COMPONENT
+class FileManagerComponent(Component):
+    component_name = FILE_MANAGER_COMPONENT
     depends_on = [BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT]
 
     def __init__(self, component_manager):
diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py
index 276e1693e..5432cd79f 100644
--- a/lbry/extras/daemon/daemon.py
+++ b/lbry/extras/daemon/daemon.py
@@ -40,7 +40,7 @@ from lbry.error import (
 from lbry.extras import system_info
 from lbry.extras.daemon import analytics
 from lbry.extras.daemon.components import WALLET_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT
-from lbry.extras.daemon.components import STREAM_MANAGER_COMPONENT
+from lbry.extras.daemon.components import FILE_MANAGER_COMPONENT
 from lbry.extras.daemon.components import EXCHANGE_RATE_MANAGER_COMPONENT, UPNP_COMPONENT
 from lbry.extras.daemon.componentmanager import RequiredCondition
 from lbry.extras.daemon.componentmanager import ComponentManager
@@ -372,8 +372,8 @@ class Daemon(metaclass=JSONRPCServerType):
         return self.component_manager.get_component(DATABASE_COMPONENT)
 
     @property
-    def stream_manager(self) -> typing.Optional['StreamManager']:
-        return self.component_manager.get_component(STREAM_MANAGER_COMPONENT)
+    def file_manager(self) -> typing.Optional['StreamManager']:
+        return self.component_manager.get_component(FILE_MANAGER_COMPONENT)
 
     @property
     def exchange_rate_manager(self) -> typing.Optional['ExchangeRateManager']:
@@ -609,8 +609,8 @@ class Daemon(metaclass=JSONRPCServerType):
         else:
             name, claim_id = name_and_claim_id.split("/")
             uri = f"lbry://{name}#{claim_id}"
-        if not self.stream_manager.started.is_set():
-            await self.stream_manager.started.wait()
+        if not self.file_manager.started.is_set():
+            await self.file_manager.started.wait()
         stream = await self.jsonrpc_get(uri)
         if isinstance(stream, dict):
             raise web.HTTPServerError(text=stream['error'])
@@ -634,11 +634,11 @@ class Daemon(metaclass=JSONRPCServerType):
 
     async def _handle_stream_range_request(self, request: web.Request):
         sd_hash = request.path.split("/stream/")[1]
-        if not self.stream_manager.started.is_set():
-            await self.stream_manager.started.wait()
-        if sd_hash not in self.stream_manager.streams:
+        if not self.file_manager.started.is_set():
+            await self.file_manager.started.wait()
+        if sd_hash not in self.file_manager.streams:
             return web.HTTPNotFound()
-        return await self.stream_manager.stream_partial_content(request, sd_hash)
+        return await self.file_manager.stream_partial_content(request, sd_hash)
 
     async def _process_rpc_call(self, data):
         args = data.get('params', {})
@@ -1077,7 +1077,7 @@ class Daemon(metaclass=JSONRPCServerType):
         return results
 
     @requires(WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT,
-              STREAM_MANAGER_COMPONENT)
+              FILE_MANAGER_COMPONENT)
     async def jsonrpc_get(
             self, uri, file_name=None, download_directory=None, timeout=None, save_file=None, wallet_id=None):
         """
@@ -1103,7 +1103,7 @@ class Daemon(metaclass=JSONRPCServerType):
         if download_directory and not os.path.isdir(download_directory):
             return {"error": f"specified download directory \"{download_directory}\" does not exist"}
         try:
-            stream = await self.stream_manager.download_from_uri(
+            stream = await self.file_manager.download_from_uri(
                 uri, self.exchange_rate_manager, timeout, file_name, download_directory,
                 save_file=save_file, wallet=wallet
             )
@@ -1949,7 +1949,7 @@ class Daemon(metaclass=JSONRPCServerType):
     File management.
     """
 
-    @requires(STREAM_MANAGER_COMPONENT)
+    @requires(FILE_MANAGER_COMPONENT)
     async def jsonrpc_file_list(self, sort=None, reverse=False, comparison=None, wallet_id=None, page=None,
                                 page_size=None, **kwargs):
         """
@@ -1994,7 +1994,7 @@ class Daemon(metaclass=JSONRPCServerType):
         comparison = comparison or 'eq'
 
         paginated = paginate_list(
-            self.stream_manager.get_filtered_streams(sort, reverse, comparison, **kwargs), page, page_size
+            self.file_manager.get_filtered_streams(sort, reverse, comparison, **kwargs), page, page_size
         )
         if paginated['items']:
             receipts = {
@@ -2008,7 +2008,7 @@ class Daemon(metaclass=JSONRPCServerType):
                 stream.purchase_receipt = receipts.get(stream.claim_id)
         return paginated
 
-    @requires(STREAM_MANAGER_COMPONENT)
+    @requires(FILE_MANAGER_COMPONENT)
     async def jsonrpc_file_set_status(self, status, **kwargs):
         """
         Start or stop downloading a file
@@ -2032,12 +2032,12 @@ class Daemon(metaclass=JSONRPCServerType):
         if status not in ['start', 'stop']:
             raise Exception('Status must be "start" or "stop".')
 
-        streams = self.stream_manager.get_filtered_streams(**kwargs)
+        streams = self.file_manager.get_filtered_streams(**kwargs)
         if not streams:
             raise Exception(f'Unable to find a file for {kwargs}')
         stream = streams[0]
         if status == 'start' and not stream.running:
-            await stream.save_file(node=self.stream_manager.node)
+            await stream.save_file(node=self.file_manager.node)
             msg = "Resumed download"
         elif status == 'stop' and stream.running:
             await stream.stop()
@@ -2049,7 +2049,7 @@ class Daemon(metaclass=JSONRPCServerType):
             )
         return msg
 
-    @requires(STREAM_MANAGER_COMPONENT)
+    @requires(FILE_MANAGER_COMPONENT)
     async def jsonrpc_file_delete(self, delete_from_download_dir=False, delete_all=False, **kwargs):
         """
         Delete a LBRY file
@@ -2081,7 +2081,7 @@ class Daemon(metaclass=JSONRPCServerType):
             (bool) true if deletion was successful
         """
 
-        streams = self.stream_manager.get_filtered_streams(**kwargs)
+        streams = self.file_manager.get_filtered_streams(**kwargs)
 
         if len(streams) > 1:
             if not delete_all:
@@ -2098,12 +2098,12 @@ class Daemon(metaclass=JSONRPCServerType):
         else:
             for stream in streams:
                 message = f"Deleted file {stream.file_name}"
-                await self.stream_manager.delete_stream(stream, delete_file=delete_from_download_dir)
+                await self.file_manager.delete_stream(stream, delete_file=delete_from_download_dir)
                 log.info(message)
             result = True
         return result
 
-    @requires(STREAM_MANAGER_COMPONENT)
+    @requires(FILE_MANAGER_COMPONENT)
     async def jsonrpc_file_save(self, file_name=None, download_directory=None, **kwargs):
         """
         Start saving a file to disk.
@@ -2130,7 +2130,7 @@ class Daemon(metaclass=JSONRPCServerType):
         Returns: {File}
         """
 
-        streams = self.stream_manager.get_filtered_streams(**kwargs)
+        streams = self.file_manager.get_filtered_streams(**kwargs)
 
         if len(streams) > 1:
             log.warning("There are %i matching files, use narrower filters to select one", len(streams))
@@ -2905,7 +2905,7 @@ class Daemon(metaclass=JSONRPCServerType):
     Create, update, abandon, list and inspect your stream claims.
     """
 
-    @requires(WALLET_COMPONENT, STREAM_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
+    @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
     async def jsonrpc_publish(self, name, **kwargs):
         """
         Create or replace a stream claim at a given name (use 'stream create/update' for more control).
@@ -3027,7 +3027,7 @@ class Daemon(metaclass=JSONRPCServerType):
             f"to update a specific stream claim."
         )
 
-    @requires(WALLET_COMPONENT, STREAM_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
+    @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
     async def jsonrpc_stream_repost(self, name, bid, claim_id, allow_duplicate_name=False, channel_id=None,
                                     channel_name=None, channel_account_id=None, account_id=None, wallet_id=None,
                                     claim_address=None, funding_account_ids=None, preview=False, blocking=False):
@@ -3099,7 +3099,7 @@ class Daemon(metaclass=JSONRPCServerType):
 
         return tx
 
-    @requires(WALLET_COMPONENT, STREAM_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
+    @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
     async def jsonrpc_stream_create(
             self, name, bid, file_path, allow_duplicate_name=False,
             channel_id=None, channel_name=None, channel_account_id=None,
@@ -3237,7 +3237,7 @@ class Daemon(metaclass=JSONRPCServerType):
 
         file_stream = None
         if not preview:
-            file_stream = await self.stream_manager.create_stream(file_path)
+            file_stream = await self.file_manager.create_stream(file_path)
             claim.stream.source.sd_hash = file_stream.sd_hash
             new_txo.script.generate()
 
@@ -3257,7 +3257,7 @@ class Daemon(metaclass=JSONRPCServerType):
 
         return tx
 
-    @requires(WALLET_COMPONENT, STREAM_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
+    @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT)
     async def jsonrpc_stream_update(
             self, claim_id, bid=None, file_path=None,
             channel_id=None, channel_name=None, channel_account_id=None, clear_channel=False,
@@ -4583,9 +4583,9 @@ class Daemon(metaclass=JSONRPCServerType):
         """
         if not blob_hash or not is_valid_blobhash(blob_hash):
             return f"Invalid blob hash to delete '{blob_hash}'"
-        streams = self.stream_manager.get_filtered_streams(sd_hash=blob_hash)
+        streams = self.file_manager.get_filtered_streams(sd_hash=blob_hash)
         if streams:
-            await self.stream_manager.delete_stream(streams[0])
+            await self.file_manager.delete_stream(streams[0])
         else:
             await self.blob_manager.delete_blobs([blob_hash])
         return "Deleted %s" % blob_hash
@@ -4758,7 +4758,7 @@ class Daemon(metaclass=JSONRPCServerType):
 
         raise NotImplementedError()
 
-    @requires(STREAM_MANAGER_COMPONENT)
+    @requires(FILE_MANAGER_COMPONENT)
     async def jsonrpc_file_reflect(self, **kwargs):
         """
         Reflect all the blobs in a file matching the filter criteria
@@ -5334,7 +5334,7 @@ class Daemon(metaclass=JSONRPCServerType):
         results = await self.ledger.resolve(accounts, urls, **kwargs)
         if self.conf.save_resolved_claims and results:
             try:
-                claims = self.stream_manager._convert_to_old_resolve_output(self.wallet_manager, results)
+                claims = self.file_manager._convert_to_old_resolve_output(self.wallet_manager, results)
                 await self.storage.save_claims_for_resolve([
                     value for value in claims.values() if 'error' not in value
                 ])
diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py
index 1ab06d088..861040f91 100644
--- a/tests/integration/datanetwork/test_file_commands.py
+++ b/tests/integration/datanetwork/test_file_commands.py
@@ -228,11 +228,11 @@ class FileCommands(CommandTestCase):
         await self.daemon.jsonrpc_get('lbry://foo')
         with open(original_path, 'wb') as handle:
             handle.write(b'some other stuff was there instead')
-        self.daemon.stream_manager.stop()
-        await self.daemon.stream_manager.start()
+        self.daemon.file_manager.stop()
+        await self.daemon.file_manager.start()
         await asyncio.wait_for(self.wait_files_to_complete(), timeout=5)  # if this hangs, file didn't get set completed
         # check that internal state got through up to the file list API
-        stream = self.daemon.stream_manager.get_stream_by_stream_hash(file_info['stream_hash'])
+        stream = self.daemon.file_manager.get_stream_by_stream_hash(file_info['stream_hash'])
         file_info = (await self.file_list())[0]
         self.assertEqual(stream.file_name, file_info['file_name'])
         # checks if what the API shows is what he have at the very internal level.
@@ -255,7 +255,7 @@ class FileCommands(CommandTestCase):
         resp = await self.out(self.daemon.jsonrpc_get('lbry://foo', timeout=2))
         self.assertNotIn('error', resp)
         self.assertTrue(os.path.isfile(path))
-        self.daemon.stream_manager.stop()
+        self.daemon.file_manager.stop()
         await asyncio.sleep(0.01, loop=self.loop)  # FIXME: this sleep should not be needed
         self.assertFalse(os.path.isfile(path))
 
@@ -348,8 +348,8 @@ class FileCommands(CommandTestCase):
 
         # restart the daemon and make sure the fee is still there
 
-        self.daemon.stream_manager.stop()
-        await self.daemon.stream_manager.start()
+        self.daemon.file_manager.stop()
+        await self.daemon.file_manager.start()
         self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)
         self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].content_fee.raw, raw_content_fee)
         await self.daemon.jsonrpc_file_delete(claim_name='icanpay')
diff --git a/tests/integration/datanetwork/test_streaming.py b/tests/integration/datanetwork/test_streaming.py
index e6d572e94..856a3c090 100644
--- a/tests/integration/datanetwork/test_streaming.py
+++ b/tests/integration/datanetwork/test_streaming.py
@@ -21,8 +21,8 @@ def get_random_bytes(n: int) -> bytes:
 
 class RangeRequests(CommandTestCase):
     async def _restart_stream_manager(self):
-        self.daemon.stream_manager.stop()
-        await self.daemon.stream_manager.start()
+        self.daemon.file_manager.stop()
+        await self.daemon.file_manager.start()
         return
 
     async def _setup_stream(self, data: bytes, save_blobs: bool = True, save_files: bool = False, file_size=0):
diff --git a/tests/integration/other/test_cli.py b/tests/integration/other/test_cli.py
index 59b629747..459d2171a 100644
--- a/tests/integration/other/test_cli.py
+++ b/tests/integration/other/test_cli.py
@@ -6,7 +6,7 @@ from lbry.conf import Config
 from lbry.extras import cli
 from lbry.extras.daemon.components import (
     DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT,
-    HASH_ANNOUNCER_COMPONENT, STREAM_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
+    HASH_ANNOUNCER_COMPONENT, FILE_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
     UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT
 )
 from lbry.extras.daemon.daemon import Daemon
@@ -21,7 +21,7 @@ class CLIIntegrationTest(AsyncioTestCase):
         conf.api = 'localhost:5299'
         conf.components_to_skip = (
             DATABASE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT,
-            HASH_ANNOUNCER_COMPONENT, STREAM_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
+            HASH_ANNOUNCER_COMPONENT, FILE_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
             UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT
         )
         Daemon.component_attributes = {}
@@ -34,4 +34,4 @@ class CLIIntegrationTest(AsyncioTestCase):
         with contextlib.redirect_stdout(actual_output):
             cli.main(["--api", "localhost:5299", "status"])
         actual_output = actual_output.getvalue()
-        self.assertIn("connection_status", actual_output)
\ No newline at end of file
+        self.assertIn("connection_status", actual_output)
diff --git a/tests/unit/components/test_component_manager.py b/tests/unit/components/test_component_manager.py
index d8d2ed5a9..6738c14e4 100644
--- a/tests/unit/components/test_component_manager.py
+++ b/tests/unit/components/test_component_manager.py
@@ -26,7 +26,7 @@ class TestComponentManager(AsyncioTestCase):
             [
                 components.HashAnnouncerComponent,
                 components.PeerProtocolServerComponent,
-                components.StreamManagerComponent,
+                components.FileManagerComponent,
                 components.WalletServerPaymentsComponent
             ]
         ]