From 507db5f79a25c0bcfb4e505f9d840701ed425297 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor1984@riseup.net>
Date: Wed, 5 Feb 2020 12:29:26 -0300
Subject: [PATCH] torrent manager and torrent source

---
 lbry/extras/daemon/components.py |  10 ++-
 lbry/file/file_manager.py        |  13 +++-
 lbry/torrent/session.py          |  26 ++++++--
 lbry/torrent/torrent_manager.py  | 111 +++++++++++++++++++++++++++++++
 4 files changed, 151 insertions(+), 9 deletions(-)
 create mode 100644 lbry/torrent/torrent_manager.py

diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py
index 549c678f3..c1e41ab5e 100644
--- a/lbry/extras/daemon/components.py
+++ b/lbry/extras/daemon/components.py
@@ -21,6 +21,7 @@ from lbry.file.file_manager import FileManager
 from lbry.extras.daemon.component import Component
 from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
 from lbry.extras.daemon.storage import SQLiteStorage
+from lbry.torrent.torrent_manager import TorrentManager
 from lbry.wallet import WalletManager
 from lbry.wallet.usage_payment import WalletServerPayer
 try:
@@ -327,7 +328,7 @@ class HashAnnouncerComponent(Component):
 
 class FileManagerComponent(Component):
     component_name = FILE_MANAGER_COMPONENT
-    depends_on = [BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT]
+    depends_on = [BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT, LIBTORRENT_COMPONENT]
 
     def __init__(self, component_manager):
         super().__init__(component_manager)
@@ -350,14 +351,19 @@ class FileManagerComponent(Component):
         wallet = self.component_manager.get_component(WALLET_COMPONENT)
         node = self.component_manager.get_component(DHT_COMPONENT) \
             if self.component_manager.has_component(DHT_COMPONENT) else None
+        torrent = self.component_manager.get_component(LIBTORRENT_COMPONENT) if TorrentSession else None
         log.info('Starting the file manager')
         loop = asyncio.get_event_loop()
         self.file_manager = FileManager(
             loop, self.conf, wallet, storage, self.component_manager.analytics_manager
         )
         self.file_manager.source_managers['stream'] = StreamManager(
-            loop, self.conf, blob_manager, wallet, storage, node, self.component_manager.analytics_manager
+            loop, self.conf, blob_manager, wallet, storage, node,
         )
+        if TorrentSession:
+            self.file_manager.source_managers['torrent'] = TorrentManager(
+                loop, self.conf, torrent, storage, self.component_manager.analytics_manager
+            )
         await self.file_manager.start()
         log.info('Done setting up file manager')
 
diff --git a/lbry/file/file_manager.py b/lbry/file/file_manager.py
index 765cd1b53..50b21e9b2 100644
--- a/lbry/file/file_manager.py
+++ b/lbry/file/file_manager.py
@@ -6,6 +6,7 @@ from aiohttp.web import Request
 from lbry.error import ResolveError, DownloadSDTimeoutError, InsufficientFundsError
 from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError
 from lbry.stream.managed_stream import ManagedStream
+from lbry.torrent.torrent_manager import TorrentSource
 from lbry.utils import cache_concurrent
 from lbry.schema.url import URL
 from lbry.wallet.dewies import dewies_to_lbc
@@ -110,11 +111,12 @@ class FileManager:
 
             if claim.stream.source.bt_infohash:
                 source_manager = self.source_managers['torrent']
+                existing = source_manager.get_filtered(bt_infohash=claim.stream.source.bt_infohash)
             else:
                 source_manager = self.source_managers['stream']
+                existing = source_manager.get_filtered(sd_hash=claim.stream.source.sd_hash)
 
             # resume or update an existing stream, if the stream changed: download it and delete the old one after
-            existing = self.get_filtered(sd_hash=claim.stream.source.sd_hash)
             to_replace, updated_stream = None, None
             if existing and existing[0].claim_id != txo.claim_id:
                 raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {txo.claim_id}")
@@ -151,7 +153,6 @@ class FileManager:
                     )
                 return updated_stream
 
-
             ####################
             # pay fee
             ####################
@@ -174,7 +175,13 @@ class FileManager:
                 )
                 stream.downloader.node = source_manager.node
             else:
-                stream = None
+                stream = TorrentSource(
+                    self.loop, self.config, self.storage, identifier=claim.stream.source.bt_infohash,
+                    file_name=file_name, download_directory=download_directory or self.config.download_dir,
+                    status=ManagedStream.STATUS_RUNNING,
+                    claim=claim, analytics_manager=self.analytics_manager,
+                    torrent_session=source_manager.torrent_session
+                )
             log.info("starting download for %s", uri)
 
             before_download = self.loop.time()
diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py
index 294a2cba5..0a33c0bf6 100644
--- a/lbry/torrent/session.py
+++ b/lbry/torrent/session.py
@@ -1,5 +1,7 @@
 import asyncio
 import binascii
+from typing import Optional
+
 import libtorrent
 
 
@@ -30,6 +32,15 @@ NOTIFICATION_MASKS = [
 ]
 
 
+DEFAULT_FLAGS = (  # fixme: somehow the logic here is inverted?
+        libtorrent.add_torrent_params_flags_t.flag_paused
+        | libtorrent.add_torrent_params_flags_t.flag_auto_managed
+        | libtorrent.add_torrent_params_flags_t.flag_duplicate_is_error
+        | libtorrent.add_torrent_params_flags_t.flag_upload_mode
+        | libtorrent.add_torrent_params_flags_t.flag_update_subscribe
+)
+
+
 def get_notification_type(notification) -> str:
     for i, notification_type in enumerate(NOTIFICATION_MASKS):
         if (1 << i) & notification:
