From d11f4f9bed61f749ca6772ae54fdcf3ed7f7ff9c Mon Sep 17 00:00:00 2001 From: Lex Berezhny Date: Wed, 20 May 2020 18:04:18 -0400 Subject: [PATCH] switched Client class from plain GET RPC to WebSockets --- lbry/service/api.py | 70 +++++++++++++++++++++++++++++++++------------ 1 file changed, 51 insertions(+), 19 deletions(-) diff --git a/lbry/service/api.py b/lbry/service/api.py index 91988c0f7..163561e77 100644 --- a/lbry/service/api.py +++ b/lbry/service/api.py @@ -2,13 +2,14 @@ import json import time import hashlib import inspect +import asyncio from typing import Union, Tuple, Callable, Optional, List, Dict from binascii import hexlify, unhexlify from functools import partial import ecdsa import base58 -import aiohttp +from aiohttp import ClientSession from lbry.conf import Setting, NOT_SET from lbry.db import TXO_TYPES @@ -16,7 +17,7 @@ from lbry.db.utils import constrain_single_or_list from lbry.wallet import Wallet, Account, SingleKey, HierarchicalDeterministic from lbry.blockchain import Transaction, Output, dewies_to_lbc, dict_values_to_lbc from lbry.stream.managed_stream import ManagedStream - +from lbry.event import EventController, EventStream from .base import Service @@ -3418,25 +3419,56 @@ class API: class Client(API): - def __init__(self, service: Service, url): - super().__init__(service) + def __init__(self, url): self.url = url + self.session: Optional[ClientSession] = None + self.receive_messages_task: Optional[asyncio.Task] = None + self.ws = None + self.message_id = 0 + self.requests: Dict[int, EventController] = {} + self.subscriptions: Dict[str, EventController] = {} - async def send(self, method, **kwargs): - async with aiohttp.ClientSession() as session: - try: - message = {'method': method, 'params': kwargs} - async with session.get(self.url, json=message) as resp: - try: - data = await resp.json() - if 'result' in data: - return data['result'] - elif 'error' in data: - return data['error'] - except Exception as e: - log.exception('Could not process response from server:', exc_info=e) - except aiohttp.ClientConnectionError: - print("Could not connect to daemon. Are you sure it's running?") + async def connect(self): + self.session = ClientSession() + self.ws = await self.session.ws_connect(self.url) + self.receive_messages_task = asyncio.create_task(self.receive_messages()) + + def disconnect(self): + self.session.close() + self.receive_messages_task.cancel() + + async def receive_messages(self): + async for message in self.ws: + d = message.json() + if 'id' in d: + controller = self.requests[d['id']] + if 'event' in d: + await controller.add(d['event']) + continue + elif 'result' in d: + await controller.add(d['result']) + elif 'error' in d: + await controller.add_error(Exception(d['error'])) + else: + raise ValueError(f'Unknown message received: {d}') + await controller.close() + del self.requests[d['id']] + elif 'method' in d and d['method'].startswith('event'): + print(d) + else: + raise ValueError(f'Unknown message received: {d}') + + async def send(self, method, **kwargs) -> EventStream: + self.message_id += 1 + self.requests[self.message_id] = ec = EventController() + await self.ws.send_json({'id': self.message_id, 'method': method, 'params': kwargs}) + return ec.stream + + async def subscribe(self, event) -> EventStream: + if event not in self.subscriptions: + self.subscriptions[event] = EventController() + await self.ws.send_json({'id': None, 'method': 'subscribe', 'params': [event]}) + return self.subscriptions[event].stream def __getattribute__(self, name): if name in dir(API):