diff --git a/lbrynet/conf.py b/lbrynet/conf.py index f1d1b99f9..5cb23b17e 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -9,6 +9,7 @@ from argparse import ArgumentParser from contextlib import contextmanager from appdirs import user_data_dir, user_config_dir from lbrynet.error import InvalidCurrencyError +from lbrynet.dht import constants log = logging.getLogger(__name__) diff --git a/lbrynet/extras/cli.py b/lbrynet/extras/cli.py index 329d0b0ac..ef84a6d72 100644 --- a/lbrynet/extras/cli.py +++ b/lbrynet/extras/cli.py @@ -11,6 +11,7 @@ from textwrap import dedent from lbrynet import __name__ as lbrynet_name, __version__ as lbrynet_version from lbrynet.conf import Config, CLIConfig from lbrynet.extras.daemon.Daemon import Daemon +from lbrynet.extras.daemon.client import LBRYAPIClient, JSONRPCException from lbrynet.extras.daemon.loggly_handler import get_loggly_handler log = logging.getLogger(lbrynet_name) @@ -49,7 +50,6 @@ async def start_daemon(conf: Config, args): log.info("Starting lbrynet-daemon from command line") daemon = Daemon(conf) - try: await daemon.start_listening() except (OSError, asyncio.CancelledError): @@ -66,23 +66,14 @@ def display(data): async def execute_command(conf, method, params): - async with aiohttp.ClientSession() as session: - try: - message = {'method': method, 'params': params} - async with session.get(conf.api_connection_url, json=message) as resp: - try: - data = await resp.json() - if 'result' in data: - display(data['result']) - elif 'error' in data: - if 'message' in data['error']: - display(data['error']['message']) - else: - display(data['error']) - except Exception as e: - log.exception('Could not process response from server:', exc_info=e) - except aiohttp.ClientConnectionError: - print("Could not connect to daemon. Are you sure it's running?") + client = LBRYAPIClient(conf) + try: + result = await getattr(client, method)(params) + print(display(result)) + except aiohttp.ClientConnectionError: + print("Could not connect to daemon. Are you sure it's running?") + except JSONRPCException as err: + print(err) def print_help(): diff --git a/lbrynet/extras/daemon/ComponentManager.py b/lbrynet/extras/daemon/ComponentManager.py index ab9518e16..126ad8886 100644 --- a/lbrynet/extras/daemon/ComponentManager.py +++ b/lbrynet/extras/daemon/ComponentManager.py @@ -56,7 +56,6 @@ class ComponentManager: for component_class in self.component_classes.values(): self.components.add(component_class(self)) - self.daemon = None def evaluate_condition(self, condition_name): if condition_name not in RegisteredConditions.conditions: diff --git a/lbrynet/extras/daemon/Components.py b/lbrynet/extras/daemon/Components.py index 3a2ecb9c7..15578b9e5 100644 --- a/lbrynet/extras/daemon/Components.py +++ b/lbrynet/extras/daemon/Components.py @@ -8,12 +8,14 @@ import typing import socket from hashlib import sha256 from types import SimpleNamespace +import base58 from aioupnp import __version__ as aioupnp_version from aioupnp.upnp import UPnP from aioupnp.fault import UPnPError import lbrynet.schema +from lbrynet import utils from lbrynet.conf import HEADERS_FILE_SHA256_CHECKSUM from lbrynet.dht.node import Node from lbrynet.dht.peer import KademliaPeer @@ -88,7 +90,7 @@ class DatabaseComponent(Component): @property def revision_filename(self): - return self.component_manager.daemon.db_revision_file_path + return os.path.join(self.conf.data_dir, 'db_revision') def _write_db_revision_file(self, version_num): with open(self.revision_filename, mode='w') as db_revision: @@ -357,16 +359,25 @@ class DHTComponent(Component): async def get_status(self): return { - 'node_id': binascii.hexlify(self.component_manager.daemon.node_id), + 'node_id': binascii.hexlify(self.dht_node.protocol.node_id), 'peers_in_routing_table': 0 if not self.dht_node else len(self.dht_node.protocol.routing_table.get_peers()) } + def get_node_id(self): + node_id_filename = os.path.join(self.conf.data_dir, "node_id") + if os.path.isfile(node_id_filename): + with open(node_id_filename, "r") as node_id_file: + return base58.b58decode(str(node_id_file.read()).strip()) + node_id = utils.generate_id() + with open(node_id_filename, "w") as node_id_file: + node_id_file.write(base58.b58encode(node_id).decode()) + return node_id + async def start(self): log.info("start the dht") self.upnp_component = self.component_manager.get_component(UPNP_COMPONENT) self.external_peer_port = self.upnp_component.upnp_redirects.get("TCP", self.conf.peer_port) self.external_udp_port = self.upnp_component.upnp_redirects.get("UDP", self.conf.dht_node_port) - node_id = self.component_manager.daemon.node_id external_ip = self.upnp_component.external_ip if not external_ip: log.warning("UPnP component failed to get external ip") @@ -377,7 +388,7 @@ class DHTComponent(Component): self.dht_node = Node( asyncio.get_event_loop(), self.component_manager.peer_manager, - node_id=node_id, + node_id=self.get_node_id(), internal_udp_port=self.conf.dht_node_port, udp_port=self.external_udp_port, external_ip=external_ip, @@ -479,7 +490,7 @@ class PeerProtocolServerComponent(Component): upnp = self.component_manager.get_component(UPNP_COMPONENT) blob_manager: BlobFileManager = self.component_manager.get_component(BLOB_COMPONENT) wallet: LbryWalletManager = self.component_manager.get_component(WALLET_COMPONENT) - peer_port = upnp.upnp_redirects.get("TCP", self.conf.settings["peer_port"]) + peer_port = upnp.upnp_redirects.get("TCP", self.conf.peer_port) address = await wallet.get_unused_address() self.blob_server = BlobServer(asyncio.get_event_loop(), blob_manager, address) self.blob_server.start_server(peer_port, interface='0.0.0.0') diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index e8e7d3bbd..6cd0894b1 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -253,8 +253,8 @@ class Daemon(metaclass=JSONRPCServerType): self._node_id = None self._installation_id = None self.session_id = base58.b58encode(utils.generate_id()).decode() - to_skip = conf.settings['components_to_skip'] - self.analytics_manager = analytics_manager or analytics.Manager(asyncio.get_event_loop()) + to_skip = conf.components_to_skip + self.analytics_manager = analytics_manager or analytics.Manager(conf, self.installation_id, self.session_id) self.component_manager = component_manager or ComponentManager( conf, analytics_manager=self.analytics_manager, skip_components=to_skip or [] ) @@ -361,19 +361,6 @@ class Daemon(metaclass=JSONRPCServerType): install_id_file.write(self._installation_id) return self._installation_id - @property - def node_id(self): - node_id_filename = os.path.join(self.conf.data_dir, "node_id") - if not self._node_id: - if os.path.isfile(node_id_filename): - with open(node_id_filename, "r") as node_id_file: - self._node_id = base58.b58decode(str(node_id_file.read()).strip()) - if not self._node_id: - self._node_id = utils.generate_id() - with open(node_id_filename, "w") as node_id_file: - node_id_file.write(base58.b58encode(self._node_id).decode()) - return self._node_id - def ensure_data_dir(self): # although there is a risk of a race condition here we don't # expect there to be multiple processes accessing this diff --git a/lbrynet/extras/daemon/client.py b/lbrynet/extras/daemon/client.py index 77ace87d0..2a02c37c7 100644 --- a/lbrynet/extras/daemon/client.py +++ b/lbrynet/extras/daemon/client.py @@ -1,8 +1,6 @@ -from lbrynet import conf import aiohttp import logging -from urllib.parse import urlparse - +from lbrynet.conf import Config log = logging.getLogger(__name__) USER_AGENT = "AuthServiceProxy/0.1" @@ -18,41 +16,18 @@ class JSONRPCException(Exception): self.error = rpc_error -class UnAuthAPIClient: - def __init__(self, host, port, session): - self.host = host - self.port = port - self.session = session +class LBRYAPIClient: + def __init__(self, conf: Config): + self._conf = conf def __getattr__(self, method): async def f(*args, **kwargs): - return await self.call(method, [args, kwargs]) - + async with aiohttp.ClientSession() as session: + message = {'method': method, 'params': [args, kwargs]} + async with session.get(self._conf.api_connection_url, json=message) as resp: + data = await resp.json() + if 'result' in data: + return data['result'] + elif 'error' in data: + raise JSONRPCException(data['error']) return f - - @classmethod - async def from_url(cls, url): - url_fragment = urlparse(url) - host = url_fragment.hostname - port = url_fragment.port - connector = aiohttp.TCPConnector() - session = aiohttp.ClientSession(connector=connector) - return cls(host, port, session) - - async def call(self, method, params=None): - message = {'method': method, 'params': params} - async with self.session.get(conf.settings.get_api_connection_string(), json=message) as resp: - response_dict = await resp.json() - if 'error' in response_dict: - raise JSONRPCException(response_dict['error']) - else: - return response_dict['result'] - - -class LBRYAPIClient: - @staticmethod - def get_client(conf_path=None): - conf.conf_file = conf_path - if not conf.settings: - conf.initialize_settings() - return UnAuthAPIClient.from_url(conf.settings.get_api_connection_string()) diff --git a/lbrynet/extras/daemon/loggly_handler.py b/lbrynet/extras/daemon/loggly_handler.py index 18e80e9f9..57456ce23 100644 --- a/lbrynet/extras/daemon/loggly_handler.py +++ b/lbrynet/extras/daemon/loggly_handler.py @@ -58,7 +58,7 @@ class HTTPSLogglyHandler(logging.Handler): asyncio.ensure_future(self._emit(record)) -def get_loggly_handler(loggly_token=LOGGLY_TOKEN): - handler = HTTPSLogglyHandler(loggly_token) +def get_loggly_handler(): + handler = HTTPSLogglyHandler(LOGGLY_TOKEN) handler.setFormatter(JsonFormatter()) return handler diff --git a/lbrynet/stream/downloader.py b/lbrynet/stream/downloader.py index cc8b7439a..4e8eabd19 100644 --- a/lbrynet/stream/downloader.py +++ b/lbrynet/stream/downloader.py @@ -2,7 +2,6 @@ import os import asyncio import typing import logging -from lbrynet import conf from lbrynet.utils import drain_tasks, cancel_task from lbrynet.stream.assembler import StreamAssembler from lbrynet.blob_exchange.client import BlobExchangeClientProtocol, request_blob @@ -24,7 +23,8 @@ class StreamDownloader(StreamAssembler): # TODO: reduce duplication, refactor t def __init__(self, loop: asyncio.BaseEventLoop, blob_manager: 'BlobFileManager', sd_hash: str, peer_timeout: float, peer_connect_timeout: float, output_dir: typing.Optional[str] = None, output_file_name: typing.Optional[str] = None, - fixed_peers: typing.Optional[typing.List['KademliaPeer']] = None): + fixed_peers: typing.Optional[typing.List['KademliaPeer']] = None, + max_connections_per_stream: typing.Optional[int] = 8): super().__init__(loop, blob_manager, sd_hash) self.peer_timeout = peer_timeout self.peer_connect_timeout = peer_connect_timeout @@ -39,7 +39,7 @@ class StreamDownloader(StreamAssembler): # TODO: reduce duplication, refactor t self.output_dir = output_dir or os.getcwd() self.output_file_name = output_file_name self._lock = asyncio.Lock(loop=self.loop) - self.max_connections_per_stream = 8 if not conf.settings else conf.settings['max_connections_per_stream'] + self.max_connections_per_stream = max_connections_per_stream self.fixed_peers = fixed_peers or [] async def _update_current_blob(self, blob: 'BlobFile'): diff --git a/scripts/dht_monitor.py b/scripts/dht_monitor.py index 1e6e839b5..ad5268b53 100644 --- a/scripts/dht_monitor.py +++ b/scripts/dht_monitor.py @@ -1,14 +1,9 @@ import curses import time -import logging import asyncio -from lbrynet import conf +from lbrynet.conf import Config from lbrynet.extras.daemon.client import LBRYAPIClient -log = logging.getLogger(__name__) -log.addHandler(logging.FileHandler("dht contacts.log")) -# log.addHandler(logging.StreamHandler()) -log.setLevel(logging.INFO) stdscr = curses.initscr() @@ -52,8 +47,7 @@ def refresh(routing_table_info): async def main(): - conf.initialize_settings() - api = await LBRYAPIClient.get_client() + api = LBRYAPIClient(Config()) try: init_curses() @@ -64,7 +58,6 @@ async def main(): c = stdscr.getch() time.sleep(0.1) finally: - await api.session.close() teardown_curses() diff --git a/scripts/time_to_first_byte.py b/scripts/time_to_first_byte.py index e62f9fc0e..8c8c5d44e 100644 --- a/scripts/time_to_first_byte.py +++ b/scripts/time_to_first_byte.py @@ -6,7 +6,8 @@ import aiohttp import time from aiohttp import ClientConnectorError -from lbrynet import conf, __version__ +from lbrynet import __version__ +from lbrynet.conf import Config from lbrynet.schema.uri import parse_lbry_uri from lbrynet.extras.daemon.client import LBRYAPIClient from lbrynet.extras import system_info, cli @@ -79,18 +80,11 @@ async def wait_for_done(api, uri): async def main(start_daemon=True, uris=None): if not uris: uris = await get_frontpage_uris() - api = await LBRYAPIClient.get_client() + api = LBRYAPIClient(Config()) daemon = None try: await api.status() except (ClientConnectorError, ConnectionError): - await api.session.close() - if start_daemon: - print("Could not connect to running daemon, starting...") - daemon = await cli.start_daemon(console_output=False) - await daemon.component_manager.started.wait() - print("Started daemon") - return await main(start_daemon=False, uris=uris) print("Could not connect to daemon") return 1 print(f"Checking {len(uris)} uris from the front page") @@ -110,18 +104,12 @@ async def main(start_daemon=True, uris=None): download_failures = [] for uri in resolvable: - await api.call( - "file_delete", { - "delete_from_download_dir": True, - "delete_all": True, - "claim_name": parse_lbry_uri(uri).name - } - ) + await api.file_delete(delete_from_download_dir=True, claim_name=parse_lbry_uri(uri).name) for i, uri in enumerate(resolvable): start = time.time() try: - await api.call("get", {"uri": uri}) + await api.get(uri) first_byte = time.time() first_byte_times.append(first_byte - start) print(f"{i + 1}/{len(resolvable)} - {first_byte - start} {uri}") @@ -135,12 +123,7 @@ async def main(start_daemon=True, uris=None): except: print(f"{i + 1}/{len(uris)} - timeout in {time.time() - start} {uri}") failures.append(uri) - await api.call( - "file_delete", { - "delete_from_download_dir": True, - "claim_name": parse_lbry_uri(uri).name - } - ) + await api.file_delete(delete_from_download_dir=True, claim_name=parse_lbry_uri(uri).name) await asyncio.sleep(0.1) print("**********************************************") @@ -153,12 +136,11 @@ async def main(start_daemon=True, uris=None): nt = '\n\t' result += f"\nFailures:\n\t{nt.join([f for f in failures])}" print(result) - await api.session.close() if daemon: await daemon.shutdown() - webhook = os.environ.get('TTFB_SLACK_TOKEN', None) - if webhook: - await report_to_slack(result, webhook) + # webhook = os.environ.get('TTFB_SLACK_TOKEN', None) + # if webhook: + # await report_to_slack(result, webhook) if __name__ == "__main__": @@ -167,6 +149,4 @@ if __name__ == "__main__": parser.add_argument("--wallet_dir") parser.add_argument("--download_directory") args = parser.parse_args() - - conf.initialize_settings() asyncio.run(main()) diff --git a/tests/unit/blob/test_blob_file.py b/tests/unit/blob/test_blob_file.py index 3862df63e..4254e0720 100644 --- a/tests/unit/blob/test_blob_file.py +++ b/tests/unit/blob/test_blob_file.py @@ -3,6 +3,7 @@ import tempfile import shutil import os from torba.testcase import AsyncioTestCase +from lbrynet.conf import Config from lbrynet.extras.daemon.storage import SQLiteStorage from lbrynet.blob.blob_manager import BlobFileManager @@ -16,7 +17,7 @@ class TestBlobfile(AsyncioTestCase): tmp_dir = tempfile.mkdtemp() self.addCleanup(lambda: shutil.rmtree(tmp_dir)) - storage = SQLiteStorage(os.path.join(tmp_dir, "lbrynet.sqlite")) + storage = SQLiteStorage(Config(), os.path.join(tmp_dir, "lbrynet.sqlite")) blob_manager = BlobFileManager(loop, tmp_dir, storage) await storage.open() diff --git a/tests/unit/blob_exchange/test_transfer_blob.py b/tests/unit/blob_exchange/test_transfer_blob.py index 2ed37635d..1aa0c6b27 100644 --- a/tests/unit/blob_exchange/test_transfer_blob.py +++ b/tests/unit/blob_exchange/test_transfer_blob.py @@ -3,6 +3,7 @@ import tempfile import shutil import os from torba.testcase import AsyncioTestCase +from lbrynet.conf import Config from lbrynet.extras.daemon.storage import SQLiteStorage from lbrynet.blob.blob_manager import BlobFileManager from lbrynet.blob_exchange.server import BlobServer @@ -22,11 +23,11 @@ class BlobExchangeTestBase(AsyncioTestCase): self.addCleanup(shutil.rmtree, self.client_dir) self.addCleanup(shutil.rmtree, self.server_dir) - self.server_storage = SQLiteStorage(os.path.join(self.server_dir, "lbrynet.sqlite")) + self.server_storage = SQLiteStorage(Config(), os.path.join(self.server_dir, "lbrynet.sqlite")) self.server_blob_manager = BlobFileManager(self.loop, self.server_dir, self.server_storage) self.server = BlobServer(self.loop, self.server_blob_manager, 'bQEaw42GXsgCAGio1nxFncJSyRmnztSCjP') - self.client_storage = SQLiteStorage(os.path.join(self.client_dir, "lbrynet.sqlite")) + self.client_storage = SQLiteStorage(Config(), os.path.join(self.client_dir, "lbrynet.sqlite")) self.client_blob_manager = BlobFileManager(self.loop, self.client_dir, self.client_storage) self.client_peer_manager = PeerManager(self.loop) self.server_from_client = KademliaPeer(self.loop, "127.0.0.1", b'1' * 48, tcp_port=33333) diff --git a/tests/unit/database/test_SQLiteStorage.py b/tests/unit/database/test_SQLiteStorage.py index 308f83d55..e14b3fa6f 100644 --- a/tests/unit/database/test_SQLiteStorage.py +++ b/tests/unit/database/test_SQLiteStorage.py @@ -4,6 +4,7 @@ import unittest import asyncio import logging from torba.testcase import AsyncioTestCase +from lbrynet.conf import Config from lbrynet.blob.blob_info import BlobInfo from lbrynet.blob.blob_manager import BlobFileManager from lbrynet.stream.descriptor import StreamDescriptor @@ -67,7 +68,7 @@ fake_claim_info = { class StorageTest(AsyncioTestCase): async def asyncSetUp(self): - self.storage = SQLiteStorage(':memory:') + self.storage = SQLiteStorage(Config(), ':memory:') self.blob_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, self.blob_dir) self.blob_manager = BlobFileManager(asyncio.get_event_loop(), self.blob_dir, self.storage) diff --git a/tests/unit/dht/test_node.py b/tests/unit/dht/test_node.py index 2c2926917..ca6af2827 100644 --- a/tests/unit/dht/test_node.py +++ b/tests/unit/dht/test_node.py @@ -10,6 +10,7 @@ from lbrynet.dht.peer import PeerManager class TestNodePingQueueDiscover(AsyncioTestCase): async def test_ping_queue_discover(self): loop = asyncio.get_event_loop() + loop.set_debug(False) peer_addresses = [ (constants.generate_id(1), '1.2.3.1'), diff --git a/tests/unit/stream/test_assembler.py b/tests/unit/stream/test_assembler.py index c0c7c68f4..dee23b02c 100644 --- a/tests/unit/stream/test_assembler.py +++ b/tests/unit/stream/test_assembler.py @@ -3,6 +3,7 @@ import asyncio import tempfile import shutil from torba.testcase import AsyncioTestCase +from lbrynet.conf import Config from lbrynet.blob.blob_manager import BlobFileManager from lbrynet.blob.blob_file import MAX_BLOB_SIZE from lbrynet.extras.daemon.storage import SQLiteStorage @@ -19,7 +20,7 @@ class TestStreamAssembler(AsyncioTestCase): async def test_create_and_decrypt_one_blob_stream(self): tmp_dir = tempfile.mkdtemp() self.addCleanup(lambda: shutil.rmtree(tmp_dir)) - self.storage = SQLiteStorage(os.path.join(tmp_dir, "lbrynet.sqlite")) + self.storage = SQLiteStorage(Config(), os.path.join(tmp_dir, "lbrynet.sqlite")) await self.storage.open() self.blob_manager = BlobFileManager(self.loop, tmp_dir, self.storage) @@ -39,7 +40,7 @@ class TestStreamAssembler(AsyncioTestCase): for blob_info in sd.blobs: if blob_info.blob_hash: shutil.copy(os.path.join(tmp_dir, blob_info.blob_hash), os.path.join(download_dir, blob_info.blob_hash)) - downloader_storage = SQLiteStorage(os.path.join(download_dir, "lbrynet.sqlite")) + downloader_storage = SQLiteStorage(Config(), os.path.join(download_dir, "lbrynet.sqlite")) await downloader_storage.open() # add the blobs to the blob table (this would happen upon a blob download finishing)