@@ -123,10 +134,11 @@ class TorrentSession:
             self._executor, self._session.resume
         )
 
-    def _add_torrent(self, btih: str, download_directory: str):
-        self._handles[btih] = TorrentHandle(self._loop, self._executor, self._session.add_torrent(
-            {'info_hash': binascii.unhexlify(btih.encode()), 'save_path': download_directory}
-        ))
+    def _add_torrent(self, btih: str, download_directory: Optional[str]):
+        params = {'info_hash': binascii.unhexlify(btih.encode()), 'flags': DEFAULT_FLAGS}
+        if download_directory:
+            params['save_path'] = download_directory
+        self._handles[btih] = TorrentHandle(self._loop, self._executor, self._session.add_torrent(params))
 
     async def add_torrent(self, btih, download_path):
         await self._loop.run_in_executor(
@@ -135,6 +147,12 @@ class TorrentSession:
         self._loop.create_task(self._handles[btih].status_loop())
         await self._handles[btih].finished.wait()
 
+    async def remove_torrent(self, btih, remove_files=False):
+        if btih in self._handles:
+            handle = self._handles[btih]
+            self._session.remove_torrent(handle, 1 if remove_files else 0)
+            self._handles.pop(btih)
+
 
 def get_magnet_uri(btih):
     return f"magnet:?xt=urn:btih:{btih}"
diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py
new file mode 100644
index 000000000..02f6b7cf9
--- /dev/null
+++ b/lbry/torrent/torrent_manager.py
@@ -0,0 +1,111 @@
+import asyncio
+import binascii
+import logging
+import typing
+from typing import Optional
+from aiohttp.web import Request
+from lbry.file.source_manager import SourceManager
+from lbry.file.source import ManagedDownloadSource
+
+if typing.TYPE_CHECKING:
+    from lbry.torrent.session import TorrentSession
+    from lbry.conf import Config
+    from lbry.wallet.transaction import Transaction
+    from lbry.extras.daemon.analytics import AnalyticsManager
+    from lbry.extras.daemon.storage import SQLiteStorage, StoredContentClaim
+    from lbry.extras.daemon.storage import StoredContentClaim
+
+log = logging.getLogger(__name__)
+
+
+def path_or_none(encoded_path) -> Optional[str]:
+    if not encoded_path:
+        return
+    return binascii.unhexlify(encoded_path).decode()
+
+
+class TorrentSource(ManagedDownloadSource):
+    STATUS_STOPPED = "stopped"
+
+    def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage', identifier: str,
+                 file_name: Optional[str] = None, download_directory: Optional[str] = None,
+                 status: Optional[str] = STATUS_STOPPED, claim: Optional['StoredContentClaim'] = None,
+                 download_id: Optional[str] = None, rowid: Optional[int] = None,
+                 content_fee: Optional['Transaction'] = None,
+                 analytics_manager: Optional['AnalyticsManager'] = None,
+                 added_on: Optional[int] = None, torrent_session: Optional['TorrentSession'] = None):
+        super().__init__(loop, config, storage, identifier, file_name, download_directory, status, claim, download_id,
+                         rowid, content_fee, analytics_manager, added_on)
+        self.torrent_session = torrent_session
+
+    async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False):
+        await self.torrent_session.add_torrent(self.identifier, self.download_directory)
+
+    async def stop(self, finished: bool = False):
+        await self.torrent_session.remove_torrent(self.identifier)
+
+    async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
+        raise NotImplementedError()
+
+    def stop_tasks(self):
+        raise NotImplementedError()
+
+    @property
+    def completed(self):
+        raise NotImplementedError()
+
+class TorrentManager(SourceManager):
+    _sources: typing.Dict[str, ManagedDownloadSource]
+
+    filter_fields = set(SourceManager.filter_fields)
+    filter_fields.update({
+        'bt_infohash',
+        'blobs_remaining',  # TODO: here they call them "parts", but its pretty much the same concept
+        'blobs_in_stream'
+    })
+
+    def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', torrent_session: 'TorrentSession',
+                 storage: 'SQLiteStorage', analytics_manager: Optional['AnalyticsManager'] = None):
+        super().__init__(loop, config, storage, analytics_manager)
+        self.torrent_session: 'TorrentSession' = torrent_session
+
+    def add(self, source: ManagedDownloadSource):
+        super().add(source)
+
+    async def recover_streams(self, file_infos: typing.List[typing.Dict]):
+        raise NotImplementedError
+
+    async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[str],
+                           download_directory: Optional[str], status: str,
+                           claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'],
+                           added_on: Optional[int]):
+        stream = TorrentSource(
+            self.loop, self.config, self.storage, identifier=bt_infohash, file_name=file_name,
+            download_directory=download_directory, status=status, claim=claim, rowid=rowid,
+            content_fee=content_fee, analytics_manager=self.analytics_manager, added_on=added_on,
+            torrent_session=self.torrent_session
+        )
+        self.add(stream)
+
+    async def initialize_from_database(self):
+        pass
+
+    async def start(self):
+        await super().start()
+
+    def stop(self):
+        super().stop()
+        log.info("finished stopping the torrent manager")
+
+    async def create(self, file_path: str, key: Optional[bytes] = None,
+                     iv_generator: Optional[typing.Generator[bytes, None, None]] = None):
+        raise NotImplementedError
+
+    async def _delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
+        raise NotImplementedError
+        # blob_hashes = [source.sd_hash] + [b.blob_hash for b in source.descriptor.blobs[:-1]]
+        # await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
+        # await self.storage.delete_stream(source.descriptor)
+
+    async def stream_partial_content(self, request: Request, sd_hash: str):
+        raise NotImplementedError