lbry-sdk/tests/integration/service/test_daemon.py

36 lines
1 KiB
Python
Raw Normal View History

2020-08-20 10:43:44 -04:00
import os
import time
import asyncio
import signal
from threading import Thread
from unittest import TestCase
from lbry import Daemon, FullNode
from lbry.console import Console
from lbry.blockchain.lbrycrd import Lbrycrd
class TestShutdown(TestCase):
def test_graceful_fail(self):
chain_loop = asyncio.new_event_loop()
asyncio.set_event_loop(chain_loop)
chain = Lbrycrd.temp_regtest()
self.addCleanup(lambda: chain_loop.run_until_complete(chain.stop()))
self.addCleanup(lambda: asyncio.set_event_loop(chain_loop))
chain_loop.run_until_complete(chain.ensure())
chain_loop.run_until_complete(chain.start())
chain_loop.run_until_complete(chain.generate(1))
chain.ledger.conf.set(workers=2)
service = FullNode(chain.ledger)
daemon = Daemon(service, Console(service))
def send_signal():
time.sleep(2)
os.kill(os.getpid(), signal.SIGTERM)
thread = Thread(target=send_signal)
thread.start()
daemon.run()