Changes to support go hub in the testing frameowkr

This commit is contained in:
Jeffrey Picard 2022-03-24 18:52:20 -04:00
parent b8867cd18c
commit 9ed8ced457
4 changed files with 29 additions and 6 deletions

View file

@ -1140,6 +1140,12 @@ class Daemon(metaclass=JSONRPCServerType):
} }
""" """
wallet = self.wallet_manager.get_wallet_or_default(wallet_id) wallet = self.wallet_manager.get_wallet_or_default(wallet_id)
if self.ledger.config.get('use_go_hub'):
# If client isn't connected yet (which happens in some test for some reason?)
# just default to localhost
host = self.ledger.network.client.server[0] if self.ledger.network.client else "127.0.0.1"
port = "50051"
kwargs['new_sdk_server'] = f"{host}:{port}"
if isinstance(urls, str): if isinstance(urls, str):
urls = [urls] urls = [urls]

View file

@ -786,7 +786,7 @@ class Ledger(metaclass=LedgerRegistry):
if hub_server: if hub_server:
outputs = Outputs.from_grpc(encoded_outputs) outputs = Outputs.from_grpc(encoded_outputs)
else: else:
outputs = Outputs.from_base64(encoded_outputs or '') # TODO: why is the server returning None? outputs = Outputs.from_base64(encoded_outputs or b'') # TODO: why is the server returning None?
txs: List[Transaction] = [] txs: List[Transaction] = []
if len(outputs.txs) > 0: if len(outputs.txs) > 0:
async for tx in self.request_transactions(tuple(outputs.txs), cached=True): async for tx in self.request_transactions(tuple(outputs.txs), cached=True):
@ -866,7 +866,7 @@ class Ledger(metaclass=LedgerRegistry):
txos = [] txos = []
urls_copy = list(urls) urls_copy = list(urls)
if new_sdk_server: if new_sdk_server:
resolve = partial(self.network.new_resolve, new_sdk_server) resolve = partial(self.network.go_hub_resolve, new_sdk_server)
else: else:
resolve = partial(self.network.retriable_call, self.network.resolve) resolve = partial(self.network.retriable_call, self.network.resolve)
while urls_copy: while urls_copy:

View file

@ -1,5 +1,6 @@
import logging import logging
import asyncio import asyncio
import base64
import json import json
import socket import socket
import random import random
@ -10,6 +11,7 @@ import aiohttp
import grpc import grpc
from lbry.schema.types.v2 import hub_pb2_grpc from lbry.schema.types.v2 import hub_pb2_grpc
from lbry.schema.types.v2.hub_pb2 import SearchRequest from lbry.schema.types.v2.hub_pb2 import SearchRequest
from lbry.schema.types.v2.hub_pb2 import StringArray
from lbry import __version__ from lbry import __version__
from lbry.utils import resolve_host from lbry.utils import resolve_host
@ -492,6 +494,20 @@ class Network:
raise RPCError(error.code(), error.details()) raise RPCError(error.code(), error.details())
return response return response
async def go_hub_resolve(self, server, urls, **kwargs):
async with grpc.aio.insecure_channel(server) as channel:
stub = hub_pb2_grpc.HubStub(channel)
try:
response = await stub.Resolve(StringArray(value=urls))
except grpc.aio.AioRpcError as error:
raise RPCError(error.code(), error.details())
# Currently we get the grpc type from the go hub, so we need to serialize it and
# encode it on this side before returning it.
serialized = response.SerializeToString()
# log.warn(serialized)
response = base64.b64encode(serialized).decode()
return response
async def sum_supports(self, server, **kwargs): async def sum_supports(self, server, **kwargs):
message = {"method": "support_sum", "params": kwargs} message = {"method": "support_sum", "params": kwargs}
async with self.aiohttp_session.post(server, json=message) as r: async with self.aiohttp_session.post(server, json=message) as r:

View file

@ -693,9 +693,6 @@ class HubProcess(asyncio.SubprocessProtocol):
self.ready.set() self.ready.set()
if self.log: if self.log:
self.log.info(data.decode()) self.log.info(data.decode())
if b'error' in data.lower():
self.ready.set()
raise SystemError(data.decode())
if b'listening on' in data: if b'listening on' in data:
self.ready.set() self.ready.set()
str_lines = str(data.decode()).split("\n") str_lines = str(data.decode()).split("\n")
@ -788,7 +785,11 @@ class HubNode:
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
asyncio.get_child_watcher().attach_loop(loop) asyncio.get_child_watcher().attach_loop(loop)
command = [ command = [
self.daemon_bin, 'serve', '--esindex', self.spv_node.index_name + 'claims', '--debug' self.daemon_bin,
'serve',
'--esindex', self.spv_node.index_name + 'claims',
'--debug',
'--db-path', os.path.join(self.spv_node.data_path, "lbry-rocksdb")
] ]
self.log.info(' '.join(command)) self.log.info(' '.join(command))
self.protocol = HubProcess(self.running, self._stopped) self.protocol = HubProcess(self.running, self._stopped)