tons of small changes squashed together

This commit is contained in:
Jeffrey Picard 2021-06-09 22:40:32 -04:00 committed by Victor Shyba
parent 9dcaa829ea
commit 15a56ca25e
9 changed files with 159 additions and 183 deletions

View file

@ -10,6 +10,7 @@ import typing
import random import random
import hashlib import hashlib
import tracemalloc import tracemalloc
from decimal import Decimal
from urllib.parse import urlencode, quote from urllib.parse import urlencode, quote
from typing import Callable, Optional, List from typing import Callable, Optional, List
from binascii import hexlify, unhexlify from binascii import hexlify, unhexlify
@ -51,7 +52,8 @@ from lbry.extras.daemon.security import ensure_request_allowed
from lbry.file_analysis import VideoFileAnalyzer from lbry.file_analysis import VideoFileAnalyzer
from lbry.schema.claim import Claim from lbry.schema.claim import Claim
from lbry.schema.url import URL from lbry.schema.url import URL
from lbry.wallet.orchstr8.node import fix_kwargs_for_hub from lbry.wallet.server.db.elasticsearch.constants import RANGE_FIELDS
MY_RANGE_FIELDS = RANGE_FIELDS - {"limit_claims_per_channel"}
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from lbry.blob.blob_manager import BlobManager from lbry.blob.blob_manager import BlobManager
@ -169,6 +171,118 @@ def paginate_list(items: List, page: Optional[int], page_size: Optional[int]):
} }
def fix_kwargs_for_hub(**kwargs):
repeated_fields = {"name", "claim_name", "normalized", "reposted_claim_id", "_id", "public_key_hash",
"public_key_bytes", "signature_digest", "signature", "tx_id", "channel_id",
"fee_currency", "media_type", "stream_type", "claim_type", "description", "author", "title",
"canonical_url", "short_url", "claim_id"}
value_fields = {"offset", "limit", "has_channel_signature", "has_source", "has_no_source",
"limit_claims_per_channel", "tx_nout", "remove_duplicates",
"signature_valid", "is_controlling", "amount_order", "no_totals"}
ops = {'<=': 'lte', '>=': 'gte', '<': 'lt', '>': 'gt'}
for key in list(kwargs.keys()):
value = kwargs[key]
if key == "txid":
kwargs["tx_id"] = kwargs.pop("txid")
key = "tx_id"
if key == "nout":
kwargs["tx_nout"] = kwargs.pop("nout")
key = "tx_nout"
if key == "valid_channel_signature":
kwargs["signature_valid"] = kwargs.pop("valid_channel_signature")
if key == "invalid_channel_signature":
kwargs["signature_valid"] = not kwargs.pop("invalid_channel_signature")
if key in {"valid_channel_signature", "invalid_channel_signature"}:
key = "signature_valid"
value = kwargs[key]
if key == "has_no_source":
kwargs["has_source"] = not kwargs.pop("has_no_source")
key = "has_source"
value = kwargs[key]
if key in value_fields:
kwargs[key] = {"value": value} if not isinstance(value, dict) else value
if key in repeated_fields and isinstance(value, str):
kwargs[key] = [value]
if key == "claim_id":
kwargs["claim_id"] = {
"invert": False,
"value": kwargs["claim_id"]
}
if key == "not_claim_id":
kwargs["claim_id"] = {
"invert": True,
"value": kwargs.pop("not_claim_id")
}
if key == "claim_ids":
kwargs["claim_id"] = {
"invert": False,
"value": kwargs.pop("claim_ids")
}
if key == "not_claim_ids":
kwargs["claim_id"] = {
"invert": True,
"value": kwargs["not_claim_ids"]
}
del kwargs["not_claim_ids"]
if key == "channel_id":
kwargs["channel_id"] = {
"invert": False,
"value": kwargs["channel_id"]
}
if key == "channel":
kwargs["channel_id"] = {
"invert": False,
"value": kwargs.pop("channel")
}
if key == "not_channel_id":
kwargs["channel_id"] = {
"invert": True,
"value": kwargs.pop("not_channel_id")
}
if key == "channel_ids":
kwargs["channel_ids"] = {
"invert": False,
"value": kwargs["channel_ids"]
}
if key == "not_channel_ids":
kwargs["channel_ids"] = {
"invert": True,
"value": kwargs.pop("not_channel_ids")
}
if key in MY_RANGE_FIELDS and isinstance(value, str) and value[0] in ops:
operator_length = 2 if value[:2] in ops else 1
operator, value = value[:operator_length], value[operator_length:]
op = 0
if operator == '=':
op = 0
if operator in ('<=', 'lte'):
op = 1
if operator in ('>=', 'gte'):
op = 2
if operator in ('<', 'lt'):
op = 3
if operator in ('>', 'gt'):
op = 4
kwargs[key] = {"op": op, "value": [str(value)]}
elif key in MY_RANGE_FIELDS:
kwargs[key] = {"op": 0, "value": [str(value)]}
if key == 'fee_amount':
value = kwargs['fee_amount']
value.update({"value": [str(Decimal(value['value'][0]) * 1000)]})
kwargs['fee_amount'] = value
if key == 'stream_types':
kwargs['stream_type'] = kwargs.pop('stream_types')
if key == 'media_types':
kwargs['media_type'] = kwargs.pop('media_types')
return kwargs
DHT_HAS_CONTACTS = "dht_has_contacts" DHT_HAS_CONTACTS = "dht_has_contacts"
@ -2496,7 +2610,18 @@ class Daemon(metaclass=JSONRPCServerType):
Returns: {Paginated[Output]} Returns: {Paginated[Output]}
""" """
if os.environ.get("GO_HUB") and os.environ.get("GO_HUB") == "true": if os.environ.get("GO_HUB") and os.environ.get("GO_HUB") == "true":
kwargs['new_sdk_server'] = "localhost:50051" log.warning("### Using go hub! ###")
host = os.environ.get("HUB_HOST", "localhost")
port = os.environ.get("HUB_PORT", "50051")
kwargs['new_sdk_server'] = f"{host}:{port}"
if kwargs.get("channel"):
channel = kwargs.pop("channel")
channel_obj = (await self.jsonrpc_resolve(channel))[channel]
if isinstance(channel_obj, dict):
# This happens when the channel doesn't exist
kwargs["channel_id"] = ""
else:
kwargs["channel_id"] = channel_obj.claim_id
kwargs = fix_kwargs_for_hub(**kwargs) kwargs = fix_kwargs_for_hub(**kwargs)
else: else:
# Don't do this if using the hub server, it screws everything up # Don't do this if using the hub server, it screws everything up

View file

@ -19,7 +19,6 @@ from lbry.conf import Config
from lbry.wallet.util import satoshis_to_coins from lbry.wallet.util import satoshis_to_coins
from lbry.wallet.orchstr8 import Conductor from lbry.wallet.orchstr8 import Conductor
from lbry.wallet.orchstr8.node import BlockchainNode, WalletNode, HubNode from lbry.wallet.orchstr8.node import BlockchainNode, WalletNode, HubNode
from lbry.wallet.orchstr8.node import fix_kwargs_for_hub
from lbry.extras.daemon.daemon import Daemon, jsonrpc_dumps_pretty from lbry.extras.daemon.daemon import Daemon, jsonrpc_dumps_pretty
from lbry.extras.daemon.components import Component, WalletComponent from lbry.extras.daemon.components import Component, WalletComponent

View file

@ -891,7 +891,7 @@ class Ledger(metaclass=LedgerRegistry):
claim_search(**kwargs), accounts, claim_search(**kwargs), accounts,
include_purchase_receipt=include_purchase_receipt, include_purchase_receipt=include_purchase_receipt,
include_is_my_output=include_is_my_output, include_is_my_output=include_is_my_output,
hub_server=True if new_sdk_server else False hub_server=new_sdk_server is not None
) )
async def get_claim_by_claim_id(self, accounts, claim_id, **kwargs) -> Output: async def get_claim_by_claim_id(self, accounts, claim_id, **kwargs) -> Output:

View file

