aioupnp/tests/__init__.py

114 lines
4.1 KiB
Python
Raw Normal View History

2018-10-25 21:28:00 +02:00
import asyncio
import unittest
from unittest.case import _Outcome
try:
from asyncio.runners import _cancel_all_tasks
except ImportError:
# this is only available in py3.7
def _cancel_all_tasks(loop):
pass
class TestBase(unittest.TestCase):
# Implementation inspired by discussion:
# https://bugs.python.org/issue32972
async def asyncSetUp(self):
pass
async def asyncTearDown(self):
pass
async def doAsyncCleanups(self):
pass
def run(self, result=None):
orig_result = result
if result is None:
result = self.defaultTestResult()
startTestRun = getattr(result, 'startTestRun', None)
if startTestRun is not None:
startTestRun()
result.startTest(self)
testMethod = getattr(self, self._testMethodName)
if (getattr(self.__class__, "__unittest_skip__", False) or
getattr(testMethod, "__unittest_skip__", False)):
# If the class or method was skipped.
try:
skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
or getattr(testMethod, '__unittest_skip_why__', ''))
self._addSkip(result, self, skip_why)
finally:
result.stopTest(self)
return
expecting_failure_method = getattr(testMethod,
"__unittest_expecting_failure__", False)
expecting_failure_class = getattr(self,
"__unittest_expecting_failure__", False)
expecting_failure = expecting_failure_class or expecting_failure_method
outcome = _Outcome(result)
try:
self._outcome = outcome
loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(loop)
loop.set_debug(True)
with outcome.testPartExecutor(self):
self.setUp()
loop.run_until_complete(self.asyncSetUp())
if outcome.success:
outcome.expecting_failure = expecting_failure
with outcome.testPartExecutor(self, isTest=True):
possible_coroutine = testMethod()
if asyncio.iscoroutine(possible_coroutine):
loop.run_until_complete(possible_coroutine)
outcome.expecting_failure = False
with outcome.testPartExecutor(self):
loop.run_until_complete(self.asyncTearDown())
self.tearDown()
finally:
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
asyncio.set_event_loop(None)
loop.close()
self.doCleanups()
for test, reason in outcome.skipped:
self._addSkip(result, test, reason)
self._feedErrorsToResult(result, outcome.errors)
if outcome.success:
if expecting_failure:
if outcome.expectedFailure:
self._addExpectedFailure(result, outcome.expectedFailure)
else:
self._addUnexpectedSuccess(result)
else:
result.addSuccess(self)
return result
finally:
result.stopTest(self)
if orig_result is None:
stopTestRun = getattr(result, 'stopTestRun', None)
if stopTestRun is not None:
stopTestRun()
# explicitly break reference cycles:
# outcome.errors -> frame -> outcome -> outcome.errors
# outcome.expectedFailure -> frame -> outcome -> outcome.expectedFailure
outcome.errors.clear()
outcome.expectedFailure = None
# clear the outcome, no more needed
self._outcome = None
def setUp(self):
self.loop = asyncio.get_event_loop_policy().get_event_loop()