diff --git a/lbry/blockchain/database.py b/lbry/blockchain/database.py index 3d74fe412..66709b82b 100644 --- a/lbry/blockchain/database.py +++ b/lbry/blockchain/database.py @@ -51,7 +51,7 @@ class BlockchainDB: self.executor: Optional[ThreadPoolExecutor] = None async def run_in_executor(self, *args): - return await asyncio.get_event_loop().run_in_executor(self.executor, *args) + return await asyncio.get_running_loop().run_in_executor(self.executor, *args) def sync_open(self): self.connection = sqlite3.connect( diff --git a/lbry/blockchain/lbrycrd.py b/lbry/blockchain/lbrycrd.py index 92905936b..086087493 100644 --- a/lbry/blockchain/lbrycrd.py +++ b/lbry/blockchain/lbrycrd.py @@ -179,10 +179,11 @@ class Lbrycrd: async def close(self): await self.db.close() - await self.session.close() + if self.session is not None: + await self.session.close() async def start(self, *args): - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() command = self.get_start_command(*args) log.info(' '.join(command)) self.transport, self.protocol = await loop.subprocess_exec(Process, *command) diff --git a/lbry/db/database.py b/lbry/db/database.py index 050d716c3..6ec100216 100644 --- a/lbry/db/database.py +++ b/lbry/db/database.py @@ -141,7 +141,7 @@ class Database: db.execute(text(f"CREATE DATABASE {name}")) async def create(self, name): - return await asyncio.get_event_loop().run_in_executor(None, self.sync_create, name) + return await asyncio.get_running_loop().run_in_executor(None, self.sync_create, name) def sync_drop(self, name): engine = create_engine(self.url) @@ -150,7 +150,7 @@ class Database: db.execute(text(f"DROP DATABASE IF EXISTS {name}")) async def drop(self, name): - return await asyncio.get_event_loop().run_in_executor(None, self.sync_drop, name) + return await asyncio.get_running_loop().run_in_executor(None, self.sync_drop, name) async def open(self): assert self.executor is None, "Database already open." @@ -175,11 +175,17 @@ class Database: await self.run(uninitialize) self.executor.shutdown() self.executor = None + # fixes "OSError: handle is closed" + # seems to only happen when running in PyCharm + # https://github.com/python/cpython/pull/6084#issuecomment-564585446 + # TODO: delete this in Python 3.8/3.9? + from concurrent.futures.process import _threads_wakeups + _threads_wakeups.clear() async def run(self, func, *args, **kwargs): if kwargs: clean_wallet_account_ids(kwargs) - return await asyncio.get_event_loop().run_in_executor( + return await asyncio.get_running_loop().run_in_executor( self.executor, partial(func, *args, **kwargs) ) diff --git a/lbry/event.py b/lbry/event.py index 233925d3c..e0b15c129 100644 --- a/lbry/event.py +++ b/lbry/event.py @@ -149,7 +149,7 @@ class EventStream: @property def first(self) -> asyncio.Future: - future = asyncio.get_event_loop().create_future() + future = asyncio.get_running_loop().create_future() subscription = self.listen( lambda value: not future.done() and self._cancel_and_callback(subscription, future, value), lambda exception: not future.done() and self._cancel_and_error(subscription, future, exception) @@ -158,7 +158,7 @@ class EventStream: @property def last(self) -> asyncio.Future: - future = asyncio.get_event_loop().create_future() + future = asyncio.get_running_loop().create_future() value = None def update_value(_value): @@ -237,7 +237,8 @@ class EventQueuePublisher(threading.Thread): def stop(self): self.queue.put(self.STOP) - self.join() + if self.is_alive(): + self.join() def __enter__(self): self.start() diff --git a/lbry/file_analysis.py b/lbry/file_analysis.py index 624e4b60f..595b77c16 100644 --- a/lbry/file_analysis.py +++ b/lbry/file_analysis.py @@ -54,11 +54,11 @@ class VideoFileAnalyzer: # We work around that issue here by using run_in_executor. Check it again in Python 3.8. async def _execute_ffmpeg(self, arguments): arguments = self._which_ffmpeg + " " + arguments - return await asyncio.get_event_loop().run_in_executor(None, self._execute, arguments, self._env_copy) + return await asyncio.get_running_loop().run_in_executor(None, self._execute, arguments, self._env_copy) async def _execute_ffprobe(self, arguments): arguments = self._which_ffprobe + " " + arguments - return await asyncio.get_event_loop().run_in_executor(None, self._execute, arguments, self._env_copy) + return await asyncio.get_running_loop().run_in_executor(None, self._execute, arguments, self._env_copy) async def _verify_executables(self): try: diff --git a/lbry/service/api.py b/lbry/service/api.py index 014f7d39b..f7401b4e8 100644 --- a/lbry/service/api.py +++ b/lbry/service/api.py @@ -2922,7 +2922,7 @@ class API: """ - blob = await download_blob(asyncio.get_event_loop(), self.conf, self.blob_manager, self.dht_node, blob_hash) + blob = await download_blob(asyncio.get_running_loop(), self.conf, self.blob_manager, self.dht_node, blob_hash) if read: with blob.reader_context() as handle: return handle.read().decode() diff --git a/lbry/service/daemon.py b/lbry/service/daemon.py index f89a35b99..a0a40bef8 100644 --- a/lbry/service/daemon.py +++ b/lbry/service/daemon.py @@ -1,9 +1,11 @@ import json +import signal import asyncio import logging -import signal + from weakref import WeakSet -from aiohttp.web import GracefulExit +from asyncio.runners import _cancel_all_tasks + from aiohttp.web import Application, AppRunner, WebSocketResponse, TCPSite, Response from aiohttp.http_websocket import WSMsgType, WSCloseCode @@ -13,6 +15,9 @@ from lbry.service.api import API from lbry.console import Console +log = logging.getLogger(__name__) + + def jsonrpc_dumps_pretty(obj, **kwargs): #if not isinstance(obj, dict): # data = {"jsonrpc": "2.0", "error": obj.to_dict()} @@ -84,28 +89,21 @@ class Daemon: self.runner = AppRunner(self.app) def run(self): - loop = asyncio.get_event_loop() - - def graceful_exit(): - raise GracefulExit() - - try: - loop.add_signal_handler(signal.SIGINT, graceful_exit) - loop.add_signal_handler(signal.SIGTERM, graceful_exit) - except NotImplementedError: - pass # Not implemented on Windows - + loop = asyncio.new_event_loop() + for sig in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler(sig, loop.stop) try: loop.run_until_complete(self.start()) loop.run_forever() - except (GracefulExit, KeyboardInterrupt, asyncio.CancelledError): - pass finally: - loop.run_until_complete(self.stop()) - logging.shutdown() - - if hasattr(loop, 'shutdown_asyncgens'): - loop.run_until_complete(loop.shutdown_asyncgens()) + try: + loop.run_until_complete(self.stop()) + finally: + try: + _cancel_all_tasks(loop) + loop.run_until_complete(loop.shutdown_asyncgens()) + finally: + loop.close() async def start(self): self.console.starting() diff --git a/lbry/tasks.py b/lbry/tasks.py index 2ec60709e..90a7d386f 100644 --- a/lbry/tasks.py +++ b/lbry/tasks.py @@ -1,10 +1,10 @@ -from asyncio import Event, get_event_loop +from asyncio import Event, get_running_loop class TaskGroup: def __init__(self, loop=None): - self._loop = loop or get_event_loop() + self._loop = loop or get_running_loop() self._tasks = set() self.done = Event() self.started = Event() diff --git a/lbry/wallet/account.py b/lbry/wallet/account.py index faa4e7225..f8b996321 100644 --- a/lbry/wallet/account.py +++ b/lbry/wallet/account.py @@ -478,7 +478,7 @@ class Account: continue if "-----BEGIN EC PRIVATE KEY-----" not in private_key_pem: continue - public_key_der = await asyncio.get_event_loop().run_in_executor(None, to_der, private_key_pem) + public_key_der = await asyncio.get_running_loop().run_in_executor(None, to_der, private_key_pem) channel_keys[self.ledger.public_key_to_address(public_key_der)] = private_key_pem if self.channel_keys != channel_keys: self.channel_keys = channel_keys diff --git a/tests/integration/service/__init__.py b/tests/integration/service/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/integration/service/test_daemon.py b/tests/integration/service/test_daemon.py new file mode 100644 index 000000000..480b6f84d --- /dev/null +++ b/tests/integration/service/test_daemon.py @@ -0,0 +1,35 @@ +import os +import time +import asyncio +import signal +from threading import Thread +from unittest import TestCase + +from lbry import Daemon, FullNode +from lbry.console import Console +from lbry.blockchain.lbrycrd import Lbrycrd + + +class TestShutdown(TestCase): + + def test_graceful_fail(self): + chain_loop = asyncio.new_event_loop() + asyncio.set_event_loop(chain_loop) + chain = Lbrycrd.temp_regtest() + self.addCleanup(lambda: chain_loop.run_until_complete(chain.stop())) + self.addCleanup(lambda: asyncio.set_event_loop(chain_loop)) + chain_loop.run_until_complete(chain.ensure()) + chain_loop.run_until_complete(chain.start()) + chain_loop.run_until_complete(chain.generate(1)) + chain.ledger.conf.set(workers=2) + service = FullNode(chain.ledger) + daemon = Daemon(service, Console(service)) + + def send_signal(): + time.sleep(2) + os.kill(os.getpid(), signal.SIGTERM) + + thread = Thread(target=send_signal) + thread.start() + + daemon.run()