diff --git a/lbrynet/conf.py b/lbrynet/conf.py index 88a655abd..c358973fa 100644 --- a/lbrynet/conf.py +++ b/lbrynet/conf.py @@ -34,6 +34,10 @@ if sys.platform == "win32": else: PIPE_NAME = './lbrypipe' + +IS_THE_SPECIAL_CYTHON_PR_MERGED = True + + class Setting(typing.Generic[T]): def __init__(self, doc: str, default: typing.Optional[T] = None, diff --git a/lbrynet/extras/cli.py b/lbrynet/extras/cli.py index ecf4e2d0f..4eba204b1 100644 --- a/lbrynet/extras/cli.py +++ b/lbrynet/extras/cli.py @@ -13,7 +13,7 @@ from aiohttp.web import GracefulExit from lbrynet import __name__ as lbrynet_name, __version__ as lbrynet_version from lbrynet.extras.daemon.loggly_handler import get_loggly_handler -from lbrynet.conf import Config, CLIConfig +from lbrynet.conf import Config, CLIConfig, PIPE_NAME, IS_THE_SPECIAL_CYTHON_PR_MERGED from lbrynet.extras.daemon.Daemon import Daemon log = logging.getLogger(lbrynet_name) @@ -25,7 +25,10 @@ def display(data): async def execute_command(conf, method, params): - conn = aiohttp.NamedPipeConnector(path=r'\\.\pipe\lbrypipe') + conn = None + if sys.platform == "win32" and IS_THE_SPECIAL_CYTHON_PR_MERGED: + conn = aiohttp.NamedPipeConnector(path=PIPE_NAME) + async with aiohttp.ClientSession(connector=conn) as session: try: message = {'method': method, 'params': params} @@ -225,7 +228,7 @@ def ensure_directory_exists(path: str): def main(argv=None): - if sys.platform == 'win32': + if sys.platform == 'win32' and IS_THE_SPECIAL_CYTHON_PR_MERGED: asyncio.set_event_loop(asyncio.ProactorEventLoop()) argv = argv or sys.argv[1:] diff --git a/lbrynet/extras/daemon/Daemon.py b/lbrynet/extras/daemon/Daemon.py index aeda7b4ff..3f2a2a898 100644 --- a/lbrynet/extras/daemon/Daemon.py +++ b/lbrynet/extras/daemon/Daemon.py @@ -1,4 +1,5 @@ import os +import sys import asyncio import logging import json @@ -16,7 +17,7 @@ from functools import wraps from torba.client.baseaccount import SingleKey, HierarchicalDeterministic from lbrynet import __version__, utils -from lbrynet.conf import Config, Setting, SLACK_WEBHOOK, PIPE_NAME +from lbrynet.conf import Config, Setting, PIPE_NAME, IS_THE_SPECIAL_CYTHON_PR_MERGED from lbrynet.blob.blob_file import is_valid_blobhash from lbrynet.blob_exchange.downloader import download_blob from lbrynet.error import InsufficientFundsError, DownloadSDTimeout, ComponentsNotStarted @@ -40,7 +41,6 @@ from lbrynet.schema.validator import validate_claim_id from lbrynet.schema.address import decode_address if typing.TYPE_CHECKING: - from asyncio import transports from lbrynet.blob.blob_manager import BlobFileManager from lbrynet.dht.node import Node from lbrynet.extras.daemon.Components import UPnPComponent @@ -131,29 +131,6 @@ DHT_HAS_CONTACTS = "dht_has_contacts" WALLET_IS_UNLOCKED = "wallet_is_unlocked" -class NamedPipeServer(asyncio.Protocol): - def __init__(self, request_handler: 'asyncio.coroutine'): - self.transport = None - self.request_handler = request_handler - - async def handle_pipe_request(self, data): - log.info("received data from client %s", str(data)) - json_response = await self.request_handler(data) - self.transport.write(json_response.encode()) - - def connection_made(self, transport: 'transports.BaseTransport'): - self.transport = transport - - def connection_lost(self, exc: 'Optional[Exception]'): - pass - - def data_received(self, data: bytes): - asyncio.create_task(self.handle_pipe_request(data)) - - def eof_received(self): - pass - - class DHTHasContacts(RequiredCondition): name = DHT_HAS_CONTACTS component = DHT_COMPONENT @@ -407,29 +384,11 @@ class Daemon(metaclass=JSONRPCServerType): await self.update_connection_status() return CONNECTION_STATUS_CONNECTED if self._connection_status[1] else CONNECTION_STATUS_NETWORK - # async def start_named_pipes(self): - # asyncio.set_event_loop(asyncio.ProactorEventLoop()) - # - # try: - # site = web.NamedPipeSite(self.runner, PIPE_NAME) - # await site.start() - # log.info('lbrynet API listening on pipe %s', site.name) - # except Exception as e: - # log.error(str(e)) - - async def start(self): - log.info("Starting LBRYNet Daemon") - log.debug("Settings: %s", json.dumps(self.conf.settings_dict, indent=2)) - log.info("Platform: %s", json.dumps(system_info.get_platform(), indent=2)) - await self.analytics_manager.send_server_startup() - await self.runner.setup() - + async def start_named_pipe_server(self) -> None: try: print(asyncio.get_event_loop()) site = web.NamedPipeSite(self.runner, PIPE_NAME) await site.start() - # loop = asyncio.get_event_loop() - # await loop.start_serving_pipe(lambda : NamedPipeServer(self.handle_pipe_request), PIPE_NAME) log.info('lbrynet API listening on pipe %s', site.name) except (PermissionError, RuntimeError) as e: log.error('lbrynet API failed to open Named Pipe %s for listening. Daemon is already running ' @@ -438,6 +397,15 @@ class Daemon(metaclass=JSONRPCServerType): # await self.analytics_manager.send_server_startup_error(str(e)) raise SystemExit() + async def start(self): + log.info("Starting LBRYNet Daemon") + log.debug("Settings: %s", json.dumps(self.conf.settings_dict, indent=2)) + log.info("Platform: %s", json.dumps(system_info.get_platform(), indent=2)) + await self.analytics_manager.send_server_startup() + await self.runner.setup() + + if sys.platform == "win32" and IS_THE_SPECIAL_CYTHON_PR_MERGED: + await self.start_named_pipe_server() try: site = web.TCPSite(self.runner, self.conf.api_host, self.conf.api_port)