Replace database methods with peewee ORM #39
7 changed files with 65 additions and 68 deletions
7
setup.py
7
setup.py
|
@ -17,12 +17,11 @@ setup(
|
||||||
'mysql-connector-python',
|
'mysql-connector-python',
|
||||||
'pyyaml',
|
'pyyaml',
|
||||||
'Faker>=1.0.7',
|
'Faker>=1.0.7',
|
||||||
'asyncio>=3.4.3',
|
'asyncio',
|
||||||
'aiohttp==3.5.4',
|
'aiohttp',
|
||||||
'aiojobs==0.2.2',
|
'aiojobs',
|
||||||
'ecdsa>=0.13.3',
|
'ecdsa>=0.13.3',
|
||||||
'cryptography==2.5',
|
'cryptography==2.5',
|
||||||
'aiosqlite==0.10.0',
|
|
||||||
'PyNaCl>=1.3.0',
|
'PyNaCl>=1.3.0',
|
||||||
'requests',
|
'requests',
|
||||||
'cython',
|
'cython',
|
||||||
|
|
|
@ -12,14 +12,6 @@ from src.server.validation import is_valid_base_comment
|
||||||
from src.misc import clean
|
from src.misc import clean
|
||||||
|
|
||||||
|
|
||||||
def get_database_connection(dbms, db_name, **params):
|
|
||||||
if dbms == 'mysql':
|
|
||||||
return MySQLDatabase(db_name, **params)
|
|
||||||
else:
|
|
||||||
# return SqliteDatabase('/home/oleg/PycharmProjects/comment-server/database/default_pw.db')
|
|
||||||
return SqliteDatabase(db_name)
|
|
||||||
|
|
||||||
|
|
||||||
class Channel(Model):
|
class Channel(Model):
|
||||||
claim_id = TextField(column_name='ClaimId', primary_key=True)
|
claim_id = TextField(column_name='ClaimId', primary_key=True)
|
||||||
name = TextField(column_name='Name')
|
name = TextField(column_name='Name')
|
||||||
|
@ -157,7 +149,7 @@ def create_comment(comment: str = None, claim_id: str = None,
|
||||||
raise ValueError('Invalid Parameters given for comment')
|
raise ValueError('Invalid Parameters given for comment')
|
||||||
|
|
||||||
channel, _ = Channel.get_or_create(name=channel_name, claim_id=channel_id)
|
channel, _ = Channel.get_or_create(name=channel_name, claim_id=channel_id)
|
||||||
if parent_id:
|
if parent_id and not claim_id:
|
||||||
parent: Comment = Comment.get_by_id(parent_id)
|
parent: Comment = Comment.get_by_id(parent_id)
|
||||||
claim_id = parent.claim_id
|
claim_id = parent.claim_id
|
||||||
|
|
||||||
|
@ -219,7 +211,4 @@ if __name__ == '__main__':
|
||||||
(Comment.claim_id ** '420%'))
|
(Comment.claim_id ** '420%'))
|
||||||
)
|
)
|
||||||
|
|
||||||
ids = get_comment_ids('4207d2378bf4340e68c9d88faf7ee24ea1a1f95a')
|
|
||||||
|
|
||||||
print(json.dumps(comments, indent=4))
|
print(json.dumps(comments, indent=4))
|
||||||
print(json.dumps(ids, indent=4))
|
|
||||||
|
|
12
src/main.py
12
src/main.py
|
@ -84,12 +84,13 @@ def get_config(filepath):
|
||||||
|
|
||||||
|
|
||||||
def setup_db_from_config(config: dict):
|
def setup_db_from_config(config: dict):
|
||||||
if 'sqlite' in config['database']:
|
mode = config['mode']
|
||||||
|
if config[mode]['database'] == 'sqlite':
|
||||||
if not os.path.exists(DATABASE_DIR):
|
if not os.path.exists(DATABASE_DIR):
|
||||||
os.mkdir(DATABASE_DIR)
|
os.mkdir(DATABASE_DIR)
|
||||||
|
|
||||||
config['db_path'] = os.path.join(
|
config[mode]['db_file'] = os.path.join(
|
||||||
DATABASE_DIR, config['database']['sqlite']
|
DATABASE_DIR, config[mode]['name']
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -98,10 +99,15 @@ def main(argv=None):
|
||||||
parser = argparse.ArgumentParser(description='LBRY Comment Server')
|
parser = argparse.ArgumentParser(description='LBRY Comment Server')
|
||||||
parser.add_argument('--port', type=int)
|
parser.add_argument('--port', type=int)
|
||||||
parser.add_argument('--config', type=str)
|
parser.add_argument('--config', type=str)
|
||||||
|
parser.add_argument('--mode', type=str)
|
||||||
args = parser.parse_args(argv)
|
args = parser.parse_args(argv)
|
||||||
|
|
||||||
config = get_config(CONFIG_FILE) if not args.config else args.config
|
config = get_config(CONFIG_FILE) if not args.config else args.config
|
||||||
setup_logging_from_config(config)
|
setup_logging_from_config(config)
|
||||||
|
|
||||||
|
if args.mode:
|
||||||
|
config['mode'] = args.mode
|
||||||
|
|
||||||
setup_db_from_config(config)
|
setup_db_from_config(config)
|
||||||
|
|
||||||
if args.port:
|
if args.port:
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
# cython: language_level=3
|
# cython: language_level=3
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import pathlib
|
|
||||||
import signal
|
import signal
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
@ -9,61 +8,67 @@ import aiojobs
|
||||||
import aiojobs.aiohttp
|
import aiojobs.aiohttp
|
||||||
from aiohttp import web
|
from aiohttp import web
|
||||||
|
|
||||||
from src.database.queries import obtain_connection, DatabaseWriter
|
from peewee import *
|
||||||
from src.database.queries import setup_database
|
|
||||||
from src.server.handles import api_endpoint, get_api_endpoint
|
from src.server.handles import api_endpoint, get_api_endpoint
|
||||||
|
from src.database.models import Comment, Channel
|
||||||
|
|
||||||
|
MODELS = [Comment, Channel]
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
async def setup_db_schema(app):
|
def setup_database(app):
|
||||||
if not pathlib.Path(app['db_path']).exists():
|
config = app['config']
|
||||||
logger.info(f'Setting up schema in {app["db_path"]}')
|
mode = config['mode']
|
||||||
setup_database(app['db_path'])
|
|
||||||
else:
|
# switch between Database objects
|
||||||
logger.info(f'Database already exists in {app["db_path"]}, skipping setup')
|
if config[mode]['database'] == 'mysql':
|
||||||
|
app['db'] = MySQLDatabase(
|
||||||
|
database=config[mode]['name'],
|
||||||
|
user=config[mode]['user'],
|
||||||
|
host=config[mode]['host'],
|
||||||
|
password=config[mode]['password'],
|
||||||
|
port=config[mode]['port'],
|
||||||
|
)
|
||||||
|
elif config[mode]['database'] == 'sqlite':
|
||||||
|
app['db'] = SqliteDatabase(
|
||||||
|
config[mode]['file'],
|
||||||
|
pragmas=config[mode]['pragmas']
|
||||||
|
)
|
||||||
|
|
||||||
|
# bind the Model list to the database
|
||||||
|
app['db'].bind(MODELS, bind_refs=False, bind_backrefs=False)
|
||||||
|
|
||||||
|
|
||||||
async def start_background_tasks(app):
|
async def start_background_tasks(app):
|
||||||
# Reading the DB
|
app['db'].connect()
|
||||||
app['reader'] = obtain_connection(app['db_path'], True)
|
app['db'].create_tables(MODELS)
|
||||||
|
|
||||||
# Scheduler to prevent multiple threads from writing to DB simulataneously
|
|
||||||
app['comment_scheduler'] = await aiojobs.create_scheduler(limit=1, pending_limit=0)
|
|
||||||
app['db_writer'] = DatabaseWriter(app['db_path'])
|
|
||||||
app['writer'] = app['db_writer'].connection
|
|
||||||
|
|
||||||
# for requesting to external and internal APIs
|
# for requesting to external and internal APIs
|
||||||
app['webhooks'] = await aiojobs.create_scheduler(pending_limit=0)
|
app['webhooks'] = await aiojobs.create_scheduler(pending_limit=0)
|
||||||
|
|
||||||
|
|
||||||
async def close_database_connections(app):
|
async def close_database_connections(app):
|
||||||
app['reader'].close()
|
app['db'].close()
|
||||||
app['writer'].close()
|
|
||||||
app['db_writer'].cleanup()
|
|
||||||
|
|
||||||
|
|
||||||
async def close_schedulers(app):
|
async def close_schedulers(app):
|
||||||
logger.info('Closing comment_scheduler')
|
|
||||||
await app['comment_scheduler'].close()
|
|
||||||
|
|
||||||
logger.info('Closing scheduler for webhook requests')
|
logger.info('Closing scheduler for webhook requests')
|
||||||
await app['webhooks'].close()
|
await app['webhooks'].close()
|
||||||
|
|
||||||
|
|
||||||
class CommentDaemon:
|
class CommentDaemon:
|
||||||
def __init__(self, config, db_file=None, **kwargs):
|
def __init__(self, config, **kwargs):
|
||||||
app = web.Application()
|
app = web.Application()
|
||||||
|
app['config'] = config
|
||||||
|
|
||||||
# configure the config
|
# configure the config
|
||||||
app['config'] = config
|
self.config = config
|
||||||
self.config = app['config']
|
self.host = config['host']
|
||||||
|
self.port = config['port']
|
||||||
|
|
||||||
# configure the db file
|
setup_database(app)
|
||||||
app['db_path'] = db_file or config.get('db_path')
|
|
||||||
|
|
||||||
# configure the order of tasks to run during app lifetime
|
# configure the order of tasks to run during app lifetime
|
||||||
app.on_startup.append(setup_db_schema)
|
|
||||||
app.on_startup.append(start_background_tasks)
|
app.on_startup.append(start_background_tasks)
|
||||||
app.on_shutdown.append(close_schedulers)
|
app.on_shutdown.append(close_schedulers)
|
||||||
app.on_cleanup.append(close_database_connections)
|
app.on_cleanup.append(close_database_connections)
|
||||||
|
@ -85,20 +90,19 @@ class CommentDaemon:
|
||||||
await self.app_runner.setup()
|
await self.app_runner.setup()
|
||||||
self.app_site = web.TCPSite(
|
self.app_site = web.TCPSite(
|
||||||
runner=self.app_runner,
|
runner=self.app_runner,
|
||||||
host=host or self.config['host'],
|
host=host or self.host,
|
||||||
port=port or self.config['port'],
|
port=port or self.port,
|
||||||
)
|
)
|
||||||
await self.app_site.start()
|
await self.app_site.start()
|
||||||
logger.info(f'Comment Server is running on {self.config["host"]}:{self.config["port"]}')
|
logger.info(f'Comment Server is running on {self.host}:{self.port}')
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
await self.app_runner.shutdown()
|
await self.app_runner.shutdown()
|
||||||
await self.app_runner.cleanup()
|
await self.app_runner.cleanup()
|
||||||
|
|
||||||
|
|
||||||
def run_app(config, db_file=None):
|
def run_app(config):
|
||||||
comment_app = CommentDaemon(config=config, db_file=db_file, close_timeout=5.0)
|
comment_app = CommentDaemon(config=config)
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
def __exit():
|
def __exit():
|
||||||
|
|
|
@ -6,7 +6,7 @@ from faker.providers import misc
|
||||||
|
|
||||||
from src.database.models import create_comment
|
from src.database.models import create_comment
|
||||||
from src.database.models import delete_comment
|
from src.database.models import delete_comment
|
||||||
from src.database.models import comment_list, get_comment, get_comments_by_id
|
from src.database.models import comment_list, get_comment
|
||||||
from src.database.models import set_hidden_flag
|
from src.database.models import set_hidden_flag
|
||||||
from test.testcase import DatabaseTestCase
|
from test.testcase import DatabaseTestCase
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,8 @@ from test.testcase import AsyncioTestCase
|
||||||
|
|
||||||
|
|
||||||
config = get_config(CONFIG_FILE)
|
config = get_config(CONFIG_FILE)
|
||||||
|
config['mode'] = 'testing'
|
||||||
|
config['testing']['file'] = ':memory:'
|
||||||
|
|
||||||
|
|
||||||
if 'slack_webhook' in config:
|
if 'slack_webhook' in config:
|
||||||
|
@ -74,10 +76,10 @@ def create_test_comments(values: iter, **default):
|
||||||
|
|
||||||
|
|
||||||
class ServerTest(AsyncioTestCase):
|
class ServerTest(AsyncioTestCase):
|
||||||
db_file = 'test.db'
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
|
config['mode'] = 'testing'
|
||||||
|
config['testing']['file'] = ':memory:'
|
||||||
self.host = 'localhost'
|
self.host = 'localhost'
|
||||||
self.port = 5931
|
self.port = 5931
|
||||||
|
|
||||||
|
@ -88,11 +90,10 @@ class ServerTest(AsyncioTestCase):
|
||||||
@classmethod
|
@classmethod
|
||||||
def tearDownClass(cls) -> None:
|
def tearDownClass(cls) -> None:
|
||||||
print('exit reached')
|
print('exit reached')
|
||||||
os.remove(cls.db_file)
|
|
||||||
|
|
||||||
async def asyncSetUp(self):
|
async def asyncSetUp(self):
|
||||||
await super().asyncSetUp()
|
await super().asyncSetUp()
|
||||||
self.server = app.CommentDaemon(config, db_file=self.db_file)
|
self.server = app.CommentDaemon(config)
|
||||||
await self.server.start(host=self.host, port=self.port)
|
await self.server.start(host=self.host, port=self.port)
|
||||||
self.addCleanup(self.server.stop)
|
self.addCleanup(self.server.stop)
|
||||||
|
|
||||||
|
@ -138,14 +139,16 @@ class ServerTest(AsyncioTestCase):
|
||||||
test_all = create_test_comments(replace.keys(), **{
|
test_all = create_test_comments(replace.keys(), **{
|
||||||
k: None for k in replace.keys()
|
k: None for k in replace.keys()
|
||||||
})
|
})
|
||||||
|
test_all.reverse()
|
||||||
for test in test_all:
|
for test in test_all:
|
||||||
with self.subTest(test=test):
|
nulls = 'null fields: ' + ', '.join(k for k, v in test.items() if not v)
|
||||||
|
with self.subTest(test=nulls):
|
||||||
message = await self.post_comment(**test)
|
message = await self.post_comment(**test)
|
||||||
self.assertTrue('result' in message or 'error' in message)
|
self.assertTrue('result' in message or 'error' in message)
|
||||||
if 'error' in message:
|
if 'error' in message:
|
||||||
self.assertFalse(is_valid_base_comment(**test))
|
self.assertFalse(is_valid_base_comment(**test, strict=True))
|
||||||
else:
|
else:
|
||||||
self.assertTrue(is_valid_base_comment(**test))
|
self.assertTrue(is_valid_base_comment(**test, strict=True))
|
||||||
|
|
||||||
async def test04CreateAllReplies(self):
|
async def test04CreateAllReplies(self):
|
||||||
claim_id = '1d8a5cc39ca02e55782d619e67131c0a20843be8'
|
claim_id = '1d8a5cc39ca02e55782d619e67131c0a20843be8'
|
||||||
|
@ -223,7 +226,8 @@ class ListCommentsTest(AsyncioTestCase):
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
self.host = 'localhost'
|
self.host = 'localhost'
|
||||||
self.port = 5931
|
self.port = 5931
|
||||||
self.db_file = 'list_test.db'
|
config['mode'] = 'testing'
|
||||||
|
config['testing']['file'] = ':memory:'
|
||||||
self.claim_id = '1d8a5cc39ca02e55782d619e67131c0a20843be8'
|
self.claim_id = '1d8a5cc39ca02e55782d619e67131c0a20843be8'
|
||||||
self.comment_ids = None
|
self.comment_ids = None
|
||||||
|
|
||||||
|
@ -234,10 +238,6 @@ class ListCommentsTest(AsyncioTestCase):
|
||||||
async def post_comment(self, **params):
|
async def post_comment(self, **params):
|
||||||
return await jsonrpc_post(self.url, 'create_comment', **params)
|
return await jsonrpc_post(self.url, 'create_comment', **params)
|
||||||
|
|
||||||
def tearDown(self) -> None:
|
|
||||||
print('exit reached')
|
|
||||||
os.remove(self.db_file)
|
|
||||||
|
|
||||||
async def create_lots_of_comments(self, n=23):
|
async def create_lots_of_comments(self, n=23):
|
||||||
self.comment_list = [{key: self.replace[key]() for key in self.replace.keys()} for _ in range(23)]
|
self.comment_list = [{key: self.replace[key]() for key in self.replace.keys()} for _ in range(23)]
|
||||||
for comment in self.comment_list:
|
for comment in self.comment_list:
|
||||||
|
@ -247,7 +247,7 @@ class ListCommentsTest(AsyncioTestCase):
|
||||||
|
|
||||||
async def asyncSetUp(self):
|
async def asyncSetUp(self):
|
||||||
await super().asyncSetUp()
|
await super().asyncSetUp()
|
||||||
self.server = app.CommentDaemon(config, db_file=self.db_file)
|
self.server = app.CommentDaemon(config)
|
||||||
await self.server.start(self.host, self.port)
|
await self.server.start(self.host, self.port)
|
||||||
self.addCleanup(self.server.stop)
|
self.addCleanup(self.server.stop)
|
||||||
|
|
||||||
|
|
|
@ -35,7 +35,6 @@ class DatabaseTestCase(unittest.TestCase):
|
||||||
test_db.close()
|
test_db.close()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class AsyncioTestCase(unittest.TestCase):
|
class AsyncioTestCase(unittest.TestCase):
|
||||||
# Implementation inspired by discussion:
|
# Implementation inspired by discussion:
|
||||||
# https://bugs.python.org/issue32972
|
# https://bugs.python.org/issue32972
|
||||||
|
|
Loading…
Reference in a new issue