diff --git a/lbry/conf.py b/lbry/conf.py index fa395d8a9..076b09565 100644 --- a/lbry/conf.py +++ b/lbry/conf.py @@ -10,7 +10,7 @@ import yaml from appdirs import user_data_dir, user_config_dir from lbry.error import InvalidCurrencyError from lbry.dht import constants -from lbry.wallet.coinselection import STRATEGIES +from lbry.wallet.coinselection import COIN_SELECTION_STRATEGIES log = logging.getLogger(__name__) @@ -382,8 +382,12 @@ class BaseConfig: self.environment = {} # from environment variables self.persisted = {} # from config file self._updating_config = False + self.set(**kwargs) + + def set(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) + return self @contextmanager def update_config(self): @@ -603,6 +607,17 @@ class Config(CLIConfig): # blockchain blockchain_name = String("Blockchain name - lbrycrd_main, lbrycrd_regtest, or lbrycrd_testnet", 'lbrycrd_main') + spv_address_filters = Toggle( + "Generate Golomb-Rice coding filters for blocks and transactions. Enables " + "light client to synchronize with a full node.", + True + ) + + lbrycrd_dir = Path( + "Directory containing lbrycrd data.", + previous_names=['lbrycrd_dir'], metavar='DIR' + ) + # daemon save_files = Toggle("Save downloaded files when calling `get` by default", True) components_to_skip = Strings("components which will be skipped during start-up of daemon", []) @@ -620,7 +635,7 @@ class Config(CLIConfig): coin_selection_strategy = StringChoice( "Strategy to use when selecting UTXOs for a transaction", - STRATEGIES, "standard") + COIN_SELECTION_STRATEGIES, "standard") save_resolved_claims = Toggle( "Save content claims to the database when they are resolved to keep file_list up to date, " @@ -639,6 +654,15 @@ class Config(CLIConfig): super().__init__(**kwargs) self.set_default_paths() + @classmethod + def with_same_dir(cls, same_dir): + return cls( + data_dir=same_dir, + download_dir=same_dir, + wallet_dir=same_dir, + lbrycrd_dir=same_dir, + ) + def set_default_paths(self): if 'darwin' in sys.platform.lower(): get_directories = get_darwin_directories diff --git a/lbry/constants.py b/lbry/constants.py index 34b8baa88..c1497ac19 100644 --- a/lbry/constants.py +++ b/lbry/constants.py @@ -2,26 +2,3 @@ NULL_HASH32 = b'\x00'*32 CENT = 1000000 COIN = 100*CENT - -TIMEOUT = 30.0 - -TXO_TYPES = { - "other": 0, - "stream": 1, - "channel": 2, - "support": 3, - "purchase": 4, - "collection": 5, - "repost": 6, -} - -CLAIM_TYPE_NAMES = [ - 'stream', - 'channel', - 'collection', - 'repost', -] - -CLAIM_TYPES = [ - TXO_TYPES[name] for name in CLAIM_TYPE_NAMES -] diff --git a/lbry/crypto/bip32.py b/lbry/crypto/bip32.py index 3e6bc3a7f..cd2db26d5 100644 --- a/lbry/crypto/bip32.py +++ b/lbry/crypto/bip32.py @@ -2,7 +2,7 @@ from coincurve import PublicKey, PrivateKey as _PrivateKey from lbry.crypto.hash import hmac_sha512, hash160, double_sha256 from lbry.crypto.base58 import Base58 -from .util import cachedproperty +from lbry.utils import cachedproperty class DerivationError(Exception): diff --git a/lbry/event.py b/lbry/event.py index 2c80bcd00..deed7dea0 100644 --- a/lbry/event.py +++ b/lbry/event.py @@ -1,11 +1,15 @@ import asyncio import threading import multiprocessing +import logging + + +log = logging.getLogger(__name__) class BroadcastSubscription: - def __init__(self, controller, on_data, on_error, on_done): + def __init__(self, controller: 'EventController', on_data, on_error, on_done): self._controller = controller self._previous = self._next = None self._on_data = on_data @@ -45,10 +49,10 @@ class BroadcastSubscription: self.is_closed = True -class StreamController: +class EventController: def __init__(self, merge_repeated_events=False): - self.stream = Stream(self) + self.stream = EventStream(self) self._first_subscription = None self._last_subscription = None self._last_event = None @@ -66,30 +70,25 @@ class StreamController: next_sub = next_sub._next yield subscription - def _notify_and_ensure_future(self, notify): - tasks = [] - for subscription in self._iterate_subscriptions: - maybe_coroutine = notify(subscription) + async def _notify(self, notify, event): + try: + maybe_coroutine = notify(event) if asyncio.iscoroutine(maybe_coroutine): - tasks.append(maybe_coroutine) - if tasks: - return asyncio.ensure_future(asyncio.wait(tasks)) - else: - f = asyncio.get_event_loop().create_future() - f.set_result(None) - return f + await maybe_coroutine + except Exception as e: + log.exception(e) + raise - def add(self, event): - skip = self._merge_repeated and event == self._last_event + async def add(self, event): + if self._merge_repeated and event == self._last_event: + return self._last_event = event - return self._notify_and_ensure_future( - lambda subscription: None if skip else subscription._add(event) - ) + for subscription in self._iterate_subscriptions: + await self._notify(subscription._add, event) - def add_error(self, exception): - return self._notify_and_ensure_future( - lambda subscription: subscription._add_error(exception) - ) + async def add_error(self, exception): + for subscription in self._iterate_subscriptions: + await self._notify(subscription._add_error, exception) def close(self): for subscription in self._iterate_subscriptions: @@ -121,16 +120,16 @@ class StreamController: return subscription -class Stream: +class EventStream: def __init__(self, controller): self._controller = controller - def listen(self, on_data, on_error=None, on_done=None): + def listen(self, on_data, on_error=None, on_done=None) -> BroadcastSubscription: return self._controller._listen(on_data, on_error, on_done) def where(self, condition) -> asyncio.Future: - future = asyncio.get_event_loop().create_future() + future = asyncio.get_running_loop().create_future() def where_test(value): if condition(value): @@ -144,7 +143,7 @@ class Stream: return future @property - def first(self): + def first(self) -> asyncio.Future: future = asyncio.get_event_loop().create_future() subscription = self.listen( lambda value: not future.done() and self._cancel_and_callback(subscription, future, value), @@ -165,12 +164,12 @@ class Stream: class EventQueuePublisher(threading.Thread): - STOP = object() + STOP = 'STOP' - def __init__(self, queue: multiprocessing.Queue, stream_controller: StreamController): + def __init__(self, queue: multiprocessing.Queue, event_controller: EventController): super().__init__() self.queue = queue - self.stream_controller = stream_controller + self.event_controller = event_controller self.loop = asyncio.get_running_loop() def run(self): @@ -178,7 +177,9 @@ class EventQueuePublisher(threading.Thread): msg = self.queue.get() if msg == self.STOP: return - self.loop.call_soon_threadsafe(self.stream_controller.add, msg) + asyncio.run_coroutine_threadsafe( + self.event_controller.add(msg), self.loop + ) def stop(self): self.queue.put(self.STOP) diff --git a/lbry/testcase.py b/lbry/testcase.py index d1d359a0e..9cf394dc9 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -14,13 +14,18 @@ from time import time from binascii import unhexlify from functools import partial -from lbry.wallet import WalletManager, Wallet, Ledger, Account, Transaction +from lbry.blockchain.ledger import Ledger +from lbry.blockchain.transaction import Transaction, Input, Output +from lbry.blockchain.util import satoshis_to_coins +from lbry.constants import CENT, NULL_HASH32 +from lbry.wallet.wallet import Wallet, Account +from lbry.wallet.manager import WalletManager from lbry.conf import Config -from lbry.wallet.util import satoshis_to_coins -from lbry.wallet.orchstr8 import Conductor -from lbry.wallet.orchstr8.node import BlockchainNode, WalletNode +from lbry.blockchain.lbrycrd import Lbrycrd +from lbry.service.full_node import FullNode +from lbry.service.daemon import Daemon -from lbry.extras.daemon.daemon import Daemon, jsonrpc_dumps_pretty +from lbry.extras.daemon.daemon import jsonrpc_dumps_pretty from lbry.extras.daemon.components import Component, WalletComponent from lbry.extras.daemon.components import ( DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, @@ -36,6 +41,28 @@ from lbry.stream.reflector.server import ReflectorServer from lbry.blob_exchange.server import BlobServer +def get_output(amount=CENT, pubkey_hash=NULL_HASH32, height=-2): + return Transaction(height=height) \ + .add_outputs([Output.pay_pubkey_hash(amount, pubkey_hash)]) \ + .outputs[0] + + +def get_input(amount=CENT, pubkey_hash=NULL_HASH32): + return Input.spend(get_output(amount, pubkey_hash)) + + +def get_transaction(txo=None): + return Transaction() \ + .add_inputs([get_input()]) \ + .add_outputs([txo or Output.pay_pubkey_hash(CENT, NULL_HASH32)]) + + +def get_claim_transaction(claim_name, claim=b''): + return get_transaction( + Output.pay_claim_name_pubkey_hash(CENT, claim_name, claim, NULL_HASH32) + ) + + class ColorHandler(logging.StreamHandler): level_color = { @@ -327,6 +354,32 @@ class CommandTestCase(IntegrationTestCase): self.reflector = None async def asyncSetUp(self): + self.chain = Lbrycrd.temp_regtest() + self.ledger = self.chain.ledger + await self.chain.ensure() + self.addCleanup(self.chain.stop) + await self.chain.start('-rpcworkqueue=128') + + self.block_expected = 0 + await self.generate(200, wait=False) + + self.chain.ledger.conf.spv_address_filters = False + self.service = FullNode( + self.ledger, f'sqlite:///{self.chain.data_dir}/full_node.db', Lbrycrd(self.ledger) + ) + self.daemon = Daemon(self.service) + self.api = self.daemon.api + self.addCleanup(self.daemon.stop) + await self.daemon.start() + + self.wallet = self.service.wallet_manager.default_wallet + self.account = self.wallet.accounts[0] + addresses = await self.account.ensure_address_gap() + + await self.chain.send_to_address(addresses[0], '10.0') + await self.generate(5) + + async def XasyncSetUp(self): await super().asyncSetUp() logging.getLogger('lbry.blob_exchange').setLevel(self.VERBOSITY) @@ -435,10 +488,15 @@ class CommandTestCase(IntegrationTestCase): addresses.add(txo['address']) return list(addresses) - async def generate(self, blocks): + def is_expected_block(self, b): + return self.block_expected == b.height + + async def generate(self, blocks, wait=True): """ Ask lbrycrd to generate some blocks and wait until ledger has them. """ - await self.blockchain.generate(blocks) - await self.ledger.on_header.where(self.blockchain.is_expected_block) + await self.chain.generate(blocks) + self.block_expected += blocks + if wait: + await self.service.sync.on_block.where(self.is_expected_block) async def blockchain_claim_name(self, name: str, value: str, amount: str, confirm=True): txid = await self.blockchain._cli_cmnd('claimname', name, value, amount) @@ -454,18 +512,18 @@ class CommandTestCase(IntegrationTestCase): async def out(self, awaitable): """ Serializes lbrynet API results to JSON then loads and returns it as dictionary. """ - return json.loads(jsonrpc_dumps_pretty(await awaitable, ledger=self.ledger))['result'] + return json.loads(jsonrpc_dumps_pretty(await awaitable, service=self.service))['result'] def sout(self, value): """ Synchronous version of `out` method. """ - return json.loads(jsonrpc_dumps_pretty(value, ledger=self.ledger))['result'] + return json.loads(jsonrpc_dumps_pretty(value, service=self.service))['result'] async def confirm_and_render(self, awaitable, confirm) -> Transaction: tx = await awaitable if confirm: - await self.ledger.wait(tx) + await self.service.wait(tx) await self.generate(1) - await self.ledger.wait(tx, self.blockchain.block_expected) + await self.service.wait(tx) return self.sout(tx) def create_upload_file(self, data, prefix=None, suffix=None): @@ -519,7 +577,7 @@ class CommandTestCase(IntegrationTestCase): async def channel_create(self, name='@arena', bid='1.0', confirm=True, **kwargs): return await self.confirm_and_render( - self.daemon.jsonrpc_channel_create(name, bid, **kwargs), confirm + self.api.channel_create(name, bid, **kwargs), confirm ) async def channel_update(self, claim_id, confirm=True, **kwargs): @@ -582,7 +640,7 @@ class CommandTestCase(IntegrationTestCase): return (await self.out(self.daemon.jsonrpc_resolve(uri, **kwargs)))[uri] async def claim_search(self, **kwargs): - return (await self.out(self.daemon.jsonrpc_claim_search(**kwargs)))['items'] + return (await self.out(self.api.claim_search(**kwargs)))['items'] async def file_list(self, *args, **kwargs): return (await self.out(self.daemon.jsonrpc_file_list(*args, **kwargs)))['items'] diff --git a/lbry/utils.py b/lbry/utils.py index c24d8a971..093b63e61 100644 --- a/lbry/utils.py +++ b/lbry/utils.py @@ -282,3 +282,15 @@ async def get_external_ip() -> typing.Optional[str]: # used if upnp is disabled def is_running_from_bundle(): # see https://pyinstaller.readthedocs.io/en/stable/runtime-information.html return getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS') + + +class cachedproperty: + + def __init__(self, f): + self.f = f + + def __get__(self, obj, objtype): + obj = obj or objtype + value = self.f(obj) + setattr(obj, self.f.__name__, value) + return value