Upgrades Server to Allow Production Deployment #2

Merged
osilkin98 merged 10 commits from server_upgrade into master 2019-07-24 08:24:25 +02:00
18 changed files with 317 additions and 164 deletions

View file

@ -0,0 +1,8 @@
[Unit]
Description="LBRY Comment Server Watchdog"
After=network.target
Requires=comment-server@5921.service comment-server@5922.service comment-server@5923.service comment-server@5924.service
[Install]
WantedBy=multi-user.target

View file

@ -0,0 +1,15 @@
[Unit]
Description="LBRY Comment Server #%i"
PartOf=comment-server.target
[Service]
Type=simple
User=oleg
WorkingDirectory=/home/lbry/comment-server/
ExecStart=/home/lbry/comment-server/venv/bin/commentserv --port %i
Restart=on-failure
KillMode=process
[Install]
WantedBy=multi-user.target

51
config/nginx.conf Normal file
View file

@ -0,0 +1,51 @@
worker_processes 1;
daemon off;
error_log /dev/stdout warn;
events {
worker_connections 1024;
accept_mutex off;
}
http {
server {
default_type application/json;
listen 80;
client_max_body_size 4G;
server_name example;
location / {
proxy_set_header Host $http_host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_redirect off;
proxy_buffering off;
proxy_pass http://aiohttp;
}
}
# server {
# listen 5921;
# location /api {
# proxy_set_header HOST $host;
# proxy_set_header X-Forwarded-Proto $scheme;
# proxy_set_header X-Real-IP $remote_addr;
# proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
#
# proxy_redirect off;
# proxy_pass http://localhost:2903;
#
# }
#}
}
http {
upstream aiohttp {
server 127.0.0.1:5921 fail_timeout = 60;
server 127.0.0.1:5922 fail_timeout = 60;
server 127.0.0.1:5923 fail_timeout = 60;
server 127.0.0.1:5924 fail_timeout = 60;
}
}

View file

@ -1,10 +0,0 @@
Faker>=1.0.7
asyncio>=3.4.3
aiohttp==3.5.4
aiojobs==0.2.2
ecdsa==0.13
cryptography==2.5
aiosqlite==0.10.0
PyNaCl>=1.3.0
requests
cython

28
setup.py Normal file
View file

@ -0,0 +1,28 @@
import os
from setuptools import setup, find_packages
ROOT = os.path.dirname(__name__)
setup(
name='CommentServer',
version='0.0.1',
packages=find_packages(exclude=('tests',)),
entry_points={
'console_scripts': 'commentserv=src.main:main'
},
zip_safe=False,
data_files=[('config', ['config/conf.json',])],
include_package_data=True,
install_requires=[
'Faker>=1.0.7',
'asyncio>=3.4.3',
'aiohttp==3.5.4',
'aiojobs==0.2.2',
'ecdsa==0.13',
'cryptography==2.5',
'aiosqlite==0.10.0',
'PyNaCl>=1.3.0',
'requests',
'cython', # Not really needed anymore but w/e
]
)

View file

@ -1,97 +0,0 @@
# cython: language_level=3
import logging
import pathlib
import re
import time
import aiojobs
import aiojobs.aiohttp
import asyncio
from aiohttp import web
from src.schema.db_helpers import setup_database, backup_database
from src.database import obtain_connection, DatabaseWriter
from src.handles import api_endpoint, get_api_endpoint
logger = logging.getLogger(__name__)
async def setup_db_schema(app):
if not pathlib.Path(app['db_path']).exists():
logger.info('Setting up schema in %s', app['db_path'])
setup_database(app['db_path'], app['config']['PATH']['SCHEMA'])
else:
logger.info(f'Database already exists in {app["db_path"]}, skipping setup')
async def close_comment_scheduler(app):
logger.info('Closing comment_scheduler')
await app['comment_scheduler'].close()
async def database_backup_routine(app):
try:
while True:
await asyncio.sleep(app['config']['BACKUP_INT'])
with app['reader'] as conn:
logger.debug('backing up database')
backup_database(conn, app['backup'])
except asyncio.CancelledError:
pass
# noinspection PyDeprecation
async def start_background_tasks(app: web.Application):
app['reader'] = obtain_connection(app['db_path'], True)
app['waitful_backup'] = app.loop.create_task(database_backup_routine(app))
app['comment_scheduler'] = await aiojobs.create_scheduler(limit=1, pending_limit=0)
app['db_writer'] = DatabaseWriter(app['db_path'])
app['writer'] = app['db_writer'].connection
def insert_to_config(app, conf=None, db_file=None):
db_file = db_file if db_file else 'DEFAULT'
app['config'] = conf
app['db_path'] = conf['PATH'][db_file]
app['backup'] = re.sub(r'\.db$', '.backup.db', app['db_path'])
assert app['db_path'] != app['backup']
async def cleanup_background_tasks(app):
logger.info('Ending background backup loop')
app['waitful_backup'].cancel()
await app['waitful_backup']
app['reader'].close()
app['writer'].close()
def create_app(conf, db_path='DEFAULT', **kwargs):
app = web.Application()
app['START_TIME'] = int(time.time())
insert_to_config(app, conf, db_path)
app.on_startup.append(setup_db_schema)
app.on_startup.append(start_background_tasks)
app.on_shutdown.append(cleanup_background_tasks)
app.on_shutdown.append(close_comment_scheduler)
aiojobs.aiohttp.setup(app, **kwargs)
app.add_routes([
web.post('/api', api_endpoint),
web.get('/', get_api_endpoint),
web.get('/api', get_api_endpoint)
])
return app
def run_app(config):
appl = create_app(conf=config, db_path='DEFAULT', close_timeout=5.0)
try:
asyncio.run(web.run_app(
app=appl,
access_log=logging.getLogger('aiohttp.access'),
host=config['HOST'],
port=config['PORT']
))
except asyncio.CancelledError:
logging.warning('Server going down, asyncio loop raised cancelled error:')
except ValueError:
logging.exception('Server going down due to value error:')

View file

@ -1,9 +1,10 @@
import logging.config import logging.config
import logging import logging
import os import argparse
from src.settings import config import sys
from src.app import run_app from src.settings import config
from src.server.app import run_app
def config_logging_from_settings(conf): def config_logging_from_settings(conf):
@ -69,7 +70,16 @@ def config_logging_from_settings(conf):
logging.config.dictConfig(_config) logging.config.dictConfig(_config)
if __name__ == '__main__': def main(argv=None):
argv = argv or sys.argv[1:]
parser = argparse.ArgumentParser(description='LBRY Comment Server')
parser.add_argument('--port', type=int)
args = parser.parse_args(argv)
if args.port:
config['PORT'] = args.port
config_logging_from_settings(config) config_logging_from_settings(config)
logger = logging.getLogger(__name__) run_app(config)
run_app(config)
if __name__ == '__main__':
sys.exit(main())

0
src/server/__init__.py Normal file
View file

122
src/server/app.py Normal file
View file

@ -0,0 +1,122 @@
# cython: language_level=3
import logging
import pathlib
import re
import signal
import time
import aiojobs
import aiojobs.aiohttp
import asyncio
from aiohttp import web
from src.schema.db_helpers import setup_database, backup_database
from src.server.database import obtain_connection, DatabaseWriter
from src.server.handles import api_endpoint, get_api_endpoint
logger = logging.getLogger(__name__)
async def setup_db_schema(app):
if not pathlib.Path(app['db_path']).exists():
logger.info('Setting up schema in %s', app['db_path'])
setup_database(app['db_path'], app['config']['PATH']['SCHEMA'])
else:
logger.info(f'Database already exists in {app["db_path"]}, skipping setup')
async def close_comment_scheduler(app):
logger.info('Closing comment_scheduler')
await app['comment_scheduler'].close()
async def database_backup_routine(app):
try:
while True:
await asyncio.sleep(app['config']['BACKUP_INT'])
with app['reader'] as conn:
logger.debug('backing up database')
backup_database(conn, app['backup'])
except asyncio.CancelledError:
pass
async def start_background_tasks(app):
app['reader'] = obtain_connection(app['db_path'], True)
app['waitful_backup'] = app.loop.create_task(database_backup_routine(app))
app['comment_scheduler'] = await aiojobs.create_scheduler(limit=1, pending_limit=0)
app['db_writer'] = DatabaseWriter(app['db_path'])
app['writer'] = app['db_writer'].connection
async def stop_background_tasks(app):
logger.info('Ending background backup loop')
app['waitful_backup'].cancel()
await app['waitful_backup']
app['reader'].close()
app['writer'].close()
class CommentDaemon:
def __init__(self, config, db_path=None, **kwargs):
self.config = config
app = web.Application()
self.insert_to_config(app, config, db_file=db_path)
app.on_startup.append(setup_db_schema)
app.on_startup.append(start_background_tasks)
app.on_shutdown.append(stop_background_tasks)
app.on_shutdown.append(close_comment_scheduler)
aiojobs.aiohttp.setup(app, **kwargs)
app.add_routes([
web.post('/api', api_endpoint),
web.get('/', get_api_endpoint),
web.get('/api', get_api_endpoint)
])
self.app = app
self.app_runner = web.AppRunner(app)
self.app_site = None
async def start(self):
self.app['START_TIME'] = time.time()
await self.app_runner.setup()
self.app_site = web.TCPSite(
runner=self.app_runner,
host=self.config['HOST'],
port=self.config['PORT'],
)
await self.app_site.start()
logger.info(f'Comment Server is running on {self.config["HOST"]}:{self.config["PORT"]}')
async def stop(self):
await self.app.shutdown()
await self.app.cleanup()
await self.app_runner.cleanup()
@staticmethod
def insert_to_config(app, conf=None, db_file=None):
db_file = db_file if db_file else 'DEFAULT'
app['config'] = conf
app['db_path'] = conf['PATH'][db_file]
app['backup'] = re.sub(r'\.db$', '.backup.db', app['db_path'])
assert app['db_path'] != app['backup']
def run_app(config):
comment_app = CommentDaemon(config=config, db_path='DEFAULT', close_timeout=5.0)
loop = asyncio.get_event_loop()
loop.set_debug(True)
def __exit():
raise web.GracefulExit()
loop.add_signal_handler(signal.SIGINT, __exit)
loop.add_signal_handler(signal.SIGTERM, __exit)
try:
loop.run_until_complete(comment_app.start())
loop.run_forever()
except (web.GracefulExit, KeyboardInterrupt, asyncio.CancelledError, ValueError):
logging.warning('Server going down, asyncio loop raised cancelled error:')
finally:
loop.run_until_complete(comment_app.stop())

View file

@ -6,15 +6,15 @@ import asyncio
from aiohttp import web from aiohttp import web
from aiojobs.aiohttp import atomic from aiojobs.aiohttp import atomic
from src.misc import clean_input_params from src.server.misc import clean_input_params
from src.database import get_claim_comments from src.server.database import get_claim_comments
from src.database import get_comments_by_id, get_comment_ids from src.server.database import get_comments_by_id, get_comment_ids
from src.database import get_channel_id_from_comment_id from src.server.database import get_channel_id_from_comment_id
from src.database import obtain_connection from src.server.database import obtain_connection
from src.misc import is_valid_base_comment from src.server.misc import is_valid_base_comment
from src.misc import is_valid_credential_input from src.server.misc import is_valid_credential_input
from src.misc import make_error from src.server.misc import make_error
from src.writes import delete_comment_if_authorized, write_comment from src.server.writes import delete_comment_if_authorized, write_comment
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -25,22 +25,22 @@ def ping(*args):
def handle_get_channel_from_comment_id(app, kwargs: dict): def handle_get_channel_from_comment_id(app, kwargs: dict):
with obtain_connection(app['db_path']) as conn: with app['reader'] as conn:
return get_channel_id_from_comment_id(conn, **kwargs) return get_channel_id_from_comment_id(conn, **kwargs)
def handle_get_comment_ids(app, kwargs): def handle_get_comment_ids(app, kwargs):
with obtain_connection(app['db_path']) as conn: with app['reader'] as conn:
return get_comment_ids(conn, **kwargs) return get_comment_ids(conn, **kwargs)
def handle_get_claim_comments(app, kwargs): def handle_get_claim_comments(app, kwargs):
with obtain_connection(app['db_path']) as conn: with app['reader'] as conn:
return get_claim_comments(conn, **kwargs) return get_claim_comments(conn, **kwargs)
def handle_get_comments_by_id(app, kwargs): def handle_get_comments_by_id(app, kwargs):
with obtain_connection(app['db_path']) as conn: with app['reader'] as conn:
return get_comments_by_id(conn, **kwargs) return get_comments_by_id(conn, **kwargs)

