forked from LBRYCommunity/lbry-sdk
torrent manager and torrent source
This commit is contained in:
parent
e888e69d4d
commit
6865ddfc12
4 changed files with 151 additions and 9 deletions
|
@ -21,6 +21,7 @@ from lbry.file.file_manager import FileManager
|
||||||
from lbry.extras.daemon.component import Component
|
from lbry.extras.daemon.component import Component
|
||||||
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
|
from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager
|
||||||
from lbry.extras.daemon.storage import SQLiteStorage
|
from lbry.extras.daemon.storage import SQLiteStorage
|
||||||
|
from lbry.torrent.torrent_manager import TorrentManager
|
||||||
from lbry.wallet import WalletManager
|
from lbry.wallet import WalletManager
|
||||||
from lbry.wallet.usage_payment import WalletServerPayer
|
from lbry.wallet.usage_payment import WalletServerPayer
|
||||||
try:
|
try:
|
||||||
|
@ -327,7 +328,7 @@ class HashAnnouncerComponent(Component):
|
||||||
|
|
||||||
class FileManagerComponent(Component):
|
class FileManagerComponent(Component):
|
||||||
component_name = FILE_MANAGER_COMPONENT
|
component_name = FILE_MANAGER_COMPONENT
|
||||||
depends_on = [BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT]
|
depends_on = [BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT, LIBTORRENT_COMPONENT]
|
||||||
|
|
||||||
def __init__(self, component_manager):
|
def __init__(self, component_manager):
|
||||||
super().__init__(component_manager)
|
super().__init__(component_manager)
|
||||||
|
@ -350,13 +351,18 @@ class FileManagerComponent(Component):
|
||||||
wallet = self.component_manager.get_component(WALLET_COMPONENT)
|
wallet = self.component_manager.get_component(WALLET_COMPONENT)
|
||||||
node = self.component_manager.get_component(DHT_COMPONENT) \
|
node = self.component_manager.get_component(DHT_COMPONENT) \
|
||||||
if self.component_manager.has_component(DHT_COMPONENT) else None
|
if self.component_manager.has_component(DHT_COMPONENT) else None
|
||||||
|
torrent = self.component_manager.get_component(LIBTORRENT_COMPONENT) if TorrentSession else None
|
||||||
log.info('Starting the file manager')
|
log.info('Starting the file manager')
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
self.file_manager = FileManager(
|
self.file_manager = FileManager(
|
||||||
loop, self.conf, wallet, storage, self.component_manager.analytics_manager
|
loop, self.conf, wallet, storage, self.component_manager.analytics_manager
|
||||||
)
|
)
|
||||||
self.file_manager.source_managers['stream'] = StreamManager(
|
self.file_manager.source_managers['stream'] = StreamManager(
|
||||||
loop, self.conf, blob_manager, wallet, storage, node, self.component_manager.analytics_manager
|
loop, self.conf, blob_manager, wallet, storage, node,
|
||||||
|
)
|
||||||
|
if TorrentSession:
|
||||||
|
self.file_manager.source_managers['torrent'] = TorrentManager(
|
||||||
|
loop, self.conf, torrent, storage, self.component_manager.analytics_manager
|
||||||
)
|
)
|
||||||
await self.file_manager.start()
|
await self.file_manager.start()
|
||||||
log.info('Done setting up file manager')
|
log.info('Done setting up file manager')
|
||||||
|
|
|
@ -6,6 +6,7 @@ from aiohttp.web import Request
|
||||||
from lbry.error import ResolveError, DownloadSDTimeoutError, InsufficientFundsError
|
from lbry.error import ResolveError, DownloadSDTimeoutError, InsufficientFundsError
|
||||||
from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError
|
from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError
|
||||||
from lbry.stream.managed_stream import ManagedStream
|
from lbry.stream.managed_stream import ManagedStream
|
||||||
|
from lbry.torrent.torrent_manager import TorrentSource
|
||||||
from lbry.utils import cache_concurrent
|
from lbry.utils import cache_concurrent
|
||||||
from lbry.schema.url import URL
|
from lbry.schema.url import URL
|
||||||
from lbry.wallet.dewies import dewies_to_lbc
|
from lbry.wallet.dewies import dewies_to_lbc
|
||||||
|
@ -110,11 +111,12 @@ class FileManager:
|
||||||
|
|
||||||
if claim.stream.source.bt_infohash:
|
if claim.stream.source.bt_infohash:
|
||||||
source_manager = self.source_managers['torrent']
|
source_manager = self.source_managers['torrent']
|
||||||
|
existing = source_manager.get_filtered(bt_infohash=claim.stream.source.bt_infohash)
|
||||||
else:
|
else:
|
||||||
source_manager = self.source_managers['stream']
|
source_manager = self.source_managers['stream']
|
||||||
|
existing = source_manager.get_filtered(sd_hash=claim.stream.source.sd_hash)
|
||||||
|
|
||||||
# resume or update an existing stream, if the stream changed: download it and delete the old one after
|
# resume or update an existing stream, if the stream changed: download it and delete the old one after
|
||||||
existing = self.get_filtered(sd_hash=claim.stream.source.sd_hash)
|
|
||||||
to_replace, updated_stream = None, None
|
to_replace, updated_stream = None, None
|
||||||
if existing and existing[0].claim_id != txo.claim_id:
|
if existing and existing[0].claim_id != txo.claim_id:
|
||||||
raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {txo.claim_id}")
|
raise ResolveError(f"stream for {existing[0].claim_id} collides with existing download {txo.claim_id}")
|
||||||
|
@ -151,7 +153,6 @@ class FileManager:
|
||||||
)
|
)
|
||||||
return updated_stream
|
return updated_stream
|
||||||
|
|
||||||
|
|
||||||
####################
|
####################
|
||||||
# pay fee
|
# pay fee
|
||||||
####################
|
####################
|
||||||
|
@ -174,7 +175,13 @@ class FileManager:
|
||||||
)
|
)
|
||||||
stream.downloader.node = source_manager.node
|
stream.downloader.node = source_manager.node
|
||||||
else:
|
else:
|
||||||
stream = None
|
stream = TorrentSource(
|
||||||
|
self.loop, self.config, self.storage, identifier=claim.stream.source.bt_infohash,
|
||||||
|
file_name=file_name, download_directory=download_directory or self.config.download_dir,
|
||||||
|
status=ManagedStream.STATUS_RUNNING,
|
||||||
|
claim=claim, analytics_manager=self.analytics_manager,
|
||||||
|
torrent_session=source_manager.torrent_session
|
||||||
|
)
|
||||||
log.info("starting download for %s", uri)
|
log.info("starting download for %s", uri)
|
||||||
|
|
||||||
before_download = self.loop.time()
|
before_download = self.loop.time()
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import binascii
|
import binascii
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
import libtorrent
|
import libtorrent
|
||||||
|
|
||||||
|
|
||||||
|
@ -30,6 +32,15 @@ NOTIFICATION_MASKS = [
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
DEFAULT_FLAGS = ( # fixme: somehow the logic here is inverted?
|
||||||
|
libtorrent.add_torrent_params_flags_t.flag_paused
|
||||||
|
| libtorrent.add_torrent_params_flags_t.flag_auto_managed
|
||||||
|
| libtorrent.add_torrent_params_flags_t.flag_duplicate_is_error
|
||||||
|
| libtorrent.add_torrent_params_flags_t.flag_upload_mode
|
||||||
|
| libtorrent.add_torrent_params_flags_t.flag_update_subscribe
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_notification_type(notification) -> str:
|
def get_notification_type(notification) -> str:
|
||||||
for i, notification_type in enumerate(NOTIFICATION_MASKS):
|
for i, notification_type in enumerate(NOTIFICATION_MASKS):
|
||||||
if (1 << i) & notification:
|
if (1 << i) & notification:
|
||||||
|
@ -123,10 +134,11 @@ class TorrentSession:
|
||||||
self._executor, self._session.resume
|
self._executor, self._session.resume
|
||||||
)
|
)
|
||||||
|
|
||||||
def _add_torrent(self, btih: str, download_directory: str):
|
def _add_torrent(self, btih: str, download_directory: Optional[str]):
|
||||||
self._handles[btih] = TorrentHandle(self._loop, self._executor, self._session.add_torrent(
|
params = {'info_hash': binascii.unhexlify(btih.encode()), 'flags': DEFAULT_FLAGS}
|
||||||
{'info_hash': binascii.unhexlify(btih.encode()), 'save_path': download_directory}
|
if download_directory:
|
||||||
))
|
params['save_path'] = download_directory
|
||||||
|
self._handles[btih] = TorrentHandle(self._loop, self._executor, self._session.add_torrent(params))
|
||||||
|
|
||||||
async def add_torrent(self, btih, download_path):
|
async def add_torrent(self, btih, download_path):
|
||||||
await self._loop.run_in_executor(
|
await self._loop.run_in_executor(
|
||||||
|
@ -135,6 +147,12 @@ class TorrentSession:
|
||||||
self._loop.create_task(self._handles[btih].status_loop())
|
self._loop.create_task(self._handles[btih].status_loop())
|
||||||
await self._handles[btih].finished.wait()
|
await self._handles[btih].finished.wait()
|
||||||
|
|
||||||
|
async def remove_torrent(self, btih, remove_files=False):
|
||||||
|
if btih in self._handles:
|
||||||
|
handle = self._handles[btih]
|
||||||
|
self._session.remove_torrent(handle, 1 if remove_files else 0)
|
||||||
|
self._handles.pop(btih)
|
||||||
|
|
||||||
|
|
||||||
def get_magnet_uri(btih):
|
def get_magnet_uri(btih):
|
||||||
return f"magnet:?xt=urn:btih:{btih}"
|
return f"magnet:?xt=urn:btih:{btih}"
|
||||||
|
|
111
lbry/torrent/torrent_manager.py
Normal file
111
lbry/torrent/torrent_manager.py
Normal file
|
@ -0,0 +1,111 @@
|
||||||
|
import asyncio
|
||||||
|
import binascii
|
||||||
|
import logging
|
||||||
|
import typing
|
||||||
|
from typing import Optional
|
||||||
|
from aiohttp.web import Request
|
||||||
|
from lbry.file.source_manager import SourceManager
|
||||||
|
from lbry.file.source import ManagedDownloadSource
|
||||||
|
|
||||||
|
if typing.TYPE_CHECKING:
|
||||||
|
from lbry.torrent.session import TorrentSession
|
||||||
|
from lbry.conf import Config
|
||||||
|
from lbry.wallet.transaction import Transaction
|
||||||
|
from lbry.extras.daemon.analytics import AnalyticsManager
|
||||||
|
from lbry.extras.daemon.storage import SQLiteStorage, StoredContentClaim
|
||||||
|
from lbry.extras.daemon.storage import StoredContentClaim
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def path_or_none(encoded_path) -> Optional[str]:
|
||||||
|
if not encoded_path:
|
||||||
|
return
|
||||||
|
return binascii.unhexlify(encoded_path).decode()
|
||||||
|
|
||||||
|
|
||||||
|
class TorrentSource(ManagedDownloadSource):
|
||||||
|
STATUS_STOPPED = "stopped"
|
||||||
|
|
||||||
|
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage', identifier: str,
|
||||||
|
file_name: Optional[str] = None, download_directory: Optional[str] = None,
|
||||||
|
status: Optional[str] = STATUS_STOPPED, claim: Optional['StoredContentClaim'] = None,
|
||||||
|
download_id: Optional[str] = None, rowid: Optional[int] = None,
|
||||||
|
content_fee: Optional['Transaction'] = None,
|
||||||
|
analytics_manager: Optional['AnalyticsManager'] = None,
|
||||||
|
added_on: Optional[int] = None, torrent_session: Optional['TorrentSession'] = None):
|
||||||
|
super().__init__(loop, config, storage, identifier, file_name, download_directory, status, claim, download_id,
|
||||||
|
rowid, content_fee, analytics_manager, added_on)
|
||||||
|
self.torrent_session = torrent_session
|
||||||
|
|
||||||
|
async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False):
|
||||||
|
await self.torrent_session.add_torrent(self.identifier, self.download_directory)
|
||||||
|
|
||||||
|
async def stop(self, finished: bool = False):
|
||||||
|
await self.torrent_session.remove_torrent(self.identifier)
|
||||||
|
|
||||||
|
async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def stop_tasks(self):
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def completed(self):
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
class TorrentManager(SourceManager):
|
||||||
|
_sources: typing.Dict[str, ManagedDownloadSource]
|
||||||
|
|
||||||
|
filter_fields = set(SourceManager.filter_fields)
|
||||||
|
filter_fields.update({
|
||||||
|
'bt_infohash',
|
||||||
|
'blobs_remaining', # TODO: here they call them "parts", but its pretty much the same concept
|
||||||
|
'blobs_in_stream'
|
||||||
|
})
|
||||||
|
|
||||||
|
def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', torrent_session: 'TorrentSession',
|
||||||
|
storage: 'SQLiteStorage', analytics_manager: Optional['AnalyticsManager'] = None):
|
||||||
|
super().__init__(loop, config, storage, analytics_manager)
|
||||||
|
self.torrent_session: 'TorrentSession' = torrent_session
|
||||||
|
|
||||||
|
def add(self, source: ManagedDownloadSource):
|
||||||
|
super().add(source)
|
||||||
|
|
||||||
|
async def recover_streams(self, file_infos: typing.List[typing.Dict]):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[str],
|
||||||
|
download_directory: Optional[str], status: str,
|
||||||
|
claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'],
|
||||||
|
added_on: Optional[int]):
|
||||||
|
stream = TorrentSource(
|
||||||
|
self.loop, self.config, self.storage, identifier=bt_infohash, file_name=file_name,
|
||||||
|
download_directory=download_directory, status=status, claim=claim, rowid=rowid,
|
||||||
|
content_fee=content_fee, analytics_manager=self.analytics_manager, added_on=added_on,
|
||||||
|
torrent_session=self.torrent_session
|
||||||
|
)
|
||||||
|
self.add(stream)
|
||||||
|
|
||||||
|
async def initialize_from_database(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def start(self):
|
||||||
|
await super().start()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
super().stop()
|
||||||
|
log.info("finished stopping the torrent manager")
|
||||||
|
|
||||||
|
async def create(self, file_path: str, key: Optional[bytes] = None,
|
||||||
|
iv_generator: Optional[typing.Generator[bytes, None, None]] = None):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
async def _delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
|
||||||
|
raise NotImplementedError
|
||||||
|
# blob_hashes = [source.sd_hash] + [b.blob_hash for b in source.descriptor.blobs[:-1]]
|
||||||
|
# await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False)
|
||||||
|
# await self.storage.delete_stream(source.descriptor)
|
||||||
|
|
||||||
|
async def stream_partial_content(self, request: Request, sd_hash: str):
|
||||||
|
raise NotImplementedError
|
Loading…
Reference in a new issue