From 64aad14ba6e26e9d4e44de873dda7299d1f4c782 Mon Sep 17 00:00:00 2001
From: Victor Shyba <victor.shyba@gmail.com>
Date: Sat, 5 Nov 2022 00:55:38 -0300
Subject: [PATCH] pick file from file name, fallback to largest

---
 lbry/torrent/session.py         | 77 ++++++++++++++++++---------------
 lbry/torrent/torrent_manager.py | 46 +++++++++++++++-----
 2 files changed, 77 insertions(+), 46 deletions(-)

diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py
index a8308d449..abdbff27b 100644
--- a/lbry/torrent/session.py
+++ b/lbry/torrent/session.py
@@ -23,7 +23,6 @@ class TorrentHandle:
         self._loop = loop
         self._executor = executor
         self._handle: libtorrent.torrent_handle = handle
-        self.started = asyncio.Event(loop=loop)
         self.finished = asyncio.Event(loop=loop)
         self.metadata_completed = asyncio.Event(loop=loop)
         self.size = handle.status().total_wanted
@@ -37,12 +36,14 @@ class TorrentHandle:
     def torrent_file(self) -> Optional[libtorrent.file_storage]:
         return self._torrent_info.files()
 
-    @property
-    def largest_file(self) -> Optional[str]:
+    def full_path_at(self, file_num) -> Optional[str]:
         if self.torrent_file is None:
             return None
-        index = self.largest_file_index
-        return os.path.join(self.save_path, self.torrent_file.file_path(index))
+        return os.path.join(self.save_path, self.torrent_file.file_path(file_num))
+
+    def size_at(self, file_num) -> Optional[int]:
+        if self.torrent_file is not None:
+            return self.torrent_file.file_size(file_num)
 
     @property
     def save_path(self) -> Optional[str]:
@@ -50,16 +51,12 @@ class TorrentHandle:
             self._base_path = self._handle.status().save_path
         return self._base_path
 
-    @property
-    def largest_file_index(self):
-        largest_size, index = 0, 0
+    def index_from_name(self, file_name):
         for file_num in range(self.torrent_file.num_files()):
             if '.pad' in self.torrent_file.file_path(file_num):
                 continue  # ignore padding files
-            if self.torrent_file.file_size(file_num) > largest_size:
-                largest_size = self.torrent_file.file_size(file_num)
-                index = file_num
-        return index
+            if file_name == os.path.basename(self.full_path_at(file_num)):
+                return file_num
 
     def stop_tasks(self):
         self._handle.save_resume_data()
@@ -72,14 +69,16 @@ class TorrentHandle:
         end_piece = self._torrent_info.map_file(file_index, end_offset, 0)
         return start_piece, end_piece
 
-    async def stream_range_as_completed(self, file_index, start, end):
+    async def stream_range_as_completed(self, file_name, start, end):
+        file_index = self.index_from_name(file_name)
+        if file_index is None:
+            raise ValueError(f"Attempt to stream from invalid file. Expected name: {file_name}")
         first_piece, final_piece = self.byte_range_to_piece_range(file_index, start, end)
         start_piece_offset = first_piece.start
         piece_size = self._torrent_info.piece_length()
         log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d, piece size: %d): %s",
                  first_piece.piece, final_piece.piece, start, end, piece_size, self.name)
         self.prioritize(file_index, start, end)
-        await self.resume()
         for piece_index in range(first_piece.piece, final_piece.piece + 1):
             while not self._handle.have_piece(piece_index):
                 log.info("Waiting for piece %d: %s", piece_index, self.name)
@@ -102,13 +101,6 @@ class TorrentHandle:
                 self.metadata_completed.set()
                 self._torrent_info = self._handle.torrent_file()
                 log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name)
-                # prioritize first 2mb
-                self.prioritize(self.largest_file_index, 0, 2 * 1024 * 1024)
-            first_piece = self.torrent_file.piece_index_at_file(self.largest_file_index)
-            if not self.started.is_set():
-                if self._handle.have_piece(first_piece):
-                    log.debug("Got first piece, set started - %s", self.name)
-                    self.started.set()
         log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s',
                   status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000,
                   status.num_peers, status.num_seeds, status.state, status.save_path)
