This commit is contained in:
Lex Berezhny 2020-05-01 09:31:33 -04:00
parent 6a33d86bfe
commit fef09c1773
6 changed files with 143 additions and 71 deletions

View file

@ -10,7 +10,7 @@ import yaml
from appdirs import user_data_dir, user_config_dir
from lbry.error import InvalidCurrencyError
from lbry.dht import constants
from lbry.wallet.coinselection import STRATEGIES
from lbry.wallet.coinselection import COIN_SELECTION_STRATEGIES
log = logging.getLogger(__name__)
@ -382,8 +382,12 @@ class BaseConfig:
self.environment = {} # from environment variables
self.persisted = {} # from config file
self._updating_config = False
self.set(**kwargs)
def set(self, **kwargs):
for key, value in kwargs.items():
setattr(self, key, value)
return self
@contextmanager
def update_config(self):
@ -603,6 +607,17 @@ class Config(CLIConfig):
# blockchain
blockchain_name = String("Blockchain name - lbrycrd_main, lbrycrd_regtest, or lbrycrd_testnet", 'lbrycrd_main')
spv_address_filters = Toggle(
"Generate Golomb-Rice coding filters for blocks and transactions. Enables "
"light client to synchronize with a full node.",
True
)
lbrycrd_dir = Path(
"Directory containing lbrycrd data.",
previous_names=['lbrycrd_dir'], metavar='DIR'
)
# daemon
save_files = Toggle("Save downloaded files when calling `get` by default", True)
components_to_skip = Strings("components which will be skipped during start-up of daemon", [])
@ -620,7 +635,7 @@ class Config(CLIConfig):
coin_selection_strategy = StringChoice(
"Strategy to use when selecting UTXOs for a transaction",
STRATEGIES, "standard")
COIN_SELECTION_STRATEGIES, "standard")
save_resolved_claims = Toggle(
"Save content claims to the database when they are resolved to keep file_list up to date, "
@ -639,6 +654,15 @@ class Config(CLIConfig):
super().__init__(**kwargs)
self.set_default_paths()
@classmethod
def with_same_dir(cls, same_dir):
return cls(
data_dir=same_dir,
download_dir=same_dir,
wallet_dir=same_dir,
lbrycrd_dir=same_dir,
)
def set_default_paths(self):
if 'darwin' in sys.platform.lower():
get_directories = get_darwin_directories

View file

@ -2,26 +2,3 @@ NULL_HASH32 = b'\x00'*32
CENT = 1000000
COIN = 100*CENT
TIMEOUT = 30.0
TXO_TYPES = {
"other": 0,
"stream": 1,
"channel": 2,
"support": 3,
"purchase": 4,
"collection": 5,
"repost": 6,
}
CLAIM_TYPE_NAMES = [
'stream',
'channel',
'collection',
'repost',
]
CLAIM_TYPES = [
TXO_TYPES[name] for name in CLAIM_TYPE_NAMES
]

View file

@ -2,7 +2,7 @@ from coincurve import PublicKey, PrivateKey as _PrivateKey
from lbry.crypto.hash import hmac_sha512, hash160, double_sha256
from lbry.crypto.base58 import Base58
from .util import cachedproperty
from lbry.utils import cachedproperty
class DerivationError(Exception):

View file

@ -1,11 +1,15 @@
import asyncio
import threading
import multiprocessing
import logging
log = logging.getLogger(__name__)
class BroadcastSubscription:
def __init__(self, controller, on_data, on_error, on_done):
def __init__(self, controller: 'EventController', on_data, on_error, on_done):
self._controller = controller
self._previous = self._next = None
self._on_data = on_data
@ -45,10 +49,10 @@ class BroadcastSubscription:
self.is_closed = True
class StreamController:
class EventController:
def __init__(self, merge_repeated_events=False):
self.stream = Stream(self)
self.stream = EventStream(self)
self._first_subscription = None
self._last_subscription = None
self._last_event = None
@ -66,30 +70,25 @@ class StreamController:
next_sub = next_sub._next
yield subscription
def _notify_and_ensure_future(self, notify):
tasks = []
for subscription in self._iterate_subscriptions:
maybe_coroutine = notify(subscription)
async def _notify(self, notify, event):
try:
maybe_coroutine = notify(event)
if asyncio.iscoroutine(maybe_coroutine):
tasks.append(maybe_coroutine)
if tasks:
return asyncio.ensure_future(asyncio.wait(tasks))
else:
f = asyncio.get_event_loop().create_future()
f.set_result(None)
return f
await maybe_coroutine
except Exception as e:
log.exception(e)
raise
def add(self, event):
skip = self._merge_repeated and event == self._last_event
async def add(self, event):
if self._merge_repeated and event == self._last_event:
return
self._last_event = event
return self._notify_and_ensure_future(
lambda subscription: None if skip else subscription._add(event)
)
for subscription in self._iterate_subscriptions:
await self._notify(subscription._add, event)
def add_error(self, exception):
return self._notify_and_ensure_future(
lambda subscription: subscription._add_error(exception)
)
async def add_error(self, exception):
for subscription in self._iterate_subscriptions:
await self._notify(subscription._add_error, exception)
def close(self):
for subscription in self._iterate_subscriptions:
@ -121,16 +120,16 @@ class StreamController:
return subscription
class Stream:
class EventStream:
def __init__(self, controller):
self._controller = controller
def listen(self, on_data, on_error=None, on_done=None):
def listen(self, on_data, on_error=None, on_done=None) -> BroadcastSubscription:
return self._controller._listen(on_data, on_error, on_done)
def where(self, condition) -> asyncio.Future:
future = asyncio.get_event_loop().create_future()
future = asyncio.get_running_loop().create_future()
def where_test(value):
if condition(value):
@ -144,7 +143,7 @@ class Stream:
return future
@property
def first(self):
def first(self) -> asyncio.Future:
future = asyncio.get_event_loop().create_future()
subscription = self.listen(
lambda value: not future.done() and self._cancel_and_callback(subscription, future, value),
@ -165,12 +164,12 @@ class Stream:
class EventQueuePublisher(threading.Thread):
STOP = object()
STOP = 'STOP'
def __init__(self, queue: multiprocessing.Queue, stream_controller: StreamController):
def __init__(self, queue: multiprocessing.Queue, event_controller: EventController):
super().__init__()
self.queue = queue
self.stream_controller = stream_controller
self.event_controller = event_controller
self.loop = asyncio.get_running_loop()
def run(self):
@ -178,7 +177,9 @@ class EventQueuePublisher(threading.Thread):
msg = self.queue.get()
if msg == self.STOP:
return
self.loop.call_soon_threadsafe(self.stream_controller.add, msg)
asyncio.run_coroutine_threadsafe(
self.event_controller.add(msg), self.loop
)
def stop(self):
self.queue.put(self.STOP)

View file

@ -14,13 +14,18 @@ from time import time
from binascii import unhexlify
from functools import partial
from lbry.wallet import WalletManager, Wallet, Ledger, Account, Transaction
from lbry.blockchain.ledger import Ledger
from lbry.blockchain.transaction import Transaction, Input, Output
from lbry.blockchain.util import satoshis_to_coins
from lbry.constants import CENT, NULL_HASH32
from lbry.wallet.wallet import Wallet, Account
from lbry.wallet.manager import WalletManager
from lbry.conf import Config
from lbry.wallet.util import satoshis_to_coins
from lbry.wallet.orchstr8 import Conductor
from lbry.wallet.orchstr8.node import BlockchainNode, WalletNode
from lbry.blockchain.lbrycrd import Lbrycrd
from lbry.service.full_node import FullNode
from lbry.service.daemon import Daemon
from lbry.extras.daemon.daemon import Daemon, jsonrpc_dumps_pretty
from lbry.extras.daemon.daemon import jsonrpc_dumps_pretty
from lbry.extras.daemon.components import Component, WalletComponent
from lbry.extras.daemon.components import (
DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT,
@ -36,6 +41,28 @@ from lbry.stream.reflector.server import ReflectorServer
from lbry.blob_exchange.server import BlobServer
def get_output(amount=CENT, pubkey_hash=NULL_HASH32, height=-2):
return Transaction(height=height) \
.add_outputs([Output.pay_pubkey_hash(amount, pubkey_hash)]) \
.outputs[0]
def get_input(amount=CENT, pubkey_hash=NULL_HASH32):
return Input.spend(get_output(amount, pubkey_hash))
def get_transaction(txo=None):
return Transaction() \
.add_inputs([get_input()]) \
.add_outputs([txo or Output.pay_pubkey_hash(CENT, NULL_HASH32)])
def get_claim_transaction(claim_name, claim=b''):
return get_transaction(
Output.pay_claim_name_pubkey_hash(CENT, claim_name, claim, NULL_HASH32)
)
class ColorHandler(logging.StreamHandler):
level_color = {
@ -327,6 +354,32 @@ class CommandTestCase(IntegrationTestCase):
self.reflector = None
async def asyncSetUp(self):
self.chain = Lbrycrd.temp_regtest()
self.ledger = self.chain.ledger
await self.chain.ensure()
self.addCleanup(self.chain.stop)
await self.chain.start('-rpcworkqueue=128')
self.block_expected = 0
await self.generate(200, wait=False)
self.chain.ledger.conf.spv_address_filters = False
self.service = FullNode(
self.ledger, f'sqlite:///{self.chain.data_dir}/full_node.db', Lbrycrd(self.ledger)
)
self.daemon = Daemon(self.service)
self.api = self.daemon.api
self.addCleanup(self.daemon.stop)
await self.daemon.start()
self.wallet = self.service.wallet_manager.default_wallet
self.account = self.wallet.accounts[0]
addresses = await self.account.ensure_address_gap()
await self.chain.send_to_address(addresses[0], '10.0')
await self.generate(5)
async def XasyncSetUp(self):
await super().asyncSetUp()
logging.getLogger('lbry.blob_exchange').setLevel(self.VERBOSITY)
@ -435,10 +488,15 @@ class CommandTestCase(IntegrationTestCase):
addresses.add(txo['address'])
return list(addresses)
async def generate(self, blocks):
def is_expected_block(self, b):
return self.block_expected == b.height
async def generate(self, blocks, wait=True):
""" Ask lbrycrd to generate some blocks and wait until ledger has them. """
await self.blockchain.generate(blocks)
await self.ledger.on_header.where(self.blockchain.is_expected_block)
await self.chain.generate(blocks)
self.block_expected += blocks
if wait:
await self.service.sync.on_block.where(self.is_expected_block)
async def blockchain_claim_name(self, name: str, value: str, amount: str, confirm=True):
txid = await self.blockchain._cli_cmnd('claimname', name, value, amount)
@ -454,18 +512,18 @@ class CommandTestCase(IntegrationTestCase):
async def out(self, awaitable):
""" Serializes lbrynet API results to JSON then loads and returns it as dictionary. """
return json.loads(jsonrpc_dumps_pretty(await awaitable, ledger=self.ledger))['result']
return json.loads(jsonrpc_dumps_pretty(await awaitable, service=self.service))['result']
def sout(self, value):
""" Synchronous version of `out` method. """
return json.loads(jsonrpc_dumps_pretty(value, ledger=self.ledger))['result']
return json.loads(jsonrpc_dumps_pretty(value, service=self.service))['result']
async def confirm_and_render(self, awaitable, confirm) -> Transaction:
tx = await awaitable
if confirm:
await self.ledger.wait(tx)
await self.service.wait(tx)
await self.generate(1)
await self.ledger.wait(tx, self.blockchain.block_expected)
await self.service.wait(tx)
return self.sout(tx)
def create_upload_file(self, data, prefix=None, suffix=None):
@ -519,7 +577,7 @@ class CommandTestCase(IntegrationTestCase):
async def channel_create(self, name='@arena', bid='1.0', confirm=True, **kwargs):
return await self.confirm_and_render(
self.daemon.jsonrpc_channel_create(name, bid, **kwargs), confirm
self.api.channel_create(name, bid, **kwargs), confirm
)
async def channel_update(self, claim_id, confirm=True, **kwargs):
@ -582,7 +640,7 @@ class CommandTestCase(IntegrationTestCase):
return (await self.out(self.daemon.jsonrpc_resolve(uri, **kwargs)))[uri]
async def claim_search(self, **kwargs):
return (await self.out(self.daemon.jsonrpc_claim_search(**kwargs)))['items']
return (await self.out(self.api.claim_search(**kwargs)))['items']
async def file_list(self, *args, **kwargs):
return (await self.out(self.daemon.jsonrpc_file_list(*args, **kwargs)))['items']

View file

@ -282,3 +282,15 @@ async def get_external_ip() -> typing.Optional[str]: # used if upnp is disabled
def is_running_from_bundle():
# see https://pyinstaller.readthedocs.io/en/stable/runtime-information.html
return getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS')
class cachedproperty:
def __init__(self, f):
self.f = f
def __get__(self, obj, objtype):
obj = obj or objtype
value = self.f(obj)
setattr(obj, self.f.__name__, value)
return value