Minimum changes needed to support Named Pipes

This commit is contained in:
hackrush 2019-03-26 23:03:13 +05:30
parent c703f8bbe6
commit dc209e6b8f
No known key found for this signature in database
GPG key ID: E2629FA741E500A9
3 changed files with 22 additions and 47 deletions

View file

@ -34,6 +34,10 @@ if sys.platform == "win32":
else: else:
PIPE_NAME = './lbrypipe' PIPE_NAME = './lbrypipe'
IS_THE_SPECIAL_CYTHON_PR_MERGED = True
class Setting(typing.Generic[T]): class Setting(typing.Generic[T]):
def __init__(self, doc: str, default: typing.Optional[T] = None, def __init__(self, doc: str, default: typing.Optional[T] = None,

View file

@ -13,7 +13,7 @@ from aiohttp.web import GracefulExit
from lbrynet import __name__ as lbrynet_name, __version__ as lbrynet_version from lbrynet import __name__ as lbrynet_name, __version__ as lbrynet_version
from lbrynet.extras.daemon.loggly_handler import get_loggly_handler from lbrynet.extras.daemon.loggly_handler import get_loggly_handler
from lbrynet.conf import Config, CLIConfig from lbrynet.conf import Config, CLIConfig, PIPE_NAME, IS_THE_SPECIAL_CYTHON_PR_MERGED
from lbrynet.extras.daemon.Daemon import Daemon from lbrynet.extras.daemon.Daemon import Daemon
log = logging.getLogger(lbrynet_name) log = logging.getLogger(lbrynet_name)
@ -25,7 +25,10 @@ def display(data):
async def execute_command(conf, method, params): async def execute_command(conf, method, params):
conn = aiohttp.NamedPipeConnector(path=r'\\.\pipe\lbrypipe') conn = None
if sys.platform == "win32" and IS_THE_SPECIAL_CYTHON_PR_MERGED:
conn = aiohttp.NamedPipeConnector(path=PIPE_NAME)
async with aiohttp.ClientSession(connector=conn) as session: async with aiohttp.ClientSession(connector=conn) as session:
try: try:
message = {'method': method, 'params': params} message = {'method': method, 'params': params}
@ -225,7 +228,7 @@ def ensure_directory_exists(path: str):
def main(argv=None): def main(argv=None):
if sys.platform == 'win32': if sys.platform == 'win32' and IS_THE_SPECIAL_CYTHON_PR_MERGED:
asyncio.set_event_loop(asyncio.ProactorEventLoop()) asyncio.set_event_loop(asyncio.ProactorEventLoop())
argv = argv or sys.argv[1:] argv = argv or sys.argv[1:]

View file

@ -1,4 +1,5 @@
import os import os
import sys
import asyncio import asyncio
import logging import logging
import json import json
@ -16,7 +17,7 @@ from functools import wraps
from torba.client.baseaccount import SingleKey, HierarchicalDeterministic from torba.client.baseaccount import SingleKey, HierarchicalDeterministic
from lbrynet import __version__, utils from lbrynet import __version__, utils
from lbrynet.conf import Config, Setting, SLACK_WEBHOOK, PIPE_NAME from lbrynet.conf import Config, Setting, PIPE_NAME, IS_THE_SPECIAL_CYTHON_PR_MERGED
from lbrynet.blob.blob_file import is_valid_blobhash from lbrynet.blob.blob_file import is_valid_blobhash
from lbrynet.blob_exchange.downloader import download_blob from lbrynet.blob_exchange.downloader import download_blob
from lbrynet.error import InsufficientFundsError, DownloadSDTimeout, ComponentsNotStarted from lbrynet.error import InsufficientFundsError, DownloadSDTimeout, ComponentsNotStarted
@ -40,7 +41,6 @@ from lbrynet.schema.validator import validate_claim_id
from lbrynet.schema.address import decode_address from lbrynet.schema.address import decode_address
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from asyncio import transports
from lbrynet.blob.blob_manager import BlobFileManager from lbrynet.blob.blob_manager import BlobFileManager
from lbrynet.dht.node import Node from lbrynet.dht.node import Node
from lbrynet.extras.daemon.Components import UPnPComponent from lbrynet.extras.daemon.Components import UPnPComponent
@ -131,29 +131,6 @@ DHT_HAS_CONTACTS = "dht_has_contacts"
WALLET_IS_UNLOCKED = "wallet_is_unlocked" WALLET_IS_UNLOCKED = "wallet_is_unlocked"
class NamedPipeServer(asyncio.Protocol):
def __init__(self, request_handler: 'asyncio.coroutine'):
self.transport = None
self.request_handler = request_handler
async def handle_pipe_request(self, data):
log.info("received data from client %s", str(data))
json_response = await self.request_handler(data)
self.transport.write(json_response.encode())
def connection_made(self, transport: 'transports.BaseTransport'):
self.transport = transport
def connection_lost(self, exc: 'Optional[Exception]'):
pass
def data_received(self, data: bytes):
asyncio.create_task(self.handle_pipe_request(data))
def eof_received(self):
pass
class DHTHasContacts(RequiredCondition): class DHTHasContacts(RequiredCondition):
name = DHT_HAS_CONTACTS name = DHT_HAS_CONTACTS
component = DHT_COMPONENT component = DHT_COMPONENT
@ -407,29 +384,11 @@ class Daemon(metaclass=JSONRPCServerType):
await self.update_connection_status() await self.update_connection_status()
return CONNECTION_STATUS_CONNECTED if self._connection_status[1] else CONNECTION_STATUS_NETWORK return CONNECTION_STATUS_CONNECTED if self._connection_status[1] else CONNECTION_STATUS_NETWORK
# async def start_named_pipes(self): async def start_named_pipe_server(self) -> None:
# asyncio.set_event_loop(asyncio.ProactorEventLoop())
#
# try:
# site = web.NamedPipeSite(self.runner, PIPE_NAME)
# await site.start()
# log.info('lbrynet API listening on pipe %s', site.name)
# except Exception as e:
# log.error(str(e))
async def start(self):
log.info("Starting LBRYNet Daemon")
log.debug("Settings: %s", json.dumps(self.conf.settings_dict, indent=2))
log.info("Platform: %s", json.dumps(system_info.get_platform(), indent=2))
await self.analytics_manager.send_server_startup()
await self.runner.setup()
try: try:
print(asyncio.get_event_loop()) print(asyncio.get_event_loop())
site = web.NamedPipeSite(self.runner, PIPE_NAME) site = web.NamedPipeSite(self.runner, PIPE_NAME)
await site.start() await site.start()
# loop = asyncio.get_event_loop()
# await loop.start_serving_pipe(lambda : NamedPipeServer(self.handle_pipe_request), PIPE_NAME)
log.info('lbrynet API listening on pipe %s', site.name) log.info('lbrynet API listening on pipe %s', site.name)
except (PermissionError, RuntimeError) as e: except (PermissionError, RuntimeError) as e:
log.error('lbrynet API failed to open Named Pipe %s for listening. Daemon is already running ' log.error('lbrynet API failed to open Named Pipe %s for listening. Daemon is already running '
@ -438,6 +397,15 @@ class Daemon(metaclass=JSONRPCServerType):
# await self.analytics_manager.send_server_startup_error(str(e)) # await self.analytics_manager.send_server_startup_error(str(e))
raise SystemExit() raise SystemExit()
async def start(self):
log.info("Starting LBRYNet Daemon")
log.debug("Settings: %s", json.dumps(self.conf.settings_dict, indent=2))
log.info("Platform: %s", json.dumps(system_info.get_platform(), indent=2))
await self.analytics_manager.send_server_startup()
await self.runner.setup()
if sys.platform == "win32" and IS_THE_SPECIAL_CYTHON_PR_MERGED:
await self.start_named_pipe_server()
try: try:
site = web.TCPSite(self.runner, self.conf.api_host, self.conf.api_port) site = web.TCPSite(self.runner, self.conf.api_host, self.conf.api_port)