Replace database methods with peewee ORM #39
2 changed files with 160 additions and 111 deletions
|
@ -1,18 +1,13 @@
|
|||
import sqlite3
|
||||
|
||||
from random import randint
|
||||
import faker
|
||||
from faker.providers import internet
|
||||
from faker.providers import lorem
|
||||
from faker.providers import misc
|
||||
|
||||
from src.database.queries import get_comments_by_id
|
||||
from src.database.queries import get_comment_ids
|
||||
from src.database.queries import get_claim_comments
|
||||
from src.database.queries import get_claim_hidden_comments
|
||||
from src.database.writes import create_comment_or_error
|
||||
from src.database.queries import hide_comments_by_id
|
||||
from src.database.queries import delete_comment_by_id
|
||||
from src.database.models import create_comment
|
||||
from src.database.models import delete_comment
|
||||
from src.database.models import comment_list, get_comment, get_comments_by_id
|
||||
from src.database.models import set_hidden_flag
|
||||
from test.testcase import DatabaseTestCase
|
||||
|
||||
fake = faker.Faker()
|
||||
|
@ -27,26 +22,25 @@ class TestDatabaseOperations(DatabaseTestCase):
|
|||
self.claimId = '529357c3422c6046d3fec76be2358004ba22e340'
|
||||
|
||||
def test01NamedComments(self):
|
||||
comment = create_comment_or_error(
|
||||
conn=self.conn,
|
||||
comment = create_comment(
|
||||
claim_id=self.claimId,
|
||||
comment='This is a named comment',
|
||||
channel_name='@username',
|
||||
channel_id='529357c3422c6046d3fec76be2358004ba22abcd',
|
||||
signature=fake.uuid4(),
|
||||
signature='22'*64,
|
||||
signing_ts='aaa'
|
||||
)
|
||||
self.assertIsNotNone(comment)
|
||||
self.assertNotIn('parent_in', comment)
|
||||
|
||||
previous_id = comment['comment_id']
|
||||
reply = create_comment_or_error(
|
||||
conn=self.conn,
|
||||
reply = create_comment(
|
||||
claim_id=self.claimId,
|
||||
comment='This is a named response',
|
||||
channel_name='@another_username',
|
||||
channel_id='529357c3422c6046d3fec76be2358004ba224bcd',
|
||||
parent_id=previous_id,
|
||||
signature=fake.uuid4(),
|
||||
signature='11'*64,
|
||||
signing_ts='aaa'
|
||||
)
|
||||
self.assertIsNotNone(reply)
|
||||
|
@ -54,34 +48,32 @@ class TestDatabaseOperations(DatabaseTestCase):
|
|||
|
||||
def test02AnonymousComments(self):
|
||||
self.assertRaises(
|
||||
sqlite3.IntegrityError,
|
||||
create_comment_or_error,
|
||||
conn=self.conn,
|
||||
ValueError,
|
||||
create_comment,
|
||||
claim_id=self.claimId,
|
||||
comment='This is an ANONYMOUS comment'
|
||||
)
|
||||
|
||||
def test03SignedComments(self):
|
||||
comment = create_comment_or_error(
|
||||
conn=self.conn,
|
||||
comment = create_comment(
|
||||
claim_id=self.claimId,
|
||||
comment='I like big butts and i cannot lie',
|
||||
channel_name='@sirmixalot',
|
||||
channel_id='529357c3422c6046d3fec76be2358005ba22abcd',
|
||||
signature=fake.uuid4(),
|
||||
signature='24'*64,
|
||||
signing_ts='asdasd'
|
||||
)
|
||||
self.assertIsNotNone(comment)
|
||||
self.assertIn('signing_ts', comment)
|
||||
|
||||
previous_id = comment['comment_id']
|
||||
reply = create_comment_or_error(
|
||||
conn=self.conn,
|
||||
reply = create_comment(
|
||||
claim_id=self.claimId,
|
||||
comment='This is a LBRY verified response',
|
||||
channel_name='@LBRY',
|
||||
channel_id='529357c3422c6046d3fec76be2358001ba224bcd',
|
||||
parent_id=previous_id,
|
||||
signature=fake.uuid4(),
|
||||
signature='12'*64,
|
||||
signing_ts='sfdfdfds'
|
||||
)
|
||||
self.assertIsNotNone(reply)
|
||||
|
@ -90,75 +82,109 @@ class TestDatabaseOperations(DatabaseTestCase):
|
|||
|
||||
def test04UsernameVariations(self):
|
||||
self.assertRaises(
|
||||
AssertionError,
|
||||
callable=create_comment_or_error,
|
||||
conn=self.conn,
|
||||
ValueError,
|
||||
create_comment,
|
||||
claim_id=self.claimId,
|
||||
channel_name='$#(@#$@#$',
|
||||
channel_id='529357c3422c6046d3fec76be2358001ba224b23',
|
||||
comment='this is an invalid username'
|
||||
comment='this is an invalid username',
|
||||
signature='1' * 128,
|
||||
signing_ts='123'
|
||||
)
|
||||
valid_username = create_comment_or_error(
|
||||
conn=self.conn,
|
||||
|
||||
valid_username = create_comment(
|
||||
claim_id=self.claimId,
|
||||
channel_name='@' + 'a' * 255,
|
||||
channel_id='529357c3422c6046d3fec76be2358001ba224b23',
|
||||
comment='this is a valid username'
|
||||
comment='this is a valid username',
|
||||
signature='1'*128,
|
||||
signing_ts='123'
|
||||
)
|
||||
self.assertIsNotNone(valid_username)
|
||||
self.assertRaises(AssertionError,
|
||||
callable=create_comment_or_error,
|
||||
conn=self.conn,
|
||||
claim_id=self.claimId,
|
||||
channel_name='@' + 'a' * 256,
|
||||
channel_id='529357c3422c6046d3fec76be2358001ba224b23',
|
||||
comment='this username is too long'
|
||||
)
|
||||
|
||||
self.assertRaises(
|
||||
AssertionError,
|
||||
callable=create_comment_or_error,
|
||||
conn=self.conn,
|
||||
ValueError,
|
||||
create_comment,
|
||||
claim_id=self.claimId,
|
||||
channel_name='@' + 'a' * 256,
|
||||
channel_id='529357c3422c6046d3fec76be2358001ba224b23',
|
||||
comment='this username is too long',
|
||||
signature='2' * 128,
|
||||
signing_ts='123'
|
||||
)
|
||||
|
||||
self.assertRaises(
|
||||
ValueError,
|
||||
create_comment,
|
||||
claim_id=self.claimId,
|
||||
channel_name='',
|
||||
channel_id='529357c3422c6046d3fec76be2358001ba224b23',
|
||||
comment='this username should not default to ANONYMOUS'
|
||||
comment='this username should not default to ANONYMOUS',
|
||||
signature='3' * 128,
|
||||
signing_ts='123'
|
||||
)
|
||||
|
||||
self.assertRaises(
|
||||
AssertionError,
|
||||
callable=create_comment_or_error,
|
||||
conn=self.conn,
|
||||
ValueError,
|
||||
create_comment,
|
||||
claim_id=self.claimId,
|
||||
channel_name='@',
|
||||
channel_id='529357c3422c6046d3fec76be2358001ba224b23',
|
||||
comment='this username is too short'
|
||||
comment='this username is too short',
|
||||
signature='3' * 128,
|
||||
signing_ts='123'
|
||||
)
|
||||
|
||||
def test05HideComments(self):
|
||||
comm = create_comment_or_error(self.conn, 'Comment #1', self.claimId, '1'*40, '@Doge123', 'a'*128, '123')
|
||||
comment = get_comments_by_id(self.conn, [comm['comment_id']]).pop()
|
||||
comm = create_comment(
|
||||
comment='Comment #1',
|
||||
claim_id=self.claimId,
|
||||
channel_id='1'*40,
|
||||
channel_name='@Doge123',
|
||||
signature='a'*128,
|
||||
signing_ts='123'
|
||||
)
|
||||
comment = get_comment(comm['comment_id'])
|
||||
self.assertFalse(comment['is_hidden'])
|
||||
success = hide_comments_by_id(self.conn, [comm['comment_id']])
|
||||
|
||||
success = set_hidden_flag([comm['comment_id']])
|
||||
self.assertTrue(success)
|
||||
comment = get_comments_by_id(self.conn, [comm['comment_id']]).pop()
|
||||
|
||||
comment = get_comment(comm['comment_id'])
|
||||
self.assertTrue(comment['is_hidden'])
|
||||
success = hide_comments_by_id(self.conn, [comm['comment_id']])
|
||||
|
||||
success = set_hidden_flag([comm['comment_id']])
|
||||
self.assertTrue(success)
|
||||
comment = get_comments_by_id(self.conn, [comm['comment_id']]).pop()
|
||||
|
||||
comment = get_comment(comm['comment_id'])
|
||||
self.assertTrue(comment['is_hidden'])
|
||||
|
||||
def test06DeleteComments(self):
|
||||
comm = create_comment_or_error(self.conn, 'Comment #1', self.claimId, '1'*40, '@Doge123', 'a'*128, '123')
|
||||
comments = get_claim_comments(self.conn, self.claimId)
|
||||
match = list(filter(lambda x: comm['comment_id'] == x['comment_id'], comments['items']))
|
||||
self.assertTrue(match)
|
||||
deleted = delete_comment_by_id(self.conn, comm['comment_id'])
|
||||
# make sure that the comment was created
|
||||
comm = create_comment(
|
||||
comment='Comment #1',
|
||||
claim_id=self.claimId,
|
||||
channel_id='1'*40,
|
||||
channel_name='@Doge123',
|
||||
signature='a'*128,
|
||||
signing_ts='123'
|
||||
)
|
||||
comments = comment_list(self.claimId)
|
||||
match = [x for x in comments['items'] if x['comment_id'] == comm['comment_id']]
|
||||
self.assertTrue(len(match) > 0)
|
||||
|
||||
deleted = delete_comment(comm['comment_id'])
|
||||
self.assertTrue(deleted)
|
||||
comments = get_claim_comments(self.conn, self.claimId)
|
||||
match = list(filter(lambda x: comm['comment_id'] == x['comment_id'], comments['items']))
|
||||
|
||||
# make sure that we can't find the comment here
|
||||
comments = comment_list(self.claimId)
|
||||
match = [x for x in comments['items'] if x['comment_id'] == comm['comment_id']]
|
||||
self.assertFalse(match)
|
||||
deleted = delete_comment_by_id(self.conn, comm['comment_id'])
|
||||
self.assertFalse(deleted)
|
||||
self.assertRaises(
|
||||
ValueError,
|
||||
delete_comment,
|
||||
comment_id=comm['comment_id'],
|
||||
)
|
||||
|
||||
|
||||
class ListDatabaseTest(DatabaseTestCase):
|
||||
|
@ -169,61 +195,75 @@ class ListDatabaseTest(DatabaseTestCase):
|
|||
def testLists(self):
|
||||
for claim_id in self.claim_ids:
|
||||
with self.subTest(claim_id=claim_id):
|
||||
comments = get_claim_comments(self.conn, claim_id)
|
||||
comments = comment_list(claim_id)
|
||||
self.assertIsNotNone(comments)
|
||||
self.assertGreater(comments['page_size'], 0)
|
||||
self.assertIn('has_hidden_comments', comments)
|
||||
self.assertFalse(comments['has_hidden_comments'])
|
||||
top_comments = get_claim_comments(self.conn, claim_id, top_level=True, page=1, page_size=50)
|
||||
top_comments = comment_list(claim_id, top_level=True, page=1, page_size=50)
|
||||
self.assertIsNotNone(top_comments)
|
||||
self.assertEqual(top_comments['page_size'], 50)
|
||||
self.assertEqual(top_comments['page'], 1)
|
||||
self.assertGreaterEqual(top_comments['total_pages'], 0)
|
||||
self.assertGreaterEqual(top_comments['total_items'], 0)
|
||||
comment_ids = get_comment_ids(self.conn, claim_id, page_size=50, page=1)
|
||||
comment_ids = comment_list(claim_id, page_size=50, page=1)
|
||||
with self.subTest(comment_ids=comment_ids):
|
||||
self.assertIsNotNone(comment_ids)
|
||||
self.assertLessEqual(len(comment_ids), 50)
|
||||
matching_comments = get_comments_by_id(self.conn, comment_ids)
|
||||
matching_comments = (comment_ids)
|
||||
self.assertIsNotNone(matching_comments)
|
||||
self.assertEqual(len(matching_comments), len(comment_ids))
|
||||
|
||||
def testHiddenCommentLists(self):
|
||||
claim_id = 'a'*40
|
||||
comm1 = create_comment_or_error(self.conn, 'Comment #1', claim_id, '1'*40, '@Doge123', 'a'*128, '123')
|
||||
comm2 = create_comment_or_error(self.conn, 'Comment #2', claim_id, '1'*40, '@Doge123', 'b'*128, '123')
|
||||
comm3 = create_comment_or_error(self.conn, 'Comment #3', claim_id, '1'*40, '@Doge123', 'c'*128, '123')
|
||||
comm1 = create_comment(
|
||||
'Comment #1',
|
||||
claim_id,
|
||||
channel_id='1'*40,
|
||||
channel_name='@Doge123',
|
||||
signature='a'*128,
|
||||
signing_ts='123'
|
||||
)
|
||||
comm2 = create_comment(
|
||||
'Comment #2', claim_id,
|
||||
channel_id='1'*40,
|
||||
channel_name='@Doge123',
|
||||
signature='b'*128,
|
||||
signing_ts='123'
|
||||
)
|
||||
comm3 = create_comment(
|
||||
'Comment #3', claim_id,
|
||||
channel_id='1'*40,
|
||||
channel_name='@Doge123',
|
||||
signature='c'*128,
|
||||
signing_ts='123'
|
||||
)
|
||||
comments = [comm1, comm2, comm3]
|
||||
|
||||
comment_list = get_claim_comments(self.conn, claim_id)
|
||||
self.assertIn('items', comment_list)
|
||||
self.assertIn('has_hidden_comments', comment_list)
|
||||
self.assertEqual(len(comments), comment_list['total_items'])
|
||||
self.assertIn('has_hidden_comments', comment_list)
|
||||
self.assertFalse(comment_list['has_hidden_comments'])
|
||||
hide_comments_by_id(self.conn, [comm2['comment_id']])
|
||||
listed_comments = comment_list(claim_id)
|
||||
self.assertEqual(len(comments), listed_comments['total_items'])
|
||||
self.assertFalse(listed_comments['has_hidden_comments'])
|
||||
|
||||
default_comments = get_claim_hidden_comments(self.conn, claim_id)
|
||||
self.assertIn('has_hidden_comments', default_comments)
|
||||
set_hidden_flag([comm2['comment_id']])
|
||||
hidden = comment_list(claim_id, exclude_mode='hidden')
|
||||
|
||||
hidden_comments = get_claim_hidden_comments(self.conn, claim_id, hidden=True)
|
||||
self.assertIn('has_hidden_comments', hidden_comments)
|
||||
self.assertEqual(default_comments, hidden_comments)
|
||||
self.assertTrue(hidden['has_hidden_comments'])
|
||||
self.assertGreater(len(hidden['items']), 0)
|
||||
|
||||
hidden_comment = hidden_comments['items'][0]
|
||||
visible = comment_list(claim_id, exclude_mode='visible')
|
||||
self.assertFalse(visible['has_hidden_comments'])
|
||||
self.assertNotEqual(listed_comments['items'], visible['items'])
|
||||
|
||||
# make sure the hidden comment is the one we marked as hidden
|
||||
hidden_comment = hidden['items'][0]
|
||||
self.assertEqual(hidden_comment['comment_id'], comm2['comment_id'])
|
||||
|
||||
visible_comments = get_claim_hidden_comments(self.conn, claim_id, hidden=False)
|
||||
self.assertIn('has_hidden_comments', visible_comments)
|
||||
self.assertNotIn(hidden_comment, visible_comments['items'])
|
||||
|
||||
hidden_ids = [c['comment_id'] for c in hidden_comments['items']]
|
||||
visible_ids = [c['comment_id'] for c in visible_comments['items']]
|
||||
hidden_ids = [c['comment_id'] for c in hidden['items']]
|
||||
visible_ids = [c['comment_id'] for c in visible['items']]
|
||||
composite_ids = hidden_ids + visible_ids
|
||||
listed_comments = comment_list(claim_id)
|
||||
all_ids = [c['comment_id'] for c in listed_comments['items']]
|
||||
composite_ids.sort()
|
||||
|
||||
comment_list = get_claim_comments(self.conn, claim_id)
|
||||
all_ids = [c['comment_id'] for c in comment_list['items']]
|
||||
all_ids.sort()
|
||||
self.assertEqual(composite_ids, all_ids)
|
||||
|
||||
|
|
|
@ -1,12 +1,39 @@
|
|||
import os
|
||||
import pathlib
|
||||
import unittest
|
||||
from asyncio.runners import _cancel_all_tasks # type: ignore
|
||||
from unittest.case import _Outcome
|
||||
|
||||
import asyncio
|
||||
from asyncio.runners import _cancel_all_tasks # type: ignore
|
||||
from peewee import *
|
||||
|
||||
from src.database.models import Channel, Comment
|
||||
|
||||
|
||||
test_db = SqliteDatabase(':memory:')
|
||||
|
||||
|
||||
MODELS = [Channel, Comment]
|
||||
|
||||
|
||||
class DatabaseTestCase(unittest.TestCase):
|
||||
def __init__(self, methodName='DatabaseTest'):
|
||||
super().__init__(methodName)
|
||||
|
||||
def setUp(self) -> None:
|
||||
super().setUp()
|
||||
test_db.bind(MODELS, bind_refs=False, bind_backrefs=False)
|
||||
|
||||
test_db.connect()
|
||||
test_db.create_tables(MODELS)
|
||||
|
||||
def tearDown(self) -> None:
|
||||
# drop tables for next test
|
||||
test_db.drop_tables(MODELS)
|
||||
|
||||
# close connection
|
||||
test_db.close()
|
||||
|
||||
from src.database.queries import obtain_connection, setup_database
|
||||
|
||||
|
||||
class AsyncioTestCase(unittest.TestCase):
|
||||
|
@ -117,21 +144,3 @@ class AsyncioTestCase(unittest.TestCase):
|
|||
self.loop.run_until_complete(maybe_coroutine)
|
||||
|
||||
|
||||
class DatabaseTestCase(unittest.TestCase):
|
||||
db_file = 'test.db'
|
||||
|
||||
def __init__(self, methodName='DatabaseTest'):
|
||||
super().__init__(methodName)
|
||||
if pathlib.Path(self.db_file).exists():
|
||||
os.remove(self.db_file)
|
||||
|
||||
def setUp(self) -> None:
|
||||
super().setUp()
|
||||
setup_database(self.db_file)
|
||||
self.conn = obtain_connection(self.db_file)
|
||||
self.addCleanup(self.conn.close)
|
||||
self.addCleanup(os.remove, self.db_file)
|
||||
|
||||
def tearDown(self) -> None:
|
||||
self.conn.close()
|
||||
|
||||
|
|
Loading…
Reference in a new issue