@ -1,7 +1,6 @@
import logging import logging
import asyncio import asyncio
import json import json
import os
import socket import socket
import random import random
from time import perf_counter from time import perf_counter
@ -479,21 +478,14 @@ class Network:
return result['result'] return result['result']
async def new_claim_search(self, server, **kwargs): async def new_claim_search(self, server, **kwargs):
if os.environ.get("GO_HUB") and os.environ.get("GO_HUB") == "true": if "offset" in kwargs and isinstance(kwargs["offset"], int):
if "offset" in kwargs and type(kwargs["offset"]) == int: kwargs["offset"] = {"value": kwargs["offset"]}
kwargs["offset"] = {"value": kwargs["offset"]} if "limit" in kwargs and isinstance(kwargs["limit"], int):
if "limit" in kwargs and type(kwargs["limit"]) == int: kwargs["limit"] = {"value": kwargs["limit"]}
kwargs["limit"] = {"value": kwargs["limit"]} async with grpc.aio.insecure_channel(server) as channel:
async with grpc.aio.insecure_channel(server) as channel: stub = hub_pb2_grpc.HubStub(channel)
stub = hub_pb2_grpc.HubStub(channel) response = await stub.Search(SearchRequest(**kwargs))
response = await stub.Search(SearchRequest(**kwargs)) return response
return response
kwargs['protobuf'] = True
message = {"method": "claim_search", "params": kwargs}
async with self.aiohttp_session.post(server, json=message) as r:
result = await r.json()
return result['result']
async def sum_supports(self, server, **kwargs): async def sum_supports(self, server, **kwargs):
message = {"method": "support_sum", "params": kwargs} message = {"method": "support_sum", "params": kwargs}

View file

@ -1,2 +1,5 @@
__hub_url__ = (
"https://github.com/lbryio/hub/releases/download/v0.2021.06.14-beta/hub"
)
from .node import Conductor from .node import Conductor
from .service import ConductorService from .service import ConductorService

View file

