From 34c6e09e6ff01f8471cd3101b2cb58934d1f90eb Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 7 Feb 2020 12:32:39 -0300 Subject: [PATCH] adds more torrent parts --- lbry/extras/daemon/daemon.py | 10 +- lbry/file/file_manager.py | 5 +- lbry/file/source_manager.py | 4 - lbry/stream/stream_manager.py | 191 ++++---------------------------- lbry/torrent/session.py | 102 +++++++++++++---- lbry/torrent/torrent_manager.py | 4 + 6 files changed, 118 insertions(+), 198 deletions(-) diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 01aad4ef9..a1e9fcf7f 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -55,8 +55,8 @@ if typing.TYPE_CHECKING: from lbry.extras.daemon.components import UPnPComponent from lbry.extras.daemon.exchange_rate_manager import ExchangeRateManager from lbry.extras.daemon.storage import SQLiteStorage - from lbry.stream.stream_manager import StreamManager from lbry.wallet import WalletManager, Ledger + from lbry.file.file_manager import FileManager log = logging.getLogger(__name__) @@ -341,7 +341,7 @@ class Daemon(metaclass=JSONRPCServerType): return self.component_manager.get_component(DATABASE_COMPONENT) @property - def file_manager(self) -> typing.Optional['StreamManager']: + def file_manager(self) -> typing.Optional['FileManager']: return self.component_manager.get_component(FILE_MANAGER_COMPONENT) @property @@ -3346,11 +3346,11 @@ class Daemon(metaclass=JSONRPCServerType): stream_hash = None if not preview: - old_stream = self.stream_manager.streams.get(old_txo.claim.stream.source.sd_hash, None) + old_stream = self.file_manager.get_filtered(sd_hash=old_txo.claim.stream.source.sd_hash)[0] if file_path is not None: if old_stream: - await self.stream_manager.delete_stream(old_stream, delete_file=False) - file_stream = await self.stream_manager.create_stream(file_path) + await self.file_manager.delete(old_stream, delete_file=False) + file_stream = await self.file_manager.create_stream(file_path) new_txo.claim.stream.source.sd_hash = file_stream.sd_hash new_txo.script.generate() stream_hash = file_stream.stream_hash diff --git a/lbry/file/file_manager.py b/lbry/file/file_manager.py index 50b21e9b2..1ab72f7c1 100644 --- a/lbry/file/file_manager.py +++ b/lbry/file/file_manager.py @@ -98,8 +98,9 @@ class FileManager: raise ResolveError(f"Unexpected error resolving uri for download: {resolved_result['error']}") if not resolved_result or uri not in resolved_result: raise ResolveError(f"Failed to resolve stream at '{uri}'") - txo = resolved_result[uri] + if isinstance(txo, dict): + raise ResolveError(f"Failed to resolve stream at '{uri}': {txo}") claim = txo.claim outpoint = f"{txo.tx_ref.id}:{txo.position}" resolved_time = self.loop.time() - start_time @@ -179,7 +180,7 @@ class FileManager: self.loop, self.config, self.storage, identifier=claim.stream.source.bt_infohash, file_name=file_name, download_directory=download_directory or self.config.download_dir, status=ManagedStream.STATUS_RUNNING, - claim=claim, analytics_manager=self.analytics_manager, + analytics_manager=self.analytics_manager, torrent_session=source_manager.torrent_session ) log.info("starting download for %s", uri) diff --git a/lbry/file/source_manager.py b/lbry/file/source_manager.py index 9eada3cca..e3f7d4ad3 100644 --- a/lbry/file/source_manager.py +++ b/lbry/file/source_manager.py @@ -74,11 +74,7 @@ class SourceManager: iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedDownloadSource: raise NotImplementedError() - async def _delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False): - raise NotImplementedError() - async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False): - await self._delete(source) self.remove(source) if delete_file and source.output_file_exists: os.remove(source.full_path) diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index 2743584df..2b0c2c2f4 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -32,7 +32,7 @@ def path_or_none(encoded_path) -> Optional[str]: class StreamManager(SourceManager): _sources: typing.Dict[str, ManagedStream] - filter_fields = set(SourceManager.filter_fields) + filter_fields = SourceManager.filter_fields filter_fields.update({ 'sd_hash', 'stream_hash', @@ -180,6 +180,7 @@ class StreamManager(SourceManager): self.re_reflect_task = self.loop.create_task(self.reflect_streams()) def stop(self): + super().stop() if self.resume_saving_task and not self.resume_saving_task.done(): self.resume_saving_task.cancel() if self.re_reflect_task and not self.re_reflect_task.done(): @@ -206,16 +207,30 @@ class StreamManager(SourceManager): ) return task - async def create_stream(self, file_path: str, key: Optional[bytes] = None, - iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: - stream = await ManagedStream.create(self.loop, self.config, self.blob_manager, file_path, key, iv_generator) + async def create(self, file_path: str, key: Optional[bytes] = None, + iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: + descriptor = await StreamDescriptor.create_stream( + self.loop, self.blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator, + blob_completed_callback=self.blob_manager.blob_completed + ) + await self.storage.store_stream( + self.blob_manager.get_blob(descriptor.sd_hash), descriptor + ) + row_id = await self.storage.save_published_file( + descriptor.stream_hash, os.path.basename(file_path), os.path.dirname(file_path), 0 + ) + stream = ManagedStream( + self.loop, self.config, self.blob_manager, descriptor.sd_hash, os.path.dirname(file_path), + os.path.basename(file_path), status=ManagedDownloadSource.STATUS_FINISHED, + rowid=row_id, descriptor=descriptor + ) self.streams[stream.sd_hash] = stream self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) if self.config.reflect_streams and self.config.reflector_servers: self.reflect_stream(stream) return stream - async def delete_stream(self, stream: ManagedStream, delete_file: Optional[bool] = False): + async def delete(self, stream: ManagedStream, delete_file: Optional[bool] = False): if stream.sd_hash in self.running_reflector_uploads: self.running_reflector_uploads[stream.sd_hash].cancel() stream.stop_tasks() @@ -223,151 +238,10 @@ class StreamManager(SourceManager): del self.streams[stream.sd_hash] blob_hashes = [stream.sd_hash] + [b.blob_hash for b in stream.descriptor.blobs[:-1]] await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False) - await self.storage.delete(stream.descriptor) + await self.storage.delete_stream(stream.descriptor) + if delete_file and stream.output_file_exists: + os.remove(stream.full_path) - # @cache_concurrent - # async def download_stream_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManager', - # timeout: Optional[float] = None, - # file_name: Optional[str] = None, - # download_directory: Optional[str] = None, - # save_file: Optional[bool] = None, - # resolve_timeout: float = 3.0, - # wallet: Optional['Wallet'] = None) -> ManagedStream: - # manager = self.wallet_manager - # wallet = wallet or manager.default_wallet - # timeout = timeout or self.config.download_timeout - # start_time = self.loop.time() - # resolved_time = None - # stream = None - # txo: Optional[Output] = None - # error = None - # outpoint = None - # if save_file is None: - # save_file = self.config.save_files - # if file_name and not save_file: - # save_file = True - # if save_file: - # download_directory = download_directory or self.config.download_dir - # else: - # download_directory = None - # - # payment = None - # try: - # # resolve the claim - # if not URL.parse(uri).has_stream: - # raise ResolveError("cannot download a channel claim, specify a /path") - # try: - # response = await asyncio.wait_for( - # manager.ledger.resolve(wallet.accounts, [uri]), - # resolve_timeout - # ) - # resolved_result = self._convert_to_old_resolve_output(manager, response) - # except asyncio.TimeoutError: - # raise ResolveTimeoutError(uri) - # except Exception as err: - # if isinstance(err, asyncio.CancelledError): - # raise - # log.exception("Unexpected error resolving stream:") - # raise ResolveError(f"Unexpected error resolving stream: {str(err)}") - # await self.storage.save_claims_for_resolve([ - # value for value in resolved_result.values() if 'error' not in value - # ]) - # resolved = resolved_result.get(uri, {}) - # resolved = resolved if 'value' in resolved else resolved.get('claim') - # if not resolved: - # raise ResolveError(f"Failed to resolve stream at '{uri}'") - # if 'error' in resolved: - # raise ResolveError(f"error resolving stream: {resolved['error']}") - # txo = response[uri] - # - # claim = Claim.from_bytes(binascii.unhexlify(resolved['protobuf'])) - # outpoint = f"{resolved['txid']}:{resolved['nout']}" - # resolved_time = self.loop.time() - start_time - # - # # resume or update an existing stream, if the stream changed: download it and delete the old one after - # updated_stream, to_replace = await self._check_update_or_replace(outpoint, resolved['claim_id'], claim) - # if updated_stream: - # log.info("already have stream for %s", uri) - # if save_file and updated_stream.output_file_exists: - # save_file = False - # await updated_stream.start(node=self.node, timeout=timeout, save_now=save_file) - # if not updated_stream.output_file_exists and (save_file or file_name or download_directory): - # await updated_stream.save_file( - # file_name=file_name, download_directory=download_directory, node=self.node - # ) - # return updated_stream - # - # if not to_replace and txo.has_price and not txo.purchase_receipt: - # payment = await manager.create_purchase_transaction( - # wallet.accounts, txo, exchange_rate_manager - # ) - # - # stream = ManagedStream( - # self.loop, self.config, self.blob_manager, claim.stream.source.sd_hash, download_directory, - # file_name, ManagedStream.STATUS_RUNNING, content_fee=payment, - # analytics_manager=self.analytics_manager - # ) - # log.info("starting download for %s", uri) - # - # before_download = self.loop.time() - # await stream.start(self.node, timeout) - # stream.set_claim(resolved, claim) - # if to_replace: # delete old stream now that the replacement has started downloading - # await self.delete(to_replace) - # - # if payment is not None: - # await manager.broadcast_or_release(payment) - # payment = None # to avoid releasing in `finally` later - # log.info("paid fee of %s for %s", dewies_to_lbc(stream.content_fee.outputs[0].amount), uri) - # await self.storage.save_content_fee(stream.stream_hash, stream.content_fee) - # - # self._sources[stream.sd_hash] = stream - # self.storage.content_claim_callbacks[stream.stream_hash] = lambda: self._update_content_claim(stream) - # await self.storage.save_content_claim(stream.stream_hash, outpoint) - # if save_file: - # await asyncio.wait_for(stream.save_file(node=self.node), timeout - (self.loop.time() - before_download), - # loop=self.loop) - # return stream - # except asyncio.TimeoutError: - # error = DownloadDataTimeoutError(stream.sd_hash) - # raise error - # except Exception as err: # forgive data timeout, don't delete stream - # expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError, - # KeyFeeAboveMaxAllowedError) - # if isinstance(err, expected): - # log.warning("Failed to download %s: %s", uri, str(err)) - # elif isinstance(err, asyncio.CancelledError): - # pass - # else: - # log.exception("Unexpected error downloading stream:") - # error = err - # raise - # finally: - # if payment is not None: - # # payment is set to None after broadcasting, if we're here an exception probably happened - # await manager.ledger.release_tx(payment) - # if self.analytics_manager and (error or (stream and (stream.downloader.time_to_descriptor or - # stream.downloader.time_to_first_bytes))): - # server = self.wallet_manager.ledger.network.client.server - # self.loop.create_task( - # self.analytics_manager.send_time_to_first_bytes( - # resolved_time, self.loop.time() - start_time, None if not stream else stream.download_id, - # uri, outpoint, - # None if not stream else len(stream.downloader.blob_downloader.active_connections), - # None if not stream else len(stream.downloader.blob_downloader.scores), - # None if not stream else len(stream.downloader.blob_downloader.connection_failures), - # False if not stream else stream.downloader.added_fixed_peers, - # self.config.fixed_peer_delay if not stream else stream.downloader.fixed_peers_delay, - # None if not stream else stream.sd_hash, - # None if not stream else stream.downloader.time_to_descriptor, - # None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].blob_hash, - # None if not (stream and stream.descriptor) else stream.descriptor.blobs[0].length, - # None if not stream else stream.downloader.time_to_first_bytes, - # None if not error else error.__class__.__name__, - # None if not error else str(error), - # None if not server else f"{server[0]}:{server[1]}" - # ) - # ) # ======= # self.running_reflector_uploads.pop().cancel() # super().stop() @@ -385,21 +259,6 @@ class StreamManager(SourceManager): # # async def create(self, file_path: str, key: Optional[bytes] = None, # iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream: -# descriptor = await StreamDescriptor.create_stream( -# self.loop, self.blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator, -# blob_completed_callback=self.blob_manager.blob_completed -# ) -# await self.storage.store_stream( -# self.blob_manager.get_blob(descriptor.sd_hash), descriptor -# ) -# row_id = await self.storage.save_published_file( -# descriptor.stream_hash, os.path.basename(file_path), os.path.dirname(file_path), 0 -# ) -# source = ManagedStream( -# self.loop, self.config, self.blob_manager, descriptor.sd_hash, os.path.dirname(file_path), -# os.path.basename(file_path), status=ManagedDownloadSource.STATUS_FINISHED, -# rowid=row_id, descriptor=descriptor -# ) # self.add(source) # if self.config.reflect_streams: # self._upload_stream_to_reflector(source) @@ -407,10 +266,6 @@ class StreamManager(SourceManager): # # async def _delete(self, stream: ManagedStream, delete_file: Optional[bool] = False): # >>>>>>> ManagedDownloadSource and SourceManager refactor - async def _delete(self, source: ManagedStream, delete_file: Optional[bool] = False): - blob_hashes = [source.sd_hash] + [b.blob_hash for b in source.descriptor.blobs[:-1]] - await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False) - await self.storage.delete_stream(source.descriptor) async def stream_partial_content(self, request: Request, sd_hash: str): return await self._sources[sd_hash].stream_file(request, self.node) diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index 0a33c0bf6..af2663e64 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -1,5 +1,8 @@ import asyncio import binascii +import os +from hashlib import sha1 +from tempfile import mkdtemp from typing import Optional import libtorrent @@ -33,10 +36,8 @@ NOTIFICATION_MASKS = [ DEFAULT_FLAGS = ( # fixme: somehow the logic here is inverted? - libtorrent.add_torrent_params_flags_t.flag_paused - | libtorrent.add_torrent_params_flags_t.flag_auto_managed + libtorrent.add_torrent_params_flags_t.flag_auto_managed | libtorrent.add_torrent_params_flags_t.flag_duplicate_is_error - | libtorrent.add_torrent_params_flags_t.flag_upload_mode | libtorrent.add_torrent_params_flags_t.flag_update_subscribe ) @@ -52,15 +53,23 @@ class TorrentHandle: def __init__(self, loop, executor, handle): self._loop = loop self._executor = executor - self._handle = handle + self._handle: libtorrent.torrent_handle = handle self.finished = asyncio.Event(loop=loop) + self.metadata_completed = asyncio.Event(loop=loop) def _show_status(self): + # fixme: cleanup status = self._handle.status() + if status.has_metadata: + self.metadata_completed.set() + # metadata: libtorrent.torrent_info = self._handle.get_torrent_info() + # print(metadata) + # print(metadata.files()) + # print(type(self._handle)) if not status.is_seeding: - print('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d) %s' % ( + print('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s' % ( status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000, - status.num_peers, status.state)) + status.num_peers, status.num_seeds, status.state, status.save_path)) elif not self.finished.is_set(): self.finished.set() print("finished!") @@ -72,7 +81,7 @@ class TorrentHandle: ) if self.finished.is_set(): break - await asyncio.sleep(1, loop=self._loop) + await asyncio.sleep(0.1, loop=self._loop) async def pause(self): await self._loop.run_in_executor( @@ -89,25 +98,44 @@ class TorrentSession: def __init__(self, loop, executor): self._loop = loop self._executor = executor - self._session = None + self._session: Optional[libtorrent.session] = None self._handles = {} - async def bind(self, interface: str = '0.0.0.0', port: int = 6881): + async def add_fake_torrent(self): + dir = mkdtemp() + info, btih = self._create_fake(dir) + flags = libtorrent.add_torrent_params_flags_t.flag_seed_mode + handle = self._session.add_torrent({ + 'ti': info, 'save_path': dir, 'flags': flags + }) + self._handles[btih] = TorrentHandle(self._loop, self._executor, handle) + return btih + + def _create_fake(self, dir): + # beware, that's just for testing + path = os.path.join(dir, 'tmp') + with open(path, 'wb') as myfile: + size = myfile.write(b'0' * 40 * 1024 * 1024) + fs = libtorrent.file_storage() + fs.add_file('tmp', size) + print(fs.file_path(0)) + t = libtorrent.create_torrent(fs, 0, 4 * 1024 * 1024) + libtorrent.set_piece_hashes(t, dir) + info = libtorrent.torrent_info(t.generate()) + btih = sha1(info.metadata()).hexdigest() + return info, btih + + async def bind(self, interface: str = '0.0.0.0', port: int = 10889): settings = { 'listen_interfaces': f"{interface}:{port}", 'enable_outgoing_utp': True, 'enable_incoming_utp': True, - 'enable_outgoing_tcp': True, - 'enable_incoming_tcp': True + 'enable_outgoing_tcp': False, + 'enable_incoming_tcp': False } self._session = await self._loop.run_in_executor( self._executor, libtorrent.session, settings # pylint: disable=c-extension-no-member ) - await self._loop.run_in_executor( - self._executor, - # lambda necessary due boost functions raising errors when asyncio inspects them. try removing later - lambda: self._session.add_dht_router("router.utorrent.com", 6881) # pylint: disable=unnecessary-lambda - ) self._loop.create_task(self.process_alerts()) def _pop_alerts(self): @@ -135,17 +163,25 @@ class TorrentSession: ) def _add_torrent(self, btih: str, download_directory: Optional[str]): - params = {'info_hash': binascii.unhexlify(btih.encode()), 'flags': DEFAULT_FLAGS} + params = {'info_hash': binascii.unhexlify(btih.encode())} + flags = DEFAULT_FLAGS + print(bin(flags)) + flags ^= libtorrent.add_torrent_params_flags_t.flag_paused + # flags ^= libtorrent.add_torrent_params_flags_t.flag_auto_managed + # flags ^= libtorrent.add_torrent_params_flags_t.flag_stop_when_ready + print(bin(flags)) + # params['flags'] = flags if download_directory: params['save_path'] = download_directory - self._handles[btih] = TorrentHandle(self._loop, self._executor, self._session.add_torrent(params)) + handle = self._handles[btih] = TorrentHandle(self._loop, self._executor, self._session.add_torrent(params)) + handle._handle.force_dht_announce() async def add_torrent(self, btih, download_path): await self._loop.run_in_executor( self._executor, self._add_torrent, btih, download_path ) self._loop.create_task(self._handles[btih].status_loop()) - await self._handles[btih].finished.wait() + await self._handles[btih].metadata_completed.wait() async def remove_torrent(self, btih, remove_files=False): if btih in self._handles: @@ -156,3 +192,31 @@ class TorrentSession: def get_magnet_uri(btih): return f"magnet:?xt=urn:btih:{btih}" + + +async def main(): + if os.path.exists("~/Downloads/ubuntu-18.04.3-live-server-amd64.torrent"): + os.remove("~/Downloads/ubuntu-18.04.3-live-server-amd64.torrent") + if os.path.exists("~/Downloads/ubuntu-18.04.3-live-server-amd64.iso"): + os.remove("~/Downloads/ubuntu-18.04.3-live-server-amd64.iso") + + btih = "dd8255ecdc7ca55fb0bbf81323d87062db1f6d1c" + + executor = None + session = TorrentSession(asyncio.get_event_loop(), executor) + session2 = TorrentSession(asyncio.get_event_loop(), executor) + await session.bind('localhost', port=4040) + await session2.bind('localhost', port=4041) + btih = await session.add_fake_torrent() + session2._session.add_dht_node(('localhost', 4040)) + await session2.add_torrent(btih, "/tmp/down") + print('added') + while True: + print("idling") + await asyncio.sleep(100) + await session.pause() + executor.shutdown() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index 02f6b7cf9..24ed651be 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -26,6 +26,10 @@ def path_or_none(encoded_path) -> Optional[str]: class TorrentSource(ManagedDownloadSource): STATUS_STOPPED = "stopped" + filter_fields = SourceManager.filter_fields + filter_fields.update({ + 'bt_infohash' + }) def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: 'SQLiteStorage', identifier: str, file_name: Optional[str] = None, download_directory: Optional[str] = None,