import asyncio import unittest from unittest.case import _Outcome try: from asyncio.runners import _cancel_all_tasks except ImportError: # this is only available in py3.7 def _cancel_all_tasks(loop): pass class TestBase(unittest.TestCase): # Implementation inspired by discussion: # https://bugs.python.org/issue32972 async def asyncSetUp(self): pass async def asyncTearDown(self): pass async def doAsyncCleanups(self): pass def run(self, result=None): orig_result = result if result is None: result = self.defaultTestResult() startTestRun = getattr(result, 'startTestRun', None) if startTestRun is not None: startTestRun() result.startTest(self) testMethod = getattr(self, self._testMethodName) if (getattr(self.__class__, "__unittest_skip__", False) or getattr(testMethod, "__unittest_skip__", False)): # If the class or method was skipped. try: skip_why = (getattr(self.__class__, '__unittest_skip_why__', '') or getattr(testMethod, '__unittest_skip_why__', '')) self._addSkip(result, self, skip_why) finally: result.stopTest(self) return expecting_failure_method = getattr(testMethod, "__unittest_expecting_failure__", False) expecting_failure_class = getattr(self, "__unittest_expecting_failure__", False) expecting_failure = expecting_failure_class or expecting_failure_method outcome = _Outcome(result) try: self._outcome = outcome loop = asyncio.new_event_loop() try: asyncio.set_event_loop(loop) loop.set_debug(True) with outcome.testPartExecutor(self): self.setUp() loop.run_until_complete(self.asyncSetUp()) if outcome.success: outcome.expecting_failure = expecting_failure with outcome.testPartExecutor(self, isTest=True): possible_coroutine = testMethod() if asyncio.iscoroutine(possible_coroutine): loop.run_until_complete(possible_coroutine) outcome.expecting_failure = False with outcome.testPartExecutor(self): loop.run_until_complete(self.asyncTearDown()) self.tearDown() finally: try: _cancel_all_tasks(loop) loop.run_until_complete(loop.shutdown_asyncgens()) finally: asyncio.set_event_loop(None) loop.close() self.doCleanups() for test, reason in outcome.skipped: self._addSkip(result, test, reason) self._feedErrorsToResult(result, outcome.errors) if outcome.success: if expecting_failure: if outcome.expectedFailure: self._addExpectedFailure(result, outcome.expectedFailure) else: self._addUnexpectedSuccess(result) else: result.addSuccess(self) return result finally: result.stopTest(self) if orig_result is None: stopTestRun = getattr(result, 'stopTestRun', None) if stopTestRun is not None: stopTestRun() # explicitly break reference cycles: # outcome.errors -> frame -> outcome -> outcome.errors # outcome.expectedFailure -> frame -> outcome -> outcome.expectedFailure outcome.errors.clear() outcome.expectedFailure = None # clear the outcome, no more needed self._outcome = None def setUp(self): self.loop = asyncio.get_event_loop_policy().get_event_loop()