@@ -150,9 +142,11 @@ class TorrentSession:
         self._loop = loop
         self._executor = executor
         self._session: Optional[libtorrent.session] = None
-        self._handles = {}
+        self._handles: Dict[str, TorrentHandle] = {}
         self.tasks = []
-        self.wait_start = True
+
+    def add_peer(self, btih, addr, port):
+        self._handles[btih]._handle.connect_peer((addr, port))
 
     async def add_fake_torrent(self, file_count=3):
         tmpdir = mkdtemp()
@@ -180,9 +174,10 @@ class TorrentSession:
             self._handles.popitem()[1].stop_tasks()
         while self.tasks:
             self.tasks.pop().cancel()
-        self._session.save_state()
-        self._session.pause()
-        self._session = None
+        if self._session:
+            self._session.save_state()
+            self._session.pause()
+            self._session = None
 
     def _pop_alerts(self):
         for alert in self._session.pop_alerts():
@@ -216,21 +211,23 @@ class TorrentSession:
         handle.force_dht_announce()
         self._handles[btih] = TorrentHandle(self._loop, self._executor, handle)
 
-    def full_path(self, btih):
-        return self._handles[btih].largest_file
+    def full_path(self, btih, file_num) -> Optional[str]:
+        return self._handles[btih].full_path_at(file_num)
 
     def save_path(self, btih):
         return self._handles[btih].save_path
 
+    def has_torrent(self, btih):
+        return btih in self._handles
+
     async def add_torrent(self, btih, download_path):
+        if btih in self._handles:
+            return await self._handles[btih].metadata_completed.wait()
         await self._loop.run_in_executor(
             self._executor, self._add_torrent, btih, download_path
         )
         self._handles[btih].tasks.append(self._loop.create_task(self._handles[btih].status_loop()))
         await self._handles[btih].metadata_completed.wait()
-        if self.wait_start:
-            # fixme: temporary until we add streaming support, otherwise playback fails!
-            await self._handles[btih].started.wait()
 
     def remove_torrent(self, btih, remove_files=False):
         if btih in self._handles:
@@ -243,9 +240,17 @@ class TorrentSession:
         handle = self._handles[btih]
         await handle.resume()
 
-    def get_size(self, btih):
+    def get_total_size(self, btih):
         return self._handles[btih].size
 
+    def get_index_from_name(self, btih, file_name):
+        return self._handles[btih].index_from_name(file_name)
+
+    def get_size(self, btih, file_name) -> Optional[int]:
+        for (path, size) in self.get_files(btih).items():
+            if os.path.basename(path) == file_name:
+                return size
+
     def get_name(self, btih):
         return self._handles[btih].name
 
@@ -255,14 +260,14 @@ class TorrentSession:
     def is_completed(self, btih):
         return self._handles[btih].finished.is_set()
 
-    def stream_largest_file(self, btih, start, end):
+    def stream_file(self, btih, file_name, start, end):
         handle = self._handles[btih]
-        return handle.stream_range_as_completed(handle.largest_file_index, start, end)
+        return handle.stream_range_as_completed(file_name, start, end)
 
     def get_files(self, btih) -> Dict:
         handle = self._handles[btih]
         return {
-            handle.torrent_file.file_path(file_num): handle.torrent_file.file_size(file_num)
+            self.full_path(btih, file_num): handle.torrent_file.file_size(file_num)
             for file_num in range(handle.torrent_file.num_files())
             if '.pad' not in handle.torrent_file.file_path(file_num)
         }
@@ -302,7 +307,7 @@ async def main():
     await session.bind()
     await session.add_torrent(btih, os.path.expanduser("~/Downloads"))
     while True:
-        session.full_path(btih)
+        session.full_path(btih, 0)
         await asyncio.sleep(1)
     await session.pause()
     executor.shutdown()
diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py
index a6e9c8941..b2b4b3f98 100644
--- a/lbry/torrent/torrent_manager.py
+++ b/lbry/torrent/torrent_manager.py
@@ -39,12 +39,33 @@ class TorrentSource(ManagedDownloadSource):
         super().__init__(loop, config, storage, identifier, file_name, download_directory, status, claim, download_id,
                          rowid, content_fee, analytics_manager, added_on)
         self.torrent_session = torrent_session
+        self._suggested_file_name = None
+        self._full_path = None
 
     @property
     def full_path(self) -> Optional[str]:
-        full_path = self.torrent_session.full_path(self.identifier)
+        if not self._full_path:
+            self._full_path = self.select_path()
+            self._file_name = os.path.basename(self._full_path)
         self.download_directory = self.torrent_session.save_path(self.identifier)
-        return full_path
+        return self._full_path
+
+    def select_path(self):
+        wanted_name = (self.stream_claim_info and self.stream_claim_info.claim.stream.source.name) or ''
+        wanted_index = self.torrent_session.get_index_from_name(self.identifier, wanted_name)
+        if wanted_index is None:
+            # maybe warn?
+            largest = None
+            for (path, size) in self.torrent_session.get_files(self.identifier).items():
+                largest = (path, size) if not largest or size > largest[1] else largest
+            return largest[0]
+        else:
+            return self.torrent_session.full_path(self.identifier, wanted_index or 0)
+
+    @property
+    def suggested_file_name(self):
+        self._suggested_file_name = self._suggested_file_name or os.path.basename(self.select_path())
+        return self._suggested_file_name
 
     @property
     def mime_type(self) -> Optional[str]:
@@ -58,14 +79,15 @@ class TorrentSource(ManagedDownloadSource):
             self.torrent_session.remove_torrent(btih=self.identifier)
             raise DownloadMetadataTimeoutError(self.identifier)
         self.download_directory = self.torrent_session.save_path(self.identifier)
-        self._file_name = Path(self.torrent_session.full_path(self.identifier)).name
+        self._file_name = os.path.basename(self.full_path)
 
     async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False):
         await self.setup(timeout)
-        await self.storage.add_torrent(self.identifier, self.torrent_length, self.torrent_name)
-        self.rowid = await self.storage.save_downloaded_file(
-            self.identifier, self.file_name, self.download_directory, 0.0, added_on=self._added_on
-        )
+        if not self.rowid:
+            await self.storage.add_torrent(self.identifier, self.torrent_length, self.torrent_name)
+            self.rowid = await self.storage.save_downloaded_file(
+                self.identifier, self.file_name, self.download_directory, 0.0, added_on=self._added_on
+            )
 
     async def stop(self, finished: bool = False):
         await self.torrent_session.remove_torrent(self.identifier)
@@ -75,11 +97,11 @@ class TorrentSource(ManagedDownloadSource):
 
     @property
     def torrent_length(self):
-        return self.torrent_session.get_size(self.identifier)
+        return self.torrent_session.get_total_size(self.identifier)
 
     @property
     def stream_length(self):
-        return os.path.getsize(self.full_path)
+        return self.torrent_session.get_size(self.identifier, self.file_name)
 
     @property
     def written_bytes(self):
@@ -110,15 +132,19 @@ class TorrentSource(ManagedDownloadSource):
         headers, start, end = self._prepare_range_response_headers(
             request.headers.get('range', 'bytes=0-')
         )
+        target = self.suggested_file_name
         await self.start()
         response = StreamResponse(
             status=206,
             headers=headers
         )
         await response.prepare(request)
+        while not os.path.exists(self.full_path):
+            async for _ in self.torrent_session.stream_file(self.identifier, target, start, end):
+                break
         with open(self.full_path, 'rb') as infile:
             infile.seek(start)
-            async for read_size in self.torrent_session.stream_largest_file(self.identifier, start, end):
+            async for read_size in self.torrent_session.stream_file(self.identifier, target, start, end):
                 if infile.tell() + read_size < end:
                     await response.write(infile.read(read_size))
                 else: