From 6865ddfc12cd36f2513df416faa8425cbde93a63 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 5 Feb 2020 12:29:26 -0300 Subject: [PATCH] torrent manager and torrent source --- lbry/extras/daemon/components.py | 10 ++- lbry/file/file_manager.py | 13 +++- lbry/torrent/session.py | 26 ++++++-- lbry/torrent/torrent_manager.py | 111 +++++++++++++++++++++++++++++++ 4 files changed, 151 insertions(+), 9 deletions(-) create mode 100644 lbry/torrent/torrent_manager.py diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index 046faca2a..38c4d4650 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -21,6 +21,7 @@ from lbry.file.file_manager import FileManager from lbry.extras.daemon.component import Component from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager from lbry.extras.daemon.storage import SQLiteStorage +from lbry.torrent.torrent_manager import TorrentManager from lbry.wallet import WalletManager from lbry.wallet.usage_payment import WalletServerPayer try: @@ -327,7 +328,7 @@ class HashAnnouncerComponent(Component): class FileManagerComponent(Component): component_name = FILE_MANAGER_COMPONENT - depends_on = [BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT] + depends_on = [BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT, LIBTORRENT_COMPONENT] def __init__(self, component_manager): super().__init__(component_manager) @@ -350,14 +351,19 @@ class FileManagerComponent(Component): wallet = self.component_manager.get_component(WALLET_COMPONENT) node = self.component_manager.get_component(DHT_COMPONENT) \ if self.component_manager.has_component(DHT_COMPONENT) else None + torrent = self.component_manager.get_component(LIBTORRENT_COMPONENT) if TorrentSession else None log.info('Starting the file manager') loop = asyncio.get_event_loop() self.file_manager = FileManager( loop, self.conf, wallet, storage, self.component_manager.analytics_manager ) self.file_manager.source_managers['stream'] = StreamManager( - loop, self.conf, blob_manager, wallet, storage, node, self.component_manager.analytics_manager + loop, self.conf, blob_manager, wallet, storage, node, ) + if TorrentSession: + self.file_manager.source_managers['torrent'] = TorrentManager( + loop, self.conf, torrent, storage, self.component_manager.analytics_manager + ) await self.file_manager.start() log.info('Done setting up file manager') diff --git a/lbry/file/file_manager.py b/lbry/file/file_manager.py index 765cd1b53..50b21e9b2 100644 --- a/lbry/file/file_manager.py +++ b/lbry/file/file_manager.py @@ -6,6 +6,7 @@ from aiohttp.web import Request from lbry.error import ResolveError, DownloadSDTimeoutError, InsufficientFundsError from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError from lbry.stream.managed_stream import ManagedStream +from lbry.torrent.torrent_manager import TorrentSource from lbry.utils import cache_concurrent from lbry.schema.url import URL from lbry.wallet.dewies import dewies_to_lbc @@ -110,11 +111,12 @@ class FileManager: if claim.stream.source.bt_infohash: source_manager = self.source_managers['torrent'] + existing = source_manager.get_filtered(bt_infohash=claim.stream.source.bt_infohash) else: source_manager = self.source_managers['stream'] + existing = source_manager.get_filtered(sd_hash=claim.stream.source.sd_hash) # resume or update an existing stream, if the stream changed: download it and delete the old one after - existing = self.get_filtered(sd_hash=claim.stream.source.sd_hash) to_replace, updated_stream = None, None if existing and existing[0].claim_id != txo.claim_id: raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {txo.claim_id}") @@ -151,7 +153,6 @@ class FileManager: ) return updated_stream - #################### # pay fee #################### @@ -174,7 +175,13 @@ class FileManager: ) stream.downloader.node = source_manager.node else: - stream = None + stream = TorrentSource( + self.loop, self.config, self.storage, identifier=claim.stream.source.bt_infohash, + file_name=file_name, download_directory=download_directory or self.config.download_dir, + status=ManagedStream.STATUS_RUNNING, + claim=claim, analytics_manager=self.analytics_manager, + torrent_session=source_manager.torrent_session + ) log.info("starting download for %s", uri) before_download = self.loop.time() diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index 294a2cba5..0a33c0bf6 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -1,5 +1,7 @@ import asyncio import binascii +from typing import Optional + import libtorrent @@ -30,6 +32,15 @@ NOTIFICATION_MASKS = [ ] +DEFAULT_FLAGS = ( # fixme: somehow the logic here is inverted? + libtorrent.add_torrent_params_flags_t.flag_paused + | libtorrent.add_torrent_params_flags_t.flag_auto_managed + | libtorrent.add_torrent_params_flags_t.flag_duplicate_is_error + | libtorrent.add_torrent_params_flags_t.flag_upload_mode + | libtorrent.add_torrent_params_flags_t.flag_update_subscribe +) + + def get_notification_type(notification) -> str: for i, notification_type in enumerate(NOTIFICATION_MASKS): if (1 << i) & notification: @@ -123,10 +134,11 @@ class TorrentSession: self._executor, self._session.resume ) - def _add_torrent(self, btih: str, download_directory: str): - self._handles[btih] = TorrentHandle(self._loop, self._executor, self._session.add_torrent( - {'info_hash': binascii.unhexlify(btih.encode()), 'save_path': download_directory} - )) + def _add_torrent(self, btih: str, download_directory: Optional[str]): + params = {'info_hash': binascii.unhexlify(btih.encode()), 'flags': DEFAULT_FLAGS} + if download_directory: + params['save_path'] = download_directory + self._handles[btih] = TorrentHandle(self._loop, self._executor, self._session.add_torrent(params)) async def add_torrent(self, btih, download_path): await self._loop.run_in_executor( @@ -135,6 +147,12 @@ class TorrentSession: self._loop.create_task(self._handles[btih].status_loop()) await self._handles[btih].finished.wait() + async def remove_torrent(self, btih, remove_files=False): + if btih in self._handles: + handle = self._handles[btih] + self._session.remove_torrent(handle, 1 if remove_files else 0) + self._handles.pop(btih) + def get_magnet_uri(btih): return f"magnet:?xt=urn:btih:{btih}" diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py new file mode 100644 index 000000000..02f6b7cf9 --- /dev/null +++ b/lbry/torrent/torrent_manager.py @@ -0,0 +1,111 @@ +import asyncio +import binascii +import logging +import typing +from typing import Optional +from aiohttp.web import Request +from lbry.file.source_manager import SourceManager +from lbry.file.source import ManagedDownloadSource + +if typing.TYPE_CHECKING: + from lbry.torrent.session import TorrentSession + from lbry.conf import Config + from lbry.wallet.transaction import Transaction + from lbry.extras.daemon.analytics import AnalyticsManager + from lbry.extras.daemon.storage import SQLiteStorage, StoredContentClaim + from lbry.extras.daemon.storage import StoredContentClaim + +log = logging.getLogger(__name__) + + +def path_or_none(encoded_path) -> Optional[str]: + if not encoded_path: + return + return binascii.unhexlify(encoded_path).decode() + + +class TorrentSource(ManagedDownloadSource): + STATUS_STOPPED = "stopped" + + def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage', identifier: str, + file_name: Optional[str] = None, download_directory: Optional[str] = None, + status: Optional[str] = STATUS_STOPPED, claim: Optional['StoredContentClaim'] = None, + download_id: Optional[str] = None, rowid: Optional[int] = None, + content_fee: Optional['Transaction'] = None, + analytics_manager: Optional['AnalyticsManager'] = None, + added_on: Optional[int] = None, torrent_session: Optional['TorrentSession'] = None): + super().__init__(loop, config, storage, identifier, file_name, download_directory, status, claim, download_id, + rowid, content_fee, analytics_manager, added_on) + self.torrent_session = torrent_session + + async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False): + await self.torrent_session.add_torrent(self.identifier, self.download_directory) + + async def stop(self, finished: bool = False): + await self.torrent_session.remove_torrent(self.identifier) + + async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None): + raise NotImplementedError() + + def stop_tasks(self): + raise NotImplementedError() + + @property + def completed(self): + raise NotImplementedError() + +class TorrentManager(SourceManager): + _sources: typing.Dict[str, ManagedDownloadSource] + + filter_fields = set(SourceManager.filter_fields) + filter_fields.update({ + 'bt_infohash', + 'blobs_remaining', # TODO: here they call them "parts", but its pretty much the same concept + 'blobs_in_stream' + }) + + def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', torrent_session: 'TorrentSession', + storage: 'SQLiteStorage', analytics_manager: Optional['AnalyticsManager'] = None): + super().__init__(loop, config, storage, analytics_manager) + self.torrent_session: 'TorrentSession' = torrent_session + + def add(self, source: ManagedDownloadSource): + super().add(source) + + async def recover_streams(self, file_infos: typing.List[typing.Dict]): + raise NotImplementedError + + async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[str], + download_directory: Optional[str], status: str, + claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'], + added_on: Optional[int]): + stream = TorrentSource( + self.loop, self.config, self.storage, identifier=bt_infohash, file_name=file_name, + download_directory=download_directory, status=status, claim=claim, rowid=rowid, + content_fee=content_fee, analytics_manager=self.analytics_manager, added_on=added_on, + torrent_session=self.torrent_session + ) + self.add(stream) + + async def initialize_from_database(self): + pass + + async def start(self): + await super().start() + + def stop(self): + super().stop() + log.info("finished stopping the torrent manager") + + async def create(self, file_path: str, key: Optional[bytes] = None, + iv_generator: Optional[typing.Generator[bytes, None, None]] = None): + raise NotImplementedError + + async def _delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False): + raise NotImplementedError + # blob_hashes = [source.sd_hash] + [b.blob_hash for b in source.descriptor.blobs[:-1]] + # await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False) + # await self.storage.delete_stream(source.descriptor) + + async def stream_partial_content(self, request: Request, sd_hash: str): + raise NotImplementedError