pick file from file name, fallback to largest

This commit is contained in:
Victor Shyba 2022-11-05 00:55:38 -03:00
parent 9d869820a3
commit 64aad14ba6
2 changed files with 77 additions and 46 deletions

View file

@ -23,7 +23,6 @@ class TorrentHandle:
self._loop = loop self._loop = loop
self._executor = executor self._executor = executor
self._handle: libtorrent.torrent_handle = handle self._handle: libtorrent.torrent_handle = handle
self.started = asyncio.Event(loop=loop)
self.finished = asyncio.Event(loop=loop) self.finished = asyncio.Event(loop=loop)
self.metadata_completed = asyncio.Event(loop=loop) self.metadata_completed = asyncio.Event(loop=loop)
self.size = handle.status().total_wanted self.size = handle.status().total_wanted
@ -37,12 +36,14 @@ class TorrentHandle:
def torrent_file(self) -> Optional[libtorrent.file_storage]: def torrent_file(self) -> Optional[libtorrent.file_storage]:
return self._torrent_info.files() return self._torrent_info.files()
@property def full_path_at(self, file_num) -> Optional[str]:
def largest_file(self) -> Optional[str]:
if self.torrent_file is None: if self.torrent_file is None:
return None return None
index = self.largest_file_index return os.path.join(self.save_path, self.torrent_file.file_path(file_num))
return os.path.join(self.save_path, self.torrent_file.file_path(index))
def size_at(self, file_num) -> Optional[int]:
if self.torrent_file is not None:
return self.torrent_file.file_size(file_num)
@property @property
def save_path(self) -> Optional[str]: def save_path(self) -> Optional[str]:
@ -50,16 +51,12 @@ class TorrentHandle:
self._base_path = self._handle.status().save_path self._base_path = self._handle.status().save_path
return self._base_path return self._base_path
@property def index_from_name(self, file_name):
def largest_file_index(self):
largest_size, index = 0, 0
for file_num in range(self.torrent_file.num_files()): for file_num in range(self.torrent_file.num_files()):
if '.pad' in self.torrent_file.file_path(file_num): if '.pad' in self.torrent_file.file_path(file_num):
continue # ignore padding files continue # ignore padding files
if self.torrent_file.file_size(file_num) > largest_size: if file_name == os.path.basename(self.full_path_at(file_num)):
largest_size = self.torrent_file.file_size(file_num) return file_num
index = file_num
return index
def stop_tasks(self): def stop_tasks(self):
self._handle.save_resume_data() self._handle.save_resume_data()
@ -72,14 +69,16 @@ class TorrentHandle:
end_piece = self._torrent_info.map_file(file_index, end_offset, 0) end_piece = self._torrent_info.map_file(file_index, end_offset, 0)
return start_piece, end_piece return start_piece, end_piece
async def stream_range_as_completed(self, file_index, start, end): async def stream_range_as_completed(self, file_name, start, end):
file_index = self.index_from_name(file_name)
if file_index is None:
raise ValueError(f"Attempt to stream from invalid file. Expected name: {file_name}")
first_piece, final_piece = self.byte_range_to_piece_range(file_index, start, end) first_piece, final_piece = self.byte_range_to_piece_range(file_index, start, end)
start_piece_offset = first_piece.start start_piece_offset = first_piece.start
piece_size = self._torrent_info.piece_length() piece_size = self._torrent_info.piece_length()
log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d, piece size: %d): %s", log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d, piece size: %d): %s",
first_piece.piece, final_piece.piece, start, end, piece_size, self.name) first_piece.piece, final_piece.piece, start, end, piece_size, self.name)
self.prioritize(file_index, start, end) self.prioritize(file_index, start, end)
await self.resume()
for piece_index in range(first_piece.piece, final_piece.piece + 1): for piece_index in range(first_piece.piece, final_piece.piece + 1):
while not self._handle.have_piece(piece_index): while not self._handle.have_piece(piece_index):
log.info("Waiting for piece %d: %s", piece_index, self.name) log.info("Waiting for piece %d: %s", piece_index, self.name)
@ -102,13 +101,6 @@ class TorrentHandle:
self.metadata_completed.set() self.metadata_completed.set()
self._torrent_info = self._handle.torrent_file() self._torrent_info = self._handle.torrent_file()
log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name) log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name)
# prioritize first 2mb
self.prioritize(self.largest_file_index, 0, 2 * 1024 * 1024)
first_piece = self.torrent_file.piece_index_at_file(self.largest_file_index)
if not self.started.is_set():
if self._handle.have_piece(first_piece):
log.debug("Got first piece, set started - %s", self.name)
self.started.set()
log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s', log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s',
status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000, status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000,
status.num_peers, status.num_seeds, status.state, status.save_path) status.num_peers, status.num_seeds, status.state, status.save_path)
@ -150,9 +142,11 @@ class TorrentSession:
self._loop = loop self._loop = loop
self._executor = executor self._executor = executor
self._session: Optional[libtorrent.session] = None self._session: Optional[libtorrent.session] = None
self._handles = {} self._handles: Dict[str, TorrentHandle] = {}
self.tasks = [] self.tasks = []
self.wait_start = True
def add_peer(self, btih, addr, port):
self._handles[btih]._handle.connect_peer((addr, port))
async def add_fake_torrent(self, file_count=3): async def add_fake_torrent(self, file_count=3):
tmpdir = mkdtemp() tmpdir = mkdtemp()
@ -180,9 +174,10 @@ class TorrentSession:
self._handles.popitem()[1].stop_tasks() self._handles.popitem()[1].stop_tasks()
while self.tasks: while self.tasks:
self.tasks.pop().cancel() self.tasks.pop().cancel()
self._session.save_state() if self._session:
self._session.pause() self._session.save_state()
self._session = None self._session.pause()
self._session = None
def _pop_alerts(self): def _pop_alerts(self):
for alert in self._session.pop_alerts(): for alert in self._session.pop_alerts():
@ -216,21 +211,23 @@ class TorrentSession:
handle.force_dht_announce() handle.force_dht_announce()
self._handles[btih] = TorrentHandle(self._loop, self._executor, handle) self._handles[btih] = TorrentHandle(self._loop, self._executor, handle)
def full_path(self, btih): def full_path(self, btih, file_num) -> Optional[str]:
return self._handles[btih].largest_file return self._handles[btih].full_path_at(file_num)
def save_path(self, btih): def save_path(self, btih):
return self._handles[btih].save_path return self._handles[btih].save_path
def has_torrent(self, btih):
return btih in self._handles
async def add_torrent(self, btih, download_path): async def add_torrent(self, btih, download_path):
if btih in self._handles:
return await self._handles[btih].metadata_completed.wait()
await self._loop.run_in_executor( await self._loop.run_in_executor(
self._executor, self._add_torrent, btih, download_path self._executor, self._add_torrent, btih, download_path
) )
self._handles[btih].tasks.append(self._loop.create_task(self._handles[btih].status_loop())) self._handles[btih].tasks.append(self._loop.create_task(self._handles[btih].status_loop()))
await self._handles[btih].metadata_completed.wait() await self._handles[btih].metadata_completed.wait()
if self.wait_start:
# fixme: temporary until we add streaming support, otherwise playback fails!
await self._handles[btih].started.wait()
def remove_torrent(self, btih, remove_files=False): def remove_torrent(self, btih, remove_files=False):
if btih in self._handles: if btih in self._handles:
@ -243,9 +240,17 @@ class TorrentSession:
handle = self._handles[btih] handle = self._handles[btih]
await handle.resume() await handle.resume()
def get_size(self, btih): def get_total_size(self, btih):
return self._handles[btih].size return self._handles[btih].size
def get_index_from_name(self, btih, file_name):
return self._handles[btih].index_from_name(file_name)
def get_size(self, btih, file_name) -> Optional[int]:
for (path, size) in self.get_files(btih).items():
if os.path.basename(path) == file_name:
return size
def get_name(self, btih): def get_name(self, btih):
return self._handles[btih].name return self._handles[btih].name
@ -255,14 +260,14 @@ class TorrentSession:
def is_completed(self, btih): def is_completed(self, btih):
return self._handles[btih].finished.is_set() return self._handles[btih].finished.is_set()
def stream_largest_file(self, btih, start, end): def stream_file(self, btih, file_name, start, end):
handle = self._handles[btih] handle = self._handles[btih]
return handle.stream_range_as_completed(handle.largest_file_index, start, end) return handle.stream_range_as_completed(file_name, start, end)
def get_files(self, btih) -> Dict: def get_files(self, btih) -> Dict:
handle = self._handles[btih] handle = self._handles[btih]
return { return {
handle.torrent_file.file_path(file_num): handle.torrent_file.file_size(file_num) self.full_path(btih, file_num): handle.torrent_file.file_size(file_num)
for file_num in range(handle.torrent_file.num_files()) for file_num in range(handle.torrent_file.num_files())
if '.pad' not in handle.torrent_file.file_path(file_num) if '.pad' not in handle.torrent_file.file_path(file_num)
} }
@ -302,7 +307,7 @@ async def main():
await session.bind() await session.bind()
await session.add_torrent(btih, os.path.expanduser("~/Downloads")) await session.add_torrent(btih, os.path.expanduser("~/Downloads"))
while True: while True:
session.full_path(btih) session.full_path(btih, 0)
await asyncio.sleep(1) await asyncio.sleep(1)
await session.pause() await session.pause()
executor.shutdown() executor.shutdown()