View file

@ -3,13 +3,13 @@ import sqlite3
from asyncio import coroutine from asyncio import coroutine
from src.database import delete_comment_by_id from src.server.database import delete_comment_by_id
from src.misc import is_authentic_delete_signal from src.server.misc import is_authentic_delete_signal
from src.database import get_comment_or_none from src.server.database import get_comment_or_none
from src.database import insert_comment from src.server.database import insert_comment
from src.database import insert_channel from src.server.database import insert_channel
from src.misc import channel_matches_pattern_or_error from src.server.misc import channel_matches_pattern_or_error
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View file

@ -5,10 +5,10 @@ from faker.providers import internet
from faker.providers import lorem from faker.providers import lorem
from faker.providers import misc from faker.providers import misc
from src.database import get_comments_by_id from server.database import get_comments_by_id
from src.database import get_comment_ids from server.database import get_comment_ids
from src.database import get_claim_comments from server.database import get_claim_comments
from src.writes import create_comment_or_error from server.writes import create_comment_or_error
from tests.testcase import DatabaseTestCase from tests.testcase import DatabaseTestCase
fake = faker.Faker() fake = faker.Faker()

View file

@ -4,7 +4,7 @@
# * 'gtrp' and 'gtr' create a GET request with or without query parameters; # * 'gtrp' and 'gtr' create a GET request with or without query parameters;
# * 'ptr' and 'ptrp' create a POST request with a simple or parameter-like body; # * 'ptr' and 'ptrp' create a POST request with a simple or parameter-like body;
# * 'mptr' and 'fptr' create a POST request to submit a form with a text or file field (multipart/form-data); # * 'mptr' and 'fptr' create a POST request to submit a form with a text or file field (multipart/form-data);
POST http://localhost:5921/api POST http://localhost:5922/api
Content-Type: application/json Content-Type: application/json
{ {
@ -12,12 +12,12 @@ Content-Type: application/json
"id": null, "id": null,
"method": "create_comment", "method": "create_comment",
"params": { "params": {
"claim_id": "6d266af6c25c80fa2ac6cc7662921ad2e90a07e8", "claim_id": "6d266af6c25c80fa2ac6cc7662921ad2e90a07e7",
"comment": "This is literally, the most anonymous comment I have EVER seen.", "comment": "This is literally, the most anonymous comment I have EVER seen.",
"channel_id": "9cb713f01bf247a0e03170b5ed00d5161340c486", "channel_id": "9cb713f01bf247a0e03170b5ed00d5161340c486",
"channel_name": "@bbbbb", "channel_name": "@bbbbb",
"signature": "e8ad5dbc268365e5d16f6a9b5d82323efc550412a7a291aa3e715e31641cc57b18241a132ebed28a848086fca6e200a735281c631c7474b01b4c32851ea3dd57", "signature": "e8ad5dbd660365efd1646a9b5d82323efc550712a7a291aa3e715e31641cc57b18241a132ebed28a848086fca6e200a735281c631c7474b01b4c32851ea3dd57",
"signing_ts": "1561669614" "signing_ts": "1561669624"
} }
} }

View file

