diff --git a/test/test_database.py b/test/test_database.py index c43bb0a..bf25bba 100644 --- a/test/test_database.py +++ b/test/test_database.py @@ -1,18 +1,13 @@ -import sqlite3 - from random import randint import faker from faker.providers import internet from faker.providers import lorem from faker.providers import misc -from src.database.queries import get_comments_by_id -from src.database.queries import get_comment_ids -from src.database.queries import get_claim_comments -from src.database.queries import get_claim_hidden_comments -from src.database.writes import create_comment_or_error -from src.database.queries import hide_comments_by_id -from src.database.queries import delete_comment_by_id +from src.database.models import create_comment +from src.database.models import delete_comment +from src.database.models import comment_list, get_comment, get_comments_by_id +from src.database.models import set_hidden_flag from test.testcase import DatabaseTestCase fake = faker.Faker() @@ -27,26 +22,25 @@ class TestDatabaseOperations(DatabaseTestCase): self.claimId = '529357c3422c6046d3fec76be2358004ba22e340' def test01NamedComments(self): - comment = create_comment_or_error( - conn=self.conn, + comment = create_comment( claim_id=self.claimId, comment='This is a named comment', channel_name='@username', channel_id='529357c3422c6046d3fec76be2358004ba22abcd', - signature=fake.uuid4(), + signature='22'*64, signing_ts='aaa' ) self.assertIsNotNone(comment) self.assertNotIn('parent_in', comment) + previous_id = comment['comment_id'] - reply = create_comment_or_error( - conn=self.conn, + reply = create_comment( claim_id=self.claimId, comment='This is a named response', channel_name='@another_username', channel_id='529357c3422c6046d3fec76be2358004ba224bcd', parent_id=previous_id, - signature=fake.uuid4(), + signature='11'*64, signing_ts='aaa' ) self.assertIsNotNone(reply) @@ -54,34 +48,32 @@ class TestDatabaseOperations(DatabaseTestCase): def test02AnonymousComments(self): self.assertRaises( - sqlite3.IntegrityError, - create_comment_or_error, - conn=self.conn, + ValueError, + create_comment, claim_id=self.claimId, comment='This is an ANONYMOUS comment' ) def test03SignedComments(self): - comment = create_comment_or_error( - conn=self.conn, + comment = create_comment( claim_id=self.claimId, comment='I like big butts and i cannot lie', channel_name='@sirmixalot', channel_id='529357c3422c6046d3fec76be2358005ba22abcd', - signature=fake.uuid4(), + signature='24'*64, signing_ts='asdasd' ) self.assertIsNotNone(comment) self.assertIn('signing_ts', comment) + previous_id = comment['comment_id'] - reply = create_comment_or_error( - conn=self.conn, + reply = create_comment( claim_id=self.claimId, comment='This is a LBRY verified response', channel_name='@LBRY', channel_id='529357c3422c6046d3fec76be2358001ba224bcd', parent_id=previous_id, - signature=fake.uuid4(), + signature='12'*64, signing_ts='sfdfdfds' ) self.assertIsNotNone(reply) @@ -90,75 +82,109 @@ class TestDatabaseOperations(DatabaseTestCase): def test04UsernameVariations(self): self.assertRaises( - AssertionError, - callable=create_comment_or_error, - conn=self.conn, + ValueError, + create_comment, claim_id=self.claimId, channel_name='$#(@#$@#$', channel_id='529357c3422c6046d3fec76be2358001ba224b23', - comment='this is an invalid username' + comment='this is an invalid username', + signature='1' * 128, + signing_ts='123' ) - valid_username = create_comment_or_error( - conn=self.conn, + + valid_username = create_comment( claim_id=self.claimId, channel_name='@' + 'a' * 255, channel_id='529357c3422c6046d3fec76be2358001ba224b23', - comment='this is a valid username' + comment='this is a valid username', + signature='1'*128, + signing_ts='123' ) self.assertIsNotNone(valid_username) - self.assertRaises(AssertionError, - callable=create_comment_or_error, - conn=self.conn, - claim_id=self.claimId, - channel_name='@' + 'a' * 256, - channel_id='529357c3422c6046d3fec76be2358001ba224b23', - comment='this username is too long' - ) self.assertRaises( - AssertionError, - callable=create_comment_or_error, - conn=self.conn, + ValueError, + create_comment, + claim_id=self.claimId, + channel_name='@' + 'a' * 256, + channel_id='529357c3422c6046d3fec76be2358001ba224b23', + comment='this username is too long', + signature='2' * 128, + signing_ts='123' + ) + + self.assertRaises( + ValueError, + create_comment, claim_id=self.claimId, channel_name='', channel_id='529357c3422c6046d3fec76be2358001ba224b23', - comment='this username should not default to ANONYMOUS' + comment='this username should not default to ANONYMOUS', + signature='3' * 128, + signing_ts='123' ) + self.assertRaises( - AssertionError, - callable=create_comment_or_error, - conn=self.conn, + ValueError, + create_comment, claim_id=self.claimId, channel_name='@', channel_id='529357c3422c6046d3fec76be2358001ba224b23', - comment='this username is too short' + comment='this username is too short', + signature='3' * 128, + signing_ts='123' ) def test05HideComments(self): - comm = create_comment_or_error(self.conn, 'Comment #1', self.claimId, '1'*40, '@Doge123', 'a'*128, '123') - comment = get_comments_by_id(self.conn, [comm['comment_id']]).pop() + comm = create_comment( + comment='Comment #1', + claim_id=self.claimId, + channel_id='1'*40, + channel_name='@Doge123', + signature='a'*128, + signing_ts='123' + ) + comment = get_comment(comm['comment_id']) self.assertFalse(comment['is_hidden']) - success = hide_comments_by_id(self.conn, [comm['comment_id']]) + + success = set_hidden_flag([comm['comment_id']]) self.assertTrue(success) - comment = get_comments_by_id(self.conn, [comm['comment_id']]).pop() + + comment = get_comment(comm['comment_id']) self.assertTrue(comment['is_hidden']) - success = hide_comments_by_id(self.conn, [comm['comment_id']]) + + success = set_hidden_flag([comm['comment_id']]) self.assertTrue(success) - comment = get_comments_by_id(self.conn, [comm['comment_id']]).pop() + + comment = get_comment(comm['comment_id']) self.assertTrue(comment['is_hidden']) def test06DeleteComments(self): - comm = create_comment_or_error(self.conn, 'Comment #1', self.claimId, '1'*40, '@Doge123', 'a'*128, '123') - comments = get_claim_comments(self.conn, self.claimId) - match = list(filter(lambda x: comm['comment_id'] == x['comment_id'], comments['items'])) - self.assertTrue(match) - deleted = delete_comment_by_id(self.conn, comm['comment_id']) + # make sure that the comment was created + comm = create_comment( + comment='Comment #1', + claim_id=self.claimId, + channel_id='1'*40, + channel_name='@Doge123', + signature='a'*128, + signing_ts='123' + ) + comments = comment_list(self.claimId) + match = [x for x in comments['items'] if x['comment_id'] == comm['comment_id']] + self.assertTrue(len(match) > 0) + + deleted = delete_comment(comm['comment_id']) self.assertTrue(deleted) - comments = get_claim_comments(self.conn, self.claimId) - match = list(filter(lambda x: comm['comment_id'] == x['comment_id'], comments['items'])) + + # make sure that we can't find the comment here + comments = comment_list(self.claimId) + match = [x for x in comments['items'] if x['comment_id'] == comm['comment_id']] self.assertFalse(match) - deleted = delete_comment_by_id(self.conn, comm['comment_id']) - self.assertFalse(deleted) + self.assertRaises( + ValueError, + delete_comment, + comment_id=comm['comment_id'], + ) class ListDatabaseTest(DatabaseTestCase): @@ -169,61 +195,75 @@ class ListDatabaseTest(DatabaseTestCase): def testLists(self): for claim_id in self.claim_ids: with self.subTest(claim_id=claim_id): - comments = get_claim_comments(self.conn, claim_id) + comments = comment_list(claim_id) self.assertIsNotNone(comments) self.assertGreater(comments['page_size'], 0) self.assertIn('has_hidden_comments', comments) self.assertFalse(comments['has_hidden_comments']) - top_comments = get_claim_comments(self.conn, claim_id, top_level=True, page=1, page_size=50) + top_comments = comment_list(claim_id, top_level=True, page=1, page_size=50) self.assertIsNotNone(top_comments) self.assertEqual(top_comments['page_size'], 50) self.assertEqual(top_comments['page'], 1) self.assertGreaterEqual(top_comments['total_pages'], 0) self.assertGreaterEqual(top_comments['total_items'], 0) - comment_ids = get_comment_ids(self.conn, claim_id, page_size=50, page=1) + comment_ids = comment_list(claim_id, page_size=50, page=1) with self.subTest(comment_ids=comment_ids): self.assertIsNotNone(comment_ids) self.assertLessEqual(len(comment_ids), 50) - matching_comments = get_comments_by_id(self.conn, comment_ids) + matching_comments = (comment_ids) self.assertIsNotNone(matching_comments) self.assertEqual(len(matching_comments), len(comment_ids)) def testHiddenCommentLists(self): claim_id = 'a'*40 - comm1 = create_comment_or_error(self.conn, 'Comment #1', claim_id, '1'*40, '@Doge123', 'a'*128, '123') - comm2 = create_comment_or_error(self.conn, 'Comment #2', claim_id, '1'*40, '@Doge123', 'b'*128, '123') - comm3 = create_comment_or_error(self.conn, 'Comment #3', claim_id, '1'*40, '@Doge123', 'c'*128, '123') + comm1 = create_comment( + 'Comment #1', + claim_id, + channel_id='1'*40, + channel_name='@Doge123', + signature='a'*128, + signing_ts='123' + ) + comm2 = create_comment( + 'Comment #2', claim_id, + channel_id='1'*40, + channel_name='@Doge123', + signature='b'*128, + signing_ts='123' + ) + comm3 = create_comment( + 'Comment #3', claim_id, + channel_id='1'*40, + channel_name='@Doge123', + signature='c'*128, + signing_ts='123' + ) comments = [comm1, comm2, comm3] - comment_list = get_claim_comments(self.conn, claim_id) - self.assertIn('items', comment_list) - self.assertIn('has_hidden_comments', comment_list) - self.assertEqual(len(comments), comment_list['total_items']) - self.assertIn('has_hidden_comments', comment_list) - self.assertFalse(comment_list['has_hidden_comments']) - hide_comments_by_id(self.conn, [comm2['comment_id']]) + listed_comments = comment_list(claim_id) + self.assertEqual(len(comments), listed_comments['total_items']) + self.assertFalse(listed_comments['has_hidden_comments']) - default_comments = get_claim_hidden_comments(self.conn, claim_id) - self.assertIn('has_hidden_comments', default_comments) + set_hidden_flag([comm2['comment_id']]) + hidden = comment_list(claim_id, exclude_mode='hidden') - hidden_comments = get_claim_hidden_comments(self.conn, claim_id, hidden=True) - self.assertIn('has_hidden_comments', hidden_comments) - self.assertEqual(default_comments, hidden_comments) + self.assertTrue(hidden['has_hidden_comments']) + self.assertGreater(len(hidden['items']), 0) - hidden_comment = hidden_comments['items'][0] + visible = comment_list(claim_id, exclude_mode='visible') + self.assertFalse(visible['has_hidden_comments']) + self.assertNotEqual(listed_comments['items'], visible['items']) + + # make sure the hidden comment is the one we marked as hidden + hidden_comment = hidden['items'][0] self.assertEqual(hidden_comment['comment_id'], comm2['comment_id']) - visible_comments = get_claim_hidden_comments(self.conn, claim_id, hidden=False) - self.assertIn('has_hidden_comments', visible_comments) - self.assertNotIn(hidden_comment, visible_comments['items']) - - hidden_ids = [c['comment_id'] for c in hidden_comments['items']] - visible_ids = [c['comment_id'] for c in visible_comments['items']] + hidden_ids = [c['comment_id'] for c in hidden['items']] + visible_ids = [c['comment_id'] for c in visible['items']] composite_ids = hidden_ids + visible_ids + listed_comments = comment_list(claim_id) + all_ids = [c['comment_id'] for c in listed_comments['items']] composite_ids.sort() - - comment_list = get_claim_comments(self.conn, claim_id) - all_ids = [c['comment_id'] for c in comment_list['items']] all_ids.sort() self.assertEqual(composite_ids, all_ids) diff --git a/test/testcase.py b/test/testcase.py index ddd8b44..12e1197 100644 --- a/test/testcase.py +++ b/test/testcase.py @@ -1,12 +1,39 @@ import os import pathlib import unittest -from asyncio.runners import _cancel_all_tasks # type: ignore from unittest.case import _Outcome import asyncio +from asyncio.runners import _cancel_all_tasks # type: ignore +from peewee import * + +from src.database.models import Channel, Comment + + +test_db = SqliteDatabase(':memory:') + + +MODELS = [Channel, Comment] + + +class DatabaseTestCase(unittest.TestCase): + def __init__(self, methodName='DatabaseTest'): + super().__init__(methodName) + + def setUp(self) -> None: + super().setUp() + test_db.bind(MODELS, bind_refs=False, bind_backrefs=False) + + test_db.connect() + test_db.create_tables(MODELS) + + def tearDown(self) -> None: + # drop tables for next test + test_db.drop_tables(MODELS) + + # close connection + test_db.close() -from src.database.queries import obtain_connection, setup_database class AsyncioTestCase(unittest.TestCase): @@ -117,21 +144,3 @@ class AsyncioTestCase(unittest.TestCase): self.loop.run_until_complete(maybe_coroutine) -class DatabaseTestCase(unittest.TestCase): - db_file = 'test.db' - - def __init__(self, methodName='DatabaseTest'): - super().__init__(methodName) - if pathlib.Path(self.db_file).exists(): - os.remove(self.db_file) - - def setUp(self) -> None: - super().setUp() - setup_database(self.db_file) - self.conn = obtain_connection(self.db_file) - self.addCleanup(self.conn.close) - self.addCleanup(os.remove, self.db_file) - - def tearDown(self) -> None: - self.conn.close() -