Unit tests for mass insertion of comments

This commit is contained in:
Oleg Silkin 2019-05-19 05:51:29 -04:00
parent 4d6855bcc0
commit dcbf4be04c
2 changed files with 105 additions and 26 deletions

View file

@ -93,28 +93,35 @@ class DatabaseConnection:
def create_comment(self, comment: str, claim_id: str, channel_name: str = None,
channel_id: str = None, **kwargs) -> dict:
if channel_id and channel_name:
thing = None
try:
validate_input(
comment=comment,
claim_id=claim_id,
channel_id=channel_id,
channel_name=channel_name,
)
self._insert_channel(channel_name, channel_id)
else:
channel_id=anonymous['channel_id']
comcast_id = self._insert_comment(
comment=comment,
claim_id=claim_id,
channel_id=channel_id,
**kwargs
)
curry = self.connection.execute(
'SELECT * FROM COMMENTS_ON_CLAIMS WHERE comment_id = ?', (comcast_id,)
)
thing = curry.fetchone()
return dict(thing) if thing else None
if channel_id and channel_name:
self._insert_channel(channel_name, channel_id)
else:
channel_id = anonymous['channel_id']
comcast_id = self._insert_comment(
comment=comment,
claim_id=claim_id,
channel_id=channel_id,
**kwargs
)
curry = self.connection.execute(
'SELECT * FROM COMMENTS_ON_CLAIMS WHERE comment_id = ?', (comcast_id,)
)
thing = curry.fetchone()
except AssertionError as e:
print(e)
finally:
return dict(thing) if thing else None
def get_comment_ids(self, claim_id: str, parent_id: str = None, page=1, page_size=50):
""" Just return a list of the comment IDs that are associated with the given claim_id.

View file

@ -1,16 +1,28 @@
import unittest
from faker.providers import internet
from faker.providers import lorem
from faker.providers import misc
import server.conf
import server.database as db
import sqlite3
import json
import faker
from random import randint
fake = faker.Faker()
fake.add_provider(internet)
fake.add_provider(lorem)
fake.add_provider(misc)
class TestCommentCreation(unittest.TestCase):
class DatabaseTestCase(unittest.TestCase):
def setUp(self) -> None:
super().setUp()
self.db = db.DatabaseConnection('test.db')
self.db.obtain_connection()
self.db.generate_schema(server.conf.schema_dir)
self.claimId = '529357c3422c6046d3fec76be2358004ba22e340'
def tearDown(self) -> None:
curs = self.db.connection.execute('SELECT * FROM COMMENT')
@ -21,17 +33,23 @@ class TestCommentCreation(unittest.TestCase):
results['COMMENTS_ON_CLAIMS'] = [dict(r) for r in curs.fetchall()]
curs = self.db.connection.execute('SELECT * FROM COMMENT_REPLIES')
results['COMMENT_REPLIES'] = [dict(r) for r in curs.fetchall()]
print(json.dumps(results, indent=4))
# print(json.dumps(results, indent=4))
conn: sqlite3.Connection = self.db.connection
conn.executescript("""
DROP TABLE IF EXISTS COMMENT;
DROP TABLE IF EXISTS CHANNEL;
DROP VIEW IF EXISTS COMMENTS_ON_CLAIMS;
DROP VIEW IF EXISTS COMMENT_REPLIES;
""")
conn.commit()
with conn:
conn.executescript("""
DROP TABLE IF EXISTS COMMENT;
DROP TABLE IF EXISTS CHANNEL;
DROP VIEW IF EXISTS COMMENTS_ON_CLAIMS;
DROP VIEW IF EXISTS COMMENT_REPLIES;
""")
conn.close()
class TestCommentCreation(DatabaseTestCase):
def setUp(self) -> None:
super().setUp()
self.claimId = '529357c3422c6046d3fec76be2358004ba22e340'
def testNamedComments(self):
comment = self.db.create_comment(
claim_id=self.claimId,
@ -153,3 +171,57 @@ class TestCommentCreation(unittest.TestCase):
comment='this username is too short'
)
class PopulatedDatabaseTest(DatabaseTestCase):
def testInsertComments(self):
success, total = 0, 0
top_comments = generate_top_comments()
for _, comments in top_comments.items():
for i, comment in enumerate(comments):
result = self.db.create_comment(**comment)
if result:
success += 1
del comment
comments[i] = result
total += len(comments)
self.assertLessEqual(success, total)
self.assertGreater(success, 0)
success = 0
for reply in generate_replies(top_comments):
reply_id = self.db.create_comment(**reply)
if reply_id:
success += 1
self.assertGreater(success, 0)
self.assertLess(success, total)
def generate_replies(top_comments):
return [{
'claim_id': comment['claim_id'],
'parent_id': comment['comment_id'],
'comment': ' '.join(fake.text(max_nb_chars=randint(50, 2500))),
'channel_name': '@' + fake.user_name(),
'channel_id': fake.sha1() if hash(comment['comment_id']) % 11 == 0 else None,
'signature': fake.uuid4() if hash(comment['comment_id']) % 11 == 0 else None
}
for claim, comments in top_comments.items()
for i, comment in enumerate(comments)
if comment
]
def generate_top_comments():
claim_ids = [fake.sha1() for _ in range(15)]
top_comments = {
cid: [{
'claim_id': cid,
'comment': ''.join(fake.text(max_nb_chars=randint(50, 2500))),
'channel_name': '@' + fake.user_name() if (hash(cid) * i) % 29*i > 0 else None,
'channel_id': fake.sha1() if (hash(cid) * i) % 29*i > 0 else None,
'signature': fake.uuid4() if (hash(cid) * i) % 29*i > 0 > hash(cid) else None
} for i in range(randint(0, hash(cid) % 91))]
for cid in claim_ids
}
return top_comments