@ -4,7 +4,7 @@
# * 'gtrp' and 'gtr' create a GET request with or without query parameters; # * 'gtrp' and 'gtr' create a GET request with or without query parameters;
# * 'ptr' and 'ptrp' create a POST request with a simple or parameter-like body; # * 'ptr' and 'ptrp' create a POST request with a simple or parameter-like body;
# * 'mptr' and 'fptr' create a POST request to submit a form with a text or file field (multipart/form-data); # * 'mptr' and 'fptr' create a POST request to submit a form with a text or file field (multipart/form-data);
POST http://localhost:5921/api POST http://localhost:5922/api
Content-Type: application/json Content-Type: application/json
{ {
@ -12,8 +12,9 @@ Content-Type: application/json
"id": null, "id": null,
"method": "get_claim_comments", "method": "get_claim_comments",
"params": { "params": {
"claim_id": "6d266af6c25c80fa2ac6cc7662921ad2e90a07e8", "claim_id": "6d266af6c25c80fa2ac6cc7662921ad2e90a07e7",
"page": 1 "page": 1,
"page_size": 100
} }
} }

View file

@ -1,4 +1,6 @@
import unittest import unittest
from multiprocessing.pool import Pool
import requests import requests
import re import re
from itertools import * from itertools import *
@ -8,7 +10,7 @@ from faker.providers import internet
from faker.providers import lorem from faker.providers import lorem
from faker.providers import misc from faker.providers import misc
from src.settings import config from settings import config
fake = faker.Faker() fake = faker.Faker()
fake.add_provider(internet) fake.add_provider(internet)
@ -34,6 +36,22 @@ def nothing():
pass pass
replace = {
'claim_id': fake.sha1,
'comment': fake.text,
'channel_id': fake.sha1,
'channel_name': fake_lbryusername,
'signature': fake.uuid4,
'parent_id': fake.sha256
}
def create_test_comments(values: iter, **default):
vars_combo = chain.from_iterable(combinations(values, r) for r in range(1, len(values) + 1))
return [{k: replace[k]() if k in comb else v for k, v in default.items()}
for comb in vars_combo]
class ServerTest(unittest.TestCase): class ServerTest(unittest.TestCase):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@ -216,23 +234,30 @@ class ListCommentsTest(unittest.TestCase):
self.assertEqual(response['total_pages'], response_one['total_pages']) self.assertEqual(response['total_pages'], response_one['total_pages'])
class ConcurrentWriteTest(unittest.TestCase):
@staticmethod
def make_comment(num):
return {
'jsonrpc': '2.0',
'id': num,
'method': 'create_comment',
'params': {
'comment': f'Comment #{num}',
'claim_id': '6d266af6c25c80fa2ac6cc7662921ad2e90a07e7',
}
}
@staticmethod
def send_comment_to_server(params):
with requests.post(params[0], json=params[1]) as req:
return req.json()
def test01Concurrency(self):
replace = { urls = [f'http://localhost:{port}/api' for port in range(5921, 5925)]
'claim_id': fake.sha1, comments = [self.make_comment(i) for i in range(1, 5)]
'comment': fake.text, inputs = list(zip(urls, comments))
'channel_id': fake.sha1, with Pool(4) as pool:
'channel_name': fake_lbryusername, results = pool.map(self.send_comment_to_server, inputs)
'signature': fake.uuid4, results = list(filter(lambda x: 'comment_id' in x['result'], results))
'parent_id': fake.sha256 self.assertIsNotNone(results)
} self.assertEqual(len(results), len(inputs))
def create_test_comments(values: iter, **default):
vars_combo = chain.from_iterable(combinations(values, r) for r in range(1, len(values) + 1))
return [{k: replace[k]() if k in comb else v for k, v in default.items()}
for comb in vars_combo]

View file

@ -6,8 +6,8 @@ from unittest.case import _Outcome
import asyncio import asyncio
from schema.db_helpers import setup_database, teardown_database from schema.db_helpers import setup_database, teardown_database
from src.database import obtain_connection from server.database import obtain_connection
from src.settings import config from settings import config
class AsyncioTestCase(unittest.TestCase): class AsyncioTestCase(unittest.TestCase):