From 9ed8ced457bfe2935ed174f3660a88da8b0266ec Mon Sep 17 00:00:00 2001 From: Jeffrey Picard Date: Thu, 24 Mar 2022 18:52:20 -0400 Subject: [PATCH] Changes to support go hub in the testing frameowkr --- lbry/extras/daemon/daemon.py | 6 ++++++ lbry/wallet/ledger.py | 4 ++-- lbry/wallet/network.py | 16 ++++++++++++++++ lbry/wallet/orchstr8/node.py | 9 +++++---- 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 6881889bc..a41ba5919 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -1140,6 +1140,12 @@ class Daemon(metaclass=JSONRPCServerType): } """ wallet = self.wallet_manager.get_wallet_or_default(wallet_id) + if self.ledger.config.get('use_go_hub'): + # If client isn't connected yet (which happens in some test for some reason?) + # just default to localhost + host = self.ledger.network.client.server[0] if self.ledger.network.client else "127.0.0.1" + port = "50051" + kwargs['new_sdk_server'] = f"{host}:{port}" if isinstance(urls, str): urls = [urls] diff --git a/lbry/wallet/ledger.py b/lbry/wallet/ledger.py index 2ecd81bd5..364b2e36e 100644 --- a/lbry/wallet/ledger.py +++ b/lbry/wallet/ledger.py @@ -786,7 +786,7 @@ class Ledger(metaclass=LedgerRegistry): if hub_server: outputs = Outputs.from_grpc(encoded_outputs) else: - outputs = Outputs.from_base64(encoded_outputs or '') # TODO: why is the server returning None? + outputs = Outputs.from_base64(encoded_outputs or b'') # TODO: why is the server returning None? txs: List[Transaction] = [] if len(outputs.txs) > 0: async for tx in self.request_transactions(tuple(outputs.txs), cached=True): @@ -866,7 +866,7 @@ class Ledger(metaclass=LedgerRegistry): txos = [] urls_copy = list(urls) if new_sdk_server: - resolve = partial(self.network.new_resolve, new_sdk_server) + resolve = partial(self.network.go_hub_resolve, new_sdk_server) else: resolve = partial(self.network.retriable_call, self.network.resolve) while urls_copy: diff --git a/lbry/wallet/network.py b/lbry/wallet/network.py index 27c57dd1b..7125bf01b 100644 --- a/lbry/wallet/network.py +++ b/lbry/wallet/network.py @@ -1,5 +1,6 @@ import logging import asyncio +import base64 import json import socket import random @@ -10,6 +11,7 @@ import aiohttp import grpc from lbry.schema.types.v2 import hub_pb2_grpc from lbry.schema.types.v2.hub_pb2 import SearchRequest +from lbry.schema.types.v2.hub_pb2 import StringArray from lbry import __version__ from lbry.utils import resolve_host @@ -492,6 +494,20 @@ class Network: raise RPCError(error.code(), error.details()) return response + async def go_hub_resolve(self, server, urls, **kwargs): + async with grpc.aio.insecure_channel(server) as channel: + stub = hub_pb2_grpc.HubStub(channel) + try: + response = await stub.Resolve(StringArray(value=urls)) + except grpc.aio.AioRpcError as error: + raise RPCError(error.code(), error.details()) + # Currently we get the grpc type from the go hub, so we need to serialize it and + # encode it on this side before returning it. + serialized = response.SerializeToString() + # log.warn(serialized) + response = base64.b64encode(serialized).decode() + return response + async def sum_supports(self, server, **kwargs): message = {"method": "support_sum", "params": kwargs} async with self.aiohttp_session.post(server, json=message) as r: diff --git a/lbry/wallet/orchstr8/node.py b/lbry/wallet/orchstr8/node.py index 9606c0a37..8b9c9485a 100644 --- a/lbry/wallet/orchstr8/node.py +++ b/lbry/wallet/orchstr8/node.py @@ -693,9 +693,6 @@ class HubProcess(asyncio.SubprocessProtocol): self.ready.set() if self.log: self.log.info(data.decode()) - if b'error' in data.lower(): - self.ready.set() - raise SystemError(data.decode()) if b'listening on' in data: self.ready.set() str_lines = str(data.decode()).split("\n") @@ -788,7 +785,11 @@ class HubNode: loop = asyncio.get_event_loop() asyncio.get_child_watcher().attach_loop(loop) command = [ - self.daemon_bin, 'serve', '--esindex', self.spv_node.index_name + 'claims', '--debug' + self.daemon_bin, + 'serve', + '--esindex', self.spv_node.index_name + 'claims', + '--debug', + '--db-path', os.path.join(self.spv_node.data_path, "lbry-rocksdb") ] self.log.info(' '.join(command)) self.protocol = HubProcess(self.running, self._stopped)