diff --git a/lbrynet/extras/cli.py b/lbrynet/extras/cli.py index de655d7dd..329d0b0ac 100644 --- a/lbrynet/extras/cli.py +++ b/lbrynet/extras/cli.py @@ -2,22 +2,63 @@ import sys import json import asyncio import argparse -import logging +import typing +import logging.handlers +import aiohttp from docopt import docopt from textwrap import dedent -import aiohttp -from lbrynet.extras.compat import force_asyncioreactor_install -force_asyncioreactor_install() - -from lbrynet import log_support, __name__ as lbrynet_name, __version__ as lbrynet_version -from lbrynet.extras.daemon.loggly_handler import get_loggly_handler +from lbrynet import __name__ as lbrynet_name, __version__ as lbrynet_version from lbrynet.conf import Config, CLIConfig -from lbrynet.utils import check_connection from lbrynet.extras.daemon.Daemon import Daemon +from lbrynet.extras.daemon.loggly_handler import get_loggly_handler log = logging.getLogger(lbrynet_name) log.addHandler(logging.NullHandler()) +default_formatter = logging.Formatter("%(asctime)s %(levelname)-8s %(name)s:%(lineno)d: %(message)s") + +optional_path_getter_type = typing.Optional[typing.Callable[[], str]] + + +async def start_daemon(conf: Config, args): + file_handler = logging.handlers.RotatingFileHandler(conf.log_file_path, + maxBytes=2097152, backupCount=5) + file_handler.setFormatter(default_formatter) + log.addHandler(file_handler) + + if not args.quiet: + handler = logging.StreamHandler() + handler.setFormatter(default_formatter) + log.addHandler(handler) + + # mostly disable third part logging + logging.getLogger('urllib3').setLevel(logging.CRITICAL) + logging.getLogger('BitcoinRPC').setLevel(logging.INFO) + logging.getLogger('aioupnp').setLevel(logging.WARNING) + logging.getLogger('aiohttp').setLevel(logging.CRITICAL) + + if args.verbose: + log.setLevel(logging.DEBUG) + else: + log.setLevel(logging.INFO) + + if conf.share_usage_data: + loggly_handler = get_loggly_handler() + loggly_handler.setLevel(logging.ERROR) + log.addHandler(loggly_handler) + + log.info("Starting lbrynet-daemon from command line") + daemon = Daemon(conf) + + try: + await daemon.start_listening() + except (OSError, asyncio.CancelledError): + return 1 + try: + await daemon.server.wait_closed() + except (KeyboardInterrupt, asyncio.CancelledError): + await daemon.shutdown() + return 0 def display(data): @@ -179,55 +220,27 @@ def main(argv=None): argv = argv or sys.argv[1:] parser = get_argument_parser() args, command_args = parser.parse_known_args(argv) - conf = Config.create_from_arguments(args) if args.cli_version: print(f"{lbrynet_name} {lbrynet_version}") return 0 - elif args.command == 'start': - - if args.help: - args.start_parser.print_help() - return 0 - - log_support.configure_logging(conf.log_file_path, not args.quiet, args.verbose) - - if conf.share_usage_data: - loggly_handler = get_loggly_handler() - loggly_handler.setLevel(logging.ERROR) - log.addHandler(loggly_handler) - - log.debug('Final Settings: %s', conf.settings_dict) - log.info("Starting lbrynet-daemon from command line") - - daemon = Daemon(conf) - - if check_connection(): - from twisted.internet import reactor - reactor._asyncioEventloop.create_task(daemon.start()) - reactor.run() - else: - log.info("Not connected to internet, unable to start") - + return asyncio.run(start_daemon(conf, args)) elif args.command is not None: - doc = args.doc api_method_name = args.api_method_name if args.replaced_by: print(f"{args.api_method_name} is deprecated, using {args.replaced_by['api_method_name']}.") doc = args.replaced_by['doc'] api_method_name = args.replaced_by['api_method_name'] - if args.help: print(doc) + return 0 else: parsed = docopt(doc, command_args) params = set_kwargs(parsed) - loop = asyncio.get_event_loop() - loop.run_until_complete(execute_command(conf, api_method_name, params)) - + asyncio.run(execute_command(conf, api_method_name, params)) elif args.group is not None: args.group_parser.print_help() diff --git a/lbrynet/extras/daemon/Component.py b/lbrynet/extras/daemon/Component.py index c50de2e0f..7b55c2766 100644 --- a/lbrynet/extras/daemon/Component.py +++ b/lbrynet/extras/daemon/Component.py @@ -1,7 +1,5 @@ +import asyncio import logging -from twisted.internet import defer -from twisted._threads import AlreadyQuit - from lbrynet.conf import Config from lbrynet.extras.daemon.ComponentManager import ComponentManager @@ -57,7 +55,7 @@ class Component(metaclass=ComponentType): result = await self.start() self._running = True return result - except (defer.CancelledError, AlreadyQuit): + except asyncio.CancelledError: pass except Exception as err: log.exception("Error setting up %s", self.component_name or self.__class__.__name__) @@ -68,7 +66,7 @@ class Component(metaclass=ComponentType): result = await self.stop() self._running = False return result - except (defer.CancelledError, AlreadyQuit): + except asyncio.CancelledError: pass except Exception as err: log.exception("Error stopping %s", self.__class__.__name__) diff --git a/lbrynet/extras/daemon/ComponentManager.py b/lbrynet/extras/daemon/ComponentManager.py index 67b846cfb..ab9518e16 100644 --- a/lbrynet/extras/daemon/ComponentManager.py +++ b/lbrynet/extras/daemon/ComponentManager.py @@ -1,9 +1,8 @@ -import asyncio import logging -from lbrynet.p2p.Error import ComponentStartConditionNotMet -from lbrynet.extras.daemon.PeerManager import PeerManager -from lbrynet.extras.daemon.PeerFinder import DHTPeerFinder +import asyncio from lbrynet.conf import Config +from lbrynet.error import ComponentStartConditionNotMet +from lbrynet.dht.peer import PeerManager log = logging.getLogger(__name__) @@ -35,16 +34,16 @@ class RequiredCondition(metaclass=RequiredConditionType): class ComponentManager: default_component_classes = {} - def __init__(self, conf: Config, reactor=None, analytics_manager=None, skip_components=None, - peer_manager=None, peer_finder=None, **override_components): + def __init__(self, conf: Config, analytics_manager=None, skip_components=None, + peer_manager=None, **override_components): self.conf = conf self.skip_components = skip_components or [] - self.reactor = reactor + self.loop = asyncio.get_event_loop() + self.analytics_manager = analytics_manager self.component_classes = {} self.components = set() - self.analytics_manager = analytics_manager - self.peer_manager = peer_manager or PeerManager() - self.peer_finder = peer_finder or DHTPeerFinder(self) + self.started = asyncio.Event(loop=self.loop) + self.peer_manager = peer_manager or PeerManager(asyncio.get_event_loop_policy().get_event_loop()) for component_name, component_class in self.default_component_classes.items(): if component_name in override_components: @@ -127,7 +126,7 @@ class ComponentManager: if component.component_name in callbacks: maybe_coro = callbacks[component.component_name](component) if asyncio.iscoroutine(maybe_coro): - asyncio.create_task(maybe_coro) + await asyncio.create_task(maybe_coro) stages = self.sort_components() for stage in stages: @@ -136,12 +135,11 @@ class ComponentManager: ] if needing_start: await asyncio.wait(needing_start) + self.started.set() async def stop(self): """ Stop Components in reversed startup order - - :return: (defer.Deferred) """ stages = self.sort_components(reverse=True) for stage in stages: @@ -149,7 +147,7 @@ class ComponentManager: component._stop() for component in stage if component.running ] if needing_stop: - await asyncio.wait(needing_stop) + await asyncio.wait(needing_stop, loop=self.loop) def all_components_running(self, *component_names): """ diff --git a/lbrynet/extras/daemon/Components.py b/lbrynet/extras/daemon/Components.py index 0ccb30410..3a2ecb9c7 100644 --- a/lbrynet/extras/daemon/Components.py +++ b/lbrynet/extras/daemon/Components.py @@ -2,39 +2,30 @@ import os import asyncio import aiohttp import logging -import treq import math import binascii +import typing +import socket from hashlib import sha256 from types import SimpleNamespace -from twisted.internet import defer, reactor, error from aioupnp import __version__ as aioupnp_version from aioupnp.upnp import UPnP from aioupnp.fault import UPnPError import lbrynet.schema - from lbrynet.conf import HEADERS_FILE_SHA256_CHECKSUM -from lbrynet.extras.compat import d2f -from lbrynet.blob.EncryptedFileManager import EncryptedFileManager -from lbrynet.blob.client.EncryptedFileDownloader import EncryptedFileSaverFactory -from lbrynet.blob.client.EncryptedFileOptions import add_lbry_file_to_sd_identifier from lbrynet.dht.node import Node +from lbrynet.dht.peer import KademliaPeer +from lbrynet.dht.blob_announcer import BlobAnnouncer +from lbrynet.blob.blob_manager import BlobFileManager +from lbrynet.blob_exchange.server import BlobServer +from lbrynet.stream.stream_manager import StreamManager from lbrynet.extras.daemon.Component import Component from lbrynet.extras.daemon.exchange_rate_manager import ExchangeRateManager from lbrynet.extras.daemon.storage import SQLiteStorage -from lbrynet.extras.daemon.HashAnnouncer import DHTHashAnnouncer -from lbrynet.extras.reflector.server.server import ReflectorServerFactory from lbrynet.extras.wallet import LbryWalletManager from lbrynet.extras.wallet import Network -from lbrynet.utils import generate_id -from lbrynet.p2p.PaymentRateManager import OnlyFreePaymentsManager -from lbrynet.p2p.RateLimiter import RateLimiter -from lbrynet.p2p.BlobManager import DiskBlobManager -from lbrynet.p2p.StreamDescriptor import StreamDescriptorIdentifier, EncryptedFileStreamType -from lbrynet.p2p.server.BlobRequestHandler import BlobRequestHandlerFactory -from lbrynet.p2p.server.ServerProtocol import ServerProtocolFactory log = logging.getLogger(__name__) @@ -47,13 +38,10 @@ HEADERS_COMPONENT = "blockchain_headers" WALLET_COMPONENT = "wallet" DHT_COMPONENT = "dht" HASH_ANNOUNCER_COMPONENT = "hash_announcer" -FILE_MANAGER_COMPONENT = "file_manager" +STREAM_MANAGER_COMPONENT = "stream_manager" PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server" -REFLECTOR_COMPONENT = "reflector" UPNP_COMPONENT = "upnp" EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager" -RATE_LIMITER_COMPONENT = "rate_limiter" -PAYMENT_RATE_COMPONENT = "payment_rate_manager" async def gather_dict(tasks: dict): @@ -75,6 +63,14 @@ async def get_external_ip(): # used if upnp is disabled or non-functioning pass +async def resolve_host(loop: asyncio.BaseEventLoop, url: str): + info = await loop.getaddrinfo( + url, 'https', + proto=socket.IPPROTO_TCP, + ) + return info[0][4][0] + + class DatabaseComponent(Component): component_name = DATABASE_COMPONENT @@ -158,39 +154,41 @@ class HeadersComponent(Component): 'download_progress': self._headers_progress_percent } - @defer.inlineCallbacks - def fetch_headers_from_s3(self): - def collector(data, h_file): - h_file.write(data) + async def fetch_headers_from_s3(self): + def collector(d, h_file): + h_file.write(d) local_size = float(h_file.tell()) final_size = float(final_size_after_download) self._headers_progress_percent = math.ceil(local_size / final_size * 100) local_header_size = self.local_header_file_size() resume_header = {"Range": f"bytes={local_header_size}-"} - response = yield treq.get(HEADERS_URL, headers=resume_header) - got_406 = response.code == 406 # our file is bigger - final_size_after_download = response.length + local_header_size - if got_406: - log.warning("s3 is more out of date than we are") - # should have something to download and a final length divisible by the header size - elif final_size_after_download and not final_size_after_download % HEADER_SIZE: - s3_height = (final_size_after_download / HEADER_SIZE) - 1 - local_height = self.local_header_file_height() - if s3_height > local_height: - if local_header_size: - log.info("Resuming download of %i bytes from s3", response.length) - with open(self.headers_file, "a+b") as headers_file: - yield treq.collect(response, lambda d: collector(d, headers_file)) - else: - with open(self.headers_file, "wb") as headers_file: - yield treq.collect(response, lambda d: collector(d, headers_file)) - log.info("fetched headers from s3 (s3 height: %i), now verifying integrity after download.", s3_height) - self._check_header_file_integrity() - else: + async with aiohttp.request('get', HEADERS_URL, headers=resume_header) as response: + got_406 = response.status == 406 # our file is bigger + final_size_after_download = response.content_length + local_header_size + if got_406: log.warning("s3 is more out of date than we are") - else: - log.error("invalid size for headers from s3") + # should have something to download and a final length divisible by the header size + elif final_size_after_download and not final_size_after_download % HEADER_SIZE: + s3_height = (final_size_after_download / HEADER_SIZE) - 1 + local_height = self.local_header_file_height() + if s3_height > local_height: + data = await response.read() + + if local_header_size: + log.info("Resuming download of %i bytes from s3", response.content_length) + with open(self.headers_file, "a+b") as headers_file: + collector(data, headers_file) + else: + with open(self.headers_file, "wb") as headers_file: + collector(data, headers_file) + log.info("fetched headers from s3 (s3 height: %i), now verifying integrity after download.", + s3_height) + self._check_header_file_integrity() + else: + log.warning("s3 is more out of date than we are") + else: + log.error("invalid size for headers from s3") def local_header_file_height(self): return max((self.local_header_file_size() / HEADER_SIZE) - 1, 0) @@ -259,7 +257,7 @@ class HeadersComponent(Component): self._downloading_headers = await self.should_download_headers_from_s3() if self._downloading_headers: try: - await d2f(self.fetch_headers_from_s3()) + await self.fetch_headers_from_s3() except Exception as err: log.error("failed to fetch headers from s3: %s", err) finally: @@ -313,29 +311,32 @@ class BlobComponent(Component): def __init__(self, component_manager): super().__init__(component_manager) - self.blob_manager = None + self.blob_manager: BlobFileManager = None @property - def component(self): + def component(self) -> typing.Optional[BlobFileManager]: return self.blob_manager - def start(self): + async def start(self): storage = self.component_manager.get_component(DATABASE_COMPONENT) - datastore = None + data_store = None if DHT_COMPONENT not in self.component_manager.skip_components: - dht_node = self.component_manager.get_component(DHT_COMPONENT) + dht_node: Node = self.component_manager.get_component(DHT_COMPONENT) if dht_node: - datastore = dht_node._dataStore - self.blob_manager = DiskBlobManager(os.path.join(self.conf.data_dir, "blobfiles"), storage, datastore) - return self.blob_manager.setup() + data_store = dht_node.protocol.data_store + self.blob_manager = BlobFileManager(asyncio.get_event_loop(), os.path.join(self.conf.data_dir, "blobfiles"), + storage, data_store) + return await self.blob_manager.setup() - def stop(self): - return self.blob_manager.stop() + async def stop(self): + while self.blob_manager and self.blob_manager.blobs: + _, blob = self.blob_manager.blobs.popitem() + await blob.close() async def get_status(self): count = 0 if self.blob_manager: - count = await self.blob_manager.storage.count_finished_blobs() + count = len(self.blob_manager.completed_blob_hashes) return {'finished_blobs': count} @@ -345,28 +346,27 @@ class DHTComponent(Component): def __init__(self, component_manager): super().__init__(component_manager) - self.dht_node = None + self.dht_node: Node = None self.upnp_component = None self.external_udp_port = None self.external_peer_port = None @property - def component(self): + def component(self) -> typing.Optional[Node]: return self.dht_node async def get_status(self): return { 'node_id': binascii.hexlify(self.component_manager.daemon.node_id), - 'peers_in_routing_table': 0 if not self.dht_node else len(self.dht_node.contacts) + 'peers_in_routing_table': 0 if not self.dht_node else len(self.dht_node.protocol.routing_table.get_peers()) } async def start(self): + log.info("start the dht") self.upnp_component = self.component_manager.get_component(UPNP_COMPONENT) self.external_peer_port = self.upnp_component.upnp_redirects.get("TCP", self.conf.peer_port) self.external_udp_port = self.upnp_component.upnp_redirects.get("UDP", self.conf.dht_node_port) node_id = self.component_manager.daemon.node_id - if node_id is None: - node_id = generate_id() external_ip = self.upnp_component.external_ip if not external_ip: log.warning("UPnP component failed to get external ip") @@ -375,18 +375,21 @@ class DHTComponent(Component): log.warning("failed to get external ip") self.dht_node = Node( + asyncio.get_event_loop(), + self.component_manager.peer_manager, node_id=node_id, - udpPort=self.conf.dht_node_port, - externalUDPPort=self.external_udp_port, - externalIP=external_ip, - peerPort=self.external_peer_port + internal_udp_port=self.conf.dht_node_port, + udp_port=self.external_udp_port, + external_ip=external_ip, + peer_port=self.external_peer_port + ) + self.dht_node.start( + interface='0.0.0.0', known_node_urls=self.conf.known_dht_nodes ) - - await d2f(self.dht_node.start(self.conf.known_dht_nodes, block_on_join=False)) log.info("Started the dht") - def stop(self): - return d2f(self.dht_node.stop()) + async def stop(self): + self.dht_node.stop() class HashAnnouncerComponent(Component): @@ -395,195 +398,96 @@ class HashAnnouncerComponent(Component): def __init__(self, component_manager): super().__init__(component_manager) - self.hash_announcer = None + self.hash_announcer: BlobAnnouncer = None @property - def component(self): + def component(self) -> typing.Optional[BlobAnnouncer]: return self.hash_announcer async def start(self): storage = self.component_manager.get_component(DATABASE_COMPONENT) dht_node = self.component_manager.get_component(DHT_COMPONENT) - self.hash_announcer = DHTHashAnnouncer(self.conf, dht_node, storage) - self.hash_announcer.start() + self.hash_announcer = BlobAnnouncer(asyncio.get_event_loop(), dht_node, storage) + self.hash_announcer.start(self.conf.concurrent_announcers) + log.info("Started blob announcer") - def stop(self): + async def stop(self): self.hash_announcer.stop() + log.info("Stopped blob announcer") async def get_status(self): return { - 'announce_queue_size': 0 if not self.hash_announcer else len(self.hash_announcer.hash_queue) + 'announce_queue_size': 0 if not self.hash_announcer else len(self.hash_announcer.announce_queue) } -class RateLimiterComponent(Component): - component_name = RATE_LIMITER_COMPONENT +class StreamManagerComponent(Component): + component_name = STREAM_MANAGER_COMPONENT + depends_on = [BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT] def __init__(self, component_manager): super().__init__(component_manager) - self.rate_limiter = RateLimiter() + self.stream_manager: StreamManager = None @property - def component(self): - return self.rate_limiter - - async def start(self): - self.rate_limiter.start() - - async def stop(self): - self.rate_limiter.stop() - - -class PaymentRateComponent(Component): - component_name = PAYMENT_RATE_COMPONENT - - def __init__(self, component_manager): - super().__init__(component_manager) - self.payment_rate_manager = OnlyFreePaymentsManager() - - @property - def component(self): - return self.payment_rate_manager - - async def start(self): - pass - - async def stop(self): - pass - - -class FileManagerComponent(Component): - component_name = FILE_MANAGER_COMPONENT - depends_on = [RATE_LIMITER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT, - PAYMENT_RATE_COMPONENT] - - def __init__(self, component_manager): - super().__init__(component_manager) - self.file_manager = None - - @property - def component(self): - return self.file_manager + def component(self) -> typing.Optional[StreamManager]: + return self.stream_manager async def get_status(self): - if not self.file_manager: + if not self.stream_manager: return return { - 'managed_files': len(self.file_manager.lbry_files) + 'managed_files': len(self.stream_manager.streams) } - def start(self): - rate_limiter = self.component_manager.get_component(RATE_LIMITER_COMPONENT) + async def start(self): blob_manager = self.component_manager.get_component(BLOB_COMPONENT) storage = self.component_manager.get_component(DATABASE_COMPONENT) wallet = self.component_manager.get_component(WALLET_COMPONENT) + node = self.component_manager.get_component(DHT_COMPONENT) - sd_identifier = StreamDescriptorIdentifier() - add_lbry_file_to_sd_identifier(sd_identifier) - file_saver_factory = EncryptedFileSaverFactory( - self.conf, - self.component_manager.peer_finder, - rate_limiter, - blob_manager, - storage, - wallet, - self.conf.download_dir - ) - sd_identifier.add_stream_downloader_factory(EncryptedFileStreamType, file_saver_factory) - - payment_rate_manager = self.component_manager.get_component(PAYMENT_RATE_COMPONENT) log.info('Starting the file manager') - self.file_manager = EncryptedFileManager( - self.conf, self.component_manager.peer_finder, rate_limiter, - blob_manager, wallet, payment_rate_manager, storage, sd_identifier + loop = asyncio.get_event_loop() + self.stream_manager = StreamManager( + loop, blob_manager, wallet, storage, node, self.conf.blob_download_timeout, + self.conf.peer_connect_timeout, [ + KademliaPeer(loop, address=(await resolve_host(loop, url)), tcp_port=port + 1) + for url, port in self.conf.reflector_servers + ] ) - return self.file_manager.setup() + await self.stream_manager.start() + log.info('Done setting up file manager') - def stop(self): - return d2f(self.file_manager.stop()) + async def stop(self): + await self.stream_manager.stop() class PeerProtocolServerComponent(Component): component_name = PEER_PROTOCOL_SERVER_COMPONENT - depends_on = [UPNP_COMPONENT, RATE_LIMITER_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, - PAYMENT_RATE_COMPONENT] + depends_on = [UPNP_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT] def __init__(self, component_manager): super().__init__(component_manager) - self.lbry_server_port = None + self.blob_server: BlobServer = None @property - def component(self): - return self.lbry_server_port + def component(self) -> typing.Optional[BlobServer]: + return self.blob_server async def start(self): - wallet = self.component_manager.get_component(WALLET_COMPONENT) + log.info("start blob server") upnp = self.component_manager.get_component(UPNP_COMPONENT) - peer_port = self.conf.peer_port - query_handlers = { - handler.get_primary_query_identifier(): handler for handler in [ - BlobRequestHandlerFactory( - self.component_manager.get_component(BLOB_COMPONENT), - wallet, - self.component_manager.get_component(PAYMENT_RATE_COMPONENT), - self.component_manager.analytics_manager - ), - wallet.get_wallet_info_query_handler_factory(), - ] - } - server_factory = ServerProtocolFactory( - self.component_manager.get_component(RATE_LIMITER_COMPONENT), query_handlers, - self.component_manager.peer_manager - ) - - try: - log.info("Peer protocol listening on TCP %i (ext port %i)", peer_port, - upnp.upnp_redirects.get("TCP", peer_port)) - self.lbry_server_port = reactor.listenTCP(peer_port, server_factory) - except error.CannotListenError as e: - import traceback - log.error("Couldn't bind to port %d. Visit lbry.io/faq/how-to-change-port for" - " more details.", peer_port) - log.error("%s", traceback.format_exc()) - raise ValueError("%s lbrynet may already be running on your computer." % str(e)) + blob_manager: BlobFileManager = self.component_manager.get_component(BLOB_COMPONENT) + wallet: LbryWalletManager = self.component_manager.get_component(WALLET_COMPONENT) + peer_port = upnp.upnp_redirects.get("TCP", self.conf.settings["peer_port"]) + address = await wallet.get_unused_address() + self.blob_server = BlobServer(asyncio.get_event_loop(), blob_manager, address) + self.blob_server.start_server(peer_port, interface='0.0.0.0') + await self.blob_server.started_listening.wait() async def stop(self): - if self.lbry_server_port is not None: - self.lbry_server_port, old_port = None, self.lbry_server_port - log.info('Stop listening on port %s', old_port.port) - await d2f(old_port.stopListening()) - - -class ReflectorComponent(Component): - component_name = REFLECTOR_COMPONENT - depends_on = [BLOB_COMPONENT, FILE_MANAGER_COMPONENT] - - def __init__(self, component_manager): - super().__init__(component_manager) - self.reflector_server_port = self.conf.reflector_port - self.reflector_server = None - - @property - def component(self): - return self.reflector_server - - async def start(self): - log.info("Starting reflector server") - blob_manager = self.component_manager.get_component(BLOB_COMPONENT) - file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT) - reflector_factory = ReflectorServerFactory(self.component_manager.peer_manager, blob_manager, file_manager) - try: - self.reflector_server = await d2f(reactor.listenTCP(self.reflector_server_port, reflector_factory)) - log.info('Started reflector on port %s', self.reflector_server_port) - except error.CannotListenError as e: - log.exception("Couldn't bind reflector to port %d", self.reflector_server_port) - raise ValueError(f"{e} lbrynet may already be running on your computer.") - - async def stop(self): - if self.reflector_server is not None: - log.info("Stopping reflector server") - self.reflector_server, p = None, self.reflector_server - await d2f(p.stopListening()) + if self.blob_server: + self.blob_server.stop_server() class UPnPComponent(Component): @@ -600,7 +504,7 @@ class UPnPComponent(Component): self._maintain_redirects_task = None @property - def component(self): + def component(self) -> 'UPnPComponent': return self async def _repeatedly_maintain_redirects(self, now=True): @@ -696,7 +600,8 @@ class UPnPComponent(Component): log.debug("set up upnp port redirects for gateway: %s", self.upnp.gateway.manufacturer_string) else: log.error("failed to setup upnp") - await self.component_manager.analytics_manager.send_upnp_setup_success_fail(success, await self.get_status()) + if self.component_manager.analytics_manager: + self.component_manager.analytics_manager.send_upnp_setup_success_fail(success, await self.get_status()) self._maintain_redirects_task = asyncio.create_task(self._repeatedly_maintain_redirects(now=False)) async def stop(self): @@ -704,7 +609,7 @@ class UPnPComponent(Component): await asyncio.wait([ self.upnp.delete_port_mapping(port, protocol) for protocol, port in self.upnp_redirects.items() ]) - if self._maintain_redirects_task is not None and not self._maintain_redirects_task.done(): + if self._maintain_redirects_task and not self._maintain_redirects_task.done(): self._maintain_redirects_task.cancel() async def get_status(self): @@ -726,7 +631,7 @@ class ExchangeRateManagerComponent(Component): self.exchange_rate_manager = ExchangeRateManager() @property - def component(self): + def component(self) -> ExchangeRateManager: return self.exchange_rate_manager async def start(self): diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index 7642c15c0..e8e7d3bbd 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -1,67 +1,53 @@ import os -import requests -import urllib -import textwrap -import random - -from typing import Callable, Optional, List -from operator import itemgetter -from binascii import hexlify, unhexlify -from copy import deepcopy -from twisted.internet.task import LoopingCall -from traceback import format_exc - -from torba.client.baseaccount import SingleKey, HierarchicalDeterministic - -from lbrynet import __version__ -from lbrynet.dht.error import TimeoutError -from lbrynet.blob.blob_file import is_valid_blobhash -from lbrynet.extras import system_info -from lbrynet.extras.reflector import reupload -from lbrynet.extras.daemon.Components import d2f -from lbrynet.extras.daemon.Components import WALLET_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT -from lbrynet.extras.daemon.Components import FILE_MANAGER_COMPONENT, RATE_LIMITER_COMPONENT -from lbrynet.extras.daemon.Components import EXCHANGE_RATE_MANAGER_COMPONENT, PAYMENT_RATE_COMPONENT, UPNP_COMPONENT -from lbrynet.extras.daemon.ComponentManager import RequiredCondition -from lbrynet.extras.daemon.Downloader import GetStream -from lbrynet.extras.daemon.Publisher import Publisher -from lbrynet.extras.daemon.mime_types import guess_media_type -from lbrynet.extras.wallet import LbryWalletManager -from lbrynet.extras.wallet.account import Account as LBCAccount -from lbrynet.extras.wallet.dewies import dewies_to_lbc, lbc_to_dewies -from lbrynet.p2p.StreamDescriptor import download_sd_blob -from lbrynet.p2p.Error import InsufficientFundsError, UnknownNameError, DownloadDataTimeout, DownloadSDTimeout -from lbrynet.p2p.Error import NullFundsError, NegativeFundsError, ResolveError -from lbrynet.p2p.Peer import Peer -from lbrynet.p2p.SinglePeerDownloader import SinglePeerDownloader -from lbrynet.p2p.client.StandaloneBlobDownloader import StandaloneBlobDownloader -from lbrynet.schema.claim import ClaimDict -from lbrynet.schema.uri import parse_lbry_uri -from lbrynet.schema.error import URIParseError, DecodeError -from lbrynet.schema.validator import validate_claim_id -from lbrynet.schema.address import decode_address -from lbrynet.schema.decode import smart_decode -from lbrynet.extras.daemon import analytics -from lbrynet.extras.daemon.ComponentManager import ComponentManager -from lbrynet.extras.looping_call_manager import LoopingCallManager -from lbrynet.p2p.Error import ComponentsNotStarted, ComponentStartConditionNotMet -from lbrynet.extras.daemon.json_response_encoder import JSONResponseEncoder -import base58 - import asyncio import logging import json import inspect import signal -from functools import wraps -from twisted.internet import defer - -from lbrynet import utils -from lbrynet.extras.daemon.undecorated import undecorated -from lbrynet.conf import Config, Setting, SLACK_WEBHOOK - +import textwrap +import typing +import aiohttp +import base58 +from urllib.parse import urlencode, quote +from typing import Callable, Optional, List +from binascii import hexlify, unhexlify +from copy import deepcopy +from traceback import format_exc from aiohttp import web +from functools import wraps +from torba.client.baseaccount import SingleKey, HierarchicalDeterministic +from lbrynet import __version__, utils +from lbrynet.conf import Config, Setting, SLACK_WEBHOOK +from lbrynet.blob.blob_file import is_valid_blobhash +from lbrynet.blob_exchange.downloader import BlobDownloader +from lbrynet.error import InsufficientFundsError, UnknownNameError, DownloadSDTimeout, ComponentsNotStarted +from lbrynet.error import NullFundsError, NegativeFundsError, ResolveError, ComponentStartConditionNotMet +from lbrynet.extras import system_info +from lbrynet.extras.daemon import analytics +from lbrynet.extras.daemon.Components import WALLET_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT +from lbrynet.extras.daemon.Components import STREAM_MANAGER_COMPONENT +from lbrynet.extras.daemon.Components import EXCHANGE_RATE_MANAGER_COMPONENT, UPNP_COMPONENT +from lbrynet.extras.daemon.ComponentManager import RequiredCondition +from lbrynet.extras.daemon.ComponentManager import ComponentManager +from lbrynet.extras.daemon.json_response_encoder import JSONResponseEncoder +from lbrynet.extras.daemon.undecorated import undecorated +from lbrynet.extras.wallet.account import Account as LBCAccount +from lbrynet.extras.wallet.dewies import dewies_to_lbc, lbc_to_dewies +from lbrynet.schema.claim import ClaimDict +from lbrynet.schema.uri import parse_lbry_uri +from lbrynet.schema.error import URIParseError, DecodeError +from lbrynet.schema.validator import validate_claim_id +from lbrynet.schema.address import decode_address + +if typing.TYPE_CHECKING: + from lbrynet.blob.blob_manager import BlobFileManager + from lbrynet.dht.node import Node + from lbrynet.extras.daemon.Components import UPnPComponent + from lbrynet.extras.wallet import LbryWalletManager + from lbrynet.extras.daemon.exchange_rate_manager import ExchangeRateManager + from lbrynet.extras.daemon.storage import SQLiteStorage + from lbrynet.stream.stream_manager import StreamManager log = logging.getLogger(__name__) @@ -120,16 +106,9 @@ CONNECTION_MESSAGES = { SHORT_ID_LEN = 20 MAX_UPDATE_FEE_ESTIMATE = 0.3 -DIRECTION_ASCENDING = 'asc' -DIRECTION_DESCENDING = 'desc' -DIRECTIONS = DIRECTION_ASCENDING, DIRECTION_DESCENDING - -EMPTY_PARAMS = [{}] -LBRY_SECRET = "LBRY_SECRET" - async def maybe_paginate(get_records: Callable, get_record_count: Callable, - page: Optional[int], page_size: Optional[int], **constraints): + page: Optional[int], page_size: Optional[int], **constraints): if None not in (page, page_size): constraints.update({ "offset": page_size * (page-1), @@ -143,71 +122,6 @@ async def maybe_paginate(get_records: Callable, get_record_count: Callable, return await get_records(**constraints) -class IterableContainer: - def __iter__(self): - for attr in dir(self): - if not attr.startswith("_"): - yield getattr(self, attr) - - def __contains__(self, item): - for attr in self: - if item == attr: - return True - return False - - -class Checker: - """The looping calls the daemon runs""" - INTERNET_CONNECTION = 'internet_connection_checker', 300 - # CONNECTION_STATUS = 'connection_status_checker' - - -class _FileID(IterableContainer): - """The different ways a file can be identified""" - SD_HASH = 'sd_hash' - FILE_NAME = 'file_name' - STREAM_HASH = 'stream_hash' - ROWID = "rowid" - CLAIM_ID = "claim_id" - OUTPOINT = "outpoint" - TXID = "txid" - NOUT = "nout" - CHANNEL_CLAIM_ID = "channel_claim_id" - CLAIM_NAME = "claim_name" - CHANNEL_NAME = "channel_name" - - -FileID = _FileID() - - -# TODO add login credentials in a conf file -# TODO alert if your copy of a lbry file is out of date with the name record - - -class NoValidSearch(Exception): - pass - - -class CheckInternetConnection: - def __init__(self, daemon): - self.daemon = daemon - - def __call__(self): - self.daemon.connected_to_internet = utils.check_connection() - - -class AlwaysSend: - def __init__(self, value_generator, *args, **kwargs): - self.value_generator = value_generator - self.args = args - self.kwargs = kwargs - - def __call__(self): - d = defer.maybeDeferred(self.value_generator, *self.args, **self.kwargs) - d.addCallback(lambda v: (True, v)) - return d - - def sort_claim_results(claims): claims.sort(key=lambda d: (d['height'], d['name'], d['claim_id'], d['txid'], d['nout'])) return claims @@ -331,63 +245,21 @@ class Daemon(metaclass=JSONRPCServerType): """ LBRYnet daemon, a jsonrpc interface to lbry functions """ - - component_attributes = { - DATABASE_COMPONENT: "storage", - DHT_COMPONENT: "dht_node", - WALLET_COMPONENT: "wallet_manager", - FILE_MANAGER_COMPONENT: "file_manager", - EXCHANGE_RATE_MANAGER_COMPONENT: "exchange_rate_manager", - PAYMENT_RATE_COMPONENT: "payment_rate_manager", - RATE_LIMITER_COMPONENT: "rate_limiter", - BLOB_COMPONENT: "blob_manager", - UPNP_COMPONENT: "upnp" - } - allowed_during_startup = [] - def __init__(self, conf: Config, component_manager: ComponentManager = None): + def __init__(self, conf: Config, analytics_manager: typing.Optional[analytics.Manager] = None, + component_manager: typing.Optional[ComponentManager] = None): self.conf = conf - to_skip = conf.components_to_skip - if 'reflector' not in to_skip and not conf.run_reflector_server: - to_skip.append('reflector') - looping_calls = { - Checker.INTERNET_CONNECTION[0]: (LoopingCall(CheckInternetConnection(self)), - Checker.INTERNET_CONNECTION[1]) - } self._node_id = None self._installation_id = None self.session_id = base58.b58encode(utils.generate_id()).decode() - self.analytics_manager = analytics.Manager(conf, self.installation_id, self.session_id) + to_skip = conf.settings['components_to_skip'] + self.analytics_manager = analytics_manager or analytics.Manager(asyncio.get_event_loop()) self.component_manager = component_manager or ComponentManager( conf, analytics_manager=self.analytics_manager, skip_components=to_skip or [] ) - self.component_manager.daemon = self - self.looping_call_manager = LoopingCallManager({n: lc for n, (lc, t) in (looping_calls or {}).items()}) - self._looping_call_times = {n: t for n, (lc, t) in (looping_calls or {}).items()} self.listening_port = None self._component_setup_task = None - self.announced_startup = False - self.sessions = {} - - # TODO: move this to a component - self.connected_to_internet = True - self.connection_status_code = None - - # components - # TODO: delete these, get the components where needed - self.storage = None - self.dht_node = None - self.wallet_manager: LbryWalletManager = None - self.file_manager = None - self.exchange_rate_manager = None - self.payment_rate_manager = None - self.rate_limiter = None - self.blob_manager = None - self.upnp = None - - # TODO: delete this - self.streams = {} logging.getLogger('aiohttp.access').setLevel(logging.WARN) self.app = web.Application() @@ -395,7 +267,35 @@ class Daemon(metaclass=JSONRPCServerType): self.app.router.add_post('/lbryapi', self.handle_old_jsonrpc) self.app.router.add_post('/', self.handle_old_jsonrpc) self.handler = self.app.make_handler() - self.server = None + self.server: asyncio.AbstractServer = None + + @property + def dht_node(self) -> typing.Optional['Node']: + return self.component_manager.get_component(DHT_COMPONENT) + + @property + def wallet_manager(self) -> typing.Optional['LbryWalletManager']: + return self.component_manager.get_component(WALLET_COMPONENT) + + @property + def storage(self) -> typing.Optional['SQLiteStorage']: + return self.component_manager.get_component(DATABASE_COMPONENT) + + @property + def stream_manager(self) -> typing.Optional['StreamManager']: + return self.component_manager.get_component(STREAM_MANAGER_COMPONENT) + + @property + def exchange_rate_manager(self) -> typing.Optional['ExchangeRateManager']: + return self.component_manager.get_component(EXCHANGE_RATE_MANAGER_COMPONENT) + + @property + def blob_manager(self) -> typing.Optional['BlobFileManager']: + return self.component_manager.get_component(BLOB_COMPONENT) + + @property + def upnp(self) -> typing.Optional['UPnPComponent']: + return self.component_manager.get_component(UPNP_COMPONENT) @classmethod def get_api_definitions(cls): @@ -492,21 +392,28 @@ class Daemon(metaclass=JSONRPCServerType): if not os.path.isdir(self.conf.download_dir): os.makedirs(self.conf.download_dir) - async def start(self): + async def start_listening(self): try: self.server = await asyncio.get_event_loop().create_server( self.handler, self.conf.api_host, self.conf.api_port ) log.info('lbrynet API listening on TCP %s:%i', *self.server.sockets[0].getsockname()[:2]) await self.setup() - await self.analytics_manager.send_server_startup_success() + if self.analytics_manager: + await self.analytics_manager.send_server_startup_success() except OSError: log.error('lbrynet API failed to bind TCP %s for listening. Daemon is already running or this port is ' 'already in use by another application.', self.conf.api) - except defer.CancelledError: + raise + try: + await self.setup() + except asyncio.CancelledError: log.info("shutting down before finished starting") + await self.shutdown() + raise except Exception as err: - await self.analytics_manager.send_server_startup_error(str(err)) + if self.analytics_manager: + await self.analytics_manager.send_server_startup_error(str(err)) log.exception('Failed to start lbrynet-daemon') async def setup(self): @@ -517,19 +424,12 @@ class Daemon(metaclass=JSONRPCServerType): self.ensure_wallet_dir() self.ensure_download_dir() - if not self.analytics_manager.is_started: + if self.analytics_manager: self.analytics_manager.start() - await self.analytics_manager.send_server_startup() - for lc_name, lc_time in self._looping_call_times.items(): - self.looping_call_manager.start(lc_name, lc_time) - def update_attribute(component): - setattr(self, self.component_attributes[component.component_name], component.component) - - kwargs = {component: update_attribute for component in self.component_attributes.keys()} - self._component_setup_task = self.component_manager.setup(**kwargs) + self._component_setup_task = self.component_manager.setup() await self._component_setup_task - + await self.analytics_manager.send_server_startup() log.info("Started lbrynet-daemon") @staticmethod @@ -537,7 +437,6 @@ class Daemon(metaclass=JSONRPCServerType): log.info("Already shutting down") async def shutdown(self): - self._stop_streams() # ignore INT/TERM signals once shutdown has started signal.signal(signal.SIGINT, self._already_shutting_down) signal.signal(signal.SIGTERM, self._already_shutting_down) @@ -583,7 +482,7 @@ class Daemon(metaclass=JSONRPCServerType): f"Invalid method requested: {function_name}.", JSONRPCError.CODE_METHOD_NOT_FOUND ) - if args in (EMPTY_PARAMS, []): + if args in ([{}], []): _args, _kwargs = (), {} elif isinstance(args, dict): _args, _kwargs = (), args @@ -614,6 +513,7 @@ class Daemon(metaclass=JSONRPCServerType): result = await result return result except Exception as e: # pylint: disable=broad-except + log.exception("error handling api request") return JSONRPCError( str(e), JSONRPCError.CODE_APPLICATION_ERROR, format_exc() ) @@ -684,216 +584,7 @@ class Daemon(metaclass=JSONRPCServerType): except AttributeError: return None - def _stop_streams(self): - """stop pending GetStream downloads""" - for sd_hash, stream in self.streams.items(): - stream.cancel(reason="daemon shutdown") - - async def _download_blob(self, blob_hash, rate_manager=None, timeout=None): - """ - Download a blob - - :param blob_hash (str): blob hash - :param rate_manager (PaymentRateManager), optional: the payment rate manager to use, - defaults to session.payment_rate_manager - :param timeout (int): blob timeout - :return: BlobFile - """ - if not blob_hash: - raise Exception("Nothing to download") - - rate_manager = rate_manager or self.payment_rate_manager - timeout = timeout or 30 - downloader = StandaloneBlobDownloader( - blob_hash, self.blob_manager, self.component_manager.peer_finder, self.rate_limiter, - rate_manager, self.wallet_manager, timeout - ) - return await d2f(downloader.download()) - - async def _get_stream_analytics_report(self, claim_dict): - sd_hash = claim_dict.source_hash.decode() - try: - stream_hash = await self.storage.get_stream_hash_for_sd_hash(sd_hash) - except Exception: - stream_hash = None - report = { - "sd_hash": sd_hash, - "stream_hash": stream_hash, - } - blobs = {} - try: - sd_host = await d2f(self.blob_manager.get_host_downloaded_from(sd_hash)) - except Exception: - sd_host = None - report["sd_blob"] = sd_host - if stream_hash: - blob_infos = await self.storage.get_blobs_for_stream(stream_hash) - report["known_blobs"] = len(blob_infos) - else: - blob_infos = [] - report["known_blobs"] = 0 - # for blob_hash, blob_num, iv, length in blob_infos: - # try: - # host = yield self.session.blob_manager.get_host_downloaded_from(blob_hash) - # except Exception: - # host = None - # if host: - # blobs[blob_num] = host - # report["blobs"] = json.dumps(blobs) - return report - - async def _download_name(self, name, claim_dict, sd_hash, txid, nout, timeout=None, file_name=None): - """ - Add a lbry file to the file manager, start the download, and return the new lbry file. - If it already exists in the file manager, return the existing lbry file - """ - - async def _download_finished(download_id, name, claim_dict): - report = await self._get_stream_analytics_report(claim_dict) - await self.analytics_manager.send_download_finished(download_id, name, report, claim_dict) - await self.analytics_manager.send_new_download_success(download_id, name, claim_dict) - - async def _download_failed(error, download_id, name, claim_dict): - report = await self._get_stream_analytics_report(claim_dict) - await self.analytics_manager.send_download_errored(error, download_id, name, claim_dict, - report) - await self.analytics_manager.send_new_download_fail(download_id, name, claim_dict, error) - - if sd_hash in self.streams: - downloader = self.streams[sd_hash] - return await d2f(downloader.finished_deferred) - else: - download_id = utils.random_string() - await self.analytics_manager.send_download_started(download_id, name, claim_dict) - await self.analytics_manager.send_new_download_start(download_id, name, claim_dict) - self.streams[sd_hash] = GetStream( - self.conf, self.file_manager.sd_identifier, self.wallet_manager, self.exchange_rate_manager, - self.blob_manager, self.component_manager.peer_finder, self.rate_limiter, self.payment_rate_manager, - self.storage, self.conf.max_key_fee, self.conf.disable_max_key_fee, self.conf.data_rate, timeout - ) - try: - lbry_file, finished_deferred = await d2f(self.streams[sd_hash].start( - claim_dict, name, txid, nout, file_name - )) - finished_deferred.addCallbacks( - lambda _: asyncio.create_task(_download_finished(download_id, name, claim_dict)), - lambda e: asyncio.create_task(_download_failed(e, download_id, name, claim_dict)) - ) - result = await self._get_lbry_file_dict(lbry_file) - except Exception as err: - await _download_failed(err, download_id, name, claim_dict) - if isinstance(err, (DownloadDataTimeout, DownloadSDTimeout)): - log.warning('Failed to get %s (%s)', name, err) - else: - log.error('Failed to get %s (%s)', name, err) - if self.streams[sd_hash].downloader and self.streams[sd_hash].code != 'running': - await d2f(self.streams[sd_hash].downloader.stop(err)) - result = {'error': str(err)} - finally: - del self.streams[sd_hash] - return result - - async def _publish_stream(self, account, name, bid, claim_dict, file_path=None, certificate=None, - claim_address=None, change_address=None): - publisher = Publisher( - account, self.blob_manager, self.payment_rate_manager, self.storage, - self.file_manager, self.wallet_manager, certificate - ) - parse_lbry_uri(name) - if not file_path: - stream_hash = await self.storage.get_stream_hash_for_sd_hash( - claim_dict['stream']['source']['source']) - tx = await publisher.publish_stream(name, bid, claim_dict, stream_hash, claim_address) - else: - tx = await publisher.create_and_publish_stream(name, bid, claim_dict, file_path, claim_address) - if self.conf.reflect_uploads: - d = reupload.reflect_file(publisher.lbry_file, random.choice(self.conf.reflector_servers)) - d.addCallbacks(lambda _: log.info("Reflected new publication to lbry://%s", name), - log.exception) - await self.analytics_manager.send_claim_action('publish') - nout = 0 - txo = tx.outputs[nout] - log.info("Success! Published to lbry://%s txid: %s nout: %d", name, tx.id, nout) - return { - "success": True, - "tx": tx, - "claim_id": txo.claim_id, - "claim_address": self.ledger.hash160_to_address(txo.script.values['pubkey_hash']), - "output": tx.outputs[nout] - } - - async def _get_or_download_sd_blob(self, blob, sd_hash): - if blob: - return self.blob_manager.get_blob(blob[0]) - return await d2f(download_sd_blob( - self.conf, sd_hash.decode(), self.blob_manager, self.component_manager.peer_finder, self.rate_limiter, - self.payment_rate_manager, self.wallet_manager, timeout=self.conf.peer_search_timeout, - download_mirrors=self.conf.download_mirrors - )) - - def get_or_download_sd_blob(self, sd_hash): - """Return previously downloaded sd blob if already in the blob - manager, otherwise download and return it - """ - return self._get_or_download_sd_blob( - self.blob_manager.completed_blobs([sd_hash.decode()]), sd_hash - ) - - def get_size_from_sd_blob(self, sd_blob): - """ - Get total stream size in bytes from a sd blob - """ - - d = self.file_manager.sd_identifier.get_metadata_for_sd_blob(sd_blob) - d.addCallback(lambda metadata: metadata.validator.info_to_show()) - d.addCallback(lambda info: int(dict(info)['stream_size'])) - return d - - def _get_est_cost_from_stream_size(self, size): - """ - Calculate estimated LBC cost for a stream given its size in bytes - """ - - if self.payment_rate_manager.generous: - return 0.0 - return size / (10 ** 6) * self.conf.data_rate - - async def get_est_cost_using_known_size(self, uri, size): - """ - Calculate estimated LBC cost for a stream given its size in bytes - """ - cost = self._get_est_cost_from_stream_size(size) - resolved = await self.wallet_manager.resolve(uri) - - if uri in resolved and 'claim' in resolved[uri]: - claim = ClaimDict.load_dict(resolved[uri]['claim']['value']) - final_fee = self._add_key_fee_to_est_data_cost(claim.source_fee, cost) - return final_fee - - async def get_est_cost_from_sd_hash(self, sd_hash): - """ - Get estimated cost from a sd hash - """ - sd_blob = await self.get_or_download_sd_blob(sd_hash) - stream_size = await d2f(self.get_size_from_sd_blob(sd_blob)) - return self._get_est_cost_from_stream_size(stream_size) - - async def _get_est_cost_from_metadata(self, metadata, name): - try: - return self._add_key_fee_to_est_data_cost( - metadata.source_fee, await self.get_est_cost_from_sd_hash(metadata.source_hash) - ) - except: - log.warning("Timeout getting blob for cost est for lbry://%s, using only key fee", name) - return 0.0 - - def _add_key_fee_to_est_data_cost(self, fee, data_cost): - fee_amount = 0.0 if not fee else self.exchange_rate_manager.convert_currency(fee.currency, - "LBC", - fee.amount) - return data_cost + fee_amount - - async def get_est_cost_from_uri(self, uri): + async def get_est_cost_from_uri(self, uri: str) -> typing.Optional[float]: """ Resolve a name and return the estimated stream cost """ @@ -907,155 +598,16 @@ class Daemon(metaclass=JSONRPCServerType): if claim_response and 'claim' in claim_response: if 'value' in claim_response['claim'] and claim_response['claim']['value'] is not None: claim_value = ClaimDict.load_dict(claim_response['claim']['value']) - cost = await self._get_est_cost_from_metadata(claim_value, uri) - return round(cost, 5) + if not claim_value.has_fee: + return 0.0 + return round( + self.exchange_rate_manager.convert_currency( + claim_value.source_fee.currency, "LBC", claim_value.source_fee.amount + ), 5 + ) else: log.warning("Failed to estimate cost for %s", uri) - def get_est_cost(self, uri, size=None): - """Get a cost estimate for a lbry stream, if size is not provided the - sd blob will be downloaded to determine the stream size - - """ - if size is not None: - return self.get_est_cost_using_known_size(uri, size) - return self.get_est_cost_from_uri(uri) - - async def _get_lbry_file_dict(self, lbry_file): - key = hexlify(lbry_file.key) if lbry_file.key else None - download_directory = lbry_file.download_directory - if not os.path.exists(download_directory): - download_directory = self.conf.download_dir - full_path = os.path.join(download_directory, lbry_file.file_name) - mime_type = guess_media_type(lbry_file.file_name) - if os.path.isfile(full_path): - with open(full_path) as written_file: - written_file.seek(0, os.SEEK_END) - written_bytes = written_file.tell() - else: - written_bytes = 0 - - size = await lbry_file.get_total_bytes() - file_status = await lbry_file.status() - num_completed = file_status.num_completed - num_known = file_status.num_known - status = file_status.running_status - - return { - 'completed': lbry_file.completed, - 'file_name': lbry_file.file_name, - 'download_directory': download_directory, - 'points_paid': lbry_file.points_paid, - 'stopped': lbry_file.stopped, - 'stream_hash': lbry_file.stream_hash, - 'stream_name': lbry_file.stream_name, - 'suggested_file_name': lbry_file.suggested_file_name, - 'sd_hash': lbry_file.sd_hash, - 'download_path': full_path, - 'mime_type': mime_type, - 'key': key, - 'total_bytes': size, - 'written_bytes': written_bytes, - 'blobs_completed': num_completed, - 'blobs_in_stream': num_known, - 'status': status, - 'claim_id': lbry_file.claim_id, - 'txid': lbry_file.txid, - 'nout': lbry_file.nout, - 'outpoint': lbry_file.outpoint, - 'metadata': lbry_file.metadata, - 'channel_claim_id': lbry_file.channel_claim_id, - 'channel_name': lbry_file.channel_name, - 'claim_name': lbry_file.claim_name - } - - async def _get_lbry_file(self, search_by, val, return_json=False): - lbry_file = None - if search_by in FileID: - for l_f in self.file_manager.lbry_files: - if l_f.__dict__.get(search_by) == val: - lbry_file = l_f - break - else: - raise NoValidSearch(f'{search_by} is not a valid search operation') - if return_json and lbry_file: - lbry_file = await self._get_lbry_file_dict(lbry_file) - return lbry_file - - async def _get_lbry_files(self, return_json=False, **kwargs): - lbry_files = list(self.file_manager.lbry_files) - if kwargs: - for search_type, value in iter_lbry_file_search_values(kwargs): - lbry_files = [l_f for l_f in lbry_files if l_f.__dict__[search_type] == value] - if return_json: - file_dicts = [] - for lbry_file in lbry_files: - lbry_file_dict = await self._get_lbry_file_dict(lbry_file) - file_dicts.append(lbry_file_dict) - lbry_files = file_dicts - log.debug("Collected %i lbry files", len(lbry_files)) - return lbry_files - - def _sort_lbry_files(self, lbry_files, sort_by): - for field, direction in sort_by: - is_reverse = direction == DIRECTION_DESCENDING - key_getter = create_key_getter(field) if field else None - lbry_files = sorted(lbry_files, key=key_getter, reverse=is_reverse) - return lbry_files - - def _parse_lbry_files_sort(self, sort): - """ - Given a sort string like 'file_name, desc' or 'points_paid', - parse the string into a tuple of (field, direction). - Direction defaults to ascending. - """ - - pieces = [p.strip() for p in sort.split(',')] - field = pieces.pop(0) - direction = DIRECTION_ASCENDING - if pieces and pieces[0] in DIRECTIONS: - direction = pieces[0] - return field, direction - - def _get_single_peer_downloader(self): - downloader = SinglePeerDownloader(self.conf) - downloader.setup(self.wallet_manager) - return downloader - - async def _blob_availability(self, blob_hash, search_timeout, blob_timeout, downloader=None): - if not downloader: - downloader = self._get_single_peer_downloader() - search_timeout = search_timeout or self.conf.peer_search_timeout - blob_timeout = blob_timeout or self.conf.sd_download_timeout - reachable_peers = [] - unreachable_peers = [] - try: - peers = await self.jsonrpc_peer_list(blob_hash, search_timeout) - peer_infos = [{"peer": Peer(x['host'], x['port']), - "blob_hash": blob_hash, - "timeout": blob_timeout} for x in peers] - dl = [] - dl_peers = [] - dl_results = [] - for peer_info in peer_infos: - dl.append(downloader.download_temp_blob_from_peer(**peer_info)) - dl_peers.append("%s:%i" % (peer_info['peer'].host, peer_info['peer'].port)) - for dl_peer, download_result in zip(dl_peers, await asyncio.gather(*dl)): - if download_result: - reachable_peers.append(dl_peer) - else: - unreachable_peers.append(dl_peer) - dl_results.append(download_result) - is_available = any(dl_results) - except Exception as err: - return {'error': "Failed to get peers for blob: %s" % err} - - return { - 'is_available': is_available, - 'reachable_peers': reachable_peers, - 'unreachable_peers': unreachable_peers, - } - ############################################################################ # # # JSON-RPC API methods start here # @@ -1150,7 +702,7 @@ class Daemon(metaclass=JSONRPCServerType): } """ - connection_code = CONNECTION_STATUS_CONNECTED if self.connected_to_internet else CONNECTION_STATUS_NETWORK + connection_code = CONNECTION_STATUS_NETWORK response = { 'installation_id': self.installation_id, 'is_running': all(self.component_manager.get_components_status().values()), @@ -1196,7 +748,7 @@ class Daemon(metaclass=JSONRPCServerType): log.info("Get version info: " + json.dumps(platform_info)) return platform_info - def jsonrpc_report_bug(self, message=None): + async def jsonrpc_report_bug(self, message=None): """ Report a bug to slack @@ -1209,19 +761,21 @@ class Daemon(metaclass=JSONRPCServerType): Returns: (bool) true if successful """ + platform_name = system_info.get_platform()['platform'] - query = get_loggly_query_string(self.installation_id) - requests.post( - utils.deobfuscate(SLACK_WEBHOOK), - json.dumps({ - "text": ( - f"os: {platform_name}\n " - f"version: {__version__}\n" - f"<{query}|loggly>\n" - f"{message}" - ) - }) + webhook = utils.deobfuscate(SLACK_WEBHOOK) + payload_template = "os: %s\n version: %s\n<%s|loggly>\n%s" + payload_params = ( + platform_name, + __version__, + get_loggly_query_string(self.installation_id), + message ) + payload = { + "text": payload_template % payload_params + } + async with aiohttp.request('post', webhook, data=json.dumps(payload)) as response: + pass return True SETTINGS_DOC = """ @@ -1874,8 +1428,8 @@ class Daemon(metaclass=JSONRPCServerType): File management. """ - @requires(FILE_MANAGER_COMPONENT) - async def jsonrpc_file_list(self, sort=None, **kwargs): + @requires(STREAM_MANAGER_COMPONENT) + def jsonrpc_file_list(self, sort=None, reverse=False, comparison=None, **kwargs): """ List files limited by optional filters @@ -1883,7 +1437,8 @@ class Daemon(metaclass=JSONRPCServerType): file_list [--sd_hash=] [--file_name=] [--stream_hash=] [--rowid=] [--claim_id=] [--outpoint=] [--txid=] [--nout=] [--channel_claim_id=] [--channel_name=] - [--claim_name=] [--sort=...] + [--claim_name=] [--sort=] [--reverse] [--comparison=] + [--full_status=] Options: --sd_hash= : (str) get file with matching sd hash @@ -1935,11 +1490,13 @@ class Daemon(metaclass=JSONRPCServerType): }, ] """ - result = await self._get_lbry_files(return_json=True, **kwargs) - if sort: - sort_by = [self._parse_lbry_files_sort(s) for s in sort] - result = self._sort_lbry_files(result, sort_by) - return result + sort = sort or 'status' + comparison = comparison or 'eq' + return [ + stream.as_dict() for stream in self.stream_manager.get_filtered_streams( + sort, reverse, comparison, **kwargs + ) + ] @requires(WALLET_COMPONENT) async def jsonrpc_resolve_name(self, name, force=False): @@ -2103,8 +1660,8 @@ class Daemon(metaclass=JSONRPCServerType): results[resolved_uri] = resolved[resolved_uri] return results - @requires(WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT, - RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT, DATABASE_COMPONENT, + @requires(WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT, + STREAM_MANAGER_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) async def jsonrpc_get(self, uri, file_name=None, timeout=None): """ @@ -2165,30 +1722,31 @@ class Daemon(metaclass=JSONRPCServerType): ) if 'error' in resolved: raise ResolveError(f"error resolving stream: {resolved['error']}") - txid, nout, name = resolved['txid'], resolved['nout'], resolved['name'] - claim_dict = ClaimDict.load_dict(resolved['value']) - sd_hash = claim_dict.source_hash.decode() - if sd_hash in self.streams: - log.info("Already waiting on lbry://%s to start downloading", name) - await d2f(self.streams[sd_hash].data_downloading_deferred) - - lbry_file = await self._get_lbry_file(FileID.SD_HASH, sd_hash, return_json=False) - - if lbry_file: - if not os.path.isfile(os.path.join(lbry_file.download_directory, lbry_file.file_name)): - log.info("Already have lbry file but missing file in %s, rebuilding it", - lbry_file.download_directory) - await d2f(lbry_file.start()) - else: - log.info('Already have a file for %s', name) - result = await self._get_lbry_file_dict(lbry_file) + claim = ClaimDict.load_dict(resolved['value']) + fee_amount, fee_address = None, None + if claim.has_fee: + fee_amount = round(self.exchange_rate_manager.convert_currency( + claim.source_fee.currency, "LBC", claim.source_fee.amount + ), 5) + fee_address = claim.source_fee.address + outpoint = f"{resolved['txid']}:{resolved['nout']}" + existing = self.stream_manager.get_filtered_streams(outpoint=outpoint) + if not existing: + existing.extend(self.stream_manager.get_filtered_streams(claim_id=resolved['claim_id'], + sd_hash=claim.source_hash)) + if existing: + log.info("already have matching stream for %s", uri) + stream = existing[0] else: - result = await self._download_name(name, claim_dict, sd_hash, txid, nout, - timeout=timeout, file_name=file_name) - return result + stream = await self.stream_manager.download_stream_from_claim( + self.dht_node, self.conf.download_dir, resolved, file_name, timeout, fee_amount, fee_address + ) + if stream: + return stream.as_dict() + raise DownloadSDTimeout(resolved['value']['stream']['source']['source']) - @requires(FILE_MANAGER_COMPONENT) + @requires(STREAM_MANAGER_COMPONENT) async def jsonrpc_file_set_status(self, status, **kwargs): """ Start or stop downloading a file @@ -2212,14 +1770,16 @@ class Daemon(metaclass=JSONRPCServerType): if status not in ['start', 'stop']: raise Exception('Status must be "start" or "stop".') - search_type, value = get_lbry_file_search_value(kwargs) - lbry_file = await self._get_lbry_file(search_type, value, return_json=False) - if not lbry_file: - raise Exception(f'Unable to find a file for {search_type}:{value}') - - if status == 'start' and lbry_file.stopped or status == 'stop' and not lbry_file.stopped: - await d2f(self.file_manager.toggle_lbry_file_running(lbry_file)) - msg = "Started downloading file" if status == 'start' else "Stopped downloading file" + streams = self.stream_manager.get_filtered_streams(**kwargs) + if not streams: + raise Exception(f'Unable to find a file for {kwargs}') + stream = streams[0] + if status == 'start' and not stream.running and not stream.finished: + stream.downloader.download(self.dht_node) + msg = "Resumed download" + elif status == 'stop' and stream.running: + await stream.stop_download() + msg = "Stopped download" else: msg = ( "File was already being downloaded" if status == 'start' @@ -2227,7 +1787,7 @@ class Daemon(metaclass=JSONRPCServerType): ) return msg - @requires(FILE_MANAGER_COMPONENT) + @requires(STREAM_MANAGER_COMPONENT) async def jsonrpc_file_delete(self, delete_from_download_dir=False, delete_all=False, **kwargs): """ Delete a LBRY file @@ -2259,52 +1819,49 @@ class Daemon(metaclass=JSONRPCServerType): (bool) true if deletion was successful """ - lbry_files = await self._get_lbry_files(return_json=False, **kwargs) + streams = self.stream_manager.get_filtered_streams(**kwargs) - if len(lbry_files) > 1: + if len(streams) > 1: if not delete_all: log.warning("There are %i files to delete, use narrower filters to select one", - len(lbry_files)) + len(streams)) return False else: log.warning("Deleting %i files", - len(lbry_files)) + len(streams)) - if not lbry_files: + if not streams: log.warning("There is no file to delete") return False else: - for lbry_file in lbry_files: - file_name, stream_hash = lbry_file.file_name, lbry_file.stream_hash - if lbry_file.sd_hash in self.streams: - del self.streams[lbry_file.sd_hash] - await d2f(self.file_manager.delete_lbry_file(lbry_file, delete_file=delete_from_download_dir)) - log.info("Deleted file: %s", file_name) - return True + for stream in streams: + await self.stream_manager.delete_stream(stream, delete_file=delete_from_download_dir) + log.info("Deleted file: %s", stream.file_name) + result = True + return result STREAM_DOC = """ Stream information. """ + @requires(WALLET_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, BLOB_COMPONENT, - DHT_COMPONENT, RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT, DATABASE_COMPONENT, + DHT_COMPONENT, DATABASE_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) - def jsonrpc_stream_cost_estimate(self, uri, size=None): + def jsonrpc_stream_cost_estimate(self, uri): """ Get estimated cost for a lbry stream Usage: - stream_cost_estimate ( | --uri=) [ | --size=] + stream_cost_estimate ( | --uri=) Options: --uri= : (str) uri to use - --size= : (float) stream size in bytes. if provided an sd blob won't be - downloaded. Returns: (float) Estimated cost in lbry credits, returns None if uri is not resolvable """ - return self.get_est_cost(uri, size) + return self.get_est_cost_from_uri(uri) CHANNEL_DOC = """ Channel management. @@ -2423,7 +1980,7 @@ class Daemon(metaclass=JSONRPCServerType): return await self.wallet_manager.import_certificate_info(serialized_certificate_info) - @requires(WALLET_COMPONENT, FILE_MANAGER_COMPONENT, BLOB_COMPONENT, PAYMENT_RATE_COMPONENT, DATABASE_COMPONENT, + @requires(WALLET_COMPONENT, STREAM_MANAGER_COMPONENT, BLOB_COMPONENT, DATABASE_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) async def jsonrpc_publish( self, name, bid, metadata=None, file_path=None, fee=None, title=None, @@ -2528,6 +2085,7 @@ class Daemon(metaclass=JSONRPCServerType): account = self.get_account_or_default(account_id) available = await account.get_balance() + existing_claims = [] if amount >= available: existing_claims = await account.get_claims(claim_name=name) if len(existing_claims) == 1: @@ -2632,10 +2190,45 @@ class Daemon(metaclass=JSONRPCServerType): 'channel_name': channel_name }) - return await self._publish_stream( - account, name, amount, claim_dict, file_path, - certificate, claim_address, change_address + from lbrynet.extras.daemon.mime_types import guess_media_type + + if file_path: + if not os.path.isfile(file_path): + raise Exception(f"File {file_path} not found") + if os.path.getsize(file_path) == 0: + raise Exception(f"Cannot publish empty file {file_path}") + claim_dict['stream']['source'] = {} + + stream = await self.stream_manager.create_stream(file_path) + stream_hash = stream.stream_hash + await self.storage.save_published_file(stream_hash, os.path.basename(file_path), + os.path.dirname(file_path), 0) + claim_dict['stream']['source']['source'] = stream.sd_hash + claim_dict['stream']['source']['sourceType'] = 'lbry_sd_hash' + claim_dict['stream']['source']['contentType'] = guess_media_type(file_path) + claim_dict['stream']['source']['version'] = "_0_0_1" # need current version here + else: + if not ('source' not in claim_dict['stream'] and existing_claims): + raise Exception("no previous stream to update") + claim_dict['stream']['source'] = existing_claims[-1].claim_dict['stream']['source'] + stream_hash = await self.storage.get_stream_hash_for_sd_hash(claim_dict['stream']['source']['source']) + tx = await self.default_wallet.claim_name( + account, name, bid, claim_dict, certificate, claim_address ) + await self.storage.save_content_claim( + stream_hash, tx.outputs[0].id + ) + await self.analytics_manager.send_claim_action('publish') + nout = 0 + txo = tx.outputs[nout] + log.info("Success! Published to lbry://%s txid: %s nout: %d", name, tx.id, nout) + return { + "success": True, + "tx": tx, + "claim_id": txo.claim_id, + "claim_address": self.ledger.hash160_to_address(txo.script.values['pubkey_hash']), + "output": tx.outputs[nout] + } @requires(WALLET_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) async def jsonrpc_claim_abandon(self, claim_id=None, txid=None, nout=None, account_id=None, blocking=True): @@ -3115,48 +2708,30 @@ class Daemon(metaclass=JSONRPCServerType): Blob management. """ - @requires(WALLET_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT, RATE_LIMITER_COMPONENT, PAYMENT_RATE_COMPONENT, + @requires(WALLET_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) - async def jsonrpc_blob_get(self, blob_hash, timeout=None, encoding=None, payment_rate_manager=None): + async def jsonrpc_blob_get(self, blob_hash, timeout=None, read=False): """ Download and return a blob Usage: - blob_get ( | --blob_hash=) [--timeout=] - [--encoding=] [--payment_rate_manager=] + blob_get ( | --blob_hash=) [--timeout=] [--read] Options: --blob_hash= : (str) blob hash of the blob to get --timeout= : (int) timeout in number of seconds - --encoding= : (str) by default no attempt at decoding - is made, can be set to one of the - following decoders: - 'json' - --payment_rate_manager= : (str) if not given the default payment rate - manager will be used. - supported alternative rate managers: - 'only-free' Returns: (str) Success/Fail message or (dict) decoded data """ - decoders = { - 'json': json.loads - } - - timeout = timeout or 30 - blob = await self._download_blob( - blob_hash, rate_manager=self.payment_rate_manager, timeout=timeout - ) - if encoding and encoding in decoders: - blob_file = blob.open_for_reading() - result = decoders[encoding](blob_file.read()) - blob_file.close() + downloader = BlobDownloader(asyncio.get_event_loop(), self.blob_manager, self.conf) + blob = await downloader.get_blob(blob_hash, self.dht_node) + if read: + with open(blob.file_path, 'rb') as handle: + return handle.read().decode() else: - result = "Downloaded blob %s" % blob_hash - - return result + return "Downloaded blob %s" % blob_hash @requires(BLOB_COMPONENT, DATABASE_COMPONENT) async def jsonrpc_blob_delete(self, blob_hash): @@ -3173,14 +2748,11 @@ class Daemon(metaclass=JSONRPCServerType): (str) Success/fail message """ - if blob_hash not in self.blob_manager.blobs: - return "Don't have that blob" - try: - stream_hash = await self.storage.get_stream_hash_for_sd_hash(blob_hash) - await self.storage.delete_stream(stream_hash) - except Exception as err: - pass - await d2f(self.blob_manager.delete_blobs([blob_hash])) + streams = self.stream_manager.get_filtered_streams(sd_hash=blob_hash) + if streams: + await self.stream_manager.delete_stream(streams[0]) + else: + await self.blob_manager.delete_blobs([blob_hash]) return "Deleted %s" % blob_hash PEER_DOC = """ @@ -3188,40 +2760,45 @@ class Daemon(metaclass=JSONRPCServerType): """ @requires(DHT_COMPONENT) - async def jsonrpc_peer_list(self, blob_hash, timeout=None): + async def jsonrpc_peer_list(self, blob_hash, search_bottom_out_limit=None): """ Get peers for blob hash Usage: - peer_list ( | --blob_hash=) [ | --timeout=] + peer_list ( | --blob_hash=) + [ | --search_bottom_out_limit=] Options: - --blob_hash= : (str) find available peers for this blob hash - --timeout= : (int) peer search timeout in seconds + --blob_hash= : (str) find available peers for this blob hash + --search_bottom_out_limit= : (int) the number of search probes in a row + that don't find any new peers + before giving up and returning Returns: - (list) List of contact dictionaries {'host': , 'port': , 'node_id': } + (list) List of contact dictionaries {'address': , 'udp_port': , 'tcp_port': , + 'node_id': } """ if not is_valid_blobhash(blob_hash): raise Exception("invalid blob hash") - - finished_deferred = self.dht_node.iterativeFindValue(unhexlify(blob_hash)) - - def trap_timeout(err): - err.trap(defer.TimeoutError) - return [] - - finished_deferred.addTimeout(timeout or self.conf.peer_search_timeout, self.dht_node.clock) - finished_deferred.addErrback(trap_timeout) - peers = await d2f(finished_deferred) + if search_bottom_out_limit is not None: + search_bottom_out_limit = int(search_bottom_out_limit) + if search_bottom_out_limit <= 0: + raise Exception("invalid bottom out limit") + else: + search_bottom_out_limit = 4 + peers = [] + async for new_peers in self.dht_node.get_iterative_value_finder(unhexlify(blob_hash.encode()), max_results=1, + bottom_out_limit=search_bottom_out_limit): + peers.extend(new_peers) results = [ { - "node_id": hexlify(node_id).decode(), - "host": host, - "port": port + "node_id": hexlify(peer.node_id).decode(), + "address": peer.address, + "udp_port": peer.udp_port, + "tcp_port": peer.tcp_port, } - for node_id, host, port in peers + for peer in peers ] return results @@ -3259,40 +2836,9 @@ class Daemon(metaclass=JSONRPCServerType): await self.storage.should_single_announce_blobs(blob_hashes, immediate=True) return True - @requires(FILE_MANAGER_COMPONENT) - async def jsonrpc_file_reflect(self, **kwargs): - """ - Reflect all the blobs in a file matching the filter criteria - - Usage: - file_reflect [--sd_hash=] [--file_name=] - [--stream_hash=] [--rowid=] - [--reflector=] - - Options: - --sd_hash= : (str) get file with matching sd hash - --file_name= : (str) get file with matching file name in the - downloads folder - --stream_hash= : (str) get file with matching stream hash - --rowid= : (int) get file with matching row id - --reflector= : (str) reflector server, ip address or url - by default choose a server from the config - - Returns: - (list) list of blobs reflected - """ - lbry_files = await self._get_lbry_files(**kwargs) - if len(lbry_files) > 1: - raise Exception('Too many (%i) files found, need one' % len(lbry_files)) - elif not lbry_files: - raise Exception('No file found') - return await d2f(reupload.reflect_file( - lbry_files[0], reflector_server=kwargs.get('reflector', random.choice(self.conf.reflector_servers)) - )) - @requires(BLOB_COMPONENT, WALLET_COMPONENT) async def jsonrpc_blob_list(self, uri=None, stream_hash=None, sd_hash=None, needed=None, - finished=None, page_size=None, page=None): + finished=None, page_size=None, page=None): """ Returns blob hashes. If not given filters, returns all blobs known by the blob manager @@ -3315,6 +2861,7 @@ class Daemon(metaclass=JSONRPCServerType): Returns: (list) List of blob hashes """ + if uri or stream_hash or sd_hash: if uri: metadata = (await self.wallet_manager.resolve(uri))[uri] @@ -3325,31 +2872,23 @@ class Daemon(metaclass=JSONRPCServerType): elif sd_hash: stream_hash = await self.storage.get_stream_hash_for_sd_hash(sd_hash) sd_hash = await self.storage.get_sd_blob_hash_for_stream(stream_hash) - if stream_hash: - crypt_blobs = await self.storage.get_blobs_for_stream(stream_hash) - blobs = await d2f(defer.gatherResults([ - self.blob_manager.get_blob(crypt_blob.blob_hash, crypt_blob.length) - for crypt_blob in crypt_blobs if crypt_blob.blob_hash is not None - ])) + if sd_hash: + blobs = [sd_hash] else: blobs = [] - # get_blobs_for_stream does not include the sd blob, so we'll add it manually - if sd_hash in self.blob_manager.blobs: - blobs = [self.blob_manager.blobs[sd_hash]] + blobs + if stream_hash: + blobs.extend([b.blob_hash for b in await self.storage.get_blobs_for_stream(stream_hash)]) else: - blobs = self.blob_manager.blobs.values() - + blobs = list(self.blob_manager.completed_blob_hashes) if needed: - blobs = [blob for blob in blobs if not blob.get_is_verified()] + blobs = [blob_hash for blob_hash in blobs if not self.blob_manager.get_blob(blob_hash).get_is_verified()] if finished: - blobs = [blob for blob in blobs if blob.get_is_verified()] - - blob_hashes = [blob.blob_hash for blob in blobs if blob.blob_hash] - page_size = page_size or len(blob_hashes) + blobs = [blob_hash for blob_hash in blobs if self.blob_manager.get_blob(blob_hash).get_is_verified()] + page_size = page_size or len(blobs) page = page or 0 start_index = page * page_size stop_index = start_index + page_size - return blob_hashes[start_index:stop_index] + return blobs[start_index:stop_index] @requires(BLOB_COMPONENT) async def jsonrpc_blob_reflect(self, blob_hashes, reflector_server=None): @@ -3365,9 +2904,8 @@ class Daemon(metaclass=JSONRPCServerType): Returns: (list) reflected blob hashes """ - return await d2f(reupload.reflect_blob_hashes( - blob_hashes, self.blob_manager, reflector_server or random.choice(self.conf.reflector_servers) - )) + + raise NotImplementedError() @requires(BLOB_COMPONENT) async def jsonrpc_blob_reflect_all(self): @@ -3383,44 +2921,58 @@ class Daemon(metaclass=JSONRPCServerType): Returns: (bool) true if successful """ - blob_hashes = await d2f(self.blob_manager.get_all_verified_blobs()) - return await d2f(reupload.reflect_blob_hashes( - blob_hashes, self.blob_manager, random.choice(self.conf.reflector_servers) - )) + + raise NotImplementedError() + + @requires(STREAM_MANAGER_COMPONENT) + async def jsonrpc_file_reflect(self, **kwargs): + """ + Reflect all the blobs in a file matching the filter criteria + + Usage: + file_reflect [--sd_hash=] [--file_name=] + [--stream_hash=] [--rowid=] + [--reflector=] + + Options: + --sd_hash= : (str) get file with matching sd hash + --file_name= : (str) get file with matching file name in the + downloads folder + --stream_hash= : (str) get file with matching stream hash + --rowid= : (int) get file with matching row id + --reflector= : (str) reflector server, ip address or url + by default choose a server from the config + + Returns: + (list) list of blobs reflected + """ + + raise NotImplementedError() @requires(DHT_COMPONENT) - async def jsonrpc_peer_ping(self, node_id, address=None, port=None): + async def jsonrpc_peer_ping(self, node_id, address, port): """ Send a kademlia ping to the specified peer. If address and port are provided the peer is directly pinged, if not provided the peer is located first. Usage: - peer_ping ( | --node_id=) [
| --address=
] [ | --port=] - - Options: - --address=
: (str) ip address of the peer - --port= : (int) udp port of the peer - + peer_ping ( | --node_id=) (
| --address=
) ( | --port=) Returns: (str) pong, or {'error': } if an error is encountered """ - contact = None + peer = None if node_id and address and port: - contact = self.dht_node.contact_manager.get_contact(unhexlify(node_id), address, int(port)) - if not contact: - contact = self.dht_node.contact_manager.make_contact( - unhexlify(node_id), address, int(port), self.dht_node._protocol + peer = self.component_manager.peer_manager.get_peer(address, unhexlify(node_id), udp_port=int(port)) + if not peer: + peer = self.component_manager.peer_manager.make_peer( + address, unhexlify(node_id), udp_port=int(port) ) - if not contact: - try: - contact = await d2f(self.dht_node.findContact(unhexlify(node_id))) - except TimeoutError: - return {'error': 'timeout finding peer'} - if not contact: + if not peer: return {'error': 'peer not found'} try: - return (await d2f(contact.ping())).decode() + result = await peer.ping() + return result.decode() except TimeoutError: return {'error': 'ping timeout'} @@ -3436,175 +2988,38 @@ class Daemon(metaclass=JSONRPCServerType): None Returns: - (dict) dictionary containing routing and contact information + (dict) dictionary containing routing and peer information { "buckets": { : [ { "address": (str) peer address, - "port": (int) peer udp port + "udp_port": (int) peer udp port, + "tcp_port": (int) peer tcp port, "node_id": (str) peer node id, - "blobs": (list) blob hashes announced by peer } ] }, - "contacts": (list) contact node ids, - "blob_hashes": (list) all of the blob hashes stored by peers in the list of buckets, "node_id": (str) the local dht node id } """ - result = {} - data_store = self.dht_node._dataStore - hosts = {} - - for k, v in data_store.items(): - for contact in map(itemgetter(0), v): - hosts.setdefault(contact, []).append(hexlify(k).decode()) - - contact_set = set() - blob_hashes = set() - result['buckets'] = {} - - for i in range(len(self.dht_node._routingTable._buckets)): - result['buckets'][i] = [] - for contact in self.dht_node._routingTable._buckets[i]._contacts: - blobs = list(hosts.pop(contact)) if contact in hosts else [] - blob_hashes.update(blobs) - host = { - "address": contact.address, - "port": contact.port, - "node_id": hexlify(contact.id).decode(), - "blobs": blobs, - } - result['buckets'][i].append(host) - contact_set.add(hexlify(contact.id).decode()) - - result['contacts'] = list(contact_set) - result['blob_hashes'] = list(blob_hashes) - result['node_id'] = hexlify(self.dht_node.node_id).decode() - return result - - # the single peer downloader needs wallet access - @requires(DHT_COMPONENT, WALLET_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) - def jsonrpc_blob_availability(self, blob_hash, search_timeout=None, blob_timeout=None): - """ - Get blob availability - - Usage: - blob_availability () [ | --search_timeout=] - [ | --blob_timeout=] - - Options: - --blob_hash= : (str) check availability for this blob hash - --search_timeout= : (int) how long to search for peers for the blob - in the dht - --blob_timeout= : (int) how long to try downloading from a peer - - Returns: - (dict) { - "is_available": - "reachable_peers": [":"], - "unreachable_peers": [":"] - } - """ - return self._blob_availability(blob_hash, search_timeout, blob_timeout) - - @requires(UPNP_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, conditions=[WALLET_IS_UNLOCKED]) - async def jsonrpc_stream_availability(self, uri, search_timeout=None, blob_timeout=None): - """ - Get stream availability for lbry uri - - Usage: - stream_availability ( | --uri=) - [ | --search_timeout=] - [ | --blob_timeout=] - - Options: - --uri= : (str) check availability for this uri - --search_timeout= : (int) how long to search for peers for the blob - in the dht - --blob_timeout= : (int) how long to try downloading from a peer - - Returns: - (dict) { - 'is_available': , - 'did_decode': , - 'did_resolve': , - 'is_stream': , - 'num_blobs_in_stream': , - 'sd_hash': , - 'sd_blob_availability': see `blob_availability`, - 'head_blob_hash': , - 'head_blob_availability': see `blob_availability`, - 'use_upnp': , - 'upnp_redirect_is_set': , - 'error': | error message - } - """ - - search_timeout = search_timeout or self.conf.peer_search_timeout - blob_timeout = blob_timeout or self.conf.sd_download_timeout - - response = { - 'is_available': False, - 'did_decode': False, - 'did_resolve': False, - 'is_stream': False, - 'num_blobs_in_stream': None, - 'sd_hash': None, - 'sd_blob_availability': {}, - 'head_blob_hash': None, - 'head_blob_availability': {}, - 'use_upnp': self.conf.use_upnp, - 'upnp_redirect_is_set': len(self.upnp.upnp_redirects), - 'error': None + result = { + 'buckets': {} } - try: - resolved_result = (await self.wallet_manager.resolve(uri))[uri] - response['did_resolve'] = True - except UnknownNameError: - response['error'] = "Failed to resolve name" - return response - except URIParseError: - response['error'] = "Invalid URI" - return response + for i in range(len(self.dht_node.protocol.routing_table.buckets)): + result['buckets'][i] = [] + for peer in self.dht_node.protocol.routing_table.buckets[i].peers: + host = { + "address": peer.address, + "udp_port": peer.udp_port, + "tcp_port": peer.tcp_port, + "node_id": hexlify(peer.node_id).decode(), + } + result['buckets'][i].append(host) - try: - claim_obj = smart_decode(resolved_result[uri]['claim']['hex']) - response['did_decode'] = True - except DecodeError: - response['error'] = "Failed to decode claim value" - return response - - response['is_stream'] = claim_obj.is_stream - if not claim_obj.is_stream: - response['error'] = "Claim for \"%s\" does not contain a stream" % uri - return response - - sd_hash = claim_obj.source_hash - response['sd_hash'] = sd_hash - head_blob_hash = None - downloader = self._get_single_peer_downloader() - have_sd_blob = sd_hash in self.blob_manager.blobs - try: - sd_blob = await self.jsonrpc_blob_get(sd_hash, timeout=blob_timeout, encoding="json") - if not have_sd_blob: - await self.jsonrpc_blob_delete(sd_hash) - if sd_blob and 'blobs' in sd_blob: - response['num_blobs_in_stream'] = len(sd_blob['blobs']) - 1 - head_blob_hash = sd_blob['blobs'][0]['blob_hash'] - head_blob_availability = await self._blob_availability( - head_blob_hash, search_timeout, blob_timeout, downloader) - response['head_blob_availability'] = head_blob_availability - except Exception as err: - response['error'] = err - response['head_blob_hash'] = head_blob_hash - response['sd_blob_availability'] = await self._blob_availability( - sd_hash, search_timeout, blob_timeout, downloader) - response['is_available'] = response['sd_blob_availability'].get('is_available') and \ - response['head_blob_availability'].get('is_available') - return response + result['node_id'] = hexlify(self.dht_node.protocol.node_id).decode() + return result async def get_channel_or_error( self, accounts: List[LBCAccount], channel_id: str = None, channel_name: str = None): @@ -3656,7 +3071,7 @@ class Daemon(metaclass=JSONRPCServerType): def loggly_time_string(dt): formatted_dt = dt.strftime("%Y-%m-%dT%H:%M:%S") milliseconds = str(round(dt.microsecond * (10.0 ** -5), 3)) - return urllib.parse.quote(formatted_dt + milliseconds + "Z") + return quote(formatted_dt + milliseconds + "Z") def get_loggly_query_string(installation_id): @@ -3668,33 +3083,5 @@ def get_loggly_query_string(installation_id): 'from': loggly_time_string(yesterday), 'to': loggly_time_string(now) } - data = urllib.parse.urlencode(params) + data = urlencode(params) return base_loggly_search_url + data - - -def get_lbry_file_search_value(search_fields): - for searchtype in FileID: - value = search_fields.get(searchtype, None) - if value is not None: - return searchtype, value - raise NoValidSearch(f'{search_fields} is missing a valid search type') - - -def iter_lbry_file_search_values(search_fields): - for searchtype in FileID: - value = search_fields.get(searchtype, None) - if value is not None: - yield searchtype, value - - -def create_key_getter(field): - search_path = field.split('.') - def key_getter(value): - for key in search_path: - try: - value = value[key] - except KeyError as e: - errmsg = "Failed to get '{}', key {} was not found." - raise Exception(errmsg.format(field, str(e))) - return value - return key_getter diff --git a/lbrynet/extras/daemon/analytics.py b/lbrynet/extras/daemon/analytics.py index f8d7495cc..5ee93bd59 100644 --- a/lbrynet/extras/daemon/analytics.py +++ b/lbrynet/extras/daemon/analytics.py @@ -160,28 +160,6 @@ class Manager: async def _send_heartbeat(self): await self.track(self._event(HEARTBEAT)) - async def _update_tracked_metrics(self): - should_send, value = self.summarize_and_reset(BLOB_BYTES_UPLOADED) - if should_send: - await self.track(self._metric_event(BLOB_BYTES_UPLOADED, value)) - - def add_observation(self, metric, value): - self._tracked_data[metric].append(value) - - def summarize_and_reset(self, metric, op=sum): - """Apply `op` on the current values for `metric`. - - This operation also resets the metric. - - Returns: - a tuple (should_send, value) - """ - try: - values = self._tracked_data.pop(metric) - return True, op(values) - except KeyError: - return False, None - def _event(self, event, event_properties=None): return { 'userId': 'lbry', diff --git a/lbrynet/extras/daemon/client.py b/lbrynet/extras/daemon/client.py new file mode 100644 index 000000000..77ace87d0 --- /dev/null +++ b/lbrynet/extras/daemon/client.py @@ -0,0 +1,58 @@ +from lbrynet import conf +import aiohttp +import logging +from urllib.parse import urlparse + + +log = logging.getLogger(__name__) +USER_AGENT = "AuthServiceProxy/0.1" +TWISTED_SECURE_SESSION = "TWISTED_SECURE_SESSION" +TWISTED_SESSION = "TWISTED_SESSION" +LBRY_SECRET = "LBRY_SECRET" +HTTP_TIMEOUT = 30 + + +class JSONRPCException(Exception): + def __init__(self, rpc_error): + super().__init__() + self.error = rpc_error + + +class UnAuthAPIClient: + def __init__(self, host, port, session): + self.host = host + self.port = port + self.session = session + + def __getattr__(self, method): + async def f(*args, **kwargs): + return await self.call(method, [args, kwargs]) + + return f + + @classmethod + async def from_url(cls, url): + url_fragment = urlparse(url) + host = url_fragment.hostname + port = url_fragment.port + connector = aiohttp.TCPConnector() + session = aiohttp.ClientSession(connector=connector) + return cls(host, port, session) + + async def call(self, method, params=None): + message = {'method': method, 'params': params} + async with self.session.get(conf.settings.get_api_connection_string(), json=message) as resp: + response_dict = await resp.json() + if 'error' in response_dict: + raise JSONRPCException(response_dict['error']) + else: + return response_dict['result'] + + +class LBRYAPIClient: + @staticmethod + def get_client(conf_path=None): + conf.conf_file = conf_path + if not conf.settings: + conf.initialize_settings() + return UnAuthAPIClient.from_url(conf.settings.get_api_connection_string()) diff --git a/lbrynet/extras/daemon/migrator/migrate8to9.py b/lbrynet/extras/daemon/migrator/migrate8to9.py index 3f8d2567d..316edee20 100644 --- a/lbrynet/extras/daemon/migrator/migrate8to9.py +++ b/lbrynet/extras/daemon/migrator/migrate8to9.py @@ -1,10 +1,9 @@ import sqlite3 import logging import os - -from lbrynet.p2p.Error import InvalidStreamDescriptorError -from lbrynet.p2p.StreamDescriptor import EncryptedFileStreamType, format_sd_info, format_blobs, validate_descriptor -from lbrynet.blob.CryptBlob import CryptBlobInfo +import asyncio +from lbrynet.blob.blob_info import BlobInfo +from lbrynet.stream.descriptor import StreamDescriptor log = logging.getLogger(__name__) @@ -22,18 +21,13 @@ def do_migration(conf): "left outer join blob b ON b.blob_hash=s.blob_hash order by s.position").fetchall() blobs_by_stream = {} for stream_hash, position, iv, blob_hash, blob_length in blobs: - blobs_by_stream.setdefault(stream_hash, []).append(CryptBlobInfo(blob_hash, position, blob_length or 0, iv)) + blobs_by_stream.setdefault(stream_hash, []).append(BlobInfo(position, blob_length or 0, iv, blob_hash)) for stream_name, stream_key, suggested_filename, sd_hash, stream_hash in streams: - sd_info = format_sd_info( - EncryptedFileStreamType, stream_name, stream_key, - suggested_filename, stream_hash, format_blobs(blobs_by_stream[stream_hash]) - ) - try: - validate_descriptor(sd_info) - except InvalidStreamDescriptorError as err: - log.warning("Stream for descriptor %s is invalid (%s), cleaning it up", - sd_hash, err.message) + sd = StreamDescriptor(asyncio.get_event_loop(), blob_dir, stream_name, stream_key, suggested_filename, + blobs_by_stream[stream_hash], stream_hash, sd_hash) + if sd_hash != sd.calculate_sd_hash(): + log.warning("Stream for descriptor %s is invalid, cleaning it up", sd_hash) blob_hashes = [blob.blob_hash for blob in blobs_by_stream[stream_hash]] delete_stream(cursor, stream_hash, sd_hash, blob_hashes, blob_dir) diff --git a/lbrynet/utils.py b/lbrynet/utils.py index 7955a4219..609367fb1 100644 --- a/lbrynet/utils.py +++ b/lbrynet/utils.py @@ -5,15 +5,14 @@ import random import socket import string import json -import traceback -import functools +import typing +import asyncio import logging import pkg_resources -from twisted.python.failure import Failure -from twisted.internet import defer from lbrynet.schema.claim import ClaimDict from lbrynet.cryptoutils import get_lbry_hash_obj + log = logging.getLogger(__name__) @@ -43,21 +42,6 @@ def datetime_obj(*args, **kwargs): return datetime.datetime(*args, **kwargs) -def call_later(delay, func, *args, **kwargs): - # Import here to ensure that it gets called after installing a reactor - # see: http://twistedmatrix.com/documents/current/core/howto/choosing-reactor.html - from twisted.internet import reactor - return reactor.callLater(delay, func, *args, **kwargs) - - -def safe_start_looping_call(looping_call, interval_sec): - if not looping_call.running: - looping_call.start(interval_sec) - -def safe_stop_looping_call(looping_call): - if looping_call.running: - looping_call.stop() - def generate_id(num=None): h = get_lbry_hash_obj() if num is not None: @@ -139,91 +123,16 @@ def json_dumps_pretty(obj, **kwargs): return json.dumps(obj, sort_keys=True, indent=2, separators=(',', ': '), **kwargs) -class DeferredLockContextManager: - def __init__(self, lock): - self._lock = lock - - def __enter__(self): - yield self._lock.acquire() - - def __exit__(self, exc_type, exc_val, exc_tb): - yield self._lock.release() +def cancel_task(task: typing.Optional[asyncio.Task]): + if task and not task.done(): + task.cancel() -@defer.inlineCallbacks -def DeferredDict(d, consumeErrors=False): - keys = [] - dl = [] - response = {} - for k, v in d.items(): - keys.append(k) - dl.append(v) - results = yield defer.DeferredList(dl, consumeErrors=consumeErrors) - for k, (success, result) in zip(keys, results): - if success: - response[k] = result - defer.returnValue(response) +def cancel_tasks(tasks: typing.List[typing.Optional[asyncio.Task]]): + for task in tasks: + cancel_task(task) -class DeferredProfiler: - def __init__(self): - self.profile_results = {} - - def add_result(self, fn, start_time, finished_time, stack, success): - self.profile_results[fn].append((start_time, finished_time, stack, success)) - - def show_profile_results(self, fn): - profile_results = list(self.profile_results[fn]) - call_counts = { - caller: [(start, finished, finished - start, success) - for (start, finished, _caller, success) in profile_results - if _caller == caller] - for caller in {result[2] for result in profile_results} - } - - log.info("called %s %i times from %i sources\n", fn.__name__, len(profile_results), len(call_counts)) - for caller in sorted(list(call_counts.keys()), key=lambda c: len(call_counts[c]), reverse=True): - call_info = call_counts[caller] - times = [r[2] for r in call_info] - own_time = sum(times) - times.sort() - longest = 0 if not times else times[-1] - shortest = 0 if not times else times[0] - log.info( - "%i successes and %i failures\nlongest %f, shortest %f, avg %f\ncaller:\n%s", - len([r for r in call_info if r[3]]), - len([r for r in call_info if not r[3]]), - longest, shortest, own_time / float(len(call_info)), caller - ) - - def profiled_deferred(self, reactor=None): - if not reactor: - from twisted.internet import reactor - - def _cb(result, fn, start, caller_info): - got_error = isinstance(result, (Failure, Exception)) - self.add_result(fn, start, reactor.seconds(), caller_info, not got_error) - if got_error: - raise result - else: - return result - - def _profiled_deferred(fn): - reactor.addSystemEventTrigger("after", "shutdown", self.show_profile_results, fn) - self.profile_results[fn] = [] - - @functools.wraps(fn) - def _wrapper(*args, **kwargs): - caller_info = "".join(traceback.format_list(traceback.extract_stack()[-3:-1])) - start = reactor.seconds() - d = defer.maybeDeferred(fn, *args, **kwargs) - d.addBoth(_cb, fn, start, caller_info) - return d - - return _wrapper - - return _profiled_deferred - - -_profiler = DeferredProfiler() -profile_deferred = _profiler.profiled_deferred +def drain_tasks(tasks: typing.List[typing.Optional[asyncio.Task]]): + while tasks: + cancel_task(tasks.pop())