Activates Comment Deletion functionality + adds & fixes a ton of stuff #3
4 changed files with 41 additions and 37 deletions
|
@ -1,10 +1,7 @@
|
||||||
{
|
{
|
||||||
"PATH": {
|
"PATH": {
|
||||||
"SCHEMA": "src/schema/comments_ddl.sql",
|
"SCHEMA": "src/schema/comments_ddl.sql",
|
||||||
"MAIN": "database/comments.db",
|
"DATABASE": "database/comments.db",
|
||||||
"BACKUP": "database/comments.backup.db",
|
|
||||||
"DEFAULT": "database/default.db",
|
|
||||||
"TEST": "tests/test.db",
|
|
||||||
"ERROR_LOG": "logs/error.log",
|
"ERROR_LOG": "logs/error.log",
|
||||||
"DEBUG_LOG": "logs/debug.log",
|
"DEBUG_LOG": "logs/debug.log",
|
||||||
"SERVER_LOG": "logs/server.log"
|
"SERVER_LOG": "logs/server.log"
|
||||||
|
|
|
@ -75,6 +75,7 @@ def main(argv=None):
|
||||||
parser = argparse.ArgumentParser(description='LBRY Comment Server')
|
parser = argparse.ArgumentParser(description='LBRY Comment Server')
|
||||||
parser.add_argument('--port', type=int)
|
parser.add_argument('--port', type=int)
|
||||||
args = parser.parse_args(argv)
|
args = parser.parse_args(argv)
|
||||||
|
config_logging_from_settings(config)
|
||||||
if args.port:
|
if args.port:
|
||||||
config['PORT'] = args.port
|
config['PORT'] = args.port
|
||||||
config_logging_from_settings(config)
|
config_logging_from_settings(config)
|
||||||
|
|
|
@ -18,6 +18,7 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
async def setup_db_schema(app):
|
async def setup_db_schema(app):
|
||||||
|
|
||||||
if not pathlib.Path(app['db_path']).exists():
|
if not pathlib.Path(app['db_path']).exists():
|
||||||
logger.info('Setting up schema in %s', app['db_path'])
|
logger.info('Setting up schema in %s', app['db_path'])
|
||||||
setup_database(app['db_path'], app['config']['PATH']['SCHEMA'])
|
setup_database(app['db_path'], app['config']['PATH']['SCHEMA'])
|
||||||
|
@ -25,11 +26,6 @@ async def setup_db_schema(app):
|
||||||
logger.info(f'Database already exists in {app["db_path"]}, skipping setup')
|
logger.info(f'Database already exists in {app["db_path"]}, skipping setup')
|
||||||
|
|
||||||
|
|
||||||
async def close_comment_scheduler(app):
|
|
||||||
logger.info('Closing comment_scheduler')
|
|
||||||
await app['comment_scheduler'].close()
|
|
||||||
|
|
||||||
|
|
||||||
async def database_backup_routine(app):
|
async def database_backup_routine(app):
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
|
@ -43,29 +39,41 @@ async def database_backup_routine(app):
|
||||||
|
|
||||||
async def start_background_tasks(app):
|
async def start_background_tasks(app):
|
||||||
app['reader'] = obtain_connection(app['db_path'], True)
|
app['reader'] = obtain_connection(app['db_path'], True)
|
||||||
app['waitful_backup'] = app.loop.create_task(database_backup_routine(app))
|
app['waitful_backup'] = asyncio.create_task(database_backup_routine(app))
|
||||||
app['comment_scheduler'] = await aiojobs.create_scheduler(limit=1, pending_limit=0)
|
app['comment_scheduler'] = await aiojobs.create_scheduler(limit=1, pending_limit=0)
|
||||||
app['db_writer'] = DatabaseWriter(app['db_path'])
|
app['db_writer'] = DatabaseWriter(app['db_path'])
|
||||||
app['writer'] = app['db_writer'].connection
|
app['writer'] = app['db_writer'].connection
|
||||||
|
|
||||||
|
|
||||||
async def stop_background_tasks(app):
|
async def close_database_connections(app):
|
||||||
logger.info('Ending background backup loop')
|
logger.info('Ending background backup loop')
|
||||||
app['waitful_backup'].cancel()
|
app['waitful_backup'].cancel()
|
||||||
await app['waitful_backup']
|
await app['waitful_backup']
|
||||||
app['reader'].close()
|
app['reader'].close()
|
||||||
app['writer'].close()
|
app['writer'].close()
|
||||||
|
app['db_writer'].cleanup()
|
||||||
|
|
||||||
|
|
||||||
|
async def close_comment_scheduler(app):
|
||||||
|
logger.info('Closing comment_scheduler')
|
||||||
|
await app['comment_scheduler'].close()
|
||||||
|
|
||||||
|
|
||||||
class CommentDaemon:
|
class CommentDaemon:
|
||||||
def __init__(self, config, db_path=None, **kwargs):
|
def __init__(self, config, db_file=None, backup=None, **kwargs):
|
||||||
self.config = config
|
self.config = config
|
||||||
app = web.Application()
|
app = web.Application()
|
||||||
self.insert_to_config(app, config, db_file=db_path)
|
app['config'] = config
|
||||||
|
if db_file:
|
||||||
|
app['db_path'] = db_file
|
||||||
|
app['backup'] = backup
|
||||||
|
else:
|
||||||
|
app['db_path'] = config['PATH']['DATABASE']
|
||||||
|
app['backup'] = backup or (app['db_path'] + '.backup')
|
||||||
app.on_startup.append(setup_db_schema)
|
app.on_startup.append(setup_db_schema)
|
||||||
app.on_startup.append(start_background_tasks)
|
app.on_startup.append(start_background_tasks)
|
||||||
app.on_shutdown.append(stop_background_tasks)
|
|
||||||
app.on_shutdown.append(close_comment_scheduler)
|
app.on_shutdown.append(close_comment_scheduler)
|
||||||
|
app.on_cleanup.append(close_database_connections)
|
||||||
aiojobs.aiohttp.setup(app, **kwargs)
|
aiojobs.aiohttp.setup(app, **kwargs)
|
||||||
app.add_routes([
|
app.add_routes([
|
||||||
web.post('/api', api_endpoint),
|
web.post('/api', api_endpoint),
|
||||||
|
@ -73,36 +81,28 @@ class CommentDaemon:
|
||||||
web.get('/api', get_api_endpoint)
|
web.get('/api', get_api_endpoint)
|
||||||
])
|
])
|
||||||
self.app = app
|
self.app = app
|
||||||
self.app_runner = web.AppRunner(app)
|
self.app_runner = None
|
||||||
self.app_site = None
|
self.app_site = None
|
||||||
|
|
||||||
async def start(self):
|
async def start(self, host=None, port=None):
|
||||||
self.app['START_TIME'] = time.time()
|
self.app['START_TIME'] = time.time()
|
||||||
|
self.app_runner = web.AppRunner(self.app)
|
||||||
await self.app_runner.setup()
|
await self.app_runner.setup()
|
||||||
self.app_site = web.TCPSite(
|
self.app_site = web.TCPSite(
|
||||||
runner=self.app_runner,
|
runner=self.app_runner,
|
||||||
host=self.config['HOST'],
|
host=host or self.config['HOST'],
|
||||||
port=self.config['PORT'],
|
port=port or self.config['PORT'],
|
||||||
)
|
)
|
||||||
await self.app_site.start()
|
await self.app_site.start()
|
||||||
logger.info(f'Comment Server is running on {self.config["HOST"]}:{self.config["PORT"]}')
|
logger.info(f'Comment Server is running on {self.config["HOST"]}:{self.config["PORT"]}')
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
await self.app.shutdown()
|
await self.app_runner.shutdown()
|
||||||
await self.app.cleanup()
|
|
||||||
await self.app_runner.cleanup()
|
await self.app_runner.cleanup()
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def insert_to_config(app, conf=None, db_file=None):
|
|
||||||
db_file = db_file if db_file else 'DEFAULT'
|
|
||||||
app['config'] = conf
|
|
||||||
app['db_path'] = conf['PATH'][db_file]
|
|
||||||
app['backup'] = re.sub(r'\.db$', '.backup.db', app['db_path'])
|
|
||||||
assert app['db_path'] != app['backup']
|
|
||||||
|
|
||||||
|
def run_app(config, db_file=None):
|
||||||
def run_app(config):
|
comment_app = CommentDaemon(config=config, db_file=db_file, close_timeout=5.0)
|
||||||
comment_app = CommentDaemon(config=config, db_path='DEFAULT', close_timeout=5.0)
|
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import os
|
||||||
import pathlib
|
import pathlib
|
||||||
import unittest
|
import unittest
|
||||||
from asyncio.runners import _cancel_all_tasks # type: ignore
|
from asyncio.runners import _cancel_all_tasks # type: ignore
|
||||||
|
@ -119,15 +120,20 @@ class AsyncioTestCase(unittest.TestCase):
|
||||||
|
|
||||||
|
|
||||||
class DatabaseTestCase(unittest.TestCase):
|
class DatabaseTestCase(unittest.TestCase):
|
||||||
|
db_file = 'test.db'
|
||||||
|
|
||||||
|
def __init__(self, methodName='DatabaseTest'):
|
||||||
|
super().__init__(methodName)
|
||||||
|
if pathlib.Path(self.db_file).exists():
|
||||||
|
os.remove(self.db_file)
|
||||||
|
|
||||||
def setUp(self) -> None:
|
def setUp(self) -> None:
|
||||||
super().setUp()
|
super().setUp()
|
||||||
if pathlib.Path(config['PATH']['TEST']).exists():
|
setup_database(self.db_file, config['PATH']['SCHEMA'])
|
||||||
teardown_database(config['PATH']['TEST'])
|
self.conn = obtain_connection(self.db_file)
|
||||||
setup_database(config['PATH']['TEST'], config['PATH']['SCHEMA'])
|
self.addCleanup(self.conn.close)
|
||||||
self.conn = obtain_connection(config['PATH']['TEST'])
|
self.addCleanup(os.remove, self.db_file)
|
||||||
|
|
||||||
def tearDown(self) -> None:
|
def tearDown(self) -> None:
|
||||||
self.conn.close()
|
self.conn.close()
|
||||||
teardown_database(config['PATH']['TEST'])
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue