import asyncio from asyncio.runners import _cancel_all_tasks # type: ignore import unittest from unittest.case import _Outcome import lbry_comment_server.database as db from lbry_comment_server import config import schema.db_helpers as schema class AsyncioTestCase(unittest.TestCase): # Implementation inspired by discussion: # https://bugs.python.org/issue32972 maxDiff = None async def asyncSetUp(self): # pylint: disable=C0103 pass async def asyncTearDown(self): # pylint: disable=C0103 pass def run(self, result=None): # pylint: disable=R0915 orig_result = result if result is None: result = self.defaultTestResult() startTestRun = getattr(result, 'startTestRun', None) # pylint: disable=C0103 if startTestRun is not None: startTestRun() result.startTest(self) testMethod = getattr(self, self._testMethodName) # pylint: disable=C0103 if (getattr(self.__class__, "__unittest_skip__", False) or getattr(testMethod, "__unittest_skip__", False)): # If the class or method was skipped. try: skip_why = (getattr(self.__class__, '__unittest_skip_why__', '') or getattr(testMethod, '__unittest_skip_why__', '')) self._addSkip(result, self, skip_why) finally: result.stopTest(self) return expecting_failure_method = getattr(testMethod, "__unittest_expecting_failure__", False) expecting_failure_class = getattr(self, "__unittest_expecting_failure__", False) expecting_failure = expecting_failure_class or expecting_failure_method outcome = _Outcome(result) self.loop = asyncio.new_event_loop() # pylint: disable=W0201 asyncio.set_event_loop(self.loop) self.loop.set_debug(True) try: self._outcome = outcome with outcome.testPartExecutor(self): self.setUp() self.loop.run_until_complete(self.asyncSetUp()) if outcome.success: outcome.expecting_failure = expecting_failure with outcome.testPartExecutor(self, isTest=True): maybe_coroutine = testMethod() if asyncio.iscoroutine(maybe_coroutine): self.loop.run_until_complete(maybe_coroutine) outcome.expecting_failure = False with outcome.testPartExecutor(self): self.loop.run_until_complete(self.asyncTearDown()) self.tearDown() self.doAsyncCleanups() try: _cancel_all_tasks(self.loop) self.loop.run_until_complete(self.loop.shutdown_asyncgens()) finally: asyncio.set_event_loop(None) self.loop.close() for test, reason in outcome.skipped: self._addSkip(result, test, reason) self._feedErrorsToResult(result, outcome.errors) if outcome.success: if expecting_failure: if outcome.expectedFailure: self._addExpectedFailure(result, outcome.expectedFailure) else: self._addUnexpectedSuccess(result) else: result.addSuccess(self) return result finally: result.stopTest(self) if orig_result is None: stopTestRun = getattr(result, 'stopTestRun', None) # pylint: disable=C0103 if stopTestRun is not None: stopTestRun() # pylint: disable=E1102 # explicitly break reference cycles: # outcome.errors -> frame -> outcome -> outcome.errors # outcome.expectedFailure -> frame -> outcome -> outcome.expectedFailure outcome.errors.clear() outcome.expectedFailure = None # clear the outcome, no more needed self._outcome = None def doAsyncCleanups(self): # pylint: disable=C0103 outcome = self._outcome or _Outcome() while self._cleanups: function, args, kwargs = self._cleanups.pop() with outcome.testPartExecutor(self): maybe_coroutine = function(*args, **kwargs) if asyncio.iscoroutine(maybe_coroutine): self.loop.run_until_complete(maybe_coroutine) class DatabaseTestCase(unittest.TestCase): def setUp(self) -> None: super().setUp() schema.setup_database(config['PATH']['TEST']) self.conn = db.obtain_connection(config['PATH']['TEST']) def tearDown(self) -> None: self.conn.close() schema.teardown_database(config['PATH']['TEST'])