diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index a8308d449..abdbff27b 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -23,7 +23,6 @@ class TorrentHandle: self._loop = loop self._executor = executor self._handle: libtorrent.torrent_handle = handle - self.started = asyncio.Event(loop=loop) self.finished = asyncio.Event(loop=loop) self.metadata_completed = asyncio.Event(loop=loop) self.size = handle.status().total_wanted @@ -37,12 +36,14 @@ class TorrentHandle: def torrent_file(self) -> Optional[libtorrent.file_storage]: return self._torrent_info.files() - @property - def largest_file(self) -> Optional[str]: + def full_path_at(self, file_num) -> Optional[str]: if self.torrent_file is None: return None - index = self.largest_file_index - return os.path.join(self.save_path, self.torrent_file.file_path(index)) + return os.path.join(self.save_path, self.torrent_file.file_path(file_num)) + + def size_at(self, file_num) -> Optional[int]: + if self.torrent_file is not None: + return self.torrent_file.file_size(file_num) @property def save_path(self) -> Optional[str]: @@ -50,16 +51,12 @@ class TorrentHandle: self._base_path = self._handle.status().save_path return self._base_path - @property - def largest_file_index(self): - largest_size, index = 0, 0 + def index_from_name(self, file_name): for file_num in range(self.torrent_file.num_files()): if '.pad' in self.torrent_file.file_path(file_num): continue # ignore padding files - if self.torrent_file.file_size(file_num) > largest_size: - largest_size = self.torrent_file.file_size(file_num) - index = file_num - return index + if file_name == os.path.basename(self.full_path_at(file_num)): + return file_num def stop_tasks(self): self._handle.save_resume_data() @@ -72,14 +69,16 @@ class TorrentHandle: end_piece = self._torrent_info.map_file(file_index, end_offset, 0) return start_piece, end_piece - async def stream_range_as_completed(self, file_index, start, end): + async def stream_range_as_completed(self, file_name, start, end): + file_index = self.index_from_name(file_name) + if file_index is None: + raise ValueError(f"Attempt to stream from invalid file. Expected name: {file_name}") first_piece, final_piece = self.byte_range_to_piece_range(file_index, start, end) start_piece_offset = first_piece.start piece_size = self._torrent_info.piece_length() log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d, piece size: %d): %s", first_piece.piece, final_piece.piece, start, end, piece_size, self.name) self.prioritize(file_index, start, end) - await self.resume() for piece_index in range(first_piece.piece, final_piece.piece + 1): while not self._handle.have_piece(piece_index): log.info("Waiting for piece %d: %s", piece_index, self.name) @@ -102,13 +101,6 @@ class TorrentHandle: self.metadata_completed.set() self._torrent_info = self._handle.torrent_file() log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name) - # prioritize first 2mb - self.prioritize(self.largest_file_index, 0, 2 * 1024 * 1024) - first_piece = self.torrent_file.piece_index_at_file(self.largest_file_index) - if not self.started.is_set(): - if self._handle.have_piece(first_piece): - log.debug("Got first piece, set started - %s", self.name) - self.started.set() log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s', status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000, status.num_peers, status.num_seeds, status.state, status.save_path) @@ -150,9 +142,11 @@ class TorrentSession: self._loop = loop self._executor = executor self._session: Optional[libtorrent.session] = None - self._handles = {} + self._handles: Dict[str, TorrentHandle] = {} self.tasks = [] - self.wait_start = True + + def add_peer(self, btih, addr, port): + self._handles[btih]._handle.connect_peer((addr, port)) async def add_fake_torrent(self, file_count=3): tmpdir = mkdtemp() @@ -180,9 +174,10 @@ class TorrentSession: self._handles.popitem()[1].stop_tasks() while self.tasks: self.tasks.pop().cancel() - self._session.save_state() - self._session.pause() - self._session = None + if self._session: + self._session.save_state() + self._session.pause() + self._session = None def _pop_alerts(self): for alert in self._session.pop_alerts(): @@ -216,21 +211,23 @@ class TorrentSession: handle.force_dht_announce() self._handles[btih] = TorrentHandle(self._loop, self._executor, handle) - def full_path(self, btih): - return self._handles[btih].largest_file + def full_path(self, btih, file_num) -> Optional[str]: + return self._handles[btih].full_path_at(file_num) def save_path(self, btih): return self._handles[btih].save_path + def has_torrent(self, btih): + return btih in self._handles + async def add_torrent(self, btih, download_path): + if btih in self._handles: + return await self._handles[btih].metadata_completed.wait() await self._loop.run_in_executor( self._executor, self._add_torrent, btih, download_path ) self._handles[btih].tasks.append(self._loop.create_task(self._handles[btih].status_loop())) await self._handles[btih].metadata_completed.wait() - if self.wait_start: - # fixme: temporary until we add streaming support, otherwise playback fails! - await self._handles[btih].started.wait() def remove_torrent(self, btih, remove_files=False): if btih in self._handles: @@ -243,9 +240,17 @@ class TorrentSession: handle = self._handles[btih] await handle.resume() - def get_size(self, btih): + def get_total_size(self, btih): return self._handles[btih].size + def get_index_from_name(self, btih, file_name): + return self._handles[btih].index_from_name(file_name) + + def get_size(self, btih, file_name) -> Optional[int]: + for (path, size) in self.get_files(btih).items(): + if os.path.basename(path) == file_name: + return size + def get_name(self, btih): return self._handles[btih].name @@ -255,14 +260,14 @@ class TorrentSession: def is_completed(self, btih): return self._handles[btih].finished.is_set() - def stream_largest_file(self, btih, start, end): + def stream_file(self, btih, file_name, start, end): handle = self._handles[btih] - return handle.stream_range_as_completed(handle.largest_file_index, start, end) + return handle.stream_range_as_completed(file_name, start, end) def get_files(self, btih) -> Dict: handle = self._handles[btih] return { - handle.torrent_file.file_path(file_num): handle.torrent_file.file_size(file_num) + self.full_path(btih, file_num): handle.torrent_file.file_size(file_num) for file_num in range(handle.torrent_file.num_files()) if '.pad' not in handle.torrent_file.file_path(file_num) } @@ -302,7 +307,7 @@ async def main(): await session.bind() await session.add_torrent(btih, os.path.expanduser("~/Downloads")) while True: - session.full_path(btih) + session.full_path(btih, 0) await asyncio.sleep(1) await session.pause() executor.shutdown() diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index a6e9c8941..b2b4b3f98 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -39,12 +39,33 @@ class TorrentSource(ManagedDownloadSource): super().__init__(loop, config, storage, identifier, file_name, download_directory, status, claim, download_id, rowid, content_fee, analytics_manager, added_on) self.torrent_session = torrent_session + self._suggested_file_name = None + self._full_path = None @property def full_path(self) -> Optional[str]: - full_path = self.torrent_session.full_path(self.identifier) + if not self._full_path: + self._full_path = self.select_path() + self._file_name = os.path.basename(self._full_path) self.download_directory = self.torrent_session.save_path(self.identifier) - return full_path + return self._full_path + + def select_path(self): + wanted_name = (self.stream_claim_info and self.stream_claim_info.claim.stream.source.name) or '' + wanted_index = self.torrent_session.get_index_from_name(self.identifier, wanted_name) + if wanted_index is None: + # maybe warn? + largest = None + for (path, size) in self.torrent_session.get_files(self.identifier).items(): + largest = (path, size) if not largest or size > largest[1] else largest + return largest[0] + else: + return self.torrent_session.full_path(self.identifier, wanted_index or 0) + + @property + def suggested_file_name(self): + self._suggested_file_name = self._suggested_file_name or os.path.basename(self.select_path()) + return self._suggested_file_name @property def mime_type(self) -> Optional[str]: @@ -58,14 +79,15 @@ class TorrentSource(ManagedDownloadSource): self.torrent_session.remove_torrent(btih=self.identifier) raise DownloadMetadataTimeoutError(self.identifier) self.download_directory = self.torrent_session.save_path(self.identifier) - self._file_name = Path(self.torrent_session.full_path(self.identifier)).name + self._file_name = os.path.basename(self.full_path) async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False): await self.setup(timeout) - await self.storage.add_torrent(self.identifier, self.torrent_length, self.torrent_name) - self.rowid = await self.storage.save_downloaded_file( - self.identifier, self.file_name, self.download_directory, 0.0, added_on=self._added_on - ) + if not self.rowid: + await self.storage.add_torrent(self.identifier, self.torrent_length, self.torrent_name) + self.rowid = await self.storage.save_downloaded_file( + self.identifier, self.file_name, self.download_directory, 0.0, added_on=self._added_on + ) async def stop(self, finished: bool = False): await self.torrent_session.remove_torrent(self.identifier) @@ -75,11 +97,11 @@ class TorrentSource(ManagedDownloadSource): @property def torrent_length(self): - return self.torrent_session.get_size(self.identifier) + return self.torrent_session.get_total_size(self.identifier) @property def stream_length(self): - return os.path.getsize(self.full_path) + return self.torrent_session.get_size(self.identifier, self.file_name) @property def written_bytes(self): @@ -110,15 +132,19 @@ class TorrentSource(ManagedDownloadSource): headers, start, end = self._prepare_range_response_headers( request.headers.get('range', 'bytes=0-') ) + target = self.suggested_file_name await self.start() response = StreamResponse( status=206, headers=headers ) await response.prepare(request) + while not os.path.exists(self.full_path): + async for _ in self.torrent_session.stream_file(self.identifier, target, start, end): + break with open(self.full_path, 'rb') as infile: infile.seek(start) - async for read_size in self.torrent_session.stream_largest_file(self.identifier, start, end): + async for read_size in self.torrent_session.stream_file(self.identifier, target, start, end): if infile.tell() + read_size < end: await response.write(infile.read(read_size)) else: