Merge pull request #3048 from lbryio/asyncio_loop_fix

asyncio loop fix
This commit is contained in:
Lex Berezhny 2020-09-16 20:16:13 -04:00 committed by GitHub
commit 20c7fe1e82
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 65 additions and 43 deletions

View file

@ -14,7 +14,7 @@ idea:
start: start:
dropdb lbry --if-exists dropdb lbry --if-exists
createdb lbry createdb lbry
lbrynet start --full-node \ lbrynet start node \
--db-url=postgresql:///lbry --workers=0 --console=advanced --no-spv-address-filters \ --db-url=postgresql:///lbry --workers=0 --console=advanced --no-spv-address-filters \
--lbrycrd-rpc-user=lbry --lbrycrd-rpc-pass=somethingelse \ --lbrycrd-rpc-user=lbry --lbrycrd-rpc-pass=somethingelse \
--lbrycrd-dir=${HOME}/.lbrycrd --data-dir=/tmp/tmp-lbrynet --lbrycrd-dir=${HOME}/.lbrycrd --data-dir=/tmp/tmp-lbrynet

View file

@ -1,4 +1,4 @@
from .ledger import Ledger, RegTestLedger, TestNetLedger from .ledger import Ledger, RegTestLedger, TestNetLedger, ledger_class_from_name
from .transaction import Transaction, Output, Input from .transaction import Transaction, Output, Input
from .bcd_data_stream import BCDataStream from .bcd_data_stream import BCDataStream
from .dewies import dewies_to_lbc, lbc_to_dewies, dict_values_to_lbc from .dewies import dewies_to_lbc, lbc_to_dewies, dict_values_to_lbc

View file

@ -88,6 +88,7 @@ class Lbrycrd:
def temp_regtest(cls): def temp_regtest(cls):
return cls(RegTestLedger( return cls(RegTestLedger(
Config.with_same_dir(tempfile.mkdtemp()).set( Config.with_same_dir(tempfile.mkdtemp()).set(
blockchain="regtest",
lbrycrd_rpc_port=9245 + 2, # avoid conflict with default rpc port lbrycrd_rpc_port=9245 + 2, # avoid conflict with default rpc port
lbrycrd_peer_port=9246 + 2, # avoid conflict with default peer port lbrycrd_peer_port=9246 + 2, # avoid conflict with default peer port
lbrycrd_zmq_blocks="tcp://127.0.0.1:29002" # avoid conflict with default port lbrycrd_zmq_blocks="tcp://127.0.0.1:29002" # avoid conflict with default port

View file

@ -1,6 +1,6 @@
import typing
from binascii import unhexlify from binascii import unhexlify
from string import hexdigits from string import hexdigits
from typing import TYPE_CHECKING, Type
from lbry.crypto.hash import hash160, double_sha256 from lbry.crypto.hash import hash160, double_sha256
from lbry.crypto.base58 import Base58 from lbry.crypto.base58 import Base58
@ -10,7 +10,7 @@ from .checkpoints import HASHES
from .dewies import lbc_to_dewies from .dewies import lbc_to_dewies
if typing.TYPE_CHECKING: if TYPE_CHECKING:
from lbry.conf import Config from lbry.conf import Config
@ -164,3 +164,11 @@ class RegTestLedger(Ledger):
genesis_bits = 0x207fffff genesis_bits = 0x207fffff
target_timespan = 1 target_timespan = 1
checkpoints = {} checkpoints = {}
def ledger_class_from_name(name) -> Type[Ledger]:
return {
Ledger.network_name: Ledger,
TestNetLedger.network_name: TestNetLedger,
RegTestLedger.network_name: RegTestLedger
}[name]

View file

@ -10,11 +10,8 @@ from docopt import docopt
from lbry import __version__ from lbry import __version__
from lbry.conf import Config, CLIConfig from lbry.conf import Config, CLIConfig
from lbry.service import Daemon, Client from lbry.service import Daemon, Client, FullNode, LightClient
from lbry.service.metadata import interface from lbry.service.metadata import interface
from lbry.service.full_node import FullNode
from lbry.blockchain.ledger import Ledger
from lbry.console import Advanced as AdvancedConsole, Basic as BasicConsole
def split_subparser_argument(parent, original, name, condition): def split_subparser_argument(parent, original, name, condition):
@ -118,8 +115,7 @@ def get_argument_parser():
help='Start LBRY Network interface.' help='Start LBRY Network interface.'
) )
start.add_argument( start.add_argument(
'--full-node', dest='full_node', action="store_true", "service", choices=[LightClient.name, FullNode.name], default=LightClient.name, nargs="?"
help='Start a full node with local blockchain data, requires lbrycrd.'
) )
start.add_argument( start.add_argument(
'--quiet', dest='quiet', action="store_true", '--quiet', dest='quiet', action="store_true",
@ -176,10 +172,11 @@ def ensure_directory_exists(path: str):
async def execute_command(conf, method, params): async def execute_command(conf, method, params):
client = Client(f"http://{conf.api}/ws") client = Client(f"http://{conf.api}/ws")
await client.connect() await client.connect()
resp = await (await client.send(method, **params)).first responses = await client.send(method, **params)
print(resp) result = await responses.first
await client.disconnect() await client.disconnect()
return resp print(result)
return result
def normalize_value(x, key=None): def normalize_value(x, key=None):
@ -250,15 +247,10 @@ def main(argv=None):
elif args.command == 'start': elif args.command == 'start':
if args.help: if args.help:
args.start_parser.print_help() args.start_parser.print_help()
elif args.full_node: elif args.service == FullNode.name:
service = FullNode(Ledger(conf)) return Daemon.from_config(FullNode, conf).run()
if conf.console == "advanced":
console = AdvancedConsole(service)
else:
console = BasicConsole(service)
return Daemon(service, console).run()
else: else:
print('Only `start --full-node` is currently supported.') print(f'Only `start {FullNode.name}` is currently supported.')
elif args.command == 'install': elif args.command == 'install':
if args.help: if args.help:
args.install_parser.print_help() args.install_parser.print_help()

View file

@ -511,7 +511,7 @@ class Config(CLIConfig):
) )
console = StringChoice( console = StringChoice(
"Basic text console output or advanced colored output with progress bars.", "Basic text console output or advanced colored output with progress bars.",
["basic", "advanced"], "advanced" ["basic", "advanced", "none"], "advanced"
) )
# directories # directories
@ -615,14 +615,14 @@ class Config(CLIConfig):
comment_server = String("Comment server API URL", "https://comments.lbry.com/api") comment_server = String("Comment server API URL", "https://comments.lbry.com/api")
# blockchain # blockchain
blockchain = StringChoice("Blockchain network type.", ["mainnet", "regtest", "testnet"], "mainnet")
lbrycrd_rpc_user = String("Username for connecting to lbrycrd.", "rpcuser") lbrycrd_rpc_user = String("Username for connecting to lbrycrd.", "rpcuser")
lbrycrd_rpc_pass = String("Password for connecting to lbrycrd.", "rpcpassword") lbrycrd_rpc_pass = String("Password for connecting to lbrycrd.", "rpcpassword")
lbrycrd_rpc_host = String("Hostname for connecting to lbrycrd.", "localhost") lbrycrd_rpc_host = String("Hostname for connecting to lbrycrd.", "localhost")
lbrycrd_rpc_port = Integer("Port for connecting to lbrycrd.", 9245) lbrycrd_rpc_port = Integer("Port for connecting to lbrycrd.", 9245)
lbrycrd_peer_port = Integer("Port for connecting to lbrycrd.", 9246) lbrycrd_peer_port = Integer("Peer port for lbrycrd.", 9246)
lbrycrd_zmq_blocks = String("ZMQ block events address.") lbrycrd_zmq_blocks = String("ZMQ block events address.")
lbrycrd_dir = Path("Directory containing lbrycrd data.", metavar='DIR') lbrycrd_dir = Path("Directory containing lbrycrd data.", metavar='DIR')
blockchain_name = String("Blockchain name - lbrycrd_main, lbrycrd_regtest, or lbrycrd_testnet", 'lbrycrd_main')
spv_address_filters = Toggle( spv_address_filters = Toggle(
"Generate Golomb-Rice coding filters for blocks and transactions. Enables " "Generate Golomb-Rice coding filters for blocks and transactions. Enables "
"light client to synchronize with a full node.", "light client to synchronize with a full node.",
@ -703,7 +703,7 @@ class Config(CLIConfig):
def db_url_or_default(self): def db_url_or_default(self):
if self.db_url: if self.db_url:
return self.db_url return self.db_url
return 'sqlite:///'+os.path.join(self.data_dir, f'{self.blockchain_name}.db') return 'sqlite:///'+os.path.join(self.data_dir, f'{self.blockchain}.db')
def get_windows_directories() -> Tuple[str, str, str, str]: def get_windows_directories() -> Tuple[str, str, str, str]:

View file

@ -3,7 +3,7 @@ import sys
import time import time
import itertools import itertools
import logging import logging
from typing import Dict, Any from typing import Dict, Any, Type
from tempfile import TemporaryFile from tempfile import TemporaryFile
from tqdm.std import tqdm, Bar from tqdm.std import tqdm, Bar
@ -490,3 +490,7 @@ class Advanced(Basic):
# else: # else:
# self.stderr.flush(self.bars['read'].write) # self.stderr.flush(self.bars['read'].write)
# self.update_progress(e, d) # self.update_progress(e, d)
def console_class_from_name(name) -> Type[Console]:
return {'basic': Basic, 'advanced': Advanced}.get(name, Console)

View file

@ -113,7 +113,7 @@ class Database:
def temp_sqlite_regtest(cls, lbrycrd_dir=None): def temp_sqlite_regtest(cls, lbrycrd_dir=None):
from lbry import Config, RegTestLedger # pylint: disable=import-outside-toplevel from lbry import Config, RegTestLedger # pylint: disable=import-outside-toplevel
directory = tempfile.mkdtemp() directory = tempfile.mkdtemp()
conf = Config.with_same_dir(directory) conf = Config.with_same_dir(directory).set(blockchain="regtest")
if lbrycrd_dir is not None: if lbrycrd_dir is not None:
conf.lbrycrd_dir = lbrycrd_dir conf.lbrycrd_dir = lbrycrd_dir
ledger = RegTestLedger(conf) ledger = RegTestLedger(conf)

View file

@ -1,4 +1,5 @@
from .api import API, Client from .api import API, Client
from .base import Service
from .daemon import Daemon, jsonrpc_dumps_pretty from .daemon import Daemon, jsonrpc_dumps_pretty
from .full_node import FullNode from .full_node import FullNode
from .light_client import LightClient from .light_client import LightClient

View file

@ -2,17 +2,18 @@ import json
import signal import signal
import asyncio import asyncio
import logging import logging
from weakref import WeakSet from weakref import WeakSet
from asyncio.runners import _cancel_all_tasks from asyncio.runners import _cancel_all_tasks
from typing import Type
from aiohttp.web import Application, AppRunner, WebSocketResponse, TCPSite, Response from aiohttp.web import Application, AppRunner, WebSocketResponse, TCPSite, Response
from aiohttp.http_websocket import WSMsgType, WSCloseCode from aiohttp.http_websocket import WSMsgType, WSCloseCode
from lbry.conf import Config
from lbry.console import Console, console_class_from_name
from lbry.service import API, Service
from lbry.service.json_encoder import JSONResponseEncoder from lbry.service.json_encoder import JSONResponseEncoder
from lbry.service.base import Service from lbry.blockchain.ledger import ledger_class_from_name
from lbry.service.api import API
from lbry.console import Console
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -73,6 +74,7 @@ class Daemon:
Handles starting and stopping API Handles starting and stopping API
""" """
def __init__(self, service: Service, console: Console): def __init__(self, service: Service, console: Console):
self._loop = asyncio.get_running_loop()
self.service = service self.service = service
self.conf = service.conf self.conf = service.conf
self.console = console self.console = console
@ -91,22 +93,34 @@ class Daemon:
self.app.on_shutdown.append(self.on_shutdown) self.app.on_shutdown.append(self.on_shutdown)
self.runner = AppRunner(self.app) self.runner = AppRunner(self.app)
@classmethod
def from_config(cls, service_class: Type[Service], conf: Config, ) -> 'Daemon':
async def setup():
ledger_class = ledger_class_from_name(conf.blockchain)
ledger = ledger_class(conf)
service = service_class(ledger)
console_class = console_class_from_name(conf.console)
console = console_class(service)
return cls(service, console)
return asyncio.new_event_loop().run_until_complete(setup())
def run(self): def run(self):
loop = asyncio.new_event_loop()
for sig in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT): for sig in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, loop.stop) self._loop.add_signal_handler(sig, self._loop.stop)
try: try:
loop.run_until_complete(self.start()) self._loop.run_until_complete(self.start())
loop.run_forever() self._loop.run_forever()
finally: finally:
try: try:
loop.run_until_complete(self.stop()) self._loop.run_until_complete(self.stop())
finally: finally:
try: try:
_cancel_all_tasks(loop) _cancel_all_tasks(self._loop)
loop.run_until_complete(loop.shutdown_asyncgens()) self._loop.run_until_complete(self._loop.shutdown_asyncgens())
finally: finally:
loop.close() self._loop.close()
async def start(self): async def start(self):
self.console.starting() self.console.starting()

View file

@ -14,6 +14,8 @@ log = logging.getLogger(__name__)
class FullNode(Service): class FullNode(Service):
name = "node"
sync: BlockchainSync sync: BlockchainSync
def __init__(self, ledger: Ledger, chain: Lbrycrd = None): def __init__(self, ledger: Ledger, chain: Lbrycrd = None):

View file

@ -12,6 +12,8 @@ log = logging.getLogger(__name__)
class LightClient(Service): class LightClient(Service):
name = "client"
sync: SPVSync sync: SPVSync
def __init__(self, ledger: Ledger): def __init__(self, ledger: Ledger):

View file

@ -7,7 +7,6 @@ from unittest import TestCase
from lbry import Daemon, FullNode from lbry import Daemon, FullNode
from lbry.cli import execute_command from lbry.cli import execute_command
from lbry.console import Console
from lbry.blockchain.lbrycrd import Lbrycrd from lbry.blockchain.lbrycrd import Lbrycrd
from lbry.testcase import CommandTestCase from lbry.testcase import CommandTestCase
@ -23,9 +22,8 @@ class TestShutdown(TestCase):
chain_loop.run_until_complete(chain.ensure()) chain_loop.run_until_complete(chain.ensure())
chain_loop.run_until_complete(chain.start()) chain_loop.run_until_complete(chain.start())
chain_loop.run_until_complete(chain.generate(1)) chain_loop.run_until_complete(chain.generate(1))
chain.ledger.conf.set(workers=2) chain.ledger.conf.set(workers=2, console='none')
service = FullNode(chain.ledger) daemon = Daemon.from_config(FullNode, chain.ledger.conf)
daemon = Daemon(service, Console(service))
def send_signal(): def send_signal():
time.sleep(2) time.sleep(2)