switched Client class from plain GET RPC to WebSockets

This commit is contained in:
Lex Berezhny 2020-05-20 18:04:18 -04:00
parent f2e844c476
commit d11f4f9bed

View file

@ -2,13 +2,14 @@ import json
import time import time
import hashlib import hashlib
import inspect import inspect
import asyncio
from typing import Union, Tuple, Callable, Optional, List, Dict from typing import Union, Tuple, Callable, Optional, List, Dict
from binascii import hexlify, unhexlify from binascii import hexlify, unhexlify
from functools import partial from functools import partial
import ecdsa import ecdsa
import base58 import base58
import aiohttp from aiohttp import ClientSession
from lbry.conf import Setting, NOT_SET from lbry.conf import Setting, NOT_SET
from lbry.db import TXO_TYPES from lbry.db import TXO_TYPES
@ -16,7 +17,7 @@ from lbry.db.utils import constrain_single_or_list
from lbry.wallet import Wallet, Account, SingleKey, HierarchicalDeterministic from lbry.wallet import Wallet, Account, SingleKey, HierarchicalDeterministic
from lbry.blockchain import Transaction, Output, dewies_to_lbc, dict_values_to_lbc from lbry.blockchain import Transaction, Output, dewies_to_lbc, dict_values_to_lbc
from lbry.stream.managed_stream import ManagedStream from lbry.stream.managed_stream import ManagedStream
from lbry.event import EventController, EventStream
from .base import Service from .base import Service
@ -3418,25 +3419,56 @@ class API:
class Client(API): class Client(API):
def __init__(self, service: Service, url): def __init__(self, url):
super().__init__(service)
self.url = url self.url = url
self.session: Optional[ClientSession] = None
self.receive_messages_task: Optional[asyncio.Task] = None
self.ws = None
self.message_id = 0
self.requests: Dict[int, EventController] = {}
self.subscriptions: Dict[str, EventController] = {}
async def send(self, method, **kwargs): async def connect(self):
async with aiohttp.ClientSession() as session: self.session = ClientSession()
try: self.ws = await self.session.ws_connect(self.url)
message = {'method': method, 'params': kwargs} self.receive_messages_task = asyncio.create_task(self.receive_messages())
async with session.get(self.url, json=message) as resp:
try: def disconnect(self):
data = await resp.json() self.session.close()
if 'result' in data: self.receive_messages_task.cancel()
return data['result']
elif 'error' in data: async def receive_messages(self):
return data['error'] async for message in self.ws:
except Exception as e: d = message.json()
log.exception('Could not process response from server:', exc_info=e) if 'id' in d:
except aiohttp.ClientConnectionError: controller = self.requests[d['id']]
print("Could not connect to daemon. Are you sure it's running?") if 'event' in d:
await controller.add(d['event'])
continue
elif 'result' in d:
await controller.add(d['result'])
elif 'error' in d:
await controller.add_error(Exception(d['error']))
else:
raise ValueError(f'Unknown message received: {d}')
await controller.close()
del self.requests[d['id']]
elif 'method' in d and d['method'].startswith('event'):
print(d)
else:
raise ValueError(f'Unknown message received: {d}')
async def send(self, method, **kwargs) -> EventStream:
self.message_id += 1
self.requests[self.message_id] = ec = EventController()
await self.ws.send_json({'id': self.message_id, 'method': method, 'params': kwargs})
return ec.stream
async def subscribe(self, event) -> EventStream:
if event not in self.subscriptions:
self.subscriptions[event] = EventController()
await self.ws.send_json({'id': None, 'method': 'subscribe', 'params': [event]})
return self.subscriptions[event].stream
def __getattribute__(self, name): def __getattribute__(self, name):
if name in dir(API): if name in dir(API):