saving changes in case of power outage
This commit is contained in:
parent
5a525316d8
commit
3c63ce8a50
7 changed files with 171 additions and 54 deletions
tests
143
tests/testcase.py
Normal file
143
tests/testcase.py
Normal file
|
@ -0,0 +1,143 @@
|
|||
import asyncio
|
||||
from asyncio.runners import _cancel_all_tasks # type: ignore
|
||||
import unittest
|
||||
from unittest.case import _Outcome
|
||||
import lbry_comment_server.database as db
|
||||
|
||||
from lbry_comment_server import config
|
||||
import schema.db_helpers as schema
|
||||
|
||||
class AsyncioTestCase(unittest.TestCase):
|
||||
# Implementation inspired by discussion:
|
||||
# https://bugs.python.org/issue32972
|
||||
|
||||
maxDiff = None
|
||||
|
||||
async def asyncSetUp(self): # pylint: disable=C0103
|
||||
pass
|
||||
|
||||
async def asyncTearDown(self): # pylint: disable=C0103
|
||||
pass
|
||||
|
||||
def run(self, result=None): # pylint: disable=R0915
|
||||
orig_result = result
|
||||
if result is None:
|
||||
result = self.defaultTestResult()
|
||||
startTestRun = getattr(result, 'startTestRun', None) # pylint: disable=C0103
|
||||
if startTestRun is not None:
|
||||
startTestRun()
|
||||
|
||||
result.startTest(self)
|
||||
|
||||
testMethod = getattr(self, self._testMethodName) # pylint: disable=C0103
|
||||
if (getattr(self.__class__, "__unittest_skip__", False) or
|
||||
getattr(testMethod, "__unittest_skip__", False)):
|
||||
# If the class or method was skipped.
|
||||
try:
|
||||
skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
|
||||
or getattr(testMethod, '__unittest_skip_why__', ''))
|
||||
self._addSkip(result, self, skip_why)
|
||||
finally:
|
||||
result.stopTest(self)
|
||||
return
|
||||
expecting_failure_method = getattr(testMethod,
|
||||
"__unittest_expecting_failure__", False)
|
||||
expecting_failure_class = getattr(self,
|
||||
"__unittest_expecting_failure__", False)
|
||||
expecting_failure = expecting_failure_class or expecting_failure_method
|
||||
outcome = _Outcome(result)
|
||||
|
||||
self.loop = asyncio.new_event_loop() # pylint: disable=W0201
|
||||
asyncio.set_event_loop(self.loop)
|
||||
self.loop.set_debug(True)
|
||||
|
||||
try:
|
||||
self._outcome = outcome
|
||||
|
||||
with outcome.testPartExecutor(self):
|
||||
self.setUp()
|
||||
self.loop.run_until_complete(self.asyncSetUp())
|
||||
if outcome.success:
|
||||
outcome.expecting_failure = expecting_failure
|
||||
with outcome.testPartExecutor(self, isTest=True):
|
||||
maybe_coroutine = testMethod()
|
||||
if asyncio.iscoroutine(maybe_coroutine):
|
||||
self.loop.run_until_complete(maybe_coroutine)
|
||||
outcome.expecting_failure = False
|
||||
with outcome.testPartExecutor(self):
|
||||
self.loop.run_until_complete(self.asyncTearDown())
|
||||
self.tearDown()
|
||||
|
||||
self.doAsyncCleanups()
|
||||
|
||||
try:
|
||||
_cancel_all_tasks(self.loop)
|
||||
self.loop.run_until_complete(self.loop.shutdown_asyncgens())
|
||||
finally:
|
||||
asyncio.set_event_loop(None)
|
||||
self.loop.close()
|
||||
|
||||
for test, reason in outcome.skipped:
|
||||
self._addSkip(result, test, reason)
|
||||
self._feedErrorsToResult(result, outcome.errors)
|
||||
if outcome.success:
|
||||
if expecting_failure:
|
||||
if outcome.expectedFailure:
|
||||
self._addExpectedFailure(result, outcome.expectedFailure)
|
||||
else:
|
||||
self._addUnexpectedSuccess(result)
|
||||
else:
|
||||
result.addSuccess(self)
|
||||
return result
|
||||
finally:
|
||||
result.stopTest(self)
|
||||
if orig_result is None:
|
||||
stopTestRun = getattr(result, 'stopTestRun', None) # pylint: disable=C0103
|
||||
if stopTestRun is not None:
|
||||
stopTestRun() # pylint: disable=E1102
|
||||
|
||||
# explicitly break reference cycles:
|
||||
# outcome.errors -> frame -> outcome -> outcome.errors
|
||||
# outcome.expectedFailure -> frame -> outcome -> outcome.expectedFailure
|
||||
outcome.errors.clear()
|
||||
outcome.expectedFailure = None
|
||||
|
||||
# clear the outcome, no more needed
|
||||
self._outcome = None
|
||||
|
||||
def doAsyncCleanups(self): # pylint: disable=C0103
|
||||
outcome = self._outcome or _Outcome()
|
||||
while self._cleanups:
|
||||
function, args, kwargs = self._cleanups.pop()
|
||||
with outcome.testPartExecutor(self):
|
||||
maybe_coroutine = function(*args, **kwargs)
|
||||
if asyncio.iscoroutine(maybe_coroutine):
|
||||
self.loop.run_until_complete(maybe_coroutine)
|
||||
|
||||
|
||||
|
||||
class DatabaseTestCase(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
super().setUp()
|
||||
schema.setup_database(config['PATH']['TEST'])
|
||||
self.conn = db.obtain_connection(config['PATH']['TEST'])
|
||||
|
||||
def tearDown(self) -> None:
|
||||
curs = self.conn.execute('SELECT * FROM COMMENT')
|
||||
results = {'COMMENT': [dict(r) for r in curs.fetchall()]}
|
||||
curs = self.conn.execute('SELECT * FROM CHANNEL')
|
||||
results['CHANNEL'] = [dict(r) for r in curs.fetchall()]
|
||||
curs = self.conn.execute('SELECT * FROM COMMENTS_ON_CLAIMS')
|
||||
results['COMMENTS_ON_CLAIMS'] = [dict(r) for r in curs.fetchall()]
|
||||
curs = self.conn.execute('SELECT * FROM COMMENT_REPLIES')
|
||||
results['COMMENT_REPLIES'] = [dict(r) for r in curs.fetchall()]
|
||||
# print(json.dumps(results, indent=4))
|
||||
with self.conn:
|
||||
self.conn.executescript("""
|
||||
DROP TABLE IF EXISTS COMMENT;
|
||||
DROP TABLE IF EXISTS CHANNEL;
|
||||
DROP VIEW IF EXISTS COMMENTS_ON_CLAIMS;
|
||||
DROP VIEW IF EXISTS COMMENT_REPLIES;
|
||||
""")
|
||||
self.conn.close()
|
||||
|
Loading…
Add table
Add a link
Reference in a new issue