@ -18,11 +18,8 @@ from lbry.wallet.server.server import Server
from lbry.wallet.server.env import Env from lbry.wallet.server.env import Env
from lbry.wallet import Wallet, Ledger, RegTestLedger, WalletManager, Account, BlockHeightEvent from lbry.wallet import Wallet, Ledger, RegTestLedger, WalletManager, Account, BlockHeightEvent
from lbry.conf import KnownHubsList, Config from lbry.conf import KnownHubsList, Config
from lbry.wallet.orchstr8 import __hub_url__
from decimal import Decimal
from lbry.wallet.server.db.elasticsearch.constants import INDEX_DEFAULT_SETTINGS, REPLACEMENTS, FIELDS, TEXT_FIELDS, \
RANGE_FIELDS
MY_RANGE_FIELDS = RANGE_FIELDS - {"limit_claims_per_channel"}
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -51,7 +48,7 @@ class Conductor:
self.wallet_node = WalletNode( self.wallet_node = WalletNode(
self.manager_module, RegTestLedger, default_seed=seed self.manager_module, RegTestLedger, default_seed=seed
) )
self.hub_node = HubNode("asdf", "hub", "hub") self.hub_node = HubNode(__hub_url__, "hub", "hub")
self.blockchain_started = False self.blockchain_started = False
self.spv_started = False self.spv_started = False
@ -482,139 +479,17 @@ class HubProcess(asyncio.SubprocessProtocol):
raise SystemError(data.decode()) raise SystemError(data.decode())
if b'listening on' in data: if b'listening on' in data:
self.ready.set() self.ready.set()
print(data.decode("utf-8"))
def process_exited(self): def process_exited(self):
self.stopped.set() self.stopped.set()
self.ready.set() self.ready.set()
def fix_kwargs_for_hub(**kwargs):
# DEFAULT_PAGE_SIZE = 20
# page_num, page_size = abs(kwargs.pop('page', 1)), min(abs(kwargs.pop('page_size', DEFAULT_PAGE_SIZE)), 50)
# kwargs.update({'offset': page_size * (page_num - 1), 'limit': page_size})
repeated_fields = {"name", "claim_name", "normalized", "reposted_claim_id", "_id", "public_key_hash",
"public_key_bytes", "signature_digest", "signature", "tx_id", "channel_id",
"fee_currency", "media_type", "stream_type", "claim_type", "description", "author", "title",
"canonical_url", "short_url", "claim_id"}
value_fields = {"offset", "limit", "has_channel_signature", "has_source", "has_no_source",
"limit_claims_per_channel", "tx_nout", "remove_duplicates",
"signature_valid", "is_controlling", "amount_order"}
ops = {'<=': 'lte', '>=': 'gte', '<': 'lt', '>': 'gt'}
for key in list(kwargs.keys()):
value = kwargs[key]
if "txid" == key:
kwargs["tx_id"] = kwargs.pop("txid")
key = "tx_id"
if "nout" == key:
kwargs["tx_nout"] = kwargs.pop("nout")
key = "tx_nout"
if "valid_channel_signature" == key:
kwargs["signature_valid"] = kwargs.pop("valid_channel_signature")
if "invalid_channel_signature" == key:
kwargs["signature_valid"] = not kwargs.pop("invalid_channel_signature")
if key in {"valid_channel_signature", "invalid_channel_signature"}:
key = "signature_valid"
value = kwargs[key]
if "has_no_source" == key:
kwargs["has_source"] = not kwargs.pop("has_no_source")
key = "has_source"
value = kwargs[key]
if key in value_fields:
kwargs[key] = {"value": value} if type(value) != dict else value
if key in repeated_fields:
kwargs[key] = [value]
if "claim_id" == key:
kwargs["claim_id"] = {
"invert": False,
"value": kwargs["claim_id"]
}
if "not_claim_id" == key:
kwargs["claim_id"] = {
"invert": True,
"value": kwargs["not_claim_id"]
}
del kwargs["not_claim_id"]
if "claim_ids" == key:
kwargs["claim_id"] = {
"invert": False,
"value": kwargs["claim_ids"]
}
del kwargs["claim_ids"]
if "not_claim_ids" == key:
kwargs["claim_id"] = {
"invert": True,
"value": kwargs["not_claim_ids"]
}
del kwargs["not_claim_ids"]
if "channel_id" == key:
kwargs["channel_id"] = {
"invert": False,
"value": kwargs["channel_id"]
}
if "channel" == key:
kwargs["channel_id"] = {
"invert": False,
"value": kwargs["channel"]
}
del kwargs["channel"]
if "not_channel_id" == key:
kwargs["channel_id"] = {
"invert": True,
"value": kwargs["not_channel_id"]
}
del kwargs["not_channel_id"]
if "channel_ids" == key:
kwargs["channel_ids"] = {
"invert": False,
"value": kwargs["channel_ids"]
}
if "not_channel_ids" == key:
kwargs["channel_ids"] = {
"invert": True,
"value": kwargs["not_channel_ids"]
}
del kwargs["not_channel_ids"]
if key in MY_RANGE_FIELDS and isinstance(value, str) and value[0] in ops:
operator_length = 2 if value[:2] in ops else 1
operator, value = value[:operator_length], value[operator_length:]
op = 0
if operator == '=':
op = 0
if operator == '<=' or operator == 'lte':
op = 1
if operator == '>=' or operator == 'gte':
op = 2
if operator == '<' or operator == 'lt':
op = 3
if operator == '>' or operator == 'gt':
op = 4
kwargs[key] = {"op": op, "value": [str(value)]}
elif key in MY_RANGE_FIELDS:
kwargs[key] = {"op": 0, "value": [str(value)]}
if 'fee_amount' == key:
value = kwargs['fee_amount']
value.update({"value": [str(Decimal(value['value'][0]) * 1000)]})
kwargs['fee_amount'] = value
if 'stream_types' == key:
kwargs['stream_type'] = kwargs.pop('stream_types')
if 'media_types' == key:
kwargs['media_type'] = kwargs.pop('media_types')
return kwargs
class HubNode: class HubNode:
def __init__(self, url, daemon, cli): def __init__(self, url, daemon, cli):
self.debug = True self.debug = False
self.latest_release_url = url self.latest_release_url = url
self.project_dir = os.path.dirname(os.path.dirname(__file__)) self.project_dir = os.path.dirname(os.path.dirname(__file__))
@ -622,24 +497,15 @@ class HubNode:
self.daemon_bin = os.path.join(self.bin_dir, daemon) self.daemon_bin = os.path.join(self.bin_dir, daemon)
self.cli_bin = os.path.join(self.bin_dir, daemon) self.cli_bin = os.path.join(self.bin_dir, daemon)
self.log = log.getChild('hub') self.log = log.getChild('hub')
self.data_path = None
self.protocol = None
self.transport = None self.transport = None
self.block_expected = 0 self.protocol = None
self.hostname = 'localhost' self.hostname = 'localhost'
# self.peerport = 9246 + 13 # avoid conflict with default peer port self.rpcport = 50051 # avoid conflict with default rpc port
self.rpcport = 50051 # avoid conflict with default rpc port
self.rpcuser = 'rpcuser'
self.rpcpassword = 'rpcpassword'
self.stopped = False self.stopped = False
self.restart_ready = asyncio.Event() self.restart_ready = asyncio.Event()
self.restart_ready.set() self.restart_ready.set()
self.running = asyncio.Event() self.running = asyncio.Event()
@property
def rpc_url(self):
return f'http://{self.rpcuser}:{self.rpcpassword}@{self.hostname}:{self.rpcport}/'
@property @property
def exists(self): def exists(self):
return ( return (
@ -675,6 +541,8 @@ class HubNode:
with tarfile.open(downloaded_file) as tar: with tarfile.open(downloaded_file) as tar:
tar.extractall(self.bin_dir) tar.extractall(self.bin_dir)
os.chmod(self.daemon_bin, 0o755)
return self.exists return self.exists
def ensure(self): def ensure(self):
@ -682,11 +550,10 @@ class HubNode:
async def start(self): async def start(self):
assert self.ensure() assert self.ensure()
self.data_path = tempfile.mkdtemp()
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
asyncio.get_child_watcher().attach_loop(loop) asyncio.get_child_watcher().attach_loop(loop)
command = [ command = [
self.daemon_bin, 'serve', self.daemon_bin, 'serve', '--dev'
] ]
self.log.info(' '.join(command)) self.log.info(' '.join(command))
while not self.stopped: while not self.stopped:
@ -720,19 +587,8 @@ class HubNode:
if cleanup: if cleanup:
self.cleanup() self.cleanup()
async def clear_mempool(self):
self.restart_ready.clear()
self.transport.terminate()
await self.protocol.stopped.wait()
self.transport.close()
self.running.clear()
# os.remove(os.path.join(self.data_path, 'regtest', 'mempool.dat'))
self.restart_ready.set()
await self.running.wait()
def cleanup(self): def cleanup(self):
pass pass
#shutil.rmtree(self.data_path, ignore_errors=True)
async def _cli_cmnd(self, *args): async def _cli_cmnd(self, *args):
cmnd_args = [ cmnd_args = [

View file

@ -135,8 +135,6 @@ class ClaimSearchCommand(ClaimTestCase):
# three streams in channel, zero streams in abandoned channel # three streams in channel, zero streams in abandoned channel
claims = [three, two, signed] claims = [three, two, signed]
await self.assertFindsClaims(claims, channel_ids=[self.channel_id]) await self.assertFindsClaims(claims, channel_ids=[self.channel_id])
# FIXME
# channel param doesn't work yet because we need to implement resolve url from search first
cid = await self.daemon.jsonrpc_resolve(f"@abc#{self.channel_id}") cid = await self.daemon.jsonrpc_resolve(f"@abc#{self.channel_id}")
await self.assertFindsClaims(claims, channel_id=cid[f"@abc#{self.channel_id}"].claim_id) await self.assertFindsClaims(claims, channel_id=cid[f"@abc#{self.channel_id}"].claim_id)
cid = await self.daemon.jsonrpc_resolve(f"@inexistent") cid = await self.daemon.jsonrpc_resolve(f"@inexistent")
@ -168,9 +166,10 @@ class ClaimSearchCommand(ClaimTestCase):
# FIXME # FIXME
# print(valid_claims) # print(valid_claims)
# Something happens in inflation I think and this gets switch from valid to not # Something happens in inflation I think and this gets switch from valid to not
# self.assertTrue(all([c['is_channel_signature_valid'] for c in valid_claims])) self.assertTrue(all([c['is_channel_signature_valid'] for c in valid_claims]))
# And signing channel only has id? 'signing_channel': {'channel_id': '6f4513e9bbd63d7b7f13dbf4fd2ef28c560ac89b'} # import json
# self.assertEqual('@abc', valid_claims[0]['signing_channel']['name']) # print(json.dumps(valid_claims, indent=4, sort_keys=True))
self.assertEqual('@abc', valid_claims[0]['signing_channel']['name'])
# abandoned stream won't show up for streams in channel search # abandoned stream won't show up for streams in channel search
await self.stream_abandon(txid=signed2['txid'], nout=0) await self.stream_abandon(txid=signed2['txid'], nout=0)
@ -397,6 +396,7 @@ class ClaimSearchCommand(ClaimTestCase):
limit_claims_per_channel=3, claim_type='stream' limit_claims_per_channel=3, claim_type='stream'
) )
# @skip("okay")
async def test_no_duplicates(self): async def test_no_duplicates(self):
await self.generate(10) await self.generate(10)
match = self.assertFindsClaims match = self.assertFindsClaims
@ -499,7 +499,7 @@ class ClaimSearchCommand(ClaimTestCase):
await self.assertFindsClaims([], duration='>100') await self.assertFindsClaims([], duration='>100')
await self.assertFindsClaims([], duration='<14') await self.assertFindsClaims([], duration='<14')
# @skip("okay") # # @skip("okay")
async def test_search_by_text(self): async def test_search_by_text(self):
chan1_id = self.get_claim_id(await self.channel_create('@SatoshiNakamoto')) chan1_id = self.get_claim_id(await self.channel_create('@SatoshiNakamoto'))
chan2_id = self.get_claim_id(await self.channel_create('@Bitcoin')) chan2_id = self.get_claim_id(await self.channel_create('@Bitcoin'))

View file

@ -88,13 +88,13 @@ class TestLanguages(TestCase):
def test_language_error_parsing(self): def test_language_error_parsing(self):
stream = Stream() stream = Stream()
with self.assertRaisesRegex(ValueError, 'Language has no value defined for name zz'): with self.assertRaisesRegex(ValueError, "Enum Language has no value defined for name 'zz'"):
stream.languages.append('zz') stream.languages.append('zz')
with self.assertRaisesRegex(ValueError, 'Script has no value defined for name Zabc'): with self.assertRaisesRegex(ValueError, "Enum Script has no value defined for name 'Zabc'"):
stream.languages.append('en-Zabc') stream.languages.append('en-Zabc')
with self.assertRaisesRegex(ValueError, 'Country has no value defined for name ZZ'): with self.assertRaisesRegex(ValueError, "Enum Country has no value defined for name 'ZZ'"):
stream.languages.append('en-Zzzz-ZZ') stream.languages.append('en-Zzzz-ZZ')
with self.assertRaisesRegex(AssertionError, 'Failed to parse language tag: en-Zzz-US'): with self.assertRaisesRegex(AssertionError, "Failed to parse language tag: en-Zzz-US"):
stream.languages.append('en-Zzz-US') stream.languages.append('en-Zzz-US')

View file

@ -8,6 +8,7 @@ changedir = {toxinidir}/tests
setenv = setenv =
HOME=/tmp HOME=/tmp
ELASTIC_HOST={env:ELASTIC_HOST:localhost} ELASTIC_HOST={env:ELASTIC_HOST:localhost}
GO_HUB=true
commands = commands =
orchstr8 download orchstr8 download
blockchain: coverage run -p --source={envsitepackagesdir}/lbry -m unittest discover -vv integration.blockchain {posargs} blockchain: coverage run -p --source={envsitepackagesdir}/lbry -m unittest discover -vv integration.blockchain {posargs}