Moves methods from class to functional

This commit is contained in:
Oleg Silkin 2019-05-20 05:07:39 -04:00
parent dad56bbec2
commit c2c6d78442

View file

@ -4,9 +4,8 @@ from faker.providers import internet
from faker.providers import lorem from faker.providers import lorem
from faker.providers import misc from faker.providers import misc
import conf import lbry_comment_server.conf as conf
import lbry_comment_server.database as db import lbry_comment_server.database as db
import sqlite3
import faker import faker
from random import randint from random import randint
@ -19,29 +18,27 @@ fake.add_provider(misc)
class DatabaseTestCase(unittest.TestCase): class DatabaseTestCase(unittest.TestCase):
def setUp(self) -> None: def setUp(self) -> None:
super().setUp() super().setUp()
self.db = db.DatabaseConnection('test.db') self.conn = db.obtain_connection('test.db')
self.db.obtain_connection() db.generate_schema(self.conn, conf.schema_dir)
self.db.generate_schema(src.conf.schema_dir)
def tearDown(self) -> None: def tearDown(self) -> None:
curs = self.db.connection.execute('SELECT * FROM COMMENT') curs = self.conn.execute('SELECT * FROM COMMENT')
results = {'COMMENT': [dict(r) for r in curs.fetchall()]} results = {'COMMENT': [dict(r) for r in curs.fetchall()]}
curs = self.db.connection.execute('SELECT * FROM CHANNEL') curs = self.conn.execute('SELECT * FROM CHANNEL')
results['CHANNEL'] = [dict(r) for r in curs.fetchall()] results['CHANNEL'] = [dict(r) for r in curs.fetchall()]
curs = self.db.connection.execute('SELECT * FROM COMMENTS_ON_CLAIMS') curs = self.conn.execute('SELECT * FROM COMMENTS_ON_CLAIMS')
results['COMMENTS_ON_CLAIMS'] = [dict(r) for r in curs.fetchall()] results['COMMENTS_ON_CLAIMS'] = [dict(r) for r in curs.fetchall()]
curs = self.db.connection.execute('SELECT * FROM COMMENT_REPLIES') curs = self.conn.execute('SELECT * FROM COMMENT_REPLIES')
results['COMMENT_REPLIES'] = [dict(r) for r in curs.fetchall()] results['COMMENT_REPLIES'] = [dict(r) for r in curs.fetchall()]
# print(json.dumps(results, indent=4)) # print(json.dumps(results, indent=4))
conn: sqlite3.Connection = self.db.connection with self.conn:
with conn: self.conn.executescript("""
conn.executescript("""
DROP TABLE IF EXISTS COMMENT; DROP TABLE IF EXISTS COMMENT;
DROP TABLE IF EXISTS CHANNEL; DROP TABLE IF EXISTS CHANNEL;
DROP VIEW IF EXISTS COMMENTS_ON_CLAIMS; DROP VIEW IF EXISTS COMMENTS_ON_CLAIMS;
DROP VIEW IF EXISTS COMMENT_REPLIES; DROP VIEW IF EXISTS COMMENT_REPLIES;
""") """)
conn.close() self.conn.close()
class TestCommentCreation(DatabaseTestCase): class TestCommentCreation(DatabaseTestCase):
@ -49,8 +46,9 @@ class TestCommentCreation(DatabaseTestCase):
super().setUp() super().setUp()
self.claimId = '529357c3422c6046d3fec76be2358004ba22e340' self.claimId = '529357c3422c6046d3fec76be2358004ba22e340'
def testNamedComments(self): def test01NamedComments(self):
comment = self.db.create_comment( comment = db.create_comment(
conn=self.conn,
claim_id=self.claimId, claim_id=self.claimId,
comment='This is a named comment', comment='This is a named comment',
channel_name='@username', channel_name='@username',
@ -62,7 +60,8 @@ class TestCommentCreation(DatabaseTestCase):
self.assertIn('parent_id', comment) self.assertIn('parent_id', comment)
self.assertIsNone(comment['parent_id']) self.assertIsNone(comment['parent_id'])
previous_id = comment['comment_id'] previous_id = comment['comment_id']
reply = self.db.create_comment( reply = db.create_comment(
conn=self.conn,
claim_id=self.claimId, claim_id=self.claimId,
comment='This is a named response', comment='This is a named response',
channel_name='@another_username', channel_name='@another_username',
@ -76,8 +75,9 @@ class TestCommentCreation(DatabaseTestCase):
self.assertEqual(reply['parent_id'], comment['comment_id']) self.assertEqual(reply['parent_id'], comment['comment_id'])
self.assertEqual(reply['claim_id'], comment['claim_id']) self.assertEqual(reply['claim_id'], comment['claim_id'])
def testAnonymousComments(self): def test02AnonymousComments(self):
comment = self.db.create_comment( comment = db.create_comment(
conn=self.conn,
claim_id=self.claimId, claim_id=self.claimId,
comment='This is an anonymous comment' comment='This is an anonymous comment'
) )
@ -87,7 +87,8 @@ class TestCommentCreation(DatabaseTestCase):
self.assertIn('parent_id', comment) self.assertIn('parent_id', comment)
self.assertIsNone(comment['parent_id']) self.assertIsNone(comment['parent_id'])
previous_id = comment['comment_id'] previous_id = comment['comment_id']
reply = self.db.create_comment( reply = db.create_comment(
conn=self.conn,
claim_id=self.claimId, claim_id=self.claimId,
comment='This is an unnamed response', comment='This is an unnamed response',
parent_id=previous_id parent_id=previous_id
@ -99,13 +100,14 @@ class TestCommentCreation(DatabaseTestCase):
self.assertEqual(reply['parent_id'], comment['comment_id']) self.assertEqual(reply['parent_id'], comment['comment_id'])
self.assertEqual(reply['claim_id'], comment['claim_id']) self.assertEqual(reply['claim_id'], comment['claim_id'])
def testSignedComments(self): def test03SignedComments(self):
comment = self.db.create_comment( comment = db.create_comment(
conn=self.conn,
claim_id=self.claimId, claim_id=self.claimId,
comment='I like big butts and i cannot lie', comment='I like big butts and i cannot lie',
channel_name='@sirmixalot', channel_name='@sirmixalot',
channel_id='529357c3422c6046d3fec76be2358005ba22abcd', channel_id='529357c3422c6046d3fec76be2358005ba22abcd',
sig='siggy' signature='siggy'
) )
self.assertIsNotNone(comment) self.assertIsNotNone(comment)
self.assertIn('comment', comment) self.assertIn('comment', comment)
@ -113,13 +115,14 @@ class TestCommentCreation(DatabaseTestCase):
self.assertIn('parent_id', comment) self.assertIn('parent_id', comment)
self.assertIsNone(comment['parent_id']) self.assertIsNone(comment['parent_id'])
previous_id = comment['comment_id'] previous_id = comment['comment_id']
reply = self.db.create_comment( reply = db.create_comment(
conn=self.conn,
claim_id=self.claimId, claim_id=self.claimId,
comment='This is a LBRY verified response', comment='This is a LBRY verified response',
channel_name='@LBRY', channel_name='@LBRY',
channel_id='529357c3422c6046d3fec76be2358001ba224bcd', channel_id='529357c3422c6046d3fec76be2358001ba224bcd',
parent_id=previous_id, parent_id=previous_id,
sig='Cursive Font Goes Here' signature='Cursive Font Goes Here'
) )
self.assertIsNotNone(reply) self.assertIsNotNone(reply)
self.assertIn('comment', reply) self.assertIn('comment', reply)
@ -128,15 +131,17 @@ class TestCommentCreation(DatabaseTestCase):
self.assertEqual(reply['parent_id'], comment['comment_id']) self.assertEqual(reply['parent_id'], comment['comment_id'])
self.assertEqual(reply['claim_id'], comment['claim_id']) self.assertEqual(reply['claim_id'], comment['claim_id'])
def testUsernameVariations(self): def test04UsernameVariations(self):
invalid_comment = self.db.create_comment( invalid_comment = db.create_comment(
conn=self.conn,
claim_id=self.claimId, claim_id=self.claimId,
channel_name='$#(@#$@#$', channel_name='$#(@#$@#$',
channel_id='529357c3422c6046d3fec76be2358001ba224b23', channel_id='529357c3422c6046d3fec76be2358001ba224b23',
comment='this is an invalid username' comment='this is an invalid username'
) )
self.assertIsNone(invalid_comment) self.assertIsNone(invalid_comment)
valid_username = self.db.create_comment( valid_username = db.create_comment(
conn=self.conn,
claim_id=self.claimId, claim_id=self.claimId,
channel_name='@' + 'a'*255, channel_name='@' + 'a'*255,
channel_id='529357c3422c6046d3fec76be2358001ba224b23', channel_id='529357c3422c6046d3fec76be2358001ba224b23',
@ -144,21 +149,24 @@ class TestCommentCreation(DatabaseTestCase):
) )
self.assertIsNotNone(valid_username) self.assertIsNotNone(valid_username)
lengthy_username = self.db.create_comment( lengthy_username = db.create_comment(
conn=self.conn,
claim_id=self.claimId, claim_id=self.claimId,
channel_name='@' + 'a'*256, channel_name='@' + 'a'*256,
channel_id='529357c3422c6046d3fec76be2358001ba224b23', channel_id='529357c3422c6046d3fec76be2358001ba224b23',
comment='this username is too long' comment='this username is too long'
) )
self.assertIsNone(lengthy_username) self.assertIsNone(lengthy_username)
comment = self.db.create_comment( comment = db.create_comment(
conn=self.conn,
claim_id=self.claimId, claim_id=self.claimId,
channel_name='', channel_name='',
channel_id='529357c3422c6046d3fec76be2358001ba224b23', channel_id='529357c3422c6046d3fec76be2358001ba224b23',
comment='this username should not default to anonymous' comment='this username should not default to anonymous'
) )
self.assertIsNone(comment) self.assertIsNone(comment)
short_username = self.db.create_comment( short_username = db.create_comment(
conn=self.conn,
claim_id=self.claimId, claim_id=self.claimId,
channel_name='@', channel_name='@',
channel_id='529357c3422c6046d3fec76be2358001ba224b23', channel_id='529357c3422c6046d3fec76be2358001ba224b23',
@ -166,19 +174,14 @@ class TestCommentCreation(DatabaseTestCase):
) )
self.assertIsNone(short_username) self.assertIsNone(short_username)
def test05InsertRandomComments(self):
class PopulatedDatabaseTest(DatabaseTestCase):
def setUp(self) -> None:
super().setUp()
def test01InsertRandomComments(self):
top_comments, claim_ids = generate_top_comments_random() top_comments, claim_ids = generate_top_comments_random()
total = 0 total = 0
success = 0 success = 0
for _, comments in top_comments.items(): for _, comments in top_comments.items():
for i, comment in enumerate(comments): for i, comment in enumerate(comments):
with self.subTest(comment=comment): with self.subTest(comment=comment):
result = self.db.create_comment(**comment) result = db.create_comment(self.conn, **comment)
if result: if result:
success += 1 success += 1
comments[i] = result comments[i] = result
@ -188,7 +191,7 @@ class PopulatedDatabaseTest(DatabaseTestCase):
self.assertGreater(success, 0) self.assertGreater(success, 0)
success = 0 success = 0
for reply in generate_replies_random(top_comments): for reply in generate_replies_random(top_comments):
reply_id = self.db.create_comment(**reply) reply_id = db.create_comment(self.conn, **reply)
if reply_id: if reply_id:
success += 1 success += 1
self.assertGreater(success, 0) self.assertGreater(success, 0)
@ -196,12 +199,12 @@ class PopulatedDatabaseTest(DatabaseTestCase):
del top_comments del top_comments
del claim_ids del claim_ids
def test02GenerateAndListComments(self): def test06GenerateAndListComments(self):
top_comments, claim_ids = generate_top_comments() top_comments, claim_ids = generate_top_comments()
total, success = 0, 0 total, success = 0, 0
for _, comments in top_comments.items(): for _, comments in top_comments.items():
for i, comment in enumerate(comments): for i, comment in enumerate(comments):
result = self.db.create_comment(**comment) result = db.create_comment(self.conn, **comment)
if result: if result:
success += 1 success += 1
comments[i] = result comments[i] = result
@ -211,15 +214,15 @@ class PopulatedDatabaseTest(DatabaseTestCase):
self.assertGreater(total, 0) self.assertGreater(total, 0)
success, total = 0, 0 success, total = 0, 0
for reply in generate_replies(top_comments): for reply in generate_replies(top_comments):
self.db.create_comment(**reply) db.create_comment(self.conn, **reply)
self.assertEqual(success, total) self.assertEqual(success, total)
for claim_id in claim_ids: for claim_id in claim_ids:
comments_ids = self.db.get_comment_ids(claim_id) comments_ids = db.get_comment_ids(self.conn, claim_id)
with self.subTest(comments_ids=comments_ids): with self.subTest(comments_ids=comments_ids):
self.assertIs(type(comments_ids), list) self.assertIs(type(comments_ids), list)
self.assertGreaterEqual(len(comments_ids), 0) self.assertGreaterEqual(len(comments_ids), 0)
self.assertLessEqual(len(comments_ids), 50) self.assertLessEqual(len(comments_ids), 50)
replies = self.db.get_comments_by_id(comments_ids) replies = db.get_comments_by_id(self.conn, comments_ids)
self.assertLessEqual(len(replies), 50) self.assertLessEqual(len(replies), 50)
self.assertEqual(len(replies), len(comments_ids)) self.assertEqual(len(replies), len(comments_ids))
@ -229,34 +232,32 @@ class ListDatabaseTest(DatabaseTestCase):
super().setUp() super().setUp()
top_coms, self.claim_ids = generate_top_comments(5, 75) top_coms, self.claim_ids = generate_top_comments(5, 75)
self.top_comments = { self.top_comments = {
commie_id: [self.db.create_comment(**commie) for commie in commie_list] commie_id: [db.create_comment(self.conn, **commie) for commie in commie_list]
for commie_id, commie_list in top_coms.items() for commie_id, commie_list in top_coms.items()
} }
self.replies = [ self.replies = [
self.db.create_comment(**reply) db.create_comment(self.conn, **reply)
for reply in generate_replies(self.top_comments) for reply in generate_replies(self.top_comments)
] ]
def testLists(self): def testLists(self):
for claim_id in self.claim_ids: for claim_id in self.claim_ids:
with self.subTest(claim_id=claim_id): with self.subTest(claim_id=claim_id):
comments = self.db.get_claim_comments(claim_id) comments = db.get_claim_comments(self.conn, claim_id)
self.assertIsNotNone(comments) self.assertIsNotNone(comments)
self.assertLessEqual(len(comments), 50) self.assertLessEqual(len(comments), 50)
top_comments = self.db.get_claim_comments(claim_id, top_level=True, page=1, page_size=50) top_comments = db.get_claim_comments(self.conn, claim_id, top_level=True, page=1, page_size=50)
self.assertIsNotNone(top_comments) self.assertIsNotNone(top_comments)
self.assertLessEqual(len(top_comments), 50) self.assertLessEqual(len(top_comments), 50)
comment_ids = self.db.get_comment_ids(claim_id, page_size=50, page=1) comment_ids = db.get_comment_ids(self.conn, claim_id, page_size=50, page=1)
with self.subTest(comment_ids=comment_ids): with self.subTest(comment_ids=comment_ids):
self.assertIsNotNone(comment_ids) self.assertIsNotNone(comment_ids)
self.assertLessEqual(len(comment_ids), 50) self.assertLessEqual(len(comment_ids), 50)
matching_comments = self.db.get_comments_by_id(comment_ids) matching_comments = db.get_comments_by_id(self.conn, comment_ids)
self.assertIsNotNone(matching_comments) self.assertIsNotNone(matching_comments)
self.assertEqual(len(matching_comments), len(comment_ids)) self.assertEqual(len(matching_comments), len(comment_ids))
def generate_replies(top_comments): def generate_replies(top_comments):
return [{ return [{
'claim_id': comment['claim_id'], 'claim_id': comment['claim_id'],