View file

@ -39,12 +39,33 @@ class TorrentSource(ManagedDownloadSource):
super().__init__(loop, config, storage, identifier, file_name, download_directory, status, claim, download_id, super().__init__(loop, config, storage, identifier, file_name, download_directory, status, claim, download_id,
rowid, content_fee, analytics_manager, added_on) rowid, content_fee, analytics_manager, added_on)
self.torrent_session = torrent_session self.torrent_session = torrent_session
self._suggested_file_name = None
self._full_path = None
@property @property
def full_path(self) -> Optional[str]: def full_path(self) -> Optional[str]:
full_path = self.torrent_session.full_path(self.identifier) if not self._full_path:
self._full_path = self.select_path()
self._file_name = os.path.basename(self._full_path)
self.download_directory = self.torrent_session.save_path(self.identifier) self.download_directory = self.torrent_session.save_path(self.identifier)
return full_path return self._full_path
def select_path(self):
wanted_name = (self.stream_claim_info and self.stream_claim_info.claim.stream.source.name) or ''
wanted_index = self.torrent_session.get_index_from_name(self.identifier, wanted_name)
if wanted_index is None:
# maybe warn?
largest = None
for (path, size) in self.torrent_session.get_files(self.identifier).items():
largest = (path, size) if not largest or size > largest[1] else largest
return largest[0]
else:
return self.torrent_session.full_path(self.identifier, wanted_index or 0)
@property
def suggested_file_name(self):
self._suggested_file_name = self._suggested_file_name or os.path.basename(self.select_path())
return self._suggested_file_name
@property @property
def mime_type(self) -> Optional[str]: def mime_type(self) -> Optional[str]:
@ -58,14 +79,15 @@ class TorrentSource(ManagedDownloadSource):
self.torrent_session.remove_torrent(btih=self.identifier) self.torrent_session.remove_torrent(btih=self.identifier)
raise DownloadMetadataTimeoutError(self.identifier) raise DownloadMetadataTimeoutError(self.identifier)
self.download_directory = self.torrent_session.save_path(self.identifier) self.download_directory = self.torrent_session.save_path(self.identifier)
self._file_name = Path(self.torrent_session.full_path(self.identifier)).name self._file_name = os.path.basename(self.full_path)
async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False): async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False):
await self.setup(timeout) await self.setup(timeout)
await self.storage.add_torrent(self.identifier, self.torrent_length, self.torrent_name) if not self.rowid:
self.rowid = await self.storage.save_downloaded_file( await self.storage.add_torrent(self.identifier, self.torrent_length, self.torrent_name)
self.identifier, self.file_name, self.download_directory, 0.0, added_on=self._added_on self.rowid = await self.storage.save_downloaded_file(
) self.identifier, self.file_name, self.download_directory, 0.0, added_on=self._added_on
)
async def stop(self, finished: bool = False): async def stop(self, finished: bool = False):
await self.torrent_session.remove_torrent(self.identifier) await self.torrent_session.remove_torrent(self.identifier)
@ -75,11 +97,11 @@ class TorrentSource(ManagedDownloadSource):
@property @property
def torrent_length(self): def torrent_length(self):
return self.torrent_session.get_size(self.identifier) return self.torrent_session.get_total_size(self.identifier)
@property @property
def stream_length(self): def stream_length(self):
return os.path.getsize(self.full_path) return self.torrent_session.get_size(self.identifier, self.file_name)
@property @property
def written_bytes(self): def written_bytes(self):
@ -110,15 +132,19 @@ class TorrentSource(ManagedDownloadSource):
headers, start, end = self._prepare_range_response_headers( headers, start, end = self._prepare_range_response_headers(
request.headers.get('range', 'bytes=0-') request.headers.get('range', 'bytes=0-')
) )
target = self.suggested_file_name
await self.start() await self.start()
response = StreamResponse( response = StreamResponse(
status=206, status=206,
headers=headers headers=headers
) )
await response.prepare(request) await response.prepare(request)
while not os.path.exists(self.full_path):
async for _ in self.torrent_session.stream_file(self.identifier, target, start, end):
break
with open(self.full_path, 'rb') as infile: with open(self.full_path, 'rb') as infile:
infile.seek(start) infile.seek(start)
async for read_size in self.torrent_session.stream_largest_file(self.identifier, start, end): async for read_size in self.torrent_session.stream_file(self.identifier, target, start, end):
if infile.tell() + read_size < end: if infile.tell() + read_size < end:
await response.write(infile.read(read_size)) await response.write(infile.read(read_size))
else: else: