diff --git a/config/comment-server.target b/config/comment-server.target new file mode 100644 index 0000000..afdcc05 --- /dev/null +++ b/config/comment-server.target @@ -0,0 +1,8 @@ +[Unit] +Description="LBRY Comment Server Watchdog" +After=network.target +Requires=comment-server@5921.service comment-server@5922.service comment-server@5923.service comment-server@5924.service + + +[Install] +WantedBy=multi-user.target diff --git a/config/comment-server@.service b/config/comment-server@.service new file mode 100644 index 0000000..e453fe4 --- /dev/null +++ b/config/comment-server@.service @@ -0,0 +1,15 @@ +[Unit] +Description="LBRY Comment Server #%i" +PartOf=comment-server.target + +[Service] +Type=simple +User=oleg +WorkingDirectory=/home/lbry/comment-server/ +ExecStart=/home/lbry/comment-server/venv/bin/commentserv --port %i +Restart=on-failure +KillMode=process + + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/config/nginx.conf b/config/nginx.conf new file mode 100644 index 0000000..bffc81b --- /dev/null +++ b/config/nginx.conf @@ -0,0 +1,51 @@ +worker_processes 1; +daemon off; +error_log /dev/stdout warn; + +events { + worker_connections 1024; + accept_mutex off; +} + +http { + + server { + default_type application/json; + listen 80; + client_max_body_size 4G; + server_name example; + + location / { + proxy_set_header Host $http_host; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_redirect off; + proxy_buffering off; + + proxy_pass http://aiohttp; + } + } + + # server { + # listen 5921; + # location /api { + # proxy_set_header HOST $host; + # proxy_set_header X-Forwarded-Proto $scheme; + # proxy_set_header X-Real-IP $remote_addr; + # proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + # + # proxy_redirect off; + # proxy_pass http://localhost:2903; + # + # } + #} +} + +http { + upstream aiohttp { + server 127.0.0.1:5921 fail_timeout = 60; + server 127.0.0.1:5922 fail_timeout = 60; + server 127.0.0.1:5923 fail_timeout = 60; + server 127.0.0.1:5924 fail_timeout = 60; + + } +} diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 1290854..0000000 --- a/requirements.txt +++ /dev/null @@ -1,10 +0,0 @@ -Faker>=1.0.7 -asyncio>=3.4.3 -aiohttp==3.5.4 -aiojobs==0.2.2 -ecdsa==0.13 -cryptography==2.5 -aiosqlite==0.10.0 -PyNaCl>=1.3.0 -requests -cython diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..8f5d57d --- /dev/null +++ b/setup.py @@ -0,0 +1,28 @@ +import os +from setuptools import setup, find_packages + +ROOT = os.path.dirname(__name__) + +setup( + name='CommentServer', + version='0.0.1', + packages=find_packages(exclude=('tests',)), + entry_points={ + 'console_scripts': 'commentserv=src.main:main' + }, + zip_safe=False, + data_files=[('config', ['config/conf.json',])], + include_package_data=True, + install_requires=[ + 'Faker>=1.0.7', + 'asyncio>=3.4.3', + 'aiohttp==3.5.4', + 'aiojobs==0.2.2', + 'ecdsa==0.13', + 'cryptography==2.5', + 'aiosqlite==0.10.0', + 'PyNaCl>=1.3.0', + 'requests', + 'cython', # Not really needed anymore but w/e + ] +) diff --git a/src/app.py b/src/app.py deleted file mode 100644 index f8b2290..0000000 --- a/src/app.py +++ /dev/null @@ -1,97 +0,0 @@ -# cython: language_level=3 -import logging -import pathlib -import re -import time - -import aiojobs -import aiojobs.aiohttp -import asyncio -from aiohttp import web - -from src.schema.db_helpers import setup_database, backup_database -from src.database import obtain_connection, DatabaseWriter -from src.handles import api_endpoint, get_api_endpoint - -logger = logging.getLogger(__name__) - - -async def setup_db_schema(app): - if not pathlib.Path(app['db_path']).exists(): - logger.info('Setting up schema in %s', app['db_path']) - setup_database(app['db_path'], app['config']['PATH']['SCHEMA']) - else: - logger.info(f'Database already exists in {app["db_path"]}, skipping setup') - - -async def close_comment_scheduler(app): - logger.info('Closing comment_scheduler') - await app['comment_scheduler'].close() - - -async def database_backup_routine(app): - try: - while True: - await asyncio.sleep(app['config']['BACKUP_INT']) - with app['reader'] as conn: - logger.debug('backing up database') - backup_database(conn, app['backup']) - except asyncio.CancelledError: - pass - - -# noinspection PyDeprecation -async def start_background_tasks(app: web.Application): - app['reader'] = obtain_connection(app['db_path'], True) - app['waitful_backup'] = app.loop.create_task(database_backup_routine(app)) - app['comment_scheduler'] = await aiojobs.create_scheduler(limit=1, pending_limit=0) - app['db_writer'] = DatabaseWriter(app['db_path']) - app['writer'] = app['db_writer'].connection - - -def insert_to_config(app, conf=None, db_file=None): - db_file = db_file if db_file else 'DEFAULT' - app['config'] = conf - app['db_path'] = conf['PATH'][db_file] - app['backup'] = re.sub(r'\.db$', '.backup.db', app['db_path']) - assert app['db_path'] != app['backup'] - - -async def cleanup_background_tasks(app): - logger.info('Ending background backup loop') - app['waitful_backup'].cancel() - await app['waitful_backup'] - app['reader'].close() - app['writer'].close() - - -def create_app(conf, db_path='DEFAULT', **kwargs): - app = web.Application() - app['START_TIME'] = int(time.time()) - insert_to_config(app, conf, db_path) - app.on_startup.append(setup_db_schema) - app.on_startup.append(start_background_tasks) - app.on_shutdown.append(cleanup_background_tasks) - app.on_shutdown.append(close_comment_scheduler) - aiojobs.aiohttp.setup(app, **kwargs) - app.add_routes([ - web.post('/api', api_endpoint), - web.get('/', get_api_endpoint), - web.get('/api', get_api_endpoint) - ]) - return app - - -def run_app(config): - appl = create_app(conf=config, db_path='DEFAULT', close_timeout=5.0) - try: - asyncio.run(web.run_app( - app=appl, - access_log=logging.getLogger('aiohttp.access'), - host=config['HOST'], - port=config['PORT'] - )) - except asyncio.CancelledError: - logging.warning('Server going down, asyncio loop raised cancelled error:') - except ValueError: - logging.exception('Server going down due to value error:') diff --git a/main.py b/src/main.py similarity index 85% rename from main.py rename to src/main.py index 293d625..0a3a9cd 100644 --- a/main.py +++ b/src/main.py @@ -1,9 +1,10 @@ import logging.config import logging -import os -from src.settings import config +import argparse +import sys -from src.app import run_app +from src.settings import config +from src.server.app import run_app def config_logging_from_settings(conf): @@ -69,7 +70,16 @@ def config_logging_from_settings(conf): logging.config.dictConfig(_config) -if __name__ == '__main__': +def main(argv=None): + argv = argv or sys.argv[1:] + parser = argparse.ArgumentParser(description='LBRY Comment Server') + parser.add_argument('--port', type=int) + args = parser.parse_args(argv) + if args.port: + config['PORT'] = args.port config_logging_from_settings(config) - logger = logging.getLogger(__name__) - run_app(config) \ No newline at end of file + run_app(config) + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/src/server/__init__.py b/src/server/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/server/app.py b/src/server/app.py new file mode 100644 index 0000000..39fda5b --- /dev/null +++ b/src/server/app.py @@ -0,0 +1,122 @@ +# cython: language_level=3 +import logging +import pathlib +import re +import signal +import time + +import aiojobs +import aiojobs.aiohttp +import asyncio +from aiohttp import web + +from src.schema.db_helpers import setup_database, backup_database +from src.server.database import obtain_connection, DatabaseWriter +from src.server.handles import api_endpoint, get_api_endpoint + +logger = logging.getLogger(__name__) + + +async def setup_db_schema(app): + if not pathlib.Path(app['db_path']).exists(): + logger.info('Setting up schema in %s', app['db_path']) + setup_database(app['db_path'], app['config']['PATH']['SCHEMA']) + else: + logger.info(f'Database already exists in {app["db_path"]}, skipping setup') + + +async def close_comment_scheduler(app): + logger.info('Closing comment_scheduler') + await app['comment_scheduler'].close() + + +async def database_backup_routine(app): + try: + while True: + await asyncio.sleep(app['config']['BACKUP_INT']) + with app['reader'] as conn: + logger.debug('backing up database') + backup_database(conn, app['backup']) + except asyncio.CancelledError: + pass + + +async def start_background_tasks(app): + app['reader'] = obtain_connection(app['db_path'], True) + app['waitful_backup'] = app.loop.create_task(database_backup_routine(app)) + app['comment_scheduler'] = await aiojobs.create_scheduler(limit=1, pending_limit=0) + app['db_writer'] = DatabaseWriter(app['db_path']) + app['writer'] = app['db_writer'].connection + + +async def stop_background_tasks(app): + logger.info('Ending background backup loop') + app['waitful_backup'].cancel() + await app['waitful_backup'] + app['reader'].close() + app['writer'].close() + + +class CommentDaemon: + def __init__(self, config, db_path=None, **kwargs): + self.config = config + app = web.Application() + self.insert_to_config(app, config, db_file=db_path) + app.on_startup.append(setup_db_schema) + app.on_startup.append(start_background_tasks) + app.on_shutdown.append(stop_background_tasks) + app.on_shutdown.append(close_comment_scheduler) + aiojobs.aiohttp.setup(app, **kwargs) + app.add_routes([ + web.post('/api', api_endpoint), + web.get('/', get_api_endpoint), + web.get('/api', get_api_endpoint) + ]) + self.app = app + self.app_runner = web.AppRunner(app) + self.app_site = None + + async def start(self): + self.app['START_TIME'] = time.time() + await self.app_runner.setup() + self.app_site = web.TCPSite( + runner=self.app_runner, + host=self.config['HOST'], + port=self.config['PORT'], + ) + await self.app_site.start() + logger.info(f'Comment Server is running on {self.config["HOST"]}:{self.config["PORT"]}') + + async def stop(self): + await self.app.shutdown() + await self.app.cleanup() + await self.app_runner.cleanup() + + @staticmethod + def insert_to_config(app, conf=None, db_file=None): + db_file = db_file if db_file else 'DEFAULT' + app['config'] = conf + app['db_path'] = conf['PATH'][db_file] + app['backup'] = re.sub(r'\.db$', '.backup.db', app['db_path']) + assert app['db_path'] != app['backup'] + + +def run_app(config): + comment_app = CommentDaemon(config=config, db_path='DEFAULT', close_timeout=5.0) + + loop = asyncio.get_event_loop() + loop.set_debug(True) + + def __exit(): + raise web.GracefulExit() + + loop.add_signal_handler(signal.SIGINT, __exit) + loop.add_signal_handler(signal.SIGTERM, __exit) + + try: + loop.run_until_complete(comment_app.start()) + loop.run_forever() + except (web.GracefulExit, KeyboardInterrupt, asyncio.CancelledError, ValueError): + logging.warning('Server going down, asyncio loop raised cancelled error:') + finally: + loop.run_until_complete(comment_app.stop()) \ No newline at end of file diff --git a/src/database.py b/src/server/database.py similarity index 100% rename from src/database.py rename to src/server/database.py diff --git a/src/handles.py b/src/server/handles.py similarity index 84% rename from src/handles.py rename to src/server/handles.py index 2dad791..ada8449 100644 --- a/src/handles.py +++ b/src/server/handles.py @@ -6,15 +6,15 @@ import asyncio from aiohttp import web from aiojobs.aiohttp import atomic -from src.misc import clean_input_params -from src.database import get_claim_comments -from src.database import get_comments_by_id, get_comment_ids -from src.database import get_channel_id_from_comment_id -from src.database import obtain_connection -from src.misc import is_valid_base_comment -from src.misc import is_valid_credential_input -from src.misc import make_error -from src.writes import delete_comment_if_authorized, write_comment +from src.server.misc import clean_input_params +from src.server.database import get_claim_comments +from src.server.database import get_comments_by_id, get_comment_ids +from src.server.database import get_channel_id_from_comment_id +from src.server.database import obtain_connection +from src.server.misc import is_valid_base_comment +from src.server.misc import is_valid_credential_input +from src.server.misc import make_error +from src.server.writes import delete_comment_if_authorized, write_comment logger = logging.getLogger(__name__) @@ -25,22 +25,22 @@ def ping(*args): def handle_get_channel_from_comment_id(app, kwargs: dict): - with obtain_connection(app['db_path']) as conn: + with app['reader'] as conn: return get_channel_id_from_comment_id(conn, **kwargs) def handle_get_comment_ids(app, kwargs): - with obtain_connection(app['db_path']) as conn: + with app['reader'] as conn: return get_comment_ids(conn, **kwargs) def handle_get_claim_comments(app, kwargs): - with obtain_connection(app['db_path']) as conn: + with app['reader'] as conn: return get_claim_comments(conn, **kwargs) def handle_get_comments_by_id(app, kwargs): - with obtain_connection(app['db_path']) as conn: + with app['reader'] as conn: return get_comments_by_id(conn, **kwargs) diff --git a/src/misc.py b/src/server/misc.py similarity index 100% rename from src/misc.py rename to src/server/misc.py diff --git a/src/writes.py b/src/server/writes.py similarity index 83% rename from src/writes.py rename to src/server/writes.py index 1b11e76..2feb928 100644 --- a/src/writes.py +++ b/src/server/writes.py @@ -3,13 +3,13 @@ import sqlite3 from asyncio import coroutine -from src.database import delete_comment_by_id -from src.misc import is_authentic_delete_signal +from src.server.database import delete_comment_by_id +from src.server.misc import is_authentic_delete_signal -from src.database import get_comment_or_none -from src.database import insert_comment -from src.database import insert_channel -from src.misc import channel_matches_pattern_or_error +from src.server.database import get_comment_or_none +from src.server.database import insert_comment +from src.server.database import insert_channel +from src.server.misc import channel_matches_pattern_or_error logger = logging.getLogger(__name__) diff --git a/tests/database_test.py b/tests/database_test.py index 27307bc..269d39b 100644 --- a/tests/database_test.py +++ b/tests/database_test.py @@ -5,10 +5,10 @@ from faker.providers import internet from faker.providers import lorem from faker.providers import misc -from src.database import get_comments_by_id -from src.database import get_comment_ids -from src.database import get_claim_comments -from src.writes import create_comment_or_error +from server.database import get_comments_by_id +from server.database import get_comment_ids +from server.database import get_claim_comments +from server.writes import create_comment_or_error from tests.testcase import DatabaseTestCase fake = faker.Faker() diff --git a/tests/http_requests/create_comment_local.http b/tests/http_requests/create_comment_local.http index ade754f..f73c488 100644 --- a/tests/http_requests/create_comment_local.http +++ b/tests/http_requests/create_comment_local.http @@ -4,7 +4,7 @@ # * 'gtrp' and 'gtr' create a GET request with or without query parameters; # * 'ptr' and 'ptrp' create a POST request with a simple or parameter-like body; # * 'mptr' and 'fptr' create a POST request to submit a form with a text or file field (multipart/form-data); -POST http://localhost:5921/api +POST http://localhost:5922/api Content-Type: application/json { @@ -12,12 +12,12 @@ Content-Type: application/json "id": null, "method": "create_comment", "params": { - "claim_id": "6d266af6c25c80fa2ac6cc7662921ad2e90a07e8", + "claim_id": "6d266af6c25c80fa2ac6cc7662921ad2e90a07e7", "comment": "This is literally, the most anonymous comment I have EVER seen.", "channel_id": "9cb713f01bf247a0e03170b5ed00d5161340c486", "channel_name": "@bbbbb", - "signature": "e8ad5dbc268365e5d16f6a9b5d82323efc550412a7a291aa3e715e31641cc57b18241a132ebed28a848086fca6e200a735281c631c7474b01b4c32851ea3dd57", - "signing_ts": "1561669614" + "signature": "e8ad5dbd660365efd1646a9b5d82323efc550712a7a291aa3e715e31641cc57b18241a132ebed28a848086fca6e200a735281c631c7474b01b4c32851ea3dd57", + "signing_ts": "1561669624" } } diff --git a/tests/http_requests/local-server-request.http b/tests/http_requests/local-server-request.http index 99ef40d..b3131fc 100644 --- a/tests/http_requests/local-server-request.http +++ b/tests/http_requests/local-server-request.http @@ -4,7 +4,7 @@ # * 'gtrp' and 'gtr' create a GET request with or without query parameters; # * 'ptr' and 'ptrp' create a POST request with a simple or parameter-like body; # * 'mptr' and 'fptr' create a POST request to submit a form with a text or file field (multipart/form-data); -POST http://localhost:5921/api +POST http://localhost:5922/api Content-Type: application/json { @@ -12,8 +12,9 @@ Content-Type: application/json "id": null, "method": "get_claim_comments", "params": { - "claim_id": "6d266af6c25c80fa2ac6cc7662921ad2e90a07e8", - "page": 1 + "claim_id": "6d266af6c25c80fa2ac6cc7662921ad2e90a07e7", + "page": 1, + "page_size": 100 } } diff --git a/tests/server_test.py b/tests/server_test.py index 8fdcf65..1a5ab0e 100644 --- a/tests/server_test.py +++ b/tests/server_test.py @@ -1,4 +1,6 @@ import unittest +from multiprocessing.pool import Pool + import requests import re from itertools import * @@ -8,7 +10,7 @@ from faker.providers import internet from faker.providers import lorem from faker.providers import misc -from src.settings import config +from settings import config fake = faker.Faker() fake.add_provider(internet) @@ -34,6 +36,22 @@ def nothing(): pass +replace = { + 'claim_id': fake.sha1, + 'comment': fake.text, + 'channel_id': fake.sha1, + 'channel_name': fake_lbryusername, + 'signature': fake.uuid4, + 'parent_id': fake.sha256 +} + + +def create_test_comments(values: iter, **default): + vars_combo = chain.from_iterable(combinations(values, r) for r in range(1, len(values) + 1)) + return [{k: replace[k]() if k in comb else v for k, v in default.items()} + for comb in vars_combo] + + class ServerTest(unittest.TestCase): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -216,23 +234,30 @@ class ListCommentsTest(unittest.TestCase): self.assertEqual(response['total_pages'], response_one['total_pages']) +class ConcurrentWriteTest(unittest.TestCase): + @staticmethod + def make_comment(num): + return { + 'jsonrpc': '2.0', + 'id': num, + 'method': 'create_comment', + 'params': { + 'comment': f'Comment #{num}', + 'claim_id': '6d266af6c25c80fa2ac6cc7662921ad2e90a07e7', + } + } + @staticmethod + def send_comment_to_server(params): + with requests.post(params[0], json=params[1]) as req: + return req.json() - -replace = { - 'claim_id': fake.sha1, - 'comment': fake.text, - 'channel_id': fake.sha1, - 'channel_name': fake_lbryusername, - 'signature': fake.uuid4, - 'parent_id': fake.sha256 -} - - -def create_test_comments(values: iter, **default): - vars_combo = chain.from_iterable(combinations(values, r) for r in range(1, len(values) + 1)) - return [{k: replace[k]() if k in comb else v for k, v in default.items()} - for comb in vars_combo] - - - + def test01Concurrency(self): + urls = [f'http://localhost:{port}/api' for port in range(5921, 5925)] + comments = [self.make_comment(i) for i in range(1, 5)] + inputs = list(zip(urls, comments)) + with Pool(4) as pool: + results = pool.map(self.send_comment_to_server, inputs) + results = list(filter(lambda x: 'comment_id' in x['result'], results)) + self.assertIsNotNone(results) + self.assertEqual(len(results), len(inputs)) diff --git a/tests/testcase.py b/tests/testcase.py index 979d36e..6bf1ca6 100644 --- a/tests/testcase.py +++ b/tests/testcase.py @@ -6,8 +6,8 @@ from unittest.case import _Outcome import asyncio from schema.db_helpers import setup_database, teardown_database -from src.database import obtain_connection -from src.settings import config +from server.database import obtain_connection +from settings import config class AsyncioTestCase(unittest.TestCase):